From 94f54700c4bfcc04966e0ea011960c5432b29092 Mon Sep 17 00:00:00 2001 From: Araq Date: Thu, 28 May 2015 12:42:04 +0200 Subject: first implementation of pinnedSpawn --- lib/pure/concurrency/threadpool.nim | 56 ++++++++++++++++++++++++++++++++++--- 1 file changed, 52 insertions(+), 4 deletions(-) (limited to 'lib/pure/concurrency/threadpool.nim') diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index 9f1e53fb8..10117183a 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -267,6 +267,10 @@ proc nimArgsPassingDone(p: pointer) {.compilerProc.} = const MaxThreadPoolSize* = 256 ## maximal size of the thread pool. 256 threads ## should be good enough for anybody ;-) + MaxDistinguishedThread* = 32 ## maximal number of "distinguished" threads. + +type + ThreadId* = range[0..MaxDistinguishedThread-1] var currentPoolSize: int @@ -291,10 +295,24 @@ proc slave(w: ptr Worker) {.thread.} = w.shutdown = false atomicDec currentPoolSize +proc distinguishedSlave(w: ptr Worker) {.thread.} = + while true: + when declared(atomicStoreN): + atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST) + else: + w.ready = true + await(w.taskArrived) + assert(not w.ready) + w.f(w, w.data) + if w.q.len != 0: w.cleanFlowVars + var workers: array[MaxThreadPoolSize, TThread[ptr Worker]] workersData: array[MaxThreadPoolSize, Worker] + distinguished: array[MaxDistinguishedThread, TThread[ptr Worker]] + distinguishedData: array[MaxDistinguishedThread, Worker] + proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) = ## sets the minimal thread pool size. The default value of this is 4. minPoolSize = size @@ -308,7 +326,7 @@ proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) = let w = addr(workersData[i]) w.shutdown = true -proc activateThread(i: int) {.noinline.} = +proc activateWorkerThread(i: int) {.noinline.} = workersData[i].taskArrived = createSemaphore() workersData[i].taskStarted = createSemaphore() workersData[i].initialized = true @@ -316,10 +334,18 @@ proc activateThread(i: int) {.noinline.} = initLock(workersData[i].q.lock) createThread(workers[i], slave, addr(workersData[i])) +proc activateDistinguishedThread(i: int) {.noinline.} = + distinguishedData[i].taskArrived = createSemaphore() + distinguishedData[i].taskStarted = createSemaphore() + distinguishedData[i].initialized = true + distinguishedData[i].q.empty = createSemaphore() + initLock(distinguishedData[i].q.lock) + createThread(distinguished[i], distinguishedSlave, addr(distinguishedData[i])) + proc setup() = currentPoolSize = min(countProcessors(), MaxThreadPoolSize) readyWorker = addr(workersData[0]) - for i in 0..