diff options
Diffstat (limited to 'lib/system/channels.nim')
-rwxr-xr-x | lib/system/channels.nim | 151 |
1 files changed, 49 insertions, 102 deletions
diff --git a/lib/system/channels.nim b/lib/system/channels.nim index d4a4b1eb4..e443dd6c1 100755 --- a/lib/system/channels.nim +++ b/lib/system/channels.nim @@ -7,7 +7,7 @@ # distribution, for details about the copyright. # -## Message passing for threads. **Note**: This is part of the system module. +## Channel support for threads. **Note**: This is part of the system module. ## Do not import it directly. To activate thread support you need to compile ## with the ``--threads:on`` command line switch. ## @@ -16,7 +16,7 @@ type pbytes = ptr array[0.. 0xffff, byte] - TInbox {.pure, final.} = object ## msg queue for a thread + TRawChannel {.pure, final.} = object ## msg queue for a thread rd, wr, count, mask: int data: pbytes lock: TSysLock @@ -24,28 +24,30 @@ type elemType: PNimType ready: bool region: TMemRegion - PInbox = ptr TInbox - TLoadStoreMode = enum mStore, mLoad + PRawChannel = ptr TRawChannel + TLoadStoreMode = enum mStore, mLoad + TChannel*[TMsg] = TRawChannel ## a channel for thread communication -const ThreadDeadMask = -2 +const ChannelDeadMask = -2 -proc initInbox(p: pointer) = - var inbox = cast[PInbox](p) - initSysLock(inbox.lock) - initSysCond(inbox.cond) - inbox.mask = -1 +proc initRawChannel(p: pointer) = + var c = cast[PRawChannel](p) + initSysLock(c.lock) + initSysCond(c.cond) + c.mask = -1 -proc freeInbox(p: pointer) = - var inbox = cast[PInbox](p) +proc deinitRawChannel(p: pointer) = + var c = cast[PRawChannel](p) # we need to grab the lock to be save against sending threads! - acquireSys(inbox.lock) - inbox.mask = ThreadDeadMask - deallocOsPages(inbox.region) - deinitSys(inbox.lock) - deinitSysCond(inbox.cond) - -proc storeAux(dest, src: Pointer, mt: PNimType, t: PInbox, mode: TLoadStoreMode) -proc storeAux(dest, src: Pointer, n: ptr TNimNode, t: PInbox, + acquireSys(c.lock) + c.mask = ChannelDeadMask + deallocOsPages(c.region) + deinitSys(c.lock) + deinitSysCond(c.cond) + +proc storeAux(dest, src: Pointer, mt: PNimType, t: PRawChannel, + mode: TLoadStoreMode) +proc storeAux(dest, src: Pointer, n: ptr TNimNode, t: PRawChannel, mode: TLoadStoreMode) = var d = cast[TAddress](dest) @@ -62,7 +64,7 @@ proc storeAux(dest, src: Pointer, n: ptr TNimNode, t: PInbox, if m != nil: storeAux(dest, src, m, t, mode) of nkNone: sysAssert(false) -proc storeAux(dest, src: Pointer, mt: PNimType, t: PInbox, +proc storeAux(dest, src: Pointer, mt: PNimType, t: PRawChannel, mode: TLoadStoreMode) = var d = cast[TAddress](dest) @@ -138,9 +140,9 @@ proc storeAux(dest, src: Pointer, mt: PNimType, t: PInbox, if mode == mStore: x[] = Alloc(t.region, mt.base.size) else: - # XXX we should use the dynamic type here too, but that is not stored in - # the inbox at all --> use source[]'s object type? but how? we need a - # tyRef to the object! + # XXX we should use the dynamic type here too, but that is not stored + # in the inbox at all --> use source[]'s object type? but how? we need + # a tyRef to the object! var obj = newObj(mt.base, mt.base.size) unsureAsgnRef(x, obj) storeAux(x[], s, mt.base, t, mode) @@ -148,7 +150,7 @@ proc storeAux(dest, src: Pointer, mt: PNimType, t: PInbox, else: copyMem(dest, src, mt.size) # copy raw bits -proc rawSend(q: PInbox, data: pointer, typ: PNimType) = +proc rawSend(q: PRawChannel, data: pointer, typ: PNimType) = ## adds an `item` to the end of the queue `q`. var cap = q.mask+1 if q.count >= cap: @@ -172,19 +174,19 @@ proc rawSend(q: PInbox, data: pointer, typ: PNimType) = inc q.count q.wr = (q.wr + 1) and q.mask -proc rawRecv(q: PInbox, data: pointer, typ: PNimType) = +proc rawRecv(q: PRawChannel, data: pointer, typ: PNimType) = assert q.count > 0 dec q.count storeAux(data, addr(q.data[q.rd * typ.size]), typ, q, mLoad) q.rd = (q.rd + 1) and q.mask -template lockInbox(q: expr, action: stmt) = +template lockChannel(q: expr, action: stmt) = acquireSys(q.lock) action releaseSys(q.lock) template sendImpl(q: expr) = - if q.mask == ThreadDeadMask: + if q.mask == ChannelDeadMask: raise newException(EDeadThread, "cannot send message; thread died") acquireSys(q.lock) var m: TMsg @@ -195,17 +197,12 @@ template sendImpl(q: expr) = releaseSys(q.lock) SignalSysCond(q.cond) -proc send*[TMsg](receiver: var TThread[TMsg], msg: TMsg) = - ## sends a message to a thread. `msg` is deeply copied. - var q = cast[PInbox](getInBoxMem(receiver)) - sendImpl(q) - -proc send*[TMsg](receiver: TThreadId[TMsg], msg: TMsg) = +proc send*[TMsg](c: var TChannel[TMsg], msg: TMsg) = ## sends a message to a thread. `msg` is deeply copied. - var q = cast[PInbox](getInBoxMem(receiver[])) + var q = cast[PRawChannel](addr(c)) sendImpl(q) -proc llRecv(q: PInbox, res: pointer, typ: PNimType) = +proc llRecv(q: PRawChannel, res: pointer, typ: PNimType) = # to save space, the generic is as small as possible acquireSys(q.lock) q.ready = true @@ -218,80 +215,30 @@ proc llRecv(q: PInbox, res: pointer, typ: PNimType) = rawRecv(q, res, typ) releaseSys(q.lock) -proc recv*[TMsg](): TMsg = - ## receives a message from its internal message queue. This blocks until +proc recv*[TMsg](c: var TChannel[TMsg]): TMsg = + ## receives a message from the channel `c`. This blocks until ## a message has arrived! You may use ``peek`` to avoid the blocking. - var q = cast[PInbox](getInBoxMem()) + var q = cast[PRawChannel](addr(c)) llRecv(q, addr(result), cast[PNimType](getTypeInfo(result))) -proc peek*(): int = - ## returns the current number of messages in the inbox. - var q = cast[PInbox](getInBoxMem()) - lockInbox(q): - result = q.count - -proc peek*[TMsg](t: var TThread[TMsg]): int = - ## returns the current number of messages in the inbox of thread `t`. - var q = cast[PInbox](getInBoxMem(t)) - if q.mask != ThreadDeadMask: - lockInbox(q): +proc peek*[TMsg](c: var TChannel[TMsg]): int = + ## returns the current number of messages in the channel `c`. + var q = cast[PRawChannel](addr(c)) + if q.mask != ChannelDeadMask: + lockChannel(q): result = q.count -proc ready*[TMsg](t: var TThread[TMsg]): bool = - ## returns true iff the thread `t` is waiting on ``recv`` for new messages. - var q = cast[PInbox](getInBoxMem(t)) - result = q.ready - -# ---------------------- channel support ------------------------------------- - -type - TChannel*[TMsg] = TInbox ## a channel for thread communication - TChannelId*[TMsg] = ptr TChannel[TMsg] ## the current implementation uses - ## a pointer as a channel ID. - proc open*[TMsg](c: var TChannel[TMsg]) = ## opens a channel `c` for inter thread communication. - initInbox(addr(c)) + initRawChannel(addr(c)) proc close*[TMsg](c: var TChannel[TMsg]) = ## closes a channel `c` and frees its associated resources. - freeInbox(addr(c)) - -proc channelId*[TMsg](c: var TChannel[TMsg]): TChannelId[TMsg] {.inline.} = - ## returns the channel ID of `c`. - result = addr(c) - -proc send*[TMsg](c: var TChannel[TMsg], msg: TMsg) = - ## sends a message to a channel. `msg` is deeply copied. - var q = cast[PInbox](addr(c)) - sendImpl(q) - -proc send*[TMsg](c: TChannelId[TMsg], msg: TMsg) = - ## sends a message to a thread. `msg` is deeply copied. - var q = cast[PInbox](c) - sendImpl(q) - -proc peek*[TMsg](c: var TChannel[TMsg]): int = - ## returns the current number of messages in the channel `c`. - var q = cast[PInbox](addr(c)) - lockInbox(q): - result = q.count - -proc peek*[TMsg](c: TChannelId[TMsg]): int = - ## returns the current number of messages in the channel `c`. - var q = cast[PInbox](c) - lockInbox(q): - result = q.count - -proc recv*[TMsg](c: TChannelId[TMsg]): TMsg = - ## receives a message from the channel `c`. This blocks until - ## a message has arrived! You may use ``peek`` to avoid the blocking. - var q = cast[PInbox](c) - llRecv(q, addr(result), cast[PNimType](getTypeInfo(result))) - -proc recv*[TMsg](c: var TChannel[TMsg]): TMsg = - ## receives a message from the channel `c`. This blocks until - ## a message has arrived! You may use ``peek`` to avoid the blocking. - var q = cast[PInbox](addr(c)) - llRecv(q, addr(result), cast[PNimType](getTypeInfo(result))) + deinitRawChannel(addr(c)) +proc ready*[TMsg](c: var TChannel[TMsg]): bool = + ## returns true iff some thread is waiting on the channel `c` for + ## new messages. + var q = cast[PRawChannel](addr(c)) + result = q.ready + |