summary refs log tree commit diff stats
path: root/lib/pure/concurrency/threadpool.nim
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/concurrency/threadpool.nim')
-rw-r--r--lib/pure/concurrency/threadpool.nim24
1 files changed, 16 insertions, 8 deletions
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim
index fd1041918..f46822d94 100644
--- a/lib/pure/concurrency/threadpool.nim
+++ b/lib/pure/concurrency/threadpool.nim
@@ -9,6 +9,9 @@
 
 ## Implements Nimrod's 'spawn'.
 
+when not compileOption("threads"):
+  {.error: "Threadpool requires --threads:on option.".}
+
 import cpuinfo, cpuload, locks
 
 {.push stackTrace:off.}
@@ -92,7 +95,7 @@ type
 
   FlowVarBase* = ref FlowVarBaseObj ## untyped base class for 'FlowVar[T]'
   FlowVarBaseObj = object of TObject
-    ready, usesCondVar: bool
+    ready, usesCondVar, awaited: bool
     cv: CondVar #\
     # for 'awaitAny' support
     ai: ptr AwaitInfo
@@ -126,15 +129,15 @@ type
 proc await*(fv: FlowVarBase) =
   ## waits until the value for the flowVar arrives. Usually it is not necessary
   ## to call this explicitly.
-  if fv.usesCondVar:
-    fv.usesCondVar = false
+  if fv.usesCondVar and not fv.awaited:
+    fv.awaited = true
     await(fv.cv)
     destroyCondVar(fv.cv)
 
 proc finished(fv: FlowVarBase) =
   doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'"
   # we have to protect against the rare cases where the owner of the flowVar
-  # simply disregards the flowVar and yet the "flowVarr" has not yet written
+  # simply disregards the flowVar and yet the "flowVar" has not yet written
   # anything to it:
   await(fv)
   if fv.data.isNil: return
@@ -207,6 +210,7 @@ proc `^`*[T](fv: FlowVar[T]): T =
   ## blocks until the value is available and then returns this value.
   await(fv)
   when T is string or T is seq:
+    # XXX closures? deepCopy?
     result = cast[T](fv.data)
   else:
     result = fv.blob
@@ -264,6 +268,10 @@ proc slave(w: ptr Worker) {.thread.} =
       w.shutdown = false
       atomicDec currentPoolSize
 
+var
+  workers: array[MaxThreadPoolSize, TThread[ptr Worker]]
+  workersData: array[MaxThreadPoolSize, Worker]
+
 proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) =
   ## sets the minimal thread pool size. The default value of this is 4.
   minPoolSize = size
@@ -272,10 +280,10 @@ proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) =
   ## sets the minimal thread pool size. The default value of this
   ## is ``MaxThreadPoolSize``.
   maxPoolSize = size
-
-var
-  workers: array[MaxThreadPoolSize, TThread[ptr Worker]]
-  workersData: array[MaxThreadPoolSize, Worker]
+  if currentPoolSize > maxPoolSize:
+    for i in maxPoolSize..currentPoolSize-1:
+      let w = addr(workersData[i])
+      w.shutdown = true
 
 proc activateThread(i: int) {.noinline.} =
   workersData[i].taskArrived = createCondVar()