diff options
author | Araq <rumpf_a@web.de> | 2015-05-28 12:42:04 +0200 |
---|---|---|
committer | Araq <rumpf_a@web.de> | 2015-05-28 12:42:04 +0200 |
commit | 94f54700c4bfcc04966e0ea011960c5432b29092 (patch) | |
tree | 44dc92a1f7a570dd87a7a366bdd2e3fbb9030902 /lib/pure/concurrency | |
parent | 75ccdebd1e8ea9ffd81815716c2593cfbc9820c0 (diff) | |
download | Nim-94f54700c4bfcc04966e0ea011960c5432b29092.tar.gz |
first implementation of pinnedSpawn
Diffstat (limited to 'lib/pure/concurrency')
-rw-r--r-- | lib/pure/concurrency/threadpool.nim | 56 |
1 files changed, 52 insertions, 4 deletions
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index 9f1e53fb8..10117183a 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -267,6 +267,10 @@ proc nimArgsPassingDone(p: pointer) {.compilerProc.} = const MaxThreadPoolSize* = 256 ## maximal size of the thread pool. 256 threads ## should be good enough for anybody ;-) + MaxDistinguishedThread* = 32 ## maximal number of "distinguished" threads. + +type + ThreadId* = range[0..MaxDistinguishedThread-1] var currentPoolSize: int @@ -291,10 +295,24 @@ proc slave(w: ptr Worker) {.thread.} = w.shutdown = false atomicDec currentPoolSize +proc distinguishedSlave(w: ptr Worker) {.thread.} = + while true: + when declared(atomicStoreN): + atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST) + else: + w.ready = true + await(w.taskArrived) + assert(not w.ready) + w.f(w, w.data) + if w.q.len != 0: w.cleanFlowVars + var workers: array[MaxThreadPoolSize, TThread[ptr Worker]] workersData: array[MaxThreadPoolSize, Worker] + distinguished: array[MaxDistinguishedThread, TThread[ptr Worker]] + distinguishedData: array[MaxDistinguishedThread, Worker] + proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) = ## sets the minimal thread pool size. The default value of this is 4. minPoolSize = size @@ -308,7 +326,7 @@ proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) = let w = addr(workersData[i]) w.shutdown = true -proc activateThread(i: int) {.noinline.} = +proc activateWorkerThread(i: int) {.noinline.} = workersData[i].taskArrived = createSemaphore() workersData[i].taskStarted = createSemaphore() workersData[i].initialized = true @@ -316,10 +334,18 @@ proc activateThread(i: int) {.noinline.} = initLock(workersData[i].q.lock) createThread(workers[i], slave, addr(workersData[i])) +proc activateDistinguishedThread(i: int) {.noinline.} = + distinguishedData[i].taskArrived = createSemaphore() + distinguishedData[i].taskStarted = createSemaphore() + distinguishedData[i].initialized = true + distinguishedData[i].q.empty = createSemaphore() + initLock(distinguishedData[i].q.lock) + createThread(distinguished[i], distinguishedSlave, addr(distinguishedData[i])) + proc setup() = currentPoolSize = min(countProcessors(), MaxThreadPoolSize) readyWorker = addr(workersData[0]) - for i in 0.. <currentPoolSize: activateThread(i) + for i in 0.. <currentPoolSize: activateWorkerThread(i) proc preferSpawn*(): bool = ## Use this proc to determine quickly if a 'spawn' or a direct call is @@ -333,6 +359,13 @@ proc spawn*(call: expr): expr {.magic: "Spawn".} ## is gcsafe and has a return type that is either 'void' or compatible ## with ``FlowVar[T]``. +proc pinnedSpawn*(id: ThreadId; call: expr): expr {.magic: "Spawn".} + ## always spawns a new task on the worker thread with ``id``, so that + ## the 'call' is **always** executed on + ## the this thread. 'call' has to be proc call 'p(...)' where 'p' + ## is gcsafe and has a return type that is either 'void' or compatible + ## with ``FlowVar[T]``. + template spawnX*(call: expr): expr = ## spawns a new task if a CPU core is ready, otherwise executes the ## call in the calling thread. Usually it is advised to @@ -353,7 +386,7 @@ var initLock stateLock -proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} = +proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerProc.} = # implementation of 'spawn' that is used by the code generator. while true: if selectWorker(readyWorker, fn, data): return @@ -370,7 +403,7 @@ proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} = of doCreateThread: if currentPoolSize < maxPoolSize: if not workersData[currentPoolSize].initialized: - activateThread(currentPoolSize) + activateWorkerThread(currentPoolSize) let w = addr(workersData[currentPoolSize]) atomicInc currentPoolSize if selectWorker(w, fn, data): @@ -387,6 +420,21 @@ proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} = # other thread succeeded, so we don't need to do anything here. await(gSomeReady) +var + distinguishedLock: TLock + +initLock distinguishedLock + +proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerProc.} = + acquire(distinguishedLock) + if not distinguishedData[id].initialized: + activateDistinguishedThread(id) + while true: + if selectWorker(addr(distinguishedData[id]), fn, data): break + cpuRelax() + # XXX exponential backoff? + release(distinguishedLock) + proc sync*() = ## a simple barrier to wait for all spawn'ed tasks. If you need more elaborate ## waiting, you have to use an explicit barrier. |