summary refs log tree commit diff stats
path: root/lib/system
diff options
context:
space:
mode:
authorYuriy Glukhov <yglukhov@users.noreply.github.com>2017-07-31 21:06:55 +0300
committerAndreas Rumpf <rumpf_a@web.de>2017-07-31 20:06:55 +0200
commit3d543b1539f8d5dde5746c90c5de5fa3df1cadfd (patch)
treea3efeafc2493aec7f2610c4868c6a5844b41034d /lib/system
parent6b38b37b4fa3168f1bf96822b3ff4471d8fd68c0 (diff)
downloadNim-3d543b1539f8d5dde5746c90c5de5fa3df1cadfd.tar.gz
Channels can now block depending on maxItems (#6153)
Diffstat (limited to 'lib/system')
-rw-r--r--lib/system/channels.nim43
1 files changed, 31 insertions, 12 deletions
diff --git a/lib/system/channels.nim b/lib/system/channels.nim
index e3baff797..1b90e245f 100644
--- a/lib/system/channels.nim
+++ b/lib/system/channels.nim
@@ -22,7 +22,7 @@ when not declared(NimString):
 type
   pbytes = ptr array[0.. 0xffff, byte]
   RawChannel {.pure, final.} = object ## msg queue for a thread
-    rd, wr, count, mask: int
+    rd, wr, count, mask, maxItems: int
     data: pbytes
     lock: SysLock
     cond: SysCond
@@ -37,11 +37,12 @@ type
 
 const ChannelDeadMask = -2
 
-proc initRawChannel(p: pointer) =
+proc initRawChannel(p: pointer, maxItems: int) =
   var c = cast[PRawChannel](p)
   initSysLock(c.lock)
   initSysCond(c.cond)
   c.mask = -1
+  c.maxItems = maxItems
 
 proc deinitRawChannel(p: pointer) =
   var c = cast[PRawChannel](p)
@@ -208,23 +209,36 @@ template lockChannel(q, action): untyped =
   action
   releaseSys(q.lock)
 
-template sendImpl(q) =
+proc sendImpl(q: PRawChannel, typ: PNimType, msg: pointer, noBlock: bool): bool =
   if q.mask == ChannelDeadMask:
     sysFatal(DeadThreadError, "cannot send message; thread died")
   acquireSys(q.lock)
-  var typ = cast[PNimType](getTypeInfo(msg))
-  rawSend(q, unsafeAddr(msg), typ)
+  if q.maxItems > 0:
+    # Wait until count is less than maxItems
+    if noBlock and q.count >= q.maxItems:
+      releaseSys(q.lock)
+      return
+
+    while q.count >= q.maxItems:
+      waitSysCond(q.cond, q.lock)
+
+  rawSend(q, msg, typ)
   q.elemType = typ
   releaseSys(q.lock)
   signalSysCond(q.cond)
+  result = true
 
-proc send*[TMsg](c: var Channel[TMsg], msg: TMsg) =
+proc send*[TMsg](c: var Channel[TMsg], msg: TMsg) {.inline.} =
   ## sends a message to a thread. `msg` is deeply copied.
-  var q = cast[PRawChannel](addr(c))
-  sendImpl(q)
+  discard sendImpl(cast[PRawChannel](addr c), cast[PNimType](getTypeInfo(msg)), unsafeAddr(msg), false)
+
+proc trySend*[TMsg](c: var Channel[TMsg], msg: TMsg): bool {.inline.} =
+  ## Tries to send a message to a thread. `msg` is deeply copied. Doesn't block.
+  ## Returns `false` if the message was not sent because number of pending items
+  ## in the cannel exceeded `maxItems`.
+  sendImpl(cast[PRawChannel](addr c), cast[PNimType](getTypeInfo(msg)), unsafeAddr(msg), true)
 
 proc llRecv(q: PRawChannel, res: pointer, typ: PNimType) =
-  # to save space, the generic is as small as possible
   q.ready = true
   while q.count <= 0:
     waitSysCond(q.cond, q.lock)
@@ -233,6 +247,9 @@ proc llRecv(q: PRawChannel, res: pointer, typ: PNimType) =
     releaseSys(q.lock)
     sysFatal(ValueError, "cannot receive message of wrong type")
   rawRecv(q, res, typ)
+  if q.maxItems > 0 and q.count == q.maxItems - 1:
+    # Parent thread is awaiting in send. Wake it up.
+    signalSysCond(q.cond)
 
 proc recv*[TMsg](c: var Channel[TMsg]): TMsg =
   ## receives a message from the channel `c`. This blocks until
@@ -267,9 +284,11 @@ proc peek*[TMsg](c: var Channel[TMsg]): int =
   else:
     result = -1
 
-proc open*[TMsg](c: var Channel[TMsg]) =
-  ## opens a channel `c` for inter thread communication.
-  initRawChannel(addr(c))
+proc open*[TMsg](c: var Channel[TMsg], maxItems: int = 0) =
+  ## opens a channel `c` for inter thread communication. The `send` operation
+  ## will block until number of unprocessed items is less than `maxItems`.
+  ## For unlimited queue set `maxItems` to 0.
+  initRawChannel(addr(c), maxItems)
 
 proc close*[TMsg](c: var Channel[TMsg]) =
   ## closes a channel `c` and frees its associated resources.