summary refs log tree commit diff stats
path: root/lib/pure/concurrency
diff options
context:
space:
mode:
authorAraq <rumpf_a@web.de>2015-05-28 12:42:04 +0200
committerAraq <rumpf_a@web.de>2015-05-28 12:42:04 +0200
commit94f54700c4bfcc04966e0ea011960c5432b29092 (patch)
tree44dc92a1f7a570dd87a7a366bdd2e3fbb9030902 /lib/pure/concurrency
parent75ccdebd1e8ea9ffd81815716c2593cfbc9820c0 (diff)
downloadNim-94f54700c4bfcc04966e0ea011960c5432b29092.tar.gz
first implementation of pinnedSpawn
Diffstat (limited to 'lib/pure/concurrency')
-rw-r--r--lib/pure/concurrency/threadpool.nim56
1 files changed, 52 insertions, 4 deletions
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim
index 9f1e53fb8..10117183a 100644
--- a/lib/pure/concurrency/threadpool.nim
+++ b/lib/pure/concurrency/threadpool.nim
@@ -267,6 +267,10 @@ proc nimArgsPassingDone(p: pointer) {.compilerProc.} =
 const
   MaxThreadPoolSize* = 256 ## maximal size of the thread pool. 256 threads
                            ## should be good enough for anybody ;-)
+  MaxDistinguishedThread* = 32 ## maximal number of "distinguished" threads.
+
+type
+  ThreadId* = range[0..MaxDistinguishedThread-1]
 
 var
   currentPoolSize: int
@@ -291,10 +295,24 @@ proc slave(w: ptr Worker) {.thread.} =
       w.shutdown = false
       atomicDec currentPoolSize
 
+proc distinguishedSlave(w: ptr Worker) {.thread.} =
+  while true:
+    when declared(atomicStoreN):
+      atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST)
+    else:
+      w.ready = true
+    await(w.taskArrived)
+    assert(not w.ready)
+    w.f(w, w.data)
+    if w.q.len != 0: w.cleanFlowVars
+
 var
   workers: array[MaxThreadPoolSize, TThread[ptr Worker]]
   workersData: array[MaxThreadPoolSize, Worker]
 
+  distinguished: array[MaxDistinguishedThread, TThread[ptr Worker]]
+  distinguishedData: array[MaxDistinguishedThread, Worker]
+
 proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) =
   ## sets the minimal thread pool size. The default value of this is 4.
   minPoolSize = size
@@ -308,7 +326,7 @@ proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) =
       let w = addr(workersData[i])
       w.shutdown = true
 
-proc activateThread(i: int) {.noinline.} =
+proc activateWorkerThread(i: int) {.noinline.} =
   workersData[i].taskArrived = createSemaphore()
   workersData[i].taskStarted = createSemaphore()
   workersData[i].initialized = true
@@ -316,10 +334,18 @@ proc activateThread(i: int) {.noinline.} =
   initLock(workersData[i].q.lock)
   createThread(workers[i], slave, addr(workersData[i]))
 
+proc activateDistinguishedThread(i: int) {.noinline.} =
+  distinguishedData[i].taskArrived = createSemaphore()
+  distinguishedData[i].taskStarted = createSemaphore()
+  distinguishedData[i].initialized = true
+  distinguishedData[i].q.empty = createSemaphore()
+  initLock(distinguishedData[i].q.lock)
+  createThread(distinguished[i], distinguishedSlave, addr(distinguishedData[i]))
+
 proc setup() =
   currentPoolSize = min(countProcessors(), MaxThreadPoolSize)
   readyWorker = addr(workersData[0])
-  for i in 0.. <currentPoolSize: activateThread(i)
+  for i in 0.. <currentPoolSize: activateWorkerThread(i)
 
 proc preferSpawn*(): bool =
   ## Use this proc to determine quickly if a 'spawn' or a direct call is
@@ -333,6 +359,13 @@ proc spawn*(call: expr): expr {.magic: "Spawn".}
   ## is gcsafe and has a return type that is either 'void' or compatible
   ## with ``FlowVar[T]``.
 
+proc pinnedSpawn*(id: ThreadId; call: expr): expr {.magic: "Spawn".}
+  ## always spawns a new task on the worker thread with ``id``, so that
+  ## the 'call' is **always** executed on
+  ## the this thread. 'call' has to be proc call 'p(...)' where 'p'
+  ## is gcsafe and has a return type that is either 'void' or compatible
+  ## with ``FlowVar[T]``.
+
 template spawnX*(call: expr): expr =
   ## spawns a new task if a CPU core is ready, otherwise executes the
   ## call in the calling thread. Usually it is advised to
@@ -353,7 +386,7 @@ var
 
 initLock stateLock
 
-proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} =
+proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerProc.} =
   # implementation of 'spawn' that is used by the code generator.
   while true:
     if selectWorker(readyWorker, fn, data): return
@@ -370,7 +403,7 @@ proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} =
         of doCreateThread:
           if currentPoolSize < maxPoolSize:
             if not workersData[currentPoolSize].initialized:
-              activateThread(currentPoolSize)
+              activateWorkerThread(currentPoolSize)
             let w = addr(workersData[currentPoolSize])
             atomicInc currentPoolSize
             if selectWorker(w, fn, data):
@@ -387,6 +420,21 @@ proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} =
       # other thread succeeded, so we don't need to do anything here.
     await(gSomeReady)
 
+var
+  distinguishedLock: TLock
+
+initLock distinguishedLock
+
+proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerProc.} =
+  acquire(distinguishedLock)
+  if not distinguishedData[id].initialized:
+    activateDistinguishedThread(id)
+  while true:
+    if selectWorker(addr(distinguishedData[id]), fn, data): break
+    cpuRelax()
+    # XXX exponential backoff?
+  release(distinguishedLock)
+
 proc sync*() =
   ## a simple barrier to wait for all spawn'ed tasks. If you need more elaborate
   ## waiting, you have to use an explicit barrier.