summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rwxr-xr-xcompiler/ecmasgen.nim5
-rw-r--r--lib/pure/actors.nim43
-rwxr-xr-xlib/system/channels.nim5
-rw-r--r--tests/run/tuserassert.nim1
-rw-r--r--tests/specials.nim1
-rw-r--r--tests/threads/trecursive_actor.nim19
6 files changed, 62 insertions, 12 deletions
diff --git a/compiler/ecmasgen.nim b/compiler/ecmasgen.nim
index 588abfc93..56749a139 100755
--- a/compiler/ecmasgen.nim
+++ b/compiler/ecmasgen.nim
@@ -720,7 +720,8 @@ proc generateHeader(p: var TProc, typ: PType): PRope =
 const 
   nodeKindsNeedNoCopy = {nkCharLit..nkInt64Lit, nkStrLit..nkTripleStrLit, 
     nkFloatLit..nkFloat64Lit, nkCurly, nkPar, nkStringToCString, 
-    nkCStringToString, nkCall, nkCommand, nkHiddenCallConv, nkCallStrLit}
+    nkCStringToString, nkCall, nkPrefix, nkPostfix, nkInfix, 
+    nkCommand, nkHiddenCallConv, nkCallStrLit}
 
 proc needsNoCopy(y: PNode): bool = 
   result = (y.kind in nodeKindsNeedNoCopy) or
@@ -1402,7 +1403,7 @@ proc gen(p: var TProc, n: PNode, r: var TCompRes) =
     else: r.res = toRope(f.ToStrMaxPrecision)
   of nkBlockExpr: genBlock(p, n, r)
   of nkIfExpr: genIfExpr(p, n, r)
-  of nkCall, nkHiddenCallConv, nkCommand, nkCallStrLit: 
+  of nkCallKinds: 
     if (n.sons[0].kind == nkSym) and (n.sons[0].sym.magic != mNone): 
       genMagic(p, n, r)
     else: 
diff --git a/lib/pure/actors.nim b/lib/pure/actors.nim
index 2510bb8cd..c07adfd93 100644
--- a/lib/pure/actors.nim
+++ b/lib/pure/actors.nim
@@ -11,6 +11,8 @@
 ## a channel as its inbox. This module requires the ``--threads:on``
 ## command line switch.
 
+from os import sleep
+
 type
   TTask*[TIn, TOut] = object{.pure, final.}
     when TOut isnot void:
@@ -29,7 +31,7 @@ type
 proc spawn*[TIn, TOut](action: proc(
     self: PActor[TIn, TOut]){.thread.}): PActor[TIn, TOut] =
   ## creates an actor; that is a thread with an inbox. The caller MUST call
-  ## ``join`` because that also frees the associated resources with the actor.
+  ## ``join`` because that also frees the actor's associated resources.
   result = cast[PActor[TIn, TOut]](allocShared0(sizeof(result[])))
   open(result.i)
   createThread(result.t, action, result)
@@ -42,6 +44,10 @@ proc running*[TIn, TOut](a: PActor[TIn, TOut]) =
   ## returns true if the actor `a` is running.
   result = running(a.t)
 
+proc ready*[TIn, TOut](a: PActor[TIn, TOut]): bool =
+  ## returns true if the actor `a` is ready to process new messages.
+  result = ready(a.i)
+
 proc join*[TIn, TOut](a: PActor[TIn, TOut]) =
   ## joins an actor.
   joinThread(a.t)
@@ -111,17 +117,35 @@ proc createActorPool*[TIn, TOut](a: var TActorPool[TIn, TOut], poolSize = 4) =
   for i in 0 .. < a.actors.len:
     a.actors[i] = spawn(poolWorker[TIn, TOut])
 
-proc join*[TIn, TOut](a: var TActorPool[TIn, TOut]) =
-  ## waits for each actor in the actor pool `a` to finish and frees the
+proc sync*[TIn, TOut](a: var TActorPool[TIn, TOut], polling=50) =
+  ## waits for every actor of `a` to finish with its work. Currently this is
+  ## implemented as polling every `polling` ms. This will change in a later
+  ## version, however.
+  while true:
+    var wait = false
+    for i in 0..high(a.actors):
+      if not a.actors[i].i.ready: 
+        wait = true
+        break
+    if not wait: break
+    sleep(polling)
+
+proc terminate*[TIn, TOut](a: var TActorPool[TIn, TOut]) =
+  ## terminates each actor in the actor pool `a` and frees the
   ## resources attached to `a`.
   var t: TTask[TIn, TOut]
   t.shutdown = true
-  for i in 0 .. < a.actors.len: send(a.actors[i].i, t)
-  for i in 0 .. < a.actors.len: join(a.actors[i])
+  for i in 0.. <a.actors.len: send(a.actors[i].i, t)
+  for i in 0.. <a.actors.len: join(a.actors[i])
   when TOut isnot void:
     close(a.outputs)
   a.actors = nil
 
+proc join*[TIn, TOut](a: var TActorPool[TIn, TOut]) =
+  ## short-cut for `sync` and then `terminate`.
+  sync(a)
+  terminate(a)
+
 template setupTask =
   t.action = action
   shallowCopy(t.data, input)
@@ -135,7 +159,7 @@ template schedule =
       return
   # no thread ready :-( --> send message to the thread which has the least
   # messages pending:
-  var minIdx = 0
+  var minIdx = -1
   var minVal = high(int)
   for i in 0..high(p.actors):
     var curr = p.actors[i].i.peek
@@ -143,10 +167,13 @@ template schedule =
       # ok, is ready now:
       p.actors[i].i.send(t)
       return
-    if curr < minVal:
+    if curr < minVal and curr >= 0:
       minVal = curr
       minIdx = i
-  p.actors[minIdx].i.send(t)
+  if minIdx >= 0:
+    p.actors[minIdx].i.send(t)
+  else:
+    raise newException(EDeadThread, "cannot send message; thread died")
 
 proc spawn*[TIn, TOut](p: var TActorPool[TIn, TOut], input: TIn,
                        action: proc (input: TIn): TOut {.thread.}
diff --git a/lib/system/channels.nim b/lib/system/channels.nim
index fe93d6840..47fa5b2e5 100755
--- a/lib/system/channels.nim
+++ b/lib/system/channels.nim
@@ -222,11 +222,14 @@ proc recv*[TMsg](c: var TChannel[TMsg]): TMsg =
   llRecv(q, addr(result), cast[PNimType](getTypeInfo(result)))

 

 proc peek*[TMsg](c: var TChannel[TMsg]): int =

-  ## returns the current number of messages in the channel `c`.

+  ## returns the current number of messages in the channel `c`. Returns -1

+  ## if the channel has been closed.

   var q = cast[PRawChannel](addr(c))

   if q.mask != ChannelDeadMask:

     lockChannel(q):

       result = q.count

+  else:

+    result = -1

 

 proc open*[TMsg](c: var TChannel[TMsg]) =

   ## opens a channel `c` for inter thread communication.

diff --git a/tests/run/tuserassert.nim b/tests/run/tuserassert.nim
index 958da2fe1..cf12c4e8b 100644
--- a/tests/run/tuserassert.nim
+++ b/tests/run/tuserassert.nim
@@ -5,7 +5,6 @@ discard """
 template myAssert(cond: expr) = 
   when rand(3) < 2:
     let c = cond.astToStr
-    {.warning: "code activated: " & c.}
     if not cond:
       echo c, "ugh"
   
diff --git a/tests/specials.nim b/tests/specials.nim
index 1f3013e90..a1aa1bc5a 100644
--- a/tests/specials.nim
+++ b/tests/specials.nim
@@ -118,6 +118,7 @@ proc runThreadTests(r: var TResults, options: string) =
   
   test "tactors"
   test "threadex"
+  test "trecursive_actor"
   #test "threadring"
   #test "tthreadanalysis"
   #test "tthreadsort"
diff --git a/tests/threads/trecursive_actor.nim b/tests/threads/trecursive_actor.nim
new file mode 100644
index 000000000..e2774704c
--- /dev/null
+++ b/tests/threads/trecursive_actor.nim
@@ -0,0 +1,19 @@
+discard """
+  outputsub: "0"
+"""
+
+import actors
+
+var
+  a: TActorPool[int, void]
+createActorPool(a)
+
+proc task(i: int) {.thread.} =
+  echo i
+  if i != 0: a.spawn (i-1, task)
+
+# count from 9 till 0 and check 0 is somewhere in the output
+a.spawn(9, task)
+a.join()
+
+