diff options
author | Araq <rumpf_a@web.de> | 2014-06-06 07:56:47 +0200 |
---|---|---|
committer | Araq <rumpf_a@web.de> | 2014-06-06 07:56:47 +0200 |
commit | 59c18eb7438dfa555bad1e94a9051a61edccb2fc (patch) | |
tree | 875f3b1a64bae22af0dc00baf82225c4bd603f9c /lib/pure/concurrency | |
parent | b7cbb08f99b39f42513849b88ffa454b6c4ad167 (diff) | |
download | Nim-59c18eb7438dfa555bad1e94a9051a61edccb2fc.tar.gz |
big rename: Promise -> FlowVar
Diffstat (limited to 'lib/pure/concurrency')
-rw-r--r-- | lib/pure/concurrency/threadpool.nim | 124 |
1 files changed, 62 insertions, 62 deletions
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index c4ed42c05..c34b91e30 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -90,8 +90,8 @@ type cv: CondVar idx: int - RawPromise* = ref RawPromiseObj ## untyped base class for 'Promise[T]' - RawPromiseObj = object of TObject + RawFlowVar* = ref RawFlowVarObj ## untyped base class for 'FlowVar[T]' + RawFlowVarObj = object of TObject ready, usesCondVar: bool cv: CondVar #\ # for 'awaitAny' support @@ -100,10 +100,10 @@ type data: pointer # we incRef and unref it to keep it alive owner: pointer # ptr Worker - PromiseObj[T] = object of RawPromiseObj + FlowVarObj[T] = object of RawFlowVarObj blob: T - Promise*{.compilerProc.}[T] = ref PromiseObj[T] + FlowVar*{.compilerProc.}[T] = ref FlowVarObj[T] ## a data flow variable ToFreeQueue = object len: int @@ -123,28 +123,28 @@ type shutdown: bool # the pool requests to shut down this worker thread q: ToFreeQueue -proc await*(prom: RawPromise) = - ## waits until the value for the promise arrives. Usually it is not necessary +proc await*(fv: RawFlowVar) = + ## waits until the value for the flowVar arrives. Usually it is not necessary ## to call this explicitly. - if prom.usesCondVar: - prom.usesCondVar = false - await(prom.cv) - destroyCondVar(prom.cv) - -proc finished(prom: RawPromise) = - doAssert prom.ai.isNil, "promise is still attached to an 'awaitAny'" - # we have to protect against the rare cases where the owner of the promise - # simply disregards the promise and yet the "promiser" has not yet written + if fv.usesCondVar: + fv.usesCondVar = false + await(fv.cv) + destroyCondVar(fv.cv) + +proc finished(fv: RawFlowVar) = + doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'" + # we have to protect against the rare cases where the owner of the flowVar + # simply disregards the flowVar and yet the "flowVarr" has not yet written # anything to it: - await(prom) - if prom.data.isNil: return - let owner = cast[ptr Worker](prom.owner) + await(fv) + if fv.data.isNil: return + let owner = cast[ptr Worker](fv.owner) let q = addr(owner.q) var waited = false while true: acquire(q.lock) if q.len < q.data.len: - q.data[q.len] = prom.data + q.data[q.len] = fv.data inc q.len release(q.lock) break @@ -153,11 +153,11 @@ proc finished(prom: RawPromise) = release(q.lock) wait(q.empty, q.lock) waited = true - prom.data = nil + fv.data = nil # wakeup other potentially waiting threads: if waited: signal(q.empty) -proc cleanPromises(w: ptr Worker) = +proc cleanFlowVars(w: ptr Worker) = let q = addr(w.q) acquire(q.lock) for i in 0 .. <q.len: @@ -166,72 +166,72 @@ proc cleanPromises(w: ptr Worker) = release(q.lock) signal(q.empty) -proc promFinalizer[T](prom: Promise[T]) = finished(prom) +proc fvFinalizer[T](fv: FlowVar[T]) = finished(fv) -proc nimCreatePromise[T](): Promise[T] {.compilerProc.} = - new(result, promFinalizer) +proc nimCreateFlowVar[T](): FlowVar[T] {.compilerProc.} = + new(result, fvFinalizer) -proc nimPromiseCreateCondVar(prom: RawPromise) {.compilerProc.} = - prom.cv = createCondVar() - prom.usesCondVar = true +proc nimFlowVarCreateCondVar(fv: RawFlowVar) {.compilerProc.} = + fv.cv = createCondVar() + fv.usesCondVar = true -proc nimPromiseSignal(prom: RawPromise) {.compilerProc.} = - if prom.ai != nil: - acquire(prom.ai.cv.L) - prom.ai.idx = prom.idx - inc prom.ai.cv.counter - release(prom.ai.cv.L) - signal(prom.ai.cv.c) - if prom.usesCondVar: signal(prom.cv) +proc nimFlowVarSignal(fv: RawFlowVar) {.compilerProc.} = + if fv.ai != nil: + acquire(fv.ai.cv.L) + fv.ai.idx = fv.idx + inc fv.ai.cv.counter + release(fv.ai.cv.L) + signal(fv.ai.cv.c) + if fv.usesCondVar: signal(fv.cv) -proc awaitAndThen*[T](prom: Promise[T]; action: proc (x: T) {.closure.}) = - ## blocks until the ``prom`` is available and then passes its value +proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) = + ## blocks until the ``fv`` is available and then passes its value ## to ``action``. Note that due to Nimrod's parameter passing semantics this ## means that ``T`` doesn't need to be copied and so ``awaitAndThen`` can ## sometimes be more efficient than ``^``. - await(prom) + await(fv) when T is string or T is seq: - action(cast[T](prom.data)) + action(cast[T](fv.data)) elif T is ref: - {.error: "'awaitAndThen' not available for Promise[ref]".} + {.error: "'awaitAndThen' not available for FlowVar[ref]".} else: - action(prom.blob) - finished(prom) + action(fv.blob) + finished(fv) -proc `^`*[T](prom: Promise[ref T]): foreign ptr T = +proc `^`*[T](fv: FlowVar[ref T]): foreign ptr T = ## blocks until the value is available and then returns this value. - await(prom) - result = cast[foreign ptr T](prom.data) + await(fv) + result = cast[foreign ptr T](fv.data) -proc `^`*[T](prom: Promise[T]): T = +proc `^`*[T](fv: FlowVar[T]): T = ## blocks until the value is available and then returns this value. - await(prom) + await(fv) when T is string or T is seq: - result = cast[T](prom.data) + result = cast[T](fv.data) else: - result = prom.blob + result = fv.blob -proc awaitAny*(promises: openArray[RawPromise]): int = - ## awaits any of the given promises. Returns the index of one promise for - ## which a value arrived. A promise only supports one call to 'awaitAny' at +proc awaitAny*(flowVars: openArray[RawFlowVar]): int = + ## awaits any of the given flowVars. Returns the index of one flowVar for + ## which a value arrived. A flowVar only supports one call to 'awaitAny' at ## the same time. That means if you await([a,b]) and await([b,c]) the second - ## call will only await 'c'. If there is no promise left to be able to wait + ## call will only await 'c'. If there is no flowVar left to be able to wait ## on, -1 is returned. ## **Note**: This results in non-deterministic behaviour and so should be ## avoided. var ai: AwaitInfo ai.cv = createCondVar() var conflicts = 0 - for i in 0 .. promises.high: - if cas(addr promises[i].ai, nil, addr ai): - promises[i].idx = i + for i in 0 .. flowVars.high: + if cas(addr flowVars[i].ai, nil, addr ai): + flowVars[i].idx = i else: inc conflicts - if conflicts < promises.len: + if conflicts < flowVars.len: await(ai.cv) result = ai.idx - for i in 0 .. promises.high: - discard cas(addr promises[i].ai, addr ai, nil) + for i in 0 .. flowVars.high: + discard cas(addr flowVars[i].ai, addr ai, nil) else: result = -1 destroyCondVar(ai.cv) @@ -259,7 +259,7 @@ proc slave(w: ptr Worker) {.thread.} = await(w.taskArrived) assert(not w.ready) w.f(w, w.data) - if w.q.len != 0: w.cleanPromises + if w.q.len != 0: w.cleanFlowVars if w.shutdown: w.shutdown = false atomicDec currentPoolSize @@ -300,7 +300,7 @@ proc spawn*(call: expr): expr {.magic: "Spawn".} ## always spawns a new task, so that the 'call' is never executed on ## the calling thread. 'call' has to be proc call 'p(...)' where 'p' ## is gcsafe and has a return type that is either 'void' or compatible - ## with ``Promise[T]``. + ## with ``FlowVar[T]``. template spawnX*(call: expr): expr = ## spawns a new task if a CPU core is ready, otherwise executes the @@ -308,7 +308,7 @@ template spawnX*(call: expr): expr = ## use 'spawn' in order to not block the producer for an unknown ## amount of time. 'call' has to be proc call 'p(...)' where 'p' ## is gcsafe and has a return type that is either 'void' or compatible - ## with ``Promise[T]``. + ## with ``FlowVar[T]``. (if preferSpawn(): spawn call else: call) proc parallel*(body: stmt) {.magic: "Parallel".} |