diff options
Diffstat (limited to 'lib/pure/concurrency')
-rw-r--r-- | lib/pure/concurrency/cpuload.nim | 25 | ||||
-rw-r--r-- | lib/pure/concurrency/threadpool.nim | 28 |
2 files changed, 33 insertions, 20 deletions
diff --git a/lib/pure/concurrency/cpuload.nim b/lib/pure/concurrency/cpuload.nim index db5f47407..1ec739485 100644 --- a/lib/pure/concurrency/cpuload.nim +++ b/lib/pure/concurrency/cpuload.nim @@ -60,17 +60,20 @@ proc advice*(s: var ThreadPoolState): ThreadPoolAdvice = proc fscanf(c: File, frmt: cstring) {.varargs, importc, header: "<stdio.h>".} - var f = open("/proc/loadavg") - var b: float - var busy, total: int - fscanf(f,"%lf %lf %lf %ld/%ld", - addr b, addr b, addr b, addr busy, addr total) - f.close() - let cpus = countProcessors() - if busy-1 < cpus: - result = doCreateThread - elif busy-1 >= cpus*2: - result = doShutdownThread + var f: File + if f.open("/proc/loadavg"): + var b: float + var busy, total: int + fscanf(f,"%lf %lf %lf %ld/%ld", + addr b, addr b, addr b, addr busy, addr total) + f.close() + let cpus = countProcessors() + if busy-1 < cpus: + result = doCreateThread + elif busy-1 >= cpus*2: + result = doShutdownThread + else: + result = doNothing else: result = doNothing else: diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index a5eaec86e..6ec71e912 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -168,6 +168,15 @@ proc wakeupWorkerToProcessQueue(w: ptr Worker) = signal(w.q.empty) signal(w.taskArrived) +proc attach(fv: FlowVarBase; i: int): bool = + acquire(fv.cv.L) + if fv.cv.counter <= 0: + fv.idx = i + result = true + else: + result = false + release(fv.cv.L) + proc finished(fv: FlowVarBase) = doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'" # we have to protect against the rare cases where the owner of the flowVar @@ -245,26 +254,27 @@ proc `^`*[T](fv: FlowVar[T]): T = proc awaitAny*(flowVars: openArray[FlowVarBase]): int = ## awaits any of the given flowVars. Returns the index of one flowVar for ## which a value arrived. A flowVar only supports one call to 'awaitAny' at - ## the same time. That means if you await([a,b]) and await([b,c]) the second + ## the same time. That means if you awaitAny([a,b]) and awaitAny([b,c]) the second ## call will only await 'c'. If there is no flowVar left to be able to wait ## on, -1 is returned. - ## **Note**: This results in non-deterministic behaviour and so should be - ## avoided. + ## **Note**: This results in non-deterministic behaviour and should be avoided. var ai: AwaitInfo ai.cv.initSemaphore() var conflicts = 0 + result = -1 for i in 0 .. flowVars.high: if cas(addr flowVars[i].ai, nil, addr ai): - flowVars[i].idx = i + if not attach(flowVars[i], i): + result = i + break else: inc conflicts if conflicts < flowVars.len: - await(ai.cv) - result = ai.idx + if result < 0: + await(ai.cv) + result = ai.idx for i in 0 .. flowVars.high: discard cas(addr flowVars[i].ai, addr ai, nil) - else: - result = -1 destroySemaphore(ai.cv) proc isReady*(fv: FlowVarBase): bool = @@ -321,7 +331,7 @@ proc slave(w: ptr Worker) {.thread.} = await(w.taskArrived) # XXX Somebody needs to look into this (why does this assertion fail # in Visual Studio?) - when not defined(vcc): assert(not w.ready) + when not defined(vcc) and not defined(tcc): assert(not w.ready) withLock numSlavesLock: inc numSlavesRunning |