summary refs log tree commit diff stats
path: root/lib/pure/concurrency
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/concurrency')
-rw-r--r--lib/pure/concurrency/threadpool.nim23
1 files changed, 20 insertions, 3 deletions
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim
index 04be704be..56fd74d86 100644
--- a/lib/pure/concurrency/threadpool.nim
+++ b/lib/pure/concurrency/threadpool.nim
@@ -133,6 +133,8 @@ type
     q: ToFreeQueue
     readyForTask: Semaphore
 
+const threadpoolWaitMs {.intdefine.}: int = 100
+
 proc blockUntil*(fv: FlowVarBase) =
   ## Waits until the value for the ``fv`` arrives.
   ##
@@ -201,6 +203,8 @@ proc finished(fv: FlowVarBase) =
   inc q.len
   release(q.lock)
   fv.data = nil
+  # the worker thread waits for "data" to be set to nil before shutting down
+  owner.data = nil
 
 proc fvFinalizer[T](fv: FlowVar[T]) = finished(fv)
 
@@ -241,21 +245,24 @@ proc unsafeRead*[T](fv: FlowVar[ref T]): ptr T =
   ## Blocks until the value is available and then returns this value.
   blockUntil(fv)
   result = cast[ptr T](fv.data)
+  finished(fv)
 
 proc `^`*[T](fv: FlowVar[ref T]): ref T =
   ## Blocks until the value is available and then returns this value.
   blockUntil(fv)
   let src = cast[ref T](fv.data)
   deepCopy result, src
+  finished(fv)
 
 proc `^`*[T](fv: FlowVar[T]): T =
   ## Blocks until the value is available and then returns this value.
   blockUntil(fv)
   when T is string or T is seq:
-    # XXX closures? deepCopy?
-    result = cast[T](fv.data)
+    let src = cast[T](fv.data)
+    deepCopy result, src
   else:
     result = fv.blob
+  finished(fv)
 
 proc blockUntilAny*(flowVars: openArray[FlowVarBase]): int =
   ## Awaits any of the given ``flowVars``. Returns the index of one ``flowVar``
@@ -334,6 +341,16 @@ proc slave(w: ptr Worker) {.thread.} =
     if w.shutdown:
       w.shutdown = false
       atomicDec currentPoolSize
+      while true:
+        if w.data != nil:
+          sleep(threadpoolWaitMs)
+        else:
+          # The flowvar finalizer ("finished()") set w.data to nil, so we can
+          # safely terminate the thread.
+          #
+          # TODO: look for scenarios in which the flowvar is never finalized, so
+          # a shut down thread gets stuck in this loop until the main thread exits.
+          break
       break
     when declared(atomicStoreN):
       atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST)
@@ -576,7 +593,7 @@ proc sync*() =
       if not allReady: break
       allReady = allReady and workersData[i].ready
     if allReady: break
-    sleep(100)
+    sleep(threadpoolWaitMs)
     # We cannot "blockUntil(gSomeReady)" because workers may be shut down between
     # the time we establish that some are not "ready" and the time we wait for a
     # "signal(gSomeReady)" from inside "slave()" that can never come.