summary refs log tree commit diff stats
path: root/lib/pure
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure')
-rw-r--r--lib/pure/actors.nim43
1 files changed, 35 insertions, 8 deletions
diff --git a/lib/pure/actors.nim b/lib/pure/actors.nim
index 2510bb8cd..c07adfd93 100644
--- a/lib/pure/actors.nim
+++ b/lib/pure/actors.nim
@@ -11,6 +11,8 @@
 ## a channel as its inbox. This module requires the ``--threads:on``
 ## command line switch.
 
+from os import sleep
+
 type
   TTask*[TIn, TOut] = object{.pure, final.}
     when TOut isnot void:
@@ -29,7 +31,7 @@ type
 proc spawn*[TIn, TOut](action: proc(
     self: PActor[TIn, TOut]){.thread.}): PActor[TIn, TOut] =
   ## creates an actor; that is a thread with an inbox. The caller MUST call
-  ## ``join`` because that also frees the associated resources with the actor.
+  ## ``join`` because that also frees the actor's associated resources.
   result = cast[PActor[TIn, TOut]](allocShared0(sizeof(result[])))
   open(result.i)
   createThread(result.t, action, result)
@@ -42,6 +44,10 @@ proc running*[TIn, TOut](a: PActor[TIn, TOut]) =
   ## returns true if the actor `a` is running.
   result = running(a.t)
 
+proc ready*[TIn, TOut](a: PActor[TIn, TOut]): bool =
+  ## returns true if the actor `a` is ready to process new messages.
+  result = ready(a.i)
+
 proc join*[TIn, TOut](a: PActor[TIn, TOut]) =
   ## joins an actor.
   joinThread(a.t)
@@ -111,17 +117,35 @@ proc createActorPool*[TIn, TOut](a: var TActorPool[TIn, TOut], poolSize = 4) =
   for i in 0 .. < a.actors.len:
     a.actors[i] = spawn(poolWorker[TIn, TOut])
 
-proc join*[TIn, TOut](a: var TActorPool[TIn, TOut]) =
-  ## waits for each actor in the actor pool `a` to finish and frees the
+proc sync*[TIn, TOut](a: var TActorPool[TIn, TOut], polling=50) =
+  ## waits for every actor of `a` to finish with its work. Currently this is
+  ## implemented as polling every `polling` ms. This will change in a later
+  ## version, however.
+  while true:
+    var wait = false
+    for i in 0..high(a.actors):
+      if not a.actors[i].i.ready: 
+        wait = true
+        break
+    if not wait: break
+    sleep(polling)
+
+proc terminate*[TIn, TOut](a: var TActorPool[TIn, TOut]) =
+  ## terminates each actor in the actor pool `a` and frees the
   ## resources attached to `a`.
   var t: TTask[TIn, TOut]
   t.shutdown = true
-  for i in 0 .. < a.actors.len: send(a.actors[i].i, t)
-  for i in 0 .. < a.actors.len: join(a.actors[i])
+  for i in 0.. <a.actors.len: send(a.actors[i].i, t)
+  for i in 0.. <a.actors.len: join(a.actors[i])
   when TOut isnot void:
     close(a.outputs)
   a.actors = nil
 
+proc join*[TIn, TOut](a: var TActorPool[TIn, TOut]) =
+  ## short-cut for `sync` and then `terminate`.
+  sync(a)
+  terminate(a)
+
 template setupTask =
   t.action = action
   shallowCopy(t.data, input)
@@ -135,7 +159,7 @@ template schedule =
       return
   # no thread ready :-( --> send message to the thread which has the least
   # messages pending:
-  var minIdx = 0
+  var minIdx = -1
   var minVal = high(int)
   for i in 0..high(p.actors):
     var curr = p.actors[i].i.peek
@@ -143,10 +167,13 @@ template schedule =
       # ok, is ready now:
       p.actors[i].i.send(t)
       return
-    if curr < minVal:
+    if curr < minVal and curr >= 0:
       minVal = curr
       minIdx = i
-  p.actors[minIdx].i.send(t)
+  if minIdx >= 0:
+    p.actors[minIdx].i.send(t)
+  else:
+    raise newException(EDeadThread, "cannot send message; thread died")
 
 proc spawn*[TIn, TOut](p: var TActorPool[TIn, TOut], input: TIn,
                        action: proc (input: TIn): TOut {.thread.}