diff options
-rwxr-xr-x | doc/lib.txt | 2 | ||||
-rw-r--r-- | lib/core/locks.nim | 5 | ||||
-rwxr-xr-x | lib/pure/sockets.nim | 2 | ||||
-rwxr-xr-x | lib/system.nim | 2 | ||||
-rwxr-xr-x | lib/system/channels.nim | 151 | ||||
-rwxr-xr-x | lib/system/threads.nim | 94 | ||||
-rwxr-xr-x | todo.txt | 11 | ||||
-rwxr-xr-x | web/nimrod.ini | 2 |
8 files changed, 93 insertions, 176 deletions
diff --git a/doc/lib.txt b/doc/lib.txt index 7595028db..ddfaaf46c 100755 --- a/doc/lib.txt +++ b/doc/lib.txt @@ -35,7 +35,7 @@ Core Nimrod thread support. **Note**: This is part of the system module. Do not import it explicitely. -* `inboxes <inboxes.html>`_ +* `channels <channels.html>`_ Nimrod message passing support for threads. **Note**: This is part of the system module. Do not import it explicitely. diff --git a/lib/core/locks.nim b/lib/core/locks.nim index 1fffb8e0a..6f139c7a2 100644 --- a/lib/core/locks.nim +++ b/lib/core/locks.nim @@ -19,7 +19,8 @@ include "lib/system/syslocks" type TLock* = TSysLock ## Nimrod lock; whether this is re-entrant - ## or not is unspecified! + ## or not is unspecified! However, compilation + ## in preventDeadlocks-mode guarantees re-entrancy. TCond* = TSysCond ## Nimrod condition variable const @@ -50,7 +51,7 @@ proc DeinitLock*(lock: var TLock) {.inline.} = ## Frees the resources associated with the lock. DeinitSys(lock) -proc TryAcquire*(lock: var TLock): bool {.inline.} = +proc TryAcquire*(lock: var TLock): bool = ## Tries to acquire the given lock. Returns `true` on success. result = TryAcquireSys(lock) when noDeadlocks: diff --git a/lib/pure/sockets.nim b/lib/pure/sockets.nim index 958b3483b..f18cf2ba8 100755 --- a/lib/pure/sockets.nim +++ b/lib/pure/sockets.nim @@ -406,9 +406,11 @@ proc connectAsync*(socket: TSocket, name: string, port = TPort(0), var err = WSAGetLastError() # Windows EINTR doesn't behave same as POSIX. if err == WSAEWOULDBLOCK: + freeaddrinfo(aiList) return else: if errno == EINTR or errno == EINPROGRESS: + freeaddrinfo(aiList) return it = it.ai_next diff --git a/lib/system.nim b/lib/system.nim index 1d8d18e1f..2e754ece7 100755 --- a/lib/system.nim +++ b/lib/system.nim @@ -1808,7 +1808,7 @@ when not defined(EcmaScript) and not defined(NimrodVM): include "system/sysio" when hasThreadSupport: - include "system/inboxes" + include "system/channels" iterator lines*(filename: string): string = ## Iterate over any line in the file named `filename`. diff --git a/lib/system/channels.nim b/lib/system/channels.nim index d4a4b1eb4..e443dd6c1 100755 --- a/lib/system/channels.nim +++ b/lib/system/channels.nim @@ -7,7 +7,7 @@ # distribution, for details about the copyright. # -## Message passing for threads. **Note**: This is part of the system module. +## Channel support for threads. **Note**: This is part of the system module. ## Do not import it directly. To activate thread support you need to compile ## with the ``--threads:on`` command line switch. ## @@ -16,7 +16,7 @@ type pbytes = ptr array[0.. 0xffff, byte] - TInbox {.pure, final.} = object ## msg queue for a thread + TRawChannel {.pure, final.} = object ## msg queue for a thread rd, wr, count, mask: int data: pbytes lock: TSysLock @@ -24,28 +24,30 @@ type elemType: PNimType ready: bool region: TMemRegion - PInbox = ptr TInbox - TLoadStoreMode = enum mStore, mLoad + PRawChannel = ptr TRawChannel + TLoadStoreMode = enum mStore, mLoad + TChannel*[TMsg] = TRawChannel ## a channel for thread communication -const ThreadDeadMask = -2 +const ChannelDeadMask = -2 -proc initInbox(p: pointer) = - var inbox = cast[PInbox](p) - initSysLock(inbox.lock) - initSysCond(inbox.cond) - inbox.mask = -1 +proc initRawChannel(p: pointer) = + var c = cast[PRawChannel](p) + initSysLock(c.lock) + initSysCond(c.cond) + c.mask = -1 -proc freeInbox(p: pointer) = - var inbox = cast[PInbox](p) +proc deinitRawChannel(p: pointer) = + var c = cast[PRawChannel](p) # we need to grab the lock to be save against sending threads! - acquireSys(inbox.lock) - inbox.mask = ThreadDeadMask - deallocOsPages(inbox.region) - deinitSys(inbox.lock) - deinitSysCond(inbox.cond) - -proc storeAux(dest, src: Pointer, mt: PNimType, t: PInbox, mode: TLoadStoreMode) -proc storeAux(dest, src: Pointer, n: ptr TNimNode, t: PInbox, + acquireSys(c.lock) + c.mask = ChannelDeadMask + deallocOsPages(c.region) + deinitSys(c.lock) + deinitSysCond(c.cond) + +proc storeAux(dest, src: Pointer, mt: PNimType, t: PRawChannel, + mode: TLoadStoreMode) +proc storeAux(dest, src: Pointer, n: ptr TNimNode, t: PRawChannel, mode: TLoadStoreMode) = var d = cast[TAddress](dest) @@ -62,7 +64,7 @@ proc storeAux(dest, src: Pointer, n: ptr TNimNode, t: PInbox, if m != nil: storeAux(dest, src, m, t, mode) of nkNone: sysAssert(false) -proc storeAux(dest, src: Pointer, mt: PNimType, t: PInbox, +proc storeAux(dest, src: Pointer, mt: PNimType, t: PRawChannel, mode: TLoadStoreMode) = var d = cast[TAddress](dest) @@ -138,9 +140,9 @@ proc storeAux(dest, src: Pointer, mt: PNimType, t: PInbox, if mode == mStore: x[] = Alloc(t.region, mt.base.size) else: - # XXX we should use the dynamic type here too, but that is not stored in - # the inbox at all --> use source[]'s object type? but how? we need a - # tyRef to the object! + # XXX we should use the dynamic type here too, but that is not stored + # in the inbox at all --> use source[]'s object type? but how? we need + # a tyRef to the object! var obj = newObj(mt.base, mt.base.size) unsureAsgnRef(x, obj) storeAux(x[], s, mt.base, t, mode) @@ -148,7 +150,7 @@ proc storeAux(dest, src: Pointer, mt: PNimType, t: PInbox, else: copyMem(dest, src, mt.size) # copy raw bits -proc rawSend(q: PInbox, data: pointer, typ: PNimType) = +proc rawSend(q: PRawChannel, data: pointer, typ: PNimType) = ## adds an `item` to the end of the queue `q`. var cap = q.mask+1 if q.count >= cap: @@ -172,19 +174,19 @@ proc rawSend(q: PInbox, data: pointer, typ: PNimType) = inc q.count q.wr = (q.wr + 1) and q.mask -proc rawRecv(q: PInbox, data: pointer, typ: PNimType) = +proc rawRecv(q: PRawChannel, data: pointer, typ: PNimType) = assert q.count > 0 dec q.count storeAux(data, addr(q.data[q.rd * typ.size]), typ, q, mLoad) q.rd = (q.rd + 1) and q.mask -template lockInbox(q: expr, action: stmt) = +template lockChannel(q: expr, action: stmt) = acquireSys(q.lock) action releaseSys(q.lock) template sendImpl(q: expr) = - if q.mask == ThreadDeadMask: + if q.mask == ChannelDeadMask: raise newException(EDeadThread, "cannot send message; thread died") acquireSys(q.lock) var m: TMsg @@ -195,17 +197,12 @@ template sendImpl(q: expr) = releaseSys(q.lock) SignalSysCond(q.cond) -proc send*[TMsg](receiver: var TThread[TMsg], msg: TMsg) = - ## sends a message to a thread. `msg` is deeply copied. - var q = cast[PInbox](getInBoxMem(receiver)) - sendImpl(q) - -proc send*[TMsg](receiver: TThreadId[TMsg], msg: TMsg) = +proc send*[TMsg](c: var TChannel[TMsg], msg: TMsg) = ## sends a message to a thread. `msg` is deeply copied. - var q = cast[PInbox](getInBoxMem(receiver[])) + var q = cast[PRawChannel](addr(c)) sendImpl(q) -proc llRecv(q: PInbox, res: pointer, typ: PNimType) = +proc llRecv(q: PRawChannel, res: pointer, typ: PNimType) = # to save space, the generic is as small as possible acquireSys(q.lock) q.ready = true @@ -218,80 +215,30 @@ proc llRecv(q: PInbox, res: pointer, typ: PNimType) = rawRecv(q, res, typ) releaseSys(q.lock) -proc recv*[TMsg](): TMsg = - ## receives a message from its internal message queue. This blocks until +proc recv*[TMsg](c: var TChannel[TMsg]): TMsg = + ## receives a message from the channel `c`. This blocks until ## a message has arrived! You may use ``peek`` to avoid the blocking. - var q = cast[PInbox](getInBoxMem()) + var q = cast[PRawChannel](addr(c)) llRecv(q, addr(result), cast[PNimType](getTypeInfo(result))) -proc peek*(): int = - ## returns the current number of messages in the inbox. - var q = cast[PInbox](getInBoxMem()) - lockInbox(q): - result = q.count - -proc peek*[TMsg](t: var TThread[TMsg]): int = - ## returns the current number of messages in the inbox of thread `t`. - var q = cast[PInbox](getInBoxMem(t)) - if q.mask != ThreadDeadMask: - lockInbox(q): +proc peek*[TMsg](c: var TChannel[TMsg]): int = + ## returns the current number of messages in the channel `c`. + var q = cast[PRawChannel](addr(c)) + if q.mask != ChannelDeadMask: + lockChannel(q): result = q.count -proc ready*[TMsg](t: var TThread[TMsg]): bool = - ## returns true iff the thread `t` is waiting on ``recv`` for new messages. - var q = cast[PInbox](getInBoxMem(t)) - result = q.ready - -# ---------------------- channel support ------------------------------------- - -type - TChannel*[TMsg] = TInbox ## a channel for thread communication - TChannelId*[TMsg] = ptr TChannel[TMsg] ## the current implementation uses - ## a pointer as a channel ID. - proc open*[TMsg](c: var TChannel[TMsg]) = ## opens a channel `c` for inter thread communication. - initInbox(addr(c)) + initRawChannel(addr(c)) proc close*[TMsg](c: var TChannel[TMsg]) = ## closes a channel `c` and frees its associated resources. - freeInbox(addr(c)) - -proc channelId*[TMsg](c: var TChannel[TMsg]): TChannelId[TMsg] {.inline.} = - ## returns the channel ID of `c`. - result = addr(c) - -proc send*[TMsg](c: var TChannel[TMsg], msg: TMsg) = - ## sends a message to a channel. `msg` is deeply copied. - var q = cast[PInbox](addr(c)) - sendImpl(q) - -proc send*[TMsg](c: TChannelId[TMsg], msg: TMsg) = - ## sends a message to a thread. `msg` is deeply copied. - var q = cast[PInbox](c) - sendImpl(q) - -proc peek*[TMsg](c: var TChannel[TMsg]): int = - ## returns the current number of messages in the channel `c`. - var q = cast[PInbox](addr(c)) - lockInbox(q): - result = q.count - -proc peek*[TMsg](c: TChannelId[TMsg]): int = - ## returns the current number of messages in the channel `c`. - var q = cast[PInbox](c) - lockInbox(q): - result = q.count - -proc recv*[TMsg](c: TChannelId[TMsg]): TMsg = - ## receives a message from the channel `c`. This blocks until - ## a message has arrived! You may use ``peek`` to avoid the blocking. - var q = cast[PInbox](c) - llRecv(q, addr(result), cast[PNimType](getTypeInfo(result))) - -proc recv*[TMsg](c: var TChannel[TMsg]): TMsg = - ## receives a message from the channel `c`. This blocks until - ## a message has arrived! You may use ``peek`` to avoid the blocking. - var q = cast[PInbox](addr(c)) - llRecv(q, addr(result), cast[PNimType](getTypeInfo(result))) + deinitRawChannel(addr(c)) +proc ready*[TMsg](c: var TChannel[TMsg]): bool = + ## returns true iff some thread is waiting on the channel `c` for + ## new messages. + var q = cast[PRawChannel](addr(c)) + result = q.ready + diff --git a/lib/system/threads.nim b/lib/system/threads.nim index 4ec1239a9..2079762f8 100755 --- a/lib/system/threads.nim +++ b/lib/system/threads.nim @@ -167,7 +167,6 @@ type PGcThread = ptr TGcThread TGcThread {.pure.} = object sys: TSysThread - inbox: TThreadLocalStorage when emulatedThreadVars and not useStackMaskHack: tls: TThreadLocalStorage else: @@ -249,27 +248,22 @@ when not defined(useNimRtl): # GC'ed closures in Nimrod. type - TThread* {.pure, final.}[TMsg] = + TThread* {.pure, final.}[TArg] = object of TGcThread ## Nimrod thread. A thread is a heavy object (~14K) ## that **must not** be part of a message! Use ## a ``TThreadId`` for that. - emptyFn: proc () - dataFn: proc (m: TMsg) - data: TMsg - TThreadId*[TMsg] = ptr TThread[TMsg] ## the current implementation uses + dataFn: proc (m: TArg) + when TArg isnot void: + data: TArg + TThreadId*[TArg] = ptr TThread[TArg] ## the current implementation uses ## a pointer as a thread ID. -proc initInbox(p: pointer) -proc freeInbox(p: pointer) when not defined(boehmgc) and not hasSharedHeap: proc deallocOsPages() -when defined(mainThread): - initInbox(addr(mainThread.inbox)) - template ThreadProcWrapperBody(closure: expr) = ThreadVarSetValue(globalsSlot, closure) - var t = cast[ptr TThread[TMsg]](closure) + var t = cast[ptr TThread[TArg]](closure) when useStackMaskHack: var tls: TThreadLocalStorage when not defined(boehmgc) and not defined(nogc) and not hasSharedHeap: @@ -279,9 +273,8 @@ template ThreadProcWrapperBody(closure: expr) = when defined(registerThread): t.stackBottom = addr(t) registerThread(t) - if t.emptyFn == nil: t.dataFn(t.data) - else: t.emptyFn() - freeInbox(addr(t.inbox)) + if TArg is void: t.dataFn() + else: t.dataFn(t.data) when defined(registerThread): unregisterThread(t) when defined(deallocOsPages): deallocOsPages() # Since an unhandled exception terminates the whole process (!), there is @@ -291,31 +284,30 @@ template ThreadProcWrapperBody(closure: expr) = # page! # mark as not running anymore: - t.emptyFn = nil t.dataFn = nil {.push stack_trace:off.} when defined(windows): - proc threadProcWrapper[TMsg](closure: pointer): int32 {.stdcall.} = + proc threadProcWrapper[TArg](closure: pointer): int32 {.stdcall.} = ThreadProcWrapperBody(closure) # implicitely return 0 else: - proc threadProcWrapper[TMsg](closure: pointer) {.noconv.} = + proc threadProcWrapper[TArg](closure: pointer) {.noconv.} = ThreadProcWrapperBody(closure) {.pop.} -proc running*[TMsg](t: TThread[TMsg]): bool {.inline.} = +proc running*[TArg](t: TThread[TArg]): bool {.inline.} = ## returns true if `t` is running. - result = t.emptyFn != nil or t.dataFn != nil + result = t.dataFn != nil -proc joinThread*[TMsg](t: TThread[TMsg]) {.inline.} = +proc joinThread*[TArg](t: TThread[TArg]) {.inline.} = ## waits for the thread `t` to finish. when hostOS == "windows": discard WaitForSingleObject(t.sys, -1'i32) else: discard pthread_join(t.sys, nil) -proc joinThreads*[TMsg](t: openArray[TThread[TMsg]]) = +proc joinThreads*[TArg](t: openArray[TThread[TArg]]) = ## waits for every thread in `t` to finish. when hostOS == "windows": var a: array[0..255, TSysThread] @@ -327,27 +319,28 @@ proc joinThreads*[TMsg](t: openArray[TThread[TMsg]]) = when false: # XXX a thread should really release its heap here somehow: - proc destroyThread*[TMsg](t: var TThread[TMsg]) = + proc destroyThread*[TArg](t: var TThread[TArg]) = ## forces the thread `t` to terminate. This is potentially dangerous if ## you don't have full control over `t` and its acquired resources. when hostOS == "windows": discard TerminateThread(t.sys, 1'i32) else: discard pthread_cancel(t.sys) - unregisterThread(addr(t)) + when defined(registerThread): unregisterThread(addr(t)) + t.dataFn = nil -proc createThread*[TMsg](t: var TThread[TMsg], - tp: proc (msg: TMsg) {.thread.}, - param: TMsg) = +proc createThread*[TArg](t: var TThread[TArg], + tp: proc (arg: TArg) {.thread.}, + param: TArg) = ## creates a new thread `t` and starts its execution. Entry point is the - ## proc `tp`. `param` is passed to `tp`. - t.data = param + ## proc `tp`. `param` is passed to `tp`. `TArg` can be ``void`` if you + ## don't need to pass any data to the thread. + when TArg isnot void: t.data = param t.dataFn = tp when hasSharedHeap: t.stackSize = ThreadStackSize - initInbox(addr(t.inbox)) when hostOS == "windows": var dummyThreadId: int32 - t.sys = CreateThread(nil, ThreadStackSize, threadProcWrapper[TMsg], + t.sys = CreateThread(nil, ThreadStackSize, threadProcWrapper[TArg], addr(t), 0'i32, dummyThreadId) if t.sys <= 0: raise newException(EResourceExhausted, "cannot create thread") @@ -355,39 +348,20 @@ proc createThread*[TMsg](t: var TThread[TMsg], var a: Tpthread_attr pthread_attr_init(a) pthread_attr_setstacksize(a, ThreadStackSize) - if pthread_create(t.sys, a, threadProcWrapper[TMsg], addr(t)) != 0: + if pthread_create(t.sys, a, threadProcWrapper[TArg], addr(t)) != 0: raise newException(EResourceExhausted, "cannot create thread") -proc createThread*[TMsg](t: var TThread[TMsg], tp: proc () {.thread.}) = - ## creates a new thread `t` and starts its execution. Entry point is the - ## proc `tp`. - t.emptyFn = tp - when hasSharedHeap: t.stackSize = ThreadStackSize - initInbox(addr(t.inbox)) - when hostOS == "windows": - var dummyThreadId: int32 - t.sys = CreateThread(nil, ThreadStackSize, threadProcWrapper[TMsg], - addr(t), 0'i32, dummyThreadId) - if t.sys <= 0: - raise newException(EResourceExhausted, "cannot create thread") - else: - var a: Tpthread_attr - pthread_attr_init(a) - pthread_attr_setstacksize(a, ThreadStackSize) - if pthread_create(t.sys, a, threadProcWrapper[TMsg], addr(t)) != 0: - raise newException(EResourceExhausted, "cannot create thread") - -proc threadId*[TMsg](t: var TThread[TMsg]): TThreadId[TMsg] {.inline.} = +proc threadId*[TArg](t: var TThread[TArg]): TThreadId[TArg] {.inline.} = ## returns the thread ID of `t`. result = addr(t) -proc myThreadId*[TMsg](): TThreadId[TMsg] = +proc myThreadId*[TArg](): TThreadId[TArg] = ## returns the thread ID of the thread that calls this proc. - result = cast[TThreadId[TMsg]](ThreadVarGetValue(globalsSlot)) + result = cast[TThreadId[TArg]](ThreadVarGetValue(globalsSlot)) -proc mainThreadId*[TMsg](): TThreadId[TMsg] = +proc mainThreadId*[TArg](): TThreadId[TArg] = ## returns the thread ID of the main thread. - result = cast[TThreadId[TMsg]](addr(mainThread)) + result = cast[TThreadId[TArg]](addr(mainThread)) when useStackMaskHack: proc runMain(tp: proc () {.thread.}) {.compilerproc.} = @@ -395,11 +369,3 @@ when useStackMaskHack: createThread(mainThread, tp) joinThread(mainThread) -# ------------------------ message passing support --------------------------- - -proc getInBoxMem[TMsg](t: var TThread[TMsg]): pointer {.inline.} = - result = addr(t.inbox) - -proc getInBoxMem(): pointer {.inline.} = - result = addr(cast[PGcThread](ThreadVarGetValue(globalsSlot)).inbox) - diff --git a/todo.txt b/todo.txt index b779eb6dc..ad80e4812 100755 --- a/todo.txt +++ b/todo.txt @@ -1,12 +1,11 @@ Version 0.8.14 ============== -- optional indentation for 'case' statement -- test the sort implementation again -- fix the 'const' issues - threads should not have an inbox per default - make threadvar efficient again on linux after testing - +- fix the 'const' issues +- test the sort implementation again +- optional indentation for 'case' statement version 0.9.0 ============= @@ -47,7 +46,9 @@ version 0.9.XX - checked exceptions - fix implicit generic routines - think about ``{:}.toTable[int, string]()`` -- nice idea: +- mocking support with ``tyProxy`` that does: + o.p(x) --> p(o, x) --> myMacro(o, p, x) +- nice idea: p(a, b): echo a diff --git a/web/nimrod.ini b/web/nimrod.ini index 156bc5221..c56b8bb84 100755 --- a/web/nimrod.ini +++ b/web/nimrod.ini @@ -26,7 +26,7 @@ doc: "tools;c2nim;niminst" pdf: "manual;lib;tut1;tut2;nimrodc;c2nim;niminst" srcdoc: "core/macros;pure/marshal;core/typeinfo" srcdoc: "impure/graphics;impure/re;pure/sockets" -srcdoc: "system.nim;system/threads.nim;system/inboxes.nim" +srcdoc: "system.nim;system/threads.nim;system/channels.nim" srcdoc: "pure/os;pure/strutils;pure/math;pure/matchers" srcdoc: "pure/complex;pure/times;pure/osproc;pure/pegs;pure/dynlib" srcdoc: "pure/parseopt;pure/hashes;pure/strtabs;pure/lexbase" |