diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/pure/concurrency/cpuinfo.nim | 58 | ||||
-rw-r--r-- | lib/pure/concurrency/cpuload.nim | 96 | ||||
-rw-r--r-- | lib/pure/concurrency/threadpool.nim | 378 | ||||
-rw-r--r-- | lib/pure/osproc.nim | 38 | ||||
-rw-r--r-- | lib/system.nim | 11 | ||||
-rw-r--r-- | lib/system/assign.nim | 3 | ||||
-rw-r--r-- | lib/system/atomics.nim | 70 | ||||
-rw-r--r-- | lib/system/sysspawn.nim | 47 |
8 files changed, 608 insertions, 93 deletions
diff --git a/lib/pure/concurrency/cpuinfo.nim b/lib/pure/concurrency/cpuinfo.nim new file mode 100644 index 000000000..dfa819f64 --- /dev/null +++ b/lib/pure/concurrency/cpuinfo.nim @@ -0,0 +1,58 @@ +# +# +# Nimrod's Runtime Library +# (c) Copyright 2014 Andreas Rumpf +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +## This module implements procs to determine the number of CPUs / cores. + +include "system/inclrtl" + +import strutils, os + +when not defined(windows): + import posix + +when defined(linux): + import linux + +when defined(macosx) or defined(bsd): + const + CTL_HW = 6 + HW_AVAILCPU = 25 + HW_NCPU = 3 + proc sysctl(x: ptr array[0..3, cint], y: cint, z: pointer, + a: var csize, b: pointer, c: int): cint {. + importc: "sysctl", header: "<sys/sysctl.h>".} + +proc countProcessors*(): int {.rtl, extern: "ncpi$1".} = + ## returns the numer of the processors/cores the machine has. + ## Returns 0 if it cannot be detected. + when defined(windows): + var x = getEnv("NUMBER_OF_PROCESSORS") + if x.len > 0: result = parseInt(x.string) + elif defined(macosx) or defined(bsd): + var + mib: array[0..3, cint] + numCPU: int + len: csize + mib[0] = CTL_HW + mib[1] = HW_AVAILCPU + len = sizeof(numCPU) + discard sysctl(addr(mib), 2, addr(numCPU), len, nil, 0) + if numCPU < 1: + mib[1] = HW_NCPU + discard sysctl(addr(mib), 2, addr(numCPU), len, nil, 0) + result = numCPU + elif defined(hpux): + result = mpctl(MPC_GETNUMSPUS, nil, nil) + elif defined(irix): + var SC_NPROC_ONLN {.importc: "_SC_NPROC_ONLN", header: "<unistd.h>".}: cint + result = sysconf(SC_NPROC_ONLN) + else: + result = sysconf(SC_NPROCESSORS_ONLN) + if result <= 0: result = 1 + diff --git a/lib/pure/concurrency/cpuload.nim b/lib/pure/concurrency/cpuload.nim new file mode 100644 index 000000000..3cf6a7392 --- /dev/null +++ b/lib/pure/concurrency/cpuload.nim @@ -0,0 +1,96 @@ +# +# +# Nimrod's Runtime Library +# (c) Copyright 2014 Andreas Rumpf +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +## This module implements a helper for a thread pool to determine whether +## creating a thread is a good idea. + +when defined(windows): + import winlean, os, strutils, math + + proc `-`(a, b: TFILETIME): int64 = a.rdFileTime - b.rdFileTime +elif defined(linux): + from cpuinfo import countProcessors + +type + ThreadPoolAdvice* = enum + doNothing, + doCreateThread, # create additional thread for throughput + doShutdownThread # too many threads are busy, shutdown one + + ThreadPoolState* = object + when defined(windows): + prevSysKernel, prevSysUser, prevProcKernel, prevProcUser: TFILETIME + calls*: int + +proc advice*(s: var ThreadPoolState): ThreadPoolAdvice = + when defined(windows): + var + sysIdle, sysKernel, sysUser, + procCreation, procExit, procKernel, procUser: TFILETIME + if getSystemTimes(sysIdle, sysKernel, sysUser) == 0 or + getProcessTimes(THandle(-1), procCreation, procExit, + procKernel, procUser) == 0: + return doNothing + if s.calls > 0: + let + sysKernelDiff = sysKernel - s.prevSysKernel + sysUserDiff = sysUser - s.prevSysUser + + procKernelDiff = procKernel - s.prevProcKernel + procUserDiff = procUser - s.prevProcUser + + sysTotal = int(sysKernelDiff + sysUserDiff) + procTotal = int(procKernelDiff + procUserDiff) + # total CPU usage < 85% --> create a new worker thread. + # Measurements show that 100% and often even 90% is not reached even + # if all my cores are busy. + if sysTotal == 0 or procTotal / sysTotal < 0.85: + result = doCreateThread + s.prevSysKernel = sysKernel + s.prevSysUser = sysUser + s.prevProcKernel = procKernel + s.prevProcUser = procUser + elif defined(linux): + proc fscanf(c: TFile, frmt: cstring) {.varargs, importc, + header: "<stdio.h>".} + + var f = open("/proc/loadavg") + var b: float + var busy, total: int + fscanf(f,"%lf %lf %lf %ld/%ld", + addr b, addr b, addr b, addr busy, addr total) + f.close() + let cpus = countProcessors() + if busy-1 < cpus: + result = doCreateThread + elif busy-1 >= cpus*2: + result = doShutdownThread + else: + result = doNothing + else: + # XXX implement this for other OSes + result = doNothing + inc s.calls + +when isMainModule: + proc busyLoop() = + while true: + discard random(80) + os.sleep(100) + + spawn busyLoop() + spawn busyLoop() + spawn busyLoop() + spawn busyLoop() + + var s: ThreadPoolState + + for i in 1 .. 70: + echo advice(s) + os.sleep(1000) diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim new file mode 100644 index 000000000..fd1041918 --- /dev/null +++ b/lib/pure/concurrency/threadpool.nim @@ -0,0 +1,378 @@ +# +# +# Nimrod's Runtime Library +# (c) Copyright 2014 Andreas Rumpf +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +## Implements Nimrod's 'spawn'. + +import cpuinfo, cpuload, locks + +{.push stackTrace:off.} + +type + CondVar = object + c: TCond + L: TLock + counter: int + +proc createCondVar(): CondVar = + initCond(result.c) + initLock(result.L) + +proc destroyCondVar(cv: var CondVar) {.inline.} = + deinitCond(cv.c) + deinitLock(cv.L) + +proc await(cv: var CondVar) = + acquire(cv.L) + while cv.counter <= 0: + wait(cv.c, cv.L) + dec cv.counter + release(cv.L) + +proc signal(cv: var CondVar) = + acquire(cv.L) + inc cv.counter + release(cv.L) + signal(cv.c) + +const CacheLineSize = 32 # true for most archs + +type + Barrier {.compilerProc.} = object + entered: int + cv: CondVar # condvar takes 3 words at least + when sizeof(int) < 8: + cacheAlign: array[CacheLineSize-4*sizeof(int), byte] + left: int + cacheAlign2: array[CacheLineSize-sizeof(int), byte] + interest: bool ## wether the master is interested in the "all done" event + +proc barrierEnter(b: ptr Barrier) {.compilerProc, inline.} = + # due to the signaling between threads, it is ensured we are the only + # one with access to 'entered' so we don't need 'atomicInc' here: + inc b.entered + # also we need no 'fence' instructions here as soon 'nimArgsPassingDone' + # will be called which already will perform a fence for us. + +proc barrierLeave(b: ptr Barrier) {.compilerProc, inline.} = + atomicInc b.left + when not defined(x86): fence() + if b.interest and b.left == b.entered: signal(b.cv) + +proc openBarrier(b: ptr Barrier) {.compilerProc, inline.} = + b.entered = 0 + b.left = 0 + b.interest = false + +proc closeBarrier(b: ptr Barrier) {.compilerProc.} = + fence() + if b.left != b.entered: + b.cv = createCondVar() + fence() + b.interest = true + fence() + while b.left != b.entered: await(b.cv) + destroyCondVar(b.cv) + +{.pop.} + +# ---------------------------------------------------------------------------- + +type + foreign* = object ## a region that indicates the pointer comes from a + ## foreign thread heap. + AwaitInfo = object + cv: CondVar + idx: int + + FlowVarBase* = ref FlowVarBaseObj ## untyped base class for 'FlowVar[T]' + FlowVarBaseObj = object of TObject + ready, usesCondVar: bool + cv: CondVar #\ + # for 'awaitAny' support + ai: ptr AwaitInfo + idx: int + data: pointer # we incRef and unref it to keep it alive + owner: pointer # ptr Worker + + FlowVarObj[T] = object of FlowVarBaseObj + blob: T + + FlowVar*{.compilerProc.}[T] = ref FlowVarObj[T] ## a data flow variable + + ToFreeQueue = object + len: int + lock: TLock + empty: TCond + data: array[512, pointer] + + WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.} + Worker = object + taskArrived: CondVar + taskStarted: CondVar #\ + # task data: + f: WorkerProc + data: pointer + ready: bool # put it here for correct alignment! + initialized: bool # whether it has even been initialized + shutdown: bool # the pool requests to shut down this worker thread + q: ToFreeQueue + +proc await*(fv: FlowVarBase) = + ## waits until the value for the flowVar arrives. Usually it is not necessary + ## to call this explicitly. + if fv.usesCondVar: + fv.usesCondVar = false + await(fv.cv) + destroyCondVar(fv.cv) + +proc finished(fv: FlowVarBase) = + doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'" + # we have to protect against the rare cases where the owner of the flowVar + # simply disregards the flowVar and yet the "flowVarr" has not yet written + # anything to it: + await(fv) + if fv.data.isNil: return + let owner = cast[ptr Worker](fv.owner) + let q = addr(owner.q) + var waited = false + while true: + acquire(q.lock) + if q.len < q.data.len: + q.data[q.len] = fv.data + inc q.len + release(q.lock) + break + else: + # the queue is exhausted! We block until it has been cleaned: + release(q.lock) + wait(q.empty, q.lock) + waited = true + fv.data = nil + # wakeup other potentially waiting threads: + if waited: signal(q.empty) + +proc cleanFlowVars(w: ptr Worker) = + let q = addr(w.q) + acquire(q.lock) + for i in 0 .. <q.len: + GC_unref(cast[PObject](q.data[i])) + q.len = 0 + release(q.lock) + signal(q.empty) + +proc fvFinalizer[T](fv: FlowVar[T]) = finished(fv) + +proc nimCreateFlowVar[T](): FlowVar[T] {.compilerProc.} = + new(result, fvFinalizer) + +proc nimFlowVarCreateCondVar(fv: FlowVarBase) {.compilerProc.} = + fv.cv = createCondVar() + fv.usesCondVar = true + +proc nimFlowVarSignal(fv: FlowVarBase) {.compilerProc.} = + if fv.ai != nil: + acquire(fv.ai.cv.L) + fv.ai.idx = fv.idx + inc fv.ai.cv.counter + release(fv.ai.cv.L) + signal(fv.ai.cv.c) + if fv.usesCondVar: signal(fv.cv) + +proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) = + ## blocks until the ``fv`` is available and then passes its value + ## to ``action``. Note that due to Nimrod's parameter passing semantics this + ## means that ``T`` doesn't need to be copied and so ``awaitAndThen`` can + ## sometimes be more efficient than ``^``. + await(fv) + when T is string or T is seq: + action(cast[T](fv.data)) + elif T is ref: + {.error: "'awaitAndThen' not available for FlowVar[ref]".} + else: + action(fv.blob) + finished(fv) + +proc `^`*[T](fv: FlowVar[ref T]): foreign ptr T = + ## blocks until the value is available and then returns this value. + await(fv) + result = cast[foreign ptr T](fv.data) + +proc `^`*[T](fv: FlowVar[T]): T = + ## blocks until the value is available and then returns this value. + await(fv) + when T is string or T is seq: + result = cast[T](fv.data) + else: + result = fv.blob + +proc awaitAny*(flowVars: openArray[FlowVarBase]): int = + ## awaits any of the given flowVars. Returns the index of one flowVar for + ## which a value arrived. A flowVar only supports one call to 'awaitAny' at + ## the same time. That means if you await([a,b]) and await([b,c]) the second + ## call will only await 'c'. If there is no flowVar left to be able to wait + ## on, -1 is returned. + ## **Note**: This results in non-deterministic behaviour and so should be + ## avoided. + var ai: AwaitInfo + ai.cv = createCondVar() + var conflicts = 0 + for i in 0 .. flowVars.high: + if cas(addr flowVars[i].ai, nil, addr ai): + flowVars[i].idx = i + else: + inc conflicts + if conflicts < flowVars.len: + await(ai.cv) + result = ai.idx + for i in 0 .. flowVars.high: + discard cas(addr flowVars[i].ai, addr ai, nil) + else: + result = -1 + destroyCondVar(ai.cv) + +proc nimArgsPassingDone(p: pointer) {.compilerProc.} = + let w = cast[ptr Worker](p) + signal(w.taskStarted) + +const + MaxThreadPoolSize* = 256 ## maximal size of the thread pool. 256 threads + ## should be good enough for anybody ;-) + +var + currentPoolSize: int + maxPoolSize = MaxThreadPoolSize + minPoolSize = 4 + gSomeReady = createCondVar() + readyWorker: ptr Worker + +proc slave(w: ptr Worker) {.thread.} = + while true: + w.ready = true + readyWorker = w + signal(gSomeReady) + await(w.taskArrived) + assert(not w.ready) + w.f(w, w.data) + if w.q.len != 0: w.cleanFlowVars + if w.shutdown: + w.shutdown = false + atomicDec currentPoolSize + +proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) = + ## sets the minimal thread pool size. The default value of this is 4. + minPoolSize = size + +proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) = + ## sets the minimal thread pool size. The default value of this + ## is ``MaxThreadPoolSize``. + maxPoolSize = size + +var + workers: array[MaxThreadPoolSize, TThread[ptr Worker]] + workersData: array[MaxThreadPoolSize, Worker] + +proc activateThread(i: int) {.noinline.} = + workersData[i].taskArrived = createCondVar() + workersData[i].taskStarted = createCondVar() + workersData[i].initialized = true + initCond(workersData[i].q.empty) + initLock(workersData[i].q.lock) + createThread(workers[i], slave, addr(workersData[i])) + +proc setup() = + currentPoolSize = min(countProcessors(), MaxThreadPoolSize) + readyWorker = addr(workersData[0]) + for i in 0.. <currentPoolSize: activateThread(i) + +proc preferSpawn*(): bool = + ## Use this proc to determine quickly if a 'spawn' or a direct call is + ## preferable. If it returns 'true' a 'spawn' may make sense. In general + ## it is not necessary to call this directly; use 'spawnX' instead. + result = gSomeReady.counter > 0 + +proc spawn*(call: expr): expr {.magic: "Spawn".} + ## always spawns a new task, so that the 'call' is never executed on + ## the calling thread. 'call' has to be proc call 'p(...)' where 'p' + ## is gcsafe and has a return type that is either 'void' or compatible + ## with ``FlowVar[T]``. + +template spawnX*(call: expr): expr = + ## spawns a new task if a CPU core is ready, otherwise executes the + ## call in the calling thread. Usually it is advised to + ## use 'spawn' in order to not block the producer for an unknown + ## amount of time. 'call' has to be proc call 'p(...)' where 'p' + ## is gcsafe and has a return type that is either 'void' or compatible + ## with ``FlowVar[T]``. + (if preferSpawn(): spawn call else: call) + +proc parallel*(body: stmt) {.magic: "Parallel".} + ## a parallel section can be used to execute a block in parallel. ``body`` + ## has to be in a DSL that is a particular subset of the language. Please + ## refer to the manual for further information. + +var + state: ThreadPoolState + stateLock: TLock + +initLock stateLock + +proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool = + if cas(addr w.ready, true, false): + w.data = data + w.f = fn + signal(w.taskArrived) + await(w.taskStarted) + result = true + +proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} = + # implementation of 'spawn' that is used by the code generator. + while true: + if selectWorker(readyWorker, fn, data): return + for i in 0.. <currentPoolSize: + if selectWorker(addr(workersData[i]), fn, data): return + # determine what to do, but keep in mind this is expensive too: + # state.calls < maxPoolSize: warmup phase + # (state.calls and 127) == 0: periodic check + if state.calls < maxPoolSize or (state.calls and 127) == 0: + # ensure the call to 'advice' is atomic: + if tryAcquire(stateLock): + case advice(state) + of doNothing: discard + of doCreateThread: + if currentPoolSize < maxPoolSize: + if not workersData[currentPoolSize].initialized: + activateThread(currentPoolSize) + let w = addr(workersData[currentPoolSize]) + atomicInc currentPoolSize + if selectWorker(w, fn, data): + release(stateLock) + return + # else we didn't succeed but some other thread, so do nothing. + of doShutdownThread: + if currentPoolSize > minPoolSize: + let w = addr(workersData[currentPoolSize-1]) + w.shutdown = true + # we don't free anything here. Too dangerous. + release(stateLock) + # else the acquire failed, but this means some + # other thread succeeded, so we don't need to do anything here. + await(gSomeReady) + +proc sync*() = + ## a simple barrier to wait for all spawn'ed tasks. If you need more elaborate + ## waiting, you have to use an explicit barrier. + while true: + var allReady = true + for i in 0 .. <currentPoolSize: + if not allReady: break + allReady = allReady and workersData[i].ready + if allReady: break + await(gSomeReady) + +setup() diff --git a/lib/pure/osproc.nim b/lib/pure/osproc.nim index cb9792f2b..04a0c2403 100644 --- a/lib/pure/osproc.nim +++ b/lib/pure/osproc.nim @@ -1,7 +1,7 @@ # # # Nimrod's Runtime Library -# (c) Copyright 2013 Andreas Rumpf +# (c) Copyright 2014 Andreas Rumpf # # See the file "copying.txt", included in this # distribution, for details about the copyright. @@ -13,7 +13,7 @@ include "system/inclrtl" import - strutils, os, strtabs, streams + strutils, os, strtabs, streams, cpuinfo when defined(windows): import winlean @@ -225,42 +225,10 @@ proc errorHandle*(p: PProcess): TFileHandle {.rtl, extern: "nosp$1", ## it is closed when closing the PProcess ``p``. result = p.errHandle -when defined(macosx) or defined(bsd): - const - CTL_HW = 6 - HW_AVAILCPU = 25 - HW_NCPU = 3 - proc sysctl(x: ptr array[0..3, cint], y: cint, z: pointer, - a: var csize, b: pointer, c: int): cint {. - importc: "sysctl", header: "<sys/sysctl.h>".} - proc countProcessors*(): int {.rtl, extern: "nosp$1".} = ## returns the numer of the processors/cores the machine has. ## Returns 0 if it cannot be detected. - when defined(windows): - var x = getEnv("NUMBER_OF_PROCESSORS") - if x.len > 0: result = parseInt(x.string) - elif defined(macosx) or defined(bsd): - var - mib: array[0..3, cint] - numCPU: int - len: csize - mib[0] = CTL_HW - mib[1] = HW_AVAILCPU - len = sizeof(numCPU) - discard sysctl(addr(mib), 2, addr(numCPU), len, nil, 0) - if numCPU < 1: - mib[1] = HW_NCPU - discard sysctl(addr(mib), 2, addr(numCPU), len, nil, 0) - result = numCPU - elif defined(hpux): - result = mpctl(MPC_GETNUMSPUS, nil, nil) - elif defined(irix): - var SC_NPROC_ONLN {.importc: "_SC_NPROC_ONLN", header: "<unistd.h>".}: cint - result = sysconf(SC_NPROC_ONLN) - else: - result = sysconf(SC_NPROCESSORS_ONLN) - if result <= 0: result = 1 + result = cpuinfo.countProcessors() proc execProcesses*(cmds: openArray[string], options = {poStdErrToStdOut, poParentStreams}, diff --git a/lib/system.nim b/lib/system.nim index 2f24f68b1..2c1257467 100644 --- a/lib/system.nim +++ b/lib/system.nim @@ -42,7 +42,6 @@ type cstring* {.magic: Cstring.} ## built-in cstring (*compatible string*) type pointer* {.magic: Pointer.} ## built-in pointer type, use the ``addr`` ## operator to get a pointer to a variable - const on* = true ## alias for ``true`` off* = false ## alias for ``false`` @@ -51,6 +50,9 @@ const type Ordinal* {.magic: Ordinal.}[T] + `ptr`* {.magic: Pointer.}[T] ## built-in generic untraced pointer type + `ref`* {.magic: Pointer.}[T] ## built-in generic traced pointer type + `nil` {.magic: "Nil".} expr* {.magic: Expr.} ## meta type to denote an expression (for templates) stmt* {.magic: Stmt.} ## meta type to denote a statement (for templates) @@ -2983,6 +2985,10 @@ proc locals*(): TObject {.magic: "Locals", noSideEffect.} = ## # -> B is 1 discard +proc deepCopy*[T](x: T): T {.magic: "DeepCopy", noSideEffect.} + ## performs a deep copy of `x`. This is also used by the code generator + ## for the implementation of ``spawn``. + when not defined(booting): type semistatic*[T] = static[T] | T @@ -2991,6 +2997,3 @@ when not defined(booting): template isStatic*(x): expr = compiles(static(x)) # checks whether `x` is a value known at compile-time - -when hasThreadSupport: - when hostOS != "standalone": include "system/sysspawn" diff --git a/lib/system/assign.nim b/lib/system/assign.nim index 75c749633..2ae945fb1 100644 --- a/lib/system/assign.nim +++ b/lib/system/assign.nim @@ -179,7 +179,8 @@ when not defined(nimmixin): # internal proc used for destroying sequences and arrays for i in countup(0, r.len - 1): destroy(r[i]) else: - # XXX Why is this exported and no compilerproc? + # XXX Why is this exported and no compilerproc? -> compilerprocs cannot be + # generic for now proc nimDestroyRange*[T](r: T) = # internal proc used for destroying sequences and arrays mixin destroy diff --git a/lib/system/atomics.nim b/lib/system/atomics.nim index b1a96b209..43b3f0438 100644 --- a/lib/system/atomics.nim +++ b/lib/system/atomics.nim @@ -1,15 +1,18 @@ # # # Nimrod's Runtime Library -# (c) Copyright 2012 Andreas Rumpf +# (c) Copyright 2014 Andreas Rumpf # # See the file "copying.txt", included in this # distribution, for details about the copyright. # ## Atomic operations for Nimrod. +{.push stackTrace:off.} -when (defined(gcc) or defined(llvm_gcc)) and hasThreadSupport: +const someGcc = defined(gcc) or defined(llvm_gcc) or defined(clang) + +when someGcc and hasThreadSupport: type AtomMemModel* = enum ATOMIC_RELAXED, ## No barriers or synchronization. @@ -152,41 +155,16 @@ when (defined(gcc) or defined(llvm_gcc)) and hasThreadSupport: ## A value of 0 indicates typical alignment should be used. The compiler may also ## ignore this parameter. + template fence*() = atomicThreadFence(ATOMIC_SEQ_CST) elif defined(vcc) and hasThreadSupport: proc addAndFetch*(p: ptr int, val: int): int {. importc: "NimXadd", nodecl.} + else: proc addAndFetch*(p: ptr int, val: int): int {.inline.} = inc(p[], val) result = p[] -# atomic compare and swap (CAS) funcitons to implement lock-free algorithms - -#if defined(windows) and not defined(gcc) and hasThreadSupport: -# proc InterlockedCompareExchangePointer(mem: ptr pointer, -# newValue: pointer, comparand: pointer) : pointer {.nodecl, -# importc: "InterlockedCompareExchangePointer", header:"windows.h".} - -# proc compareAndSwap*[T](mem: ptr T, -# expected: T, newValue: T): bool {.inline.}= -# ## Returns true if successfully set value at mem to newValue when value -# ## at mem == expected -# return InterlockedCompareExchangePointer(addr(mem), -# addr(newValue), addr(expected))[] == expected - -#elif not hasThreadSupport: -# proc compareAndSwap*[T](mem: ptr T, -# expected: T, newValue: T): bool {.inline.} = -# ## Returns true if successfully set value at mem to newValue when value -# ## at mem == expected -# var oldval = mem[] -# if oldval == expected: -# mem[] = newValue -# return true -# return false - - -# Some convenient functions proc atomicInc*(memLoc: var int, x: int = 1): int = when defined(gcc) and hasThreadSupport: result = atomic_add_fetch(memLoc.addr, x, ATOMIC_RELAXED) @@ -203,3 +181,37 @@ proc atomicDec*(memLoc: var int, x: int = 1): int = else: dec(memLoc, x) result = memLoc + +when defined(windows) and not someGcc: + proc interlockedCompareExchange(p: pointer; exchange, comparand: int32): int32 + {.importc: "InterlockedCompareExchange", header: "<windows.h>", cdecl.} + + proc cas*[T: bool|int|ptr](p: ptr T; oldValue, newValue: T): bool = + interlockedCompareExchange(p, newValue.int32, oldValue.int32) != 0 + # XXX fix for 64 bit build +else: + # this is valid for GCC and Intel C++ + proc cas*[T: bool|int|ptr](p: ptr T; oldValue, newValue: T): bool + {.importc: "__sync_bool_compare_and_swap", nodecl.} + # XXX is this valid for 'int'? + + +when (defined(x86) or defined(amd64)) and (defined(gcc) or defined(llvm_gcc)): + proc cpuRelax {.inline.} = + {.emit: """asm volatile("pause" ::: "memory");""".} +elif (defined(x86) or defined(amd64)) and defined(vcc): + proc cpuRelax {.importc: "YieldProcessor", header: "<windows.h>".} +elif defined(intelc): + proc cpuRelax {.importc: "_mm_pause", header: "xmmintrin.h".} +elif false: + from os import sleep + + proc cpuRelax {.inline.} = os.sleep(1) + +when not defined(fence) and hasThreadSupport: + # XXX fixme + proc fence*() {.inline.} = + var dummy: bool + discard cas(addr dummy, false, true) + +{.pop.} diff --git a/lib/system/sysspawn.nim b/lib/system/sysspawn.nim index dabf35a3e..95cdba65d 100644 --- a/lib/system/sysspawn.nim +++ b/lib/system/sysspawn.nim @@ -14,30 +14,6 @@ when not defined(NimString): {.push stackTrace:off.} -when (defined(x86) or defined(amd64)) and defined(gcc): - proc cpuRelax {.inline.} = - {.emit: """asm volatile("pause" ::: "memory");""".} -elif (defined(x86) or defined(amd64)) and defined(vcc): - proc cpuRelax {.importc: "YieldProcessor", header: "<windows.h>".} -elif defined(intelc): - proc cpuRelax {.importc: "_mm_pause", header: "xmmintrin.h".} -elif false: - from os import sleep - - proc cpuRelax {.inline.} = os.sleep(1) - -when defined(windows) and not defined(gcc): - proc interlockedCompareExchange(p: pointer; exchange, comparand: int32): int32 - {.importc: "InterlockedCompareExchange", header: "<windows.h>", cdecl.} - - proc cas(p: ptr bool; oldValue, newValue: bool): bool = - interlockedCompareExchange(p, newValue.int32, oldValue.int32) != 0 - -else: - # this is valid for GCC and Intel C++ - proc cas(p: ptr bool; oldValue, newValue: bool): bool - {.importc: "__sync_bool_compare_and_swap", nodecl.} - # We declare our own condition variables here to get rid of the dummy lock # on Windows: @@ -54,6 +30,9 @@ proc createCondVar(): CondVar = initSysLock(result.stupidLock) #acquireSys(result.stupidLock) +proc destroyCondVar(c: var CondVar) {.inline.} = + deinitSysCond(c.c) + proc await(cv: var CondVar) = when defined(posix): acquireSys(cv.stupidLock) @@ -100,6 +79,26 @@ proc signal(cv: var FastCondVar) = #if cas(addr cv.slowPath, true, false): signal(cv.slow) +type + Barrier* {.compilerProc.} = object + counter: int + cv: CondVar + +proc barrierEnter*(b: ptr Barrier) {.compilerProc.} = + atomicInc b.counter + +proc barrierLeave*(b: ptr Barrier) {.compilerProc.} = + atomicDec b.counter + if b.counter <= 0: signal(b.cv) + +proc openBarrier*(b: ptr Barrier) {.compilerProc.} = + b.counter = 0 + b.cv = createCondVar() + +proc closeBarrier*(b: ptr Barrier) {.compilerProc.} = + await(b.cv) + destroyCondVar(b.cv) + {.pop.} # ---------------------------------------------------------------------------- |