summary refs log tree commit diff stats
path: root/lib/pure
diff options
context:
space:
mode:
authorAraq <rumpf_a@web.de>2014-11-07 00:27:31 +0100
committerAraq <rumpf_a@web.de>2014-11-07 00:27:31 +0100
commitb5586264a0e0515f88f7ad4944bb987930fbd7b9 (patch)
treeaed2e14820214783049e9a4366f06e975b91222b /lib/pure
parent9500dfcc2e7b56a626744eff3000612b13c79575 (diff)
downloadNim-b5586264a0e0515f88f7ad4944bb987930fbd7b9.tar.gz
broken attempt to fix queue exhaustion
Diffstat (limited to 'lib/pure')
-rw-r--r--lib/pure/concurrency/threadpool.nim50
1 files changed, 29 insertions, 21 deletions
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim
index fbd344e4e..727bbffa2 100644
--- a/lib/pure/concurrency/threadpool.nim
+++ b/lib/pure/concurrency/threadpool.nim
@@ -100,7 +100,7 @@ type
     # for 'awaitAny' support
     ai: ptr AwaitInfo
     idx: int
-    data: pointer  # we incRef and unref it to keep it alive
+    data: RootRef  # we incRef and unref it to keep it alive
     owner: pointer # ptr Worker
 
   FlowVarObj[T] = object of FlowVarBaseObj
@@ -112,7 +112,7 @@ type
     len: int
     lock: TLock
     empty: TCond
-    data: array[512, pointer]
+    data: array[2, pointer]
 
   WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.}
   Worker = object
@@ -134,6 +134,22 @@ proc await*(fv: FlowVarBase) =
     await(fv.cv)
     destroyCondVar(fv.cv)
 
+proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool =
+  if cas(addr w.ready, true, false):
+    w.data = data
+    w.f = fn
+    signal(w.taskArrived)
+    await(w.taskStarted)
+    result = true
+
+proc wakeupWorkerToProcessQueue(w: ptr Worker) =
+  # Note that if this fails somebody else already woke up the thread so it's
+  # perfectly fine to do nothing:
+  if cas(addr w.ready, true, false):
+    w.data = nil
+    w.f = proc (t, a: pointer) {.nimcall.} = discard
+    signal(w.taskArrived)
+
 proc finished(fv: FlowVarBase) =
   doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'"
   # we have to protect against the rare cases where the owner of the flowVar
@@ -144,18 +160,17 @@ proc finished(fv: FlowVarBase) =
   let owner = cast[ptr Worker](fv.owner)
   let q = addr(owner.q)
   var waited = false
-  while true:
+  acquire(q.lock)
+  while not (q.len < q.data.len):
+    echo "EXHAUSTED!"
+    release(q.lock)
+    wakeupWorkerToProcessQueue(owner)
     acquire(q.lock)
-    if q.len < q.data.len:
-      q.data[q.len] = fv.data
-      inc q.len
-      release(q.lock)
-      break
-    else:
-      # the queue is exhausted! We block until it has been cleaned:
-      release(q.lock)
-      wait(q.empty, q.lock)
-      waited = true
+    wait(q.empty, q.lock)
+    waited = true
+  q.data[q.len] = cast[pointer](fv.data)
+  inc q.len
+  release(q.lock)
   fv.data = nil
   # wakeup other potentially waiting threads:
   if waited: signal(q.empty)
@@ -165,6 +180,7 @@ proc cleanFlowVars(w: ptr Worker) =
   acquire(q.lock)
   for i in 0 .. <q.len:
     GC_unref(cast[RootRef](q.data[i]))
+    echo "GC_unref"
   q.len = 0
   release(q.lock)
   signal(q.empty)
@@ -331,14 +347,6 @@ var
 
 initLock stateLock
 
-proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool =
-  if cas(addr w.ready, true, false):
-    w.data = data
-    w.f = fn
-    signal(w.taskArrived)
-    await(w.taskStarted)
-    result = true
-
 proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} =
   # implementation of 'spawn' that is used by the code generator.
   while true: