diff options
Diffstat (limited to 'lib/pure/concurrency')
-rw-r--r-- | lib/pure/concurrency/cpuinfo.nim | 2 | ||||
-rw-r--r-- | lib/pure/concurrency/cpuload.nim | 12 | ||||
-rw-r--r-- | lib/pure/concurrency/threadpool.nim | 110 |
3 files changed, 104 insertions, 20 deletions
diff --git a/lib/pure/concurrency/cpuinfo.nim b/lib/pure/concurrency/cpuinfo.nim index 6f2bc4491..8c87c77df 100644 --- a/lib/pure/concurrency/cpuinfo.nim +++ b/lib/pure/concurrency/cpuinfo.nim @@ -18,7 +18,7 @@ when not defined(windows): when defined(linux): import linux - + when defined(freebsd) or defined(macosx): {.emit:"#include <sys/types.h>".} diff --git a/lib/pure/concurrency/cpuload.nim b/lib/pure/concurrency/cpuload.nim index c1796089a..22598b5c9 100644 --- a/lib/pure/concurrency/cpuload.nim +++ b/lib/pure/concurrency/cpuload.nim @@ -13,7 +13,7 @@ when defined(windows): import winlean, os, strutils, math - proc `-`(a, b: TFILETIME): int64 = a.rdFileTime - b.rdFileTime + proc `-`(a, b: FILETIME): int64 = a.rdFileTime - b.rdFileTime elif defined(linux): from cpuinfo import countProcessors @@ -25,16 +25,16 @@ type ThreadPoolState* = object when defined(windows): - prevSysKernel, prevSysUser, prevProcKernel, prevProcUser: TFILETIME + prevSysKernel, prevSysUser, prevProcKernel, prevProcUser: FILETIME calls*: int proc advice*(s: var ThreadPoolState): ThreadPoolAdvice = when defined(windows): var sysIdle, sysKernel, sysUser, - procCreation, procExit, procKernel, procUser: TFILETIME + procCreation, procExit, procKernel, procUser: FILETIME if getSystemTimes(sysIdle, sysKernel, sysUser) == 0 or - getProcessTimes(THandle(-1), procCreation, procExit, + getProcessTimes(Handle(-1), procCreation, procExit, procKernel, procUser) == 0: return doNothing if s.calls > 0: @@ -57,7 +57,7 @@ proc advice*(s: var ThreadPoolState): ThreadPoolAdvice = s.prevProcKernel = procKernel s.prevProcUser = procUser elif defined(linux): - proc fscanf(c: File, frmt: cstring) {.varargs, importc, + proc fscanf(c: File, frmt: cstring) {.varargs, importc, header: "<stdio.h>".} var f = open("/proc/loadavg") @@ -78,7 +78,7 @@ proc advice*(s: var ThreadPoolState): ThreadPoolAdvice = result = doNothing inc s.calls -when isMainModule: +when not defined(testing) and isMainModule: proc busyLoop() = while 
true: discard random(80) diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index 9f1e53fb8..72e744d52 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -18,8 +18,8 @@ import cpuinfo, cpuload, locks type Semaphore = object - c: TCond - L: TLock + c: Cond + L: Lock counter: int proc createSemaphore(): Semaphore = @@ -113,7 +113,7 @@ type ToFreeQueue = object len: int - lock: TLock + lock: Lock empty: Semaphore data: array[128, pointer] @@ -128,6 +128,7 @@ type initialized: bool # whether it has even been initialized shutdown: bool # the pool requests to shut down this worker thread q: ToFreeQueue + readyForTask: Semaphore proc await*(fv: FlowVarBase) = ## waits until the value for the flowVar arrives. Usually it is not necessary @@ -221,11 +222,17 @@ proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) = action(fv.blob) finished(fv) -proc `^`*[T](fv: FlowVar[ref T]): foreign ptr T = +proc unsafeRead*[T](fv: FlowVar[ref T]): foreign ptr T = ## blocks until the value is available and then returns this value. await(fv) result = cast[foreign ptr T](fv.data) +proc `^`*[T](fv: FlowVar[ref T]): ref T = + ## blocks until the value is available and then returns this value. + await(fv) + let src = cast[ref T](fv.data) + deepCopy result, src + proc `^`*[T](fv: FlowVar[T]): T = ## blocks until the value is available and then returns this value. await(fv) @@ -267,6 +274,10 @@ proc nimArgsPassingDone(p: pointer) {.compilerProc.} = const MaxThreadPoolSize* = 256 ## maximal size of the thread pool. 256 threads ## should be good enough for anybody ;-) + MaxDistinguishedThread* = 32 ## maximal number of "distinguished" threads. 
+ +type + ThreadId* = range[0..MaxDistinguishedThread-1] var currentPoolSize: int @@ -284,23 +295,43 @@ proc slave(w: ptr Worker) {.thread.} = readyWorker = w signal(gSomeReady) await(w.taskArrived) - assert(not w.ready) + # XXX Somebody needs to look into this (why does this assertion fail + # in Visual Studio?) + when not defined(vcc): assert(not w.ready) w.f(w, w.data) if w.q.len != 0: w.cleanFlowVars if w.shutdown: w.shutdown = false atomicDec currentPoolSize +proc distinguishedSlave(w: ptr Worker) {.thread.} = + while true: + when declared(atomicStoreN): + atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST) + else: + w.ready = true + signal(w.readyForTask) + await(w.taskArrived) + assert(not w.ready) + w.f(w, w.data) + if w.q.len != 0: w.cleanFlowVars + var workers: array[MaxThreadPoolSize, TThread[ptr Worker]] workersData: array[MaxThreadPoolSize, Worker] + distinguished: array[MaxDistinguishedThread, TThread[ptr Worker]] + distinguishedData: array[MaxDistinguishedThread, Worker] + +when defined(nimPinToCpu): + var gCpus: Natural + proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) = ## sets the minimal thread pool size. The default value of this is 4. minPoolSize = size proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) = - ## sets the minimal thread pool size. The default value of this + ## sets the maximal thread pool size. The default value of this ## is ``MaxThreadPoolSize``. 
maxPoolSize = size if currentPoolSize > maxPoolSize: @@ -308,18 +339,37 @@ proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) = let w = addr(workersData[i]) w.shutdown = true -proc activateThread(i: int) {.noinline.} = +when defined(nimRecursiveSpawn): + var localThreadId {.threadvar.}: int + +proc activateWorkerThread(i: int) {.noinline.} = workersData[i].taskArrived = createSemaphore() workersData[i].taskStarted = createSemaphore() workersData[i].initialized = true workersData[i].q.empty = createSemaphore() initLock(workersData[i].q.lock) createThread(workers[i], slave, addr(workersData[i])) + when defined(nimRecursiveSpawn): + localThreadId = i+1 + when defined(nimPinToCpu): + if gCpus > 0: pinToCpu(workers[i], i mod gCpus) + +proc activateDistinguishedThread(i: int) {.noinline.} = + distinguishedData[i].taskArrived = createSemaphore() + distinguishedData[i].taskStarted = createSemaphore() + distinguishedData[i].initialized = true + distinguishedData[i].q.empty = createSemaphore() + initLock(distinguishedData[i].q.lock) + distinguishedData[i].readyForTask = createSemaphore() + createThread(distinguished[i], distinguishedSlave, addr(distinguishedData[i])) proc setup() = - currentPoolSize = min(countProcessors(), MaxThreadPoolSize) + let p = countProcessors() + when defined(nimPinToCpu): + gCpus = p + currentPoolSize = min(p, MaxThreadPoolSize) readyWorker = addr(workersData[0]) - for i in 0.. <currentPoolSize: activateThread(i) + for i in 0.. <currentPoolSize: activateWorkerThread(i) proc preferSpawn*(): bool = ## Use this proc to determine quickly if a 'spawn' or a direct call is @@ -333,6 +383,13 @@ proc spawn*(call: expr): expr {.magic: "Spawn".} ## is gcsafe and has a return type that is either 'void' or compatible ## with ``FlowVar[T]``. +proc pinnedSpawn*(id: ThreadId; call: expr): expr {.magic: "Spawn".} + ## always spawns a new task on the worker thread with ``id``, so that + ## the 'call' is **always** executed on + ## this thread. 
'call' has to be proc call 'p(...)' where 'p' + ## is gcsafe and has a return type that is either 'void' or compatible + ## with ``FlowVar[T]``. + template spawnX*(call: expr): expr = ## spawns a new task if a CPU core is ready, otherwise executes the ## call in the calling thread. Usually it is advised to @@ -349,11 +406,11 @@ proc parallel*(body: stmt) {.magic: "Parallel".} var state: ThreadPoolState - stateLock: TLock + stateLock: Lock initLock stateLock -proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} = +proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerProc.} = # implementation of 'spawn' that is used by the code generator. while true: if selectWorker(readyWorker, fn, data): return @@ -370,7 +427,7 @@ proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} = of doCreateThread: if currentPoolSize < maxPoolSize: if not workersData[currentPoolSize].initialized: - activateThread(currentPoolSize) + activateWorkerThread(currentPoolSize) let w = addr(workersData[currentPoolSize]) atomicInc currentPoolSize if selectWorker(w, fn, data): @@ -385,7 +442,34 @@ proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} = release(stateLock) # else the acquire failed, but this means some # other thread succeeded, so we don't need to do anything here. - await(gSomeReady) + when defined(nimRecursiveSpawn): + if localThreadId > 0: + # we are a worker thread, so instead of waiting for something which + # might as well never happen (see tparallel_quicksort), we run the task + # on the current thread instead. 
+ var self = addr(workersData[localThreadId-1]) + fn(self, data) + await(self.taskStarted) + return + else: + await(gSomeReady) + else: + await(gSomeReady) + +var + distinguishedLock: TLock + +initLock distinguishedLock + +proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerProc.} = + acquire(distinguishedLock) + if not distinguishedData[id].initialized: + activateDistinguishedThread(id) + release(distinguishedLock) + while true: + if selectWorker(addr(distinguishedData[id]), fn, data): break + await(distinguishedData[id].readyForTask) + proc sync*() = ## a simple barrier to wait for all spawn'ed tasks. If you need more elaborate |