diff options
-rw-r--r-- | compiler/lowerings.nim | 92 | ||||
-rw-r--r-- | compiler/semexprs.nim | 14 | ||||
-rw-r--r-- | lib/pure/concurrency/threadpool.nim | 124 |
3 files changed, 115 insertions, 115 deletions
diff --git a/compiler/lowerings.nim b/compiler/lowerings.nim index 327a18df5..e2afa4362 100644 --- a/compiler/lowerings.nim +++ b/compiler/lowerings.nim @@ -140,26 +140,26 @@ proc callProc(a: PNode): PNode = # we have 4 cases to consider: # - a void proc --> nothing to do -# - a proc returning GC'ed memory --> requires a promise +# - a proc returning GC'ed memory --> requires a flowVar # - a proc returning non GC'ed memory --> pass as hidden 'var' parameter -# - not in a parallel environment --> requires a promise for memory safety +# - not in a parallel environment --> requires a flowVar for memory safety type TSpawnResult = enum - srVoid, srPromise, srByVar - TPromiseKind = enum - promInvalid # invalid type T for 'Promise[T]' - promGC # Promise of a GC'ed type - promBlob # Promise of a blob type + srVoid, srFlowVar, srByVar + TFlowVarKind = enum + fvInvalid # invalid type T for 'FlowVar[T]' + fvGC # FlowVar of a GC'ed type + fvBlob # FlowVar of a blob type proc spawnResult(t: PType; inParallel: bool): TSpawnResult = if t.isEmptyType: srVoid elif inParallel and not containsGarbageCollectedRef(t): srByVar - else: srPromise + else: srFlowVar -proc promiseKind(t: PType): TPromiseKind = - if t.skipTypes(abstractInst).kind in {tyRef, tyString, tySequence}: promGC - elif containsGarbageCollectedRef(t): promInvalid - else: promBlob +proc flowVarKind(t: PType): TFlowVarKind = + if t.skipTypes(abstractInst).kind in {tyRef, tyString, tySequence}: fvGC + elif containsGarbageCollectedRef(t): fvInvalid + else: fvBlob proc addLocalVar(varSection: PNode; owner: PSym; typ: PType; v: PNode): PSym = result = newSym(skTemp, getIdent(genPrefix), owner, varSection.info) @@ -180,13 +180,13 @@ proc f_wrapper(thread, args) = var a = args.a # thread transfer; deepCopy or shallowCopy or no copy # depending on whether we're in a 'parallel' statement var b = args.b - var prom = args.prom + var fv = args.fv - prom.owner = thread # optional + fv.owner = thread # optional nimArgsPassingDone() # signal parent that the work is done # - args.prom.blob = f(a, b, ...) - nimPromiseSignal(args.prom) + args.fv.blob = f(a, b, ...) + nimFlowVarSignal(args.fv) # - or - f(a, b, ...) @@ -198,12 +198,12 @@ stmtList: scratchObj.b = b nimSpawn(f_wrapper, addr scratchObj) - scratchObj.prom # optional + scratchObj.fv # optional """ proc createWrapperProc(f: PNode; threadParam, argsParam: PSym; - varSection, call, barrier, prom: PNode; + varSection, call, barrier, fv: PNode; spawnKind: TSpawnResult): PSym = var body = newNodeI(nkStmtList, f.info) var threadLocalBarrier: PSym @@ -215,32 +215,32 @@ proc createWrapperProc(f: PNode; threadParam, argsParam: PSym; body.add callCodeGenProc("barrierEnter", threadLocalBarrier.newSymNode) var threadLocalProm: PSym if spawnKind == srByVar: - threadLocalProm = addLocalVar(varSection, argsParam.owner, prom.typ, prom) - elif prom != nil: - internalAssert prom.typ.kind == tyGenericInst - threadLocalProm = addLocalVar(varSection, argsParam.owner, prom.typ, prom) + threadLocalProm = addLocalVar(varSection, argsParam.owner, fv.typ, fv) + elif fv != nil: + internalAssert fv.typ.kind == tyGenericInst + threadLocalProm = addLocalVar(varSection, argsParam.owner, fv.typ, fv) body.add varSection - if prom != nil and spawnKind != srByVar: + if fv != nil and spawnKind != srByVar: # generate: - # prom.owner = threadParam + # fv.owner = threadParam body.add newAsgnStmt(indirectAccess(threadLocalProm.newSymNode, - "owner", prom.info), threadParam.newSymNode) + "owner", fv.info), threadParam.newSymNode) body.add callCodeGenProc("nimArgsPassingDone", threadParam.newSymNode) if spawnKind == srByVar: body.add newAsgnStmt(genDeref(threadLocalProm.newSymNode), call) - elif prom != nil: - let fk = prom.typ.sons[1].promiseKind - if fk == promInvalid: - localError(f.info, "cannot create a promise of type: " & - typeToString(prom.typ.sons[1])) + elif fv != nil: + let fk = fv.typ.sons[1].flowVarKind + if fk == fvInvalid: + localError(f.info, "cannot create a flowVar of type: " & + typeToString(fv.typ.sons[1])) body.add newAsgnStmt(indirectAccess(threadLocalProm.newSymNode, - if fk == promGC: "data" else: "blob", prom.info), call) + if fk == fvGC: "data" else: "blob", fv.info), call) if barrier == nil: - # by now 'prom' is shared and thus might have beeen overwritten! we need + # by now 'fv' is shared and thus might have beeen overwritten! we need # to use the thread-local view instead: - body.add callCodeGenProc("nimPromiseSignal", threadLocalProm.newSymNode) + body.add callCodeGenProc("nimFlowVarSignal", threadLocalProm.newSymNode) else: body.add call if barrier != nil: @@ -409,7 +409,7 @@ proc wrapProcForSpawn*(owner: PSym; spawnExpr: PNode; retType: PType; of srVoid: internalAssert dest == nil result = newNodeI(nkStmtList, n.info) - of srPromise: + of srFlowVar: internalAssert dest == nil result = newNodeIT(nkStmtListExpr, n.info, retType) of srByVar: @@ -478,29 +478,29 @@ proc wrapProcForSpawn*(owner: PSym; spawnExpr: PNode; retType: PType; result.add newFastAsgnStmt(newDotExpr(scratchObj, field), barrier) barrierAsExpr = indirectAccess(castExpr, field, n.info) - var promField, promAsExpr: PNode = nil - if spawnKind == srPromise: - var field = newSym(skField, getIdent"prom", owner, n.info) + var fvField, fvAsExpr: PNode = nil + if spawnKind == srFlowVar: + var field = newSym(skField, getIdent"fv", owner, n.info) field.typ = retType objType.addField(field) - promField = newDotExpr(scratchObj, field) - promAsExpr = indirectAccess(castExpr, field, n.info) - # create promise: - result.add newFastAsgnStmt(promField, callProc(spawnExpr[2])) + fvField = newDotExpr(scratchObj, field) + fvAsExpr = indirectAccess(castExpr, field, n.info) + # create flowVar: + result.add newFastAsgnStmt(fvField, callProc(spawnExpr[2])) if barrier == nil: - result.add callCodeGenProc("nimPromiseCreateCondVar", promField) + result.add callCodeGenProc("nimFlowVarCreateCondVar", fvField) elif spawnKind == srByVar: - var field = newSym(skField, getIdent"prom", owner, n.info) + var field = newSym(skField, getIdent"fv", owner, n.info) field.typ = newType(tyPtr, objType.owner) field.typ.rawAddSon(retType) objType.addField(field) - promAsExpr = indirectAccess(castExpr, field, n.info) + fvAsExpr = indirectAccess(castExpr, field, n.info) result.add newFastAsgnStmt(newDotExpr(scratchObj, field), genAddrOf(dest)) let wrapper = createWrapperProc(fn, threadParam, argsParam, varSection, call, - barrierAsExpr, promAsExpr, spawnKind) + barrierAsExpr, fvAsExpr, spawnKind) result.add callCodeGenProc("nimSpawn", wrapper.newSymNode, genAddrOf(scratchObj.newSymNode)) - if spawnKind == srPromise: result.add promField + if spawnKind == srFlowVar: result.add fvField diff --git a/compiler/semexprs.nim b/compiler/semexprs.nim index 9e3785185..5603f0e97 100644 --- a/compiler/semexprs.nim +++ b/compiler/semexprs.nim @@ -1579,17 +1579,17 @@ proc semShallowCopy(c: PContext, n: PNode, flags: TExprFlags): PNode = else: result = semDirectOp(c, n, flags) -proc createPromise(c: PContext; t: PType; info: TLineInfo): PType = +proc createFlowVar(c: PContext; t: PType; info: TLineInfo): PType = result = newType(tyGenericInvokation, c.module) - addSonSkipIntLit(result, magicsys.getCompilerProc("Promise").typ) + addSonSkipIntLit(result, magicsys.getCompilerProc("FlowVar").typ) addSonSkipIntLit(result, t) result = instGenericContainer(c, info, result, allowMetaTypes = false) -proc instantiateCreatePromiseCall(c: PContext; t: PType; +proc instantiateCreateFlowVarCall(c: PContext; t: PType; info: TLineInfo): PSym = - let sym = magicsys.getCompilerProc("nimCreatePromise") + let sym = magicsys.getCompilerProc("nimCreateFlowVar") if sym == nil: - localError(info, errSystemNeeds, "nimCreatePromise") + localError(info, errSystemNeeds, "nimCreateFlowVar") var bindings: TIdTable initIdTable(bindings) bindings.idTablePut(sym.ast[genericParamsPos].sons[0].typ, t) @@ -1635,8 +1635,8 @@ proc semMagic(c: PContext, n: PNode, s: PSym, flags: TExprFlags): PNode = if c.inParallelStmt > 0: result.typ = result[1].typ else: - result.typ = createPromise(c, result[1].typ, n.info) - result.add instantiateCreatePromiseCall(c, result[1].typ, n.info).newSymNode + result.typ = createFlowVar(c, result[1].typ, n.info) + result.add instantiateCreateFlowVarCall(c, result[1].typ, n.info).newSymNode else: result = semDirectOp(c, n, flags) proc semWhen(c: PContext, n: PNode, semCheck = true): PNode = diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index c4ed42c05..c34b91e30 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -90,8 +90,8 @@ type cv: CondVar idx: int - RawPromise* = ref RawPromiseObj ## untyped base class for 'Promise[T]' - RawPromiseObj = object of TObject + RawFlowVar* = ref RawFlowVarObj ## untyped base class for 'FlowVar[T]' + RawFlowVarObj = object of TObject ready, usesCondVar: bool cv: CondVar #\ # for 'awaitAny' support @@ -100,10 +100,10 @@ type data: pointer # we incRef and unref it to keep it alive owner: pointer # ptr Worker - PromiseObj[T] = object of RawPromiseObj + FlowVarObj[T] = object of RawFlowVarObj blob: T - Promise*{.compilerProc.}[T] = ref PromiseObj[T] + FlowVar*{.compilerProc.}[T] = ref FlowVarObj[T] ## a data flow variable ToFreeQueue = object len: int @@ -123,28 +123,28 @@ type shutdown: bool # the pool requests to shut down this worker thread q: ToFreeQueue -proc await*(prom: RawPromise) = - ## waits until the value for the promise arrives. Usually it is not necessary +proc await*(fv: RawFlowVar) = + ## waits until the value for the flowVar arrives. Usually it is not necessary ## to call this explicitly. - if prom.usesCondVar: - prom.usesCondVar = false - await(prom.cv) - destroyCondVar(prom.cv) - -proc finished(prom: RawPromise) = - doAssert prom.ai.isNil, "promise is still attached to an 'awaitAny'" - # we have to protect against the rare cases where the owner of the promise - # simply disregards the promise and yet the "promiser" has not yet written + if fv.usesCondVar: + fv.usesCondVar = false + await(fv.cv) + destroyCondVar(fv.cv) + +proc finished(fv: RawFlowVar) = + doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'" + # we have to protect against the rare cases where the owner of the flowVar + # simply disregards the flowVar and yet the "flowVarr" has not yet written # anything to it: - await(prom) - if prom.data.isNil: return - let owner = cast[ptr Worker](prom.owner) + await(fv) + if fv.data.isNil: return + let owner = cast[ptr Worker](fv.owner) let q = addr(owner.q) var waited = false while true: acquire(q.lock) if q.len < q.data.len: - q.data[q.len] = prom.data + q.data[q.len] = fv.data inc q.len release(q.lock) break @@ -153,11 +153,11 @@ proc finished(prom: RawPromise) = release(q.lock) wait(q.empty, q.lock) waited = true - prom.data = nil + fv.data = nil # wakeup other potentially waiting threads: if waited: signal(q.empty) -proc cleanPromises(w: ptr Worker) = +proc cleanFlowVars(w: ptr Worker) = let q = addr(w.q) acquire(q.lock) for i in 0 .. <q.len: @@ -166,72 +166,72 @@ proc cleanPromises(w: ptr Worker) = release(q.lock) signal(q.empty) -proc promFinalizer[T](prom: Promise[T]) = finished(prom) +proc fvFinalizer[T](fv: FlowVar[T]) = finished(fv) -proc nimCreatePromise[T](): Promise[T] {.compilerProc.} = - new(result, promFinalizer) +proc nimCreateFlowVar[T](): FlowVar[T] {.compilerProc.} = + new(result, fvFinalizer) -proc nimPromiseCreateCondVar(prom: RawPromise) {.compilerProc.} = - prom.cv = createCondVar() - prom.usesCondVar = true +proc nimFlowVarCreateCondVar(fv: RawFlowVar) {.compilerProc.} = + fv.cv = createCondVar() + fv.usesCondVar = true -proc nimPromiseSignal(prom: RawPromise) {.compilerProc.} = - if prom.ai != nil: - acquire(prom.ai.cv.L) - prom.ai.idx = prom.idx - inc prom.ai.cv.counter - release(prom.ai.cv.L) - signal(prom.ai.cv.c) - if prom.usesCondVar: signal(prom.cv) +proc nimFlowVarSignal(fv: RawFlowVar) {.compilerProc.} = + if fv.ai != nil: + acquire(fv.ai.cv.L) + fv.ai.idx = fv.idx + inc fv.ai.cv.counter + release(fv.ai.cv.L) + signal(fv.ai.cv.c) + if fv.usesCondVar: signal(fv.cv) -proc awaitAndThen*[T](prom: Promise[T]; action: proc (x: T) {.closure.}) = - ## blocks until the ``prom`` is available and then passes its value +proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) = + ## blocks until the ``fv`` is available and then passes its value ## to ``action``. Note that due to Nimrod's parameter passing semantics this ## means that ``T`` doesn't need to be copied and so ``awaitAndThen`` can ## sometimes be more efficient than ``^``. - await(prom) + await(fv) when T is string or T is seq: - action(cast[T](prom.data)) + action(cast[T](fv.data)) elif T is ref: - {.error: "'awaitAndThen' not available for Promise[ref]".} + {.error: "'awaitAndThen' not available for FlowVar[ref]".} else: - action(prom.blob) - finished(prom) + action(fv.blob) + finished(fv) -proc `^`*[T](prom: Promise[ref T]): foreign ptr T = +proc `^`*[T](fv: FlowVar[ref T]): foreign ptr T = ## blocks until the value is available and then returns this value. - await(prom) - result = cast[foreign ptr T](prom.data) + await(fv) + result = cast[foreign ptr T](fv.data) -proc `^`*[T](prom: Promise[T]): T = +proc `^`*[T](fv: FlowVar[T]): T = ## blocks until the value is available and then returns this value. - await(prom) + await(fv) when T is string or T is seq: - result = cast[T](prom.data) + result = cast[T](fv.data) else: - result = prom.blob + result = fv.blob -proc awaitAny*(promises: openArray[RawPromise]): int = - ## awaits any of the given promises. Returns the index of one promise for - ## which a value arrived. A promise only supports one call to 'awaitAny' at +proc awaitAny*(flowVars: openArray[RawFlowVar]): int = + ## awaits any of the given flowVars. Returns the index of one flowVar for + ## which a value arrived. A flowVar only supports one call to 'awaitAny' at ## the same time. That means if you await([a,b]) and await([b,c]) the second - ## call will only await 'c'. If there is no promise left to be able to wait + ## call will only await 'c'. If there is no flowVar left to be able to wait ## on, -1 is returned. ## **Note**: This results in non-deterministic behaviour and so should be ## avoided. var ai: AwaitInfo ai.cv = createCondVar() var conflicts = 0 - for i in 0 .. promises.high: - if cas(addr promises[i].ai, nil, addr ai): - promises[i].idx = i + for i in 0 .. flowVars.high: + if cas(addr flowVars[i].ai, nil, addr ai): + flowVars[i].idx = i else: inc conflicts - if conflicts < promises.len: + if conflicts < flowVars.len: await(ai.cv) result = ai.idx - for i in 0 .. promises.high: - discard cas(addr promises[i].ai, addr ai, nil) + for i in 0 .. flowVars.high: + discard cas(addr flowVars[i].ai, addr ai, nil) else: result = -1 destroyCondVar(ai.cv) @@ -259,7 +259,7 @@ proc slave(w: ptr Worker) {.thread.} = await(w.taskArrived) assert(not w.ready) w.f(w, w.data) - if w.q.len != 0: w.cleanPromises + if w.q.len != 0: w.cleanFlowVars if w.shutdown: w.shutdown = false atomicDec currentPoolSize @@ -300,7 +300,7 @@ proc spawn*(call: expr): expr {.magic: "Spawn".} ## always spawns a new task, so that the 'call' is never executed on ## the calling thread. 'call' has to be proc call 'p(...)' where 'p' ## is gcsafe and has a return type that is either 'void' or compatible - ## with ``Promise[T]``. + ## with ``FlowVar[T]``. template spawnX*(call: expr): expr = ## spawns a new task if a CPU core is ready, otherwise executes the @@ -308,7 +308,7 @@ template spawnX*(call: expr): expr = ## use 'spawn' in order to not block the producer for an unknown ## amount of time. 'call' has to be proc call 'p(...)' where 'p' ## is gcsafe and has a return type that is either 'void' or compatible - ## with ``Promise[T]``. + ## with ``FlowVar[T]``. (if preferSpawn(): spawn call else: call) proc parallel*(body: stmt) {.magic: "Parallel".} |