diff options
author | Araq <rumpf_a@web.de> | 2014-06-05 08:46:29 +0200 |
---|---|---|
committer | Araq <rumpf_a@web.de> | 2014-06-05 08:46:29 +0200 |
commit | 2de99653d002b919c88322219bff6f33653081c5 (patch) | |
tree | 6a68a2f8758fb3860b8d59b0410779bb918c14eb /lib/pure/concurrency | |
parent | a4323b06b321b77ea36bf738efdfa481faf9822c (diff) | |
download | Nim-2de99653d002b919c88322219bff6f33653081c5.tar.gz |
Promises are now refs
Diffstat (limited to 'lib/pure/concurrency')
-rw-r--r-- | lib/pure/concurrency/threadpool.nim | 124 |
1 files changed, 67 insertions, 57 deletions
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index 92d5011f4..8129d03ae 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -85,25 +85,26 @@ type cv: CondVar idx: int - RawPromise* = ptr RawPromiseObj ## untyped base class for 'Promise[T]' - RawPromiseObj {.inheritable.} = object # \ - # we allocate this with the thread local allocator; this - # is possible since we already need to do the GC_unref - # on the owning thread + RawPromise* = ref RawPromiseObj ## untyped base class for 'Promise[T]' + RawPromiseObj = object of TObject ready, usesCondVar: bool cv: CondVar #\ # for 'awaitAny' support ai: ptr AwaitInfo idx: int - data: PObject # we incRef and unref it to keep it alive - owner: ptr Worker - next: RawPromise - align: float64 # a float for proper alignment + data: pointer # we incRef and unref it to keep it alive + owner: pointer # ptr Worker - Promise* {.compilerProc.} [T] = ptr object of RawPromiseObj - blob: T ## the underlying value, if available. Note that usually - ## you should not access this field directly! However it can - ## sometimes be more efficient than getting the value via ``^``. + PromiseObj[T] = object of RawPromiseObj + blob: T + + Promise*{.compilerProc.}[T] = ref PromiseObj[T] + + ToFreeQueue = object + len: int + lock: TLock + empty: TCond + data: array[512, pointer] WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.} Worker = object @@ -115,37 +116,55 @@ type ready: bool # put it here for correct alignment! initialized: bool # whether it has even been initialized shutdown: bool # the pool requests to shut down this worker thread - promiseLock: TLock - head: RawPromise + q: ToFreeQueue + +proc await*(prom: RawPromise) = + ## waits until the value for the promise arrives. Usually it is not necessary + ## to call this explicitly. + if prom.usesCondVar: + prom.usesCondVar = false + await(prom.cv) + destroyCondVar(prom.cv) -proc finished*(prom: RawPromise) = - ## This MUST be called for every created promise to free its associated - ## resources. Note that the default reading operation ``^`` is destructive - ## and calls ``finished``. +proc finished(prom: RawPromise) = doAssert prom.ai.isNil, "promise is still attached to an 'awaitAny'" - assert prom.next == nil - let w = prom.owner - acquire(w.promiseLock) - prom.next = w.head - w.head = prom - release(w.promiseLock) + # we have to protect against the rare cases where the owner of the promise + # simply disregards the promise and yet the "promiser" has not yet written + # anything to it: + await(prom) + if prom.data.isNil: return + let owner = cast[ptr Worker](prom.owner) + let q = addr(owner.q) + var waited = false + while true: + acquire(q.lock) + if q.len < q.data.len: + q.data[q.len] = prom.data + inc q.len + release(q.lock) + break + else: + # the queue is exhausted! We block until it has been cleaned: + release(q.lock) + wait(q.empty, q.lock) + waited = true + prom.data = nil + # wakeup other potentially waiting threads: + if waited: signal(q.empty) proc cleanPromises(w: ptr Worker) = - var it = w.head - acquire(w.promiseLock) - while it != nil: - let nxt = it.next - if it.usesCondVar: destroyCondVar(it.cv) - if it.data != nil: GC_unref(it.data) - dealloc(it) - it = nxt - w.head = nil - release(w.promiseLock) - -proc nimCreatePromise(owner: pointer; blobSize: int): RawPromise {. - compilerProc.} = - result = cast[RawPromise](alloc0(RawPromiseObj.sizeof + blobSize)) - result.owner = cast[ptr Worker](owner) + let q = addr(w.q) + acquire(q.lock) + for i in 0 .. <q.len: + GC_unref(cast[PObject](q.data[i])) + q.len = 0 + release(q.lock) + signal(q.empty) + +proc promFinalizer[T](prom: Promise[T]) = finished(prom) + +proc nimCreatePromise[T](): Promise[T] {.compilerProc.} = + new(result, promFinalizer) proc nimPromiseCreateCondVar(prom: RawPromise) {.compilerProc.} = prom.cv = createCondVar() @@ -160,16 +179,12 @@ proc nimPromiseSignal(prom: RawPromise) {.compilerProc.} = signal(prom.ai.cv.c) if prom.usesCondVar: signal(prom.cv) -proc await*[T](prom: Promise[T]) = - ## waits until the value for the promise arrives. - if prom.usesCondVar: await(prom.cv) - proc awaitAndThen*[T](prom: Promise[T]; action: proc (x: T) {.closure.}) = ## blocks until the ``prom`` is available and then passes its value ## to ``action``. Note that due to Nimrod's parameter passing semantics this ## means that ``T`` doesn't need to be copied and so ``awaitAndThen`` can ## sometimes be more efficient than ``^``. - if prom.usesCondVar: await(prom) + await(prom) when T is string or T is seq: action(cast[T](prom.data)) elif T is ref: @@ -179,23 +194,17 @@ proc awaitAndThen*[T](prom: Promise[T]; action: proc (x: T) {.closure.}) = finished(prom) proc `^`*[T](prom: Promise[ref T]): foreign ptr T = - ## blocks until the value is available and then returns this value. Note - ## this reading is destructive for reasons of efficiency and convenience. - ## This calls ``finished(prom)``. - if prom.usesCondVar: await(prom) + ## blocks until the value is available and then returns this value. + await(prom) result = cast[foreign ptr T](prom.data) - finished(prom) proc `^`*[T](prom: Promise[T]): T = - ## blocks until the value is available and then returns this value. Note - ## this reading is destructive for reasons of efficiency and convenience. - ## This calls ``finished(prom)``. - if prom.usesCondVar: await(prom) + ## blocks until the value is available and then returns this value. + await(prom) when T is string or T is seq: result = cast[T](prom.data) else: result = prom.blob - finished(prom) proc awaitAny*(promises: openArray[RawPromise]): int = # awaits any of the given promises. Returns the index of one promise for which @@ -245,7 +254,7 @@ proc slave(w: ptr Worker) {.thread.} = await(w.taskArrived) assert(not w.ready) w.f(w, w.data) - if w.head != nil: w.cleanPromises + if w.q.len != 0: w.cleanPromises if w.shutdown: w.shutdown = false atomicDec currentPoolSize @@ -266,8 +275,9 @@ var proc activateThread(i: int) {.noinline.} = workersData[i].taskArrived = createCondVar() workersData[i].taskStarted = createCondVar() - initLock workersData[i].promiseLock workersData[i].initialized = true + initCond(workersData[i].q.empty) + initLock(workersData[i].q.lock) createThread(workers[i], slave, addr(workersData[i])) proc setup() = |