diff options
Diffstat (limited to 'lib/pure/concurrency/threadpool.nim')
-rw-r--r-- | lib/pure/concurrency/threadpool.nim | 24 |
1 files changed, 16 insertions, 8 deletions
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index fd1041918..f46822d94 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -9,6 +9,9 @@ ## Implements Nimrod's 'spawn'. +when not compileOption("threads"): + {.error: "Threadpool requires --threads:on option.".} + import cpuinfo, cpuload, locks {.push stackTrace:off.} @@ -92,7 +95,7 @@ type FlowVarBase* = ref FlowVarBaseObj ## untyped base class for 'FlowVar[T]' FlowVarBaseObj = object of TObject - ready, usesCondVar: bool + ready, usesCondVar, awaited: bool cv: CondVar #\ # for 'awaitAny' support ai: ptr AwaitInfo @@ -126,15 +129,15 @@ type proc await*(fv: FlowVarBase) = ## waits until the value for the flowVar arrives. Usually it is not necessary ## to call this explicitly. - if fv.usesCondVar: - fv.usesCondVar = false + if fv.usesCondVar and not fv.awaited: + fv.awaited = true await(fv.cv) destroyCondVar(fv.cv) proc finished(fv: FlowVarBase) = doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'" # we have to protect against the rare cases where the owner of the flowVar - # simply disregards the flowVar and yet the "flowVarr" has not yet written + # simply disregards the flowVar and yet the "flowVar" has not yet written # anything to it: await(fv) if fv.data.isNil: return @@ -207,6 +210,7 @@ proc `^`*[T](fv: FlowVar[T]): T = ## blocks until the value is available and then returns this value. await(fv) when T is string or T is seq: + # XXX closures? deepCopy? result = cast[T](fv.data) else: result = fv.blob @@ -264,6 +268,10 @@ proc slave(w: ptr Worker) {.thread.} = w.shutdown = false atomicDec currentPoolSize +var + workers: array[MaxThreadPoolSize, TThread[ptr Worker]] + workersData: array[MaxThreadPoolSize, Worker] + proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) = ## sets the minimal thread pool size. The default value of this is 4. minPoolSize = size @@ -272,10 +280,10 @@ proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) = ## sets the minimal thread pool size. The default value of this ## is ``MaxThreadPoolSize``. maxPoolSize = size - -var - workers: array[MaxThreadPoolSize, TThread[ptr Worker]] - workersData: array[MaxThreadPoolSize, Worker] + if currentPoolSize > maxPoolSize: + for i in maxPoolSize..currentPoolSize-1: + let w = addr(workersData[i]) + w.shutdown = true proc activateThread(i: int) {.noinline.} = workersData[i].taskArrived = createCondVar() |