summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--compiler/lowerings.nim13
-rw-r--r--compiler/semexprs.nim16
-rw-r--r--compiler/sempass2.nim5
-rw-r--r--lib/pure/concurrency/threadpool.nim56
4 files changed, 73 insertions, 17 deletions
diff --git a/compiler/lowerings.nim b/compiler/lowerings.nim
index b6b01d558..842bb01f6 100644
--- a/compiler/lowerings.nim
+++ b/compiler/lowerings.nim
@@ -167,7 +167,7 @@ proc genDeref*(n: PNode): PNode =
   result.add n
 
 proc callCodegenProc*(name: string, arg1: PNode;
-                      arg2, arg3: PNode = nil): PNode =
+                      arg2, arg3, optionalArgs: PNode = nil): PNode =
   result = newNodeI(nkCall, arg1.info)
   let sym = magicsys.getCompilerProc(name)
   if sym == nil:
@@ -177,6 +177,9 @@ proc callCodegenProc*(name: string, arg1: PNode;
     result.add arg1
     if arg2 != nil: result.add arg2
     if arg3 != nil: result.add arg3
+    if optionalArgs != nil:
+      for i in 1..optionalArgs.len-3:
+        result.add optionalArgs[i]
     result.typ = sym.typ.sons[0]
 
 proc callProc(a: PNode): PNode =
@@ -483,7 +486,7 @@ proc wrapProcForSpawn*(owner: PSym; spawnExpr: PNode; retType: PType;
                        barrier, dest: PNode = nil): PNode =
   # if 'barrier' != nil, then it is in a 'parallel' section and we
   # generate quite different code
-  let n = spawnExpr[1]
+  let n = spawnExpr[^2]
   let spawnKind = spawnResult(retType, barrier!=nil)
   case spawnKind
   of srVoid:
@@ -569,7 +572,7 @@ proc wrapProcForSpawn*(owner: PSym; spawnExpr: PNode; retType: PType;
     fvField = newDotExpr(scratchObj, field)
     fvAsExpr = indirectAccess(castExpr, field, n.info)
     # create flowVar:
-    result.add newFastAsgnStmt(fvField, callProc(spawnExpr[2]))
+    result.add newFastAsgnStmt(fvField, callProc(spawnExpr[^1]))
     if barrier == nil:
       result.add callCodegenProc("nimFlowVarCreateSemaphore", fvField)
 
@@ -584,7 +587,7 @@ proc wrapProcForSpawn*(owner: PSym; spawnExpr: PNode; retType: PType;
   let wrapper = createWrapperProc(fn, threadParam, argsParam,
                                   varSection, varInit, call,
                                   barrierAsExpr, fvAsExpr, spawnKind)
-  result.add callCodegenProc("nimSpawn", wrapper.newSymNode,
-                             genAddrOf(scratchObj.newSymNode))
+  result.add callCodegenProc("nimSpawn" & $spawnExpr.len, wrapper.newSymNode,
+                             genAddrOf(scratchObj.newSymNode), nil, spawnExpr)
 
   if spawnKind == srFlowVar: result.add fvField
diff --git a/compiler/semexprs.nim b/compiler/semexprs.nim
index cd6ba3753..be1461c3c 100644
--- a/compiler/semexprs.nim
+++ b/compiler/semexprs.nim
@@ -1727,13 +1727,17 @@ proc semMagic(c: PContext, n: PNode, s: PSym, flags: TExprFlags): PNode =
     dec c.inParallelStmt
   of mSpawn:
     result = setMs(n, s)
-    result.sons[1] = semExpr(c, n.sons[1])
-    if not result[1].typ.isEmptyType:
-      if spawnResult(result[1].typ, c.inParallelStmt > 0) == srFlowVar:
-        result.typ = createFlowVar(c, result[1].typ, n.info)
+    for i in 1 .. <n.len:
+      result.sons[i] = semExpr(c, n.sons[i])
+    let typ = result[^1].typ
+    if not typ.isEmptyType:
+      if spawnResult(typ, c.inParallelStmt > 0) == srFlowVar:
+        result.typ = createFlowVar(c, typ, n.info)
       else:
-        result.typ = result[1].typ
-      result.add instantiateCreateFlowVarCall(c, result[1].typ, n.info).newSymNode
+        result.typ = typ
+      result.add instantiateCreateFlowVarCall(c, typ, n.info).newSymNode
+    else:
+      result.add emptyNode
   of mProcCall:
     result = setMs(n, s)
     result.sons[1] = semExpr(c, n.sons[1])
diff --git a/compiler/sempass2.nim b/compiler/sempass2.nim
index adf03be64..517c37b0f 100644
--- a/compiler/sempass2.nim
+++ b/compiler/sempass2.nim
@@ -237,9 +237,10 @@ proc useVar(a: PEffects, n: PNode) =
         message(n.info, warnUninit, s.name.s)
       # prevent superfluous warnings about the same variable:
       a.init.add s.id
-  if {sfGlobal, sfThread} * s.flags == {sfGlobal} and s.kind in {skVar, skLet}:
+  if {sfGlobal, sfThread} * s.flags != {} and s.kind in {skVar, skLet}:
     if s.guard != nil: guardGlobal(a, n, s.guard)
-    if (tfHasGCedMem in s.typ.flags or s.typ.isGCedMem):
+    if {sfGlobal, sfThread} * s.flags == {sfGlobal} and
+        (tfHasGCedMem in s.typ.flags or s.typ.isGCedMem):
       #if warnGcUnsafe in gNotes: warnAboutGcUnsafe(n)
       markGcUnsafe(a, s)
 
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim
index 9f1e53fb8..10117183a 100644
--- a/lib/pure/concurrency/threadpool.nim
+++ b/lib/pure/concurrency/threadpool.nim
@@ -267,6 +267,10 @@ proc nimArgsPassingDone(p: pointer) {.compilerProc.} =
 const
   MaxThreadPoolSize* = 256 ## maximal size of the thread pool. 256 threads
                            ## should be good enough for anybody ;-)
+  MaxDistinguishedThread* = 32 ## maximal number of "distinguished" threads.
+
+type
+  ThreadId* = range[0..MaxDistinguishedThread-1]
 
 var
   currentPoolSize: int
@@ -291,10 +295,24 @@ proc slave(w: ptr Worker) {.thread.} =
       w.shutdown = false
       atomicDec currentPoolSize
 
+proc distinguishedSlave(w: ptr Worker) {.thread.} =
+  while true:
+    when declared(atomicStoreN):
+      atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST)
+    else:
+      w.ready = true
+    await(w.taskArrived)
+    assert(not w.ready)
+    w.f(w, w.data)
+    if w.q.len != 0: w.cleanFlowVars
+
 var
   workers: array[MaxThreadPoolSize, TThread[ptr Worker]]
   workersData: array[MaxThreadPoolSize, Worker]
 
+  distinguished: array[MaxDistinguishedThread, TThread[ptr Worker]]
+  distinguishedData: array[MaxDistinguishedThread, Worker]
+
 proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) =
   ## sets the minimal thread pool size. The default value of this is 4.
   minPoolSize = size
@@ -308,7 +326,7 @@ proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) =
       let w = addr(workersData[i])
       w.shutdown = true
 
-proc activateThread(i: int) {.noinline.} =
+proc activateWorkerThread(i: int) {.noinline.} =
   workersData[i].taskArrived = createSemaphore()
   workersData[i].taskStarted = createSemaphore()
   workersData[i].initialized = true
@@ -316,10 +334,18 @@ proc activateThread(i: int) {.noinline.} =
   initLock(workersData[i].q.lock)
   createThread(workers[i], slave, addr(workersData[i]))
 
+proc activateDistinguishedThread(i: int) {.noinline.} =
+  distinguishedData[i].taskArrived = createSemaphore()
+  distinguishedData[i].taskStarted = createSemaphore()
+  distinguishedData[i].initialized = true
+  distinguishedData[i].q.empty = createSemaphore()
+  initLock(distinguishedData[i].q.lock)
+  createThread(distinguished[i], distinguishedSlave, addr(distinguishedData[i]))
+
 proc setup() =
   currentPoolSize = min(countProcessors(), MaxThreadPoolSize)
   readyWorker = addr(workersData[0])
-  for i in 0.. <currentPoolSize: activateThread(i)
+  for i in 0.. <currentPoolSize: activateWorkerThread(i)
 
 proc preferSpawn*(): bool =
   ## Use this proc to determine quickly if a 'spawn' or a direct call is
@@ -333,6 +359,13 @@ proc spawn*(call: expr): expr {.magic: "Spawn".}
   ## is gcsafe and has a return type that is either 'void' or compatible
   ## with ``FlowVar[T]``.
 
+proc pinnedSpawn*(id: ThreadId; call: expr): expr {.magic: "Spawn".}
+  ## always spawns a new task on the worker thread with ``id``, so that
+  ## the 'call' is **always** executed on
+  ## the this thread. 'call' has to be proc call 'p(...)' where 'p'
+  ## is gcsafe and has a return type that is either 'void' or compatible
+  ## with ``FlowVar[T]``.
+
 template spawnX*(call: expr): expr =
   ## spawns a new task if a CPU core is ready, otherwise executes the
   ## call in the calling thread. Usually it is advised to
@@ -353,7 +386,7 @@ var
 
 initLock stateLock
 
-proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} =
+proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerProc.} =
   # implementation of 'spawn' that is used by the code generator.
   while true:
     if selectWorker(readyWorker, fn, data): return
@@ -370,7 +403,7 @@ proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} =
         of doCreateThread:
           if currentPoolSize < maxPoolSize:
             if not workersData[currentPoolSize].initialized:
-              activateThread(currentPoolSize)
+              activateWorkerThread(currentPoolSize)
             let w = addr(workersData[currentPoolSize])
             atomicInc currentPoolSize
             if selectWorker(w, fn, data):
@@ -387,6 +420,21 @@ proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} =
       # other thread succeeded, so we don't need to do anything here.
     await(gSomeReady)
 
+var
+  distinguishedLock: TLock
+
+initLock distinguishedLock
+
+proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerProc.} =
+  acquire(distinguishedLock)
+  if not distinguishedData[id].initialized:
+    activateDistinguishedThread(id)
+  while true:
+    if selectWorker(addr(distinguishedData[id]), fn, data): break
+    cpuRelax()
+    # XXX exponential backoff?
+  release(distinguishedLock)
+
 proc sync*() =
   ## a simple barrier to wait for all spawn'ed tasks. If you need more elaborate
   ## waiting, you have to use an explicit barrier.