summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorȘtefan Talpalaru <stefantalpalaru@yahoo.com>2019-05-20 09:29:13 +0200
committerAndreas Rumpf <rumpf_a@web.de>2019-05-20 09:29:13 +0200
commit13b3e4af8a419f39b139d716ae45acb51750e787 (patch)
treedbb0d40a49a2e1017c4380392c38197e8730952d
parenta63c2a25d0e2536abf68dfa4bb52974e3f9cf226 (diff)
downloadNim-13b3e4af8a419f39b139d716ae45acb51750e787.tar.gz
fixes #11275 (#11276)
-rw-r--r--lib/pure/concurrency/threadpool.nim23
-rw-r--r--tests/parallel/tconvexhull.nim2
2 files changed, 22 insertions, 3 deletions
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim
index 04be704be..56fd74d86 100644
--- a/lib/pure/concurrency/threadpool.nim
+++ b/lib/pure/concurrency/threadpool.nim
@@ -133,6 +133,8 @@ type
     q: ToFreeQueue
     readyForTask: Semaphore
 
+const threadpoolWaitMs {.intdefine.}: int = 100
+
 proc blockUntil*(fv: FlowVarBase) =
   ## Waits until the value for the ``fv`` arrives.
   ##
@@ -201,6 +203,8 @@ proc finished(fv: FlowVarBase) =
   inc q.len
   release(q.lock)
   fv.data = nil
+  # the worker thread waits for "data" to be set to nil before shutting down
+  owner.data = nil
 
 proc fvFinalizer[T](fv: FlowVar[T]) = finished(fv)
 
@@ -241,21 +245,24 @@ proc unsafeRead*[T](fv: FlowVar[ref T]): ptr T =
   ## Blocks until the value is available and then returns this value.
   blockUntil(fv)
   result = cast[ptr T](fv.data)
+  finished(fv)
 
 proc `^`*[T](fv: FlowVar[ref T]): ref T =
   ## Blocks until the value is available and then returns this value.
   blockUntil(fv)
   let src = cast[ref T](fv.data)
   deepCopy result, src
+  finished(fv)
 
 proc `^`*[T](fv: FlowVar[T]): T =
   ## Blocks until the value is available and then returns this value.
   blockUntil(fv)
   when T is string or T is seq:
-    # XXX closures? deepCopy?
-    result = cast[T](fv.data)
+    let src = cast[T](fv.data)
+    deepCopy result, src
   else:
     result = fv.blob
+  finished(fv)
 
 proc blockUntilAny*(flowVars: openArray[FlowVarBase]): int =
   ## Awaits any of the given ``flowVars``. Returns the index of one ``flowVar``
@@ -334,6 +341,16 @@ proc slave(w: ptr Worker) {.thread.} =
     if w.shutdown:
       w.shutdown = false
       atomicDec currentPoolSize
+      while true:
+        if w.data != nil:
+          sleep(threadpoolWaitMs)
+        else:
+          # The flowvar finalizer ("finished()") set w.data to nil, so we can
+          # safely terminate the thread.
+          #
+          # TODO: look for scenarios in which the flowvar is never finalized, so
+          # a shut down thread gets stuck in this loop until the main thread exits.
+          break
       break
     when declared(atomicStoreN):
       atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST)
@@ -576,7 +593,7 @@ proc sync*() =
       if not allReady: break
       allReady = allReady and workersData[i].ready
     if allReady: break
-    sleep(100)
+    sleep(threadpoolWaitMs)
     # We cannot "blockUntil(gSomeReady)" because workers may be shut down between
     # the time we establish that some are not "ready" and the time we wait for a
     # "signal(gSomeReady)" from inside "slave()" that can never come.
diff --git a/tests/parallel/tconvexhull.nim b/tests/parallel/tconvexhull.nim
index cc01b5c78..184a131a2 100644
--- a/tests/parallel/tconvexhull.nim
+++ b/tests/parallel/tconvexhull.nim
@@ -52,6 +52,8 @@ proc convex_hull[T](points: var seq[T], cmp: proc(x, y: T): int {.closure.}) : s
   result = concat(^ul[0], ^ul[1])
 
 var s = map(toSeq(0..99999), proc(x: int): Point = (float(x div 1000), float(x mod 1000)))
+# On some runs, this pool size reduction will set the "shutdown" attribute on the
+# worker thread that executes our spawned task, before we can read the flowvars.
 setMaxPoolSize 2
 
 #echo convex_hull[Point](s, cmpPoint)