From 6195dbe491ccd864c5dcb59f87826291ac1f1ff4 Mon Sep 17 00:00:00 2001 From: Araq Date: Mon, 12 May 2014 11:12:37 +0200 Subject: initial non-compiling version of 'parallel' --- lib/system/atomics.nim | 31 ++++++++++++++++++++++++++++++- lib/system/sysspawn.nim | 47 +++++++++++++++++++++++------------------------ 2 files changed, 53 insertions(+), 25 deletions(-) (limited to 'lib/system') diff --git a/lib/system/atomics.nim b/lib/system/atomics.nim index b1a96b209..c6c603b19 100644 --- a/lib/system/atomics.nim +++ b/lib/system/atomics.nim @@ -1,13 +1,14 @@ # # # Nimrod's Runtime Library -# (c) Copyright 2012 Andreas Rumpf +# (c) Copyright 2014 Andreas Rumpf # # See the file "copying.txt", included in this # distribution, for details about the copyright. # ## Atomic operations for Nimrod. +{.push stackTrace:off.} when (defined(gcc) or defined(llvm_gcc)) and hasThreadSupport: type @@ -203,3 +204,31 @@ proc atomicDec*(memLoc: var int, x: int = 1): int = else: dec(memLoc, x) result = memLoc + +when defined(windows) and not defined(gcc): + proc interlockedCompareExchange(p: pointer; exchange, comparand: int32): int32 + {.importc: "InterlockedCompareExchange", header: "", cdecl.} + + proc cas*[T: bool|int](p: ptr T; oldValue, newValue: T): bool = + interlockedCompareExchange(p, newValue.int32, oldValue.int32) != 0 + +else: + # this is valid for GCC and Intel C++ + proc cas*[T: bool|int](p: ptr T; oldValue, newValue: T): bool + {.importc: "__sync_bool_compare_and_swap", nodecl.} + # XXX is this valid for 'int'? + + +when (defined(x86) or defined(amd64)) and defined(gcc): + proc cpuRelax {.inline.} = + {.emit: """asm volatile("pause" ::: "memory");""".} +elif (defined(x86) or defined(amd64)) and defined(vcc): + proc cpuRelax {.importc: "YieldProcessor", header: "".} +elif defined(intelc): + proc cpuRelax {.importc: "_mm_pause", header: "xmmintrin.h".} +elif false: + from os import sleep + + proc cpuRelax {.inline.} = os.sleep(1) + +{.pop.} diff --git a/lib/system/sysspawn.nim b/lib/system/sysspawn.nim index dabf35a3e..95cdba65d 100644 --- a/lib/system/sysspawn.nim +++ b/lib/system/sysspawn.nim @@ -14,30 +14,6 @@ when not defined(NimString): {.push stackTrace:off.} -when (defined(x86) or defined(amd64)) and defined(gcc): - proc cpuRelax {.inline.} = - {.emit: """asm volatile("pause" ::: "memory");""".} -elif (defined(x86) or defined(amd64)) and defined(vcc): - proc cpuRelax {.importc: "YieldProcessor", header: "".} -elif defined(intelc): - proc cpuRelax {.importc: "_mm_pause", header: "xmmintrin.h".} -elif false: - from os import sleep - - proc cpuRelax {.inline.} = os.sleep(1) - -when defined(windows) and not defined(gcc): - proc interlockedCompareExchange(p: pointer; exchange, comparand: int32): int32 - {.importc: "InterlockedCompareExchange", header: "", cdecl.} - - proc cas(p: ptr bool; oldValue, newValue: bool): bool = - interlockedCompareExchange(p, newValue.int32, oldValue.int32) != 0 - -else: - # this is valid for GCC and Intel C++ - proc cas(p: ptr bool; oldValue, newValue: bool): bool - {.importc: "__sync_bool_compare_and_swap", nodecl.} - # We declare our own condition variables here to get rid of the dummy lock # on Windows: @@ -54,6 +30,9 @@ proc createCondVar(): CondVar = initSysLock(result.stupidLock) #acquireSys(result.stupidLock) +proc destroyCondVar(c: var CondVar) {.inline.} = + deinitSysCond(c.c) + proc await(cv: var CondVar) = when defined(posix): acquireSys(cv.stupidLock) @@ -100,6 +79,26 @@ proc signal(cv: var FastCondVar) = #if cas(addr cv.slowPath, true, false): signal(cv.slow) +type + Barrier* {.compilerProc.} = object + counter: int + cv: CondVar + +proc barrierEnter*(b: ptr Barrier) {.compilerProc.} = + atomicInc b.counter + +proc barrierLeave*(b: ptr Barrier) {.compilerProc.} = + atomicDec b.counter + if b.counter <= 0: signal(b.cv) + +proc openBarrier*(b: ptr Barrier) {.compilerProc.} = + b.counter = 0 + b.cv = createCondVar() + +proc closeBarrier*(b: ptr Barrier) {.compilerProc.} = + await(b.cv) + destroyCondVar(b.cv) + {.pop.} # ---------------------------------------------------------------------------- -- cgit 1.4.1-2-gfad0 From 417b9f5a1d13f26842b1337395a0f5b57827cc12 Mon Sep 17 00:00:00 2001 From: Araq Date: Thu, 22 May 2014 08:41:50 +0200 Subject: 'parallel' statement almost working --- compiler/ccgexprs.nim | 2 +- compiler/guards.nim | 58 ++++--- compiler/lowerings.nim | 286 ++++++++++++++++++++++++++----- compiler/semmagic.nim | 12 +- compiler/semparallel.nim | 89 ++++++---- doc/manual.txt | 2 +- lib/pure/concurrency/threadpool.nim | 112 ++++++++++++ lib/system/atomics.nim | 6 +- tests/parallel/tdisjoint_slice1.nim | 16 +- tests/parallel/tinvalid_array_bounds.nim | 2 +- 10 files changed, 470 insertions(+), 115 deletions(-) (limited to 'lib/system') diff --git a/compiler/ccgexprs.nim b/compiler/ccgexprs.nim index 7fb6af896..34fdf5bf1 100644 --- a/compiler/ccgexprs.nim +++ b/compiler/ccgexprs.nim @@ -1636,7 +1636,7 @@ proc genMagicExpr(p: BProc, e: PNode, d: var TLoc, op: TMagic) = of mSlurp..mQuoteAst: localError(e.info, errXMustBeCompileTime, e.sons[0].sym.name.s) of mSpawn: - let n = lowerings.wrapProcForSpawn(p.module.module, e.sons[1]) + let n = lowerings.wrapProcForSpawn(p.module.module, e[1], e.typ, nil, nil) expr(p, n, d) of mParallel: let n = semparallel.liftParallel(p.module.module, e) diff --git a/compiler/guards.nim b/compiler/guards.nim index de0ce1dcc..3df3bd1a8 100644 --- a/compiler/guards.nim +++ b/compiler/guards.nim @@ -672,12 +672,8 @@ proc simpleSlice*(a, b: PNode): BiggestInt = else: result = -1 -proc proveLe*(m: TModel; a, b: PNode): TImplication = - let res = canon(opLe.buildCall(a, b)) - #echo renderTree(res) - # we hardcode lots of axioms here: - let a = res[1] - let b = res[2] +proc ple(m: TModel; a, b: PNode): TImplication = + template `<=?`(a,b): expr = ple(m,a,b) == impYes # 0 <= 3 if a.isValue and b.isValue: return if leValue(a, b): impYes else: impNo @@ -692,26 +688,46 @@ proc proveLe*(m: TModel; a, b: PNode): TImplication = # x <= x if sameTree(a, b): return impYes - # x <= x+c iff 0 <= c - if b.getMagic in someAdd and sameTree(a, b[1]): - return proveLe(m, zero(), b[2]) + # 0 <= x.len + if b.getMagic in someLen and a.isValue: + if a.intVal <= 0: return impYes + + # x <= y+c if 0 <= c and x <= y + if b.getMagic in someAdd and zero() <=? b[2] and a <=? b[1]: return impYes + + # x+c <= y if c <= 0 and x <= y + if a.getMagic in someAdd and a[2] <=? zero() and a[1] <=? b: return impYes - # x+c <= x iff c <= 0 - if a.getMagic in someAdd and sameTree(b, a[1]): - return proveLe(m, a[2], zero()) + # x <= y*c if 1 <= c and x <= y and 0 <= y + if b.getMagic in someMul: + if a <=? b[1] and one() <=? b[2] and zero() <=? b[1]: return impYes - # x <= x*c if 1 <= c and 0 <= x: - if b.getMagic in someMul and sameTree(a, b[1]): - if proveLe(m, one(), b[2]) == impYes and proveLe(m, zero(), a) == impYes: - return impYes + # x div c <= y if 1 <= c and 0 <= y and x <= y: + if a.getMagic in someDiv: + if one() <=? a[2] and zero() <=? b and a[1] <=? b: return impYes - # x div c <= x if 1 <= c and 0 <= x: - if a.getMagic in someDiv and sameTree(a[1], b): - if proveLe(m, one(), a[2]) == impYes and proveLe(m, zero(), b) == impYes: - return impYes + # slightly subtle: + # x <= max(y, z) iff x <= y or x <= z + # note that 'x <= max(x, z)' is a special case of the above rule + if b.getMagic in someMax: + if a <=? b[1] or a <=? b[2]: return impYes + + # min(x, y) <= z iff x <= z or y <= z + if a.getMagic in someMin: + if a[1] <=? b or a[2] <=? b: return impYes # use the knowledge base: - return doesImply(m, res) + return doesImply(m, opLe.buildCall(a, b)) + +proc proveLe*(m: TModel; a, b: PNode): TImplication = + #echo "ROOT ", renderTree(a), " <=? ", b.rendertree + let x = canon(opLe.buildCall(a, b)) + #echo renderTree(res) + result = ple(m, x[1], x[2]) + if result == impUnknown: + # try an alternative: a <= b iff not (b < a) iff not (b+1 <= a): + let y = canon(opLe.buildCall(opAdd.buildCall(b, one()), a)) + result = ~ple(m, y[1], y[2]) proc addFactLe*(m: var TModel; a, b: PNode) = m.add canon(opLe.buildCall(a, b)) diff --git a/compiler/lowerings.nim b/compiler/lowerings.nim index 704cfbcdd..2a1a8e577 100644 --- a/compiler/lowerings.nim +++ b/compiler/lowerings.nim @@ -13,6 +13,8 @@ const genPrefix* = ":tmp" # prefix for generated names import ast, astalgo, types, idents, magicsys, msgs, options +from guards import createMagic +from trees import getMagic proc newTupleAccess*(tup: PNode, i: int): PNode = result = newNodeIT(nkBracketExpr, tup.info, tup.typ.skipTypes( @@ -80,19 +82,23 @@ proc newDotExpr(obj, b: PSym): PNode = addSon(result, newSymNode(field)) result.typ = field.typ -proc indirectAccess*(a: PNode, b: PSym, info: TLineInfo): PNode = +proc indirectAccess*(a: PNode, b: string, info: TLineInfo): PNode = # returns a[].b as a node var deref = newNodeI(nkHiddenDeref, info) - deref.typ = a.typ.sons[0] + deref.typ = a.typ.skipTypes(abstractInst).sons[0] assert deref.typ.kind == tyObject - let field = getSymFromList(deref.typ.n, getIdent(b.name.s & $b.id)) - assert field != nil, b.name.s + let field = getSymFromList(deref.typ.n, getIdent(b)) + assert field != nil, b addSon(deref, a) result = newNodeI(nkDotExpr, info) addSon(result, deref) addSon(result, newSymNode(field)) result.typ = field.typ +proc indirectAccess*(a: PNode, b: PSym, info: TLineInfo): PNode = + # returns a[].b as a node + result = indirectAccess(a, b.name.s & $b.id, info) + proc indirectAccess*(a, b: PSym, info: TLineInfo): PNode = result = indirectAccess(newSymNode(a), b, info) @@ -102,6 +108,11 @@ proc genAddrOf*(n: PNode): PNode = result.typ = newType(tyPtr, n.typ.owner) result.typ.rawAddSon(n.typ) +proc genDeref*(n: PNode): PNode = + result = newNodeIT(nkHiddenDeref, n.info, + n.typ.skipTypes(abstractInst).sons[0]) + result.add n + proc callCodegenProc*(name: string, arg1: PNode; arg2, arg3: PNode = nil): PNode = result = newNodeI(nkCall, arg1.info) @@ -114,14 +125,83 @@ proc callCodegenProc*(name: string, arg1: PNode; if arg2 != nil: result.add arg2 if arg3 != nil: result.add arg3 +# we have 4 cases to consider: +# - a void proc --> nothing to do +# - a proc returning GC'ed memory --> requires a future +# - a proc returning non GC'ed memory --> pass as hidden 'var' parameter +# - not in a parallel environment --> requires a future for memory safety +type + TSpawnResult = enum + srVoid, srFuture, srByVar + TFutureKind = enum + futInvalid # invalid type T for 'Future[T]' + futGC # Future of a GC'ed type + futBlob # Future of a blob type + +proc spawnResult(t: PType; inParallel: bool): TSpawnResult = + if t.isEmptyType: srVoid + elif inParallel and not containsGarbageCollectedRef(t): srByVar + else: srFuture + +proc futureKind(t: PType): TFutureKind = + if t.skipTypes(abstractInst).kind in {tyRef, tyString, tySequence}: futGC + elif containsGarbageCollectedRef(t): futInvalid + else: futBlob + +discard """ +We generate roughly this: + +proc f_wrapper(args) = + var a = args.a # copy strings/seqs; thread transfer; not generated for + # the 'parallel' statement + var b = args.b + + args.fut = createFuture(thread, sizeof(T)) # optional + nimArgsPassingDone() # signal parent that the work is done + args.fut.blob = f(a, b, ...) + # - or - + f(a, b, ...) + +stmtList: + var scratchObj + scratchObj.a = a + scratchObj.b = b + + nimSpawn(f_wrapper, addr scratchObj) + scratchObj.fut # optional + +""" + +proc createNimCreateFutureCall(fut, threadParam: PNode): PNode = + let size = newNodeIT(nkCall, fut.info, getSysType(tyInt)) + size.add newSymNode(createMagic("sizeof", mSizeOf)) + assert fut.typ.kind == tyGenericInst + size.add newNodeIT(nkType, fut.info, fut.typ.sons[1]) + + let castExpr = newNodeIT(nkCast, fut.info, fut.typ) + castExpr.add emptyNode + castExpr.add callCodeGenProc("nimCreateFuture", threadParam, size) + result = newFastAsgnStmt(fut, castExpr) + proc createWrapperProc(f: PNode; threadParam, argsParam: PSym; - varSection, call, barrier: PNode): PSym = + varSection, call, barrier, fut: PNode): PSym = var body = newNodeI(nkStmtList, f.info) body.add varSection if barrier != nil: body.add callCodeGenProc("barrierEnter", barrier) - body.add callCodeGenProc("nimArgsPassingDone", newSymNode(threadParam)) - body.add call + if fut != nil: + body.add createNimCreateFutureCall(fut, threadParam.newSymNode) + if barrier == nil: + body.add callCodeGenProc("nimFutureCreateCondVar", fut) + + body.add callCodeGenProc("nimArgsPassingDone", threadParam.newSymNode) + if fut != nil: + body.add newAsgnStmt(indirectAccess(fut, + if fut.typ.futureKind==futGC: "data" else: "blob", fut.info), call) + if barrier == nil: + body.add callCodeGenProc("nimFutureSignal", fut) + else: + body.add call if barrier != nil: body.add callCodeGenProc("barrierLeave", barrier) @@ -151,10 +231,148 @@ proc createCastExpr(argsParam: PSym; objType: PType): PNode = result.typ = newType(tyPtr, objType.owner) result.typ.rawAddSon(objType) -proc wrapProcForSpawn*(owner: PSym; n: PNode; barrier: PNode = nil): PNode = - result = newNodeI(nkStmtList, n.info) - if n.kind notin nkCallKinds or not n.typ.isEmptyType: - localError(n.info, "'spawn' takes a call expression of type void") +proc setupArgsForConcurrency(n: PNode; objType: PType; scratchObj: PSym, + castExpr, call, varSection, result: PNode) = + let formals = n[0].typ.n + let tmpName = getIdent(genPrefix) + for i in 1 .. 16) and + n.getRoot != nil: + # it is more efficient to pass a pointer instead: + let a = genAddrOf(n) + field.typ = a.typ + objType.addField(field) + result.add newFastAsgnStmt(newDotExpr(scratchObj, field), a) + call.add(genDeref(indirectAccess(castExpr, field, n.info))) + else: + # boring case + field.typ = argType + objType.addField(field) + result.add newFastAsgnStmt(newDotExpr(scratchObj, field), n) + call.add(indirectAccess(castExpr, field, n.info)) + +proc wrapProcForSpawn*(owner: PSym; n: PNode; retType: PType; + barrier, dest: PNode = nil): PNode = + # if 'barrier' != nil, then it is in a 'parallel' section and we + # generate quite different code + let spawnKind = spawnResult(retType, barrier!=nil) + case spawnKind + of srVoid: + internalAssert dest == nil + result = newNodeI(nkStmtList, n.info) + of srFuture: + internalAssert dest == nil + result = newNodeIT(nkStmtListExpr, n.info, retType) + of srByVar: + if dest == nil: localError(n.info, "'spawn' must not be discarded") + result = newNodeI(nkStmtList, n.info) + + if n.kind notin nkCallKinds: + localError(n.info, "'spawn' takes a call expression") return if optThreadAnalysis in gGlobalOptions: if {tfThread, tfNoSideEffect} * n[0].typ.flags == {}: @@ -180,7 +398,7 @@ proc wrapProcForSpawn*(owner: PSym; n: PNode; barrier: PNode = nil): PNode = varSectionB.addVar(scratchObj.newSymNode) result.add varSectionB - var call = newNodeI(nkCall, n.info) + var call = newNodeIT(nkCall, n.info, n.typ) var fn = n.sons[0] # templates and macros are in fact valid here due to the nature of # the transformation: @@ -200,34 +418,10 @@ proc wrapProcForSpawn*(owner: PSym; n: PNode; barrier: PNode = nil): PNode = call.add(fn) var varSection = newNodeI(nkVarSection, n.info) - let formals = n[0].typ.n - let tmpName = getIdent(genPrefix) - for i in 1 .. f shouldn't have side effects anyway # - passed arrays need to be ensured not to alias # - passed slices need to be ensured to be disjoint (+) -# - output slices need special logic +# - output slices need special logic (+) import ast, astalgo, idents, lowerings, magicsys, guards, sempass2, msgs, @@ -94,23 +94,6 @@ proc getSlot(c: var AnalysisCtx; v: PSym): ptr MonotonicVar = c.locals[L].v = v return addr(c.locals[L]) -proc getRoot(n: PNode): PSym = - ## ``getRoot`` takes a *path* ``n``. A path is an lvalue expression - ## like ``obj.x[i].y``. The *root* of a path is the symbol that can be - ## determined as the owner; ``obj`` in the example. - case n.kind - of nkSym: - if n.sym.kind in {skVar, skResult, skTemp, skLet, skForVar}: - result = n.sym - of nkDotExpr, nkBracketExpr, nkHiddenDeref, nkDerefExpr, - nkObjUpConv, nkObjDownConv, nkCheckedFieldExpr: - result = getRoot(n.sons[0]) - of nkHiddenStdConv, nkHiddenSubConv, nkConv: - result = getRoot(n.sons[1]) - of nkCallKinds: - if getMagic(n) == mSlice: result = getRoot(n.sons[1]) - else: discard - proc gatherArgs(c: var AnalysisCtx; n: PNode) = for i in 0.. = 0 and c.locals[s].stride != nil: @@ -193,6 +174,20 @@ proc stride(c: AnalysisCtx; n: PNode): BiggestInt = else: for i in 0 .. = 0 and c.locals[s].stride != nil: + result = n +@ c.locals[s].stride.intVal + else: + result = n + elif n.safeLen > 0: + result = shallowCopy(n) + for i in 0 .. 1: addFact(c.guards, branch.sons[0]) - #setLen(c.locals, oldState) for i in 0 .. 0: - result = shallowCopy(n) - for i in 0 .. < n.len: - result.sons[i] = transformSpawn(owner, n.sons[i], barrier) + result = transformSpawnSons(owner, n, barrier) else: result = n @@ -440,3 +452,4 @@ proc liftParallel*(owner: PSym; n: PNode): PNode = result.add callCodeGenProc("openBarrier", barrier) result.add transformSpawn(owner, body, barrier) result.add callCodeGenProc("closeBarrier", barrier) + diff --git a/doc/manual.txt b/doc/manual.txt index 39e2bad2a..b2e008969 100644 --- a/doc/manual.txt +++ b/doc/manual.txt @@ -2748,7 +2748,7 @@ The following builtin procs cannot be overloaded for reasons of implementation simplicity (they require specialized semantic checking):: defined, definedInScope, compiles, low, high, sizeOf, - is, of, echo, shallowCopy, getAst + is, of, echo, shallowCopy, getAst, spawn Thus they act more like keywords than like ordinary identifiers; unlike a keyword however, a redefinition may `shadow`:idx: the definition in diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index 86819d25a..583c60c66 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -65,6 +65,30 @@ proc closeBarrier*(b: ptr Barrier) {.compilerProc.} = # ---------------------------------------------------------------------------- type + AwaitInfo = object + cv: CondVar + idx: int + + RawFuture* = ptr RawFutureObj ## untyped base class for 'Future[T]' + RawFutureObj {.inheritable.} = object # \ + # we allocate this with the thread local allocator; this + # is possible since we already need to do the GC_unref + # on the owning thread + ready, usesCondVar: bool + cv: CondVar #\ + # for 'awaitAny' support + ai: ptr AwaitInfo + idx: int + data: PObject # we incRef and unref it to keep it alive + owner: ptr Worker + next: RawFuture + align: float64 # a float for proper alignment + + Future* {.compilerProc.} [T] = ptr object of RawFutureObj + blob: T ## the underlying value, if available. Note that usually + ## you should not access this field directly! However it can + ## sometimes be more efficient than getting the value via ``^``. + WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.} Worker = object taskArrived: CondVar @@ -75,6 +99,92 @@ type ready: bool # put it here for correct alignment! initialized: bool # whether it has even been initialized shutdown: bool # the pool requests to shut down this worker thread + futureLock: TLock + head: RawFuture + +proc finished*(fut: RawFuture) = + ## This MUST be called for every created future to free its associated + ## resources. Note that the default reading operation ``^`` is destructive + ## and calls ``finished``. + doAssert fut.ai.isNil, "future is still attached to an 'awaitAny'" + assert fut.next == nil + let w = fut.owner + acquire(w.futureLock) + fut.next = w.head + w.head = fut + release(w.futureLock) + +proc cleanFutures(w: ptr Worker) = + var it = w.head + acquire(w.futureLock) + while it != nil: + let nxt = it.next + if it.usesCondVar: destroyCondVar(it.cv) + if it.data != nil: GC_unref(it.data) + dealloc(it) + it = nxt + w.head = nil + release(w.futureLock) + +proc nimCreateFuture(owner: pointer; blobSize: int): RawFuture {. + compilerProc.} = + result = cast[RawFuture](alloc0(RawFutureObj.sizeof + blobSize)) + result.owner = cast[ptr Worker](owner) + +proc nimFutureCreateCondVar(fut: RawFuture) {.compilerProc.} = + fut.cv = createCondVar() + fut.usesCondVar = true + +proc nimFutureSignal(fut: RawFuture) {.compilerProc.} = + assert fut.usesCondVar + signal(fut.cv) + +proc await*[T](fut: Future[T]) = + ## waits until the value for the future arrives. + if fut.usesCondVar: await(fut.cv) + +proc `^`*[T](fut: Future[T]): T = + ## blocks until the value is available and then returns this value. Note + ## this reading is destructive for reasons of efficiency and convenience. + ## This calls ``finished(fut)``. + await(fut) + when T is string or T is seq or T is ref: + result = cast[T](fut.data) + else: + result = fut.payload + finished(fut) + +proc notify*(fut: RawFuture) {.compilerproc.} = + if fut.ai != nil: + acquire(fut.ai.cv.L) + fut.ai.idx = fut.idx + inc fut.ai.cv.counter + release(fut.ai.cv.L) + signal(fut.ai.cv.c) + if fut.usesCondVar: signal(fut.cv) + +proc awaitAny*(futures: openArray[RawFuture]): int = + # awaits any of the given futures. Returns the index of one future for which + ## a value arrived. A future only supports one call to 'awaitAny' at the + ## same time. That means if you await([a,b]) and await([b,c]) the second + ## call will only await 'c'. If there is no future left to be able to wait + ## on, -1 is returned. + var ai: AwaitInfo + ai.cv = createCondVar() + var conflicts = 0 + for i in 0 .. futures.high: + if cas(addr futures[i].ai, nil, addr ai): + futures[i].idx = i + else: + inc conflicts + if conflicts < futures.len: + await(ai.cv) + result = ai.idx + for i in 0 .. futures.high: + discard cas(addr futures[i].ai, addr ai, nil) + else: + result = -1 + destroyCondVar(ai.cv) proc nimArgsPassingDone(p: pointer) {.compilerProc.} = let w = cast[ptr Worker](p) @@ -99,6 +209,7 @@ proc slave(w: ptr Worker) {.thread.} = await(w.taskArrived) assert(not w.ready) w.f(w, w.data) + if w.head != nil: w.cleanFutures if w.shutdown: w.shutdown = false atomicDec currentPoolSize @@ -119,6 +230,7 @@ var proc activateThread(i: int) {.noinline.} = workersData[i].taskArrived = createCondVar() workersData[i].taskStarted = createCondVar() + initLock workersData[i].futureLock workersData[i].initialized = true createThread(workers[i], slave, addr(workersData[i])) diff --git a/lib/system/atomics.nim b/lib/system/atomics.nim index c6c603b19..96246ba01 100644 --- a/lib/system/atomics.nim +++ b/lib/system/atomics.nim @@ -209,12 +209,12 @@ when defined(windows) and not defined(gcc): proc interlockedCompareExchange(p: pointer; exchange, comparand: int32): int32 {.importc: "InterlockedCompareExchange", header: "", cdecl.} - proc cas*[T: bool|int](p: ptr T; oldValue, newValue: T): bool = + proc cas*[T: bool|int|ptr](p: ptr T; oldValue, newValue: T): bool = interlockedCompareExchange(p, newValue.int32, oldValue.int32) != 0 - + # XXX fix for 64 bit build else: # this is valid for GCC and Intel C++ - proc cas*[T: bool|int](p: ptr T; oldValue, newValue: T): bool + proc cas*[T: bool|int|ptr](p: ptr T; oldValue, newValue: T): bool {.importc: "__sync_bool_compare_and_swap", nodecl.} # XXX is this valid for 'int'? diff --git a/tests/parallel/tdisjoint_slice1.nim b/tests/parallel/tdisjoint_slice1.nim index 2ca96d6ae..c1d0e52f8 100644 --- a/tests/parallel/tdisjoint_slice1.nim +++ b/tests/parallel/tdisjoint_slice1.nim @@ -1,20 +1,20 @@ +discard """ + outputsub: "EVEN 28" +""" import threadpool -proc f(a: openArray[int]) = - for x in a: echo x - -proc f(a: int) = echo a +proc odd(a: int) = echo "ODD ", a +proc even(a: int) = echo "EVEN ", a proc main() = var a: array[0..30, int] + for i in low(a)..high(a): a[i] = i parallel: - #spawn f(a[0..15]) - #spawn f(a[16..30]) var i = 0 while i <= 29: - spawn f(a[i]) - spawn f(a[i+1]) + spawn even(a[i]) + spawn odd(a[i+1]) inc i, 2 # is correct here diff --git a/tests/parallel/tinvalid_array_bounds.nim b/tests/parallel/tinvalid_array_bounds.nim index 337fae729..4c6065fd6 100644 --- a/tests/parallel/tinvalid_array_bounds.nim +++ b/tests/parallel/tinvalid_array_bounds.nim @@ -1,5 +1,5 @@ discard """ - errormsg: "cannot prove: i + 1 <= 30" + errormsg: "can prove: i + 1 > 30" line: 21 """ -- cgit 1.4.1-2-gfad0 From 030eac86c05427792d3c3c00b56fbe764d783a40 Mon Sep 17 00:00:00 2001 From: Araq Date: Sun, 25 May 2014 15:19:46 +0200 Subject: bugfix: regionized pointers in a generic context; renamed 'Future' to 'Promise' --- compiler/ast.nim | 2 + compiler/lowerings.nim | 88 +++++++++++------------ compiler/semexprs.nim | 8 +-- compiler/semtypes.nim | 8 +-- lib/pure/concurrency/threadpool.nim | 134 +++++++++++++++++++++--------------- lib/system.nim | 4 +- lib/system/assign.nim | 3 +- 7 files changed, 138 insertions(+), 109 deletions(-) (limited to 'lib/system') diff --git a/compiler/ast.nim b/compiler/ast.nim index c47407ee2..c3cb63df4 100644 --- a/compiler/ast.nim +++ b/compiler/ast.nim @@ -885,6 +885,8 @@ const nkCallKinds* = {nkCall, nkInfix, nkPrefix, nkPostfix, nkCommand, nkCallStrLit, nkHiddenCallConv} + nkIdentKinds* = {nkIdent, nkSym, nkAccQuoted, nkOpenSymChoice, + nkClosedSymChoice} nkLiterals* = {nkCharLit..nkTripleStrLit} nkLambdaKinds* = {nkLambda, nkDo} diff --git a/compiler/lowerings.nim b/compiler/lowerings.nim index 047bdf832..13d4bf60e 100644 --- a/compiler/lowerings.nim +++ b/compiler/lowerings.nim @@ -134,26 +134,26 @@ proc callCodegenProc*(name: string, arg1: PNode; # we have 4 cases to consider: # - a void proc --> nothing to do -# - a proc returning GC'ed memory --> requires a future +# - a proc returning GC'ed memory --> requires a promise # - a proc returning non GC'ed memory --> pass as hidden 'var' parameter -# - not in a parallel environment --> requires a future for memory safety +# - not in a parallel environment --> requires a promise for memory safety type TSpawnResult = enum - srVoid, srFuture, srByVar - TFutureKind = enum - futInvalid # invalid type T for 'Future[T]' - futGC # Future of a GC'ed type - futBlob # Future of a blob type + srVoid, srPromise, srByVar + TPromiseKind = enum + promInvalid # invalid type T for 'Promise[T]' + promGC # Promise of a GC'ed type + promBlob # Promise of a blob type proc spawnResult(t: PType; inParallel: bool): TSpawnResult = if t.isEmptyType: srVoid elif inParallel and not containsGarbageCollectedRef(t): srByVar - else: srFuture + else: srPromise -proc futureKind(t: PType): TFutureKind = - if t.skipTypes(abstractInst).kind in {tyRef, tyString, tySequence}: futGC - elif containsGarbageCollectedRef(t): futInvalid - else: futBlob +proc promiseKind(t: PType): TPromiseKind = + if t.skipTypes(abstractInst).kind in {tyRef, tyString, tySequence}: promGC + elif containsGarbageCollectedRef(t): promInvalid + else: promBlob discard """ We generate roughly this: @@ -164,12 +164,12 @@ proc f_wrapper(args) = # the 'parallel' statement var b = args.b - args.fut = nimCreateFuture(thread, sizeof(T)) # optional - nimFutureCreateCondVar(args.fut) # optional + args.prom = nimCreatePromise(thread, sizeof(T)) # optional + nimPromiseCreateCondVar(args.prom) # optional nimArgsPassingDone() # signal parent that the work is done # - args.fut.blob = f(a, b, ...) - nimFutureSignal(args.fut) + args.prom.blob = f(a, b, ...) + nimPromiseSignal(args.prom) # - or - f(a, b, ...) @@ -181,42 +181,42 @@ stmtList: scratchObj.b = b nimSpawn(f_wrapper, addr scratchObj) - scratchObj.fut # optional + scratchObj.prom # optional """ -proc createNimCreateFutureCall(fut, threadParam: PNode): PNode = - let size = newNodeIT(nkCall, fut.info, getSysType(tyInt)) +proc createNimCreatePromiseCall(prom, threadParam: PNode): PNode = + let size = newNodeIT(nkCall, prom.info, getSysType(tyInt)) size.add newSymNode(createMagic("sizeof", mSizeOf)) - assert fut.typ.kind == tyGenericInst - size.add newNodeIT(nkType, fut.info, fut.typ.sons[1]) + assert prom.typ.kind == tyGenericInst + size.add newNodeIT(nkType, prom.info, prom.typ.sons[1]) - let castExpr = newNodeIT(nkCast, fut.info, fut.typ) + let castExpr = newNodeIT(nkCast, prom.info, prom.typ) castExpr.add emptyNode - castExpr.add callCodeGenProc("nimCreateFuture", threadParam, size) - result = newFastAsgnStmt(fut, castExpr) + castExpr.add callCodeGenProc("nimCreatePromise", threadParam, size) + result = newFastAsgnStmt(prom, castExpr) proc createWrapperProc(f: PNode; threadParam, argsParam: PSym; - varSection, call, barrier, fut: PNode): PSym = + varSection, call, barrier, prom: PNode): PSym = var body = newNodeI(nkStmtList, f.info) body.add varSection if barrier != nil: body.add callCodeGenProc("barrierEnter", barrier) - if fut != nil: - body.add createNimCreateFutureCall(fut, threadParam.newSymNode) + if prom != nil: + body.add createNimCreatePromiseCall(prom, threadParam.newSymNode) if barrier == nil: - body.add callCodeGenProc("nimFutureCreateCondVar", fut) + body.add callCodeGenProc("nimPromiseCreateCondVar", prom) body.add callCodeGenProc("nimArgsPassingDone", threadParam.newSymNode) - if fut != nil: - let fk = fut.typ.sons[1].futureKind - if fk == futInvalid: - localError(f.info, "cannot create a future of type: " & - typeToString(fut.typ.sons[1])) - body.add newAsgnStmt(indirectAccess(fut, - if fk == futGC: "data" else: "blob", fut.info), call) + if prom != nil: + let fk = prom.typ.sons[1].promiseKind + if fk == promInvalid: + localError(f.info, "cannot create a promise of type: " & + typeToString(prom.typ.sons[1])) + body.add newAsgnStmt(indirectAccess(prom, + if fk == promGC: "data" else: "blob", prom.info), call) if barrier == nil: - body.add callCodeGenProc("nimFutureSignal", fut) + body.add callCodeGenProc("nimPromiseSignal", prom) else: body.add call if barrier != nil: @@ -381,7 +381,7 @@ proc wrapProcForSpawn*(owner: PSym; n: PNode; retType: PType; of srVoid: internalAssert dest == nil result = newNodeI(nkStmtList, n.info) - of srFuture: + of srPromise: internalAssert dest == nil result = newNodeIT(nkStmtListExpr, n.info, retType) of srByVar: @@ -450,17 +450,17 @@ proc wrapProcForSpawn*(owner: PSym; n: PNode; retType: PType; result.add newFastAsgnStmt(newDotExpr(scratchObj, field), barrier) barrierAsExpr = indirectAccess(castExpr, field, n.info) - var futField, futAsExpr: PNode = nil - if spawnKind == srFuture: - var field = newSym(skField, getIdent"fut", owner, n.info) + var promField, promAsExpr: PNode = nil + if spawnKind == srPromise: + var field = newSym(skField, getIdent"prom", owner, n.info) field.typ = retType objType.addField(field) - futField = newDotExpr(scratchObj, field) - futAsExpr = indirectAccess(castExpr, field, n.info) + promField = newDotExpr(scratchObj, field) + promAsExpr = indirectAccess(castExpr, field, n.info) let wrapper = createWrapperProc(fn, threadParam, argsParam, varSection, call, - barrierAsExpr, futAsExpr) + barrierAsExpr, promAsExpr) result.add callCodeGenProc("nimSpawn", wrapper.newSymNode, genAddrOf(scratchObj.newSymNode)) - if spawnKind == srFuture: result.add futField + if spawnKind == srPromise: result.add promField diff --git a/compiler/semexprs.nim b/compiler/semexprs.nim index 4e3d2f3ce..8f4cce547 100644 --- a/compiler/semexprs.nim +++ b/compiler/semexprs.nim @@ -1579,9 +1579,9 @@ proc semShallowCopy(c: PContext, n: PNode, flags: TExprFlags): PNode = else: result = semDirectOp(c, n, flags) -proc createFuture(c: PContext; t: PType; info: TLineInfo): PType = +proc createPromise(c: PContext; t: PType; info: TLineInfo): PType = result = newType(tyGenericInvokation, c.module) - addSonSkipIntLit(result, magicsys.getCompilerProc("Future").typ) + addSonSkipIntLit(result, magicsys.getCompilerProc("Promise").typ) addSonSkipIntLit(result, t) result = instGenericContainer(c, info, result, allowMetaTypes = false) @@ -1619,9 +1619,9 @@ proc semMagic(c: PContext, n: PNode, s: PSym, flags: TExprFlags): PNode = of mSpawn: result = setMs(n, s) result.sons[1] = semExpr(c, n.sons[1]) - # later passes may transform the type 'Future[T]' back into 'T' + # later passes may transform the type 'Promise[T]' back into 'T' if not result[1].typ.isEmptyType: - result.typ = createFuture(c, result[1].typ, n.info) + result.typ = createPromise(c, result[1].typ, n.info) else: result = semDirectOp(c, n, flags) proc semWhen(c: PContext, n: PNode, semCheck = true): PNode = diff --git a/compiler/semtypes.nim b/compiler/semtypes.nim index 8fcb6ea99..bb81cbe74 100644 --- a/compiler/semtypes.nim +++ b/compiler/semtypes.nim @@ -1084,8 +1084,10 @@ proc semTypeNode(c: PContext, n: PNode, prev: PType): PType = of nkCallKinds: if isRange(n): result = semRangeAux(c, n, prev) - elif n[0].kind == nkIdent: - let op = n.sons[0].ident + elif n[0].kind notin nkIdentKinds: + result = semTypeExpr(c, n) + else: + let op = considerAcc(n.sons[0]) if op.id in {ord(wAnd), ord(wOr)} or op.s == "|": checkSonsLen(n, 3) var @@ -1120,8 +1122,6 @@ proc semTypeNode(c: PContext, n: PNode, prev: PType): PType = result = semAnyRef(c, n, tyRef, prev) else: result = semTypeExpr(c, n) - else: - result = semTypeExpr(c, n) of nkWhenStmt: var whenResult = semWhen(c, n, false) if whenResult.kind == nkStmtList: whenResult.kind = nkStmtListType diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index 41c1adca0..24cb9ccdd 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -65,12 +65,14 @@ proc closeBarrier*(b: ptr Barrier) {.compilerProc.} = # ---------------------------------------------------------------------------- type + foreign* = object ## a region that indicates the pointer comes from a + ## foreign thread heap. AwaitInfo = object cv: CondVar idx: int - RawFuture* = ptr RawFutureObj ## untyped base class for 'Future[T]' - RawFutureObj {.inheritable.} = object # \ + RawPromise* = ptr RawPromiseObj ## untyped base class for 'Promise[T]' + RawPromiseObj {.inheritable.} = object # \ # we allocate this with the thread local allocator; this # is possible since we already need to do the GC_unref # on the owning thread @@ -81,10 +83,10 @@ type idx: int data: PObject # we incRef and unref it to keep it alive owner: ptr Worker - next: RawFuture + next: RawPromise align: float64 # a float for proper alignment - Future* {.compilerProc.} [T] = ptr object of RawFutureObj + Promise* {.compilerProc.} [T] = ptr object of RawPromiseObj blob: T ## the underlying value, if available. Note that usually ## you should not access this field directly! However it can ## sometimes be more efficient than getting the value via ``^``. @@ -99,24 +101,24 @@ type ready: bool # put it here for correct alignment! initialized: bool # whether it has even been initialized shutdown: bool # the pool requests to shut down this worker thread - futureLock: TLock - head: RawFuture + promiseLock: TLock + head: RawPromise -proc finished*(fut: RawFuture) = - ## This MUST be called for every created future to free its associated +proc finished*(prom: RawPromise) = + ## This MUST be called for every created promise to free its associated ## resources. Note that the default reading operation ``^`` is destructive ## and calls ``finished``. - doAssert fut.ai.isNil, "future is still attached to an 'awaitAny'" - assert fut.next == nil - let w = fut.owner - acquire(w.futureLock) - fut.next = w.head - w.head = fut - release(w.futureLock) - -proc cleanFutures(w: ptr Worker) = + doAssert prom.ai.isNil, "promise is still attached to an 'awaitAny'" + assert prom.next == nil + let w = prom.owner + acquire(w.promiseLock) + prom.next = w.head + w.head = prom + release(w.promiseLock) + +proc cleanPromises(w: ptr Worker) = var it = w.head - acquire(w.futureLock) + acquire(w.promiseLock) while it != nil: let nxt = it.next if it.usesCondVar: destroyCondVar(it.cv) @@ -124,62 +126,84 @@ proc cleanFutures(w: ptr Worker) = dealloc(it) it = nxt w.head = nil - release(w.futureLock) + release(w.promiseLock) -proc nimCreateFuture(owner: pointer; blobSize: int): RawFuture {. +proc nimCreatePromise(owner: pointer; blobSize: int): RawPromise {. compilerProc.} = - result = cast[RawFuture](alloc0(RawFutureObj.sizeof + blobSize)) + result = cast[RawPromise](alloc0(RawPromiseObj.sizeof + blobSize)) result.owner = cast[ptr Worker](owner) -proc nimFutureCreateCondVar(fut: RawFuture) {.compilerProc.} = - fut.cv = createCondVar() - fut.usesCondVar = true - -proc nimFutureSignal(fut: RawFuture) {.compilerProc.} = - if fut.ai != nil: - acquire(fut.ai.cv.L) - fut.ai.idx = fut.idx - inc fut.ai.cv.counter - release(fut.ai.cv.L) - signal(fut.ai.cv.c) - if fut.usesCondVar: signal(fut.cv) +proc nimPromiseCreateCondVar(prom: RawPromise) {.compilerProc.} = + prom.cv = createCondVar() + prom.usesCondVar = true + +proc nimPromiseSignal(prom: RawPromise) {.compilerProc.} = + if prom.ai != nil: + acquire(prom.ai.cv.L) + prom.ai.idx = prom.idx + inc prom.ai.cv.counter + release(prom.ai.cv.L) + signal(prom.ai.cv.c) + if prom.usesCondVar: signal(prom.cv) + +proc await*[T](prom: Promise[T]) = + ## waits until the value for the promise arrives. + if prom.usesCondVar: await(prom.cv) + +proc awaitAndThen*[T](prom: Promise[T]; action: proc (x: T) {.closure.}) = + ## blocks until the value is available and then passes this value + ## to ``action``. Note that due to Nimrod's parameter passing semantics this + ## means that ``T`` doesn't need to be copied and so ``awaitAndThen`` can + ## sometimes be more efficient than ``^``. + if prom.usesCondVar: await(prom) + when T is string or T is seq: + action(cast[T](prom.data)) + elif T is ref: + {.error: "'awaitAndThen' not available for Promise[ref]".} + else: + action(prom.blob) + finished(prom) -proc await*[T](fut: Future[T]) = - ## waits until the value for the future arrives. - if fut.usesCondVar: await(fut.cv) +proc `^`*[T](prom: Promise[ref T]): foreign ptr T = + ## blocks until the value is available and then returns this value. Note + ## this reading is destructive for reasons of efficiency and convenience. + ## This calls ``finished(prom)``. + if prom.usesCondVar: await(prom) + result = cast[foreign ptr T](prom.data) + finished(prom) -proc `^`*[T](fut: Future[T]): T = +proc `^`*[T](prom: Promise[T]): T = ## blocks until the value is available and then returns this value. Note ## this reading is destructive for reasons of efficiency and convenience. - ## This calls ``finished(fut)``. - if fut.usesCondVar: await(fut) - when T is string or T is seq or T is ref: - result = cast[T](fut.data) + ## This calls ``finished(prom)``. + if prom.usesCondVar: await(prom) + when T is string or T is seq: + result = cast[T](prom.data) else: - result = fut.blob - finished(fut) + result = prom.blob + finished(prom) -proc awaitAny*(futures: openArray[RawFuture]): int = - # awaits any of the given futures. Returns the index of one future for which - ## a value arrived. A future only supports one call to 'awaitAny' at the +proc awaitAny*(promises: openArray[RawPromise]): int = + # awaits any of the given promises. Returns the index of one promise for which + ## a value arrived. A promise only supports one call to 'awaitAny' at the ## same time. That means if you await([a,b]) and await([b,c]) the second - ## call will only await 'c'. If there is no future left to be able to wait + ## call will only await 'c'. If there is no promise left to be able to wait ## on, -1 is returned. ## **Note**: This results in non-deterministic behaviour and so should be ## avoided. var ai: AwaitInfo ai.cv = createCondVar() var conflicts = 0 - for i in 0 .. futures.high: - if cas(addr futures[i].ai, nil, addr ai): - futures[i].idx = i + for i in 0 .. promises.high: + if cas(addr promises[i].ai, nil, addr ai): + promises[i].idx = i else: inc conflicts - if conflicts < futures.len: + if conflicts < promises.len: await(ai.cv) result = ai.idx - for i in 0 .. futures.high: - discard cas(addr futures[i].ai, addr ai, nil) + for i in 0 .. promises.high: + discard cas(addr promises[i].ai, addr ai, nil) else: result = -1 destroyCondVar(ai.cv) @@ -207,7 +231,7 @@ proc slave(w: ptr Worker) {.thread.} = await(w.taskArrived) assert(not w.ready) w.f(w, w.data) - if w.head != nil: w.cleanFutures + if w.head != nil: w.cleanPromises if w.shutdown: w.shutdown = false atomicDec currentPoolSize @@ -228,7 +252,7 @@ var proc activateThread(i: int) {.noinline.} = workersData[i].taskArrived = createCondVar() workersData[i].taskStarted = createCondVar() - initLock workersData[i].futureLock + initLock workersData[i].promiseLock workersData[i].initialized = true createThread(workers[i], slave, addr(workersData[i])) diff --git a/lib/system.nim b/lib/system.nim index fbd905afa..fc6f617a5 100644 --- a/lib/system.nim +++ b/lib/system.nim @@ -42,7 +42,6 @@ type cstring* {.magic: Cstring.} ## built-in cstring (*compatible string*) type pointer* {.magic: Pointer.} ## built-in pointer type, use the ``addr`` ## operator to get a pointer to a variable - const on* = true ## alias for ``true`` off* = false ## alias for ``false`` @@ -51,6 +50,9 @@ const type Ordinal* {.magic: Ordinal.}[T] + `ptr`* {.magic: Pointer.}[T] ## built-in generic untraced pointer type + `ref`* {.magic: Pointer.}[T] ## built-in generic traced pointer type + `nil` {.magic: "Nil".} expr* {.magic: Expr.} ## meta type to denote an expression (for templates) stmt* {.magic: Stmt.} ## meta type to denote a statement (for templates) diff --git a/lib/system/assign.nim b/lib/system/assign.nim index 75c749633..2ae945fb1 100644 --- a/lib/system/assign.nim +++ b/lib/system/assign.nim @@ -179,7 +179,8 @@ when not defined(nimmixin): # internal proc used for destroying sequences and arrays for i in countup(0, r.len - 1): destroy(r[i]) else: - # XXX Why is this exported and no compilerproc? + # XXX Why is this exported and no compilerproc? -> compilerprocs cannot be + # generic for now proc nimDestroyRange*[T](r: T) = # internal proc used for destroying sequences and arrays mixin destroy -- cgit 1.4.1-2-gfad0 From 2de99653d002b919c88322219bff6f33653081c5 Mon Sep 17 00:00:00 2001 From: Araq Date: Thu, 5 Jun 2014 08:46:29 +0200 Subject: Promises are now refs --- compiler/ccgexprs.nim | 2 +- compiler/lowerings.nim | 45 ++++++------- compiler/pragmas.nim | 11 ++-- compiler/semexprs.nim | 11 ++++ compiler/semparallel.nim | 6 +- lib/pure/concurrency/threadpool.nim | 124 +++++++++++++++++++----------------- lib/system/atomics.nim | 8 +++ todo.txt | 14 ++++ 8 files changed, 133 insertions(+), 88 deletions(-) (limited to 'lib/system') diff --git a/compiler/ccgexprs.nim b/compiler/ccgexprs.nim index 34fdf5bf1..c0442711e 100644 --- a/compiler/ccgexprs.nim +++ b/compiler/ccgexprs.nim @@ -1636,7 +1636,7 @@ proc genMagicExpr(p: BProc, e: PNode, d: var TLoc, op: TMagic) = of mSlurp..mQuoteAst: localError(e.info, errXMustBeCompileTime, e.sons[0].sym.name.s) of mSpawn: - let n = lowerings.wrapProcForSpawn(p.module.module, e[1], e.typ, nil, nil) + let n = lowerings.wrapProcForSpawn(p.module.module, e, e.typ, nil, nil) expr(p, n, d) of mParallel: let n = semparallel.liftParallel(p.module.module, e) diff --git a/compiler/lowerings.nim b/compiler/lowerings.nim index af4daf785..327a18df5 100644 --- a/compiler/lowerings.nim +++ b/compiler/lowerings.nim @@ -86,7 +86,7 @@ proc indirectAccess*(a: PNode, b: string, info: TLineInfo): PNode = # returns a[].b as a node var deref = newNodeI(nkHiddenDeref, info) deref.typ = a.typ.skipTypes(abstractInst).sons[0] - var t = deref.typ + var t = deref.typ.skipTypes(abstractInst) var field: PSym while true: assert t.kind == tyObject @@ -94,6 +94,7 @@ proc indirectAccess*(a: PNode, b: string, info: TLineInfo): PNode = if field != nil: break t = t.sons[0] if t == nil: break + t = t.skipTypes(abstractInst) assert field != nil, b addSon(deref, a) result = newNodeI(nkDotExpr, info) @@ -132,6 +133,11 @@ proc callCodegenProc*(name: string, arg1: PNode; if arg3 != nil: result.add arg3 result.typ = sym.typ.sons[0] +proc callProc(a: PNode): PNode = + result = newNodeI(nkCall, a.info) + result.add a + result.typ = a.typ.sons[0] + # we have 4 cases to consider: # - a void proc --> nothing to do # - a proc returning GC'ed memory --> requires a promise @@ -169,14 +175,14 @@ proc addLocalVar(varSection: PNode; owner: PSym; typ: PType; v: PNode): PSym = discard """ We generate roughly this: -proc f_wrapper(args) = +proc f_wrapper(thread, args) = barrierEnter(args.barrier) # for parallel statement var a = args.a # thread transfer; deepCopy or shallowCopy or no copy # depending on whether we're in a 'parallel' statement var b = args.b + var prom = args.prom - args.prom = nimCreatePromise(thread, sizeof(T)) # optional - nimPromiseCreateCondVar(args.prom) # optional + prom.owner = thread # optional nimArgsPassingDone() # signal parent that the work is done # args.prom.blob = f(a, b, ...) @@ -196,17 +202,6 @@ stmtList: """ -proc createNimCreatePromiseCall(prom, threadParam: PNode): PNode = - let size = newNodeIT(nkCall, prom.info, getSysType(tyInt)) - size.add newSymNode(createMagic("sizeof", mSizeOf)) - assert prom.typ.kind == tyGenericInst - size.add newNodeIT(nkType, prom.info, prom.typ.sons[1]) - - let castExpr = newNodeIT(nkCast, prom.info, prom.typ) - castExpr.add emptyNode - castExpr.add callCodeGenProc("nimCreatePromise", threadParam, size) - result = castExpr - proc createWrapperProc(f: PNode; threadParam, argsParam: PSym; varSection, call, barrier, prom: PNode; spawnKind: TSpawnResult): PSym = @@ -223,14 +218,14 @@ proc createWrapperProc(f: PNode; threadParam, argsParam: PSym; threadLocalProm = addLocalVar(varSection, argsParam.owner, prom.typ, prom) elif prom != nil: internalAssert prom.typ.kind == tyGenericInst - threadLocalProm = addLocalVar(varSection, argsParam.owner, prom.typ, - createNimCreatePromiseCall(prom, threadParam.newSymNode)) + threadLocalProm = addLocalVar(varSection, argsParam.owner, prom.typ, prom) body.add varSection if prom != nil and spawnKind != srByVar: - body.add newFastAsgnStmt(prom, threadLocalProm.newSymNode) - if barrier == nil: - body.add callCodeGenProc("nimPromiseCreateCondVar", prom) + # generate: + # prom.owner = threadParam + body.add newAsgnStmt(indirectAccess(threadLocalProm.newSymNode, + "owner", prom.info), threadParam.newSymNode) body.add callCodeGenProc("nimArgsPassingDone", threadParam.newSymNode) if spawnKind == srByVar: @@ -404,10 +399,11 @@ proc setupArgsForParallelism(n: PNode; objType: PType; scratchObj: PSym; indirectAccess(castExpr, field, n.info)) call.add(threadLocal.newSymNode) -proc wrapProcForSpawn*(owner: PSym; n: PNode; retType: PType; +proc wrapProcForSpawn*(owner: PSym; spawnExpr: PNode; retType: PType; barrier, dest: PNode = nil): PNode = # if 'barrier' != nil, then it is in a 'parallel' section and we # generate quite different code + let n = spawnExpr[1] let spawnKind = spawnResult(retType, barrier!=nil) case spawnKind of srVoid: @@ -419,7 +415,7 @@ proc wrapProcForSpawn*(owner: PSym; n: PNode; retType: PType; of srByVar: if dest == nil: localError(n.info, "'spawn' must not be discarded") result = newNodeI(nkStmtList, n.info) - + if n.kind notin nkCallKinds: localError(n.info, "'spawn' takes a call expression") return @@ -489,6 +485,11 @@ proc wrapProcForSpawn*(owner: PSym; n: PNode; retType: PType; objType.addField(field) promField = newDotExpr(scratchObj, field) promAsExpr = indirectAccess(castExpr, field, n.info) + # create promise: + result.add newFastAsgnStmt(promField, callProc(spawnExpr[2])) + if barrier == nil: + result.add callCodeGenProc("nimPromiseCreateCondVar", promField) + elif spawnKind == srByVar: var field = newSym(skField, getIdent"prom", owner, n.info) field.typ = newType(tyPtr, objType.owner) diff --git a/compiler/pragmas.nim b/compiler/pragmas.nim index db9fe7cbe..aed0e1850 100644 --- a/compiler/pragmas.nim +++ b/compiler/pragmas.nim @@ -644,12 +644,13 @@ proc singlePragma(c: PContext, sym: PSym, n: PNode, i: int, incl(sym.flags, sfNoReturn) of wDynlib: processDynLib(c, it, sym) - of wCompilerproc: + of wCompilerproc: noVal(it) # compilerproc may not get a string! - makeExternExport(sym, "$1", it.info) - incl(sym.flags, sfCompilerProc) - incl(sym.flags, sfUsed) # suppress all those stupid warnings - registerCompilerProc(sym) + if sfFromGeneric notin sym.flags: + makeExternExport(sym, "$1", it.info) + incl(sym.flags, sfCompilerProc) + incl(sym.flags, sfUsed) # suppress all those stupid warnings + registerCompilerProc(sym) of wProcVar: noVal(it) incl(sym.flags, sfProcvar) diff --git a/compiler/semexprs.nim b/compiler/semexprs.nim index e507e711f..9e3785185 100644 --- a/compiler/semexprs.nim +++ b/compiler/semexprs.nim @@ -1585,6 +1585,16 @@ proc createPromise(c: PContext; t: PType; info: TLineInfo): PType = addSonSkipIntLit(result, t) result = instGenericContainer(c, info, result, allowMetaTypes = false) +proc instantiateCreatePromiseCall(c: PContext; t: PType; + info: TLineInfo): PSym = + let sym = magicsys.getCompilerProc("nimCreatePromise") + if sym == nil: + localError(info, errSystemNeeds, "nimCreatePromise") + var bindings: TIdTable + initIdTable(bindings) + bindings.idTablePut(sym.ast[genericParamsPos].sons[0].typ, t) + result = c.semGenerateInstance(c, sym, bindings, info) + proc setMs(n: PNode, s: PSym): PNode = result = n n.sons[0] = newSymNode(s) @@ -1626,6 +1636,7 @@ proc semMagic(c: PContext, n: PNode, s: PSym, flags: TExprFlags): PNode = result.typ = result[1].typ else: result.typ = createPromise(c, result[1].typ, n.info) + result.add instantiateCreatePromiseCall(c, result[1].typ, n.info).newSymNode else: result = semDirectOp(c, n, flags) proc semWhen(c: PContext, n: PNode, semCheck = true): PNode = diff --git a/compiler/semparallel.nim b/compiler/semparallel.nim index 72def1137..c594a4788 100644 --- a/compiler/semparallel.nim +++ b/compiler/semparallel.nim @@ -406,19 +406,19 @@ proc transformSpawn(owner: PSym; n, barrier: PNode): PNode = if result.isNil: result = newNodeI(nkStmtList, n.info) result.add n - result.add wrapProcForSpawn(owner, m[1], b.typ, barrier, it[0]) + result.add wrapProcForSpawn(owner, m, b.typ, barrier, it[0]) it.sons[it.len-1] = emptyNode if result.isNil: result = n of nkAsgn, nkFastAsgn: let b = n[1] if getMagic(b) == mSpawn: let m = transformSlices(b) - return wrapProcForSpawn(owner, m[1], b.typ, barrier, n[0]) + return wrapProcForSpawn(owner, m, b.typ, barrier, n[0]) result = transformSpawnSons(owner, n, barrier) of nkCallKinds: if getMagic(n) == mSpawn: result = transformSlices(n) - return wrapProcForSpawn(owner, result[1], n.typ, barrier, nil) + return wrapProcForSpawn(owner, result, n.typ, barrier, nil) result = transformSpawnSons(owner, n, barrier) elif n.safeLen > 0: result = transformSpawnSons(owner, n, barrier) diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index 92d5011f4..8129d03ae 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -85,25 +85,26 @@ type cv: CondVar idx: int - RawPromise* = ptr RawPromiseObj ## untyped base class for 'Promise[T]' - RawPromiseObj {.inheritable.} = object # \ - # we allocate this with the thread local allocator; this - # is possible since we already need to do the GC_unref - # on the owning thread + RawPromise* = ref RawPromiseObj ## untyped base class for 'Promise[T]' + RawPromiseObj = object of TObject ready, usesCondVar: bool cv: CondVar #\ # for 'awaitAny' support ai: ptr AwaitInfo idx: int - data: PObject # we incRef and unref it to keep it alive - owner: ptr Worker - next: RawPromise - align: float64 # a float for proper alignment + data: pointer # we incRef and unref it to keep it alive + owner: pointer # ptr Worker - Promise* {.compilerProc.} [T] = ptr object of RawPromiseObj - blob: T ## the underlying value, if available. Note that usually - ## you should not access this field directly! However it can - ## sometimes be more efficient than getting the value via ``^``. + PromiseObj[T] = object of RawPromiseObj + blob: T + + Promise*{.compilerProc.}[T] = ref PromiseObj[T] + + ToFreeQueue = object + len: int + lock: TLock + empty: TCond + data: array[512, pointer] WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.} Worker = object @@ -115,37 +116,55 @@ type ready: bool # put it here for correct alignment! initialized: bool # whether it has even been initialized shutdown: bool # the pool requests to shut down this worker thread - promiseLock: TLock - head: RawPromise + q: ToFreeQueue + +proc await*(prom: RawPromise) = + ## waits until the value for the promise arrives. Usually it is not necessary + ## to call this explicitly. + if prom.usesCondVar: + prom.usesCondVar = false + await(prom.cv) + destroyCondVar(prom.cv) -proc finished*(prom: RawPromise) = - ## This MUST be called for every created promise to free its associated - ## resources. Note that the default reading operation ``^`` is destructive - ## and calls ``finished``. +proc finished(prom: RawPromise) = doAssert prom.ai.isNil, "promise is still attached to an 'awaitAny'" - assert prom.next == nil - let w = prom.owner - acquire(w.promiseLock) - prom.next = w.head - w.head = prom - release(w.promiseLock) + # we have to protect against the rare cases where the owner of the promise + # simply disregards the promise and yet the "promiser" has not yet written + # anything to it: + await(prom) + if prom.data.isNil: return + let owner = cast[ptr Worker](prom.owner) + let q = addr(owner.q) + var waited = false + while true: + acquire(q.lock) + if q.len < q.data.len: + q.data[q.len] = prom.data + inc q.len + release(q.lock) + break + else: + # the queue is exhausted! We block until it has been cleaned: + release(q.lock) + wait(q.empty, q.lock) + waited = true + prom.data = nil + # wakeup other potentially waiting threads: + if waited: signal(q.empty) proc cleanPromises(w: ptr Worker) = - var it = w.head - acquire(w.promiseLock) - while it != nil: - let nxt = it.next - if it.usesCondVar: destroyCondVar(it.cv) - if it.data != nil: GC_unref(it.data) - dealloc(it) - it = nxt - w.head = nil - release(w.promiseLock) - -proc nimCreatePromise(owner: pointer; blobSize: int): RawPromise {. - compilerProc.} = - result = cast[RawPromise](alloc0(RawPromiseObj.sizeof + blobSize)) - result.owner = cast[ptr Worker](owner) + let q = addr(w.q) + acquire(q.lock) + for i in 0 .. Date: Fri, 6 Jun 2014 02:05:17 +0200 Subject: added 'fence' instructions to the barrier --- lib/pure/concurrency/threadpool.nim | 25 ++++++++++++++++--------- lib/system/atomics.nim | 35 +++++------------------------------ todo.txt | 3 +++ 3 files changed, 24 insertions(+), 39 deletions(-) (limited to 'lib/system') diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index 8129d03ae..c4ed42c05 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -53,12 +53,15 @@ type interest: bool ## wether the master is interested in the "all done" event proc barrierEnter(b: ptr Barrier) {.compilerProc, inline.} = - ## due to the signaling between threads, it is ensured we are the only - ## one with access to 'entered' so we don't need 'atomicInc' here: + # due to the signaling between threads, it is ensured we are the only + # one with access to 'entered' so we don't need 'atomicInc' here: inc b.entered + # also we need no 'fence' instructions here as soon 'nimArgsPassingDone' + # will be called which already will perform a fence for us. proc barrierLeave(b: ptr Barrier) {.compilerProc, inline.} = atomicInc b.left + when not defined(x86): fence() if b.interest and b.left == b.entered: signal(b.cv) proc openBarrier(b: ptr Barrier) {.compilerProc, inline.} = @@ -67,10 +70,12 @@ proc openBarrier(b: ptr Barrier) {.compilerProc, inline.} = b.interest = false proc closeBarrier(b: ptr Barrier) {.compilerProc.} = + fence() if b.left != b.entered: b.cv = createCondVar() - b.interest = true # XXX we really need to ensure no re-orderings are done - # by the C compiler here + fence() + b.interest = true + fence() while b.left != b.entered: await(b.cv) destroyCondVar(b.cv) @@ -207,9 +212,9 @@ proc `^`*[T](prom: Promise[T]): T = result = prom.blob proc awaitAny*(promises: openArray[RawPromise]): int = - # awaits any of the given promises. Returns the index of one promise for which - ## a value arrived. A promise only supports one call to 'awaitAny' at the - ## same time. That means if you await([a,b]) and await([b,c]) the second + ## awaits any of the given promises. Returns the index of one promise for + ## which a value arrived. A promise only supports one call to 'awaitAny' at + ## the same time. That means if you await([a,b]) and await([b,c]) the second ## call will only await 'c'. If there is no promise left to be able to wait ## on, -1 is returned. ## **Note**: This results in non-deterministic behaviour and so should be @@ -294,14 +299,16 @@ proc preferSpawn*(): bool = proc spawn*(call: expr): expr {.magic: "Spawn".} ## always spawns a new task, so that the 'call' is never executed on ## the calling thread. 'call' has to be proc call 'p(...)' where 'p' - ## is gcsafe and has 'void' as the return type. + ## is gcsafe and has a return type that is either 'void' or compatible + ## with ``Promise[T]``. template spawnX*(call: expr): expr = ## spawns a new task if a CPU core is ready, otherwise executes the ## call in the calling thread. Usually it is advised to ## use 'spawn' in order to not block the producer for an unknown ## amount of time. 'call' has to be proc call 'p(...)' where 'p' - ## is gcsafe and has 'void' as the return type. + ## is gcsafe and has a return type that is either 'void' or compatible + ## with ``Promise[T]``. (if preferSpawn(): spawn call else: call) proc parallel*(body: stmt) {.magic: "Parallel".} diff --git a/lib/system/atomics.nim b/lib/system/atomics.nim index 6e2bd9a97..43b3f0438 100644 --- a/lib/system/atomics.nim +++ b/lib/system/atomics.nim @@ -10,7 +10,9 @@ ## Atomic operations for Nimrod. {.push stackTrace:off.} -when (defined(gcc) or defined(llvm_gcc)) and hasThreadSupport: +const someGcc = defined(gcc) or defined(llvm_gcc) or defined(clang) + +when someGcc and hasThreadSupport: type AtomMemModel* = enum ATOMIC_RELAXED, ## No barriers or synchronization. @@ -163,33 +165,6 @@ else: inc(p[], val) result = p[] -# atomic compare and swap (CAS) funcitons to implement lock-free algorithms - -#if defined(windows) and not defined(gcc) and hasThreadSupport: -# proc InterlockedCompareExchangePointer(mem: ptr pointer, -# newValue: pointer, comparand: pointer) : pointer {.nodecl, -# importc: "InterlockedCompareExchangePointer", header:"windows.h".} - -# proc compareAndSwap*[T](mem: ptr T, -# expected: T, newValue: T): bool {.inline.}= -# ## Returns true if successfully set value at mem to newValue when value -# ## at mem == expected -# return InterlockedCompareExchangePointer(addr(mem), -# addr(newValue), addr(expected))[] == expected - -#elif not hasThreadSupport: -# proc compareAndSwap*[T](mem: ptr T, -# expected: T, newValue: T): bool {.inline.} = -# ## Returns true if successfully set value at mem to newValue when value -# ## at mem == expected -# var oldval = mem[] -# if oldval == expected: -# mem[] = newValue -# return true -# return false - - -# Some convenient functions proc atomicInc*(memLoc: var int, x: int = 1): int = when defined(gcc) and hasThreadSupport: result = atomic_add_fetch(memLoc.addr, x, ATOMIC_RELAXED) @@ -207,7 +182,7 @@ proc atomicDec*(memLoc: var int, x: int = 1): int = dec(memLoc, x) result = memLoc -when defined(windows) and not defined(gcc): +when defined(windows) and not someGcc: proc interlockedCompareExchange(p: pointer; exchange, comparand: int32): int32 {.importc: "InterlockedCompareExchange", header: "", cdecl.} @@ -221,7 +196,7 @@ else: # XXX is this valid for 'int'? -when (defined(x86) or defined(amd64)) and defined(gcc): +when (defined(x86) or defined(amd64)) and (defined(gcc) or defined(llvm_gcc)): proc cpuRelax {.inline.} = {.emit: """asm volatile("pause" ::: "memory");""".} elif (defined(x86) or defined(amd64)) and defined(vcc): diff --git a/todo.txt b/todo.txt index 8a351e8a7..7d4eac1ad 100644 --- a/todo.txt +++ b/todo.txt @@ -8,8 +8,11 @@ Concurrency - implement 'deepCopy' builtin - implement 'foo[1..4] = spawn(f[4..7])' - the disjoint checker needs to deal with 'a = spawn f(); g = spawn f()' +- support for exception propagation - Minor: The copying of the 'ref Promise' into the thead local storage only happens to work due to the write barrier's implementation +- 'gcsafe' inferrence needs to be fixed +- implement lock levels --> first without the more complex race avoidance Misc -- cgit 1.4.1-2-gfad0