diff options
author | Zahary Karadjov <zahary@gmail.com> | 2011-12-11 11:26:50 +0200 |
---|---|---|
committer | Zahary Karadjov <zahary@gmail.com> | 2011-12-11 11:26:50 +0200 |
commit | 67bc23bb60dda2895c47ae0747d106b6075c6a90 (patch) | |
tree | 4f4c66fad9ae5c42beefd31274091e295f31b639 /lib | |
parent | d171a8b36f10f42d35e64a7ddefa57376b419908 (diff) | |
parent | af792da0bbee6e9587b8aafafcd8f898f8fe9fd4 (diff) | |
download | Nim-67bc23bb60dda2895c47ae0747d106b6075c6a90.tar.gz |
Merge branch 'master' of github.com:Araq/Nimrod into upstream
Diffstat (limited to 'lib')
-rw-r--r-- | lib/pure/actors.nim | 43 | ||||
-rwxr-xr-x | lib/system.nim | 14 | ||||
-rwxr-xr-x | lib/system/channels.nim | 5 |
3 files changed, 51 insertions, 11 deletions
diff --git a/lib/pure/actors.nim b/lib/pure/actors.nim index 2510bb8cd..c07adfd93 100644 --- a/lib/pure/actors.nim +++ b/lib/pure/actors.nim @@ -11,6 +11,8 @@ ## a channel as its inbox. This module requires the ``--threads:on`` ## command line switch. +from os import sleep + type TTask*[TIn, TOut] = object{.pure, final.} when TOut isnot void: @@ -29,7 +31,7 @@ type proc spawn*[TIn, TOut](action: proc( self: PActor[TIn, TOut]){.thread.}): PActor[TIn, TOut] = ## creates an actor; that is a thread with an inbox. The caller MUST call - ## ``join`` because that also frees the associated resources with the actor. + ## ``join`` because that also frees the actor's associated resources. result = cast[PActor[TIn, TOut]](allocShared0(sizeof(result[]))) open(result.i) createThread(result.t, action, result) @@ -42,6 +44,10 @@ proc running*[TIn, TOut](a: PActor[TIn, TOut]) = ## returns true if the actor `a` is running. result = running(a.t) +proc ready*[TIn, TOut](a: PActor[TIn, TOut]): bool = + ## returns true if the actor `a` is ready to process new messages. + result = ready(a.i) + proc join*[TIn, TOut](a: PActor[TIn, TOut]) = ## joins an actor. joinThread(a.t) @@ -111,17 +117,35 @@ proc createActorPool*[TIn, TOut](a: var TActorPool[TIn, TOut], poolSize = 4) = for i in 0 .. < a.actors.len: a.actors[i] = spawn(poolWorker[TIn, TOut]) -proc join*[TIn, TOut](a: var TActorPool[TIn, TOut]) = - ## waits for each actor in the actor pool `a` to finish and frees the +proc sync*[TIn, TOut](a: var TActorPool[TIn, TOut], polling=50) = + ## waits for every actor of `a` to finish with its work. Currently this is + ## implemented as polling every `polling` ms. This will change in a later + ## version, however. + while true: + var wait = false + for i in 0..high(a.actors): + if not a.actors[i].i.ready: + wait = true + break + if not wait: break + sleep(polling) + +proc terminate*[TIn, TOut](a: var TActorPool[TIn, TOut]) = + ## terminates each actor in the actor pool `a` and frees the ## resources attached to `a`. var t: TTask[TIn, TOut] t.shutdown = true - for i in 0 .. < a.actors.len: send(a.actors[i].i, t) - for i in 0 .. < a.actors.len: join(a.actors[i]) + for i in 0.. <a.actors.len: send(a.actors[i].i, t) + for i in 0.. <a.actors.len: join(a.actors[i]) when TOut isnot void: close(a.outputs) a.actors = nil +proc join*[TIn, TOut](a: var TActorPool[TIn, TOut]) = + ## short-cut for `sync` and then `terminate`. + sync(a) + terminate(a) + template setupTask = t.action = action shallowCopy(t.data, input) @@ -135,7 +159,7 @@ template schedule = return # no thread ready :-( --> send message to the thread which has the least # messages pending: - var minIdx = 0 + var minIdx = -1 var minVal = high(int) for i in 0..high(p.actors): var curr = p.actors[i].i.peek @@ -143,10 +167,13 @@ template schedule = # ok, is ready now: p.actors[i].i.send(t) return - if curr < minVal: + if curr < minVal and curr >= 0: minVal = curr minIdx = i - p.actors[minIdx].i.send(t) + if minIdx >= 0: + p.actors[minIdx].i.send(t) + else: + raise newException(EDeadThread, "cannot send message; thread died") proc spawn*[TIn, TOut](p: var TActorPool[TIn, TOut], input: TIn, action: proc (input: TIn): TOut {.thread.} diff --git a/lib/system.nim b/lib/system.nim index c7e26230a..8a99781cc 100755 --- a/lib/system.nim +++ b/lib/system.nim @@ -2071,14 +2071,24 @@ proc astToStr*[T](x: T): string {.magic: "AstToStr", noSideEffect.} ## converts the AST of `x` into a string representation. This is very useful ## for debugging. +proc raiseAssert(msg: string) {.noinline.} = + raise newException(EAssertionFailed, msg) + template assert*(cond: expr, msg = "") = ## provides a means to implement `programming by contracts`:idx: in Nimrod. ## ``assert`` evaluates expression ``cond`` and if ``cond`` is false, it ## raises an ``EAssertionFailure`` exception. However, the compiler may ## not generate any code at all for ``assert`` if it is advised to do so. ## Use ``assert`` for debugging purposes only. + bind raiseAssert when compileOption("assertions"): if not cond: - raise newException(EAssertionFailed, astToStr(cond) & ' ' & msg) - + raiseAssert(astToStr(cond) & ' ' & msg) + +template doAssert*(cond: expr, msg = "") = + ## same as `assert' but is always turned on and not affected by the + ## ``--assertions`` command line switch. + bind raiseAssert + if not cond: + raiseAssert(astToStr(cond) & ' ' & msg) diff --git a/lib/system/channels.nim b/lib/system/channels.nim index fe93d6840..47fa5b2e5 100755 --- a/lib/system/channels.nim +++ b/lib/system/channels.nim @@ -222,11 +222,14 @@ proc recv*[TMsg](c: var TChannel[TMsg]): TMsg = llRecv(q, addr(result), cast[PNimType](getTypeInfo(result))) proc peek*[TMsg](c: var TChannel[TMsg]): int = - ## returns the current number of messages in the channel `c`. + ## returns the current number of messages in the channel `c`. Returns -1 + ## if the channel has been closed. var q = cast[PRawChannel](addr(c)) if q.mask != ChannelDeadMask: lockChannel(q): result = q.count + else: + result = -1 proc open*[TMsg](c: var TChannel[TMsg]) = ## opens a channel `c` for inter thread communication. |