summary refs log tree commit diff stats
path: root/lib/system/channels.nim
diff options
context:
space:
mode:
Diffstat (limited to 'lib/system/channels.nim')
-rwxr-xr-xlib/system/channels.nim151
1 files changed, 49 insertions, 102 deletions
diff --git a/lib/system/channels.nim b/lib/system/channels.nim
index d4a4b1eb4..e443dd6c1 100755
--- a/lib/system/channels.nim
+++ b/lib/system/channels.nim
@@ -7,7 +7,7 @@
 #    distribution, for details about the copyright.

 #

 

-## Message passing for threads. **Note**: This is part of the system module.

+## Channel support for threads. **Note**: This is part of the system module.

 ## Do not import it directly. To activate thread support you need to compile

 ## with the ``--threads:on`` command line switch.

 ##

@@ -16,7 +16,7 @@
 

 type

   pbytes = ptr array[0.. 0xffff, byte]

-  TInbox {.pure, final.} = object ## msg queue for a thread

+  TRawChannel {.pure, final.} = object ## msg queue for a thread

     rd, wr, count, mask: int

     data: pbytes

     lock: TSysLock

@@ -24,28 +24,30 @@ type
     elemType: PNimType

     ready: bool

     region: TMemRegion

-  PInbox = ptr TInbox

-  TLoadStoreMode = enum mStore, mLoad

+  PRawChannel = ptr TRawChannel

+  TLoadStoreMode = enum mStore, mLoad
+  TChannel*[TMsg] = TRawChannel ## a channel for thread communication

 

-const ThreadDeadMask = -2

+const ChannelDeadMask = -2

 

-proc initInbox(p: pointer) =

-  var inbox = cast[PInbox](p)

-  initSysLock(inbox.lock)

-  initSysCond(inbox.cond)

-  inbox.mask = -1

+proc initRawChannel(p: pointer) =

+  var c = cast[PRawChannel](p)

+  initSysLock(c.lock)

+  initSysCond(c.cond)

+  c.mask = -1

 

-proc freeInbox(p: pointer) =

-  var inbox = cast[PInbox](p)

+proc deinitRawChannel(p: pointer) =

+  var c = cast[PRawChannel](p)

   # we need to grab the lock to be save against sending threads!

-  acquireSys(inbox.lock)

-  inbox.mask = ThreadDeadMask

-  deallocOsPages(inbox.region)

-  deinitSys(inbox.lock)

-  deinitSysCond(inbox.cond)

-

-proc storeAux(dest, src: Pointer, mt: PNimType, t: PInbox, mode: TLoadStoreMode)

-proc storeAux(dest, src: Pointer, n: ptr TNimNode, t: PInbox,

+  acquireSys(c.lock)

+  c.mask = ChannelDeadMask

+  deallocOsPages(c.region)

+  deinitSys(c.lock)

+  deinitSysCond(c.cond)

+

+proc storeAux(dest, src: Pointer, mt: PNimType, t: PRawChannel, 
+              mode: TLoadStoreMode)

+proc storeAux(dest, src: Pointer, n: ptr TNimNode, t: PRawChannel,

               mode: TLoadStoreMode) =

   var

     d = cast[TAddress](dest)

@@ -62,7 +64,7 @@ proc storeAux(dest, src: Pointer, n: ptr TNimNode, t: PInbox,
     if m != nil: storeAux(dest, src, m, t, mode)

   of nkNone: sysAssert(false)

 

-proc storeAux(dest, src: Pointer, mt: PNimType, t: PInbox, 

+proc storeAux(dest, src: Pointer, mt: PNimType, t: PRawChannel, 

               mode: TLoadStoreMode) =

   var

     d = cast[TAddress](dest)

@@ -138,9 +140,9 @@ proc storeAux(dest, src: Pointer, mt: PNimType, t: PInbox,
       if mode == mStore:

         x[] = Alloc(t.region, mt.base.size)

       else:

-        # XXX we should use the dynamic type here too, but that is not stored in

-        # the inbox at all --> use source[]'s object type? but how? we need a

-        # tyRef to the object!

+        # XXX we should use the dynamic type here too, but that is not stored
+        # in the inbox at all --> use source[]'s object type? but how? we need
+        # a tyRef to the object!

         var obj = newObj(mt.base, mt.base.size)

         unsureAsgnRef(x, obj)

       storeAux(x[], s, mt.base, t, mode)

@@ -148,7 +150,7 @@ proc storeAux(dest, src: Pointer, mt: PNimType, t: PInbox,
   else:

     copyMem(dest, src, mt.size) # copy raw bits

 

-proc rawSend(q: PInbox, data: pointer, typ: PNimType) =

+proc rawSend(q: PRawChannel, data: pointer, typ: PNimType) =

   ## adds an `item` to the end of the queue `q`.

   var cap = q.mask+1

   if q.count >= cap:

@@ -172,19 +174,19 @@ proc rawSend(q: PInbox, data: pointer, typ: PNimType) =
   inc q.count

   q.wr = (q.wr + 1) and q.mask

 

-proc rawRecv(q: PInbox, data: pointer, typ: PNimType) =

+proc rawRecv(q: PRawChannel, data: pointer, typ: PNimType) =

   assert q.count > 0

   dec q.count

   storeAux(data, addr(q.data[q.rd * typ.size]), typ, q, mLoad)

   q.rd = (q.rd + 1) and q.mask

 

-template lockInbox(q: expr, action: stmt) =

+template lockChannel(q: expr, action: stmt) =

   acquireSys(q.lock)

   action

   releaseSys(q.lock)

 

 template sendImpl(q: expr) =  

-  if q.mask == ThreadDeadMask:

+  if q.mask == ChannelDeadMask:

     raise newException(EDeadThread, "cannot send message; thread died")

   acquireSys(q.lock)

   var m: TMsg

@@ -195,17 +197,12 @@ template sendImpl(q: expr) =
   releaseSys(q.lock)

   SignalSysCond(q.cond)

 

-proc send*[TMsg](receiver: var TThread[TMsg], msg: TMsg) =

-  ## sends a message to a thread. `msg` is deeply copied.

-  var q = cast[PInbox](getInBoxMem(receiver))

-  sendImpl(q)

-

-proc send*[TMsg](receiver: TThreadId[TMsg], msg: TMsg) =

+proc send*[TMsg](c: var TChannel[TMsg], msg: TMsg) =

   ## sends a message to a thread. `msg` is deeply copied.

-  var q = cast[PInbox](getInBoxMem(receiver[]))

+  var q = cast[PRawChannel](addr(c))

   sendImpl(q)

 

-proc llRecv(q: PInbox, res: pointer, typ: PNimType) =

+proc llRecv(q: PRawChannel, res: pointer, typ: PNimType) =

   # to save space, the generic is as small as possible

   acquireSys(q.lock)

   q.ready = true

@@ -218,80 +215,30 @@ proc llRecv(q: PInbox, res: pointer, typ: PNimType) =
   rawRecv(q, res, typ)

   releaseSys(q.lock)

 

-proc recv*[TMsg](): TMsg =

-  ## receives a message from its internal message queue. This blocks until

+proc recv*[TMsg](c: var TChannel[TMsg]): TMsg =

+  ## receives a message from the channel `c`. This blocks until

   ## a message has arrived! You may use ``peek`` to avoid the blocking.

-  var q = cast[PInbox](getInBoxMem())

+  var q = cast[PRawChannel](addr(c))

   llRecv(q, addr(result), cast[PNimType](getTypeInfo(result)))

 

-proc peek*(): int =

-  ## returns the current number of messages in the inbox.

-  var q = cast[PInbox](getInBoxMem())

-  lockInbox(q):

-    result = q.count

-

-proc peek*[TMsg](t: var TThread[TMsg]): int =

-  ## returns the current number of messages in the inbox of thread `t`.

-  var q = cast[PInbox](getInBoxMem(t))

-  if q.mask != ThreadDeadMask:

-    lockInbox(q):

+proc peek*[TMsg](c: var TChannel[TMsg]): int =

+  ## returns the current number of messages in the channel `c`.

+  var q = cast[PRawChannel](addr(c))

+  if q.mask != ChannelDeadMask:

+    lockChannel(q):

       result = q.count

 

-proc ready*[TMsg](t: var TThread[TMsg]): bool =

-  ## returns true iff the thread `t` is waiting on ``recv`` for new messages.

-  var q = cast[PInbox](getInBoxMem(t))

-  result = q.ready

-

-# ---------------------- channel support -------------------------------------

-

-type

-  TChannel*[TMsg] = TInbox ## a channel for thread communication

-  TChannelId*[TMsg] = ptr TChannel[TMsg] ## the current implementation uses

-                                         ## a pointer as a channel ID.

-

 proc open*[TMsg](c: var TChannel[TMsg]) =

   ## opens a channel `c` for inter thread communication.

-  initInbox(addr(c))

+  initRawChannel(addr(c))

 

 proc close*[TMsg](c: var TChannel[TMsg]) =

   ## closes a channel `c` and frees its associated resources.

-  freeInbox(addr(c))

-

-proc channelId*[TMsg](c: var TChannel[TMsg]): TChannelId[TMsg] {.inline.} =

-  ## returns the channel ID of `c`.

-  result = addr(c)

-  

-proc send*[TMsg](c: var TChannel[TMsg], msg: TMsg) =

-  ## sends a message to a channel. `msg` is deeply copied.

-  var q = cast[PInbox](addr(c))

-  sendImpl(q)

-

-proc send*[TMsg](c: TChannelId[TMsg], msg: TMsg) =

-  ## sends a message to a thread. `msg` is deeply copied.

-  var q = cast[PInbox](c)

-  sendImpl(q)

-

-proc peek*[TMsg](c: var TChannel[TMsg]): int =

-  ## returns the current number of messages in the channel `c`.

-  var q = cast[PInbox](addr(c))

-  lockInbox(q):

-    result = q.count

-

-proc peek*[TMsg](c: TChannelId[TMsg]): int =

-  ## returns the current number of messages in the channel `c`.

-  var q = cast[PInbox](c)

-  lockInbox(q):

-    result = q.count

-

-proc recv*[TMsg](c: TChannelId[TMsg]): TMsg =

-  ## receives a message from the channel `c`. This blocks until

-  ## a message has arrived! You may use ``peek`` to avoid the blocking.

-  var q = cast[PInbox](c)

-  llRecv(q, addr(result), cast[PNimType](getTypeInfo(result)))

-

-proc recv*[TMsg](c: var TChannel[TMsg]): TMsg =

-  ## receives a message from the channel `c`. This blocks until

-  ## a message has arrived! You may use ``peek`` to avoid the blocking.

-  var q = cast[PInbox](addr(c))

-  llRecv(q, addr(result), cast[PNimType](getTypeInfo(result)))

+  deinitRawChannel(addr(c))

 

+proc ready*[TMsg](c: var TChannel[TMsg]): bool =

+  ## returns true iff some thread is waiting on the channel `c` for
+  ## new messages.

+  var q = cast[PRawChannel](addr(c))

+  result = q.ready

+