diff options
author | Araq <rumpf_a@web.de> | 2011-07-09 01:18:33 +0200 |
---|---|---|
committer | Araq <rumpf_a@web.de> | 2011-07-09 01:18:33 +0200 |
commit | 2565ff8ddec9fcf43fbda2fae6f04806c1bc6e8a (patch) | |
tree | 8ab9b57efd41ac133eaf29b0a751d42cce1f66df | |
parent | 99bcc233cd8fb3bb9b6f3f0857e477dd9b33c9e8 (diff) | |
download | Nim-2565ff8ddec9fcf43fbda2fae6f04806c1bc6e8a.tar.gz |
basic message passing working
-rwxr-xr-x | lib/system.nim | 7 | ||||
-rw-r--r-- | lib/system/inboxes.nim | 27 | ||||
-rwxr-xr-x | lib/system/threads.nim | 37 | ||||
-rwxr-xr-x | todo.txt | 2 |
4 files changed, 56 insertions, 17 deletions
diff --git a/lib/system.nim b/lib/system.nim index 5c7102664..6f20a9b4d 100755 --- a/lib/system.nim +++ b/lib/system.nim @@ -236,7 +236,7 @@ type EInvalidObjectAssignment* = object of ESynch ## is raised if an object gets assigned to its - ## farther's object. + ## parent's object. EInvalidObjectConversion* = object of ESynch ## is raised if an object is converted to an incompatible @@ -261,7 +261,10 @@ type ## that cannot be represented with infinite ## precision -- for example, 2.0 / 3.0, log(1.1) ## NOTE: Nimrod currently does not detect these! - + EDeadThread* = + object of ESynch ## is raised if it is attempted to send a message to a + ## dead thread. + TResult* = enum Failure, Success proc sizeof*[T](x: T): natural {.magic: "SizeOf", noSideEffect.} diff --git a/lib/system/inboxes.nim b/lib/system/inboxes.nim index 8f683f612..1f5d56c16 100644 --- a/lib/system/inboxes.nim +++ b/lib/system/inboxes.nim @@ -22,6 +22,8 @@ type PInbox = ptr TInbox TLoadStoreMode = enum mStore, mLoad +const ThreadDeadMask = -2 + proc initInbox(p: pointer) = var inbox = cast[PInbox](p) initSysLock(inbox.lock) @@ -30,6 +32,9 @@ proc initInbox(p: pointer) = proc freeInbox(p: pointer) = var inbox = cast[PInbox](p) + # we need to grab the lock to be save against sending threads! + acquireSys(inbox.lock) + inbox.mask = ThreadDeadMask deallocOsPages(inbox.region) deinitSys(inbox.lock) deinitSysCond(inbox.cond) @@ -158,7 +163,6 @@ proc rawSend(q: PInbox, data: pointer, typ: PNimType) = q.mask = cap*2 - 1 q.wr = q.count q.rd = 0 - #echo "came here" storeAux(addr(q.data[q.wr * typ.size]), data, typ, q, mStore) inc q.count q.wr = (q.wr + 1) and q.mask @@ -177,23 +181,34 @@ template lockInbox(q: expr, action: stmt) = proc send*[TMsg](receiver: var TThread[TMsg], msg: TMsg) = ## sends a message to a thread. `msg` is deeply copied. var q = cast[PInbox](getInBoxMem(receiver)) + if q.mask == ThreadDeadMask: + raise newException(EDeadThread, "cannot send message; thread died") acquireSys(q.lock) var m: TMsg shallowCopy(m, msg) - rawSend(q, addr(m), cast[PNimType](getTypeInfo(msg))) + var typ = cast[PNimType](getTypeInfo(msg)) + rawSend(q, addr(m), typ) + q.elemType = typ releaseSys(q.lock) SignalSysCond(q.cond) -proc recv*[TMsg](): TMsg = - ## receives a message from its internal message queue. This blocks until - ## a message has arrived! You may use ``peek`` to avoid the blocking. +proc llRecv(res: pointer, typ: PNimType) = + # to save space, the generic is as small as possible var q = cast[PInbox](getInBoxMem()) acquireSys(q.lock) while q.count <= 0: WaitSysCond(q.cond, q.lock) - rawRecv(q, addr(result), cast[PNimType](getTypeInfo(result))) + if typ != q.elemType: + releaseSys(q.lock) + raise newException(EInvalidValue, "cannot receive message of wrong type") + rawRecv(q, res, typ) releaseSys(q.lock) +proc recv*[TMsg](): TMsg = + ## receives a message from its internal message queue. This blocks until + ## a message has arrived! You may use ``peek`` to avoid the blocking. + llRecv(addr(result), cast[PNimType](getTypeInfo(result))) + proc peek*(): int = ## returns the current number of messages in the inbox. var q = cast[PInbox](getInBoxMem()) diff --git a/lib/system/threads.nim b/lib/system/threads.nim index 9bb67863b..bd361760d 100755 --- a/lib/system/threads.nim +++ b/lib/system/threads.nim @@ -249,7 +249,8 @@ when not defined(useNimRtl): type TThread* {.pure, final.}[TParam] = object of TGcThread ## Nimrod thread. - fn: proc (p: TParam) + emptyFn: proc () + dataFn: proc (p: TParam) data: TParam proc initInbox(p: pointer) @@ -268,14 +269,14 @@ template ThreadProcWrapperBody(closure: expr) = initGC() t.stackBottom = addr(t) registerThread(t) - initInbox(addr(t.inbox)) try: when false: var a = addr(tls) var b = MaskStackPointer(1293920-372736-303104-36864) c_fprintf(c_stdout, "TLS: %p\nmasked: %p\ndiff: %ld\n", a, b, cast[int](a) - cast[int](b)) - t.fn(t.data) + if t.emptyFn == nil: t.dataFn(t.data) + else: t.emptyFn() finally: # XXX shut-down is not executed when the thread is forced down! freeInbox(addr(t.inbox)) @@ -326,8 +327,28 @@ proc createThread*[TParam](t: var TThread[TParam], ## creates a new thread `t` and starts its execution. Entry point is the ## proc `tp`. `param` is passed to `tp`. t.data = param - t.fn = tp + t.dataFn = tp + t.stackSize = ThreadStackSize + initInbox(addr(t.inbox)) + when hostOS == "windows": + var dummyThreadId: int32 + t.sys = CreateThread(nil, ThreadStackSize, threadProcWrapper[TParam], + addr(t), 0'i32, dummyThreadId) + if t.sys <= 0: + raise newException(EResourceExhausted, "cannot create thread") + else: + var a: Tpthread_attr + pthread_attr_init(a) + pthread_attr_setstacksize(a, ThreadStackSize) + if pthread_create(t.sys, a, threadProcWrapper[TParam], addr(t)) != 0: + raise newException(EResourceExhausted, "cannot create thread") + +proc createThread*[TParam](t: var TThread[TParam], tp: proc () {.thread.}) = + ## creates a new thread `t` and starts its execution. Entry point is the + ## proc `tp`. + t.emptyFn = tp t.stackSize = ThreadStackSize + initInbox(addr(t.inbox)) when hostOS == "windows": var dummyThreadId: int32 t.sys = CreateThread(nil, ThreadStackSize, threadProcWrapper[TParam], @@ -342,9 +363,9 @@ proc createThread*[TParam](t: var TThread[TParam], raise newException(EResourceExhausted, "cannot create thread") when useStackMaskHack: - proc runMain(tp: proc (dummy: pointer) {.thread.}) {.compilerproc.} = + proc runMain(tp: proc () {.thread.}) {.compilerproc.} = var mainThread: TThread[pointer] - createThread(mainThread, tp, nil) + createThread(mainThread, tp) joinThread(mainThread) # --------------------------- lock handling ---------------------------------- @@ -462,9 +483,9 @@ proc Release*(lock: var TLock) = # ------------------------ message passing support --------------------------- -proc getInBoxMem*[TMsg](t: var TThread[TMsg]): pointer {.inline.} = +proc getInBoxMem[TMsg](t: var TThread[TMsg]): pointer {.inline.} = result = addr(t.inbox) -proc getInBoxMem*(): pointer {.inline.} = +proc getInBoxMem(): pointer {.inline.} = result = addr(cast[PGcThread](ThreadVarGetValue(globalsSlot)).inbox) diff --git a/todo.txt b/todo.txt index 710f9b8aa..3c9754e10 100755 --- a/todo.txt +++ b/todo.txt @@ -1,12 +1,12 @@ High priority (version 0.8.12) ============================== +* ``force_nosideEffect`` or some similar pragma is needed (``loophole``?) * test threads on windows * test thread analysis: var x = globalString # ok, copied; `x` is mine! vs var x = globalRef # read access, `x` is theirs! -* test message passing built-ins * make threadvar efficient again on linux after testing * document Nimrod's threads * document Nimrod's two phase symbol lookup for generics |