summary refs log tree commit diff stats
path: root/lib/pure/concurrency/threadpool.nim
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/concurrency/threadpool.nim')
-rw-r--r--lib/pure/concurrency/threadpool.nim26
1 files changed, 18 insertions, 8 deletions
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim
index cf4f58588..a5eaec86e 100644
--- a/lib/pure/concurrency/threadpool.nim
+++ b/lib/pure/concurrency/threadpool.nim
@@ -149,7 +149,7 @@ proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool =
 proc cleanFlowVars(w: ptr Worker) =
   let q = addr(w.q)
   acquire(q.lock)
-  for i in 0 .. <q.len:
+  for i in 0 ..< q.len:
     GC_unref(cast[RootRef](q.data[i]))
     #echo "GC_unref"
   q.len = 0
@@ -401,7 +401,7 @@ proc setup() =
     gCpus = p
   currentPoolSize = min(p, MaxThreadPoolSize)
   readyWorker = addr(workersData[0])
-  for i in 0.. <currentPoolSize: activateWorkerThread(i)
+  for i in 0..<currentPoolSize: activateWorkerThread(i)
 
 proc preferSpawn*(): bool =
   ## Use this proc to determine quickly if a 'spawn' or a direct call is
@@ -409,20 +409,20 @@ proc preferSpawn*(): bool =
   ## it is not necessary to call this directly; use 'spawnX' instead.
   result = gSomeReady.counter > 0
 
-proc spawn*(call: expr): expr {.magic: "Spawn".}
+proc spawn*(call: typed): void {.magic: "Spawn".}
   ## always spawns a new task, so that the 'call' is never executed on
   ## the calling thread. 'call' has to be proc call 'p(...)' where 'p'
   ## is gcsafe and has a return type that is either 'void' or compatible
   ## with ``FlowVar[T]``.
 
-proc pinnedSpawn*(id: ThreadId; call: expr): expr {.magic: "Spawn".}
+proc pinnedSpawn*(id: ThreadId; call: typed): void {.magic: "Spawn".}
   ## always spawns a new task on the worker thread with ``id``, so that
   ## the 'call' is **always** executed on
   ## the thread. 'call' has to be proc call 'p(...)' where 'p'
   ## is gcsafe and has a return type that is either 'void' or compatible
   ## with ``FlowVar[T]``.
 
-template spawnX*(call: expr): expr =
+template spawnX*(call): void =
   ## spawns a new task if a CPU core is ready, otherwise executes the
   ## call in the calling thread. Usually it is advised to
   ## use 'spawn' in order to not block the producer for an unknown
@@ -431,7 +431,7 @@ template spawnX*(call: expr): expr =
   ## with ``FlowVar[T]``.
   (if preferSpawn(): spawn call else: call)
 
-proc parallel*(body: stmt) {.magic: "Parallel".}
+proc parallel*(body: untyped) {.magic: "Parallel".}
   ## a parallel section can be used to execute a block in parallel. ``body``
   ## has to be in a DSL that is a particular subset of the language. Please
   ## refer to the manual for further information.
@@ -446,14 +446,24 @@ proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerProc.} =
   # implementation of 'spawn' that is used by the code generator.
   while true:
     if selectWorker(readyWorker, fn, data): return
-    for i in 0.. <currentPoolSize:
+    for i in 0..<currentPoolSize:
       if selectWorker(addr(workersData[i]), fn, data): return
+
     # determine what to do, but keep in mind this is expensive too:
     # state.calls < maxPoolSize: warmup phase
     # (state.calls and 127) == 0: periodic check
     if state.calls < maxPoolSize or (state.calls and 127) == 0:
       # ensure the call to 'advice' is atomic:
       if tryAcquire(stateLock):
+        if currentPoolSize < minPoolSize:
+          if not workersData[currentPoolSize].initialized:
+            activateWorkerThread(currentPoolSize)
+          let w = addr(workersData[currentPoolSize])
+          atomicInc currentPoolSize
+          if selectWorker(w, fn, data):
+            release(stateLock)
+            return
+
         case advice(state)
         of doNothing: discard
         of doCreateThread:
@@ -533,7 +543,7 @@ proc sync*() =
   var toRelease = 0
   while true:
     var allReady = true
-    for i in 0 .. <currentPoolSize:
+    for i in 0 ..< currentPoolSize:
       if not allReady: break
       allReady = allReady and workersData[i].ready
     if allReady: break