diff options
Diffstat (limited to 'lib/pure/concurrency/threadpool.nim')
-rw-r--r-- | lib/pure/concurrency/threadpool.nim | 26 |
1 files changed, 18 insertions, 8 deletions
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index cf4f58588..a5eaec86e 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -149,7 +149,7 @@ proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool = proc cleanFlowVars(w: ptr Worker) = let q = addr(w.q) acquire(q.lock) - for i in 0 .. <q.len: + for i in 0 ..< q.len: GC_unref(cast[RootRef](q.data[i])) #echo "GC_unref" q.len = 0 @@ -401,7 +401,7 @@ proc setup() = gCpus = p currentPoolSize = min(p, MaxThreadPoolSize) readyWorker = addr(workersData[0]) - for i in 0.. <currentPoolSize: activateWorkerThread(i) + for i in 0..<currentPoolSize: activateWorkerThread(i) proc preferSpawn*(): bool = ## Use this proc to determine quickly if a 'spawn' or a direct call is @@ -409,20 +409,20 @@ proc preferSpawn*(): bool = ## it is not necessary to call this directly; use 'spawnX' instead. result = gSomeReady.counter > 0 -proc spawn*(call: expr): expr {.magic: "Spawn".} +proc spawn*(call: typed): void {.magic: "Spawn".} ## always spawns a new task, so that the 'call' is never executed on ## the calling thread. 'call' has to be proc call 'p(...)' where 'p' ## is gcsafe and has a return type that is either 'void' or compatible ## with ``FlowVar[T]``. -proc pinnedSpawn*(id: ThreadId; call: expr): expr {.magic: "Spawn".} +proc pinnedSpawn*(id: ThreadId; call: typed): void {.magic: "Spawn".} ## always spawns a new task on the worker thread with ``id``, so that ## the 'call' is **always** executed on ## the thread. 'call' has to be proc call 'p(...)' where 'p' ## is gcsafe and has a return type that is either 'void' or compatible ## with ``FlowVar[T]``. -template spawnX*(call: expr): expr = +template spawnX*(call): void = ## spawns a new task if a CPU core is ready, otherwise executes the ## call in the calling thread. Usually it is advised to ## use 'spawn' in order to not block the producer for an unknown @@ -431,7 +431,7 @@ template spawnX*(call: expr): expr = ## with ``FlowVar[T]``. (if preferSpawn(): spawn call else: call) -proc parallel*(body: stmt) {.magic: "Parallel".} +proc parallel*(body: untyped) {.magic: "Parallel".} ## a parallel section can be used to execute a block in parallel. ``body`` ## has to be in a DSL that is a particular subset of the language. Please ## refer to the manual for further information. @@ -446,14 +446,24 @@ proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerProc.} = # implementation of 'spawn' that is used by the code generator. while true: if selectWorker(readyWorker, fn, data): return - for i in 0.. <currentPoolSize: + for i in 0..<currentPoolSize: if selectWorker(addr(workersData[i]), fn, data): return + # determine what to do, but keep in mind this is expensive too: # state.calls < maxPoolSize: warmup phase # (state.calls and 127) == 0: periodic check if state.calls < maxPoolSize or (state.calls and 127) == 0: # ensure the call to 'advice' is atomic: if tryAcquire(stateLock): + if currentPoolSize < minPoolSize: + if not workersData[currentPoolSize].initialized: + activateWorkerThread(currentPoolSize) + let w = addr(workersData[currentPoolSize]) + atomicInc currentPoolSize + if selectWorker(w, fn, data): + release(stateLock) + return + case advice(state) of doNothing: discard of doCreateThread: @@ -533,7 +543,7 @@ proc sync*() = var toRelease = 0 while true: var allReady = true - for i in 0 .. <currentPoolSize: + for i in 0 ..< currentPoolSize: if not allReady: break allReady = allReady and workersData[i].ready if allReady: break |