diff options
author | Araq <rumpf_a@web.de> | 2015-06-30 12:50:24 +0200 |
---|---|---|
committer | Araq <rumpf_a@web.de> | 2015-06-30 12:50:24 +0200 |
commit | 28de800d6148065fd3e6344f7255e793298be399 (patch) | |
tree | 8aec27c13cd99be3c9e3d3abef45e1d183db996b /lib/pure/concurrency | |
parent | 4cfe216a776ffef61380c1c5f2d61aff7315c122 (diff) | |
parent | 3312d49a489e50e5c5f2275f7c0e400208eb8a5d (diff) | |
download | Nim-28de800d6148065fd3e6344f7255e793298be399.tar.gz |
Merge branch 'more_concurrency' into devel
Conflicts: doc/tut1.txt lib/core/locks.nim lib/pure/collections/tables.nim lib/pure/selectors.nim
Diffstat (limited to 'lib/pure/concurrency')
-rw-r--r-- | lib/pure/concurrency/threadpool.nim | 59 |
1 files changed, 55 insertions, 4 deletions
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index 247aa377c..7c9d8adfd 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -128,6 +128,7 @@ type initialized: bool # whether it has even been initialized shutdown: bool # the pool requests to shut down this worker thread q: ToFreeQueue + readyForTask: Semaphore proc await*(fv: FlowVarBase) = ## waits until the value for the flowVar arrives. Usually it is not necessary @@ -273,6 +274,10 @@ proc nimArgsPassingDone(p: pointer) {.compilerProc.} = const MaxThreadPoolSize* = 256 ## maximal size of the thread pool. 256 threads ## should be good enough for anybody ;-) + MaxDistinguishedThread* = 32 ## maximal number of "distinguished" threads. + +type + ThreadId* = range[0..MaxDistinguishedThread-1] var currentPoolSize: int @@ -298,10 +303,25 @@ proc slave(w: ptr Worker) {.thread.} = w.shutdown = false atomicDec currentPoolSize +proc distinguishedSlave(w: ptr Worker) {.thread.} = + while true: + when declared(atomicStoreN): + atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST) + else: + w.ready = true + signal(w.readyForTask) + await(w.taskArrived) + assert(not w.ready) + w.f(w, w.data) + if w.q.len != 0: w.cleanFlowVars + var workers: array[MaxThreadPoolSize, TThread[ptr Worker]] workersData: array[MaxThreadPoolSize, Worker] + distinguished: array[MaxDistinguishedThread, TThread[ptr Worker]] + distinguishedData: array[MaxDistinguishedThread, Worker] + proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) = ## sets the minimal thread pool size. The default value of this is 4. minPoolSize = size @@ -315,7 +335,7 @@ proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) = let w = addr(workersData[i]) w.shutdown = true -proc activateThread(i: int) {.noinline.} = +proc activateWorkerThread(i: int) {.noinline.} = workersData[i].taskArrived = createSemaphore() workersData[i].taskStarted = createSemaphore() workersData[i].initialized = true @@ -323,10 +343,19 @@ proc activateThread(i: int) {.noinline.} = initLock(workersData[i].q.lock) createThread(workers[i], slave, addr(workersData[i])) +proc activateDistinguishedThread(i: int) {.noinline.} = + distinguishedData[i].taskArrived = createSemaphore() + distinguishedData[i].taskStarted = createSemaphore() + distinguishedData[i].initialized = true + distinguishedData[i].q.empty = createSemaphore() + initLock(distinguishedData[i].q.lock) + distinguishedData[i].readyForTask = createSemaphore() + createThread(distinguished[i], distinguishedSlave, addr(distinguishedData[i])) + proc setup() = currentPoolSize = min(countProcessors(), MaxThreadPoolSize) readyWorker = addr(workersData[0]) - for i in 0.. <currentPoolSize: activateThread(i) + for i in 0.. <currentPoolSize: activateWorkerThread(i) proc preferSpawn*(): bool = ## Use this proc to determine quickly if a 'spawn' or a direct call is @@ -340,6 +369,13 @@ proc spawn*(call: expr): expr {.magic: "Spawn".} ## is gcsafe and has a return type that is either 'void' or compatible ## with ``FlowVar[T]``. +proc pinnedSpawn*(id: ThreadId; call: expr): expr {.magic: "Spawn".} + ## always spawns a new task on the worker thread with ``id``, so that + ## the 'call' is **always** executed on + ## the this thread. 'call' has to be proc call 'p(...)' where 'p' + ## is gcsafe and has a return type that is either 'void' or compatible + ## with ``FlowVar[T]``. + template spawnX*(call: expr): expr = ## spawns a new task if a CPU core is ready, otherwise executes the ## call in the calling thread. Usually it is advised to @@ -360,7 +396,7 @@ var initLock stateLock -proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} = +proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerProc.} = # implementation of 'spawn' that is used by the code generator. while true: if selectWorker(readyWorker, fn, data): return @@ -377,7 +413,7 @@ proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} = of doCreateThread: if currentPoolSize < maxPoolSize: if not workersData[currentPoolSize].initialized: - activateThread(currentPoolSize) + activateWorkerThread(currentPoolSize) let w = addr(workersData[currentPoolSize]) atomicInc currentPoolSize if selectWorker(w, fn, data): @@ -394,6 +430,21 @@ proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} = # other thread succeeded, so we don't need to do anything here. await(gSomeReady) +var + distinguishedLock: TLock + +initLock distinguishedLock + +proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerProc.} = + acquire(distinguishedLock) + if not distinguishedData[id].initialized: + activateDistinguishedThread(id) + release(distinguishedLock) + while true: + if selectWorker(addr(distinguishedData[id]), fn, data): break + await(distinguishedData[id].readyForTask) + + proc sync*() = ## a simple barrier to wait for all spawn'ed tasks. If you need more elaborate ## waiting, you have to use an explicit barrier. |