summary refs log tree commit diff stats
path: root/lib/pure
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure')
-rw-r--r--lib/pure/actors.nim184
1 files changed, 184 insertions, 0 deletions
diff --git a/lib/pure/actors.nim b/lib/pure/actors.nim
new file mode 100644
index 000000000..285e3241d
--- /dev/null
+++ b/lib/pure/actors.nim
@@ -0,0 +1,184 @@
+#
+#
+#            Nimrod's Runtime Library
+#        (c) Copyright 2011 Andreas Rumpf
+#
+#    See the file "copying.txt", included in this
+#    distribution, for details about the copyright.
+#
+
+## `Actor`:idx: support for Nimrod. An actor is implemented as a thread with
+## a channel as its inbox. This module requires the ``--threads:on``
+## command line switch.
+
+type
+  TTask*[TIn, TOut] = object{.pure, final.}
+    when TOut isnot void:
+      receiver*: ptr TChannel[TOut] ## the receiver channel of the response
+    action*: proc (x: TIn): TOut {.thread.} ## action to execute;
+                                            ## sometimes useful
+    shutDown*: bool ## set to tell an actor to shut-down
+    data*: TIn ## the data to process
+
+  TActor[TIn, TOut] = object{.pure, final.}
+    i: TChannel[TTask[TIn, TOut]]
+    t: TThread[ptr TActor[TIn, TOut]]
+    
+  PActor*[TIn, TOut] = ptr TActor[TIn, TOut] ## an actor
+  
+proc spawn*[TIn, TOut](action: proc(
+    self: PActor[TIn, TOut]){.thread.}): PActor[TIn, TOut] =
+  ## creates an actor; that is a thread with an inbox. The caller MUST call
+  ## ``join`` because that also frees the associated resources with the actor.
+  result = allocShared0(sizeof(result[]))
+  open(result.i)
+  createThread(result.t, action, result)
+
+proc inbox*[TIn, TOut](self: PActor[TIn, TOut]): ptr TChannel[TIn] =
+  ## gets a pointer to the associated inbox of the actor `self`.
+  result = addr(self.i)
+
+proc running*[TIn, TOut](a: PActor[TIn, TOut]) =
+  ## returns true if the actor `a` is running.
+  result = running(a.t)
+
+proc join*[TIn, TOut](a: PActor[TIn, TOut]) =
+  ## joins an actor.
+  joinThread(a.t)
+  close(a.i)
+  deallocShared(a)
+
+proc recv*[TIn, TOut](a: PActor[TIn, TOut]): TTask[TIn, TOut] =
+  ## receives a task from `a`'s inbox.
+  result = recv(a.i)
+
+proc send*[TIn, TOut, X, Y](sender: PActor[X, Z], 
+                            receiver: PActor[TIn, TOut], msg: TIn) =
+  ## sends a message to `a`'s inbox.
+  var t: TTask[TIn, TOut]
+  t.receiver = addr(sender.i)
+  shallowCopy(t.data, msg)
+  send(receiver.i, t)
+
+proc send*[TIn, TOut](receiver: PActor[TIn, TOut], msg: TIn, 
+                      sender: ptr TChannel[TOut] = nil) =
+  ## sends a message to `receiver`'s inbox.
+  var t: TTask[TIn, TOut]
+  t.receiver = sender
+  shallowCopy(t.data, msg)
+  send(receiver.i, t)
+
+proc sendShutdown*[TIn, TOut](receiver: PActor[TIn, TOut]) =
+  ## send a shutdown message to `receiver`.
+  var t: TTask[TIn, TOut]
+  t.shutdown = true
+  send(receiver.i, t)
+
+proc reply*[TIn, TOut](t: TTask[TIn, TOut], m: TOut) =
+  ## sends a message to io's output message box.
+  when TOut is void:
+    {.error: "you cannot reply to a void outbox".}
+  assert t.receiver != nil
+  send(t.receiver[], m)
+
+
+# ----------------- actor pools ----------------------------------------------
+
+type
+  TActorPool*[TIn, TOut] = object{.pure, final.}  ## an actor pool
+    actors: seq[PActor[TIn, TOut]]
+    when TOut isnot void:
+      outputs: TChannel[TOut]
+
+proc `^`*[T](f: ptr TChannel[T]): T =
+  ## alias for 'recv'.
+  result = recv(f[])
+
+proc poolWorker[TIn, TOut](self: PActor[TIn, TOut]) {.thread.} =
+  while true:
+    var m = self.recv
+    if m.shutDown: break
+    when TOut is void:
+      action(m.data)
+    else:
+      self.repy(action(m.data))
+
+proc createActorPool*[TIn, TOut](a: var TActorPool[TIn, TOut], poolSize = 4) =
+  ## creates an actor pool.
+  newSeq(a.actors, poolSize)
+  when TOut isnot void:
+    open(a.outputs)
+  for i in 0 .. < a.actors.len:
+    a.actors[i] = spawn(poolWorker)
+
+proc join*[TIn, TOut](a: var TActorPool[TIn, TOut]) =
+  ## waits for each actor in the actor pool `a` to finish and frees the
+  ## resources attached to `a`.
+  var t: TTask[TIn, TOut]
+  t.shutdown = true
+  for i in 0 .. < a.actors.len: send(a.actors[i], t)
+  for i in 0 .. < a.actors.len: join(a.actors[i])
+  when TOut isnot void:
+    close(a.outputs)
+  a.actors = nil
+
+template setupTask =
+  var t: TTask[TIn, TOut]
+  t.action = action
+  shallowCopy(t.data, input)
+
+template schedule =
+  # extremely simple scheduler: We always try the first thread first, so that
+  # it remains 'hot' ;-). Round-robin hurts for keeping threads hot.
+  for i in 0..high(a.actors):
+    if a.actors[i].i.ready:
+      a.actors[i].send(t)
+      return
+  # no thread ready :-( --> send message to the thread which has the least
+  # messages pending:
+  var minIdx = 0
+  var minVal = high(int)
+  for i in 0..high(a.actors):
+    var curr = a.actors[i].i.peek
+    if curr == 0:
+      # ok, is ready now:
+      a.actors[i].send(t)
+      return
+    if curr < minVal:
+      minVal = curr
+      minIdx = i
+  a.actors[minIdx].send(t)
+
+proc spawn*[TIn, TOut](p: var TActorPool[TIn, TOut],
+                       action: proc (input: TIn): TOut {.thread.}, 
+                       input: TIn): ptr TChannel[TOut] =
+  ## uses the actor pool to run `action` concurrently. `spawn` is guaranteed
+  ## to not block.
+  setupTask()
+  result = addr(p.outputs)
+  schedule()
+
+proc spawn*[TIn](p: var TActorPool[TIn, void],
+                 action: proc (input: TIn) {.thread.}, 
+                 input: TIn) =
+  ## uses the actor pool to run `action` concurrently. `spawn` is guaranteed
+  ## to not block.
+  setupTask()
+  schedule()
+  
+when isMainModule:
+  var
+    a: TActorPool[int, void]
+  createActorPool(a)
+  for i in 0 .. < 300:
+    a.spawn(proc (x: int) {.thread.} = echo x)
+
+  when false:
+    proc treeDepth(n: PNode): int {.thread.} =
+      var x = a.spawn(treeDepth, n.le)
+      var y = a.spawn(treeDepth, n.ri)
+      result = max(^x, ^y) + 1
+
+  a.join()
+
+