diff options
author | Araq <rumpf_a@web.de> | 2014-05-12 11:12:37 +0200 |
---|---|---|
committer | Araq <rumpf_a@web.de> | 2014-05-12 11:12:37 +0200 |
commit | 6195dbe491ccd864c5dcb59f87826291ac1f1ff4 (patch) | |
tree | d2a549857984a296ab4b3e2be72745d413da005a /lib/pure/concurrency | |
parent | bdb2d21f276c10aee122218384e568ef843690fa (diff) | |
download | Nim-6195dbe491ccd864c5dcb59f87826291ac1f1ff4.tar.gz |
initial non-compiling version of 'parallel'
Diffstat (limited to 'lib/pure/concurrency')
-rw-r--r-- | lib/pure/concurrency/cpuinfo.nim | 58 | ||||
-rw-r--r-- | lib/pure/concurrency/cpuload.nim | 96 | ||||
-rw-r--r-- | lib/pure/concurrency/threadpool.nim | 210 |
3 files changed, 364 insertions, 0 deletions
diff --git a/lib/pure/concurrency/cpuinfo.nim b/lib/pure/concurrency/cpuinfo.nim new file mode 100644 index 000000000..dfa819f64 --- /dev/null +++ b/lib/pure/concurrency/cpuinfo.nim @@ -0,0 +1,58 @@ +# +# +# Nimrod's Runtime Library +# (c) Copyright 2014 Andreas Rumpf +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +## This module implements procs to determine the number of CPUs / cores. + +include "system/inclrtl" + +import strutils, os + +when not defined(windows): + import posix + +when defined(linux): + import linux + +when defined(macosx) or defined(bsd): + const + CTL_HW = 6 + HW_AVAILCPU = 25 + HW_NCPU = 3 + proc sysctl(x: ptr array[0..3, cint], y: cint, z: pointer, + a: var csize, b: pointer, c: int): cint {. + importc: "sysctl", header: "<sys/sysctl.h>".} + +proc countProcessors*(): int {.rtl, extern: "ncpi$1".} = + ## returns the numer of the processors/cores the machine has. + ## Returns 0 if it cannot be detected. + when defined(windows): + var x = getEnv("NUMBER_OF_PROCESSORS") + if x.len > 0: result = parseInt(x.string) + elif defined(macosx) or defined(bsd): + var + mib: array[0..3, cint] + numCPU: int + len: csize + mib[0] = CTL_HW + mib[1] = HW_AVAILCPU + len = sizeof(numCPU) + discard sysctl(addr(mib), 2, addr(numCPU), len, nil, 0) + if numCPU < 1: + mib[1] = HW_NCPU + discard sysctl(addr(mib), 2, addr(numCPU), len, nil, 0) + result = numCPU + elif defined(hpux): + result = mpctl(MPC_GETNUMSPUS, nil, nil) + elif defined(irix): + var SC_NPROC_ONLN {.importc: "_SC_NPROC_ONLN", header: "<unistd.h>".}: cint + result = sysconf(SC_NPROC_ONLN) + else: + result = sysconf(SC_NPROCESSORS_ONLN) + if result <= 0: result = 1 + diff --git a/lib/pure/concurrency/cpuload.nim b/lib/pure/concurrency/cpuload.nim new file mode 100644 index 000000000..3cf6a7392 --- /dev/null +++ b/lib/pure/concurrency/cpuload.nim @@ -0,0 +1,96 @@ +# +# +# Nimrod's Runtime Library +# (c) Copyright 2014 Andreas Rumpf +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +## This module implements a helper for a thread pool to determine whether +## creating a thread is a good idea. + +when defined(windows): + import winlean, os, strutils, math + + proc `-`(a, b: TFILETIME): int64 = a.rdFileTime - b.rdFileTime +elif defined(linux): + from cpuinfo import countProcessors + +type + ThreadPoolAdvice* = enum + doNothing, + doCreateThread, # create additional thread for throughput + doShutdownThread # too many threads are busy, shutdown one + + ThreadPoolState* = object + when defined(windows): + prevSysKernel, prevSysUser, prevProcKernel, prevProcUser: TFILETIME + calls*: int + +proc advice*(s: var ThreadPoolState): ThreadPoolAdvice = + when defined(windows): + var + sysIdle, sysKernel, sysUser, + procCreation, procExit, procKernel, procUser: TFILETIME + if getSystemTimes(sysIdle, sysKernel, sysUser) == 0 or + getProcessTimes(THandle(-1), procCreation, procExit, + procKernel, procUser) == 0: + return doNothing + if s.calls > 0: + let + sysKernelDiff = sysKernel - s.prevSysKernel + sysUserDiff = sysUser - s.prevSysUser + + procKernelDiff = procKernel - s.prevProcKernel + procUserDiff = procUser - s.prevProcUser + + sysTotal = int(sysKernelDiff + sysUserDiff) + procTotal = int(procKernelDiff + procUserDiff) + # total CPU usage < 85% --> create a new worker thread. + # Measurements show that 100% and often even 90% is not reached even + # if all my cores are busy. + if sysTotal == 0 or procTotal / sysTotal < 0.85: + result = doCreateThread + s.prevSysKernel = sysKernel + s.prevSysUser = sysUser + s.prevProcKernel = procKernel + s.prevProcUser = procUser + elif defined(linux): + proc fscanf(c: TFile, frmt: cstring) {.varargs, importc, + header: "<stdio.h>".} + + var f = open("/proc/loadavg") + var b: float + var busy, total: int + fscanf(f,"%lf %lf %lf %ld/%ld", + addr b, addr b, addr b, addr busy, addr total) + f.close() + let cpus = countProcessors() + if busy-1 < cpus: + result = doCreateThread + elif busy-1 >= cpus*2: + result = doShutdownThread + else: + result = doNothing + else: + # XXX implement this for other OSes + result = doNothing + inc s.calls + +when isMainModule: + proc busyLoop() = + while true: + discard random(80) + os.sleep(100) + + spawn busyLoop() + spawn busyLoop() + spawn busyLoop() + spawn busyLoop() + + var s: ThreadPoolState + + for i in 1 .. 70: + echo advice(s) + os.sleep(1000) diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim new file mode 100644 index 000000000..856820c6e --- /dev/null +++ b/lib/pure/concurrency/threadpool.nim @@ -0,0 +1,210 @@ +# +# +# Nimrod's Runtime Library +# (c) Copyright 2014 Andreas Rumpf +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +## Implements Nimrod's 'spawn'. + +import cpuinfo, cpuload, locks + +{.push stackTrace:off.} + +type + CondVar = object + c: TCond + L: TLock + counter: int + +proc createCondVar(): CondVar = + initCond(result.c) + initLock(result.L) + +proc destroyCondVar(cv: var CondVar) {.inline.} = + deinitCond(cv.c) + deinitLock(cv.L) + +proc await(cv: var CondVar) = + acquire(cv.L) + while cv.counter <= 0: + wait(cv.c, cv.L) + dec cv.counter + release(cv.L) + +proc signal(cv: var CondVar) = + acquire(cv.L) + inc cv.counter + release(cv.L) + signal(cv.c) + +type + Barrier* {.compilerProc.} = object + counter: int + cv: CondVar + +proc barrierEnter*(b: ptr Barrier) {.compilerProc.} = + atomicInc b.counter + +proc barrierLeave*(b: ptr Barrier) {.compilerProc.} = + atomicDec b.counter + if b.counter <= 0: signal(b.cv) + +proc openBarrier*(b: ptr Barrier) {.compilerProc.} = + b.counter = 0 + b.cv = createCondVar() + +proc closeBarrier*(b: ptr Barrier) {.compilerProc.} = + await(b.cv) + destroyCondVar(b.cv) + +{.pop.} + +# ---------------------------------------------------------------------------- + +type + WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.} + Worker = object + taskArrived: CondVar + taskStarted: CondVar #\ + # task data: + f: WorkerProc + data: pointer + ready: bool # put it here for correct alignment! + initialized: bool # whether it has even been initialized + +proc nimArgsPassingDone(p: pointer) {.compilerProc.} = + let w = cast[ptr Worker](p) + signal(w.taskStarted) + +var + gSomeReady = createCondVar() + readyWorker: ptr Worker + +proc slave(w: ptr Worker) {.thread.} = + while true: + w.ready = true + readyWorker = w + signal(gSomeReady) + await(w.taskArrived) + assert(not w.ready) + w.f(w, w.data) + +const + MaxThreadPoolSize* = 256 ## maximal size of the thread pool. 256 threads + ## should be good enough for anybody ;-) + +var + currentPoolSize: int + maxPoolSize = MaxThreadPoolSize + minPoolSize = 4 + +proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) = + ## sets the minimal thread pool size. The default value of this is 4. + minPoolSize = size + +proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) = + ## sets the minimal thread pool size. The default value of this + ## is ``MaxThreadPoolSize``. + maxPoolSize = size + +var + workers: array[MaxThreadPoolSize, TThread[ptr Worker]] + workersData: array[MaxThreadPoolSize, Worker] + +proc activateThread(i: int) {.noinline.} = + workersData[i].taskArrived = createCondVar() + workersData[i].taskStarted = createCondVar() + workersData[i].initialized = true + createThread(workers[i], slave, addr(workersData[i])) + +proc setup() = + currentPoolSize = min(countProcessors(), MaxThreadPoolSize) + readyWorker = addr(workersData[0]) + for i in 0.. <currentPoolSize: activateThread(i) + +proc preferSpawn*(): bool = + ## Use this proc to determine quickly if a 'spawn' or a direct call is + ## preferable. If it returns 'true' a 'spawn' may make sense. In general + ## it is not necessary to call this directly; use 'spawnX' instead. + result = gSomeReady.counter > 0 + +proc spawn*(call: stmt) {.magic: "Spawn".} + ## always spawns a new task, so that the 'call' is never executed on + ## the calling thread. 'call' has to be proc call 'p(...)' where 'p' + ## is gcsafe and has 'void' as the return type. + +template spawnX*(call: stmt) = + ## spawns a new task if a CPU core is ready, otherwise executes the + ## call in the calling thread. Usually it is advised to + ## use 'spawn' in order to not block the producer for an unknown + ## amount of time. 'call' has to be proc call 'p(...)' where 'p' + ## is gcsafe and has 'void' as the return type. + if preferSpawn(): spawn call + else: call + +proc parallel*(body: stmt) {.magic: "Parallel".} + ## a parallel section can be used to execute a block in parallel. ``body`` + ## has to be in a DSL that is a particular subset of the language. Please + ## refer to the manual for further information. + +var + state: ThreadPoolState + stateLock: TLock + +initLock stateLock + +proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool = + if cas(addr w.ready, true, false): + w.data = data + w.f = fn + signal(w.taskArrived) + await(w.taskStarted) + result = true + +proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} = + # implementation of 'spawn' that is used by the code generator. + while true: + if selectWorker(readyWorker, fn, data): return + for i in 0.. <currentPoolSize: + if selectWorker(addr(workersData[i]), fn, data): return + # determine what to do, but keep in mind this is expensive too: + # state.calls < maxPoolSize: warmup phase + # (state.calls and 127) == 0: periodic check + if state.calls < maxPoolSize or (state.calls and 127) == 0: + # ensure the call to 'advice' is atomic: + if tryAcquire(stateLock): + case advice(state) + of doNothing: discard + of doCreateThread: + if currentPoolSize < maxPoolSize: + if not workersData[currentPoolSize].initialized: + activateThread(currentPoolSize) + let w = addr(workersData[currentPoolSize]) + inc currentPoolSize + if selectWorker(w, fn, data): + release(stateLock) + return + # else we didn't succeed but some other thread, so do nothing. + of doShutdownThread: + if currentPoolSize > minPoolSize: dec currentPoolSize + # we don't free anything here. Too dangerous. + release(stateLock) + # else the acquire failed, but this means some + # other thread succeeded, so we don't need to do anything here. + await(gSomeReady) + +proc sync*() = + ## a simple barrier to wait for all spawn'ed tasks. If you need more elaborate + ## waiting, you have to use an explicit barrier. + while true: + var allReady = true + for i in 0 .. <currentPoolSize: + if not allReady: break + allReady = allReady and workersData[i].ready + if allReady: break + await(gSomeReady) + +setup() |