diff options
-rwxr-xr-x | compiler/ecmasgen.nim | 5 | ||||
-rw-r--r-- | lib/pure/actors.nim | 43 | ||||
-rwxr-xr-x | lib/system/channels.nim | 5 | ||||
-rw-r--r-- | tests/run/tuserassert.nim | 1 | ||||
-rw-r--r-- | tests/specials.nim | 1 | ||||
-rw-r--r-- | tests/threads/trecursive_actor.nim | 19 |
6 files changed, 62 insertions, 12 deletions
diff --git a/compiler/ecmasgen.nim b/compiler/ecmasgen.nim index 588abfc93..56749a139 100755 --- a/compiler/ecmasgen.nim +++ b/compiler/ecmasgen.nim @@ -720,7 +720,8 @@ proc generateHeader(p: var TProc, typ: PType): PRope = const nodeKindsNeedNoCopy = {nkCharLit..nkInt64Lit, nkStrLit..nkTripleStrLit, nkFloatLit..nkFloat64Lit, nkCurly, nkPar, nkStringToCString, - nkCStringToString, nkCall, nkCommand, nkHiddenCallConv, nkCallStrLit} + nkCStringToString, nkCall, nkPrefix, nkPostfix, nkInfix, + nkCommand, nkHiddenCallConv, nkCallStrLit} proc needsNoCopy(y: PNode): bool = result = (y.kind in nodeKindsNeedNoCopy) or @@ -1402,7 +1403,7 @@ proc gen(p: var TProc, n: PNode, r: var TCompRes) = else: r.res = toRope(f.ToStrMaxPrecision) of nkBlockExpr: genBlock(p, n, r) of nkIfExpr: genIfExpr(p, n, r) - of nkCall, nkHiddenCallConv, nkCommand, nkCallStrLit: + of nkCallKinds: if (n.sons[0].kind == nkSym) and (n.sons[0].sym.magic != mNone): genMagic(p, n, r) else: diff --git a/lib/pure/actors.nim b/lib/pure/actors.nim index 2510bb8cd..c07adfd93 100644 --- a/lib/pure/actors.nim +++ b/lib/pure/actors.nim @@ -11,6 +11,8 @@ ## a channel as its inbox. This module requires the ``--threads:on`` ## command line switch. +from os import sleep + type TTask*[TIn, TOut] = object{.pure, final.} when TOut isnot void: @@ -29,7 +31,7 @@ type proc spawn*[TIn, TOut](action: proc( self: PActor[TIn, TOut]){.thread.}): PActor[TIn, TOut] = ## creates an actor; that is a thread with an inbox. The caller MUST call - ## ``join`` because that also frees the associated resources with the actor. + ## ``join`` because that also frees the actor's associated resources. result = cast[PActor[TIn, TOut]](allocShared0(sizeof(result[]))) open(result.i) createThread(result.t, action, result) @@ -42,6 +44,10 @@ proc running*[TIn, TOut](a: PActor[TIn, TOut]) = ## returns true if the actor `a` is running. result = running(a.t) +proc ready*[TIn, TOut](a: PActor[TIn, TOut]): bool = + ## returns true if the actor `a` is ready to process new messages. + result = ready(a.i) + proc join*[TIn, TOut](a: PActor[TIn, TOut]) = ## joins an actor. joinThread(a.t) @@ -111,17 +117,35 @@ proc createActorPool*[TIn, TOut](a: var TActorPool[TIn, TOut], poolSize = 4) = for i in 0 .. < a.actors.len: a.actors[i] = spawn(poolWorker[TIn, TOut]) -proc join*[TIn, TOut](a: var TActorPool[TIn, TOut]) = - ## waits for each actor in the actor pool `a` to finish and frees the +proc sync*[TIn, TOut](a: var TActorPool[TIn, TOut], polling=50) = + ## waits for every actor of `a` to finish with its work. Currently this is + ## implemented as polling every `polling` ms. This will change in a later + ## version, however. + while true: + var wait = false + for i in 0..high(a.actors): + if not a.actors[i].i.ready: + wait = true + break + if not wait: break + sleep(polling) + +proc terminate*[TIn, TOut](a: var TActorPool[TIn, TOut]) = + ## terminates each actor in the actor pool `a` and frees the ## resources attached to `a`. var t: TTask[TIn, TOut] t.shutdown = true - for i in 0 .. < a.actors.len: send(a.actors[i].i, t) - for i in 0 .. < a.actors.len: join(a.actors[i]) + for i in 0.. <a.actors.len: send(a.actors[i].i, t) + for i in 0.. <a.actors.len: join(a.actors[i]) when TOut isnot void: close(a.outputs) a.actors = nil +proc join*[TIn, TOut](a: var TActorPool[TIn, TOut]) = + ## short-cut for `sync` and then `terminate`. + sync(a) + terminate(a) + template setupTask = t.action = action shallowCopy(t.data, input) @@ -135,7 +159,7 @@ template schedule = return # no thread ready :-( --> send message to the thread which has the least # messages pending: - var minIdx = 0 + var minIdx = -1 var minVal = high(int) for i in 0..high(p.actors): var curr = p.actors[i].i.peek @@ -143,10 +167,13 @@ template schedule = # ok, is ready now: p.actors[i].i.send(t) return - if curr < minVal: + if curr < minVal and curr >= 0: minVal = curr minIdx = i - p.actors[minIdx].i.send(t) + if minIdx >= 0: + p.actors[minIdx].i.send(t) + else: + raise newException(EDeadThread, "cannot send message; thread died") proc spawn*[TIn, TOut](p: var TActorPool[TIn, TOut], input: TIn, action: proc (input: TIn): TOut {.thread.} diff --git a/lib/system/channels.nim b/lib/system/channels.nim index fe93d6840..47fa5b2e5 100755 --- a/lib/system/channels.nim +++ b/lib/system/channels.nim @@ -222,11 +222,14 @@ proc recv*[TMsg](c: var TChannel[TMsg]): TMsg = llRecv(q, addr(result), cast[PNimType](getTypeInfo(result))) proc peek*[TMsg](c: var TChannel[TMsg]): int = - ## returns the current number of messages in the channel `c`. + ## returns the current number of messages in the channel `c`. Returns -1 + ## if the channel has been closed. var q = cast[PRawChannel](addr(c)) if q.mask != ChannelDeadMask: lockChannel(q): result = q.count + else: + result = -1 proc open*[TMsg](c: var TChannel[TMsg]) = ## opens a channel `c` for inter thread communication. diff --git a/tests/run/tuserassert.nim b/tests/run/tuserassert.nim index 958da2fe1..cf12c4e8b 100644 --- a/tests/run/tuserassert.nim +++ b/tests/run/tuserassert.nim @@ -5,7 +5,6 @@ discard """ template myAssert(cond: expr) = when rand(3) < 2: let c = cond.astToStr - {.warning: "code activated: " & c.} if not cond: echo c, "ugh" diff --git a/tests/specials.nim b/tests/specials.nim index 1f3013e90..a1aa1bc5a 100644 --- a/tests/specials.nim +++ b/tests/specials.nim @@ -118,6 +118,7 @@ proc runThreadTests(r: var TResults, options: string) = test "tactors" test "threadex" + test "trecursive_actor" #test "threadring" #test "tthreadanalysis" #test "tthreadsort" diff --git a/tests/threads/trecursive_actor.nim b/tests/threads/trecursive_actor.nim new file mode 100644 index 000000000..e2774704c --- /dev/null +++ b/tests/threads/trecursive_actor.nim @@ -0,0 +1,19 @@ +discard """ + outputsub: "0" +""" + +import actors + +var + a: TActorPool[int, void] +createActorPool(a) + +proc task(i: int) {.thread.} = + echo i + if i != 0: a.spawn (i-1, task) + +# count from 9 till 0 and check 0 is somewhere in the output +a.spawn(9, task) +a.join() + + |