summary refs log tree commit diff stats
path: root/lib/pure/concurrency/threadpool.nim
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/concurrency/threadpool.nim')
-rw-r--r--lib/pure/concurrency/threadpool.nim404
1 files changed, 257 insertions, 147 deletions
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim
index 2603835dd..06ed2fe54 100644
--- a/lib/pure/concurrency/threadpool.nim
+++ b/lib/pure/concurrency/threadpool.nim
@@ -7,12 +7,25 @@
 #    distribution, for details about the copyright.
 #
 
-## Implements Nim's 'spawn'.
+{.deprecated: "use the nimble packages `malebolgia`, `taskpools` or `weave` instead".}
+
+## Implements Nim's `parallel & spawn statements <manual_experimental.html#parallel-amp-spawn>`_.
+##
+## Unstable API.
+##
+## See also
+## ========
+## * `threads module <typedthreads.html>`_ for basic thread support
+## * `locks module <locks.html>`_ for locks and condition variables
+## * `asyncdispatch module <asyncdispatch.html>`_ for asynchronous IO
 
 when not compileOption("threads"):
   {.error: "Threadpool requires --threads:on option.".}
 
-import cpuinfo, cpuload, locks
+import std/[cpuinfo, cpuload, locks, os]
+
+when defined(nimPreviewSlimSystem):
+  import std/[assertions, typedthreads, sysatomics]
 
 {.push stackTrace:off.}
 
@@ -22,15 +35,15 @@ type
     L: Lock
     counter: int
 
-proc createSemaphore(): Semaphore =
-  initCond(result.c)
-  initLock(result.L)
+proc initSemaphore(cv: var Semaphore) =
+  initCond(cv.c)
+  initLock(cv.L)
 
 proc destroySemaphore(cv: var Semaphore) {.inline.} =
   deinitCond(cv.c)
   deinitLock(cv.L)
 
-proc await(cv: var Semaphore) =
+proc blockUntil(cv: var Semaphore) =
   acquire(cv.L)
   while cv.counter <= 0:
     wait(cv.c, cv.L)
@@ -43,45 +56,42 @@ proc signal(cv: var Semaphore) =
   release(cv.L)
   signal(cv.c)
 
-const CacheLineSize = 32 # true for most archs
+const CacheLineSize = 64 # true for most archs
 
 type
-  Barrier {.compilerProc.} = object
+  Barrier {.compilerproc.} = object
     entered: int
     cv: Semaphore # Semaphore takes 3 words at least
-    when sizeof(int) < 8:
-      cacheAlign: array[CacheLineSize-4*sizeof(int), byte]
-    left: int
-    cacheAlign2: array[CacheLineSize-sizeof(int), byte]
-    interest: bool ## wether the master is interested in the "all done" event
+    left {.align(CacheLineSize).}: int
+    interest {.align(CacheLineSize).} : bool # whether the master is interested in the "all done" event
 
-proc barrierEnter(b: ptr Barrier) {.compilerProc, inline.} =
+proc barrierEnter(b: ptr Barrier) {.compilerproc, inline.} =
   # due to the signaling between threads, it is ensured we are the only
   # one with access to 'entered' so we don't need 'atomicInc' here:
   inc b.entered
   # also we need no 'fence' instructions here as soon 'nimArgsPassingDone'
   # will be called which already will perform a fence for us.
 
-proc barrierLeave(b: ptr Barrier) {.compilerProc, inline.} =
+proc barrierLeave(b: ptr Barrier) {.compilerproc, inline.} =
   atomicInc b.left
   when not defined(x86): fence()
   # We may not have seen the final value of b.entered yet,
   # so we need to check for >= instead of ==.
   if b.interest and b.left >= b.entered: signal(b.cv)
 
-proc openBarrier(b: ptr Barrier) {.compilerProc, inline.} =
+proc openBarrier(b: ptr Barrier) {.compilerproc, inline.} =
   b.entered = 0
   b.left = 0
   b.interest = false
 
-proc closeBarrier(b: ptr Barrier) {.compilerProc.} =
+proc closeBarrier(b: ptr Barrier) {.compilerproc.} =
   fence()
   if b.left != b.entered:
-    b.cv = createSemaphore()
+    b.cv.initSemaphore()
     fence()
     b.interest = true
     fence()
-    while b.left != b.entered: await(b.cv)
+    while b.left != b.entered: blockUntil(b.cv)
     destroySemaphore(b.cv)
 
 {.pop.}
@@ -89,27 +99,24 @@ proc closeBarrier(b: ptr Barrier) {.compilerProc.} =
 # ----------------------------------------------------------------------------
 
 type
-  foreign* = object ## a region that indicates the pointer comes from a
-                    ## foreign thread heap.
   AwaitInfo = object
     cv: Semaphore
     idx: int
 
-  FlowVarBase* = ref FlowVarBaseObj ## untyped base class for 'FlowVar[T]'
-  FlowVarBaseObj = object of RootObj
+  FlowVarBase* = ref FlowVarBaseObj ## Untyped base class for `FlowVar[T] <#FlowVar>`_.
+  FlowVarBaseObj {.acyclic.} = object of RootObj
     ready, usesSemaphore, awaited: bool
-    cv: Semaphore #\
-    # for 'awaitAny' support
+    cv: Semaphore  # for 'blockUntilAny' support
     ai: ptr AwaitInfo
     idx: int
     data: pointer  # we incRef and unref it to keep it alive; note this MUST NOT
                    # be RootRef here otherwise the wrong GC keeps track of it!
     owner: pointer # ptr Worker
 
-  FlowVarObj[T] = object of FlowVarBaseObj
+  FlowVarObj[T] {.acyclic.} = object of FlowVarBaseObj
     blob: T
 
-  FlowVar*{.compilerProc.}[T] = ref FlowVarObj[T] ## a data flow variable
+  FlowVar*[T] {.compilerproc.} = ref FlowVarObj[T] ## A data flow variable.
 
   ToFreeQueue = object
     len: int
@@ -130,12 +137,15 @@ type
     q: ToFreeQueue
     readyForTask: Semaphore
 
-proc await*(fv: FlowVarBase) =
-  ## waits until the value for the flowVar arrives. Usually it is not necessary
-  ## to call this explicitly.
+const threadpoolWaitMs {.intdefine.}: int = 100
+
+proc blockUntil*(fv: var FlowVarBaseObj) =
+  ## Waits until the value for `fv` arrives.
+  ##
+  ## Usually it is not necessary to call this explicitly.
   if fv.usesSemaphore and not fv.awaited:
     fv.awaited = true
-    await(fv.cv)
+    blockUntil(fv.cv)
     destroySemaphore(fv.cv)
 
 proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool =
@@ -143,13 +153,13 @@ proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool =
     w.data = data
     w.f = fn
     signal(w.taskArrived)
-    await(w.taskStarted)
+    blockUntil(w.taskStarted)
     result = true
 
 proc cleanFlowVars(w: ptr Worker) =
   let q = addr(w.q)
   acquire(q.lock)
-  for i in 0 .. <q.len:
+  for i in 0 ..< q.len:
     GC_unref(cast[RootRef](q.data[i]))
     #echo "GC_unref"
   q.len = 0
@@ -168,12 +178,21 @@ proc wakeupWorkerToProcessQueue(w: ptr Worker) =
     signal(w.q.empty)
   signal(w.taskArrived)
 
-proc finished(fv: FlowVarBase) =
-  doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'"
+proc attach(fv: FlowVarBase; i: int): bool =
+  acquire(fv.cv.L)
+  if fv.cv.counter <= 0:
+    fv.idx = i
+    result = true
+  else:
+    result = false
+  release(fv.cv.L)
+
+proc finished(fv: var FlowVarBaseObj) =
+  doAssert fv.ai.isNil, "flowVar is still attached to an 'blockUntilAny'"
   # we have to protect against the rare cases where the owner of the flowVar
   # simply disregards the flowVar and yet the "flowVar" has not yet written
   # anything to it:
-  await(fv)
+  blockUntil(fv)
   if fv.data.isNil: return
   let owner = cast[ptr Worker](fv.owner)
   let q = addr(owner.q)
@@ -182,23 +201,27 @@ proc finished(fv: FlowVarBase) =
     #echo "EXHAUSTED!"
     release(q.lock)
     wakeupWorkerToProcessQueue(owner)
-    await(q.empty)
+    blockUntil(q.empty)
     acquire(q.lock)
   q.data[q.len] = cast[pointer](fv.data)
   inc q.len
   release(q.lock)
   fv.data = nil
+  # the worker thread waits for "data" to be set to nil before shutting down
+  owner.data = nil
 
-proc fvFinalizer[T](fv: FlowVar[T]) = finished(fv)
+proc `=destroy`[T](fv: var FlowVarObj[T]) =
+  finished(fv)
+  `=destroy`(fv.blob)
 
-proc nimCreateFlowVar[T](): FlowVar[T] {.compilerProc.} =
-  new(result, fvFinalizer)
+proc nimCreateFlowVar[T](): FlowVar[T] {.compilerproc.} =
+  new(result)
 
-proc nimFlowVarCreateSemaphore(fv: FlowVarBase) {.compilerProc.} =
-  fv.cv = createSemaphore()
+proc nimFlowVarCreateSemaphore(fv: FlowVarBase) {.compilerproc.} =
+  fv.cv.initSemaphore()
   fv.usesSemaphore = true
 
-proc nimFlowVarSignal(fv: FlowVarBase) {.compilerProc.} =
+proc nimFlowVarSignal(fv: FlowVarBase) {.compilerproc.} =
   if fv.ai != nil:
     acquire(fv.ai.cv.L)
     fv.ai.idx = fv.idx
@@ -209,68 +232,74 @@ proc nimFlowVarSignal(fv: FlowVarBase) {.compilerProc.} =
     signal(fv.cv)
 
 proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) =
-  ## blocks until the ``fv`` is available and then passes its value
-  ## to ``action``. Note that due to Nim's parameter passing semantics this
-  ## means that ``T`` doesn't need to be copied and so ``awaitAndThen`` can
-  ## sometimes be more efficient than ``^``.
-  await(fv)
-  when T is string or T is seq:
+  ## Blocks until `fv` is available and then passes its value
+  ## to `action`.
+  ##
+  ## Note that due to Nim's parameter passing semantics, this
+  ## means that `T` doesn't need to be copied, so `awaitAndThen` can
+  ## sometimes be more efficient than the `^ proc <#^,FlowVar[T]>`_.
+  blockUntil(fv[])
+  when defined(nimV2):
+    action(fv.blob)
+  elif T is string or T is seq:
     action(cast[T](fv.data))
   elif T is ref:
     {.error: "'awaitAndThen' not available for FlowVar[ref]".}
   else:
     action(fv.blob)
-  finished(fv)
-
-proc unsafeRead*[T](fv: FlowVar[ref T]): foreign ptr T =
-  ## blocks until the value is available and then returns this value.
-  await(fv)
-  result = cast[foreign ptr T](fv.data)
+  finished(fv[])
 
-proc `^`*[T](fv: FlowVar[ref T]): ref T =
-  ## blocks until the value is available and then returns this value.
-  await(fv)
-  let src = cast[ref T](fv.data)
-  deepCopy result, src
+proc unsafeRead*[T](fv: FlowVar[ref T]): ptr T =
+  ## Blocks until the value is available and then returns this value.
+  blockUntil(fv[])
+  when defined(nimV2):
+    result = cast[ptr T](fv.blob)
+  else:
+    result = cast[ptr T](fv.data)
+  finished(fv[])
 
 proc `^`*[T](fv: FlowVar[T]): T =
-  ## blocks until the value is available and then returns this value.
-  await(fv)
-  when T is string or T is seq:
-    # XXX closures? deepCopy?
-    result = cast[T](fv.data)
+  ## Blocks until the value is available and then returns this value.
+  blockUntil(fv[])
+  when not defined(nimV2) and (T is string or T is seq or T is ref):
+    deepCopy result, cast[T](fv.data)
   else:
     result = fv.blob
+  finished(fv[])
 
-proc awaitAny*(flowVars: openArray[FlowVarBase]): int =
-  ## awaits any of the given flowVars. Returns the index of one flowVar for
-  ## which a value arrived. A flowVar only supports one call to 'awaitAny' at
-  ## the same time. That means if you await([a,b]) and await([b,c]) the second
-  ## call will only await 'c'. If there is no flowVar left to be able to wait
-  ## on, -1 is returned.
-  ## **Note**: This results in non-deterministic behaviour and so should be
-  ## avoided.
+proc blockUntilAny*(flowVars: openArray[FlowVarBase]): int =
+  ## Awaits any of the given `flowVars`. Returns the index of one `flowVar`
+  ## for which a value arrived.
+  ##
+  ## A `flowVar` only supports one call to `blockUntilAny` at the same time.
+  ## That means if you `blockUntilAny([a,b])` and `blockUntilAny([b,c])`
+  ## the second call will only block until `c`. If there is no `flowVar` left
+  ## to be able to wait on, -1 is returned.
+  ##
+  ## **Note:** This results in non-deterministic behaviour and should be avoided.
   var ai: AwaitInfo
-  ai.cv = createSemaphore()
+  ai.cv.initSemaphore()
   var conflicts = 0
+  result = -1
   for i in 0 .. flowVars.high:
     if cas(addr flowVars[i].ai, nil, addr ai):
-      flowVars[i].idx = i
+      if not attach(flowVars[i], i):
+        result = i
+        break
     else:
       inc conflicts
   if conflicts < flowVars.len:
-    await(ai.cv)
-    result = ai.idx
+    if result < 0:
+      blockUntil(ai.cv)
+      result = ai.idx
     for i in 0 .. flowVars.high:
       discard cas(addr flowVars[i].ai, addr ai, nil)
-  else:
-    result = -1
   destroySemaphore(ai.cv)
 
 proc isReady*(fv: FlowVarBase): bool =
-  ## Determines whether the specified ``FlowVarBase``'s value is available.
+  ## Determines whether the specified `FlowVarBase`'s value is available.
   ##
-  ## If ``true`` awaiting ``fv`` will not block.
+  ## If `true`, awaiting `fv` will not block.
   if fv.usesSemaphore and not fv.awaited:
     acquire(fv.cv.L)
     result = fv.cv.counter > 0
@@ -278,42 +307,74 @@ proc isReady*(fv: FlowVarBase): bool =
   else:
     result = true
 
-proc nimArgsPassingDone(p: pointer) {.compilerProc.} =
+proc nimArgsPassingDone(p: pointer) {.compilerproc.} =
   let w = cast[ptr Worker](p)
   signal(w.taskStarted)
 
 const
-  MaxThreadPoolSize* = 256 ## maximal size of the thread pool. 256 threads
-                           ## should be good enough for anybody ;-)
-  MaxDistinguishedThread* = 32 ## maximal number of "distinguished" threads.
+  MaxThreadPoolSize* {.intdefine.} = 256 ## Maximum size of the thread pool. 256 threads
+                                         ## should be good enough for anybody ;-)
+  MaxDistinguishedThread* {.intdefine.} = 32 ## Maximum number of "distinguished" threads.
 
 type
-  ThreadId* = range[0..MaxDistinguishedThread-1]
+  ThreadId* = range[0..MaxDistinguishedThread-1] ## A thread identifier.
 
 var
   currentPoolSize: int
   maxPoolSize = MaxThreadPoolSize
   minPoolSize = 4
-  gSomeReady = createSemaphore()
+  gSomeReady: Semaphore
   readyWorker: ptr Worker
 
+# A workaround for recursion deadlock issue
+# https://github.com/nim-lang/Nim/issues/4597
+var
+  numSlavesLock: Lock
+  numSlavesRunning {.guard: numSlavesLock.}: int
+  numSlavesWaiting {.guard: numSlavesLock.}: int
+  isSlave {.threadvar.}: bool
+
+numSlavesLock.initLock
+
+gSomeReady.initSemaphore()
+
 proc slave(w: ptr Worker) {.thread.} =
+  isSlave = true
   while true:
+    if w.shutdown:
+      w.shutdown = false
+      atomicDec currentPoolSize
+      while true:
+        if w.data != nil:
+          sleep(threadpoolWaitMs)
+        else:
+          # The flowvar finalizer ("finished()") set w.data to nil, so we can
+          # safely terminate the thread.
+          #
+          # TODO: look for scenarios in which the flowvar is never finalized, so
+          # a shut down thread gets stuck in this loop until the main thread exits.
+          break
+      break
     when declared(atomicStoreN):
       atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST)
     else:
       w.ready = true
     readyWorker = w
     signal(gSomeReady)
-    await(w.taskArrived)
+    blockUntil(w.taskArrived)
     # XXX Somebody needs to look into this (why does this assertion fail
     # in Visual Studio?)
-    when not defined(vcc): assert(not w.ready)
+    when not defined(vcc) and not defined(tcc): assert(not w.ready)
+
+    withLock numSlavesLock:
+      inc numSlavesRunning
+
     w.f(w, w.data)
+
+    withLock numSlavesLock:
+      dec numSlavesRunning
+
     if w.q.len != 0: w.cleanFlowVars
-    if w.shutdown:
-      w.shutdown = false
-      atomicDec currentPoolSize
 
 proc distinguishedSlave(w: ptr Worker) {.thread.} =
   while true:
@@ -322,28 +383,28 @@ proc distinguishedSlave(w: ptr Worker) {.thread.} =
     else:
       w.ready = true
     signal(w.readyForTask)
-    await(w.taskArrived)
+    blockUntil(w.taskArrived)
     assert(not w.ready)
     w.f(w, w.data)
     if w.q.len != 0: w.cleanFlowVars
 
 var
-  workers: array[MaxThreadPoolSize, TThread[ptr Worker]]
+  workers: array[MaxThreadPoolSize, Thread[ptr Worker]]
   workersData: array[MaxThreadPoolSize, Worker]
 
-  distinguished: array[MaxDistinguishedThread, TThread[ptr Worker]]
+  distinguished: array[MaxDistinguishedThread, Thread[ptr Worker]]
   distinguishedData: array[MaxDistinguishedThread, Worker]
 
 when defined(nimPinToCpu):
   var gCpus: Natural
 
 proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) =
-  ## sets the minimal thread pool size. The default value of this is 4.
+  ## Sets the minimum thread pool size. The default value of this is 4.
   minPoolSize = size
 
 proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) =
-  ## sets the maximal thread pool size. The default value of this
-  ## is ``MaxThreadPoolSize``.
+  ## Sets the maximum thread pool size. The default value of this
+  ## is `MaxThreadPoolSize <#MaxThreadPoolSize>`_.
   maxPoolSize = size
   if currentPoolSize > maxPoolSize:
     for i in maxPoolSize..currentPoolSize-1:
@@ -354,10 +415,10 @@ when defined(nimRecursiveSpawn):
   var localThreadId {.threadvar.}: int
 
 proc activateWorkerThread(i: int) {.noinline.} =
-  workersData[i].taskArrived = createSemaphore()
-  workersData[i].taskStarted = createSemaphore()
+  workersData[i].taskArrived.initSemaphore()
+  workersData[i].taskStarted.initSemaphore()
   workersData[i].initialized = true
-  workersData[i].q.empty = createSemaphore()
+  workersData[i].q.empty.initSemaphore()
   initLock(workersData[i].q.lock)
   createThread(workers[i], slave, addr(workersData[i]))
   when defined(nimRecursiveSpawn):
@@ -366,12 +427,12 @@ proc activateWorkerThread(i: int) {.noinline.} =
     if gCpus > 0: pinToCpu(workers[i], i mod gCpus)
 
 proc activateDistinguishedThread(i: int) {.noinline.} =
-  distinguishedData[i].taskArrived = createSemaphore()
-  distinguishedData[i].taskStarted = createSemaphore()
+  distinguishedData[i].taskArrived.initSemaphore()
+  distinguishedData[i].taskStarted.initSemaphore()
   distinguishedData[i].initialized = true
-  distinguishedData[i].q.empty = createSemaphore()
+  distinguishedData[i].q.empty.initSemaphore()
   initLock(distinguishedData[i].q.lock)
-  distinguishedData[i].readyForTask = createSemaphore()
+  distinguishedData[i].readyForTask.initSemaphore()
   createThread(distinguished[i], distinguishedSlave, addr(distinguishedData[i]))
 
 proc setup() =
@@ -380,40 +441,51 @@ proc setup() =
     gCpus = p
   currentPoolSize = min(p, MaxThreadPoolSize)
   readyWorker = addr(workersData[0])
-  for i in 0.. <currentPoolSize: activateWorkerThread(i)
+  for i in 0..<currentPoolSize: activateWorkerThread(i)
 
 proc preferSpawn*(): bool =
-  ## Use this proc to determine quickly if a 'spawn' or a direct call is
-  ## preferable. If it returns 'true' a 'spawn' may make sense. In general
-  ## it is not necessary to call this directly; use 'spawnX' instead.
+  ## Use this proc to determine quickly if a `spawn` or a direct call is
+  ## preferable.
+  ##
+  ## If it returns `true`, a `spawn` may make sense. In general
+  ## it is not necessary to call this directly; use the `spawnX template
+  ## <#spawnX.t>`_ instead.
   result = gSomeReady.counter > 0
 
-proc spawn*(call: expr): expr {.magic: "Spawn".}
-  ## always spawns a new task, so that the 'call' is never executed on
-  ## the calling thread. 'call' has to be proc call 'p(...)' where 'p'
-  ## is gcsafe and has a return type that is either 'void' or compatible
-  ## with ``FlowVar[T]``.
-
-proc pinnedSpawn*(id: ThreadId; call: expr): expr {.magic: "Spawn".}
-  ## always spawns a new task on the worker thread with ``id``, so that
-  ## the 'call' is **always** executed on
-  ## the this thread. 'call' has to be proc call 'p(...)' where 'p'
-  ## is gcsafe and has a return type that is either 'void' or compatible
-  ## with ``FlowVar[T]``.
-
-template spawnX*(call: expr): expr =
-  ## spawns a new task if a CPU core is ready, otherwise executes the
-  ## call in the calling thread. Usually it is advised to
-  ## use 'spawn' in order to not block the producer for an unknown
-  ## amount of time. 'call' has to be proc call 'p(...)' where 'p'
-  ## is gcsafe and has a return type that is either 'void' or compatible
-  ## with ``FlowVar[T]``.
+proc spawn*(call: sink typed) {.magic: "Spawn".} =
+  ## Always spawns a new task, so that the `call` is never executed on
+  ## the calling thread.
+  ##
+  ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a
+  ## return type that is either `void` or compatible with `FlowVar[T]`.
+  discard "It uses `nimSpawn3` internally"
+
+proc pinnedSpawn*(id: ThreadId; call: sink typed) {.magic: "Spawn".} =
+  ## Always spawns a new task on the worker thread with `id`, so that
+  ## the `call` is **always** executed on the thread.
+  ##
+  ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a
+  ## return type that is either `void` or compatible with `FlowVar[T]`.
+  discard "It uses `nimSpawn4` internally"
+
+template spawnX*(call) =
+  ## Spawns a new task if a CPU core is ready, otherwise executes the
+  ## call in the calling thread.
+  ##
+  ## Usually, it is advised to use the `spawn proc <#spawn,sinktyped>`_
+  ## in order to not block the producer for an unknown amount of time.
+  ##
+  ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a
+  ## return type that is either 'void' or compatible with `FlowVar[T]`.
   (if preferSpawn(): spawn call else: call)
 
-proc parallel*(body: stmt) {.magic: "Parallel".}
-  ## a parallel section can be used to execute a block in parallel. ``body``
-  ## has to be in a DSL that is a particular subset of the language. Please
-  ## refer to the manual for further information.
+proc parallel*(body: untyped) {.magic: "Parallel".}
+  ## A parallel section can be used to execute a block in parallel.
+  ##
+  ## `body` has to be in a DSL that is a particular subset of the language.
+  ##
+  ## Please refer to `the manual <manual_experimental.html#parallel-amp-spawn>`_
+  ## for further information.
 
 var
   state: ThreadPoolState
@@ -421,18 +493,28 @@ var
 
 initLock stateLock
 
-proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerProc.} =
+proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerproc.} =
   # implementation of 'spawn' that is used by the code generator.
   while true:
     if selectWorker(readyWorker, fn, data): return
-    for i in 0.. <currentPoolSize:
+    for i in 0..<currentPoolSize:
       if selectWorker(addr(workersData[i]), fn, data): return
+
     # determine what to do, but keep in mind this is expensive too:
     # state.calls < maxPoolSize: warmup phase
     # (state.calls and 127) == 0: periodic check
     if state.calls < maxPoolSize or (state.calls and 127) == 0:
       # ensure the call to 'advice' is atomic:
       if tryAcquire(stateLock):
+        if currentPoolSize < minPoolSize:
+          if not workersData[currentPoolSize].initialized:
+            activateWorkerThread(currentPoolSize)
+          let w = addr(workersData[currentPoolSize])
+          atomicInc currentPoolSize
+          if selectWorker(w, fn, data):
+            release(stateLock)
+            return
+
         case advice(state)
         of doNothing: discard
         of doCreateThread:
@@ -460,37 +542,65 @@ proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerProc.} =
         # on the current thread instead.
         var self = addr(workersData[localThreadId-1])
         fn(self, data)
-        await(self.taskStarted)
+        blockUntil(self.taskStarted)
         return
-      else:
-        await(gSomeReady)
-    else:
-      await(gSomeReady)
+
+    if isSlave:
+      # Run under lock until `numSlavesWaiting` increment to avoid a
+      # race (otherwise two last threads might start waiting together)
+      withLock numSlavesLock:
+        if numSlavesRunning <= numSlavesWaiting + 1:
+          # All the other slaves are waiting
+          # If we wait now, we-re deadlocked until
+          # an external spawn happens !
+          if currentPoolSize < maxPoolSize:
+            if not workersData[currentPoolSize].initialized:
+              activateWorkerThread(currentPoolSize)
+            let w = addr(workersData[currentPoolSize])
+            atomicInc currentPoolSize
+            if selectWorker(w, fn, data):
+              return
+          else:
+            # There is no place in the pool. We're deadlocked.
+            # echo "Deadlock!"
+            discard
+
+        inc numSlavesWaiting
+
+    blockUntil(gSomeReady)
+
+    if isSlave:
+      withLock numSlavesLock:
+        dec numSlavesWaiting
 
 var
-  distinguishedLock: TLock
+  distinguishedLock: Lock
 
 initLock distinguishedLock
 
-proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerProc.} =
+proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerproc.} =
   acquire(distinguishedLock)
   if not distinguishedData[id].initialized:
     activateDistinguishedThread(id)
   release(distinguishedLock)
   while true:
     if selectWorker(addr(distinguishedData[id]), fn, data): break
-    await(distinguishedData[id].readyForTask)
+    blockUntil(distinguishedData[id].readyForTask)
 
 
 proc sync*() =
-  ## a simple barrier to wait for all spawn'ed tasks. If you need more elaborate
-  ## waiting, you have to use an explicit barrier.
+  ## A simple barrier to wait for all `spawn`ed tasks.
+  ##
+  ## If you need more elaborate waiting, you have to use an explicit barrier.
   while true:
     var allReady = true
-    for i in 0 .. <currentPoolSize:
+    for i in 0 ..< currentPoolSize:
       if not allReady: break
       allReady = allReady and workersData[i].ready
     if allReady: break
-    await(gSomeReady)
+    sleep(threadpoolWaitMs)
+    # We cannot "blockUntil(gSomeReady)" because workers may be shut down between
+    # the time we establish that some are not "ready" and the time we wait for a
+    # "signal(gSomeReady)" from inside "slave()" that can never come.
 
 setup()