summary refs log tree commit diff stats
path: root/lib/pure/concurrency
diff options
context:
space:
mode:
authorAraq <rumpf_a@web.de>2014-06-05 08:46:29 +0200
committerAraq <rumpf_a@web.de>2014-06-05 08:46:29 +0200
commit2de99653d002b919c88322219bff6f33653081c5 (patch)
tree6a68a2f8758fb3860b8d59b0410779bb918c14eb /lib/pure/concurrency
parenta4323b06b321b77ea36bf738efdfa481faf9822c (diff)
downloadNim-2de99653d002b919c88322219bff6f33653081c5.tar.gz
Promises are now refs
Diffstat (limited to 'lib/pure/concurrency')
-rw-r--r--lib/pure/concurrency/threadpool.nim124
1 files changed, 67 insertions, 57 deletions
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim
index 92d5011f4..8129d03ae 100644
--- a/lib/pure/concurrency/threadpool.nim
+++ b/lib/pure/concurrency/threadpool.nim
@@ -85,25 +85,26 @@ type
     cv: CondVar
     idx: int
 
-  RawPromise* = ptr RawPromiseObj ## untyped base class for 'Promise[T]'
-  RawPromiseObj {.inheritable.} = object # \
-    # we allocate this with the thread local allocator; this
-    # is possible since we already need to do the GC_unref
-    # on the owning thread
+  RawPromise* = ref RawPromiseObj ## untyped base class for 'Promise[T]'
+  RawPromiseObj = object of TObject
     ready, usesCondVar: bool
     cv: CondVar #\
     # for 'awaitAny' support
     ai: ptr AwaitInfo
     idx: int
-    data: PObject  # we incRef and unref it to keep it alive
-    owner: ptr Worker
-    next: RawPromise
-    align: float64 # a float for proper alignment
+    data: pointer  # we incRef and unref it to keep it alive
+    owner: pointer # ptr Worker
 
-  Promise* {.compilerProc.} [T] = ptr object of RawPromiseObj
-    blob: T  ## the underlying value, if available. Note that usually
-             ## you should not access this field directly! However it can
-             ## sometimes be more efficient than getting the value via ``^``.
+  PromiseObj[T] = object of RawPromiseObj
+    blob: T
+
+  Promise*{.compilerProc.}[T] = ref PromiseObj[T]
+
+  ToFreeQueue = object
+    len: int
+    lock: TLock
+    empty: TCond
+    data: array[512, pointer]
 
   WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.}
   Worker = object
@@ -115,37 +116,55 @@ type
     ready: bool # put it here for correct alignment!
     initialized: bool # whether it has even been initialized
     shutdown: bool # the pool requests to shut down this worker thread
-    promiseLock: TLock
-    head: RawPromise
+    q: ToFreeQueue
+
+proc await*(prom: RawPromise) =
+  ## waits until the value for the promise arrives. Usually it is not necessary
+  ## to call this explicitly.
+  if prom.usesCondVar:
+    prom.usesCondVar = false
+    await(prom.cv)
+    destroyCondVar(prom.cv)
 
-proc finished*(prom: RawPromise) =
-  ## This MUST be called for every created promise to free its associated
-  ## resources. Note that the default reading operation ``^`` is destructive
-  ## and calls ``finished``.
+proc finished(prom: RawPromise) =
   doAssert prom.ai.isNil, "promise is still attached to an 'awaitAny'"
-  assert prom.next == nil
-  let w = prom.owner
-  acquire(w.promiseLock)
-  prom.next = w.head
-  w.head = prom
-  release(w.promiseLock)
+  # we have to protect against the rare cases where the owner of the promise
+  # simply disregards the promise and yet the "promiser" has not yet written
+  # anything to it:
+  await(prom)
+  if prom.data.isNil: return
+  let owner = cast[ptr Worker](prom.owner)
+  let q = addr(owner.q)
+  var waited = false
+  while true:
+    acquire(q.lock)
+    if q.len < q.data.len:
+      q.data[q.len] = prom.data
+      inc q.len
+      release(q.lock)
+      break
+    else:
+      # the queue is exhausted! We block until it has been cleaned:
+      release(q.lock)
+      wait(q.empty, q.lock)
+      waited = true
+  prom.data = nil
+  # wakeup other potentially waiting threads:
+  if waited: signal(q.empty)
 
 proc cleanPromises(w: ptr Worker) =
-  var it = w.head
-  acquire(w.promiseLock)
-  while it != nil:
-    let nxt = it.next
-    if it.usesCondVar: destroyCondVar(it.cv)
-    if it.data != nil: GC_unref(it.data)
-    dealloc(it)
-    it = nxt
-  w.head = nil
-  release(w.promiseLock)
-
-proc nimCreatePromise(owner: pointer; blobSize: int): RawPromise {.
-                     compilerProc.} =
-  result = cast[RawPromise](alloc0(RawPromiseObj.sizeof + blobSize))
-  result.owner = cast[ptr Worker](owner)
+  let q = addr(w.q)
+  acquire(q.lock)
+  for i in 0 .. <q.len:
+    GC_unref(cast[PObject](q.data[i]))
+  q.len = 0
+  release(q.lock)
+  signal(q.empty)
+
+proc promFinalizer[T](prom: Promise[T]) = finished(prom)
+
+proc nimCreatePromise[T](): Promise[T] {.compilerProc.} =
+  new(result, promFinalizer)
 
 proc nimPromiseCreateCondVar(prom: RawPromise) {.compilerProc.} =
   prom.cv = createCondVar()
@@ -160,16 +179,12 @@ proc nimPromiseSignal(prom: RawPromise) {.compilerProc.} =
     signal(prom.ai.cv.c)
   if prom.usesCondVar: signal(prom.cv)
 
-proc await*[T](prom: Promise[T]) =
-  ## waits until the value for the promise arrives.
-  if prom.usesCondVar: await(prom.cv)
-
 proc awaitAndThen*[T](prom: Promise[T]; action: proc (x: T) {.closure.}) =
   ## blocks until the ``prom`` is available and then passes its value
   ## to ``action``. Note that due to Nimrod's parameter passing semantics this
   ## means that ``T`` doesn't need to be copied and so ``awaitAndThen`` can
   ## sometimes be more efficient than ``^``.
-  if prom.usesCondVar: await(prom)
+  await(prom)
   when T is string or T is seq:
     action(cast[T](prom.data))
   elif T is ref:
@@ -179,23 +194,17 @@ proc awaitAndThen*[T](prom: Promise[T]; action: proc (x: T) {.closure.}) =
   finished(prom)
 
 proc `^`*[T](prom: Promise[ref T]): foreign ptr T =
-  ## blocks until the value is available and then returns this value. Note
-  ## this reading is destructive for reasons of efficiency and convenience.
-  ## This calls ``finished(prom)``.
-  if prom.usesCondVar: await(prom)
+  ## blocks until the value is available and then returns this value.
+  await(prom)
   result = cast[foreign ptr T](prom.data)
-  finished(prom)
 
 proc `^`*[T](prom: Promise[T]): T =
-  ## blocks until the value is available and then returns this value. Note
-  ## this reading is destructive for reasons of efficiency and convenience.
-  ## This calls ``finished(prom)``.
-  if prom.usesCondVar: await(prom)
+  ## blocks until the value is available and then returns this value.
+  await(prom)
   when T is string or T is seq:
     result = cast[T](prom.data)
   else:
     result = prom.blob
-  finished(prom)
 
 proc awaitAny*(promises: openArray[RawPromise]): int =
   # awaits any of the given promises. Returns the index of one promise for which
@@ -245,7 +254,7 @@ proc slave(w: ptr Worker) {.thread.} =
     await(w.taskArrived)
     assert(not w.ready)
     w.f(w, w.data)
-    if w.head != nil: w.cleanPromises
+    if w.q.len != 0: w.cleanPromises
     if w.shutdown:
       w.shutdown = false
       atomicDec currentPoolSize
@@ -266,8 +275,9 @@ var
 proc activateThread(i: int) {.noinline.} =
   workersData[i].taskArrived = createCondVar()
   workersData[i].taskStarted = createCondVar()
-  initLock workersData[i].promiseLock
   workersData[i].initialized = true
+  initCond(workersData[i].q.empty)
+  initLock(workersData[i].q.lock)
   createThread(workers[i], slave, addr(workersData[i]))
 
 proc setup() =