#
#
#            Nimrod's Runtime Library
#        (c) Copyright 2014 Andreas Rumpf
#
#    See the file "copying.txt", included in this
#    distribution, for details about the copyright.
#

## Implements Nimrod's 'spawn'.

when not compileOption("threads"):
  {.error: "Threadpool requires --threads:on option.".}

import cpuinfo, cpuload, locks

{.push stackTrace:off.}

type
  CondVar = object
    # A condition variable paired with its own lock and a counter. 'signal'
    # increments the counter, 'await' consumes one count, so a signal that
    # is delivered before the matching 'await' is not lost.
    c: TCond
    L: TLock
    counter: int

proc createCondVar(): CondVar =
  # Initializes the condition and the lock of the returned CondVar; the
  # counter is left at its default value.
  result.c.initCond()
  result.L.initLock()

proc destroyCondVar(cv: var CondVar) {.inline.} =
  # Releases the condition and the lock that 'createCondVar' initialized.
  cv.c.deinitCond()
  cv.L.deinitLock()

proc await(cv: var CondVar) =
  # Blocks until the counter is positive, then consumes one count.
  cv.L.acquire()
  while not (cv.counter > 0):
    cv.c.wait(cv.L)
  dec(cv.counter)
  cv.L.release()

proc signal(cv: var CondVar) =
  # Adds one count under the lock and then signals the condition.
  cv.L.acquire()
  inc(cv.counter)
  cv.L.release()
  cv.c.signal()

const CacheLineSize = 32 # true for most archs

type
  Barrier {.compilerProc.} = object
    entered: int
    cv: CondVar # condvar takes 3 words at least
    when sizeof(int) < 8:
      cacheAlign: array[CacheLineSize-4*sizeof(int), byte]
    left: int
    cacheAlign2: array[CacheLineSize-sizeof(int), byte]
    interest: bool ## whether the master is interested in the "all done" event

proc barrierEnter(b: ptr Barrier) {.compilerProc, inline.} =
  # Thanks to the signaling between threads we are the only one touching
  # 'entered' at this point, hence a plain increment instead of 'atomicInc':
  inc(b.entered)
  # No 'fence' is needed here either: 'nimArgsPassingDone' is called soon
  # afterwards and already performs a fence for us.
proc barrierLeave(b: ptr Barrier) {.compilerProc, inline.} =
  # Atomically counts one more task as having left. If the master has set
  # 'interest' and every entered task has left, the master is signalled.
  atomicInc b.left
  when not defined(x86): fence()
  if b.interest and b.left == b.entered: signal(b.cv)

proc openBarrier(b: ptr Barrier) {.compilerProc, inline.} =
  # Resets both counters and clears the master's interest flag.
  b.entered = 0
  b.left = 0
  b.interest = false

proc closeBarrier(b: ptr Barrier) {.compilerProc.} =
  # Returns once 'left' equals 'entered'. The condition variable is only
  # created when the counts still differ on entry; 'interest' is raised
  # (surrounded by fences) so that 'barrierLeave' starts signalling.
  fence()
  if b.left != b.entered:
    b.cv = createCondVar()
    fence()
    b.interest = true
    fence()
    while b.left != b.entered: await(b.cv)
    destroyCondVar(b.cv)

{.pop.}

# ----------------------------------------------------------------------------

type
  foreign* = object ## a region that indicates the pointer comes from a
                    ## foreign thread heap.
  AwaitInfo = object
    cv: CondVar
    idx: int

  FlowVarBase* = ref FlowVarBaseObj ## untyped base class for 'FlowVar[T]'
  FlowVarBaseObj = object of TObject
    ready, usesCondVar: bool
    cv: CondVar #\
    # for 'awaitAny' support
    ai: ptr AwaitInfo
    idx: int
    data: pointer  # we incRef and unref it to keep it alive
    owner: pointer # ptr Worker

  FlowVarObj[T] = object of FlowVarBaseObj
    blob: T

  FlowVar*{.compilerProc.}[T] = ref FlowVarObj[T] ## a data flow variable

  ToFreeQueue = object
    # fixed-capacity array of pointers guarded by 'lock'; 'finished' waits
    # on 'empty' when the array is full.
    len: int
    lock: TLock
    empty: TCond
    data: array[512, pointer]

  WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.}
  Worker = object
    taskArrived: CondVar
    taskStarted: CondVar #\
    # task data:
    f: WorkerProc
    data: pointer
    ready: bool # put it here for correct alignment!
    initialized: bool # whether it has even been initialized
    shutdown: bool # the pool requests to shut down this worker thread
    q: ToFreeQueue

proc await*(fv: FlowVarBase) =
  ## waits until the value for the flowVar arrives. Usually it is not necessary
  ## to call this explicitly.
  if fv.usesCondVar:
    # clear the flag first so the condition variable is awaited and
    # destroyed at most once per flowVar.
    fv.usesCondVar = false
    await(fv.cv)
    destroyCondVar(fv.cv)

proc finished(fv: FlowVarBase) =
  # Appends 'fv.data' to the queue 'q' of the worker that owns the flowVar
  # and then clears 'fv.data'. Presumably the owner releases the queued
  # pointers later (see 'cleanFlowVars') -- confirm against the full file.
  # Does nothing beyond the initial 'await' when 'fv.data' is nil.
  doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'"
  # we have to protect against the rare cases where the owner of the flowVar
  # simply disregards the flowVar and yet the "flowVar" has not yet written
  # anything to it:
  await(fv)
  if fv.data.isNil: return
  let owner = cast[ptr Worker](fv.owner)
  let q = addr(owner.q)
  var waited = false
  acquire(q.lock)
  while q.len >= q.data.len:
    # the queue is exhausted! We block until it has been cleaned.
    # 'wait' has to be entered with 'q.lock' held: it gives the lock up
    # while blocked and owns it again on return, so the lock must neither
    # be released before the call nor acquired again after it.
    wait(q.empty, q.lock)
    waited = true
  q.data[q.len] = fv.data
  inc q.len
  release(q.lock)
  fv.data = nil
  # wakeup other potentially waiting threads:
  if waited: signal(q.empty)

proc cleanFlowVars(w: ptr Worker) =
  let q = addr(w.q)
  acquire(q.lock)
  for i in 0 ..
  # NOTE(review): the source text is truncated at this point. The rest of
  # this proc and the header(s) of whatever definition the statements below
  # belong to are missing; the surviving tokens are kept verbatim and their
  # indentation is a guess. Restore from the original file.
  maxPoolSize:
    for i in maxPoolSize..currentPoolSize-1:
      let w = addr(workersData[i])
      w.shutdown = true

proc activateThread(i: int) {.noinline.} =
  # Initializes the synchronization state of worker slot 'i', marks the
  # slot as initialized and starts its thread running 'slave'.
  workersData[i].taskArrived = createCondVar()
  workersData[i].taskStarted = createCondVar()
  workersData[i].initialized = true
  initCond(workersData[i].q.empty)
  initLock(workersData[i].q.lock)
  createThread(workers[i], slave, addr(workersData[i]))

proc setup() =
  currentPoolSize = min(countProcessors(), MaxThreadPoolSize)
  readyWorker = addr(workersData[0])
  for i in 0..
  # NOTE(review): the source text is truncated at this point; the loop
  # bound and body, and apparently the start of a following definition,
  # are missing. The surviving token is kept verbatim. Restore from the
  # original file.
  0

proc spawn*(call: expr): expr {.magic: "Spawn".}
  ## always spawns a new task, so that the 'call' is never executed on
  ## the calling thread. 'call' has to be proc call 'p(...)' where 'p'
  ## is gcsafe and has a return type that is either 'void' or compatible
  ## with ``FlowVar[T]``.

template spawnX*(call: expr): expr =
  ## spawns a new task if a CPU core is ready, otherwise executes the
  ## call in the calling thread. Usually it is advised to
  ## use 'spawn' in order to not block the producer for an unknown
  ## amount of time. 'call' has to be proc call 'p(...)' where 'p'
  ## is gcsafe and has a return type that is either 'void' or compatible
  ## with ``FlowVar[T]``.
  (if preferSpawn(): spawn call else: call)

proc parallel*(body: stmt) {.magic: "Parallel".}
  ## a parallel section can be used to execute a block in parallel. ``body``
  ## has to be in a DSL that is a particular subset of the language. Please
  ## refer to the manual for further information.

var
  state: ThreadPoolState
  stateLock: TLock

initLock stateLock

proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool =
  # Tries to claim worker 'w' by atomically flipping 'ready' from true to
  # false. On success the task is stored, the worker is signalled and we
  # wait for 'taskStarted' before returning true. Otherwise 'result' keeps
  # its default value (false).
  if cas(addr w.ready, true, false):
    w.data = data
    w.f = fn
    signal(w.taskArrived)
    await(w.taskStarted)
    result = true

proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} =
  # implementation of 'spawn' that is used by the code generator.
  while true:
    if selectWorker(readyWorker, fn, data): return
    for i in 0..
    # NOTE(review): the source text is truncated at this point; a large
    # part of this loop body is missing. The surviving tokens are kept
    # verbatim and their indentation is a guess. Restore from the original
    # file.
    minPoolSize:
      let w = addr(workersData[currentPoolSize-1])
      w.shutdown = true
    # we don't free anything here. Too dangerous.
    release(stateLock)
    # else the acquire failed, but this means some
    # other thread succeeded, so we don't need to do anything here.
    await(gSomeReady)

proc sync*() =
  ## a simple barrier to wait for all spawn'ed tasks. If you need more elaborate
  ## waiting, you have to use an explicit barrier.
  while true:
    var allReady = true
    for i in 0 ..