summary refs log tree commit diff stats
path: root/lib/pure/concurrency
diff options
context:
space:
mode:
authorAraq <rumpf_a@web.de>2014-05-22 08:41:50 +0200
committerAraq <rumpf_a@web.de>2014-05-22 08:41:50 +0200
commit417b9f5a1d13f26842b1337395a0f5b57827cc12 (patch)
tree5f9f101521b06bd6eed880507b7103f70f46139c /lib/pure/concurrency
parent31b8fd66b1bd54b665e52855909538a50d33d7c3 (diff)
downloadNim-417b9f5a1d13f26842b1337395a0f5b57827cc12.tar.gz
'parallel' statement almost working
Diffstat (limited to 'lib/pure/concurrency')
-rw-r--r--lib/pure/concurrency/threadpool.nim112
1 files changed, 112 insertions, 0 deletions
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim
index 86819d25a..583c60c66 100644
--- a/lib/pure/concurrency/threadpool.nim
+++ b/lib/pure/concurrency/threadpool.nim
@@ -65,6 +65,30 @@ proc closeBarrier*(b: ptr Barrier) {.compilerProc.} =
 # ----------------------------------------------------------------------------
 
 type
+  AwaitInfo = object
+    cv: CondVar
+    idx: int
+
+  RawFuture* = ptr RawFutureObj ## untyped base class for 'Future[T]'
+  RawFutureObj {.inheritable.} = object # \
+    # we allocate this with the thread local allocator; this
+    # is possible since we already need to do the GC_unref
+    # on the owning thread
+    ready, usesCondVar: bool
+    cv: CondVar #\
+    # for 'awaitAny' support
+    ai: ptr AwaitInfo
+    idx: int
+    data: PObject  # we incRef and unref it to keep it alive
+    owner: ptr Worker
+    next: RawFuture
+    align: float64 # a float for proper alignment
+
+  Future* {.compilerProc.} [T] = ptr object of RawFutureObj
+    blob: T  ## the underlying value, if available. Note that usually
+             ## you should not access this field directly! However it can
+             ## sometimes be more efficient than getting the value via ``^``.
+
   WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.}
   Worker = object
     taskArrived: CondVar
@@ -75,6 +99,92 @@ type
     ready: bool # put it here for correct alignment!
     initialized: bool # whether it has even been initialized
     shutdown: bool # the pool requests to shut down this worker thread
+    futureLock: TLock
+    head: RawFuture
+
+proc finished*(fut: RawFuture) =
+  ## This MUST be called for every created future to free its associated
+  ## resources. Note that the default reading operation ``^`` is destructive
+  ## and calls ``finished``.
+  doAssert fut.ai.isNil, "future is still attached to an 'awaitAny'"
+  assert fut.next == nil
+  let w = fut.owner
+  acquire(w.futureLock)
+  fut.next = w.head
+  w.head = fut
+  release(w.futureLock)
+
+proc cleanFutures(w: ptr Worker) =
+  var it = w.head
+  acquire(w.futureLock)
+  while it != nil:
+    let nxt = it.next
+    if it.usesCondVar: destroyCondVar(it.cv)
+    if it.data != nil: GC_unref(it.data)
+    dealloc(it)
+    it = nxt
+  w.head = nil
+  release(w.futureLock)
+
+proc nimCreateFuture(owner: pointer; blobSize: int): RawFuture {.
+                     compilerProc.} =
+  result = cast[RawFuture](alloc0(RawFutureObj.sizeof + blobSize))
+  result.owner = cast[ptr Worker](owner)
+
+proc nimFutureCreateCondVar(fut: RawFuture) {.compilerProc.} =
+  fut.cv = createCondVar()
+  fut.usesCondVar = true
+
+proc nimFutureSignal(fut: RawFuture) {.compilerProc.} =
+  assert fut.usesCondVar
+  signal(fut.cv)
+
+proc await*[T](fut: Future[T]) =
+  ## waits until the value for the future arrives.
+  if fut.usesCondVar: await(fut.cv)
+
+proc `^`*[T](fut: Future[T]): T =
+  ## blocks until the value is available and then returns this value. Note
+  ## this reading is destructive for reasons of efficiency and convenience.
+  ## This calls ``finished(fut)``.
+  await(fut)
+  when T is string or T is seq or T is ref:
+    result = cast[T](fut.data)
+  else:
+    result = fut.payload
+  finished(fut)
+
+proc notify*(fut: RawFuture) {.compilerproc.} =
+  if fut.ai != nil:
+    acquire(fut.ai.cv.L)
+    fut.ai.idx = fut.idx
+    inc fut.ai.cv.counter
+    release(fut.ai.cv.L)
+    signal(fut.ai.cv.c)
+  if fut.usesCondVar: signal(fut.cv)
+
+proc awaitAny*(futures: openArray[RawFuture]): int =
+  # awaits any of the given futures. Returns the index of one future for which
+  ## a value arrived. A future only supports one call to 'awaitAny' at the
+  ## same time. That means if you await([a,b]) and await([b,c]) the second
+  ## call will only await 'c'. If there is no future left to be able to wait
+  ## on, -1 is returned.
+  var ai: AwaitInfo
+  ai.cv = createCondVar()
+  var conflicts = 0
+  for i in 0 .. futures.high:
+    if cas(addr futures[i].ai, nil, addr ai):
+      futures[i].idx = i
+    else:
+      inc conflicts
+  if conflicts < futures.len:
+    await(ai.cv)
+    result = ai.idx
+    for i in 0 .. futures.high:
+      discard cas(addr futures[i].ai, addr ai, nil)
+  else:
+    result = -1
+  destroyCondVar(ai.cv)
 
 proc nimArgsPassingDone(p: pointer) {.compilerProc.} =
   let w = cast[ptr Worker](p)
@@ -99,6 +209,7 @@ proc slave(w: ptr Worker) {.thread.} =
     await(w.taskArrived)
     assert(not w.ready)
     w.f(w, w.data)
+    if w.head != nil: w.cleanFutures
     if w.shutdown:
       w.shutdown = false
       atomicDec currentPoolSize
@@ -119,6 +230,7 @@ var
 proc activateThread(i: int) {.noinline.} =
   workersData[i].taskArrived = createCondVar()
   workersData[i].taskStarted = createCondVar()
+  initLock workersData[i].futureLock
   workersData[i].initialized = true
   createThread(workers[i], slave, addr(workersData[i]))