summary refs log tree commit diff stats
path: root/lib/pure/concurrency
diff options
context:
space:
mode:
authorAndreas Rumpf <rumpf_a@web.de>2018-04-19 08:54:16 +0200
committerAndreas Rumpf <rumpf_a@web.de>2018-04-19 08:54:23 +0200
commit0dc4d6dcc28a6e35f36caaa5bcc801b4acac61b4 (patch)
treec8cb2d68768cefb840ab3daf80c04358b46bb117 /lib/pure/concurrency
parentcb03ae2c9fcbb459b58bee90aa0759dcea6be878 (diff)
downloadNim-0dc4d6dcc28a6e35f36caaa5bcc801b4acac61b4.tar.gz
fixes #7638; awaitAny blocks if the flow vars all have been complete already
Diffstat (limited to 'lib/pure/concurrency')
-rw-r--r--lib/pure/concurrency/threadpool.nim24
1 files changed, 17 insertions, 7 deletions
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim
index b01f8fc0d..ca4f80f2a 100644
--- a/lib/pure/concurrency/threadpool.nim
+++ b/lib/pure/concurrency/threadpool.nim
@@ -168,6 +168,15 @@ proc wakeupWorkerToProcessQueue(w: ptr Worker) =
     signal(w.q.empty)
   signal(w.taskArrived)
 
+proc attach(fv: FlowVarBase; i: int): bool =
+  acquire(fv.cv.L)
+  if fv.cv.counter <= 0:
+    fv.idx = i
+    result = true
+  else:
+    result = false
+  release(fv.cv.L)
+
 proc finished(fv: FlowVarBase) =
   doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'"
   # we have to protect against the rare cases where the owner of the flowVar
@@ -248,23 +257,24 @@ proc awaitAny*(flowVars: openArray[FlowVarBase]): int =
   ## the same time. That means if you awaitAny([a,b]) and awaitAny([b,c]) the second
   ## call will only await 'c'. If there is no flowVar left to be able to wait
   ## on, -1 is returned.
-  ## **Note**: This results in non-deterministic behaviour and so should be
-  ## avoided.
+  ## **Note**: This results in non-deterministic behaviour and should be avoided.
   var ai: AwaitInfo
   ai.cv.initSemaphore()
   var conflicts = 0
+  result = -1
   for i in 0 .. flowVars.high:
     if cas(addr flowVars[i].ai, nil, addr ai):
-      flowVars[i].idx = i
+      if not attach(flowVars[i], i):
+        result = i
+        break
     else:
       inc conflicts
   if conflicts < flowVars.len:
-    await(ai.cv)
-    result = ai.idx
+    if result < 0:
+      await(ai.cv)
+      result = ai.idx
     for i in 0 .. flowVars.high:
       discard cas(addr flowVars[i].ai, addr ai, nil)
-  else:
-    result = -1
   destroySemaphore(ai.cv)
 
 proc isReady*(fv: FlowVarBase): bool =