diff options
Diffstat (limited to 'lib/system/channels_builtin.nim')
-rw-r--r-- | lib/system/channels_builtin.nim | 454 |
1 files changed, 454 insertions, 0 deletions
diff --git a/lib/system/channels_builtin.nim b/lib/system/channels_builtin.nim new file mode 100644 index 000000000..c7a445766 --- /dev/null +++ b/lib/system/channels_builtin.nim @@ -0,0 +1,454 @@ +# +# +# Nim's Runtime Library +# (c) Copyright 2015 Andreas Rumpf +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +## Channel support for threads. +## +## **Note**: This is part of the system module. Do not import it directly. +## To activate thread support compile with the `--threads:on` command line switch. +## +## **Note:** Channels are designed for the `Thread` type. They are unstable when +## used with `spawn` +## +## **Note:** The current implementation of message passing does +## not work with cyclic data structures. +## +## **Note:** Channels cannot be passed between threads. Use globals or pass +## them by `ptr`. +## +## Example +## ======= +## The following is a simple example of two different ways to use channels: +## blocking and non-blocking. +## +## .. code-block :: Nim +## # Be sure to compile with --threads:on. +## # The channels and threads modules are part of system and should not be +## # imported. +## import std/os +## +## # Channels can either be: +## # - declared at the module level, or +## # - passed to procedures by ptr (raw pointer) -- see note on safety. +## # +## # For simplicity, in this example a channel is declared at module scope. +## # Channels are generic, and they include support for passing objects between +## # threads. +## # Note that objects passed through channels will be deeply copied. +## var chan: Channel[string] +## +## # This proc will be run in another thread using the threads module. +## proc firstWorker() = +## chan.send("Hello World!") +## +## # This is another proc to run in a background thread. This proc takes a while +## # to send the message since it sleeps for 2 seconds (or 2000 milliseconds). +## proc secondWorker() = +## sleep(2000) +## chan.send("Another message") +## +## # Initialize the channel. +## chan.open() +## +## # Launch the worker. +## var worker1: Thread[void] +## createThread(worker1, firstWorker) +## +## # Block until the message arrives, then print it out. +## echo chan.recv() # "Hello World!" +## +## # Wait for the thread to exit before moving on to the next example. +## worker1.joinThread() +## +## # Launch the other worker. +## var worker2: Thread[void] +## createThread(worker2, secondWorker) +## # This time, use a non-blocking approach with tryRecv. +## # Since the main thread is not blocked, it could be used to perform other +## # useful work while it waits for data to arrive on the channel. +## while true: +## let tried = chan.tryRecv() +## if tried.dataAvailable: +## echo tried.msg # "Another message" +## break +## +## echo "Pretend I'm doing useful work..." +## # For this example, sleep in order not to flood stdout with the above +## # message. +## sleep(400) +## +## # Wait for the second thread to exit before cleaning up the channel. +## worker2.joinThread() +## +## # Clean up the channel. +## chan.close() +## +## Sample output +## ------------- +## The program should output something similar to this, but keep in mind that +## exact results may vary in the real world:: +## Hello World! +## Pretend I'm doing useful work... +## Pretend I'm doing useful work... +## Pretend I'm doing useful work... +## Pretend I'm doing useful work... +## Pretend I'm doing useful work... +## Another message +## +## Passing Channels Safely +## ----------------------- +## Note that when passing objects to procedures on another thread by pointer +## (for example through a thread's argument), objects created using the default +## allocator will use thread-local, GC-managed memory. Thus it is generally +## safer to store channel objects in global variables (as in the above example), +## in which case they will use a process-wide (thread-safe) shared heap. +## +## However, it is possible to manually allocate shared memory for channels +## using e.g. `system.allocShared0` and pass these pointers through thread +## arguments: +## +## .. code-block :: Nim +## proc worker(channel: ptr Channel[string]) = +## let greeting = channel[].recv() +## echo greeting +## +## proc localChannelExample() = +## # Use allocShared0 to allocate some shared-heap memory and zero it. +## # The usual warnings about dealing with raw pointers apply. Exercise caution. +## var channel = cast[ptr Channel[string]]( +## allocShared0(sizeof(Channel[string])) +## ) +## channel[].open() +## # Create a thread which will receive the channel as an argument. +## var thread: Thread[ptr Channel[string]] +## createThread(thread, worker, channel) +## channel[].send("Hello from the main thread!") +## # Clean up resources. +## thread.joinThread() +## channel[].close() +## deallocShared(channel) +## +## localChannelExample() # "Hello from the main thread!" + +when not declared(ThisIsSystem): + {.error: "You must not import this module explicitly".} + +type + pbytes = ptr UncheckedArray[byte] + RawChannel {.pure, final.} = object ## msg queue for a thread + rd, wr, count, mask, maxItems: int + data: pbytes + lock: SysLock + cond: SysCond + elemType: PNimType + ready: bool + when not usesDestructors: + region: MemRegion + PRawChannel = ptr RawChannel + LoadStoreMode = enum mStore, mLoad + Channel*[TMsg] {.gcsafe.} = RawChannel ## a channel for thread communication + +const ChannelDeadMask = -2 + +proc initRawChannel(p: pointer, maxItems: int) = + var c = cast[PRawChannel](p) + initSysLock(c.lock) + initSysCond(c.cond) + c.mask = -1 + c.maxItems = maxItems + +proc deinitRawChannel(p: pointer) = + var c = cast[PRawChannel](p) + # we need to grab the lock to be safe against sending threads! + acquireSys(c.lock) + c.mask = ChannelDeadMask + when not usesDestructors: + deallocOsPages(c.region) + else: + if c.data != nil: deallocShared(c.data) + deinitSys(c.lock) + deinitSysCond(c.cond) + +when not usesDestructors: + + proc storeAux(dest, src: pointer, mt: PNimType, t: PRawChannel, + mode: LoadStoreMode) {.benign.} + + proc storeAux(dest, src: pointer, n: ptr TNimNode, t: PRawChannel, + mode: LoadStoreMode) {.benign.} = + var + d = cast[ByteAddress](dest) + s = cast[ByteAddress](src) + case n.kind + of nkSlot: storeAux(cast[pointer](d +% n.offset), + cast[pointer](s +% n.offset), n.typ, t, mode) + of nkList: + for i in 0..n.len-1: storeAux(dest, src, n.sons[i], t, mode) + of nkCase: + copyMem(cast[pointer](d +% n.offset), cast[pointer](s +% n.offset), + n.typ.size) + var m = selectBranch(src, n) + if m != nil: storeAux(dest, src, m, t, mode) + of nkNone: sysAssert(false, "storeAux") + + proc storeAux(dest, src: pointer, mt: PNimType, t: PRawChannel, + mode: LoadStoreMode) = + template `+!`(p: pointer; x: int): pointer = + cast[pointer](cast[int](p) +% x) + + var + d = cast[ByteAddress](dest) + s = cast[ByteAddress](src) + sysAssert(mt != nil, "mt == nil") + case mt.kind + of tyString: + if mode == mStore: + var x = cast[PPointer](dest) + var s2 = cast[PPointer](s)[] + if s2 == nil: + x[] = nil + else: + var ss = cast[NimString](s2) + var ns = cast[NimString](alloc(t.region, GenericSeqSize + ss.len+1)) + copyMem(ns, ss, ss.len+1 + GenericSeqSize) + x[] = ns + else: + var x = cast[PPointer](dest) + var s2 = cast[PPointer](s)[] + if s2 == nil: + unsureAsgnRef(x, s2) + else: + let y = copyDeepString(cast[NimString](s2)) + #echo "loaded ", cast[int](y), " ", cast[string](y) + unsureAsgnRef(x, y) + dealloc(t.region, s2) + of tySequence: + var s2 = cast[PPointer](src)[] + var seq = cast[PGenericSeq](s2) + var x = cast[PPointer](dest) + if s2 == nil: + if mode == mStore: + x[] = nil + else: + unsureAsgnRef(x, nil) + else: + sysAssert(dest != nil, "dest == nil") + if mode == mStore: + x[] = alloc0(t.region, align(GenericSeqSize, mt.base.align) +% seq.len *% mt.base.size) + else: + unsureAsgnRef(x, newSeq(mt, seq.len)) + var dst = cast[ByteAddress](cast[PPointer](dest)[]) + var dstseq = cast[PGenericSeq](dst) + dstseq.len = seq.len + dstseq.reserved = seq.len + for i in 0..seq.len-1: + storeAux( + cast[pointer](dst +% align(GenericSeqSize, mt.base.align) +% i *% mt.base.size), + cast[pointer](cast[ByteAddress](s2) +% align(GenericSeqSize, mt.base.align) +% + i *% mt.base.size), + mt.base, t, mode) + if mode != mStore: dealloc(t.region, s2) + of tyObject: + if mt.base != nil: + storeAux(dest, src, mt.base, t, mode) + else: + # copy type field: + var pint = cast[ptr PNimType](dest) + pint[] = cast[ptr PNimType](src)[] + storeAux(dest, src, mt.node, t, mode) + of tyTuple: + storeAux(dest, src, mt.node, t, mode) + of tyArray, tyArrayConstr: + for i in 0..(mt.size div mt.base.size)-1: + storeAux(cast[pointer](d +% i *% mt.base.size), + cast[pointer](s +% i *% mt.base.size), mt.base, t, mode) + of tyRef: + var s = cast[PPointer](src)[] + var x = cast[PPointer](dest) + if s == nil: + if mode == mStore: + x[] = nil + else: + unsureAsgnRef(x, nil) + else: + #let size = if mt.base.kind == tyObject: cast[ptr PNimType](s)[].size + # else: mt.base.size + if mode == mStore: + let dyntype = when declared(usrToCell): usrToCell(s).typ + else: mt + let size = dyntype.base.size + # we store the real dynamic 'ref type' at offset 0, so that + # no information is lost + let a = alloc0(t.region, size+sizeof(pointer)) + x[] = a + cast[PPointer](a)[] = dyntype + storeAux(a +! sizeof(pointer), s, dyntype.base, t, mode) + else: + let dyntype = cast[ptr PNimType](s)[] + var obj = newObj(dyntype, dyntype.base.size) + unsureAsgnRef(x, obj) + storeAux(x[], s +! sizeof(pointer), dyntype.base, t, mode) + dealloc(t.region, s) + else: + copyMem(dest, src, mt.size) # copy raw bits + +proc rawSend(q: PRawChannel, data: pointer, typ: PNimType) = + ## Adds an `item` to the end of the queue `q`. + var cap = q.mask+1 + if q.count >= cap: + # start with capacity for 2 entries in the queue: + if cap == 0: cap = 1 + when not usesDestructors: + var n = cast[pbytes](alloc0(q.region, cap*2*typ.size)) + else: + var n = cast[pbytes](allocShared0(cap*2*typ.size)) + var z = 0 + var i = q.rd + var c = q.count + while c > 0: + dec c + copyMem(addr(n[z*typ.size]), addr(q.data[i*typ.size]), typ.size) + i = (i + 1) and q.mask + inc z + if q.data != nil: + when not usesDestructors: + dealloc(q.region, q.data) + else: + deallocShared(q.data) + q.data = n + q.mask = cap*2 - 1 + q.wr = q.count + q.rd = 0 + when not usesDestructors: + storeAux(addr(q.data[q.wr * typ.size]), data, typ, q, mStore) + else: + copyMem(addr(q.data[q.wr * typ.size]), data, typ.size) + inc q.count + q.wr = (q.wr + 1) and q.mask + +proc rawRecv(q: PRawChannel, data: pointer, typ: PNimType) = + sysAssert q.count > 0, "rawRecv" + dec q.count + when not usesDestructors: + storeAux(data, addr(q.data[q.rd * typ.size]), typ, q, mLoad) + else: + copyMem(data, addr(q.data[q.rd * typ.size]), typ.size) + q.rd = (q.rd + 1) and q.mask + +template lockChannel(q, action): untyped = + acquireSys(q.lock) + action + releaseSys(q.lock) + +proc sendImpl(q: PRawChannel, typ: PNimType, msg: pointer, noBlock: bool): bool = + if q.mask == ChannelDeadMask: + sysFatal(DeadThreadDefect, "cannot send message; thread died") + acquireSys(q.lock) + if q.maxItems > 0: + # Wait until count is less than maxItems + if noBlock and q.count >= q.maxItems: + releaseSys(q.lock) + return + + while q.count >= q.maxItems: + waitSysCond(q.cond, q.lock) + + rawSend(q, msg, typ) + q.elemType = typ + releaseSys(q.lock) + signalSysCond(q.cond) + result = true + +proc send*[TMsg](c: var Channel[TMsg], msg: sink TMsg) {.inline.} = + ## Sends a message to a thread. `msg` is deeply copied. + discard sendImpl(cast[PRawChannel](addr c), cast[PNimType](getTypeInfo(msg)), unsafeAddr(msg), false) + when defined(gcDestructors): + wasMoved(msg) + +proc trySend*[TMsg](c: var Channel[TMsg], msg: sink TMsg): bool {.inline.} = + ## Tries to send a message to a thread. + ## + ## `msg` is deeply copied. Doesn't block. + ## + ## Returns `false` if the message was not sent because number of pending items + ## in the channel exceeded `maxItems`. + result = sendImpl(cast[PRawChannel](addr c), cast[PNimType](getTypeInfo(msg)), unsafeAddr(msg), true) + when defined(gcDestructors): + if result: + wasMoved(msg) + +proc llRecv(q: PRawChannel, res: pointer, typ: PNimType) = + q.ready = true + while q.count <= 0: + waitSysCond(q.cond, q.lock) + q.ready = false + if typ != q.elemType: + releaseSys(q.lock) + sysFatal(ValueError, "cannot receive message of wrong type") + rawRecv(q, res, typ) + if q.maxItems > 0 and q.count == q.maxItems - 1: + # Parent thread is awaiting in send. Wake it up. + signalSysCond(q.cond) + +proc recv*[TMsg](c: var Channel[TMsg]): TMsg = + ## Receives a message from the channel `c`. + ## + ## This blocks until a message has arrived! + ## You may use `peek proc <#peek,Channel[TMsg]>`_ to avoid the blocking. + var q = cast[PRawChannel](addr(c)) + acquireSys(q.lock) + llRecv(q, addr(result), cast[PNimType](getTypeInfo(result))) + releaseSys(q.lock) + +proc tryRecv*[TMsg](c: var Channel[TMsg]): tuple[dataAvailable: bool, + msg: TMsg] = + ## Tries to receive a message from the channel `c`, but this can fail + ## for all sort of reasons, including contention. + ## + ## If it fails, it returns `(false, default(msg))` otherwise it + ## returns `(true, msg)`. + var q = cast[PRawChannel](addr(c)) + if q.mask != ChannelDeadMask: + if tryAcquireSys(q.lock): + if q.count > 0: + llRecv(q, addr(result.msg), cast[PNimType](getTypeInfo(result.msg))) + result.dataAvailable = true + releaseSys(q.lock) + +proc peek*[TMsg](c: var Channel[TMsg]): int = + ## Returns the current number of messages in the channel `c`. + ## + ## Returns -1 if the channel has been closed. + ## + ## **Note**: This is dangerous to use as it encourages races. + ## It's much better to use `tryRecv proc <#tryRecv,Channel[TMsg]>`_ instead. + var q = cast[PRawChannel](addr(c)) + if q.mask != ChannelDeadMask: + lockChannel(q): + result = q.count + else: + result = -1 + +proc open*[TMsg](c: var Channel[TMsg], maxItems: int = 0) = + ## Opens a channel `c` for inter thread communication. + ## + ## The `send` operation will block until number of unprocessed items is + ## less than `maxItems`. + ## + ## For unlimited queue set `maxItems` to 0. + initRawChannel(addr(c), maxItems) + +proc close*[TMsg](c: var Channel[TMsg]) = + ## Closes a channel `c` and frees its associated resources. + deinitRawChannel(addr(c)) + +proc ready*[TMsg](c: var Channel[TMsg]): bool = + ## Returns true if some thread is waiting on the channel `c` for + ## new messages. + var q = cast[PRawChannel](addr(c)) + result = q.ready |