diff options
Diffstat (limited to 'lib/pure/concurrency')
-rw-r--r-- | lib/pure/concurrency/threadpool.nim | 54 |
1 files changed, 26 insertions, 28 deletions
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index 6ec71e912..f3b13fac5 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -30,7 +30,7 @@ proc destroySemaphore(cv: var Semaphore) {.inline.} = deinitCond(cv.c) deinitLock(cv.L) -proc await(cv: var Semaphore) = +proc blockUntil(cv: var Semaphore) = acquire(cv.L) while cv.counter <= 0: wait(cv.c, cv.L) @@ -81,7 +81,7 @@ proc closeBarrier(b: ptr Barrier) {.compilerProc.} = fence() b.interest = true fence() - while b.left != b.entered: await(b.cv) + while b.left != b.entered: blockUntil(b.cv) destroySemaphore(b.cv) {.pop.} @@ -89,8 +89,6 @@ proc closeBarrier(b: ptr Barrier) {.compilerProc.} = # ---------------------------------------------------------------------------- type - foreign* = object ## a region that indicates the pointer comes from a - ## foreign thread heap. AwaitInfo = object cv: Semaphore idx: int @@ -99,7 +97,7 @@ type FlowVarBaseObj = object of RootObj ready, usesSemaphore, awaited: bool cv: Semaphore #\ - # for 'awaitAny' support + # for 'blockUntilAny' support ai: ptr AwaitInfo idx: int data: pointer # we incRef and unref it to keep it alive; note this MUST NOT @@ -130,12 +128,12 @@ type q: ToFreeQueue readyForTask: Semaphore -proc await*(fv: FlowVarBase) = +proc blockUntil*(fv: FlowVarBase) = ## waits until the value for the flowVar arrives. Usually it is not necessary ## to call this explicitly. if fv.usesSemaphore and not fv.awaited: fv.awaited = true - await(fv.cv) + blockUntil(fv.cv) destroySemaphore(fv.cv) proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool = @@ -143,7 +141,7 @@ proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool = w.data = data w.f = fn signal(w.taskArrived) - await(w.taskStarted) + blockUntil(w.taskStarted) result = true proc cleanFlowVars(w: ptr Worker) = @@ -178,11 +176,11 @@ proc attach(fv: FlowVarBase; i: int): bool = release(fv.cv.L) proc finished(fv: FlowVarBase) = - doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'" + doAssert fv.ai.isNil, "flowVar is still attached to an 'blockUntilAny'" # we have to protect against the rare cases where the owner of the flowVar # simply disregards the flowVar and yet the "flowVar" has not yet written # anything to it: - await(fv) + blockUntil(fv) if fv.data.isNil: return let owner = cast[ptr Worker](fv.owner) let q = addr(owner.q) @@ -191,7 +189,7 @@ proc finished(fv: FlowVarBase) = #echo "EXHAUSTED!" release(q.lock) wakeupWorkerToProcessQueue(owner) - await(q.empty) + blockUntil(q.empty) acquire(q.lock) q.data[q.len] = cast[pointer](fv.data) inc q.len @@ -222,7 +220,7 @@ proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) = ## to ``action``. Note that due to Nim's parameter passing semantics this ## means that ``T`` doesn't need to be copied and so ``awaitAndThen`` can ## sometimes be more efficient than ``^``. - await(fv) + blockUntil(fv) when T is string or T is seq: action(cast[T](fv.data)) elif T is ref: @@ -231,31 +229,31 @@ proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) = action(fv.blob) finished(fv) -proc unsafeRead*[T](fv: FlowVar[ref T]): foreign ptr T = +proc unsafeRead*[T](fv: FlowVar[ref T]): ptr T = ## blocks until the value is available and then returns this value. - await(fv) - result = cast[foreign ptr T](fv.data) + blockUntil(fv) + result = cast[ptr T](fv.data) proc `^`*[T](fv: FlowVar[ref T]): ref T = ## blocks until the value is available and then returns this value. - await(fv) + blockUntil(fv) let src = cast[ref T](fv.data) deepCopy result, src proc `^`*[T](fv: FlowVar[T]): T = ## blocks until the value is available and then returns this value. - await(fv) + blockUntil(fv) when T is string or T is seq: # XXX closures? deepCopy? result = cast[T](fv.data) else: result = fv.blob -proc awaitAny*(flowVars: openArray[FlowVarBase]): int = +proc blockUntilAny*(flowVars: openArray[FlowVarBase]): int = ## awaits any of the given flowVars. Returns the index of one flowVar for - ## which a value arrived. A flowVar only supports one call to 'awaitAny' at - ## the same time. That means if you awaitAny([a,b]) and awaitAny([b,c]) the second - ## call will only await 'c'. If there is no flowVar left to be able to wait + ## which a value arrived. A flowVar only supports one call to 'blockUntilAny' at + ## the same time. That means if you blockUntilAny([a,b]) and blockUntilAny([b,c]) the second + ## call will only blockUntil 'c'. If there is no flowVar left to be able to wait ## on, -1 is returned. ## **Note**: This results in non-deterministic behaviour and should be avoided. var ai: AwaitInfo @@ -271,7 +269,7 @@ proc awaitAny*(flowVars: openArray[FlowVarBase]): int = inc conflicts if conflicts < flowVars.len: if result < 0: - await(ai.cv) + blockUntil(ai.cv) result = ai.idx for i in 0 .. flowVars.high: discard cas(addr flowVars[i].ai, addr ai, nil) @@ -328,7 +326,7 @@ proc slave(w: ptr Worker) {.thread.} = w.ready = true readyWorker = w signal(gSomeReady) - await(w.taskArrived) + blockUntil(w.taskArrived) # XXX Somebody needs to look into this (why does this assertion fail # in Visual Studio?) when not defined(vcc) and not defined(tcc): assert(not w.ready) @@ -353,7 +351,7 @@ proc distinguishedSlave(w: ptr Worker) {.thread.} = else: w.ready = true signal(w.readyForTask) - await(w.taskArrived) + blockUntil(w.taskArrived) assert(not w.ready) w.f(w, w.data) if w.q.len != 0: w.cleanFlowVars @@ -501,7 +499,7 @@ proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerProc.} = # on the current thread instead. var self = addr(workersData[localThreadId-1]) fn(self, data) - await(self.taskStarted) + blockUntil(self.taskStarted) return if isSlave: @@ -526,7 +524,7 @@ proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerProc.} = inc numSlavesWaiting - await(gSomeReady) + blockUntil(gSomeReady) if isSlave: withLock numSlavesLock: @@ -544,7 +542,7 @@ proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerProc.} = release(distinguishedLock) while true: if selectWorker(addr(distinguishedData[id]), fn, data): break - await(distinguishedData[id].readyForTask) + blockUntil(distinguishedData[id].readyForTask) proc sync*() = @@ -557,7 +555,7 @@ proc sync*() = if not allReady: break allReady = allReady and workersData[i].ready if allReady: break - await(gSomeReady) + blockUntil(gSomeReady) inc toRelease for i in 0 ..< toRelease: |