diff options
author | pdw <algorithicimperative@gmail.com> | 2015-05-24 22:30:19 -0500 |
---|---|---|
committer | Araq <rumpf_a@web.de> | 2015-06-04 13:17:11 +0200 |
commit | 192ba3bbc0ae7f570c1277c9211d24eeea2cf48a (patch) | |
tree | 19f6c13c99fbe289341d6db94ffee914723ff3af /lib/pure/actors.nim | |
parent | 1c426c613c4310b7ecf9358160c07074da73d327 (diff) | |
download | Nim-192ba3bbc0ae7f570c1277c9211d24eeea2cf48a.tar.gz |
lib/pure/a-c - Dropped 'T' from types
Diffstat (limited to 'lib/pure/actors.nim')
-rw-r--r-- | lib/pure/actors.nim | 104 |
1 files changed, 53 insertions, 51 deletions
diff --git a/lib/pure/actors.nim b/lib/pure/actors.nim index 294c24741..da9037285 100644 --- a/lib/pure/actors.nim +++ b/lib/pure/actors.nim @@ -16,7 +16,7 @@ ## .. code-block:: nim ## ## var -## a: TActorPool[int, void] +## a: ActorPool[int, void] ## createActorPool(a) ## for i in 0 .. < 300: ## a.spawn(i, proc (x: int) {.thread.} = echo x) @@ -30,75 +30,76 @@ from os import sleep type - TTask*[TIn, TOut] = object{.pure, final.} ## a task - when TOut isnot void: - receiver*: ptr TChannel[TOut] ## the receiver channel of the response - action*: proc (x: TIn): TOut {.thread.} ## action to execute; + Task*[In, Out] = object{.pure, final.} ## a task + when Out isnot void: + receiver*: ptr Channel[Out] ## the receiver channel of the response + action*: proc (x: In): Out {.thread.} ## action to execute; ## sometimes useful shutDown*: bool ## set to tell an actor to shut-down - data*: TIn ## the data to process + data*: In ## the data to process - TActor[TIn, TOut] = object{.pure, final.} - i: TChannel[TTask[TIn, TOut]] - t: TThread[ptr TActor[TIn, TOut]] + Actor[In, Out] = object{.pure, final.} + i: Channel[Task[In, Out]] + t: TThread[ptr Actor[In, Out]] - PActor*[TIn, TOut] = ptr TActor[TIn, TOut] ## an actor - -proc spawn*[TIn, TOut](action: proc( - self: PActor[TIn, TOut]){.thread.}): PActor[TIn, TOut] = + PActor*[In, Out] = ptr Actor[In, Out] ## an actor +{.deprecated: [TTask: Task, TActor: Actor].} + +proc spawn*[In, Out](action: proc( + self: PActor[In, Out]){.thread.}): PActor[In, Out] = ## creates an actor; that is a thread with an inbox. The caller MUST call ## ``join`` because that also frees the actor's associated resources. - result = cast[PActor[TIn, TOut]](allocShared0(sizeof(result[]))) + result = cast[PActor[In, Out]](allocShared0(sizeof(result[]))) open(result.i) createThread(result.t, action, result) -proc inbox*[TIn, TOut](self: PActor[TIn, TOut]): ptr TChannel[TIn] = +proc inbox*[In, Out](self: PActor[In, Out]): ptr Channel[In] = ## gets a pointer to the associated inbox of the actor `self`. result = addr(self.i) -proc running*[TIn, TOut](a: PActor[TIn, TOut]): bool = +proc running*[In, Out](a: PActor[In, Out]): bool = ## returns true if the actor `a` is running. result = running(a.t) -proc ready*[TIn, TOut](a: PActor[TIn, TOut]): bool = +proc ready*[In, Out](a: PActor[In, Out]): bool = ## returns true if the actor `a` is ready to process new messages. result = ready(a.i) -proc join*[TIn, TOut](a: PActor[TIn, TOut]) = +proc join*[In, Out](a: PActor[In, Out]) = ## joins an actor. joinThread(a.t) close(a.i) deallocShared(a) -proc recv*[TIn, TOut](a: PActor[TIn, TOut]): TTask[TIn, TOut] = +proc recv*[In, Out](a: PActor[In, Out]): Task[In, Out] = ## receives a task from `a`'s inbox. result = recv(a.i) -proc send*[TIn, TOut, X, Y](receiver: PActor[TIn, TOut], msg: TIn, +proc send*[In, Out, X, Y](receiver: PActor[In, Out], msg: In, sender: PActor[X, Y]) = ## sends a message to `a`'s inbox. - var t: TTask[TIn, TOut] + var t: Task[In, Out] t.receiver = addr(sender.i) shallowCopy(t.data, msg) send(receiver.i, t) -proc send*[TIn, TOut](receiver: PActor[TIn, TOut], msg: TIn, - sender: ptr TChannel[TOut] = nil) = +proc send*[In, Out](receiver: PActor[In, Out], msg: In, + sender: ptr Channel[Out] = nil) = ## sends a message to `receiver`'s inbox. - var t: TTask[TIn, TOut] + var t: Task[In, Out] t.receiver = sender shallowCopy(t.data, msg) send(receiver.i, t) -proc sendShutdown*[TIn, TOut](receiver: PActor[TIn, TOut]) = +proc sendShutdown*[In, Out](receiver: PActor[In, Out]) = ## send a shutdown message to `receiver`. - var t: TTask[TIn, TOut] + var t: Task[In, Out] t.shutdown = true send(receiver.i, t) -proc reply*[TIn, TOut](t: TTask[TIn, TOut], m: TOut) = +proc reply*[In, Out](t: Task[In, Out], m: Out) = ## sends a message to io's output message box. - when TOut is void: + when Out is void: {.error: "you cannot reply to a void outbox".} assert t.receiver != nil send(t.receiver[], m) @@ -107,34 +108,35 @@ proc reply*[TIn, TOut](t: TTask[TIn, TOut], m: TOut) = # ----------------- actor pools ---------------------------------------------- type - TActorPool*[TIn, TOut] = object{.pure, final.} ## an actor pool - actors: seq[PActor[TIn, TOut]] - when TOut isnot void: - outputs: TChannel[TOut] + ActorPool*[In, Out] = object{.pure, final.} ## an actor pool + actors: seq[PActor[In, Out]] + when Out isnot void: + outputs: Channel[Out] +{.deprecated: [TActorPool: ActorPool].} -proc `^`*[T](f: ptr TChannel[T]): T = +proc `^`*[T](f: ptr Channel[T]): T = ## alias for 'recv'. result = recv(f[]) -proc poolWorker[TIn, TOut](self: PActor[TIn, TOut]) {.thread.} = +proc poolWorker[In, Out](self: PActor[In, Out]) {.thread.} = while true: var m = self.recv if m.shutDown: break - when TOut is void: + when Out is void: m.action(m.data) else: send(m.receiver[], m.action(m.data)) #self.reply() -proc createActorPool*[TIn, TOut](a: var TActorPool[TIn, TOut], poolSize = 4) = +proc createActorPool*[In, Out](a: var ActorPool[In, Out], poolSize = 4) = ## creates an actor pool. newSeq(a.actors, poolSize) - when TOut isnot void: + when Out isnot void: open(a.outputs) for i in 0 .. < a.actors.len: - a.actors[i] = spawn(poolWorker[TIn, TOut]) + a.actors[i] = spawn(poolWorker[In, Out]) -proc sync*[TIn, TOut](a: var TActorPool[TIn, TOut], polling=50) = +proc sync*[In, Out](a: var ActorPool[In, Out], polling=50) = ## waits for every actor of `a` to finish with its work. Currently this is ## implemented as polling every `polling` ms and has a slight chance ## of failing since we check for every actor to be in `ready` state and not @@ -157,18 +159,18 @@ proc sync*[TIn, TOut](a: var TActorPool[TIn, TOut], polling=50) = if allReadyCount > 1: break sleep(polling) -proc terminate*[TIn, TOut](a: var TActorPool[TIn, TOut]) = +proc terminate*[In, Out](a: var ActorPool[In, Out]) = ## terminates each actor in the actor pool `a` and frees the ## resources attached to `a`. - var t: TTask[TIn, TOut] + var t: Task[In, Out] t.shutdown = true for i in 0.. <a.actors.len: send(a.actors[i].i, t) for i in 0.. <a.actors.len: join(a.actors[i]) - when TOut isnot void: + when Out isnot void: close(a.outputs) a.actors = nil -proc join*[TIn, TOut](a: var TActorPool[TIn, TOut]) = +proc join*[In, Out](a: var ActorPool[In, Out]) = ## short-cut for `sync` and then `terminate`. sync(a) terminate(a) @@ -202,28 +204,28 @@ template schedule = else: raise newException(DeadThreadError, "cannot send message; thread died") -proc spawn*[TIn, TOut](p: var TActorPool[TIn, TOut], input: TIn, - action: proc (input: TIn): TOut {.thread.} - ): ptr TChannel[TOut] = +proc spawn*[In, Out](p: var ActorPool[In, Out], input: In, + action: proc (input: In): Out {.thread.} + ): ptr Channel[Out] = ## uses the actor pool to run ``action(input)`` concurrently. ## `spawn` is guaranteed to not block. - var t: TTask[TIn, TOut] + var t: Task[In, Out] setupTask() result = addr(p.outputs) t.receiver = result schedule() -proc spawn*[TIn](p: var TActorPool[TIn, void], input: TIn, - action: proc (input: TIn) {.thread.}) = +proc spawn*[In](p: var ActorPool[In, void], input: In, + action: proc (input: In) {.thread.}) = ## uses the actor pool to run ``action(input)`` concurrently. ## `spawn` is guaranteed to not block. - var t: TTask[TIn, void] + var t: Task[In, void] setupTask() schedule() when not defined(testing) and isMainModule: var - a: TActorPool[int, void] + a: ActorPool[int, void] createActorPool(a) for i in 0 .. < 300: a.spawn(i, proc (x: int) {.thread.} = echo x) |