summary refs log tree commit diff stats
path: root/lib/pure/concurrency
diff options
authorAraq <>2014-05-12 11:12:37 +0200
committerAraq <>2014-05-12 11:12:37 +0200
commit6195dbe491ccd864c5dcb59f87826291ac1f1ff4 (patch)
treed2a549857984a296ab4b3e2be72745d413da005a /lib/pure/concurrency
parentbdb2d21f276c10aee122218384e568ef843690fa (diff)
initial non-compiling version of 'parallel'
Diffstat (limited to 'lib/pure/concurrency')
3 files changed, 364 insertions, 0 deletions
diff --git a/lib/pure/concurrency/cpuinfo.nim b/lib/pure/concurrency/cpuinfo.nim
new file mode 100644
index 000000000..dfa819f64
--- /dev/null
+++ b/lib/pure/concurrency/cpuinfo.nim
@@ -0,0 +1,58 @@
+#            Nimrod's Runtime Library
+#        (c) Copyright 2014 Andreas Rumpf
+#    See the file "copying.txt", included in this
+#    distribution, for details about the copyright.
+## This module implements procs to determine the number of CPUs / cores.
+include "system/inclrtl"
+import strutils, os
+when not defined(windows):
+  import posix
+when defined(linux):
+  import linux
+when defined(macosx) or defined(bsd):
+  const
+    CTL_HW = 6
+    HW_AVAILCPU = 25
+    HW_NCPU = 3
+  proc sysctl(x: ptr array[0..3, cint], y: cint, z: pointer,
+              a: var csize, b: pointer, c: int): cint {.
+             importc: "sysctl", header: "<sys/sysctl.h>".}
+proc countProcessors*(): int {.rtl, extern: "ncpi$1".} =
+  ## returns the numer of the processors/cores the machine has.
+  ## Returns 0 if it cannot be detected.
+  when defined(windows):
+    var x = getEnv("NUMBER_OF_PROCESSORS")
+    if x.len > 0: result = parseInt(x.string)
+  elif defined(macosx) or defined(bsd):
+    var
+      mib: array[0..3, cint]
+      numCPU: int
+      len: csize
+    mib[0] = CTL_HW
+    mib[1] = HW_AVAILCPU
+    len = sizeof(numCPU)
+    discard sysctl(addr(mib), 2, addr(numCPU), len, nil, 0)
+    if numCPU < 1:
+      mib[1] = HW_NCPU
+      discard sysctl(addr(mib), 2, addr(numCPU), len, nil, 0)
+    result = numCPU
+  elif defined(hpux):
+    result = mpctl(MPC_GETNUMSPUS, nil, nil)
+  elif defined(irix):
+    var SC_NPROC_ONLN {.importc: "_SC_NPROC_ONLN", header: "<unistd.h>".}: cint
+    result = sysconf(SC_NPROC_ONLN)
+  else:
+    result = sysconf(SC_NPROCESSORS_ONLN)
+  if result <= 0: result = 1
diff --git a/lib/pure/concurrency/cpuload.nim b/lib/pure/concurrency/cpuload.nim
new file mode 100644
index 000000000..3cf6a7392
--- /dev/null
+++ b/lib/pure/concurrency/cpuload.nim
@@ -0,0 +1,96 @@
+#            Nimrod's Runtime Library
+#        (c) Copyright 2014 Andreas Rumpf
+#    See the file "copying.txt", included in this
+#    distribution, for details about the copyright.
+## This module implements a helper for a thread pool to determine whether
+## creating a thread is a good idea.
+when defined(windows):
+  import winlean, os, strutils, math
+  proc `-`(a, b: TFILETIME): int64 = a.rdFileTime - b.rdFileTime
+elif defined(linux):
+  from cpuinfo import countProcessors
+  ThreadPoolAdvice* = enum
+    doNothing,
+    doCreateThread,  # create additional thread for throughput
+    doShutdownThread # too many threads are busy, shutdown one
+  ThreadPoolState* = object
+    when defined(windows):
+      prevSysKernel, prevSysUser, prevProcKernel, prevProcUser: TFILETIME
+    calls*: int
+proc advice*(s: var ThreadPoolState): ThreadPoolAdvice =
+  when defined(windows):
+    var
+      sysIdle, sysKernel, sysUser,
+        procCreation, procExit, procKernel, procUser: TFILETIME
+    if getSystemTimes(sysIdle, sysKernel, sysUser) == 0 or
+        getProcessTimes(THandle(-1), procCreation, procExit, 
+                        procKernel, procUser) == 0:
+      return doNothing
+    if s.calls > 0:
+      let
+        sysKernelDiff = sysKernel - s.prevSysKernel
+        sysUserDiff = sysUser - s.prevSysUser
+        procKernelDiff = procKernel - s.prevProcKernel
+        procUserDiff = procUser - s.prevProcUser
+        sysTotal = int(sysKernelDiff + sysUserDiff)
+        procTotal = int(procKernelDiff + procUserDiff)
+      # total CPU usage < 85% --> create a new worker thread.
+      # Measurements show that 100% and often even 90% is not reached even
+      # if all my cores are busy.
+      if sysTotal == 0 or procTotal / sysTotal < 0.85:
+        result = doCreateThread
+    s.prevSysKernel = sysKernel
+    s.prevSysUser = sysUser
+    s.prevProcKernel = procKernel
+    s.prevProcUser = procUser
+  elif defined(linux):
+    proc fscanf(c: TFile, frmt: cstring) {.varargs, importc, 
+      header: "<stdio.h>".}
+    var f = open("/proc/loadavg")
+    var b: float
+    var busy, total: int
+    fscanf(f,"%lf %lf %lf %ld/%ld",
+           addr b, addr b, addr b, addr busy, addr total)
+    f.close()
+    let cpus = countProcessors()
+    if busy-1 < cpus:
+      result = doCreateThread
+    elif busy-1 >= cpus*2:
+      result = doShutdownThread
+    else:
+      result = doNothing
+  else:
+    # XXX implement this for other OSes
+    result = doNothing
+  inc s.calls
+when isMainModule:
+  proc busyLoop() =
+    while true:
+      discard random(80)
+      os.sleep(100)
+  spawn busyLoop()
+  spawn busyLoop()
+  spawn busyLoop()
+  spawn busyLoop()
+  var s: ThreadPoolState
+  for i in 1 .. 70:
+    echo advice(s)
+    os.sleep(1000)
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim
new file mode 100644
index 000000000..856820c6e
--- /dev/null
+++ b/lib/pure/concurrency/threadpool.nim
@@ -0,0 +1,210 @@
+#            Nimrod's Runtime Library
+#        (c) Copyright 2014 Andreas Rumpf
+#    See the file "copying.txt", included in this
+#    distribution, for details about the copyright.
+## Implements Nimrod's 'spawn'.
+import cpuinfo, cpuload, locks
+{.push stackTrace:off.}
+  CondVar = object
+    c: TCond
+    L: TLock
+    counter: int
+proc createCondVar(): CondVar =
+  initCond(result.c)
+  initLock(result.L)
+proc destroyCondVar(cv: var CondVar) {.inline.} =
+  deinitCond(cv.c)
+  deinitLock(cv.L)
+proc await(cv: var CondVar) =
+  acquire(cv.L)
+  while cv.counter <= 0:
+    wait(cv.c, cv.L)
+  dec cv.counter
+  release(cv.L)
+proc signal(cv: var CondVar) =
+  acquire(cv.L)
+  inc cv.counter
+  release(cv.L)
+  signal(cv.c)
+  Barrier* {.compilerProc.} = object
+    counter: int
+    cv: CondVar
+proc barrierEnter*(b: ptr Barrier) {.compilerProc.} =
+  atomicInc b.counter
+proc barrierLeave*(b: ptr Barrier) {.compilerProc.} =
+  atomicDec b.counter
+  if b.counter <= 0: signal(
+proc openBarrier*(b: ptr Barrier) {.compilerProc.} =
+  b.counter = 0
+ = createCondVar()
+proc closeBarrier*(b: ptr Barrier) {.compilerProc.} =
+  await(
+  destroyCondVar(
+# ----------------------------------------------------------------------------
+  WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.}
+  Worker = object
+    taskArrived: CondVar
+    taskStarted: CondVar #\
+    # task data:
+    f: WorkerProc
+    data: pointer
+    ready: bool # put it here for correct alignment!
+    initialized: bool # whether it has even been initialized
+proc nimArgsPassingDone(p: pointer) {.compilerProc.} =
+  let w = cast[ptr Worker](p)
+  signal(w.taskStarted)
+  gSomeReady = createCondVar()
+  readyWorker: ptr Worker
+proc slave(w: ptr Worker) {.thread.} =
+  while true:
+    w.ready = true
+    readyWorker = w
+    signal(gSomeReady)
+    await(w.taskArrived)
+    assert(not w.ready)
+    w.f(w,
+  MaxThreadPoolSize* = 256 ## maximal size of the thread pool. 256 threads
+                           ## should be good enough for anybody ;-)
+  currentPoolSize: int
+  maxPoolSize = MaxThreadPoolSize
+  minPoolSize = 4
+proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) =
+  ## sets the minimal thread pool size. The default value of this is 4.
+  minPoolSize = size
+proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) =
+  ## sets the minimal thread pool size. The default value of this
+  ## is ``MaxThreadPoolSize``.
+  maxPoolSize = size
+  workers: array[MaxThreadPoolSize, TThread[ptr Worker]]
+  workersData: array[MaxThreadPoolSize, Worker]
+proc activateThread(i: int) {.noinline.} =
+  workersData[i].taskArrived = createCondVar()
+  workersData[i].taskStarted = createCondVar()
+  workersData[i].initialized = true
+  createThread(workers[i], slave, addr(workersData[i]))
+proc setup() =
+  currentPoolSize = min(countProcessors(), MaxThreadPoolSize)
+  readyWorker = addr(workersData[0])
+  for i in 0.. <currentPoolSize: activateThread(i)
+proc preferSpawn*(): bool =
+  ## Use this proc to determine quickly if a 'spawn' or a direct call is
+  ## preferable. If it returns 'true' a 'spawn' may make sense. In general
+  ## it is not necessary to call this directly; use 'spawnX' instead.
+  result = gSomeReady.counter > 0
+proc spawn*(call: stmt) {.magic: "Spawn".}
+  ## always spawns a new task, so that the 'call' is never executed on
+  ## the calling thread. 'call' has to be proc call 'p(...)' where 'p'
+  ## is gcsafe and has 'void' as the return type.
+template spawnX*(call: stmt) =
+  ## spawns a new task if a CPU core is ready, otherwise executes the
+  ## call in the calling thread. Usually it is advised to
+  ## use 'spawn' in order to not block the producer for an unknown
+  ## amount of time. 'call' has to be proc call 'p(...)' where 'p'
+  ## is gcsafe and has 'void' as the return type.
+  if preferSpawn(): spawn call
+  else: call
+proc parallel*(body: stmt) {.magic: "Parallel".}
+  ## a parallel section can be used to execute a block in parallel. ``body``
+  ## has to be in a DSL that is a particular subset of the language. Please
+  ## refer to the manual for further information.
+  state: ThreadPoolState
+  stateLock: TLock
+initLock stateLock
+proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool =
+  if cas(addr w.ready, true, false):
+ = data
+    w.f = fn
+    signal(w.taskArrived)
+    await(w.taskStarted)
+    result = true
+proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} =
+  # implementation of 'spawn' that is used by the code generator.
+  while true:
+    if selectWorker(readyWorker, fn, data): return
+    for i in 0.. <currentPoolSize:
+      if selectWorker(addr(workersData[i]), fn, data): return
+    # determine what to do, but keep in mind this is expensive too:
+    # state.calls < maxPoolSize: warmup phase
+    # (state.calls and 127) == 0: periodic check
+    if state.calls < maxPoolSize or (state.calls and 127) == 0:
+      # ensure the call to 'advice' is atomic:
+      if tryAcquire(stateLock):
+        case advice(state)
+        of doNothing: discard
+        of doCreateThread:
+          if currentPoolSize < maxPoolSize:
+            if not workersData[currentPoolSize].initialized:
+              activateThread(currentPoolSize)
+            let w = addr(workersData[currentPoolSize])
+            inc currentPoolSize
+            if selectWorker(w, fn, data):
+              release(stateLock)
+              return
+            # else we didn't succeed but some other thread, so do nothing.
+        of doShutdownThread:
+          if currentPoolSize > minPoolSize: dec currentPoolSize
+          # we don't free anything here. Too dangerous.
+        release(stateLock)
+      # else the acquire failed, but this means some
+      # other thread succeeded, so we don't need to do anything here.
+    await(gSomeReady)
+proc sync*() =
+  ## a simple barrier to wait for all spawn'ed tasks. If you need more elaborate
+  ## waiting, you have to use an explicit barrier.
+  while true:
+    var allReady = true
+    for i in 0 .. <currentPoolSize:
+      if not allReady: break
+      allReady = allReady and workersData[i].ready
+    if allReady: break
+    await(gSomeReady)