diff options
author | Araq <rumpf_a@web.de> | 2014-11-07 00:27:31 +0100 |
---|---|---|
committer | Araq <rumpf_a@web.de> | 2014-11-07 00:27:31 +0100 |
commit | b5586264a0e0515f88f7ad4944bb987930fbd7b9 (patch) | |
tree | aed2e14820214783049e9a4366f06e975b91222b /lib/pure | |
parent | 9500dfcc2e7b56a626744eff3000612b13c79575 (diff) | |
download | Nim-b5586264a0e0515f88f7ad4944bb987930fbd7b9.tar.gz |
broken attempt to fix queue exhaustion
Diffstat (limited to 'lib/pure')
-rw-r--r-- | lib/pure/concurrency/threadpool.nim | 50 |
1 files changed, 29 insertions, 21 deletions
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index fbd344e4e..727bbffa2 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -100,7 +100,7 @@ type # for 'awaitAny' support ai: ptr AwaitInfo idx: int - data: pointer # we incRef and unref it to keep it alive + data: RootRef # we incRef and unref it to keep it alive owner: pointer # ptr Worker FlowVarObj[T] = object of FlowVarBaseObj @@ -112,7 +112,7 @@ type len: int lock: TLock empty: TCond - data: array[512, pointer] + data: array[2, pointer] WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.} Worker = object @@ -134,6 +134,22 @@ proc await*(fv: FlowVarBase) = await(fv.cv) destroyCondVar(fv.cv) +proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool = + if cas(addr w.ready, true, false): + w.data = data + w.f = fn + signal(w.taskArrived) + await(w.taskStarted) + result = true + +proc wakeupWorkerToProcessQueue(w: ptr Worker) = + # Note that if this fails somebody else already woke up the thread so it's + # perfectly fine to do nothing: + if cas(addr w.ready, true, false): + w.data = nil + w.f = proc (t, a: pointer) {.nimcall.} = discard + signal(w.taskArrived) + proc finished(fv: FlowVarBase) = doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'" # we have to protect against the rare cases where the owner of the flowVar @@ -144,18 +160,17 @@ proc finished(fv: FlowVarBase) = let owner = cast[ptr Worker](fv.owner) let q = addr(owner.q) var waited = false - while true: + acquire(q.lock) + while not (q.len < q.data.len): + echo "EXHAUSTED!" + release(q.lock) + wakeupWorkerToProcessQueue(owner) acquire(q.lock) - if q.len < q.data.len: - q.data[q.len] = fv.data - inc q.len - release(q.lock) - break - else: - # the queue is exhausted! We block until it has been cleaned: - release(q.lock) - wait(q.empty, q.lock) - waited = true + wait(q.empty, q.lock) + waited = true + q.data[q.len] = cast[pointer](fv.data) + inc q.len + release(q.lock) fv.data = nil # wakeup other potentially waiting threads: if waited: signal(q.empty) @@ -165,6 +180,7 @@ proc cleanFlowVars(w: ptr Worker) = acquire(q.lock) for i in 0 .. <q.len: GC_unref(cast[RootRef](q.data[i])) + echo "GC_unref" q.len = 0 release(q.lock) signal(q.empty) @@ -331,14 +347,6 @@ var initLock stateLock -proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool = - if cas(addr w.ready, true, false): - w.data = data - w.f = fn - signal(w.taskArrived) - await(w.taskStarted) - result = true - proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} = # implementation of 'spawn' that is used by the code generator. while true: |