# # # Nim's Runtime Library # (c) Copyright 2015 Andreas Rumpf # # See the file "copying.txt", included in this # distribution, for details about the copyright. # ## Implements Nim's `parallel & spawn statements `_. ## ## Unstable API. ## ## See also ## ======== ## * `threads module `_ for basic thread support ## * `channels module `_ for message passing support ## * `locks module `_ for locks and condition variables ## * `asyncdispatch module `_ for asynchronous IO when not compileOption("threads"): {.error: "Threadpool requires --threads:on option.".} import cpuinfo, cpuload, locks, os {.push stackTrace:off.} type Semaphore = object c: Cond L: Lock counter: int proc initSemaphore(cv: var Semaphore) = initCond(cv.c) initLock(cv.L) proc destroySemaphore(cv: var Semaphore) {.inline.} = deinitCond(cv.c) deinitLock(cv.L) proc blockUntil(cv: var Semaphore) = acquire(cv.L) while cv.counter <= 0: wait(cv.c, cv.L) dec cv.counter release(cv.L) proc signal(cv: var Semaphore) = acquire(cv.L) inc cv.counter release(cv.L) signal(cv.c) const CacheLineSize = 32 # true for most archs type Barrier {.compilerproc.} = object entered: int cv: Semaphore # Semaphore takes 3 words at least when sizeof(int) < 8: cacheAlign: array[CacheLineSize-4*sizeof(int), byte] left: int cacheAlign2: array[CacheLineSize-sizeof(int), byte] interest: bool # whether the master is interested in the "all done" event proc barrierEnter(b: ptr Barrier) {.compilerproc, inline.} = # due to the signaling between threads, it is ensured we are the only # one with access to 'entered' so we don't need 'atomicInc' here: inc b.entered # also we need no 'fence' instructions here as soon 'nimArgsPassingDone' # will be called which already will perform a fence for us. proc barrierLeave(b: ptr Barrier) {.compilerproc, inline.} = atomicInc b.left when not defined(x86): fence() # We may not have seen the final value of b.entered yet, # so we need to check for >= instead of ==. if b.interest and b.left >= b.entered: signal(b.cv) proc openBarrier(b: ptr Barrier) {.compilerproc, inline.} = b.entered = 0 b.left = 0 b.interest = false proc closeBarrier(b: ptr Barrier) {.compilerproc.} = fence() if b.left != b.entered: b.cv.initSemaphore() fence() b.interest = true fence() while b.left != b.entered: blockUntil(b.cv) destroySemaphore(b.cv) {.pop.} # ---------------------------------------------------------------------------- type AwaitInfo = object cv: Semaphore idx: int FlowVarBase* = ref FlowVarBaseObj ## Untyped base class for `FlowVar[T] <#FlowVar>`_. FlowVarBaseObj = object of RootObj ready, usesSemaphore, awaited: bool cv: Semaphore # for 'blockUntilAny' support ai: ptr AwaitInfo idx: int data: pointer # we incRef and unref it to keep it alive; note this MUST NOT # be RootRef here otherwise the wrong GC keeps track of it! owner: pointer # ptr Worker FlowVarObj[T] = object of FlowVarBaseObj blob: T FlowVar*[T] {.compilerproc.} = ref FlowVarObj[T] ## A data flow variable. ToFreeQueue = object len: int lock: Lock empty: Semaphore data: array[128, pointer] WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.} Worker = object taskArrived: Semaphore taskStarted: Semaphore #\ # task data: f: WorkerProc data: pointer ready: bool # put it here for correct alignment! initialized: bool # whether it has even been initialized shutdown: bool # the pool requests to shut down this worker thread q: ToFreeQueue readyForTask: Semaphore const threadpoolWaitMs {.intdefine.}: int = 100 proc blockUntil*(fv: var FlowVarBaseObj) = ## Waits until the value for `fv` arrives. ## ## Usually it is not necessary to call this explicitly. if fv.usesSemaphore and not fv.awaited: fv.awaited = true blockUntil(fv.cv) destroySemaphore(fv.cv) proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool = if cas(addr w.ready, true, false): w.data = data w.f = fn signal(w.taskArrived) blockUntil(w.taskStarted) result = true proc cleanFlowVars(w: ptr Worker) = let q = addr(w.q) acquire(q.lock) for i in 0 ..< q.len: GC_unref(cast[RootRef](q.data[i])) #echo "GC_unref" q.len = 0 release(q.lock) proc wakeupWorkerToProcessQueue(w: ptr Worker) = # we have to ensure it's us who wakes up the owning thread. # This is quite horrible code, but it runs so rarely that it doesn't matter: while not cas(addr w.ready, true, false): cpuRelax() discard w.data = nil w.f = proc (w, a: pointer) {.nimcall.} = let w = cast[ptr Worker](w) cleanFlowVars(w) signal(w.q.empty) signal(w.taskArrived) proc attach(fv: FlowVarBase; i: int): bool = acquire(fv.cv.L) if fv.cv.counter <= 0: fv.idx = i result = true else: result = false release(fv.cv.L) proc finished(fv: var FlowVarBaseObj) = doAssert fv.ai.isNil, "flowVar is still attached to an 'blockUntilAny'" # we have to protect against the rare cases where the owner of the flowVar # simply disregards the flowVar and yet the "flowVar" has not yet written # anything to it: blockUntil(fv) if fv.data.isNil: return let owner = cast[ptr Worker](fv.owner) let q = addr(owner.q) acquire(q.lock) while not (q.len < q.data.len): #echo "EXHAUSTED!" release(q.lock) wakeupWorkerToProcessQueue(owner) blockUntil(q.empty) acquire(q.lock) q.data[q.len] = cast[pointer](fv.data) inc q.len release(q.lock) fv.data = nil # the worker thread waits for "data" to be set to nil before shutting down owner.data = nil proc `=destroy`[T](fv: var FlowVarObj[T]) = finished(fv) `=destroy`(fv.blob) proc nimCreateFlowVar[T](): FlowVar[T] {.compilerproc.} = new(result) proc nimFlowVarCreateSemaphore(fv: FlowVarBase) {.compilerproc.} = fv.cv.initSemaphore() fv.usesSemaphore = true proc nimFlowVarSignal(fv: FlowVarBase) {.compilerproc.} = if fv.ai != nil: acquire(fv.ai.cv.L) fv.ai.idx = fv.idx inc fv.ai.cv.counter release(fv.ai.cv.L) signal(fv.ai.cv.c) if fv.usesSemaphore: signal(fv.cv) proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) = ## Blocks until `fv` is available and then passes its value ## to `action`. ## ## Note that due to Nim's parameter passing semantics, this ## means that `T` doesn't need to be copied, so `awaitAndThen` can ## sometimes be more efficient than the `^ proc <#^,FlowVar[T]>`_. blockUntil(fv[]) when defined(nimV2): action(fv.blob) elif T is string or T is seq: action(cast[T](fv.data)) elif T is ref: {.error: "'awaitAndThen' not available for FlowVar[ref]".} else: action(fv.blob) finished(fv[]) proc unsafeRead*[T](fv: FlowVar[ref T]): ptr T = ## Blocks until the value is available and then returns this value. blockUntil(fv[]) when defined(nimV2): result = cast[ptr T](fv.blob) else: result = cast[ptr T](fv.data) finished(fv[]) proc `^`*[T](fv: FlowVar[T]): T = ## Blocks until the value is available and then returns this value. blockUntil(fv[]) when not defined(nimV2) and (T is string or T is seq or T is ref): deepCopy result, cast[T](fv.data) else: result = fv.blob finished(fv[]) proc blockUntilAny*(flowVars: openArray[FlowVarBase]): int = ## Awaits any of the given `flowVars`. Returns the index of one `flowVar` ## for which a value arrived. ## ## A `flowVar` only supports one call to `blockUntilAny` at the same time. ## That means if you `blockUntilAny([a,b])` and `blockUntilAny([b,c])` ## the second call will only block until `c`. If there is no `flowVar` left ## to be able to wait on, -1 is returned. ## ## **Note:** This results in non-deterministic behaviour and should be avoided. var ai: AwaitInfo ai.cv.initSemaphore() var conflicts = 0 result = -1 for i in 0 .. flowVars.high: if cas(addr flowVars[i].ai, nil, addr ai): if not attach(flowVars[i], i): result = i break else: inc conflicts if conflicts < flowVars.len: if result < 0: blockUntil(ai.cv) result = ai.idx for i in 0 .. flowVars.high: discard cas(addr flowVars[i].ai, addr ai, nil) destroySemaphore(ai.cv) proc isReady*(fv: FlowVarBase): bool = ## Determines whether the specified `FlowVarBase`'s value is available. ## ## If `true`, awaiting `fv` will not block. if fv.usesSemaphore and not fv.awaited: acquire(fv.cv.L) result = fv.cv.counter > 0 release(fv.cv.L) else: result = true proc nimArgsPassingDone(p: pointer) {.compilerproc.} = let w = cast[ptr Worker](p) signal(w.taskStarted) const MaxThreadPoolSize* {.intdefine.} = 256 ## Maximum size of the thread pool. 256 threads ## should be good enough for anybody ;-) MaxDistinguishedThread* {.intdefine.} = 32 ## Maximum number of "distinguished" threads. type ThreadId* = range[0..MaxDistinguishedThread-1] ## A thread identifier. var currentPoolSize: int maxPoolSize = MaxThreadPoolSize minPoolSize = 4 gSomeReady: Semaphore readyWorker: ptr Worker # A workaround for recursion deadlock issue # https://github.com/nim-lang/Nim/issues/4597 var numSlavesLock: Lock numSlavesRunning {.guard: numSlavesLock.}: int numSlavesWaiting {.guard: numSlavesLock.}: int isSlave {.threadvar.}: bool numSlavesLock.initLock gSomeReady.initSemaphore() proc slave(w: ptr Worker) {.thread.} = isSlave = true while true: if w.shutdown: w.shutdown = false atomicDec currentPoolSize while true: if w.data != nil: sleep(threadpoolWaitMs) else: # The flowvar finalizer ("finished()") set w.data to nil, so we can # safely terminate the thread. # # TODO: look for scenarios in which the flowvar is never finalized, so # a shut down thread gets stuck in this loop until the main thread exits. break break when declared(atomicStoreN): atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST) else: w.ready = true readyWorker = w signal(gSomeReady) blockUntil(w.taskArrived) # XXX Somebody needs to look into this (why does this assertion fail # in Visual Studio?) when not defined(vcc) and not defined(tcc): assert(not w.ready) withLock numSlavesLock: inc numSlavesRunning w.f(w, w.data) withLock numSlavesLock: dec numSlavesRunning if w.q.len != 0: w.cleanFlowVars proc distinguishedSlave(w: ptr Worker) {.thread.} = while true: when declared(atomicStoreN): atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST) else: w.ready = true signal(w.readyForTask) blockUntil(w.taskArrived) assert(not w.ready) w.f(w, w.data) if w.q.len != 0: w.cleanFlowVars var workers: array[MaxThreadPoolSize, Thread[ptr Worker]] workersData: array[MaxThreadPoolSize, Worker] distinguished: array[MaxDistinguishedThread, Thread[ptr Worker]] distinguishedData: array[MaxDistinguishedThread, Worker] when defined(nimPinToCpu): var gCpus: Natural proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) = ## Sets the minimum thread pool size. The default value of this is 4. minPoolSize = size proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) = ## Sets the maximum thread pool size. The default value of this ## is `MaxThreadPoolSize <#MaxThreadPoolSize>`_. maxPoolSize = size if currentPoolSize > maxPoolSize: for i in maxPoolSize..currentPoolSize-1: let w = addr(workersData[i]) w.shutdown = true when defined(nimRecursiveSpawn): var localThreadId {.threadvar.}: int proc activateWorkerThread(i: int) {.noinline.} = workersData[i].taskArrived.initSemaphore() workersData[i].taskStarted.initSemaphore() workersData[i].initialized = true workersData[i].q.empty.initSemaphore() initLock(workersData[i].q.lock) createThread(workers[i], slave, addr(workersData[i])) when defined(nimRecursiveSpawn): localThreadId = i+1 when defined(nimPinToCpu): if gCpus > 0: pinToCpu(workers[i], i mod gCpus) proc activateDistinguishedThread(i: int) {.noinline.} = distinguishedData[i].taskArrived.initSemaphore() distinguishedData[i].taskStarted.initSemaphore() distinguishedData[i].initialized = true distinguishedData[i].q.empty.initSemaphore() initLock(distinguishedData[i].q.lock) distinguishedData[i].readyForTask.initSemaphore() createThread(distinguished[i], distinguishedSlave, addr(distinguishedData[i])) proc setup() = let p = countProcessors() when defined(nimPinToCpu): gCpus = p currentPoolSize = min(p, MaxThreadPoolSize) readyWorker = addr(workersData[0]) for i in 0..`_ instead. result = gSomeReady.counter > 0 proc spawn*(call: sink typed): void {.magic: "Spawn".} ## Always spawns a new task, so that the `call` is never executed on ## the calling thread. ## ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a ## return type that is either `void` or compatible with `FlowVar[T]`. proc pinnedSpawn*(id: ThreadId; call: sink typed): void {.magic: "Spawn".} ## Always spawns a new task on the worker thread with `id`, so that ## the `call` is **always** executed on the thread. ## ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a ## return type that is either `void` or compatible with `FlowVar[T]`. template spawnX*(call) = ## Spawns a new task if a CPU core is ready, otherwise executes the ## call in the calling thread. ## ## Usually, it is advised to use the `spawn proc <#spawn,sinktyped>`_ ## in order to not block the producer for an unknown amount of time. ## ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a ## return type that is either 'void' or compatible with `FlowVar[T]`. (if preferSpawn(): spawn call else: call) proc parallel*(body: untyped) {.magic: "Parallel".} ## A parallel section can be used to execute a block in parallel. ## ## `body` has to be in a DSL that is a particular subset of the language. ## ## Please refer to `the manual `_ ## for further information. var state: ThreadPoolState stateLock: Lock initLock stateLock proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerproc.} = # implementation of 'spawn' that is used by the code generator. while true: if selectWorker(readyWorker, fn, data): return for i in 0.. minPoolSize: let w = addr(workersData[currentPoolSize-1]) w.shutdown = true # we don't free anything here. Too dangerous. release(stateLock) # else the acquire failed, but this means some # other thread succeeded, so we don't need to do anything here. when defined(nimRecursiveSpawn): if localThreadId > 0: # we are a worker thread, so instead of waiting for something which # might as well never happen (see tparallel_quicksort), we run the task # on the current thread instead. var self = addr(workersData[localThreadId-1]) fn(self, data) blockUntil(self.taskStarted) return if isSlave: # Run under lock until `numSlavesWaiting` increment to avoid a # race (otherwise two last threads might start waiting together) withLock numSlavesLock: if numSlavesRunning <= numSlavesWaiting + 1: # All the other slaves are waiting # If we wait now, we-re deadlocked until # an external spawn happens ! if currentPoolSize < maxPoolSize: if not workersData[currentPoolSize].initialized: activateWorkerThread(currentPoolSize) let w = addr(workersData[currentPoolSize]) atomicInc currentPoolSize if selectWorker(w, fn, data): return else: # There is no place in the pool. We're deadlocked. # echo "Deadlock!" discard inc numSlavesWaiting blockUntil(gSomeReady) if isSlave: withLock numSlavesLock: dec numSlavesWaiting var distinguishedLock: Lock initLock distinguishedLock proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerproc.} = acquire(distinguishedLock) if not distinguishedData[id].initialized: activateDistinguishedThread(id) release(distinguishedLock) while true: if selectWorker(addr(distinguishedData[id]), fn, data): break blockUntil(distinguishedData[id].readyForTask) proc sync*() = ## A simple barrier to wait for all `spawn`ed tasks. ## ## If you need more elaborate waiting, you have to use an explicit barrier. while true: var allReady = true for i in 0 ..< currentPoolSize: if not allReady: break allReady = allReady and workersData[i].ready if allReady: break sleep(threadpoolWaitMs) # We cannot "blockUntil(gSomeReady)" because workers may be shut down between # the time we establish that some are not "ready" and the time we wait for a # "signal(gSomeReady)" from inside "slave()" that can never come. setup()