summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorAraq <rumpf_a@web.de>2014-06-06 21:11:11 +0200
committerAraq <rumpf_a@web.de>2014-06-06 21:11:11 +0200
commit4220b1c81d433e30e3d41c8ecb05b2a9edaface5 (patch)
tree737c7849ac3f2f9b45655b3cd094ddc135d7c593
parent387593bcaab1d399e87f5f7b596d14858b23dabb (diff)
parentc8b5d6a63492775838aff275e99f82f1af747223 (diff)
downloadNim-4220b1c81d433e30e3d41c8ecb05b2a9edaface5.tar.gz
Merge branch 'new_spawn' of https://github.com/Araq/Nimrod into new_spawn
-rw-r--r--compiler/ccgexprs.nim2
-rw-r--r--compiler/lowerings.nim121
-rw-r--r--compiler/pragmas.nim11
-rw-r--r--compiler/semexprs.nim17
-rw-r--r--compiler/semparallel.nim6
-rw-r--r--doc/spawn.txt57
-rw-r--r--lib/pure/concurrency/threadpool.nim265
-rw-r--r--lib/system/atomics.nim43
-rw-r--r--todo.txt17
9 files changed, 316 insertions, 223 deletions
diff --git a/compiler/ccgexprs.nim b/compiler/ccgexprs.nim
index fb672f121..69e382f8d 100644
--- a/compiler/ccgexprs.nim
+++ b/compiler/ccgexprs.nim
@@ -1636,7 +1636,7 @@ proc genMagicExpr(p: BProc, e: PNode, d: var TLoc, op: TMagic) =
   of mSlurp..mQuoteAst:
     localError(e.info, errXMustBeCompileTime, e.sons[0].sym.name.s)
   of mSpawn:
-    let n = lowerings.wrapProcForSpawn(p.module.module, e[1], e.typ, nil, nil)
+    let n = lowerings.wrapProcForSpawn(p.module.module, e, e.typ, nil, nil)
     expr(p, n, d)
   of mParallel:
     let n = semparallel.liftParallel(p.module.module, e)
diff --git a/compiler/lowerings.nim b/compiler/lowerings.nim
index af4daf785..e2afa4362 100644
--- a/compiler/lowerings.nim
+++ b/compiler/lowerings.nim
@@ -86,7 +86,7 @@ proc indirectAccess*(a: PNode, b: string, info: TLineInfo): PNode =
   # returns a[].b as a node
   var deref = newNodeI(nkHiddenDeref, info)
   deref.typ = a.typ.skipTypes(abstractInst).sons[0]
-  var t = deref.typ
+  var t = deref.typ.skipTypes(abstractInst)
   var field: PSym
   while true:
     assert t.kind == tyObject
@@ -94,6 +94,7 @@ proc indirectAccess*(a: PNode, b: string, info: TLineInfo): PNode =
     if field != nil: break
     t = t.sons[0]
     if t == nil: break
+    t = t.skipTypes(abstractInst)
   assert field != nil, b
   addSon(deref, a)
   result = newNodeI(nkDotExpr, info)
@@ -132,28 +133,33 @@ proc callCodegenProc*(name: string, arg1: PNode;
     if arg3 != nil: result.add arg3
     result.typ = sym.typ.sons[0]
 
+proc callProc(a: PNode): PNode =
+  result = newNodeI(nkCall, a.info)
+  result.add a
+  result.typ = a.typ.sons[0]
+
 # we have 4 cases to consider:
 # - a void proc --> nothing to do
-# - a proc returning GC'ed memory --> requires a promise
+# - a proc returning GC'ed memory --> requires a flowVar
 # - a proc returning non GC'ed memory --> pass as hidden 'var' parameter
-# - not in a parallel environment --> requires a promise for memory safety
+# - not in a parallel environment --> requires a flowVar for memory safety
 type
   TSpawnResult = enum
-    srVoid, srPromise, srByVar
-  TPromiseKind = enum
-    promInvalid # invalid type T for 'Promise[T]'
-    promGC      # Promise of a GC'ed type
-    promBlob    # Promise of a blob type
+    srVoid, srFlowVar, srByVar
+  TFlowVarKind = enum
+    fvInvalid # invalid type T for 'FlowVar[T]'
+    fvGC      # FlowVar of a GC'ed type
+    fvBlob    # FlowVar of a blob type
 
 proc spawnResult(t: PType; inParallel: bool): TSpawnResult =
   if t.isEmptyType: srVoid
   elif inParallel and not containsGarbageCollectedRef(t): srByVar
-  else: srPromise
+  else: srFlowVar
 
-proc promiseKind(t: PType): TPromiseKind =
-  if t.skipTypes(abstractInst).kind in {tyRef, tyString, tySequence}: promGC
-  elif containsGarbageCollectedRef(t): promInvalid
-  else: promBlob
+proc flowVarKind(t: PType): TFlowVarKind =
+  if t.skipTypes(abstractInst).kind in {tyRef, tyString, tySequence}: fvGC
+  elif containsGarbageCollectedRef(t): fvInvalid
+  else: fvBlob
 
 proc addLocalVar(varSection: PNode; owner: PSym; typ: PType; v: PNode): PSym =
   result = newSym(skTemp, getIdent(genPrefix), owner, varSection.info)
@@ -169,18 +175,18 @@ proc addLocalVar(varSection: PNode; owner: PSym; typ: PType; v: PNode): PSym =
 discard """
 We generate roughly this:
 
-proc f_wrapper(args) =
+proc f_wrapper(thread, args) =
   barrierEnter(args.barrier)  # for parallel statement
   var a = args.a # thread transfer; deepCopy or shallowCopy or no copy
                  # depending on whether we're in a 'parallel' statement
   var b = args.b
+  var fv = args.fv
 
-  args.prom = nimCreatePromise(thread, sizeof(T)) # optional
-  nimPromiseCreateCondVar(args.prom)  # optional
+  fv.owner = thread # optional
   nimArgsPassingDone() # signal parent that the work is done
   # 
-  args.prom.blob = f(a, b, ...)
-  nimPromiseSignal(args.prom)
+  args.fv.blob = f(a, b, ...)
+  nimFlowVarSignal(args.fv)
   
   # - or -
   f(a, b, ...)
@@ -192,23 +198,12 @@ stmtList:
   scratchObj.b = b
 
   nimSpawn(f_wrapper, addr scratchObj)
-  scratchObj.prom # optional
+  scratchObj.fv # optional
 
 """
 
-proc createNimCreatePromiseCall(prom, threadParam: PNode): PNode =
-  let size = newNodeIT(nkCall, prom.info, getSysType(tyInt))
-  size.add newSymNode(createMagic("sizeof", mSizeOf))
-  assert prom.typ.kind == tyGenericInst
-  size.add newNodeIT(nkType, prom.info, prom.typ.sons[1])
-
-  let castExpr = newNodeIT(nkCast, prom.info, prom.typ)
-  castExpr.add emptyNode
-  castExpr.add callCodeGenProc("nimCreatePromise", threadParam, size)
-  result = castExpr
-
 proc createWrapperProc(f: PNode; threadParam, argsParam: PSym;
-                       varSection, call, barrier, prom: PNode;
+                       varSection, call, barrier, fv: PNode;
                        spawnKind: TSpawnResult): PSym =
   var body = newNodeI(nkStmtList, f.info)
   var threadLocalBarrier: PSym
@@ -220,32 +215,32 @@ proc createWrapperProc(f: PNode; threadParam, argsParam: PSym;
     body.add callCodeGenProc("barrierEnter", threadLocalBarrier.newSymNode)
   var threadLocalProm: PSym
   if spawnKind == srByVar:
-    threadLocalProm = addLocalVar(varSection, argsParam.owner, prom.typ, prom)
-  elif prom != nil:
-    internalAssert prom.typ.kind == tyGenericInst
-    threadLocalProm = addLocalVar(varSection, argsParam.owner, prom.typ, 
-      createNimCreatePromiseCall(prom, threadParam.newSymNode))
+    threadLocalProm = addLocalVar(varSection, argsParam.owner, fv.typ, fv)
+  elif fv != nil:
+    internalAssert fv.typ.kind == tyGenericInst
+    threadLocalProm = addLocalVar(varSection, argsParam.owner, fv.typ, fv)
     
   body.add varSection
-  if prom != nil and spawnKind != srByVar:
-    body.add newFastAsgnStmt(prom, threadLocalProm.newSymNode)
-    if barrier == nil:
-      body.add callCodeGenProc("nimPromiseCreateCondVar", prom)
+  if fv != nil and spawnKind != srByVar:
+    # generate:
+    #   fv.owner = threadParam
+    body.add newAsgnStmt(indirectAccess(threadLocalProm.newSymNode,
+      "owner", fv.info), threadParam.newSymNode)
 
   body.add callCodeGenProc("nimArgsPassingDone", threadParam.newSymNode)
   if spawnKind == srByVar:
     body.add newAsgnStmt(genDeref(threadLocalProm.newSymNode), call)
-  elif prom != nil:
-    let fk = prom.typ.sons[1].promiseKind
-    if fk == promInvalid:
-      localError(f.info, "cannot create a promise of type: " & 
-        typeToString(prom.typ.sons[1]))
+  elif fv != nil:
+    let fk = fv.typ.sons[1].flowVarKind
+    if fk == fvInvalid:
+      localError(f.info, "cannot create a flowVar of type: " & 
+        typeToString(fv.typ.sons[1]))
     body.add newAsgnStmt(indirectAccess(threadLocalProm.newSymNode,
-      if fk == promGC: "data" else: "blob", prom.info), call)
+      if fk == fvGC: "data" else: "blob", fv.info), call)
     if barrier == nil:
-      # by now 'prom' is shared and thus might have beeen overwritten! we need
+      # by now 'fv' is shared and thus might have beeen overwritten! we need
       # to use the thread-local view instead:
-      body.add callCodeGenProc("nimPromiseSignal", threadLocalProm.newSymNode)
+      body.add callCodeGenProc("nimFlowVarSignal", threadLocalProm.newSymNode)
   else:
     body.add call
   if barrier != nil:
@@ -404,22 +399,23 @@ proc setupArgsForParallelism(n: PNode; objType: PType; scratchObj: PSym;
                                     indirectAccess(castExpr, field, n.info))
       call.add(threadLocal.newSymNode)
 
-proc wrapProcForSpawn*(owner: PSym; n: PNode; retType: PType; 
+proc wrapProcForSpawn*(owner: PSym; spawnExpr: PNode; retType: PType; 
                        barrier, dest: PNode = nil): PNode =
   # if 'barrier' != nil, then it is in a 'parallel' section and we
   # generate quite different code
+  let n = spawnExpr[1]
   let spawnKind = spawnResult(retType, barrier!=nil)
   case spawnKind
   of srVoid:
     internalAssert dest == nil
     result = newNodeI(nkStmtList, n.info)
-  of srPromise:
+  of srFlowVar:
     internalAssert dest == nil
     result = newNodeIT(nkStmtListExpr, n.info, retType)
   of srByVar:
     if dest == nil: localError(n.info, "'spawn' must not be discarded")
     result = newNodeI(nkStmtList, n.info)
-  
+
   if n.kind notin nkCallKinds:
     localError(n.info, "'spawn' takes a call expression")
     return
@@ -482,24 +478,29 @@ proc wrapProcForSpawn*(owner: PSym; n: PNode; retType: PType;
     result.add newFastAsgnStmt(newDotExpr(scratchObj, field), barrier)
     barrierAsExpr = indirectAccess(castExpr, field, n.info)
 
-  var promField, promAsExpr: PNode = nil
-  if spawnKind == srPromise:
-    var field = newSym(skField, getIdent"prom", owner, n.info)
+  var fvField, fvAsExpr: PNode = nil
+  if spawnKind == srFlowVar:
+    var field = newSym(skField, getIdent"fv", owner, n.info)
     field.typ = retType
     objType.addField(field)
-    promField = newDotExpr(scratchObj, field)
-    promAsExpr = indirectAccess(castExpr, field, n.info)
+    fvField = newDotExpr(scratchObj, field)
+    fvAsExpr = indirectAccess(castExpr, field, n.info)
+    # create flowVar:
+    result.add newFastAsgnStmt(fvField, callProc(spawnExpr[2]))
+    if barrier == nil:
+      result.add callCodeGenProc("nimFlowVarCreateCondVar", fvField)
+
   elif spawnKind == srByVar:
-    var field = newSym(skField, getIdent"prom", owner, n.info)
+    var field = newSym(skField, getIdent"fv", owner, n.info)
     field.typ = newType(tyPtr, objType.owner)
     field.typ.rawAddSon(retType)
     objType.addField(field)
-    promAsExpr = indirectAccess(castExpr, field, n.info)
+    fvAsExpr = indirectAccess(castExpr, field, n.info)
     result.add newFastAsgnStmt(newDotExpr(scratchObj, field), genAddrOf(dest))
 
   let wrapper = createWrapperProc(fn, threadParam, argsParam, varSection, call,
-                                  barrierAsExpr, promAsExpr, spawnKind)
+                                  barrierAsExpr, fvAsExpr, spawnKind)
   result.add callCodeGenProc("nimSpawn", wrapper.newSymNode,
                              genAddrOf(scratchObj.newSymNode))
 
-  if spawnKind == srPromise: result.add promField
+  if spawnKind == srFlowVar: result.add fvField
diff --git a/compiler/pragmas.nim b/compiler/pragmas.nim
index f2d8988ea..a17773aa4 100644
--- a/compiler/pragmas.nim
+++ b/compiler/pragmas.nim
@@ -644,12 +644,13 @@ proc singlePragma(c: PContext, sym: PSym, n: PNode, i: int,
           incl(sym.flags, sfNoReturn)
         of wDynlib: 
           processDynLib(c, it, sym)
-        of wCompilerproc: 
+        of wCompilerproc:
           noVal(it)           # compilerproc may not get a string!
-          makeExternExport(sym, "$1", it.info)
-          incl(sym.flags, sfCompilerProc)
-          incl(sym.flags, sfUsed) # suppress all those stupid warnings
-          registerCompilerProc(sym)
+          if sfFromGeneric notin sym.flags:
+            makeExternExport(sym, "$1", it.info)
+            incl(sym.flags, sfCompilerProc)
+            incl(sym.flags, sfUsed) # suppress all those stupid warnings
+            registerCompilerProc(sym)
         of wProcVar: 
           noVal(it)
           incl(sym.flags, sfProcvar)
diff --git a/compiler/semexprs.nim b/compiler/semexprs.nim
index cccb02502..078e95fbe 100644
--- a/compiler/semexprs.nim
+++ b/compiler/semexprs.nim
@@ -1585,12 +1585,22 @@ proc semShallowCopy(c: PContext, n: PNode, flags: TExprFlags): PNode =
   else:
     result = semDirectOp(c, n, flags)
 
-proc createPromise(c: PContext; t: PType; info: TLineInfo): PType =
+proc createFlowVar(c: PContext; t: PType; info: TLineInfo): PType =
   result = newType(tyGenericInvokation, c.module)
-  addSonSkipIntLit(result, magicsys.getCompilerProc("Promise").typ)
+  addSonSkipIntLit(result, magicsys.getCompilerProc("FlowVar").typ)
   addSonSkipIntLit(result, t)
   result = instGenericContainer(c, info, result, allowMetaTypes = false)
 
+proc instantiateCreateFlowVarCall(c: PContext; t: PType;
+                                  info: TLineInfo): PSym =
+  let sym = magicsys.getCompilerProc("nimCreateFlowVar")
+  if sym == nil:
+    localError(info, errSystemNeeds, "nimCreateFlowVar")
+  var bindings: TIdTable 
+  initIdTable(bindings)
+  bindings.idTablePut(sym.ast[genericParamsPos].sons[0].typ, t)
+  result = c.semGenerateInstance(c, sym, bindings, info)
+
 proc setMs(n: PNode, s: PSym): PNode = 
   result = n
   n.sons[0] = newSymNode(s)
@@ -1631,7 +1641,8 @@ proc semMagic(c: PContext, n: PNode, s: PSym, flags: TExprFlags): PNode =
       if c.inParallelStmt > 0:
         result.typ = result[1].typ
       else:
-        result.typ = createPromise(c, result[1].typ, n.info)
+        result.typ = createFlowVar(c, result[1].typ, n.info)
+      result.add instantiateCreateFlowVarCall(c, result[1].typ, n.info).newSymNode
   else: result = semDirectOp(c, n, flags)
 
 proc semWhen(c: PContext, n: PNode, semCheck = true): PNode =
diff --git a/compiler/semparallel.nim b/compiler/semparallel.nim
index 72def1137..c594a4788 100644
--- a/compiler/semparallel.nim
+++ b/compiler/semparallel.nim
@@ -406,19 +406,19 @@ proc transformSpawn(owner: PSym; n, barrier: PNode): PNode =
         if result.isNil:
           result = newNodeI(nkStmtList, n.info)
           result.add n
-        result.add wrapProcForSpawn(owner, m[1], b.typ, barrier, it[0])
+        result.add wrapProcForSpawn(owner, m, b.typ, barrier, it[0])
         it.sons[it.len-1] = emptyNode
     if result.isNil: result = n
   of nkAsgn, nkFastAsgn:
     let b = n[1]
     if getMagic(b) == mSpawn:
       let m = transformSlices(b)
-      return wrapProcForSpawn(owner, m[1], b.typ, barrier, n[0])
+      return wrapProcForSpawn(owner, m, b.typ, barrier, n[0])
     result = transformSpawnSons(owner, n, barrier)
   of nkCallKinds:
     if getMagic(n) == mSpawn:
       result = transformSlices(n)
-      return wrapProcForSpawn(owner, result[1], n.typ, barrier, nil)
+      return wrapProcForSpawn(owner, result, n.typ, barrier, nil)
     result = transformSpawnSons(owner, n, barrier)
   elif n.safeLen > 0:
     result = transformSpawnSons(owner, n, barrier)
diff --git a/doc/spawn.txt b/doc/spawn.txt
new file mode 100644
index 000000000..19560ebf5
--- /dev/null
+++ b/doc/spawn.txt
@@ -0,0 +1,57 @@
+==========================================================
+                  Parallel & Spawn
+==========================================================
+
+Nimrod has two flavors of parallelism: 
+1) `Structured`:idx parallelism via the ``parallel`` statement.
+2) `Unstructured`:idx: parallelism via the standalone ``spawn`` statement.
+
+Somewhat confusingly, ``spawn`` is also used in the ``parallel`` statement
+with slightly different semantics. ``spawn`` always takes a call expression of
+the form ``f(a, ...)``. Let ``T`` be ``f``'s return type. If ``T`` is ``void``
+then ``spawn``'s return type is also ``void``. Within a ``parallel`` section
+``spawn``'s return type is ``T``, otherwise it is ``FlowVar[T]``.
+
+The compiler can ensure the location in ``location = spawn f(...)`` is not
+read prematurely within a ``parallel`` section and so there is no need for
+the overhead of an indirection via ``FlowVar[T]`` to ensure correctness.
+
+
+Parallel statement
+==================
+
+The parallel statement is the preferred mechanism to introduce parallelism
+in a Nimrod program. A subset of the Nimrod language is valid within a
+``parallel`` section. This subset is checked to be free of data races at
+compile time. A sophisticated `disjoint checker`:idx: ensures that no data
+races are possible even though shared memory is extensively supported!
+
+The subset is in fact the full language with the following
+restrictions / changes:
+
+* ``spawn`` within a ``parallel`` section has special semantics.
+* Every location of the form ``a[i]`` and ``a[i..j]`` and ``dest`` where 
+  ``dest`` is part of the pattern ``dest = spawn f(...)`` has to be
+  provable disjoint. This is called the *disjoint check*.
+* Every other complex location ``loc`` that is used in a spawned 
+  proc (``spawn f(loc)``) has to immutable for the duration of
+  the ``parallel``. This is called the *immutability check*. Currently it
+  is not specified what exactly "complex location" means. We need to make that
+  an optimization!
+* Every array access has to be provable within bounds.
+* Slices are optimized so that no copy is performed. This optimization is not
+  yet performed for ordinary slices outside of a ``parallel`` section.
+
+
+Spawn statement
+===============
+
+A standalone ``spawn`` statement is a simple construct. It executes
+the passed expression on the thread pool and returns a `data flow variable`:idx:
+``FlowVar[T]`` that can be read from. The reading with the ``^`` operator is
+**blocking**. However, one can use ``awaitAny`` to wait on multiple flow variables
+at the same time.
+
+Like the ``parallel`` statement data flow variables ensure that no data races
+are possible.
+
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim
index 22f00bc0d..c34b91e30 100644
--- a/lib/pure/concurrency/threadpool.nim
+++ b/lib/pure/concurrency/threadpool.nim
@@ -40,33 +40,44 @@ proc signal(cv: var CondVar) =
   release(cv.L)
   signal(cv.c)
 
+const CacheLineSize = 32 # true for most archs
+
 type
-  Barrier* {.compilerProc.} = object
+  Barrier {.compilerProc.} = object
     entered: int
-    cv: CondVar
-    cacheAlign: array[0..20, byte] # ensure 'left' is not on the same
-                                   # cache line as 'entered'
+    cv: CondVar # condvar takes 3 words at least
+    when sizeof(int) < 8:
+      cacheAlign: array[CacheLineSize-4*sizeof(int), byte] 
     left: int
+    cacheAlign2: array[CacheLineSize-sizeof(int), byte]
+    interest: bool ## wether the master is interested in the "all done" event
 
-proc barrierEnter*(b: ptr Barrier) {.compilerProc.} =
-  atomicInc b.entered
+proc barrierEnter(b: ptr Barrier) {.compilerProc, inline.} =
+  # due to the signaling between threads, it is ensured we are the only
+  # one with access to 'entered' so we don't need 'atomicInc' here:
+  inc b.entered
+  # also we need no 'fence' instructions here as soon 'nimArgsPassingDone'
+  # will be called which already will perform a fence for us.
 
-proc barrierLeave*(b: ptr Barrier) {.compilerProc.} =
+proc barrierLeave(b: ptr Barrier) {.compilerProc, inline.} =
   atomicInc b.left
-  # these can only be equal if 'closeBarrier' already signaled its interest
-  # in this event:
-  if b.left == b.entered: signal(b.cv)
+  when not defined(x86): fence()
+  if b.interest and b.left == b.entered: signal(b.cv)
 
-proc openBarrier*(b: ptr Barrier) {.compilerProc.} =
+proc openBarrier(b: ptr Barrier) {.compilerProc, inline.} =
   b.entered = 0
-  b.cv = createCondVar()
-  b.left = -1
-
-proc closeBarrier*(b: ptr Barrier) {.compilerProc.} =
-  # signal interest in the "all done" event:
-  atomicInc b.left
-  while b.left != b.entered: await(b.cv)
-  destroyCondVar(b.cv)
+  b.left = 0
+  b.interest = false
+
+proc closeBarrier(b: ptr Barrier) {.compilerProc.} =
+  fence()
+  if b.left != b.entered:
+    b.cv = createCondVar()
+    fence()
+    b.interest = true
+    fence()
+    while b.left != b.entered: await(b.cv)
+    destroyCondVar(b.cv)
 
 {.pop.}
 
@@ -79,25 +90,26 @@ type
     cv: CondVar
     idx: int
 
-  RawPromise* = ptr RawPromiseObj ## untyped base class for 'Promise[T]'
-  RawPromiseObj {.inheritable.} = object # \
-    # we allocate this with the thread local allocator; this
-    # is possible since we already need to do the GC_unref
-    # on the owning thread
+  RawFlowVar* = ref RawFlowVarObj ## untyped base class for 'FlowVar[T]'
+  RawFlowVarObj = object of TObject
     ready, usesCondVar: bool
     cv: CondVar #\
     # for 'awaitAny' support
     ai: ptr AwaitInfo
     idx: int
-    data: PObject  # we incRef and unref it to keep it alive
-    owner: ptr Worker
-    next: RawPromise
-    align: float64 # a float for proper alignment
+    data: pointer  # we incRef and unref it to keep it alive
+    owner: pointer # ptr Worker
+
+  FlowVarObj[T] = object of RawFlowVarObj
+    blob: T
+
+  FlowVar*{.compilerProc.}[T] = ref FlowVarObj[T] ## a data flow variable
 
-  Promise* {.compilerProc.} [T] = ptr object of RawPromiseObj
-    blob: T  ## the underlying value, if available. Note that usually
-             ## you should not access this field directly! However it can
-             ## sometimes be more efficient than getting the value via ``^``.
+  ToFreeQueue = object
+    len: int
+    lock: TLock
+    empty: TCond
+    data: array[512, pointer]
 
   WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.}
   Worker = object
@@ -109,109 +121,117 @@ type
     ready: bool # put it here for correct alignment!
     initialized: bool # whether it has even been initialized
     shutdown: bool # the pool requests to shut down this worker thread
-    promiseLock: TLock
-    head: RawPromise
-
-proc finished*(prom: RawPromise) =
-  ## This MUST be called for every created promise to free its associated
-  ## resources. Note that the default reading operation ``^`` is destructive
-  ## and calls ``finished``.
-  doAssert prom.ai.isNil, "promise is still attached to an 'awaitAny'"
-  assert prom.next == nil
-  let w = prom.owner
-  acquire(w.promiseLock)
-  prom.next = w.head
-  w.head = prom
-  release(w.promiseLock)
-
-proc cleanPromises(w: ptr Worker) =
-  var it = w.head
-  acquire(w.promiseLock)
-  while it != nil:
-    let nxt = it.next
-    if it.usesCondVar: destroyCondVar(it.cv)
-    if it.data != nil: GC_unref(it.data)
-    dealloc(it)
-    it = nxt
-  w.head = nil
-  release(w.promiseLock)
-
-proc nimCreatePromise(owner: pointer; blobSize: int): RawPromise {.
-                     compilerProc.} =
-  result = cast[RawPromise](alloc0(RawPromiseObj.sizeof + blobSize))
-  result.owner = cast[ptr Worker](owner)
-
-proc nimPromiseCreateCondVar(prom: RawPromise) {.compilerProc.} =
-  prom.cv = createCondVar()
-  prom.usesCondVar = true
-
-proc nimPromiseSignal(prom: RawPromise) {.compilerProc.} =
-  if prom.ai != nil:
-    acquire(prom.ai.cv.L)
-    prom.ai.idx = prom.idx
-    inc prom.ai.cv.counter
-    release(prom.ai.cv.L)
-    signal(prom.ai.cv.c)
-  if prom.usesCondVar: signal(prom.cv)
-
-proc await*[T](prom: Promise[T]) =
-  ## waits until the value for the promise arrives.
-  if prom.usesCondVar: await(prom.cv)
-
-proc awaitAndThen*[T](prom: Promise[T]; action: proc (x: T) {.closure.}) =
-  ## blocks until the ``prom`` is available and then passes its value
+    q: ToFreeQueue
+
+proc await*(fv: RawFlowVar) =
+  ## waits until the value for the flowVar arrives. Usually it is not necessary
+  ## to call this explicitly.
+  if fv.usesCondVar:
+    fv.usesCondVar = false
+    await(fv.cv)
+    destroyCondVar(fv.cv)
+
+proc finished(fv: RawFlowVar) =
+  doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'"
+  # we have to protect against the rare cases where the owner of the flowVar
+  # simply disregards the flowVar and yet the "flowVarr" has not yet written
+  # anything to it:
+  await(fv)
+  if fv.data.isNil: return
+  let owner = cast[ptr Worker](fv.owner)
+  let q = addr(owner.q)
+  var waited = false
+  while true:
+    acquire(q.lock)
+    if q.len < q.data.len:
+      q.data[q.len] = fv.data
+      inc q.len
+      release(q.lock)
+      break
+    else:
+      # the queue is exhausted! We block until it has been cleaned:
+      release(q.lock)
+      wait(q.empty, q.lock)
+      waited = true
+  fv.data = nil
+  # wakeup other potentially waiting threads:
+  if waited: signal(q.empty)
+
+proc cleanFlowVars(w: ptr Worker) =
+  let q = addr(w.q)
+  acquire(q.lock)
+  for i in 0 .. <q.len:
+    GC_unref(cast[PObject](q.data[i]))
+  q.len = 0
+  release(q.lock)
+  signal(q.empty)
+
+proc fvFinalizer[T](fv: FlowVar[T]) = finished(fv)
+
+proc nimCreateFlowVar[T](): FlowVar[T] {.compilerProc.} =
+  new(result, fvFinalizer)
+
+proc nimFlowVarCreateCondVar(fv: RawFlowVar) {.compilerProc.} =
+  fv.cv = createCondVar()
+  fv.usesCondVar = true
+
+proc nimFlowVarSignal(fv: RawFlowVar) {.compilerProc.} =
+  if fv.ai != nil:
+    acquire(fv.ai.cv.L)
+    fv.ai.idx = fv.idx
+    inc fv.ai.cv.counter
+    release(fv.ai.cv.L)
+    signal(fv.ai.cv.c)
+  if fv.usesCondVar: signal(fv.cv)
+
+proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) =
+  ## blocks until the ``fv`` is available and then passes its value
   ## to ``action``. Note that due to Nimrod's parameter passing semantics this
   ## means that ``T`` doesn't need to be copied and so ``awaitAndThen`` can
   ## sometimes be more efficient than ``^``.
-  if prom.usesCondVar: await(prom)
+  await(fv)
   when T is string or T is seq:
-    action(cast[T](prom.data))
+    action(cast[T](fv.data))
   elif T is ref:
-    {.error: "'awaitAndThen' not available for Promise[ref]".}
+    {.error: "'awaitAndThen' not available for FlowVar[ref]".}
   else:
-    action(prom.blob)
-  finished(prom)
-
-proc `^`*[T](prom: Promise[ref T]): foreign ptr T =
-  ## blocks until the value is available and then returns this value. Note
-  ## this reading is destructive for reasons of efficiency and convenience.
-  ## This calls ``finished(prom)``.
-  if prom.usesCondVar: await(prom)
-  result = cast[foreign ptr T](prom.data)
-  finished(prom)
-
-proc `^`*[T](prom: Promise[T]): T =
-  ## blocks until the value is available and then returns this value. Note
-  ## this reading is destructive for reasons of efficiency and convenience.
-  ## This calls ``finished(prom)``.
-  if prom.usesCondVar: await(prom)
+    action(fv.blob)
+  finished(fv)
+
+proc `^`*[T](fv: FlowVar[ref T]): foreign ptr T =
+  ## blocks until the value is available and then returns this value.
+  await(fv)
+  result = cast[foreign ptr T](fv.data)
+
+proc `^`*[T](fv: FlowVar[T]): T =
+  ## blocks until the value is available and then returns this value.
+  await(fv)
   when T is string or T is seq:
-    result = cast[T](prom.data)
+    result = cast[T](fv.data)
   else:
-    result = prom.blob
-  finished(prom)
-
-proc awaitAny*(promises: openArray[RawPromise]): int =
-  # awaits any of the given promises. Returns the index of one promise for which
-  ## a value arrived. A promise only supports one call to 'awaitAny' at the
-  ## same time. That means if you await([a,b]) and await([b,c]) the second
-  ## call will only await 'c'. If there is no promise left to be able to wait
+    result = fv.blob
+
+proc awaitAny*(flowVars: openArray[RawFlowVar]): int =
+  ## awaits any of the given flowVars. Returns the index of one flowVar for
+  ## which a value arrived. A flowVar only supports one call to 'awaitAny' at
+  ## the same time. That means if you await([a,b]) and await([b,c]) the second
+  ## call will only await 'c'. If there is no flowVar left to be able to wait
   ## on, -1 is returned.
   ## **Note**: This results in non-deterministic behaviour and so should be
   ## avoided.
   var ai: AwaitInfo
   ai.cv = createCondVar()
   var conflicts = 0
-  for i in 0 .. promises.high:
-    if cas(addr promises[i].ai, nil, addr ai):
-      promises[i].idx = i
+  for i in 0 .. flowVars.high:
+    if cas(addr flowVars[i].ai, nil, addr ai):
+      flowVars[i].idx = i
     else:
       inc conflicts
-  if conflicts < promises.len:
+  if conflicts < flowVars.len:
     await(ai.cv)
     result = ai.idx
-    for i in 0 .. promises.high:
-      discard cas(addr promises[i].ai, addr ai, nil)
+    for i in 0 .. flowVars.high:
+      discard cas(addr flowVars[i].ai, addr ai, nil)
   else:
     result = -1
   destroyCondVar(ai.cv)
@@ -239,7 +259,7 @@ proc slave(w: ptr Worker) {.thread.} =
     await(w.taskArrived)
     assert(not w.ready)
     w.f(w, w.data)
-    if w.head != nil: w.cleanPromises
+    if w.q.len != 0: w.cleanFlowVars
     if w.shutdown:
       w.shutdown = false
       atomicDec currentPoolSize
@@ -260,8 +280,9 @@ var
 proc activateThread(i: int) {.noinline.} =
   workersData[i].taskArrived = createCondVar()
   workersData[i].taskStarted = createCondVar()
-  initLock workersData[i].promiseLock
   workersData[i].initialized = true
+  initCond(workersData[i].q.empty)
+  initLock(workersData[i].q.lock)
   createThread(workers[i], slave, addr(workersData[i]))
 
 proc setup() =
@@ -278,14 +299,16 @@ proc preferSpawn*(): bool =
 proc spawn*(call: expr): expr {.magic: "Spawn".}
   ## always spawns a new task, so that the 'call' is never executed on
   ## the calling thread. 'call' has to be proc call 'p(...)' where 'p'
-  ## is gcsafe and has 'void' as the return type.
+  ## is gcsafe and has a return type that is either 'void' or compatible
+  ## with ``FlowVar[T]``.
 
 template spawnX*(call: expr): expr =
   ## spawns a new task if a CPU core is ready, otherwise executes the
   ## call in the calling thread. Usually it is advised to
   ## use 'spawn' in order to not block the producer for an unknown
   ## amount of time. 'call' has to be proc call 'p(...)' where 'p'
-  ## is gcsafe and has 'void' as the return type.
+  ## is gcsafe and has a return type that is either 'void' or compatible
+  ## with ``FlowVar[T]``.
   (if preferSpawn(): spawn call else: call)
 
 proc parallel*(body: stmt) {.magic: "Parallel".}
diff --git a/lib/system/atomics.nim b/lib/system/atomics.nim
index 96246ba01..43b3f0438 100644
--- a/lib/system/atomics.nim
+++ b/lib/system/atomics.nim
@@ -10,7 +10,9 @@
 ## Atomic operations for Nimrod.
 {.push stackTrace:off.}
 
-when (defined(gcc) or defined(llvm_gcc)) and hasThreadSupport:
+const someGcc = defined(gcc) or defined(llvm_gcc) or defined(clang)
+
+when someGcc and hasThreadSupport:
   type 
     AtomMemModel* = enum
       ATOMIC_RELAXED,  ## No barriers or synchronization. 
@@ -153,41 +155,16 @@ when (defined(gcc) or defined(llvm_gcc)) and hasThreadSupport:
     ## A value of 0 indicates typical alignment should be used. The compiler may also 
     ## ignore this parameter.
 
+  template fence*() = atomicThreadFence(ATOMIC_SEQ_CST)
 elif defined(vcc) and hasThreadSupport:
   proc addAndFetch*(p: ptr int, val: int): int {.
     importc: "NimXadd", nodecl.}
+
 else:
   proc addAndFetch*(p: ptr int, val: int): int {.inline.} =
     inc(p[], val)
     result = p[]
 
-# atomic compare and swap (CAS) funcitons to implement lock-free algorithms  
-      
-#if defined(windows) and not defined(gcc) and hasThreadSupport:
-#    proc InterlockedCompareExchangePointer(mem: ptr pointer,
-#      newValue: pointer, comparand: pointer) : pointer {.nodecl, 
-#        importc: "InterlockedCompareExchangePointer", header:"windows.h".}
-
-#    proc compareAndSwap*[T](mem: ptr T, 
-#      expected: T, newValue: T): bool {.inline.}=
-#      ## Returns true if successfully set value at mem to newValue when value
-#      ## at mem == expected
-#      return InterlockedCompareExchangePointer(addr(mem), 
-#        addr(newValue), addr(expected))[] == expected
-    
-#elif not hasThreadSupport:
-#  proc compareAndSwap*[T](mem: ptr T, 
-#                          expected: T, newValue: T): bool {.inline.} =
-#      ## Returns true if successfully set value at mem to newValue when value
-#      ## at mem == expected
-#      var oldval = mem[]
-#      if oldval == expected:
-#        mem[] = newValue
-#        return true
-#      return false
-
-
-# Some convenient functions 
 proc atomicInc*(memLoc: var int, x: int = 1): int =
   when defined(gcc) and hasThreadSupport:
     result = atomic_add_fetch(memLoc.addr, x, ATOMIC_RELAXED)
@@ -205,7 +182,7 @@ proc atomicDec*(memLoc: var int, x: int = 1): int =
     dec(memLoc, x)
     result = memLoc
 
-when defined(windows) and not defined(gcc):
+when defined(windows) and not someGcc:
   proc interlockedCompareExchange(p: pointer; exchange, comparand: int32): int32
     {.importc: "InterlockedCompareExchange", header: "<windows.h>", cdecl.}
 
@@ -219,7 +196,7 @@ else:
   # XXX is this valid for 'int'?
 
 
-when (defined(x86) or defined(amd64)) and defined(gcc):
+when (defined(x86) or defined(amd64)) and (defined(gcc) or defined(llvm_gcc)):
   proc cpuRelax {.inline.} =
     {.emit: """asm volatile("pause" ::: "memory");""".}
 elif (defined(x86) or defined(amd64)) and defined(vcc):
@@ -231,4 +208,10 @@ elif false:
 
   proc cpuRelax {.inline.} = os.sleep(1)
 
+when not defined(fence) and hasThreadSupport:
+  # XXX fixme
+  proc fence*() {.inline.} =
+    var dummy: bool
+    discard cas(addr dummy, false, true)
+
 {.pop.}
diff --git a/todo.txt b/todo.txt
index 996067175..7d4eac1ad 100644
--- a/todo.txt
+++ b/todo.txt
@@ -1,6 +1,23 @@
 version 0.9.6
 =============
 
+Concurrency
+-----------
+
+- document the new 'spawn' and 'parallel' statements
+- implement 'deepCopy' builtin
+- implement 'foo[1..4] = spawn(f[4..7])'
+- the disjoint checker needs to deal with 'a = spawn f(); g = spawn f()'
+- support for exception propagation
+- Minor: The copying of the 'ref Promise' into the thead local storage only
+  happens to work due to the write barrier's implementation
+- 'gcsafe' inferrence needs to be fixed
+- implement lock levels --> first without the more complex race avoidance
+
+
+Misc
+----
+
 - fix the bug that keeps 'defer' template from working
 - make '--implicitStatic:on' the default
 - fix the tuple unpacking in lambda bug