summary refs log tree commit diff stats
path: root/lib/pure
diff options
context:
space:
mode:
authorAraq <rumpf_a@web.de>2014-11-08 11:18:25 +0100
committerAraq <rumpf_a@web.de>2014-11-08 11:18:25 +0100
commit943d4ee71448f7efd2a7feb1126a401f68573979 (patch)
treeb6df500d783f6065a6831dfed73612dcefbb536a /lib/pure
parent06e9932e8ae6860b1c96c57138d2d56e51f7036d (diff)
downloadNim-943d4ee71448f7efd2a7feb1126a401f68573979.tar.gz
fixed the deadlock that happens when stress testing ToFreeQueue
Diffstat (limited to 'lib/pure')
-rw-r--r--lib/pure/concurrency/threadpool.nim55
1 files changed, 30 insertions, 25 deletions
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim
index 7959a6c92..1e1f0c26d 100644
--- a/lib/pure/concurrency/threadpool.nim
+++ b/lib/pure/concurrency/threadpool.nim
@@ -112,8 +112,8 @@ type
   ToFreeQueue = object
     len: int
     lock: TLock
-    empty: TCond
-    data: array[2, pointer]
+    empty: CondVar
+    data: array[128, pointer]
 
   WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.}
   Worker = object
@@ -143,13 +143,27 @@ proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool =
     await(w.taskStarted)
     result = true
 
+proc cleanFlowVars(w: ptr Worker) =
+  let q = addr(w.q)
+  acquire(q.lock)
+  for i in 0 .. <q.len:
+    GC_unref(cast[RootRef](q.data[i]))
+    #echo "GC_unref"
+  q.len = 0
+  release(q.lock)
+
 proc wakeupWorkerToProcessQueue(w: ptr Worker) =
-  # Note that if this fails somebody else already woke up the thread so it's
-  # perfectly fine to do nothing:
-  if cas(addr w.ready, true, false):
-    w.data = nil
-    w.f = proc (t, a: pointer) {.nimcall.} = discard
-    signal(w.taskArrived)
+  # we have to ensure it's us who wakes up the owning thread.
+  # This is quite horrible code, but it runs so rarely that it doesn't matter:
+  while not cas(addr w.ready, true, false):
+    cpuRelax()
+    discard
+  w.data = nil
+  w.f = proc (w, a: pointer) {.nimcall.} =
+    let w = cast[ptr Worker](w)
+    cleanFlowVars(w)
+    signal(w.q.empty)
+  signal(w.taskArrived)
 
 proc finished(fv: FlowVarBase) =
   doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'"
@@ -160,29 +174,17 @@ proc finished(fv: FlowVarBase) =
   if fv.data.isNil: return
   let owner = cast[ptr Worker](fv.owner)
   let q = addr(owner.q)
-  var waited = false
   acquire(q.lock)
   while not (q.len < q.data.len):
     #echo "EXHAUSTED!"
+    release(q.lock)
     wakeupWorkerToProcessQueue(owner)
-    wait(q.empty, q.lock)
-    waited = true
+    await(q.empty)
+    acquire(q.lock)
   q.data[q.len] = cast[pointer](fv.data)
   inc q.len
   release(q.lock)
   fv.data = nil
-  # wakeup other potentially waiting threads:
-  if waited: signal(q.empty)
-
-proc cleanFlowVars(w: ptr Worker) =
-  let q = addr(w.q)
-  acquire(q.lock)
-  for i in 0 .. <q.len:
-    GC_unref(cast[RootRef](q.data[i]))
-    #echo "GC_unref"
-  q.len = 0
-  release(q.lock)
-  signal(q.empty)
 
 proc fvFinalizer[T](fv: FlowVar[T]) = finished(fv)
 
@@ -273,7 +275,10 @@ var
 
 proc slave(w: ptr Worker) {.thread.} =
   while true:
-    w.ready = true
+    when declared(atomicStoreN):
+      atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST)
+    else:
+      w.ready = true
     readyWorker = w
     signal(gSomeReady)
     await(w.taskArrived)
@@ -305,7 +310,7 @@ proc activateThread(i: int) {.noinline.} =
   workersData[i].taskArrived = createCondVar()
   workersData[i].taskStarted = createCondVar()
   workersData[i].initialized = true
-  initCond(workersData[i].q.empty)
+  workersData[i].q.empty = createCondVar()
   initLock(workersData[i].q.lock)
   createThread(workers[i], slave, addr(workersData[i]))