diff options
-rw-r--r-- | compiler/lowerings.nim | 13 | ||||
-rw-r--r-- | compiler/semexprs.nim | 16 | ||||
-rw-r--r-- | compiler/sempass2.nim | 5 | ||||
-rw-r--r-- | lib/pure/concurrency/threadpool.nim | 56 |
4 files changed, 73 insertions, 17 deletions
diff --git a/compiler/lowerings.nim b/compiler/lowerings.nim index b6b01d558..842bb01f6 100644 --- a/compiler/lowerings.nim +++ b/compiler/lowerings.nim @@ -167,7 +167,7 @@ proc genDeref*(n: PNode): PNode = result.add n proc callCodegenProc*(name: string, arg1: PNode; - arg2, arg3: PNode = nil): PNode = + arg2, arg3, optionalArgs: PNode = nil): PNode = result = newNodeI(nkCall, arg1.info) let sym = magicsys.getCompilerProc(name) if sym == nil: @@ -177,6 +177,9 @@ proc callCodegenProc*(name: string, arg1: PNode; result.add arg1 if arg2 != nil: result.add arg2 if arg3 != nil: result.add arg3 + if optionalArgs != nil: + for i in 1..optionalArgs.len-3: + result.add optionalArgs[i] result.typ = sym.typ.sons[0] proc callProc(a: PNode): PNode = @@ -483,7 +486,7 @@ proc wrapProcForSpawn*(owner: PSym; spawnExpr: PNode; retType: PType; barrier, dest: PNode = nil): PNode = # if 'barrier' != nil, then it is in a 'parallel' section and we # generate quite different code - let n = spawnExpr[1] + let n = spawnExpr[^2] let spawnKind = spawnResult(retType, barrier!=nil) case spawnKind of srVoid: @@ -569,7 +572,7 @@ proc wrapProcForSpawn*(owner: PSym; spawnExpr: PNode; retType: PType; fvField = newDotExpr(scratchObj, field) fvAsExpr = indirectAccess(castExpr, field, n.info) # create flowVar: - result.add newFastAsgnStmt(fvField, callProc(spawnExpr[2])) + result.add newFastAsgnStmt(fvField, callProc(spawnExpr[^1])) if barrier == nil: result.add callCodegenProc("nimFlowVarCreateSemaphore", fvField) @@ -584,7 +587,7 @@ proc wrapProcForSpawn*(owner: PSym; spawnExpr: PNode; retType: PType; let wrapper = createWrapperProc(fn, threadParam, argsParam, varSection, varInit, call, barrierAsExpr, fvAsExpr, spawnKind) - result.add callCodegenProc("nimSpawn", wrapper.newSymNode, - genAddrOf(scratchObj.newSymNode)) + result.add callCodegenProc("nimSpawn" & $spawnExpr.len, wrapper.newSymNode, + genAddrOf(scratchObj.newSymNode), nil, spawnExpr) if spawnKind == srFlowVar: result.add fvField diff 
--git a/compiler/semexprs.nim b/compiler/semexprs.nim index cd6ba3753..be1461c3c 100644 --- a/compiler/semexprs.nim +++ b/compiler/semexprs.nim @@ -1727,13 +1727,17 @@ proc semMagic(c: PContext, n: PNode, s: PSym, flags: TExprFlags): PNode = dec c.inParallelStmt of mSpawn: result = setMs(n, s) - result.sons[1] = semExpr(c, n.sons[1]) - if not result[1].typ.isEmptyType: - if spawnResult(result[1].typ, c.inParallelStmt > 0) == srFlowVar: - result.typ = createFlowVar(c, result[1].typ, n.info) + for i in 1 .. <n.len: + result.sons[i] = semExpr(c, n.sons[i]) + let typ = result[^1].typ + if not typ.isEmptyType: + if spawnResult(typ, c.inParallelStmt > 0) == srFlowVar: + result.typ = createFlowVar(c, typ, n.info) else: - result.typ = result[1].typ - result.add instantiateCreateFlowVarCall(c, result[1].typ, n.info).newSymNode + result.typ = typ + result.add instantiateCreateFlowVarCall(c, typ, n.info).newSymNode + else: + result.add emptyNode of mProcCall: result = setMs(n, s) result.sons[1] = semExpr(c, n.sons[1]) diff --git a/compiler/sempass2.nim b/compiler/sempass2.nim index adf03be64..517c37b0f 100644 --- a/compiler/sempass2.nim +++ b/compiler/sempass2.nim @@ -237,9 +237,10 @@ proc useVar(a: PEffects, n: PNode) = message(n.info, warnUninit, s.name.s) # prevent superfluous warnings about the same variable: a.init.add s.id - if {sfGlobal, sfThread} * s.flags == {sfGlobal} and s.kind in {skVar, skLet}: + if {sfGlobal, sfThread} * s.flags != {} and s.kind in {skVar, skLet}: if s.guard != nil: guardGlobal(a, n, s.guard) - if (tfHasGCedMem in s.typ.flags or s.typ.isGCedMem): + if {sfGlobal, sfThread} * s.flags == {sfGlobal} and + (tfHasGCedMem in s.typ.flags or s.typ.isGCedMem): #if warnGcUnsafe in gNotes: warnAboutGcUnsafe(n) markGcUnsafe(a, s) diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index 9f1e53fb8..10117183a 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -267,6 +267,10 @@ proc 
nimArgsPassingDone(p: pointer) {.compilerProc.} = const MaxThreadPoolSize* = 256 ## maximal size of the thread pool. 256 threads ## should be good enough for anybody ;-) + MaxDistinguishedThread* = 32 ## maximal number of "distinguished" threads. + +type + ThreadId* = range[0..MaxDistinguishedThread-1] var currentPoolSize: int @@ -291,10 +295,24 @@ proc slave(w: ptr Worker) {.thread.} = w.shutdown = false atomicDec currentPoolSize +proc distinguishedSlave(w: ptr Worker) {.thread.} = + while true: + when declared(atomicStoreN): + atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST) + else: + w.ready = true + await(w.taskArrived) + assert(not w.ready) + w.f(w, w.data) + if w.q.len != 0: w.cleanFlowVars + var workers: array[MaxThreadPoolSize, TThread[ptr Worker]] workersData: array[MaxThreadPoolSize, Worker] + distinguished: array[MaxDistinguishedThread, TThread[ptr Worker]] + distinguishedData: array[MaxDistinguishedThread, Worker] + proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) = ## sets the minimal thread pool size. The default value of this is 4. 
minPoolSize = size @@ -308,7 +326,7 @@ proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) = let w = addr(workersData[i]) w.shutdown = true -proc activateThread(i: int) {.noinline.} = +proc activateWorkerThread(i: int) {.noinline.} = workersData[i].taskArrived = createSemaphore() workersData[i].taskStarted = createSemaphore() workersData[i].initialized = true @@ -316,10 +334,18 @@ proc activateThread(i: int) {.noinline.} = initLock(workersData[i].q.lock) createThread(workers[i], slave, addr(workersData[i])) +proc activateDistinguishedThread(i: int) {.noinline.} = + distinguishedData[i].taskArrived = createSemaphore() + distinguishedData[i].taskStarted = createSemaphore() + distinguishedData[i].initialized = true + distinguishedData[i].q.empty = createSemaphore() + initLock(distinguishedData[i].q.lock) + createThread(distinguished[i], distinguishedSlave, addr(distinguishedData[i])) + proc setup() = currentPoolSize = min(countProcessors(), MaxThreadPoolSize) readyWorker = addr(workersData[0]) - for i in 0.. <currentPoolSize: activateThread(i) + for i in 0.. <currentPoolSize: activateWorkerThread(i) proc preferSpawn*(): bool = ## Use this proc to determine quickly if a 'spawn' or a direct call is @@ -333,6 +359,13 @@ proc spawn*(call: expr): expr {.magic: "Spawn".} ## is gcsafe and has a return type that is either 'void' or compatible ## with ``FlowVar[T]``. +proc pinnedSpawn*(id: ThreadId; call: expr): expr {.magic: "Spawn".} + ## always spawns a new task on the worker thread with ``id``, so that + ## the 'call' is **always** executed on + ## this thread. 'call' has to be a proc call 'p(...)' where 'p' + ## is gcsafe and has a return type that is either 'void' or compatible + ## with ``FlowVar[T]``. + template spawnX*(call: expr): expr = ## spawns a new task if a CPU core is ready, otherwise executes the ## call in the calling thread. 
Usually it is advised to @@ -353,7 +386,7 @@ var initLock stateLock -proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} = +proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerProc.} = # implementation of 'spawn' that is used by the code generator. while true: if selectWorker(readyWorker, fn, data): return @@ -370,7 +403,7 @@ proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} = of doCreateThread: if currentPoolSize < maxPoolSize: if not workersData[currentPoolSize].initialized: - activateThread(currentPoolSize) + activateWorkerThread(currentPoolSize) let w = addr(workersData[currentPoolSize]) atomicInc currentPoolSize if selectWorker(w, fn, data): @@ -387,6 +420,21 @@ proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} = # other thread succeeded, so we don't need to do anything here. await(gSomeReady) +var + distinguishedLock: TLock + +initLock distinguishedLock + +proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerProc.} = + acquire(distinguishedLock) + if not distinguishedData[id].initialized: + activateDistinguishedThread(id) + while true: + if selectWorker(addr(distinguishedData[id]), fn, data): break + cpuRelax() + # XXX exponential backoff? + release(distinguishedLock) + proc sync*() = ## a simple barrier to wait for all spawn'ed tasks. If you need more elaborate ## waiting, you have to use an explicit barrier. |