summary refs log tree commit diff stats
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/pure/actors.nim43
-rwxr-xr-xlib/system.nim14
-rwxr-xr-xlib/system/channels.nim5
3 files changed, 51 insertions, 11 deletions
diff --git a/lib/pure/actors.nim b/lib/pure/actors.nim
index 2510bb8cd..c07adfd93 100644
--- a/lib/pure/actors.nim
+++ b/lib/pure/actors.nim
@@ -11,6 +11,8 @@
 ## a channel as its inbox. This module requires the ``--threads:on``
 ## command line switch.
 
+from os import sleep
+
 type
   TTask*[TIn, TOut] = object{.pure, final.}
     when TOut isnot void:
@@ -29,7 +31,7 @@ type
 proc spawn*[TIn, TOut](action: proc(
     self: PActor[TIn, TOut]){.thread.}): PActor[TIn, TOut] =
   ## creates an actor; that is a thread with an inbox. The caller MUST call
-  ## ``join`` because that also frees the associated resources with the actor.
+  ## ``join`` because that also frees the actor's associated resources.
   result = cast[PActor[TIn, TOut]](allocShared0(sizeof(result[])))
   open(result.i)
   createThread(result.t, action, result)
@@ -42,6 +44,10 @@ proc running*[TIn, TOut](a: PActor[TIn, TOut]) =
   ## returns true if the actor `a` is running.
   result = running(a.t)
 
+proc ready*[TIn, TOut](a: PActor[TIn, TOut]): bool =
+  ## returns true if the actor `a` is ready to process new messages.
+  result = ready(a.i)
+
 proc join*[TIn, TOut](a: PActor[TIn, TOut]) =
   ## joins an actor.
   joinThread(a.t)
@@ -111,17 +117,35 @@ proc createActorPool*[TIn, TOut](a: var TActorPool[TIn, TOut], poolSize = 4) =
   for i in 0 .. < a.actors.len:
     a.actors[i] = spawn(poolWorker[TIn, TOut])
 
-proc join*[TIn, TOut](a: var TActorPool[TIn, TOut]) =
-  ## waits for each actor in the actor pool `a` to finish and frees the
+proc sync*[TIn, TOut](a: var TActorPool[TIn, TOut], polling=50) =
+  ## waits for every actor of `a` to finish with its work. Currently this is
+  ## implemented as polling every `polling` ms. This will change in a later
+  ## version, however.
+  while true:
+    var wait = false
+    for i in 0..high(a.actors):
+      if not a.actors[i].i.ready: 
+        wait = true
+        break
+    if not wait: break
+    sleep(polling)
+
+proc terminate*[TIn, TOut](a: var TActorPool[TIn, TOut]) =
+  ## terminates each actor in the actor pool `a` and frees the
   ## resources attached to `a`.
   var t: TTask[TIn, TOut]
   t.shutdown = true
-  for i in 0 .. < a.actors.len: send(a.actors[i].i, t)
-  for i in 0 .. < a.actors.len: join(a.actors[i])
+  for i in 0.. <a.actors.len: send(a.actors[i].i, t)
+  for i in 0.. <a.actors.len: join(a.actors[i])
   when TOut isnot void:
     close(a.outputs)
   a.actors = nil
 
+proc join*[TIn, TOut](a: var TActorPool[TIn, TOut]) =
+  ## short-cut for `sync` and then `terminate`.
+  sync(a)
+  terminate(a)
+
 template setupTask =
   t.action = action
   shallowCopy(t.data, input)
@@ -135,7 +159,7 @@ template schedule =
       return
   # no thread ready :-( --> send message to the thread which has the least
   # messages pending:
-  var minIdx = 0
+  var minIdx = -1
   var minVal = high(int)
   for i in 0..high(p.actors):
     var curr = p.actors[i].i.peek
@@ -143,10 +167,13 @@ template schedule =
       # ok, is ready now:
       p.actors[i].i.send(t)
       return
-    if curr < minVal:
+    if curr < minVal and curr >= 0:
       minVal = curr
       minIdx = i
-  p.actors[minIdx].i.send(t)
+  if minIdx >= 0:
+    p.actors[minIdx].i.send(t)
+  else:
+    raise newException(EDeadThread, "cannot send message; thread died")
 
 proc spawn*[TIn, TOut](p: var TActorPool[TIn, TOut], input: TIn,
                        action: proc (input: TIn): TOut {.thread.}
diff --git a/lib/system.nim b/lib/system.nim
index c7e26230a..8a99781cc 100755
--- a/lib/system.nim
+++ b/lib/system.nim
@@ -2071,14 +2071,24 @@ proc astToStr*[T](x: T): string {.magic: "AstToStr", noSideEffect.}
   ## converts the AST of `x` into a string representation. This is very useful
   ## for debugging.
   
+proc raiseAssert(msg: string) {.noinline.} =
+  raise newException(EAssertionFailed, msg)
+  
 template assert*(cond: expr, msg = "") =
   ## provides a means to implement `programming by contracts`:idx: in Nimrod.
   ## ``assert`` evaluates expression ``cond`` and if ``cond`` is false, it
   ## raises an ``EAssertionFailure`` exception. However, the compiler may
   ## not generate any code at all for ``assert`` if it is advised to do so.
   ## Use ``assert`` for debugging purposes only.
+  bind raiseAssert
   when compileOption("assertions"):
     if not cond:
-      raise newException(EAssertionFailed, astToStr(cond) & ' ' & msg)
-
+      raiseAssert(astToStr(cond) & ' ' & msg)
+
+template doAssert*(cond: expr, msg = "") =
+  ## same as `assert' but is always turned on and not affected by the
+  ## ``--assertions`` command line switch.
+  bind raiseAssert
+  if not cond:
+    raiseAssert(astToStr(cond) & ' ' & msg)
 
diff --git a/lib/system/channels.nim b/lib/system/channels.nim
index fe93d6840..47fa5b2e5 100755
--- a/lib/system/channels.nim
+++ b/lib/system/channels.nim
@@ -222,11 +222,14 @@ proc recv*[TMsg](c: var TChannel[TMsg]): TMsg =
   llRecv(q, addr(result), cast[PNimType](getTypeInfo(result)))

 

 proc peek*[TMsg](c: var TChannel[TMsg]): int =

-  ## returns the current number of messages in the channel `c`.

+  ## returns the current number of messages in the channel `c`. Returns -1

+  ## if the channel has been closed.

   var q = cast[PRawChannel](addr(c))

   if q.mask != ChannelDeadMask:

     lockChannel(q):

       result = q.count

+  else:

+    result = -1

 

 proc open*[TMsg](c: var TChannel[TMsg]) =

   ## opens a channel `c` for inter thread communication.

d2cd738a59aa07eea7cb69e944c7db9eed6a86'>^
8d3e0c7b9 ^

0a2f711b9 ^

89a21e4ec ^
a9bd78d57 ^
0a2f711b9 ^
13b3ea71d ^
0a2f711b9 ^


db8a62d48 ^

0a2f711b9 ^
8d3e0c7b9 ^
13b3ea71d ^
8d3e0c7b9 ^
0a2f711b9 ^
a9bd78d57 ^
0a2f711b9 ^
13b3ea71d ^
0a2f711b9 ^
8d3e0c7b9 ^

0a2f711b9 ^
9ac0cbdd5 ^
8d3e0c7b9 ^
db8a62d48 ^
8d3e0c7b9 ^

89a21e4ec ^



0a2f711b9 ^
8d3e0c7b9 ^

db8a62d48 ^
8d3e0c7b9 ^

db8a62d48 ^
8d3e0c7b9 ^


0a2f711b9 ^
89a21e4ec ^

13b3ea71d ^
dd6b0f81e ^
13b3ea71d ^
8d3e0c7b9 ^
0a2f711b9 ^

89a21e4ec ^
db8a62d48 ^
0a2f711b9 ^
db8a62d48 ^
0a2f711b9 ^
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99