summary refs log tree commit diff stats
path: root/lib/pure/actors.nim
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/actors.nim')
-rw-r--r--lib/pure/actors.nim234
1 files changed, 0 insertions, 234 deletions
diff --git a/lib/pure/actors.nim b/lib/pure/actors.nim
deleted file mode 100644
index 2d902debe..000000000
--- a/lib/pure/actors.nim
+++ /dev/null
@@ -1,234 +0,0 @@
-#
-#
-#            Nimrod's Runtime Library
-#        (c) Copyright 2012 Andreas Rumpf
-#
-#    See the file "copying.txt", included in this
-#    distribution, for details about the copyright.
-#
-
-## `Actor`:idx: support for Nimrod. An actor is implemented as a thread with
-## a channel as its inbox. This module requires the ``--threads:on``
-## command line switch.
-##
-## Example:
-##
-## .. code-block:: nimrod
-##
-##      var
-##        a: TActorPool[int, void]
-##      createActorPool(a)
-##      for i in 0 .. < 300:
-##        a.spawn(i, proc (x: int) {.thread.} = echo x)
-##      a.join()
-
-from os import sleep
-
-type
-  TTask*[TIn, TOut] = object{.pure, final.} ## a task
-    when TOut isnot void:
-      receiver*: ptr TChannel[TOut] ## the receiver channel of the response
-    action*: proc (x: TIn): TOut {.thread.} ## action to execute;
-                                            ## sometimes useful
-    shutDown*: bool ## set to tell an actor to shut-down
-    data*: TIn ## the data to process
-
-  TActor[TIn, TOut] = object{.pure, final.}
-    i: TChannel[TTask[TIn, TOut]]
-    t: TThread[ptr TActor[TIn, TOut]]
-    
-  PActor*[TIn, TOut] = ptr TActor[TIn, TOut] ## an actor
-  
-proc spawn*[TIn, TOut](action: proc(
-    self: PActor[TIn, TOut]){.thread.}): PActor[TIn, TOut] =
-  ## creates an actor; that is a thread with an inbox. The caller MUST call
-  ## ``join`` because that also frees the actor's associated resources.
-  result = cast[PActor[TIn, TOut]](allocShared0(sizeof(result[])))
-  open(result.i)
-  createThread(result.t, action, result)
-
-proc inbox*[TIn, TOut](self: PActor[TIn, TOut]): ptr TChannel[TIn] =
-  ## gets a pointer to the associated inbox of the actor `self`.
-  result = addr(self.i)
-
-proc running*[TIn, TOut](a: PActor[TIn, TOut]): bool =
-  ## returns true if the actor `a` is running.
-  result = running(a.t)
-
-proc ready*[TIn, TOut](a: PActor[TIn, TOut]): bool =
-  ## returns true if the actor `a` is ready to process new messages.
-  result = ready(a.i)
-
-proc join*[TIn, TOut](a: PActor[TIn, TOut]) =
-  ## joins an actor.
-  joinThread(a.t)
-  close(a.i)
-  deallocShared(a)
-
-proc recv*[TIn, TOut](a: PActor[TIn, TOut]): TTask[TIn, TOut] =
-  ## receives a task from `a`'s inbox.
-  result = recv(a.i)
-
-proc send*[TIn, TOut, X, Y](receiver: PActor[TIn, TOut], msg: TIn,
-                            sender: PActor[X, Y]) =
-  ## sends a message to `a`'s inbox.
-  var t: TTask[TIn, TOut]
-  t.receiver = addr(sender.i)
-  shallowCopy(t.data, msg)
-  send(receiver.i, t)
-
-proc send*[TIn, TOut](receiver: PActor[TIn, TOut], msg: TIn, 
-                      sender: ptr TChannel[TOut] = nil) =
-  ## sends a message to `receiver`'s inbox.
-  var t: TTask[TIn, TOut]
-  t.receiver = sender
-  shallowCopy(t.data, msg)
-  send(receiver.i, t)
-
-proc sendShutdown*[TIn, TOut](receiver: PActor[TIn, TOut]) =
-  ## send a shutdown message to `receiver`.
-  var t: TTask[TIn, TOut]
-  t.shutdown = true
-  send(receiver.i, t)
-
-proc reply*[TIn, TOut](t: TTask[TIn, TOut], m: TOut) =
-  ## sends a message to io's output message box.
-  when TOut is void:
-    {.error: "you cannot reply to a void outbox".}
-  assert t.receiver != nil
-  send(t.receiver[], m)
-
-
-# ----------------- actor pools ----------------------------------------------
-
-type
-  TActorPool*[TIn, TOut] = object{.pure, final.}  ## an actor pool
-    actors: seq[PActor[TIn, TOut]]
-    when TOut isnot void:
-      outputs: TChannel[TOut]
-
-proc `^`*[T](f: ptr TChannel[T]): T =
-  ## alias for 'recv'.
-  result = recv(f[])
-
-proc poolWorker[TIn, TOut](self: PActor[TIn, TOut]) {.thread.} =
-  while true:
-    var m = self.recv
-    if m.shutDown: break
-    when TOut is void:
-      m.action(m.data)
-    else:
-      send(m.receiver[], m.action(m.data))
-      #self.reply()
-
-proc createActorPool*[TIn, TOut](a: var TActorPool[TIn, TOut], poolSize = 4) =
-  ## creates an actor pool.
-  newSeq(a.actors, poolSize)
-  when TOut isnot void:
-    open(a.outputs)
-  for i in 0 .. < a.actors.len:
-    a.actors[i] = spawn(poolWorker[TIn, TOut])
-
-proc sync*[TIn, TOut](a: var TActorPool[TIn, TOut], polling=50) =
-  ## waits for every actor of `a` to finish with its work. Currently this is
-  ## implemented as polling every `polling` ms and has a slight chance 
-  ## of failing since we check for every actor to be in `ready` state and not
-  ## for messages still in ether. This will change in a later
-  ## version, however.
-  var allReadyCount = 0
-  while true:
-    var wait = false
-    for i in 0..high(a.actors):
-      if not a.actors[i].i.ready: 
-        wait = true
-        allReadyCount = 0
-        break
-    if not wait:
-      # it's possible that some actor sent a message to some other actor but
-      # both appeared to be non-working as the message takes some time to
-      # arrive. We assume that this won't take longer than `polling` and
-      # simply attempt a second time and declare victory then. ;-)
-      inc allReadyCount
-      if allReadyCount > 1: break
-    sleep(polling)
-
-proc terminate*[TIn, TOut](a: var TActorPool[TIn, TOut]) =
-  ## terminates each actor in the actor pool `a` and frees the
-  ## resources attached to `a`.
-  var t: TTask[TIn, TOut]
-  t.shutdown = true
-  for i in 0.. <a.actors.len: send(a.actors[i].i, t)
-  for i in 0.. <a.actors.len: join(a.actors[i])
-  when TOut isnot void:
-    close(a.outputs)
-  a.actors = nil
-
-proc join*[TIn, TOut](a: var TActorPool[TIn, TOut]) =
-  ## short-cut for `sync` and then `terminate`.
-  sync(a)
-  terminate(a)
-
-template setupTask =
-  t.action = action
-  shallowCopy(t.data, input)
-
-template schedule =
-  # extremely simple scheduler: We always try the first thread first, so that
-  # it remains 'hot' ;-). Round-robin hurts for keeping threads hot.
-  for i in 0..high(p.actors):
-    if p.actors[i].i.ready:
-      p.actors[i].i.send(t)
-      return
-  # no thread ready :-( --> send message to the thread which has the least
-  # messages pending:
-  var minIdx = -1
-  var minVal = high(int)
-  for i in 0..high(p.actors):
-    var curr = p.actors[i].i.peek
-    if curr == 0:
-      # ok, is ready now:
-      p.actors[i].i.send(t)
-      return
-    if curr < minVal and curr >= 0:
-      minVal = curr
-      minIdx = i
-  if minIdx >= 0:
-    p.actors[minIdx].i.send(t)
-  else:
-    raise newException(EDeadThread, "cannot send message; thread died")
-
-proc spawn*[TIn, TOut](p: var TActorPool[TIn, TOut], input: TIn,
-                       action: proc (input: TIn): TOut {.thread.}
-                       ): ptr TChannel[TOut] =
-  ## uses the actor pool to run ``action(input)`` concurrently.
-  ## `spawn` is guaranteed to not block.
-  var t: TTask[TIn, TOut]
-  setupTask()
-  result = addr(p.outputs)
-  t.receiver = result
-  schedule()
-
-proc spawn*[TIn](p: var TActorPool[TIn, void], input: TIn,
-                 action: proc (input: TIn) {.thread.}) =
-  ## uses the actor pool to run ``action(input)`` concurrently.
-  ## `spawn` is guaranteed to not block.
-  var t: TTask[TIn, void]
-  setupTask()
-  schedule()
-  
-when isMainModule:
-  var
-    a: TActorPool[int, void]
-  createActorPool(a)
-  for i in 0 .. < 300:
-    a.spawn(i, proc (x: int) {.thread.} = echo x)
-
-  when false:
-    proc treeDepth(n: PNode): int {.thread.} =
-      var x = a.spawn(treeDepth, n.le)
-      var y = a.spawn(treeDepth, n.ri)
-      result = max(^x, ^y) + 1
-
-  a.join()
-
-