summary refs log tree commit diff stats
path: root/lib/pure/concurrency/threadpool.nim
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/concurrency/threadpool.nim')
-rw-r--r--lib/pure/concurrency/threadpool.nim606
1 files changed, 606 insertions, 0 deletions
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim
new file mode 100644
index 000000000..06ed2fe54
--- /dev/null
+++ b/lib/pure/concurrency/threadpool.nim
@@ -0,0 +1,606 @@
+#
+#
+#            Nim's Runtime Library
+#        (c) Copyright 2015 Andreas Rumpf
+#
+#    See the file "copying.txt", included in this
+#    distribution, for details about the copyright.
+#
+
+{.deprecated: "use the nimble packages `malebolgia`, `taskpools` or `weave` instead".}
+
+## Implements Nim's `parallel & spawn statements <manual_experimental.html#parallel-amp-spawn>`_.
+##
+## Unstable API.
+##
+## See also
+## ========
+## * `threads module <typedthreads.html>`_ for basic thread support
+## * `locks module <locks.html>`_ for locks and condition variables
+## * `asyncdispatch module <asyncdispatch.html>`_ for asynchronous IO
+
+when not compileOption("threads"):
+  {.error: "Threadpool requires --threads:on option.".}
+
+import std/[cpuinfo, cpuload, locks, os]
+
+when defined(nimPreviewSlimSystem):
+  import std/[assertions, typedthreads, sysatomics]
+
+{.push stackTrace:off.}
+
+type
+  Semaphore = object
+    c: Cond
+    L: Lock
+    counter: int
+
+proc initSemaphore(cv: var Semaphore) =
+  initCond(cv.c)
+  initLock(cv.L)
+
+proc destroySemaphore(cv: var Semaphore) {.inline.} =
+  deinitCond(cv.c)
+  deinitLock(cv.L)
+
+proc blockUntil(cv: var Semaphore) =
+  acquire(cv.L)
+  while cv.counter <= 0:
+    wait(cv.c, cv.L)
+  dec cv.counter
+  release(cv.L)
+
+proc signal(cv: var Semaphore) =
+  acquire(cv.L)
+  inc cv.counter
+  release(cv.L)
+  signal(cv.c)
+
+const CacheLineSize = 64 # true for most archs
+
+type
+  Barrier {.compilerproc.} = object
+    entered: int
+    cv: Semaphore # Semaphore takes 3 words at least
+    left {.align(CacheLineSize).}: int
+    interest {.align(CacheLineSize).} : bool # whether the master is interested in the "all done" event
+
+proc barrierEnter(b: ptr Barrier) {.compilerproc, inline.} =
+  # due to the signaling between threads, it is ensured we are the only
+  # one with access to 'entered' so we don't need 'atomicInc' here:
+  inc b.entered
+  # also we need no 'fence' instructions here as soon 'nimArgsPassingDone'
+  # will be called which already will perform a fence for us.
+
+proc barrierLeave(b: ptr Barrier) {.compilerproc, inline.} =
+  atomicInc b.left
+  when not defined(x86): fence()
+  # We may not have seen the final value of b.entered yet,
+  # so we need to check for >= instead of ==.
+  if b.interest and b.left >= b.entered: signal(b.cv)
+
+proc openBarrier(b: ptr Barrier) {.compilerproc, inline.} =
+  b.entered = 0
+  b.left = 0
+  b.interest = false
+
+proc closeBarrier(b: ptr Barrier) {.compilerproc.} =
+  fence()
+  if b.left != b.entered:
+    b.cv.initSemaphore()
+    fence()
+    b.interest = true
+    fence()
+    while b.left != b.entered: blockUntil(b.cv)
+    destroySemaphore(b.cv)
+
+{.pop.}
+
+# ----------------------------------------------------------------------------
+
+type
+  AwaitInfo = object
+    cv: Semaphore
+    idx: int
+
+  FlowVarBase* = ref FlowVarBaseObj ## Untyped base class for `FlowVar[T] <#FlowVar>`_.
+  FlowVarBaseObj {.acyclic.} = object of RootObj
+    ready, usesSemaphore, awaited: bool
+    cv: Semaphore  # for 'blockUntilAny' support
+    ai: ptr AwaitInfo
+    idx: int
+    data: pointer  # we incRef and unref it to keep it alive; note this MUST NOT
+                   # be RootRef here otherwise the wrong GC keeps track of it!
+    owner: pointer # ptr Worker
+
+  FlowVarObj[T] {.acyclic.} = object of FlowVarBaseObj
+    blob: T
+
+  FlowVar*[T] {.compilerproc.} = ref FlowVarObj[T] ## A data flow variable.
+
+  ToFreeQueue = object
+    len: int
+    lock: Lock
+    empty: Semaphore
+    data: array[128, pointer]
+
+  WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.}
+  Worker = object
+    taskArrived: Semaphore
+    taskStarted: Semaphore #\
+    # task data:
+    f: WorkerProc
+    data: pointer
+    ready: bool # put it here for correct alignment!
+    initialized: bool # whether it has even been initialized
+    shutdown: bool # the pool requests to shut down this worker thread
+    q: ToFreeQueue
+    readyForTask: Semaphore
+
+const threadpoolWaitMs {.intdefine.}: int = 100
+
+proc blockUntil*(fv: var FlowVarBaseObj) =
+  ## Waits until the value for `fv` arrives.
+  ##
+  ## Usually it is not necessary to call this explicitly.
+  if fv.usesSemaphore and not fv.awaited:
+    fv.awaited = true
+    blockUntil(fv.cv)
+    destroySemaphore(fv.cv)
+
+proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool =
+  if cas(addr w.ready, true, false):
+    w.data = data
+    w.f = fn
+    signal(w.taskArrived)
+    blockUntil(w.taskStarted)
+    result = true
+
+proc cleanFlowVars(w: ptr Worker) =
+  let q = addr(w.q)
+  acquire(q.lock)
+  for i in 0 ..< q.len:
+    GC_unref(cast[RootRef](q.data[i]))
+    #echo "GC_unref"
+  q.len = 0
+  release(q.lock)
+
+proc wakeupWorkerToProcessQueue(w: ptr Worker) =
+  # we have to ensure it's us who wakes up the owning thread.
+  # This is quite horrible code, but it runs so rarely that it doesn't matter:
+  while not cas(addr w.ready, true, false):
+    cpuRelax()
+    discard
+  w.data = nil
+  w.f = proc (w, a: pointer) {.nimcall.} =
+    let w = cast[ptr Worker](w)
+    cleanFlowVars(w)
+    signal(w.q.empty)
+  signal(w.taskArrived)
+
+proc attach(fv: FlowVarBase; i: int): bool =
+  acquire(fv.cv.L)
+  if fv.cv.counter <= 0:
+    fv.idx = i
+    result = true
+  else:
+    result = false
+  release(fv.cv.L)
+
+proc finished(fv: var FlowVarBaseObj) =
+  doAssert fv.ai.isNil, "flowVar is still attached to an 'blockUntilAny'"
+  # we have to protect against the rare cases where the owner of the flowVar
+  # simply disregards the flowVar and yet the "flowVar" has not yet written
+  # anything to it:
+  blockUntil(fv)
+  if fv.data.isNil: return
+  let owner = cast[ptr Worker](fv.owner)
+  let q = addr(owner.q)
+  acquire(q.lock)
+  while not (q.len < q.data.len):
+    #echo "EXHAUSTED!"
+    release(q.lock)
+    wakeupWorkerToProcessQueue(owner)
+    blockUntil(q.empty)
+    acquire(q.lock)
+  q.data[q.len] = cast[pointer](fv.data)
+  inc q.len
+  release(q.lock)
+  fv.data = nil
+  # the worker thread waits for "data" to be set to nil before shutting down
+  owner.data = nil
+
+proc `=destroy`[T](fv: var FlowVarObj[T]) =
+  finished(fv)
+  `=destroy`(fv.blob)
+
+proc nimCreateFlowVar[T](): FlowVar[T] {.compilerproc.} =
+  new(result)
+
+proc nimFlowVarCreateSemaphore(fv: FlowVarBase) {.compilerproc.} =
+  fv.cv.initSemaphore()
+  fv.usesSemaphore = true
+
+proc nimFlowVarSignal(fv: FlowVarBase) {.compilerproc.} =
+  if fv.ai != nil:
+    acquire(fv.ai.cv.L)
+    fv.ai.idx = fv.idx
+    inc fv.ai.cv.counter
+    release(fv.ai.cv.L)
+    signal(fv.ai.cv.c)
+  if fv.usesSemaphore:
+    signal(fv.cv)
+
+proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) =
+  ## Blocks until `fv` is available and then passes its value
+  ## to `action`.
+  ##
+  ## Note that due to Nim's parameter passing semantics, this
+  ## means that `T` doesn't need to be copied, so `awaitAndThen` can
+  ## sometimes be more efficient than the `^ proc <#^,FlowVar[T]>`_.
+  blockUntil(fv[])
+  when defined(nimV2):
+    action(fv.blob)
+  elif T is string or T is seq:
+    action(cast[T](fv.data))
+  elif T is ref:
+    {.error: "'awaitAndThen' not available for FlowVar[ref]".}
+  else:
+    action(fv.blob)
+  finished(fv[])
+
+proc unsafeRead*[T](fv: FlowVar[ref T]): ptr T =
+  ## Blocks until the value is available and then returns this value.
+  blockUntil(fv[])
+  when defined(nimV2):
+    result = cast[ptr T](fv.blob)
+  else:
+    result = cast[ptr T](fv.data)
+  finished(fv[])
+
+proc `^`*[T](fv: FlowVar[T]): T =
+  ## Blocks until the value is available and then returns this value.
+  blockUntil(fv[])
+  when not defined(nimV2) and (T is string or T is seq or T is ref):
+    deepCopy result, cast[T](fv.data)
+  else:
+    result = fv.blob
+  finished(fv[])
+
+proc blockUntilAny*(flowVars: openArray[FlowVarBase]): int =
+  ## Awaits any of the given `flowVars`. Returns the index of one `flowVar`
+  ## for which a value arrived.
+  ##
+  ## A `flowVar` only supports one call to `blockUntilAny` at the same time.
+  ## That means if you `blockUntilAny([a,b])` and `blockUntilAny([b,c])`
+  ## the second call will only block until `c`. If there is no `flowVar` left
+  ## to be able to wait on, -1 is returned.
+  ##
+  ## **Note:** This results in non-deterministic behaviour and should be avoided.
+  var ai: AwaitInfo
+  ai.cv.initSemaphore()
+  var conflicts = 0
+  result = -1
+  for i in 0 .. flowVars.high:
+    if cas(addr flowVars[i].ai, nil, addr ai):
+      if not attach(flowVars[i], i):
+        result = i
+        break
+    else:
+      inc conflicts
+  if conflicts < flowVars.len:
+    if result < 0:
+      blockUntil(ai.cv)
+      result = ai.idx
+    for i in 0 .. flowVars.high:
+      discard cas(addr flowVars[i].ai, addr ai, nil)
+  destroySemaphore(ai.cv)
+
+proc isReady*(fv: FlowVarBase): bool =
+  ## Determines whether the specified `FlowVarBase`'s value is available.
+  ##
+  ## If `true`, awaiting `fv` will not block.
+  if fv.usesSemaphore and not fv.awaited:
+    acquire(fv.cv.L)
+    result = fv.cv.counter > 0
+    release(fv.cv.L)
+  else:
+    result = true
+
+proc nimArgsPassingDone(p: pointer) {.compilerproc.} =
+  let w = cast[ptr Worker](p)
+  signal(w.taskStarted)
+
+const
+  MaxThreadPoolSize* {.intdefine.} = 256 ## Maximum size of the thread pool. 256 threads
+                                         ## should be good enough for anybody ;-)
+  MaxDistinguishedThread* {.intdefine.} = 32 ## Maximum number of "distinguished" threads.
+
+type
+  ThreadId* = range[0..MaxDistinguishedThread-1] ## A thread identifier.
+
+var
+  currentPoolSize: int
+  maxPoolSize = MaxThreadPoolSize
+  minPoolSize = 4
+  gSomeReady: Semaphore
+  readyWorker: ptr Worker
+
+# A workaround for recursion deadlock issue
+# https://github.com/nim-lang/Nim/issues/4597
+var
+  numSlavesLock: Lock
+  numSlavesRunning {.guard: numSlavesLock.}: int
+  numSlavesWaiting {.guard: numSlavesLock.}: int
+  isSlave {.threadvar.}: bool
+
+numSlavesLock.initLock
+
+gSomeReady.initSemaphore()
+
+proc slave(w: ptr Worker) {.thread.} =
+  isSlave = true
+  while true:
+    if w.shutdown:
+      w.shutdown = false
+      atomicDec currentPoolSize
+      while true:
+        if w.data != nil:
+          sleep(threadpoolWaitMs)
+        else:
+          # The flowvar finalizer ("finished()") set w.data to nil, so we can
+          # safely terminate the thread.
+          #
+          # TODO: look for scenarios in which the flowvar is never finalized, so
+          # a shut down thread gets stuck in this loop until the main thread exits.
+          break
+      break
+    when declared(atomicStoreN):
+      atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST)
+    else:
+      w.ready = true
+    readyWorker = w
+    signal(gSomeReady)
+    blockUntil(w.taskArrived)
+    # XXX Somebody needs to look into this (why does this assertion fail
+    # in Visual Studio?)
+    when not defined(vcc) and not defined(tcc): assert(not w.ready)
+
+    withLock numSlavesLock:
+      inc numSlavesRunning
+
+    w.f(w, w.data)
+
+    withLock numSlavesLock:
+      dec numSlavesRunning
+
+    if w.q.len != 0: w.cleanFlowVars
+
+proc distinguishedSlave(w: ptr Worker) {.thread.} =
+  while true:
+    when declared(atomicStoreN):
+      atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST)
+    else:
+      w.ready = true
+    signal(w.readyForTask)
+    blockUntil(w.taskArrived)
+    assert(not w.ready)
+    w.f(w, w.data)
+    if w.q.len != 0: w.cleanFlowVars
+
+var
+  workers: array[MaxThreadPoolSize, Thread[ptr Worker]]
+  workersData: array[MaxThreadPoolSize, Worker]
+
+  distinguished: array[MaxDistinguishedThread, Thread[ptr Worker]]
+  distinguishedData: array[MaxDistinguishedThread, Worker]
+
+when defined(nimPinToCpu):
+  var gCpus: Natural
+
+proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) =
+  ## Sets the minimum thread pool size. The default value of this is 4.
+  minPoolSize = size
+
+proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) =
+  ## Sets the maximum thread pool size. The default value of this
+  ## is `MaxThreadPoolSize <#MaxThreadPoolSize>`_.
+  maxPoolSize = size
+  if currentPoolSize > maxPoolSize:
+    for i in maxPoolSize..currentPoolSize-1:
+      let w = addr(workersData[i])
+      w.shutdown = true
+
+when defined(nimRecursiveSpawn):
+  var localThreadId {.threadvar.}: int
+
+proc activateWorkerThread(i: int) {.noinline.} =
+  workersData[i].taskArrived.initSemaphore()
+  workersData[i].taskStarted.initSemaphore()
+  workersData[i].initialized = true
+  workersData[i].q.empty.initSemaphore()
+  initLock(workersData[i].q.lock)
+  createThread(workers[i], slave, addr(workersData[i]))
+  when defined(nimRecursiveSpawn):
+    localThreadId = i+1
+  when defined(nimPinToCpu):
+    if gCpus > 0: pinToCpu(workers[i], i mod gCpus)
+
+proc activateDistinguishedThread(i: int) {.noinline.} =
+  distinguishedData[i].taskArrived.initSemaphore()
+  distinguishedData[i].taskStarted.initSemaphore()
+  distinguishedData[i].initialized = true
+  distinguishedData[i].q.empty.initSemaphore()
+  initLock(distinguishedData[i].q.lock)
+  distinguishedData[i].readyForTask.initSemaphore()
+  createThread(distinguished[i], distinguishedSlave, addr(distinguishedData[i]))
+
+proc setup() =
+  let p = countProcessors()
+  when defined(nimPinToCpu):
+    gCpus = p
+  currentPoolSize = min(p, MaxThreadPoolSize)
+  readyWorker = addr(workersData[0])
+  for i in 0..<currentPoolSize: activateWorkerThread(i)
+
+proc preferSpawn*(): bool =
+  ## Use this proc to determine quickly if a `spawn` or a direct call is
+  ## preferable.
+  ##
+  ## If it returns `true`, a `spawn` may make sense. In general
+  ## it is not necessary to call this directly; use the `spawnX template
+  ## <#spawnX.t>`_ instead.
+  result = gSomeReady.counter > 0
+
+proc spawn*(call: sink typed) {.magic: "Spawn".} =
+  ## Always spawns a new task, so that the `call` is never executed on
+  ## the calling thread.
+  ##
+  ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a
+  ## return type that is either `void` or compatible with `FlowVar[T]`.
+  discard "It uses `nimSpawn3` internally"
+
+proc pinnedSpawn*(id: ThreadId; call: sink typed) {.magic: "Spawn".} =
+  ## Always spawns a new task on the worker thread with `id`, so that
+  ## the `call` is **always** executed on the thread.
+  ##
+  ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a
+  ## return type that is either `void` or compatible with `FlowVar[T]`.
+  discard "It uses `nimSpawn4` internally"
+
+template spawnX*(call) =
+  ## Spawns a new task if a CPU core is ready, otherwise executes the
+  ## call in the calling thread.
+  ##
+  ## Usually, it is advised to use the `spawn proc <#spawn,sinktyped>`_
+  ## in order to not block the producer for an unknown amount of time.
+  ##
+  ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a
+  ## return type that is either 'void' or compatible with `FlowVar[T]`.
+  (if preferSpawn(): spawn call else: call)
+
+proc parallel*(body: untyped) {.magic: "Parallel".}
+  ## A parallel section can be used to execute a block in parallel.
+  ##
+  ## `body` has to be in a DSL that is a particular subset of the language.
+  ##
+  ## Please refer to `the manual <manual_experimental.html#parallel-amp-spawn>`_
+  ## for further information.
+
+var
+  state: ThreadPoolState
+  stateLock: Lock
+
+initLock stateLock
+
+proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerproc.} =
+  # implementation of 'spawn' that is used by the code generator.
+  while true:
+    if selectWorker(readyWorker, fn, data): return
+    for i in 0..<currentPoolSize:
+      if selectWorker(addr(workersData[i]), fn, data): return
+
+    # determine what to do, but keep in mind this is expensive too:
+    # state.calls < maxPoolSize: warmup phase
+    # (state.calls and 127) == 0: periodic check
+    if state.calls < maxPoolSize or (state.calls and 127) == 0:
+      # ensure the call to 'advice' is atomic:
+      if tryAcquire(stateLock):
+        if currentPoolSize < minPoolSize:
+          if not workersData[currentPoolSize].initialized:
+            activateWorkerThread(currentPoolSize)
+          let w = addr(workersData[currentPoolSize])
+          atomicInc currentPoolSize
+          if selectWorker(w, fn, data):
+            release(stateLock)
+            return
+
+        case advice(state)
+        of doNothing: discard
+        of doCreateThread:
+          if currentPoolSize < maxPoolSize:
+            if not workersData[currentPoolSize].initialized:
+              activateWorkerThread(currentPoolSize)
+            let w = addr(workersData[currentPoolSize])
+            atomicInc currentPoolSize
+            if selectWorker(w, fn, data):
+              release(stateLock)
+              return
+            # else we didn't succeed but some other thread, so do nothing.
+        of doShutdownThread:
+          if currentPoolSize > minPoolSize:
+            let w = addr(workersData[currentPoolSize-1])
+            w.shutdown = true
+          # we don't free anything here. Too dangerous.
+        release(stateLock)
+      # else the acquire failed, but this means some
+      # other thread succeeded, so we don't need to do anything here.
+    when defined(nimRecursiveSpawn):
+      if localThreadId > 0:
+        # we are a worker thread, so instead of waiting for something which
+        # might as well never happen (see tparallel_quicksort), we run the task
+        # on the current thread instead.
+        var self = addr(workersData[localThreadId-1])
+        fn(self, data)
+        blockUntil(self.taskStarted)
+        return
+
+    if isSlave:
+      # Run under lock until `numSlavesWaiting` increment to avoid a
+      # race (otherwise two last threads might start waiting together)
+      withLock numSlavesLock:
+        if numSlavesRunning <= numSlavesWaiting + 1:
+          # All the other slaves are waiting
+          # If we wait now, we-re deadlocked until
+          # an external spawn happens !
+          if currentPoolSize < maxPoolSize:
+            if not workersData[currentPoolSize].initialized:
+              activateWorkerThread(currentPoolSize)
+            let w = addr(workersData[currentPoolSize])
+            atomicInc currentPoolSize
+            if selectWorker(w, fn, data):
+              return
+          else:
+            # There is no place in the pool. We're deadlocked.
+            # echo "Deadlock!"
+            discard
+
+        inc numSlavesWaiting
+
+    blockUntil(gSomeReady)
+
+    if isSlave:
+      withLock numSlavesLock:
+        dec numSlavesWaiting
+
+var
+  distinguishedLock: Lock
+
+initLock distinguishedLock
+
+proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerproc.} =
+  acquire(distinguishedLock)
+  if not distinguishedData[id].initialized:
+    activateDistinguishedThread(id)
+  release(distinguishedLock)
+  while true:
+    if selectWorker(addr(distinguishedData[id]), fn, data): break
+    blockUntil(distinguishedData[id].readyForTask)
+
+
+proc sync*() =
+  ## A simple barrier to wait for all `spawn`ed tasks.
+  ##
+  ## If you need more elaborate waiting, you have to use an explicit barrier.
+  while true:
+    var allReady = true
+    for i in 0 ..< currentPoolSize:
+      if not allReady: break
+      allReady = allReady and workersData[i].ready
+    if allReady: break
+    sleep(threadpoolWaitMs)
+    # We cannot "blockUntil(gSomeReady)" because workers may be shut down between
+    # the time we establish that some are not "ready" and the time we wait for a
+    # "signal(gSomeReady)" from inside "slave()" that can never come.
+
+setup()