diff options
author | Araq <rumpf_a@web.de> | 2014-05-22 08:41:50 +0200 |
---|---|---|
committer | Araq <rumpf_a@web.de> | 2014-05-22 08:41:50 +0200 |
commit | 417b9f5a1d13f26842b1337395a0f5b57827cc12 (patch) | |
tree | 5f9f101521b06bd6eed880507b7103f70f46139c /lib/pure/concurrency | |
parent | 31b8fd66b1bd54b665e52855909538a50d33d7c3 (diff) | |
download | Nim-417b9f5a1d13f26842b1337395a0f5b57827cc12.tar.gz |
'parallel' statement almost working
Diffstat (limited to 'lib/pure/concurrency')
-rw-r--r-- | lib/pure/concurrency/threadpool.nim | 112 |
1 files changed, 112 insertions, 0 deletions
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index 86819d25a..583c60c66 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -65,6 +65,30 @@ proc closeBarrier*(b: ptr Barrier) {.compilerProc.} = # ---------------------------------------------------------------------------- type + AwaitInfo = object + cv: CondVar + idx: int + + RawFuture* = ptr RawFutureObj ## untyped base class for 'Future[T]' + RawFutureObj {.inheritable.} = object # \ + # we allocate this with the thread local allocator; this + # is possible since we already need to do the GC_unref + # on the owning thread + ready, usesCondVar: bool + cv: CondVar #\ + # for 'awaitAny' support + ai: ptr AwaitInfo + idx: int + data: PObject # we incRef and unref it to keep it alive + owner: ptr Worker + next: RawFuture + align: float64 # a float for proper alignment + + Future* {.compilerProc.} [T] = ptr object of RawFutureObj + blob: T ## the underlying value, if available. Note that usually + ## you should not access this field directly! However it can + ## sometimes be more efficient than getting the value via ``^``. + WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.} Worker = object taskArrived: CondVar @@ -75,6 +99,92 @@ type ready: bool # put it here for correct alignment! initialized: bool # whether it has even been initialized shutdown: bool # the pool requests to shut down this worker thread + futureLock: TLock + head: RawFuture + +proc finished*(fut: RawFuture) = + ## This MUST be called for every created future to free its associated + ## resources. Note that the default reading operation ``^`` is destructive + ## and calls ``finished``. + doAssert fut.ai.isNil, "future is still attached to an 'awaitAny'" + assert fut.next == nil + let w = fut.owner + acquire(w.futureLock) + fut.next = w.head + w.head = fut + release(w.futureLock) + +proc cleanFutures(w: ptr Worker) = + var it = w.head + acquire(w.futureLock) + while it != nil: + let nxt = it.next + if it.usesCondVar: destroyCondVar(it.cv) + if it.data != nil: GC_unref(it.data) + dealloc(it) + it = nxt + w.head = nil + release(w.futureLock) + +proc nimCreateFuture(owner: pointer; blobSize: int): RawFuture {. + compilerProc.} = + result = cast[RawFuture](alloc0(RawFutureObj.sizeof + blobSize)) + result.owner = cast[ptr Worker](owner) + +proc nimFutureCreateCondVar(fut: RawFuture) {.compilerProc.} = + fut.cv = createCondVar() + fut.usesCondVar = true + +proc nimFutureSignal(fut: RawFuture) {.compilerProc.} = + assert fut.usesCondVar + signal(fut.cv) + +proc await*[T](fut: Future[T]) = + ## waits until the value for the future arrives. + if fut.usesCondVar: await(fut.cv) + +proc `^`*[T](fut: Future[T]): T = + ## blocks until the value is available and then returns this value. Note + ## this reading is destructive for reasons of efficiency and convenience. + ## This calls ``finished(fut)``. + await(fut) + when T is string or T is seq or T is ref: + result = cast[T](fut.data) + else: + result = fut.payload + finished(fut) + +proc notify*(fut: RawFuture) {.compilerproc.} = + if fut.ai != nil: + acquire(fut.ai.cv.L) + fut.ai.idx = fut.idx + inc fut.ai.cv.counter + release(fut.ai.cv.L) + signal(fut.ai.cv.c) + if fut.usesCondVar: signal(fut.cv) + +proc awaitAny*(futures: openArray[RawFuture]): int = + # awaits any of the given futures. Returns the index of one future for which + ## a value arrived. A future only supports one call to 'awaitAny' at the + ## same time. That means if you await([a,b]) and await([b,c]) the second + ## call will only await 'c'. If there is no future left to be able to wait + ## on, -1 is returned. + var ai: AwaitInfo + ai.cv = createCondVar() + var conflicts = 0 + for i in 0 .. futures.high: + if cas(addr futures[i].ai, nil, addr ai): + futures[i].idx = i + else: + inc conflicts + if conflicts < futures.len: + await(ai.cv) + result = ai.idx + for i in 0 .. futures.high: + discard cas(addr futures[i].ai, addr ai, nil) + else: + result = -1 + destroyCondVar(ai.cv) proc nimArgsPassingDone(p: pointer) {.compilerProc.} = let w = cast[ptr Worker](p) @@ -99,6 +209,7 @@ proc slave(w: ptr Worker) {.thread.} = await(w.taskArrived) assert(not w.ready) w.f(w, w.data) + if w.head != nil: w.cleanFutures if w.shutdown: w.shutdown = false atomicDec currentPoolSize @@ -119,6 +230,7 @@ var proc activateThread(i: int) {.noinline.} = workersData[i].taskArrived = createCondVar() workersData[i].taskStarted = createCondVar() + initLock workersData[i].futureLock workersData[i].initialized = true createThread(workers[i], slave, addr(workersData[i])) |