diff options
-rw-r--r-- | compiler/ccgexprs.nim | 2 | ||||
-rw-r--r-- | compiler/lowerings.nim | 45 | ||||
-rw-r--r-- | compiler/pragmas.nim | 11 | ||||
-rw-r--r-- | compiler/semexprs.nim | 11 | ||||
-rw-r--r-- | compiler/semparallel.nim | 6 | ||||
-rw-r--r-- | lib/pure/concurrency/threadpool.nim | 124 | ||||
-rw-r--r-- | lib/system/atomics.nim | 8 | ||||
-rw-r--r-- | todo.txt | 14 |
8 files changed, 133 insertions, 88 deletions
diff --git a/compiler/ccgexprs.nim b/compiler/ccgexprs.nim index 34fdf5bf1..c0442711e 100644 --- a/compiler/ccgexprs.nim +++ b/compiler/ccgexprs.nim @@ -1636,7 +1636,7 @@ proc genMagicExpr(p: BProc, e: PNode, d: var TLoc, op: TMagic) = of mSlurp..mQuoteAst: localError(e.info, errXMustBeCompileTime, e.sons[0].sym.name.s) of mSpawn: - let n = lowerings.wrapProcForSpawn(p.module.module, e[1], e.typ, nil, nil) + let n = lowerings.wrapProcForSpawn(p.module.module, e, e.typ, nil, nil) expr(p, n, d) of mParallel: let n = semparallel.liftParallel(p.module.module, e) diff --git a/compiler/lowerings.nim b/compiler/lowerings.nim index af4daf785..327a18df5 100644 --- a/compiler/lowerings.nim +++ b/compiler/lowerings.nim @@ -86,7 +86,7 @@ proc indirectAccess*(a: PNode, b: string, info: TLineInfo): PNode = # returns a[].b as a node var deref = newNodeI(nkHiddenDeref, info) deref.typ = a.typ.skipTypes(abstractInst).sons[0] - var t = deref.typ + var t = deref.typ.skipTypes(abstractInst) var field: PSym while true: assert t.kind == tyObject @@ -94,6 +94,7 @@ proc indirectAccess*(a: PNode, b: string, info: TLineInfo): PNode = if field != nil: break t = t.sons[0] if t == nil: break + t = t.skipTypes(abstractInst) assert field != nil, b addSon(deref, a) result = newNodeI(nkDotExpr, info) @@ -132,6 +133,11 @@ proc callCodegenProc*(name: string, arg1: PNode; if arg3 != nil: result.add arg3 result.typ = sym.typ.sons[0] +proc callProc(a: PNode): PNode = + result = newNodeI(nkCall, a.info) + result.add a + result.typ = a.typ.sons[0] + # we have 4 cases to consider: # - a void proc --> nothing to do # - a proc returning GC'ed memory --> requires a promise @@ -169,14 +175,14 @@ proc addLocalVar(varSection: PNode; owner: PSym; typ: PType; v: PNode): PSym = discard """ We generate roughly this: -proc f_wrapper(args) = +proc f_wrapper(thread, args) = barrierEnter(args.barrier) # for parallel statement var a = args.a # thread transfer; deepCopy or shallowCopy or no copy # depending on whether we're in a 'parallel' statement var b = args.b + var prom = args.prom - args.prom = nimCreatePromise(thread, sizeof(T)) # optional - nimPromiseCreateCondVar(args.prom) # optional + prom.owner = thread # optional nimArgsPassingDone() # signal parent that the work is done # args.prom.blob = f(a, b, ...) @@ -196,17 +202,6 @@ stmtList: """ -proc createNimCreatePromiseCall(prom, threadParam: PNode): PNode = - let size = newNodeIT(nkCall, prom.info, getSysType(tyInt)) - size.add newSymNode(createMagic("sizeof", mSizeOf)) - assert prom.typ.kind == tyGenericInst - size.add newNodeIT(nkType, prom.info, prom.typ.sons[1]) - - let castExpr = newNodeIT(nkCast, prom.info, prom.typ) - castExpr.add emptyNode - castExpr.add callCodeGenProc("nimCreatePromise", threadParam, size) - result = castExpr - proc createWrapperProc(f: PNode; threadParam, argsParam: PSym; varSection, call, barrier, prom: PNode; spawnKind: TSpawnResult): PSym = @@ -223,14 +218,14 @@ proc createWrapperProc(f: PNode; threadParam, argsParam: PSym; threadLocalProm = addLocalVar(varSection, argsParam.owner, prom.typ, prom) elif prom != nil: internalAssert prom.typ.kind == tyGenericInst - threadLocalProm = addLocalVar(varSection, argsParam.owner, prom.typ, - createNimCreatePromiseCall(prom, threadParam.newSymNode)) + threadLocalProm = addLocalVar(varSection, argsParam.owner, prom.typ, prom) body.add varSection if prom != nil and spawnKind != srByVar: - body.add newFastAsgnStmt(prom, threadLocalProm.newSymNode) - if barrier == nil: - body.add callCodeGenProc("nimPromiseCreateCondVar", prom) + # generate: + # prom.owner = threadParam + body.add newAsgnStmt(indirectAccess(threadLocalProm.newSymNode, + "owner", prom.info), threadParam.newSymNode) body.add callCodeGenProc("nimArgsPassingDone", threadParam.newSymNode) if spawnKind == srByVar: @@ -404,10 +399,11 @@ proc setupArgsForParallelism(n: PNode; objType: PType; scratchObj: PSym; indirectAccess(castExpr, field, n.info)) call.add(threadLocal.newSymNode) -proc wrapProcForSpawn*(owner: PSym; n: PNode; retType: PType; +proc wrapProcForSpawn*(owner: PSym; spawnExpr: PNode; retType: PType; barrier, dest: PNode = nil): PNode = # if 'barrier' != nil, then it is in a 'parallel' section and we # generate quite different code + let n = spawnExpr[1] let spawnKind = spawnResult(retType, barrier!=nil) case spawnKind of srVoid: @@ -419,7 +415,7 @@ proc wrapProcForSpawn*(owner: PSym; n: PNode; retType: PType; of srByVar: if dest == nil: localError(n.info, "'spawn' must not be discarded") result = newNodeI(nkStmtList, n.info) - + if n.kind notin nkCallKinds: localError(n.info, "'spawn' takes a call expression") return @@ -489,6 +485,11 @@ proc wrapProcForSpawn*(owner: PSym; n: PNode; retType: PType; objType.addField(field) promField = newDotExpr(scratchObj, field) promAsExpr = indirectAccess(castExpr, field, n.info) + # create promise: + result.add newFastAsgnStmt(promField, callProc(spawnExpr[2])) + if barrier == nil: + result.add callCodeGenProc("nimPromiseCreateCondVar", promField) + elif spawnKind == srByVar: var field = newSym(skField, getIdent"prom", owner, n.info) field.typ = newType(tyPtr, objType.owner) diff --git a/compiler/pragmas.nim b/compiler/pragmas.nim index db9fe7cbe..aed0e1850 100644 --- a/compiler/pragmas.nim +++ b/compiler/pragmas.nim @@ -644,12 +644,13 @@ proc singlePragma(c: PContext, sym: PSym, n: PNode, i: int, incl(sym.flags, sfNoReturn) of wDynlib: processDynLib(c, it, sym) - of wCompilerproc: + of wCompilerproc: noVal(it) # compilerproc may not get a string! - makeExternExport(sym, "$1", it.info) - incl(sym.flags, sfCompilerProc) - incl(sym.flags, sfUsed) # suppress all those stupid warnings - registerCompilerProc(sym) + if sfFromGeneric notin sym.flags: + makeExternExport(sym, "$1", it.info) + incl(sym.flags, sfCompilerProc) + incl(sym.flags, sfUsed) # suppress all those stupid warnings + registerCompilerProc(sym) of wProcVar: noVal(it) incl(sym.flags, sfProcvar) diff --git a/compiler/semexprs.nim b/compiler/semexprs.nim index e507e711f..9e3785185 100644 --- a/compiler/semexprs.nim +++ b/compiler/semexprs.nim @@ -1585,6 +1585,16 @@ proc createPromise(c: PContext; t: PType; info: TLineInfo): PType = addSonSkipIntLit(result, t) result = instGenericContainer(c, info, result, allowMetaTypes = false) +proc instantiateCreatePromiseCall(c: PContext; t: PType; + info: TLineInfo): PSym = + let sym = magicsys.getCompilerProc("nimCreatePromise") + if sym == nil: + localError(info, errSystemNeeds, "nimCreatePromise") + var bindings: TIdTable + initIdTable(bindings) + bindings.idTablePut(sym.ast[genericParamsPos].sons[0].typ, t) + result = c.semGenerateInstance(c, sym, bindings, info) + proc setMs(n: PNode, s: PSym): PNode = result = n n.sons[0] = newSymNode(s) @@ -1626,6 +1636,7 @@ proc semMagic(c: PContext, n: PNode, s: PSym, flags: TExprFlags): PNode = result.typ = result[1].typ else: result.typ = createPromise(c, result[1].typ, n.info) + result.add instantiateCreatePromiseCall(c, result[1].typ, n.info).newSymNode else: result = semDirectOp(c, n, flags) proc semWhen(c: PContext, n: PNode, semCheck = true): PNode = diff --git a/compiler/semparallel.nim b/compiler/semparallel.nim index 72def1137..c594a4788 100644 --- a/compiler/semparallel.nim +++ b/compiler/semparallel.nim @@ -406,19 +406,19 @@ proc transformSpawn(owner: PSym; n, barrier: PNode): PNode = if result.isNil: result = newNodeI(nkStmtList, n.info) result.add n - result.add wrapProcForSpawn(owner, m[1], b.typ, barrier, it[0]) + result.add wrapProcForSpawn(owner, m, b.typ, barrier, it[0]) it.sons[it.len-1] = emptyNode if result.isNil: result = n of nkAsgn, nkFastAsgn: let b = n[1] if getMagic(b) == mSpawn: let m = transformSlices(b) - return wrapProcForSpawn(owner, m[1], b.typ, barrier, n[0]) + return wrapProcForSpawn(owner, m, b.typ, barrier, n[0]) result = transformSpawnSons(owner, n, barrier) of nkCallKinds: if getMagic(n) == mSpawn: result = transformSlices(n) - return wrapProcForSpawn(owner, result[1], n.typ, barrier, nil) + return wrapProcForSpawn(owner, result, n.typ, barrier, nil) result = transformSpawnSons(owner, n, barrier) elif n.safeLen > 0: result = transformSpawnSons(owner, n, barrier) diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index 92d5011f4..8129d03ae 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -85,25 +85,26 @@ type cv: CondVar idx: int - RawPromise* = ptr RawPromiseObj ## untyped base class for 'Promise[T]' - RawPromiseObj {.inheritable.} = object # \ - # we allocate this with the thread local allocator; this - # is possible since we already need to do the GC_unref - # on the owning thread + RawPromise* = ref RawPromiseObj ## untyped base class for 'Promise[T]' + RawPromiseObj = object of TObject ready, usesCondVar: bool cv: CondVar #\ # for 'awaitAny' support ai: ptr AwaitInfo idx: int - data: PObject # we incRef and unref it to keep it alive - owner: ptr Worker - next: RawPromise - align: float64 # a float for proper alignment + data: pointer # we incRef and unref it to keep it alive + owner: pointer # ptr Worker - Promise* {.compilerProc.} [T] = ptr object of RawPromiseObj - blob: T ## the underlying value, if available. Note that usually - ## you should not access this field directly! However it can - ## sometimes be more efficient than getting the value via ``^``. + PromiseObj[T] = object of RawPromiseObj + blob: T + + Promise*{.compilerProc.}[T] = ref PromiseObj[T] + + ToFreeQueue = object + len: int + lock: TLock + empty: TCond + data: array[512, pointer] WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.} Worker = object @@ -115,37 +116,55 @@ type ready: bool # put it here for correct alignment! initialized: bool # whether it has even been initialized shutdown: bool # the pool requests to shut down this worker thread - promiseLock: TLock - head: RawPromise + q: ToFreeQueue + +proc await*(prom: RawPromise) = + ## waits until the value for the promise arrives. Usually it is not necessary + ## to call this explicitly. + if prom.usesCondVar: + prom.usesCondVar = false + await(prom.cv) + destroyCondVar(prom.cv) -proc finished*(prom: RawPromise) = - ## This MUST be called for every created promise to free its associated - ## resources. Note that the default reading operation ``^`` is destructive - ## and calls ``finished``. +proc finished(prom: RawPromise) = doAssert prom.ai.isNil, "promise is still attached to an 'awaitAny'" - assert prom.next == nil - let w = prom.owner - acquire(w.promiseLock) - prom.next = w.head - w.head = prom - release(w.promiseLock) + # we have to protect against the rare cases where the owner of the promise + # simply disregards the promise and yet the "promiser" has not yet written + # anything to it: + await(prom) + if prom.data.isNil: return + let owner = cast[ptr Worker](prom.owner) + let q = addr(owner.q) + var waited = false + while true: + acquire(q.lock) + if q.len < q.data.len: + q.data[q.len] = prom.data + inc q.len + release(q.lock) + break + else: + # the queue is exhausted! We block until it has been cleaned: + release(q.lock) + wait(q.empty, q.lock) + waited = true + prom.data = nil + # wakeup other potentially waiting threads: + if waited: signal(q.empty) proc cleanPromises(w: ptr Worker) = - var it = w.head - acquire(w.promiseLock) - while it != nil: - let nxt = it.next - if it.usesCondVar: destroyCondVar(it.cv) - if it.data != nil: GC_unref(it.data) - dealloc(it) - it = nxt - w.head = nil - release(w.promiseLock) - -proc nimCreatePromise(owner: pointer; blobSize: int): RawPromise {. - compilerProc.} = - result = cast[RawPromise](alloc0(RawPromiseObj.sizeof + blobSize)) - result.owner = cast[ptr Worker](owner) + let q = addr(w.q) + acquire(q.lock) + for i in 0 .. <q.len: + GC_unref(cast[PObject](q.data[i])) + q.len = 0 + release(q.lock) + signal(q.empty) + +proc promFinalizer[T](prom: Promise[T]) = finished(prom) + +proc nimCreatePromise[T](): Promise[T] {.compilerProc.} = + new(result, promFinalizer) proc nimPromiseCreateCondVar(prom: RawPromise) {.compilerProc.} = prom.cv = createCondVar() @@ -160,16 +179,12 @@ proc nimPromiseSignal(prom: RawPromise) {.compilerProc.} = signal(prom.ai.cv.c) if prom.usesCondVar: signal(prom.cv) -proc await*[T](prom: Promise[T]) = - ## waits until the value for the promise arrives. - if prom.usesCondVar: await(prom.cv) - proc awaitAndThen*[T](prom: Promise[T]; action: proc (x: T) {.closure.}) = ## blocks until the ``prom`` is available and then passes its value ## to ``action``. Note that due to Nimrod's parameter passing semantics this ## means that ``T`` doesn't need to be copied and so ``awaitAndThen`` can ## sometimes be more efficient than ``^``. - if prom.usesCondVar: await(prom) + await(prom) when T is string or T is seq: action(cast[T](prom.data)) elif T is ref: @@ -179,23 +194,17 @@ proc awaitAndThen*[T](prom: Promise[T]; action: proc (x: T) {.closure.}) = finished(prom) proc `^`*[T](prom: Promise[ref T]): foreign ptr T = - ## blocks until the value is available and then returns this value. Note - ## this reading is destructive for reasons of efficiency and convenience. - ## This calls ``finished(prom)``. - if prom.usesCondVar: await(prom) + ## blocks until the value is available and then returns this value. + await(prom) result = cast[foreign ptr T](prom.data) - finished(prom) proc `^`*[T](prom: Promise[T]): T = - ## blocks until the value is available and then returns this value. Note - ## this reading is destructive for reasons of efficiency and convenience. - ## This calls ``finished(prom)``. - if prom.usesCondVar: await(prom) + ## blocks until the value is available and then returns this value. + await(prom) when T is string or T is seq: result = cast[T](prom.data) else: result = prom.blob - finished(prom) proc awaitAny*(promises: openArray[RawPromise]): int = # awaits any of the given promises. Returns the index of one promise for which @@ -245,7 +254,7 @@ proc slave(w: ptr Worker) {.thread.} = await(w.taskArrived) assert(not w.ready) w.f(w, w.data) - if w.head != nil: w.cleanPromises + if w.q.len != 0: w.cleanPromises if w.shutdown: w.shutdown = false atomicDec currentPoolSize @@ -266,8 +275,9 @@ var proc activateThread(i: int) {.noinline.} = workersData[i].taskArrived = createCondVar() workersData[i].taskStarted = createCondVar() - initLock workersData[i].promiseLock workersData[i].initialized = true + initCond(workersData[i].q.empty) + initLock(workersData[i].q.lock) createThread(workers[i], slave, addr(workersData[i])) proc setup() = diff --git a/lib/system/atomics.nim b/lib/system/atomics.nim index 96246ba01..6e2bd9a97 100644 --- a/lib/system/atomics.nim +++ b/lib/system/atomics.nim @@ -153,9 +153,11 @@ when (defined(gcc) or defined(llvm_gcc)) and hasThreadSupport: ## A value of 0 indicates typical alignment should be used. The compiler may also ## ignore this parameter. + template fence*() = atomicThreadFence(ATOMIC_SEQ_CST) elif defined(vcc) and hasThreadSupport: proc addAndFetch*(p: ptr int, val: int): int {. importc: "NimXadd", nodecl.} + else: proc addAndFetch*(p: ptr int, val: int): int {.inline.} = inc(p[], val) @@ -231,4 +233,10 @@ elif false: proc cpuRelax {.inline.} = os.sleep(1) +when not defined(fence) and hasThreadSupport: + # XXX fixme + proc fence*() {.inline.} = + var dummy: bool + discard cas(addr dummy, false, true) + {.pop.} diff --git a/todo.txt b/todo.txt index 996067175..8a351e8a7 100644 --- a/todo.txt +++ b/todo.txt @@ -1,6 +1,20 @@ version 0.9.6 ============= +Concurrency +----------- + +- document the new 'spawn' and 'parallel' statements +- implement 'deepCopy' builtin +- implement 'foo[1..4] = spawn(f[4..7])' +- the disjoint checker needs to deal with 'a = spawn f(); g = spawn f()' +- Minor: The copying of the 'ref Promise' into the thead local storage only + happens to work due to the write barrier's implementation + + +Misc +---- + - fix the bug that keeps 'defer' template from working - make '--implicitStatic:on' the default - fix the tuple unpacking in lambda bug |