diff options
author | Araq <rumpf_a@web.de> | 2014-11-08 11:18:25 +0100 |
---|---|---|
committer | Araq <rumpf_a@web.de> | 2014-11-08 11:18:25 +0100 |
commit | 943d4ee71448f7efd2a7feb1126a401f68573979 (patch) | |
tree | b6df500d783f6065a6831dfed73612dcefbb536a /lib/pure | |
parent | 06e9932e8ae6860b1c96c57138d2d56e51f7036d (diff) | |
download | Nim-943d4ee71448f7efd2a7feb1126a401f68573979.tar.gz |
fixed the deadlock that happens when stress testing ToFreeQueue
Diffstat (limited to 'lib/pure')
-rw-r--r-- | lib/pure/concurrency/threadpool.nim | 55 |
1 files changed, 30 insertions, 25 deletions
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index 7959a6c92..1e1f0c26d 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -112,8 +112,8 @@ type ToFreeQueue = object len: int lock: TLock - empty: TCond - data: array[2, pointer] + empty: CondVar + data: array[128, pointer] WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.} Worker = object @@ -143,13 +143,27 @@ proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool = await(w.taskStarted) result = true +proc cleanFlowVars(w: ptr Worker) = + let q = addr(w.q) + acquire(q.lock) + for i in 0 .. <q.len: + GC_unref(cast[RootRef](q.data[i])) + #echo "GC_unref" + q.len = 0 + release(q.lock) + proc wakeupWorkerToProcessQueue(w: ptr Worker) = - # Note that if this fails somebody else already woke up the thread so it's - # perfectly fine to do nothing: - if cas(addr w.ready, true, false): - w.data = nil - w.f = proc (t, a: pointer) {.nimcall.} = discard - signal(w.taskArrived) + # we have to ensure it's us who wakes up the owning thread. + # This is quite horrible code, but it runs so rarely that it doesn't matter: + while not cas(addr w.ready, true, false): + cpuRelax() + discard + w.data = nil + w.f = proc (w, a: pointer) {.nimcall.} = + let w = cast[ptr Worker](w) + cleanFlowVars(w) + signal(w.q.empty) + signal(w.taskArrived) proc finished(fv: FlowVarBase) = doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'" @@ -160,29 +174,17 @@ proc finished(fv: FlowVarBase) = if fv.data.isNil: return let owner = cast[ptr Worker](fv.owner) let q = addr(owner.q) - var waited = false acquire(q.lock) while not (q.len < q.data.len): #echo "EXHAUSTED!" + release(q.lock) wakeupWorkerToProcessQueue(owner) - wait(q.empty, q.lock) - waited = true + await(q.empty) + acquire(q.lock) q.data[q.len] = cast[pointer](fv.data) inc q.len release(q.lock) fv.data = nil - # wakeup other potentially waiting threads: - if waited: signal(q.empty) - -proc cleanFlowVars(w: ptr Worker) = - let q = addr(w.q) - acquire(q.lock) - for i in 0 .. <q.len: - GC_unref(cast[RootRef](q.data[i])) - #echo "GC_unref" - q.len = 0 - release(q.lock) - signal(q.empty) proc fvFinalizer[T](fv: FlowVar[T]) = finished(fv) @@ -273,7 +275,10 @@ var proc slave(w: ptr Worker) {.thread.} = while true: - w.ready = true + when declared(atomicStoreN): + atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST) + else: + w.ready = true readyWorker = w signal(gSomeReady) await(w.taskArrived) @@ -305,7 +310,7 @@ proc activateThread(i: int) {.noinline.} = workersData[i].taskArrived = createCondVar() workersData[i].taskStarted = createCondVar() workersData[i].initialized = true - initCond(workersData[i].q.empty) + workersData[i].q.empty = createCondVar() initLock(workersData[i].q.lock) createThread(workers[i], slave, addr(workersData[i])) |