diff options
author | Yuriy Glukhov <yglukhov@users.noreply.github.com> | 2017-07-31 21:06:55 +0300 |
---|---|---|
committer | Andreas Rumpf <rumpf_a@web.de> | 2017-07-31 20:06:55 +0200 |
commit | 3d543b1539f8d5dde5746c90c5de5fa3df1cadfd (patch) | |
tree | a3efeafc2493aec7f2610c4868c6a5844b41034d /lib/system | |
parent | 6b38b37b4fa3168f1bf96822b3ff4471d8fd68c0 (diff) | |
download | Nim-3d543b1539f8d5dde5746c90c5de5fa3df1cadfd.tar.gz |
Channels can now block depending on maxItems (#6153)
Diffstat (limited to 'lib/system')
-rw-r--r-- | lib/system/channels.nim | 43 |
1 files changed, 31 insertions, 12 deletions
diff --git a/lib/system/channels.nim b/lib/system/channels.nim index e3baff797..1b90e245f 100644 --- a/lib/system/channels.nim +++ b/lib/system/channels.nim @@ -22,7 +22,7 @@ when not declared(NimString): type pbytes = ptr array[0.. 0xffff, byte] RawChannel {.pure, final.} = object ## msg queue for a thread - rd, wr, count, mask: int + rd, wr, count, mask, maxItems: int data: pbytes lock: SysLock cond: SysCond @@ -37,11 +37,12 @@ type const ChannelDeadMask = -2 -proc initRawChannel(p: pointer) = +proc initRawChannel(p: pointer, maxItems: int) = var c = cast[PRawChannel](p) initSysLock(c.lock) initSysCond(c.cond) c.mask = -1 + c.maxItems = maxItems proc deinitRawChannel(p: pointer) = var c = cast[PRawChannel](p) @@ -208,23 +209,36 @@ template lockChannel(q, action): untyped = action releaseSys(q.lock) -template sendImpl(q) = +proc sendImpl(q: PRawChannel, typ: PNimType, msg: pointer, noBlock: bool): bool = if q.mask == ChannelDeadMask: sysFatal(DeadThreadError, "cannot send message; thread died") acquireSys(q.lock) - var typ = cast[PNimType](getTypeInfo(msg)) - rawSend(q, unsafeAddr(msg), typ) + if q.maxItems > 0: + # Wait until count is less than maxItems + if noBlock and q.count >= q.maxItems: + releaseSys(q.lock) + return + + while q.count >= q.maxItems: + waitSysCond(q.cond, q.lock) + + rawSend(q, msg, typ) q.elemType = typ releaseSys(q.lock) signalSysCond(q.cond) + result = true -proc send*[TMsg](c: var Channel[TMsg], msg: TMsg) = +proc send*[TMsg](c: var Channel[TMsg], msg: TMsg) {.inline.} = ## sends a message to a thread. `msg` is deeply copied. - var q = cast[PRawChannel](addr(c)) - sendImpl(q) + discard sendImpl(cast[PRawChannel](addr c), cast[PNimType](getTypeInfo(msg)), unsafeAddr(msg), false) + +proc trySend*[TMsg](c: var Channel[TMsg], msg: TMsg): bool {.inline.} = + ## Tries to send a message to a thread. `msg` is deeply copied. Doesn't block. + ## Returns `false` if the message was not sent because number of pending items + ## in the cannel exceeded `maxItems`. + sendImpl(cast[PRawChannel](addr c), cast[PNimType](getTypeInfo(msg)), unsafeAddr(msg), true) proc llRecv(q: PRawChannel, res: pointer, typ: PNimType) = - # to save space, the generic is as small as possible q.ready = true while q.count <= 0: waitSysCond(q.cond, q.lock) @@ -233,6 +247,9 @@ proc llRecv(q: PRawChannel, res: pointer, typ: PNimType) = releaseSys(q.lock) sysFatal(ValueError, "cannot receive message of wrong type") rawRecv(q, res, typ) + if q.maxItems > 0 and q.count == q.maxItems - 1: + # Parent thread is awaiting in send. Wake it up. + signalSysCond(q.cond) proc recv*[TMsg](c: var Channel[TMsg]): TMsg = ## receives a message from the channel `c`. This blocks until @@ -267,9 +284,11 @@ proc peek*[TMsg](c: var Channel[TMsg]): int = else: result = -1 -proc open*[TMsg](c: var Channel[TMsg]) = - ## opens a channel `c` for inter thread communication. - initRawChannel(addr(c)) +proc open*[TMsg](c: var Channel[TMsg], maxItems: int = 0) = + ## opens a channel `c` for inter thread communication. The `send` operation + ## will block until number of unprocessed items is less than `maxItems`. + ## For unlimited queue set `maxItems` to 0. + initRawChannel(addr(c), maxItems) proc close*[TMsg](c: var Channel[TMsg]) = ## closes a channel `c` and frees its associated resources. |