From b5586264a0e0515f88f7ad4944bb987930fbd7b9 Mon Sep 17 00:00:00 2001 From: Araq Date: Fri, 7 Nov 2014 00:27:31 +0100 Subject: broken attempt to fix queue exhaustion --- lib/pure/concurrency/threadpool.nim | 50 +++++++++++++++++++++---------------- 1 file changed, 29 insertions(+), 21 deletions(-) (limited to 'lib/pure') diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index fbd344e4e..727bbffa2 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -100,7 +100,7 @@ type # for 'awaitAny' support ai: ptr AwaitInfo idx: int - data: pointer # we incRef and unref it to keep it alive + data: RootRef # we incRef and unref it to keep it alive owner: pointer # ptr Worker FlowVarObj[T] = object of FlowVarBaseObj @@ -112,7 +112,7 @@ type len: int lock: TLock empty: TCond - data: array[512, pointer] + data: array[2, pointer] WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.} Worker = object @@ -134,6 +134,22 @@ proc await*(fv: FlowVarBase) = await(fv.cv) destroyCondVar(fv.cv) +proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool = + if cas(addr w.ready, true, false): + w.data = data + w.f = fn + signal(w.taskArrived) + await(w.taskStarted) + result = true + +proc wakeupWorkerToProcessQueue(w: ptr Worker) = + # Note that if this fails somebody else already woke up the thread so it's + # perfectly fine to do nothing: + if cas(addr w.ready, true, false): + w.data = nil + w.f = proc (t, a: pointer) {.nimcall.} = discard + signal(w.taskArrived) + proc finished(fv: FlowVarBase) = doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'" # we have to protect against the rare cases where the owner of the flowVar @@ -144,18 +160,17 @@ proc finished(fv: FlowVarBase) = let owner = cast[ptr Worker](fv.owner) let q = addr(owner.q) var waited = false - while true: + acquire(q.lock) + while not (q.len < q.data.len): + echo "EXHAUSTED!" + release(q.lock) + wakeupWorkerToProcessQueue(owner) acquire(q.lock) - if q.len < q.data.len: - q.data[q.len] = fv.data - inc q.len - release(q.lock) - break - else: - # the queue is exhausted! We block until it has been cleaned: - release(q.lock) - wait(q.empty, q.lock) - waited = true + wait(q.empty, q.lock) + waited = true + q.data[q.len] = cast[pointer](fv.data) + inc q.len + release(q.lock) fv.data = nil # wakeup other potentially waiting threads: if waited: signal(q.empty) @@ -165,6 +180,7 @@ proc cleanFlowVars(w: ptr Worker) = acquire(q.lock) for i in 0 ..