summary refs log tree commit diff stats
path: root/lib/pure/concurrency
diff options
context:
space:
mode:
authorAraq <rumpf_a@web.de>2015-06-30 12:50:24 +0200
committerAraq <rumpf_a@web.de>2015-06-30 12:50:24 +0200
commit28de800d6148065fd3e6344f7255e793298be399 (patch)
tree8aec27c13cd99be3c9e3d3abef45e1d183db996b /lib/pure/concurrency
parent4cfe216a776ffef61380c1c5f2d61aff7315c122 (diff)
parent3312d49a489e50e5c5f2275f7c0e400208eb8a5d (diff)
downloadNim-28de800d6148065fd3e6344f7255e793298be399.tar.gz
Merge branch 'more_concurrency' into devel
Conflicts:
	doc/tut1.txt
	lib/core/locks.nim
	lib/pure/collections/tables.nim
	lib/pure/selectors.nim
Diffstat (limited to 'lib/pure/concurrency')
-rw-r--r--lib/pure/concurrency/threadpool.nim59
1 files changed, 55 insertions, 4 deletions
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim
index 247aa377c..7c9d8adfd 100644
--- a/lib/pure/concurrency/threadpool.nim
+++ b/lib/pure/concurrency/threadpool.nim
@@ -128,6 +128,7 @@ type
     initialized: bool # whether it has even been initialized
     shutdown: bool # the pool requests to shut down this worker thread
     q: ToFreeQueue
+    readyForTask: Semaphore
 
 proc await*(fv: FlowVarBase) =
   ## waits until the value for the flowVar arrives. Usually it is not necessary
@@ -273,6 +274,10 @@ proc nimArgsPassingDone(p: pointer) {.compilerProc.} =
 const
   MaxThreadPoolSize* = 256 ## maximal size of the thread pool. 256 threads
                            ## should be good enough for anybody ;-)
+  MaxDistinguishedThread* = 32 ## maximal number of "distinguished" threads.
+
+type
+  ThreadId* = range[0..MaxDistinguishedThread-1]
 
 var
   currentPoolSize: int
@@ -298,10 +303,25 @@ proc slave(w: ptr Worker) {.thread.} =
       w.shutdown = false
       atomicDec currentPoolSize
 
+proc distinguishedSlave(w: ptr Worker) {.thread.} =
+  while true:
+    when declared(atomicStoreN):
+      atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST)
+    else:
+      w.ready = true
+    signal(w.readyForTask)
+    await(w.taskArrived)
+    assert(not w.ready)
+    w.f(w, w.data)
+    if w.q.len != 0: w.cleanFlowVars
+
 var
   workers: array[MaxThreadPoolSize, TThread[ptr Worker]]
   workersData: array[MaxThreadPoolSize, Worker]
 
+  distinguished: array[MaxDistinguishedThread, TThread[ptr Worker]]
+  distinguishedData: array[MaxDistinguishedThread, Worker]
+
 proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) =
   ## sets the minimal thread pool size. The default value of this is 4.
   minPoolSize = size
@@ -315,7 +335,7 @@ proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) =
       let w = addr(workersData[i])
       w.shutdown = true
 
-proc activateThread(i: int) {.noinline.} =
+proc activateWorkerThread(i: int) {.noinline.} =
   workersData[i].taskArrived = createSemaphore()
   workersData[i].taskStarted = createSemaphore()
   workersData[i].initialized = true
@@ -323,10 +343,19 @@ proc activateThread(i: int) {.noinline.} =
   initLock(workersData[i].q.lock)
   createThread(workers[i], slave, addr(workersData[i]))
 
+proc activateDistinguishedThread(i: int) {.noinline.} =
+  distinguishedData[i].taskArrived = createSemaphore()
+  distinguishedData[i].taskStarted = createSemaphore()
+  distinguishedData[i].initialized = true
+  distinguishedData[i].q.empty = createSemaphore()
+  initLock(distinguishedData[i].q.lock)
+  distinguishedData[i].readyForTask = createSemaphore()
+  createThread(distinguished[i], distinguishedSlave, addr(distinguishedData[i]))
+
 proc setup() =
   currentPoolSize = min(countProcessors(), MaxThreadPoolSize)
   readyWorker = addr(workersData[0])
-  for i in 0.. <currentPoolSize: activateThread(i)
+  for i in 0.. <currentPoolSize: activateWorkerThread(i)
 
 proc preferSpawn*(): bool =
   ## Use this proc to determine quickly if a 'spawn' or a direct call is
@@ -340,6 +369,13 @@ proc spawn*(call: expr): expr {.magic: "Spawn".}
   ## is gcsafe and has a return type that is either 'void' or compatible
   ## with ``FlowVar[T]``.
 
+proc pinnedSpawn*(id: ThreadId; call: expr): expr {.magic: "Spawn".}
+  ## always spawns a new task on the worker thread with ``id``, so that
+  ## the 'call' is **always** executed on
+  ## the this thread. 'call' has to be proc call 'p(...)' where 'p'
+  ## is gcsafe and has a return type that is either 'void' or compatible
+  ## with ``FlowVar[T]``.
+
 template spawnX*(call: expr): expr =
   ## spawns a new task if a CPU core is ready, otherwise executes the
   ## call in the calling thread. Usually it is advised to
@@ -360,7 +396,7 @@ var
 
 initLock stateLock
 
-proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} =
+proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerProc.} =
   # implementation of 'spawn' that is used by the code generator.
   while true:
     if selectWorker(readyWorker, fn, data): return
@@ -377,7 +413,7 @@ proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} =
         of doCreateThread:
           if currentPoolSize < maxPoolSize:
             if not workersData[currentPoolSize].initialized:
-              activateThread(currentPoolSize)
+              activateWorkerThread(currentPoolSize)
             let w = addr(workersData[currentPoolSize])
             atomicInc currentPoolSize
             if selectWorker(w, fn, data):
@@ -394,6 +430,21 @@ proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} =
       # other thread succeeded, so we don't need to do anything here.
     await(gSomeReady)
 
+var
+  distinguishedLock: TLock
+
+initLock distinguishedLock
+
+proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerProc.} =
+  acquire(distinguishedLock)
+  if not distinguishedData[id].initialized:
+    activateDistinguishedThread(id)
+  release(distinguishedLock)
+  while true:
+    if selectWorker(addr(distinguishedData[id]), fn, data): break
+    await(distinguishedData[id].readyForTask)
+
+
 proc sync*() =
   ## a simple barrier to wait for all spawn'ed tasks. If you need more elaborate
   ## waiting, you have to use an explicit barrier.