summary refs log tree commit diff stats
path: root/lib/pure/concurrency
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/concurrency')
-rw-r--r--lib/pure/concurrency/cpuinfo.nim2
-rw-r--r--lib/pure/concurrency/cpuload.nim12
-rw-r--r--lib/pure/concurrency/threadpool.nim110
3 files changed, 104 insertions, 20 deletions
diff --git a/lib/pure/concurrency/cpuinfo.nim b/lib/pure/concurrency/cpuinfo.nim
index 6f2bc4491..8c87c77df 100644
--- a/lib/pure/concurrency/cpuinfo.nim
+++ b/lib/pure/concurrency/cpuinfo.nim
@@ -18,7 +18,7 @@ when not defined(windows):
 
 when defined(linux):
   import linux
-  
+
 when defined(freebsd) or defined(macosx):
   {.emit:"#include <sys/types.h>".}
 
diff --git a/lib/pure/concurrency/cpuload.nim b/lib/pure/concurrency/cpuload.nim
index c1796089a..22598b5c9 100644
--- a/lib/pure/concurrency/cpuload.nim
+++ b/lib/pure/concurrency/cpuload.nim
@@ -13,7 +13,7 @@
 when defined(windows):
   import winlean, os, strutils, math
 
-  proc `-`(a, b: TFILETIME): int64 = a.rdFileTime - b.rdFileTime
+  proc `-`(a, b: FILETIME): int64 = a.rdFileTime - b.rdFileTime
 elif defined(linux):
   from cpuinfo import countProcessors
 
@@ -25,16 +25,16 @@ type
 
   ThreadPoolState* = object
     when defined(windows):
-      prevSysKernel, prevSysUser, prevProcKernel, prevProcUser: TFILETIME
+      prevSysKernel, prevSysUser, prevProcKernel, prevProcUser: FILETIME
     calls*: int
 
 proc advice*(s: var ThreadPoolState): ThreadPoolAdvice =
   when defined(windows):
     var
       sysIdle, sysKernel, sysUser,
-        procCreation, procExit, procKernel, procUser: TFILETIME
+        procCreation, procExit, procKernel, procUser: FILETIME
     if getSystemTimes(sysIdle, sysKernel, sysUser) == 0 or
-        getProcessTimes(THandle(-1), procCreation, procExit, 
+        getProcessTimes(Handle(-1), procCreation, procExit,
                         procKernel, procUser) == 0:
       return doNothing
     if s.calls > 0:
@@ -57,7 +57,7 @@ proc advice*(s: var ThreadPoolState): ThreadPoolAdvice =
     s.prevProcKernel = procKernel
     s.prevProcUser = procUser
   elif defined(linux):
-    proc fscanf(c: File, frmt: cstring) {.varargs, importc, 
+    proc fscanf(c: File, frmt: cstring) {.varargs, importc,
       header: "<stdio.h>".}
 
     var f = open("/proc/loadavg")
@@ -78,7 +78,7 @@ proc advice*(s: var ThreadPoolState): ThreadPoolAdvice =
     result = doNothing
   inc s.calls
 
-when isMainModule:
+when not defined(testing) and isMainModule:
   proc busyLoop() =
     while true:
       discard random(80)
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim
index 9f1e53fb8..72e744d52 100644
--- a/lib/pure/concurrency/threadpool.nim
+++ b/lib/pure/concurrency/threadpool.nim
@@ -18,8 +18,8 @@ import cpuinfo, cpuload, locks
 
 type
   Semaphore = object
-    c: TCond
-    L: TLock
+    c: Cond
+    L: Lock
     counter: int
 
 proc createSemaphore(): Semaphore =
@@ -113,7 +113,7 @@ type
 
   ToFreeQueue = object
     len: int
-    lock: TLock
+    lock: Lock
     empty: Semaphore
     data: array[128, pointer]
 
@@ -128,6 +128,7 @@ type
     initialized: bool # whether it has even been initialized
     shutdown: bool # the pool requests to shut down this worker thread
     q: ToFreeQueue
+    readyForTask: Semaphore
 
 proc await*(fv: FlowVarBase) =
   ## waits until the value for the flowVar arrives. Usually it is not necessary
@@ -221,11 +222,17 @@ proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) =
     action(fv.blob)
   finished(fv)
 
-proc `^`*[T](fv: FlowVar[ref T]): foreign ptr T =
+proc unsafeRead*[T](fv: FlowVar[ref T]): foreign ptr T =
   ## blocks until the value is available and then returns this value.
   await(fv)
   result = cast[foreign ptr T](fv.data)
 
+proc `^`*[T](fv: FlowVar[ref T]): ref T =
+  ## blocks until the value is available and then returns this value.
+  await(fv)
+  let src = cast[ref T](fv.data)
+  deepCopy result, src
+
 proc `^`*[T](fv: FlowVar[T]): T =
   ## blocks until the value is available and then returns this value.
   await(fv)
@@ -267,6 +274,10 @@ proc nimArgsPassingDone(p: pointer) {.compilerProc.} =
 const
   MaxThreadPoolSize* = 256 ## maximal size of the thread pool. 256 threads
                            ## should be good enough for anybody ;-)
+  MaxDistinguishedThread* = 32 ## maximal number of "distinguished" threads.
+
+type
+  ThreadId* = range[0..MaxDistinguishedThread-1]
 
 var
   currentPoolSize: int
@@ -284,23 +295,43 @@ proc slave(w: ptr Worker) {.thread.} =
     readyWorker = w
     signal(gSomeReady)
     await(w.taskArrived)
-    assert(not w.ready)
+    # XXX Somebody needs to look into this (why does this assertion fail
+    # in Visual Studio?)
+    when not defined(vcc): assert(not w.ready)
     w.f(w, w.data)
     if w.q.len != 0: w.cleanFlowVars
     if w.shutdown:
       w.shutdown = false
       atomicDec currentPoolSize
 
+proc distinguishedSlave(w: ptr Worker) {.thread.} =
+  while true:
+    when declared(atomicStoreN):
+      atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST)
+    else:
+      w.ready = true
+    signal(w.readyForTask)
+    await(w.taskArrived)
+    assert(not w.ready)
+    w.f(w, w.data)
+    if w.q.len != 0: w.cleanFlowVars
+
 var
   workers: array[MaxThreadPoolSize, TThread[ptr Worker]]
   workersData: array[MaxThreadPoolSize, Worker]
 
+  distinguished: array[MaxDistinguishedThread, TThread[ptr Worker]]
+  distinguishedData: array[MaxDistinguishedThread, Worker]
+
+when defined(nimPinToCpu):
+  var gCpus: Natural
+
 proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) =
   ## sets the minimal thread pool size. The default value of this is 4.
   minPoolSize = size
 
 proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) =
-  ## sets the minimal thread pool size. The default value of this
+  ## sets the maximal thread pool size. The default value of this
   ## is ``MaxThreadPoolSize``.
   maxPoolSize = size
   if currentPoolSize > maxPoolSize:
@@ -308,18 +339,37 @@ proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) =
       let w = addr(workersData[i])
       w.shutdown = true
 
-proc activateThread(i: int) {.noinline.} =
+when defined(nimRecursiveSpawn):
+  var localThreadId {.threadvar.}: int
+
+proc activateWorkerThread(i: int) {.noinline.} =
   workersData[i].taskArrived = createSemaphore()
   workersData[i].taskStarted = createSemaphore()
   workersData[i].initialized = true
   workersData[i].q.empty = createSemaphore()
   initLock(workersData[i].q.lock)
   createThread(workers[i], slave, addr(workersData[i]))
+  when defined(nimRecursiveSpawn):
+    localThreadId = i+1
+  when defined(nimPinToCpu):
+    if gCpus > 0: pinToCpu(workers[i], i mod gCpus)
+
+proc activateDistinguishedThread(i: int) {.noinline.} =
+  distinguishedData[i].taskArrived = createSemaphore()
+  distinguishedData[i].taskStarted = createSemaphore()
+  distinguishedData[i].initialized = true
+  distinguishedData[i].q.empty = createSemaphore()
+  initLock(distinguishedData[i].q.lock)
+  distinguishedData[i].readyForTask = createSemaphore()
+  createThread(distinguished[i], distinguishedSlave, addr(distinguishedData[i]))
 
 proc setup() =
-  currentPoolSize = min(countProcessors(), MaxThreadPoolSize)
+  let p = countProcessors()
+  when defined(nimPinToCpu):
+    gCpus = p
+  currentPoolSize = min(p, MaxThreadPoolSize)
   readyWorker = addr(workersData[0])
-  for i in 0.. <currentPoolSize: activateThread(i)
+  for i in 0.. <currentPoolSize: activateWorkerThread(i)
 
 proc preferSpawn*(): bool =
   ## Use this proc to determine quickly if a 'spawn' or a direct call is
@@ -333,6 +383,13 @@ proc spawn*(call: expr): expr {.magic: "Spawn".}
   ## is gcsafe and has a return type that is either 'void' or compatible
   ## with ``FlowVar[T]``.
 
+proc pinnedSpawn*(id: ThreadId; call: expr): expr {.magic: "Spawn".}
+  ## always spawns a new task on the worker thread with ``id``, so that
+  ## the 'call' is **always** executed on
+  ## the this thread. 'call' has to be proc call 'p(...)' where 'p'
+  ## is gcsafe and has a return type that is either 'void' or compatible
+  ## with ``FlowVar[T]``.
+
 template spawnX*(call: expr): expr =
   ## spawns a new task if a CPU core is ready, otherwise executes the
   ## call in the calling thread. Usually it is advised to
@@ -349,11 +406,11 @@ proc parallel*(body: stmt) {.magic: "Parallel".}
 
 var
   state: ThreadPoolState
-  stateLock: TLock
+  stateLock: Lock
 
 initLock stateLock
 
-proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} =
+proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerProc.} =
   # implementation of 'spawn' that is used by the code generator.
   while true:
     if selectWorker(readyWorker, fn, data): return
@@ -370,7 +427,7 @@ proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} =
         of doCreateThread:
           if currentPoolSize < maxPoolSize:
             if not workersData[currentPoolSize].initialized:
-              activateThread(currentPoolSize)
+              activateWorkerThread(currentPoolSize)
             let w = addr(workersData[currentPoolSize])
             atomicInc currentPoolSize
             if selectWorker(w, fn, data):
@@ -385,7 +442,34 @@ proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} =
         release(stateLock)
       # else the acquire failed, but this means some
       # other thread succeeded, so we don't need to do anything here.
-    await(gSomeReady)
+    when defined(nimRecursiveSpawn):
+      if localThreadId > 0:
+        # we are a worker thread, so instead of waiting for something which
+        # might as well never happen (see tparallel_quicksort), we run the task
+        # on the current thread instead.
+        var self = addr(workersData[localThreadId-1])
+        fn(self, data)
+        await(self.taskStarted)
+        return
+      else:
+        await(gSomeReady)
+    else:
+      await(gSomeReady)
+
+var
+  distinguishedLock: TLock
+
+initLock distinguishedLock
+
+proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerProc.} =
+  acquire(distinguishedLock)
+  if not distinguishedData[id].initialized:
+    activateDistinguishedThread(id)
+  release(distinguishedLock)
+  while true:
+    if selectWorker(addr(distinguishedData[id]), fn, data): break
+    await(distinguishedData[id].readyForTask)
+
 
 proc sync*() =
   ## a simple barrier to wait for all spawn'ed tasks. If you need more elaborate