summary refs log tree commit diff stats
path: root/lib/pure/actors.nim
diff options
context:
space:
mode:
authorpdw <algorithicimperative@gmail.com>2015-05-24 22:30:19 -0500
committerAraq <rumpf_a@web.de>2015-06-04 13:17:11 +0200
commit192ba3bbc0ae7f570c1277c9211d24eeea2cf48a (patch)
tree19f6c13c99fbe289341d6db94ffee914723ff3af /lib/pure/actors.nim
parent1c426c613c4310b7ecf9358160c07074da73d327 (diff)
downloadNim-192ba3bbc0ae7f570c1277c9211d24eeea2cf48a.tar.gz
lib/pure/a-c - Dropped 'T' from types
Diffstat (limited to 'lib/pure/actors.nim')
-rw-r--r--lib/pure/actors.nim104
1 files changed, 53 insertions, 51 deletions
diff --git a/lib/pure/actors.nim b/lib/pure/actors.nim
index 294c24741..da9037285 100644
--- a/lib/pure/actors.nim
+++ b/lib/pure/actors.nim
@@ -16,7 +16,7 @@
 ## .. code-block:: nim
 ##
 ##      var
-##        a: TActorPool[int, void]
+##        a: ActorPool[int, void]
 ##      createActorPool(a)
 ##      for i in 0 .. < 300:
 ##        a.spawn(i, proc (x: int) {.thread.} = echo x)
@@ -30,75 +30,76 @@
 from os import sleep
 
 type
-  TTask*[TIn, TOut] = object{.pure, final.} ## a task
-    when TOut isnot void:
-      receiver*: ptr TChannel[TOut] ## the receiver channel of the response
-    action*: proc (x: TIn): TOut {.thread.} ## action to execute;
+  Task*[In, Out] = object{.pure, final.} ## a task
+    when Out isnot void:
+      receiver*: ptr Channel[Out] ## the receiver channel of the response
+    action*: proc (x: In): Out {.thread.} ## action to execute;
                                             ## sometimes useful
     shutDown*: bool ## set to tell an actor to shut-down
-    data*: TIn ## the data to process
+    data*: In ## the data to process
 
-  TActor[TIn, TOut] = object{.pure, final.}
-    i: TChannel[TTask[TIn, TOut]]
-    t: TThread[ptr TActor[TIn, TOut]]
+  Actor[In, Out] = object{.pure, final.}
+    i: Channel[Task[In, Out]]
+    t: TThread[ptr Actor[In, Out]]
     
-  PActor*[TIn, TOut] = ptr TActor[TIn, TOut] ## an actor
-  
-proc spawn*[TIn, TOut](action: proc(
-    self: PActor[TIn, TOut]){.thread.}): PActor[TIn, TOut] =
+  PActor*[In, Out] = ptr Actor[In, Out] ## an actor
+{.deprecated: [TTask: Task, TActor: Actor].}
+
+proc spawn*[In, Out](action: proc(
+    self: PActor[In, Out]){.thread.}): PActor[In, Out] =
   ## creates an actor; that is a thread with an inbox. The caller MUST call
   ## ``join`` because that also frees the actor's associated resources.
-  result = cast[PActor[TIn, TOut]](allocShared0(sizeof(result[])))
+  result = cast[PActor[In, Out]](allocShared0(sizeof(result[])))
   open(result.i)
   createThread(result.t, action, result)
 
-proc inbox*[TIn, TOut](self: PActor[TIn, TOut]): ptr TChannel[TIn] =
+proc inbox*[In, Out](self: PActor[In, Out]): ptr Channel[In] =
   ## gets a pointer to the associated inbox of the actor `self`.
   result = addr(self.i)
 
-proc running*[TIn, TOut](a: PActor[TIn, TOut]): bool =
+proc running*[In, Out](a: PActor[In, Out]): bool =
   ## returns true if the actor `a` is running.
   result = running(a.t)
 
-proc ready*[TIn, TOut](a: PActor[TIn, TOut]): bool =
+proc ready*[In, Out](a: PActor[In, Out]): bool =
   ## returns true if the actor `a` is ready to process new messages.
   result = ready(a.i)
 
-proc join*[TIn, TOut](a: PActor[TIn, TOut]) =
+proc join*[In, Out](a: PActor[In, Out]) =
   ## joins an actor.
   joinThread(a.t)
   close(a.i)
   deallocShared(a)
 
-proc recv*[TIn, TOut](a: PActor[TIn, TOut]): TTask[TIn, TOut] =
+proc recv*[In, Out](a: PActor[In, Out]): Task[In, Out] =
   ## receives a task from `a`'s inbox.
   result = recv(a.i)
 
-proc send*[TIn, TOut, X, Y](receiver: PActor[TIn, TOut], msg: TIn,
+proc send*[In, Out, X, Y](receiver: PActor[In, Out], msg: In,
                             sender: PActor[X, Y]) =
   ## sends a message to `a`'s inbox.
-  var t: TTask[TIn, TOut]
+  var t: Task[In, Out]
   t.receiver = addr(sender.i)
   shallowCopy(t.data, msg)
   send(receiver.i, t)
 
-proc send*[TIn, TOut](receiver: PActor[TIn, TOut], msg: TIn, 
-                      sender: ptr TChannel[TOut] = nil) =
+proc send*[In, Out](receiver: PActor[In, Out], msg: In, 
+                      sender: ptr Channel[Out] = nil) =
   ## sends a message to `receiver`'s inbox.
-  var t: TTask[TIn, TOut]
+  var t: Task[In, Out]
   t.receiver = sender
   shallowCopy(t.data, msg)
   send(receiver.i, t)
 
-proc sendShutdown*[TIn, TOut](receiver: PActor[TIn, TOut]) =
+proc sendShutdown*[In, Out](receiver: PActor[In, Out]) =
   ## send a shutdown message to `receiver`.
-  var t: TTask[TIn, TOut]
+  var t: Task[In, Out]
   t.shutdown = true
   send(receiver.i, t)
 
-proc reply*[TIn, TOut](t: TTask[TIn, TOut], m: TOut) =
+proc reply*[In, Out](t: Task[In, Out], m: Out) =
   ## sends a message to io's output message box.
-  when TOut is void:
+  when Out is void:
     {.error: "you cannot reply to a void outbox".}
   assert t.receiver != nil
   send(t.receiver[], m)
@@ -107,34 +108,35 @@ proc reply*[TIn, TOut](t: TTask[TIn, TOut], m: TOut) =
 # ----------------- actor pools ----------------------------------------------
 
 type
-  TActorPool*[TIn, TOut] = object{.pure, final.}  ## an actor pool
-    actors: seq[PActor[TIn, TOut]]
-    when TOut isnot void:
-      outputs: TChannel[TOut]
+  ActorPool*[In, Out] = object{.pure, final.}  ## an actor pool
+    actors: seq[PActor[In, Out]]
+    when Out isnot void:
+      outputs: Channel[Out]
+{.deprecated: [TActorPool: ActorPool].}
 
-proc `^`*[T](f: ptr TChannel[T]): T =
+proc `^`*[T](f: ptr Channel[T]): T =
   ## alias for 'recv'.
   result = recv(f[])
 
-proc poolWorker[TIn, TOut](self: PActor[TIn, TOut]) {.thread.} =
+proc poolWorker[In, Out](self: PActor[In, Out]) {.thread.} =
   while true:
     var m = self.recv
     if m.shutDown: break
-    when TOut is void:
+    when Out is void:
       m.action(m.data)
     else:
       send(m.receiver[], m.action(m.data))
       #self.reply()
 
-proc createActorPool*[TIn, TOut](a: var TActorPool[TIn, TOut], poolSize = 4) =
+proc createActorPool*[In, Out](a: var ActorPool[In, Out], poolSize = 4) =
   ## creates an actor pool.
   newSeq(a.actors, poolSize)
-  when TOut isnot void:
+  when Out isnot void:
     open(a.outputs)
   for i in 0 .. < a.actors.len:
-    a.actors[i] = spawn(poolWorker[TIn, TOut])
+    a.actors[i] = spawn(poolWorker[In, Out])
 
-proc sync*[TIn, TOut](a: var TActorPool[TIn, TOut], polling=50) =
+proc sync*[In, Out](a: var ActorPool[In, Out], polling=50) =
   ## waits for every actor of `a` to finish with its work. Currently this is
   ## implemented as polling every `polling` ms and has a slight chance 
   ## of failing since we check for every actor to be in `ready` state and not
@@ -157,18 +159,18 @@ proc sync*[TIn, TOut](a: var TActorPool[TIn, TOut], polling=50) =
       if allReadyCount > 1: break
     sleep(polling)
 
-proc terminate*[TIn, TOut](a: var TActorPool[TIn, TOut]) =
+proc terminate*[In, Out](a: var ActorPool[In, Out]) =
   ## terminates each actor in the actor pool `a` and frees the
   ## resources attached to `a`.
-  var t: TTask[TIn, TOut]
+  var t: Task[In, Out]
   t.shutdown = true
   for i in 0.. <a.actors.len: send(a.actors[i].i, t)
   for i in 0.. <a.actors.len: join(a.actors[i])
-  when TOut isnot void:
+  when Out isnot void:
     close(a.outputs)
   a.actors = nil
 
-proc join*[TIn, TOut](a: var TActorPool[TIn, TOut]) =
+proc join*[In, Out](a: var ActorPool[In, Out]) =
   ## short-cut for `sync` and then `terminate`.
   sync(a)
   terminate(a)
@@ -202,28 +204,28 @@ template schedule =
   else:
     raise newException(DeadThreadError, "cannot send message; thread died")
 
-proc spawn*[TIn, TOut](p: var TActorPool[TIn, TOut], input: TIn,
-                       action: proc (input: TIn): TOut {.thread.}
-                       ): ptr TChannel[TOut] =
+proc spawn*[In, Out](p: var ActorPool[In, Out], input: In,
+                       action: proc (input: In): Out {.thread.}
+                       ): ptr Channel[Out] =
   ## uses the actor pool to run ``action(input)`` concurrently.
   ## `spawn` is guaranteed to not block.
-  var t: TTask[TIn, TOut]
+  var t: Task[In, Out]
   setupTask()
   result = addr(p.outputs)
   t.receiver = result
   schedule()
 
-proc spawn*[TIn](p: var TActorPool[TIn, void], input: TIn,
-                 action: proc (input: TIn) {.thread.}) =
+proc spawn*[In](p: var ActorPool[In, void], input: In,
+                 action: proc (input: In) {.thread.}) =
   ## uses the actor pool to run ``action(input)`` concurrently.
   ## `spawn` is guaranteed to not block.
-  var t: TTask[TIn, void]
+  var t: Task[In, void]
   setupTask()
   schedule()
   
 when not defined(testing) and isMainModule:
   var
-    a: TActorPool[int, void]
+    a: ActorPool[int, void]
   createActorPool(a)
   for i in 0 .. < 300:
     a.spawn(i, proc (x: int) {.thread.} = echo x)