diff options
Diffstat (limited to 'lib/pure')
-rw-r--r-- | lib/pure/actors.nim | 43 |
1 files changed, 35 insertions, 8 deletions
diff --git a/lib/pure/actors.nim b/lib/pure/actors.nim index 2510bb8cd..c07adfd93 100644 --- a/lib/pure/actors.nim +++ b/lib/pure/actors.nim @@ -11,6 +11,8 @@ ## a channel as its inbox. This module requires the ``--threads:on`` ## command line switch. +from os import sleep + type TTask*[TIn, TOut] = object{.pure, final.} when TOut isnot void: @@ -29,7 +31,7 @@ type proc spawn*[TIn, TOut](action: proc( self: PActor[TIn, TOut]){.thread.}): PActor[TIn, TOut] = ## creates an actor; that is a thread with an inbox. The caller MUST call - ## ``join`` because that also frees the associated resources with the actor. + ## ``join`` because that also frees the actor's associated resources. result = cast[PActor[TIn, TOut]](allocShared0(sizeof(result[]))) open(result.i) createThread(result.t, action, result) @@ -42,6 +44,10 @@ proc running*[TIn, TOut](a: PActor[TIn, TOut]) = ## returns true if the actor `a` is running. result = running(a.t) +proc ready*[TIn, TOut](a: PActor[TIn, TOut]): bool = + ## returns true if the actor `a` is ready to process new messages. + result = ready(a.i) + proc join*[TIn, TOut](a: PActor[TIn, TOut]) = ## joins an actor. joinThread(a.t) @@ -111,17 +117,35 @@ proc createActorPool*[TIn, TOut](a: var TActorPool[TIn, TOut], poolSize = 4) = for i in 0 .. < a.actors.len: a.actors[i] = spawn(poolWorker[TIn, TOut]) -proc join*[TIn, TOut](a: var TActorPool[TIn, TOut]) = - ## waits for each actor in the actor pool `a` to finish and frees the +proc sync*[TIn, TOut](a: var TActorPool[TIn, TOut], polling=50) = + ## waits for every actor of `a` to finish with its work. Currently this is + ## implemented as polling every `polling` ms. This will change in a later + ## version, however. + while true: + var wait = false + for i in 0..high(a.actors): + if not a.actors[i].i.ready: + wait = true + break + if not wait: break + sleep(polling) + +proc terminate*[TIn, TOut](a: var TActorPool[TIn, TOut]) = + ## terminates each actor in the actor pool `a` and frees the ## resources attached to `a`. var t: TTask[TIn, TOut] t.shutdown = true - for i in 0 .. < a.actors.len: send(a.actors[i].i, t) - for i in 0 .. < a.actors.len: join(a.actors[i]) + for i in 0.. <a.actors.len: send(a.actors[i].i, t) + for i in 0.. <a.actors.len: join(a.actors[i]) when TOut isnot void: close(a.outputs) a.actors = nil +proc join*[TIn, TOut](a: var TActorPool[TIn, TOut]) = + ## short-cut for `sync` and then `terminate`. + sync(a) + terminate(a) + template setupTask = t.action = action shallowCopy(t.data, input) @@ -135,7 +159,7 @@ template schedule = return # no thread ready :-( --> send message to the thread which has the least # messages pending: - var minIdx = 0 + var minIdx = -1 var minVal = high(int) for i in 0..high(p.actors): var curr = p.actors[i].i.peek @@ -143,10 +167,13 @@ template schedule = # ok, is ready now: p.actors[i].i.send(t) return - if curr < minVal: + if curr < minVal and curr >= 0: minVal = curr minIdx = i - p.actors[minIdx].i.send(t) + if minIdx >= 0: + p.actors[minIdx].i.send(t) + else: + raise newException(EDeadThread, "cannot send message; thread died") proc spawn*[TIn, TOut](p: var TActorPool[TIn, TOut], input: TIn, action: proc (input: TIn): TOut {.thread.} |