summary refs log tree commit diff stats
path: root/lib/pure/concurrency
diff options
context:
space:
mode:
authorAraq <rumpf_a@web.de>2014-05-25 15:19:46 +0200
committerAraq <rumpf_a@web.de>2014-05-25 15:19:46 +0200
commit030eac86c05427792d3c3c00b56fbe764d783a40 (patch)
treeed98967d7606604b9b79ee9fd6d0aaf03920a444 /lib/pure/concurrency
parentd2dbcf2fa44aa76c6c7ed2c07641560640e6bc6b (diff)
downloadNim-030eac86c05427792d3c3c00b56fbe764d783a40.tar.gz
bugfix: regionized pointers in a generic context; renamed 'Future' to 'Promise'
Diffstat (limited to 'lib/pure/concurrency')
-rw-r--r--lib/pure/concurrency/threadpool.nim134
1 files changed, 79 insertions, 55 deletions
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim
index 41c1adca0..24cb9ccdd 100644
--- a/lib/pure/concurrency/threadpool.nim
+++ b/lib/pure/concurrency/threadpool.nim
@@ -65,12 +65,14 @@ proc closeBarrier*(b: ptr Barrier) {.compilerProc.} =
 # ----------------------------------------------------------------------------
 
 type
+  foreign* = object ## a region that indicates the pointer comes from a
+                    ## foreign thread heap.
   AwaitInfo = object
     cv: CondVar
     idx: int
 
-  RawFuture* = ptr RawFutureObj ## untyped base class for 'Future[T]'
-  RawFutureObj {.inheritable.} = object # \
+  RawPromise* = ptr RawPromiseObj ## untyped base class for 'Promise[T]'
+  RawPromiseObj {.inheritable.} = object # \
     # we allocate this with the thread local allocator; this
     # is possible since we already need to do the GC_unref
     # on the owning thread
@@ -81,10 +83,10 @@ type
     idx: int
     data: PObject  # we incRef and unref it to keep it alive
     owner: ptr Worker
-    next: RawFuture
+    next: RawPromise
     align: float64 # a float for proper alignment
 
-  Future* {.compilerProc.} [T] = ptr object of RawFutureObj
+  Promise* {.compilerProc.} [T] = ptr object of RawPromiseObj
     blob: T  ## the underlying value, if available. Note that usually
              ## you should not access this field directly! However it can
              ## sometimes be more efficient than getting the value via ``^``.
@@ -99,24 +101,24 @@ type
     ready: bool # put it here for correct alignment!
     initialized: bool # whether it has even been initialized
     shutdown: bool # the pool requests to shut down this worker thread
-    futureLock: TLock
-    head: RawFuture
+    promiseLock: TLock
+    head: RawPromise
 
-proc finished*(fut: RawFuture) =
-  ## This MUST be called for every created future to free its associated
+proc finished*(prom: RawPromise) =
+  ## This MUST be called for every created promise to free its associated
   ## resources. Note that the default reading operation ``^`` is destructive
   ## and calls ``finished``.
-  doAssert fut.ai.isNil, "future is still attached to an 'awaitAny'"
-  assert fut.next == nil
-  let w = fut.owner
-  acquire(w.futureLock)
-  fut.next = w.head
-  w.head = fut
-  release(w.futureLock)
-
-proc cleanFutures(w: ptr Worker) =
+  doAssert prom.ai.isNil, "promise is still attached to an 'awaitAny'"
+  assert prom.next == nil
+  let w = prom.owner
+  acquire(w.promiseLock)
+  prom.next = w.head
+  w.head = prom
+  release(w.promiseLock)
+
+proc cleanPromises(w: ptr Worker) =
   var it = w.head
-  acquire(w.futureLock)
+  acquire(w.promiseLock)
   while it != nil:
     let nxt = it.next
     if it.usesCondVar: destroyCondVar(it.cv)
@@ -124,62 +126,84 @@ proc cleanFutures(w: ptr Worker) =
     dealloc(it)
     it = nxt
   w.head = nil
-  release(w.futureLock)
+  release(w.promiseLock)
 
-proc nimCreateFuture(owner: pointer; blobSize: int): RawFuture {.
+proc nimCreatePromise(owner: pointer; blobSize: int): RawPromise {.
                      compilerProc.} =
-  result = cast[RawFuture](alloc0(RawFutureObj.sizeof + blobSize))
+  result = cast[RawPromise](alloc0(RawPromiseObj.sizeof + blobSize))
   result.owner = cast[ptr Worker](owner)
 
-proc nimFutureCreateCondVar(fut: RawFuture) {.compilerProc.} =
-  fut.cv = createCondVar()
-  fut.usesCondVar = true
-
-proc nimFutureSignal(fut: RawFuture) {.compilerProc.} =
-  if fut.ai != nil:
-    acquire(fut.ai.cv.L)
-    fut.ai.idx = fut.idx
-    inc fut.ai.cv.counter
-    release(fut.ai.cv.L)
-    signal(fut.ai.cv.c)
-  if fut.usesCondVar: signal(fut.cv)
+proc nimPromiseCreateCondVar(prom: RawPromise) {.compilerProc.} =
+  prom.cv = createCondVar()
+  prom.usesCondVar = true
+
+proc nimPromiseSignal(prom: RawPromise) {.compilerProc.} =
+  if prom.ai != nil:
+    acquire(prom.ai.cv.L)
+    prom.ai.idx = prom.idx
+    inc prom.ai.cv.counter
+    release(prom.ai.cv.L)
+    signal(prom.ai.cv.c)
+  if prom.usesCondVar: signal(prom.cv)
+
+proc await*[T](prom: Promise[T]) =
+  ## waits until the value for the promise arrives.
+  if prom.usesCondVar: await(prom.cv)
+
+proc awaitAndThen*[T](prom: Promise[T]; action: proc (x: T) {.closure.}) =
+  ## blocks until the value is available and then passes this value
+  ## to ``action``. Note that due to Nimrod's parameter passing semantics this
+  ## means that ``T`` doesn't need to be copied and so ``awaitAndThen`` can
+  ## sometimes be more efficient than ``^``.
+  if prom.usesCondVar: await(prom)
+  when T is string or T is seq:
+    action(cast[T](prom.data))
+  elif T is ref:
+    {.error: "'awaitAndThen' not available for Promise[ref]".}
+  else:
+    action(prom.blob)
+  finished(prom)
 
-proc await*[T](fut: Future[T]) =
-  ## waits until the value for the future arrives.
-  if fut.usesCondVar: await(fut.cv)
+proc `^`*[T](prom: Promise[ref T]): foreign ptr T =
+  ## blocks until the value is available and then returns this value. Note
+  ## this reading is destructive for reasons of efficiency and convenience.
+  ## This calls ``finished(prom)``.
+  if prom.usesCondVar: await(prom)
+  result = cast[foreign ptr T](prom.data)
+  finished(prom)
 
-proc `^`*[T](fut: Future[T]): T =
+proc `^`*[T](prom: Promise[T]): T =
   ## blocks until the value is available and then returns this value. Note
   ## this reading is destructive for reasons of efficiency and convenience.
-  ## This calls ``finished(fut)``.
-  if fut.usesCondVar: await(fut)
-  when T is string or T is seq or T is ref:
-    result = cast[T](fut.data)
+  ## This calls ``finished(prom)``.
+  if prom.usesCondVar: await(prom)
+  when T is string or T is seq:
+    result = cast[T](prom.data)
   else:
-    result = fut.blob
-  finished(fut)
+    result = prom.blob
+  finished(prom)
 
-proc awaitAny*(futures: openArray[RawFuture]): int =
-  # awaits any of the given futures. Returns the index of one future for which
-  ## a value arrived. A future only supports one call to 'awaitAny' at the
+proc awaitAny*(promises: openArray[RawPromise]): int =
+  # awaits any of the given promises. Returns the index of one promise for which
+  ## a value arrived. A promise only supports one call to 'awaitAny' at the
   ## same time. That means if you await([a,b]) and await([b,c]) the second
-  ## call will only await 'c'. If there is no future left to be able to wait
+  ## call will only await 'c'. If there is no promise left to be able to wait
   ## on, -1 is returned.
   ## **Note**: This results in non-deterministic behaviour and so should be
   ## avoided.
   var ai: AwaitInfo
   ai.cv = createCondVar()
   var conflicts = 0
-  for i in 0 .. futures.high:
-    if cas(addr futures[i].ai, nil, addr ai):
-      futures[i].idx = i
+  for i in 0 .. promises.high:
+    if cas(addr promises[i].ai, nil, addr ai):
+      promises[i].idx = i
     else:
       inc conflicts
-  if conflicts < futures.len:
+  if conflicts < promises.len:
     await(ai.cv)
     result = ai.idx
-    for i in 0 .. futures.high:
-      discard cas(addr futures[i].ai, addr ai, nil)
+    for i in 0 .. promises.high:
+      discard cas(addr promises[i].ai, addr ai, nil)
   else:
     result = -1
   destroyCondVar(ai.cv)
@@ -207,7 +231,7 @@ proc slave(w: ptr Worker) {.thread.} =
     await(w.taskArrived)
     assert(not w.ready)
     w.f(w, w.data)
-    if w.head != nil: w.cleanFutures
+    if w.head != nil: w.cleanPromises
     if w.shutdown:
       w.shutdown = false
       atomicDec currentPoolSize
@@ -228,7 +252,7 @@ var
 proc activateThread(i: int) {.noinline.} =
   workersData[i].taskArrived = createCondVar()
   workersData[i].taskStarted = createCondVar()
-  initLock workersData[i].futureLock
+  initLock workersData[i].promiseLock
   workersData[i].initialized = true
   createThread(workers[i], slave, addr(workersData[i]))