diff options
-rw-r--r-- | compiler/ast.nim | 2 | ||||
-rw-r--r-- | compiler/semtypes.nim | 2 | ||||
-rw-r--r-- | doc/spawn.txt | 63 | ||||
-rw-r--r-- | lib/pure/concurrency/threadpool.nim | 18 | ||||
-rw-r--r-- | lib/system.nim | 4 | ||||
-rw-r--r-- | tests/parallel/tpi.nim | 2 | ||||
-rw-r--r-- | todo.txt | 4 |
7 files changed, 70 insertions, 25 deletions
diff --git a/compiler/ast.nim b/compiler/ast.nim index c3cb63df4..eb4574928 100644 --- a/compiler/ast.nim +++ b/compiler/ast.nim @@ -560,7 +560,7 @@ type mFloat, mFloat32, mFloat64, mFloat128, mBool, mChar, mString, mCstring, mPointer, mEmptySet, mIntSetBaseType, mNil, mExpr, mStmt, mTypeDesc, - mVoidType, mPNimrodNode, mShared, mGuarded, mLock, mSpawn, + mVoidType, mPNimrodNode, mShared, mGuarded, mLock, mSpawn, mDeepCopy, mIsMainModule, mCompileDate, mCompileTime, mNimrodVersion, mNimrodMajor, mNimrodMinor, mNimrodPatch, mCpuEndian, mHostOS, mHostCPU, mAppType, mNaN, mInf, mNegInf, diff --git a/compiler/semtypes.nim b/compiler/semtypes.nim index c328f133b..3b9e82261 100644 --- a/compiler/semtypes.nim +++ b/compiler/semtypes.nim @@ -1087,7 +1087,7 @@ proc semTypeNode(c: PContext, n: PNode, prev: PType): PType = elif n[0].kind notin nkIdentKinds: result = semTypeExpr(c, n) else: - let op = considerAcc(n.sons[0]) + let op = considerQuotedIdent(n.sons[0]) if op.id in {ord(wAnd), ord(wOr)} or op.s == "|": checkSonsLen(n, 3) var diff --git a/doc/spawn.txt b/doc/spawn.txt index 19560ebf5..ed500f3a5 100644 --- a/doc/spawn.txt +++ b/doc/spawn.txt @@ -6,6 +6,8 @@ Nimrod has two flavors of parallelism: 1) `Structured`:idx parallelism via the ``parallel`` statement. 2) `Unstructured`:idx: parallelism via the standalone ``spawn`` statement. +Both need the `threadpool <threadpool.html>`_ module to work. + Somewhat confusingly, ``spawn`` is also used in the ``parallel`` statement with slightly different semantics. ``spawn`` always takes a call expression of the form ``f(a, ...)``. Let ``T`` be ``f``'s return type. If ``T`` is ``void`` @@ -20,6 +22,25 @@ the overhead of an indirection via ``FlowVar[T]`` to ensure correctness. Parallel statement ================== +Example: + +.. 
code-block:: nimrod + # Compute PI in an inefficient way + import strutils, math, threadpool + + proc term(k: float): float = 4 * math.pow(-1, k) / (2*k + 1) + + proc pi(n: int): float = + var ch = newSeq[float](n+1) + parallel: + for k in 0..ch.high: + ch[k] = spawn term(float(k)) + for k in 0..ch.high: + result += ch[k] + + echo formatFloat(pi(5000)) + + The parallel statement is the preferred mechanism to introduce parallelism in a Nimrod program. A subset of the Nimrod language is valid within a ``parallel`` section. This subset is checked to be free of data races at @@ -30,17 +51,21 @@ The subset is in fact the full language with the following restrictions / changes: * ``spawn`` within a ``parallel`` section has special semantics. -* Every location of the form ``a[i]`` and ``a[i..j]`` and ``dest`` where +* Every location of the form ``a[i]`` and ``a[i..j]`` and ``dest`` where ``dest`` is part of the pattern ``dest = spawn f(...)`` has to be provable disjoint. This is called the *disjoint check*. -* Every other complex location ``loc`` that is used in a spawned - proc (``spawn f(loc)``) has to immutable for the duration of - the ``parallel``. This is called the *immutability check*. Currently it - is not specified what exactly "complex location" means. We need to make that - an optimization! -* Every array access has to be provable within bounds. +* Every other complex location ``loc`` that is used in a spawned + proc (``spawn f(loc)``) has to be immutable for the duration of + the ``parallel`` section. This is called the *immutability check*. Currently + it is not specified what exactly "complex location" means. We need to make + this an optimization! +* Every array access has to be provable within bounds. This is called + the *bounds check*. * Slices are optimized so that no copy is performed. This optimization is not - yet performed for ordinary slices outside of a ``parallel`` section. + yet performed for ordinary slices outside of a ``parallel`` section. 
Slices + are also special in that they currently do not support negative indexes! + + Spawn statement @@ -49,9 +74,25 @@ Spawn statement A standalone ``spawn`` statement is a simple construct. It executes the passed expression on the thread pool and returns a `data flow variable`:idx: ``FlowVar[T]`` that can be read from. The reading with the ``^`` operator is -**blocking**. However, one can use ``awaitAny`` to wait on multiple flow variables -at the same time. +**blocking**. However, one can use ``awaitAny`` to wait on multiple flow +variables at the same time: + +.. code-block:: nimrod + import threadpool, ... + + # wait until 2 out of 3 servers received the update: + proc main = + var responses = newSeq[FlowVarBase](3) + for i in 0..2: + responses[i] = spawn tellServer(Update, "key", "value") + var index = awaitAny(responses) + assert index >= 0 + responses.del(index) + discard awaitAny(responses) Like the ``parallel`` statement data flow variables ensure that no data races -are possible. +are possible. Due to technical limitations not every type ``T`` is possible in +a data flow variable: ``T`` has to be of the type ``ref``, ``string``, ``seq`` +or of a type that doesn't contain a type that is garbage collected. This +restriction will be removed in the future. 
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index c34b91e30..fd1041918 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -47,7 +47,7 @@ type entered: int cv: CondVar # condvar takes 3 words at least when sizeof(int) < 8: - cacheAlign: array[CacheLineSize-4*sizeof(int), byte] + cacheAlign: array[CacheLineSize-4*sizeof(int), byte] left: int cacheAlign2: array[CacheLineSize-sizeof(int), byte] interest: bool ## wether the master is interested in the "all done" event @@ -90,8 +90,8 @@ type cv: CondVar idx: int - RawFlowVar* = ref RawFlowVarObj ## untyped base class for 'FlowVar[T]' - RawFlowVarObj = object of TObject + FlowVarBase* = ref FlowVarBaseObj ## untyped base class for 'FlowVar[T]' + FlowVarBaseObj = object of TObject ready, usesCondVar: bool cv: CondVar #\ # for 'awaitAny' support @@ -100,7 +100,7 @@ type data: pointer # we incRef and unref it to keep it alive owner: pointer # ptr Worker - FlowVarObj[T] = object of RawFlowVarObj + FlowVarObj[T] = object of FlowVarBaseObj blob: T FlowVar*{.compilerProc.}[T] = ref FlowVarObj[T] ## a data flow variable @@ -123,7 +123,7 @@ type shutdown: bool # the pool requests to shut down this worker thread q: ToFreeQueue -proc await*(fv: RawFlowVar) = +proc await*(fv: FlowVarBase) = ## waits until the value for the flowVar arrives. Usually it is not necessary ## to call this explicitly. 
if fv.usesCondVar: @@ -131,7 +131,7 @@ proc await*(fv: RawFlowVar) = await(fv.cv) destroyCondVar(fv.cv) -proc finished(fv: RawFlowVar) = +proc finished(fv: FlowVarBase) = doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'" # we have to protect against the rare cases where the owner of the flowVar # simply disregards the flowVar and yet the "flowVarr" has not yet written @@ -171,11 +171,11 @@ proc fvFinalizer[T](fv: FlowVar[T]) = finished(fv) proc nimCreateFlowVar[T](): FlowVar[T] {.compilerProc.} = new(result, fvFinalizer) -proc nimFlowVarCreateCondVar(fv: RawFlowVar) {.compilerProc.} = +proc nimFlowVarCreateCondVar(fv: FlowVarBase) {.compilerProc.} = fv.cv = createCondVar() fv.usesCondVar = true -proc nimFlowVarSignal(fv: RawFlowVar) {.compilerProc.} = +proc nimFlowVarSignal(fv: FlowVarBase) {.compilerProc.} = if fv.ai != nil: acquire(fv.ai.cv.L) fv.ai.idx = fv.idx @@ -211,7 +211,7 @@ proc `^`*[T](fv: FlowVar[T]): T = else: result = fv.blob -proc awaitAny*(flowVars: openArray[RawFlowVar]): int = +proc awaitAny*(flowVars: openArray[FlowVarBase]): int = ## awaits any of the given flowVars. Returns the index of one flowVar for ## which a value arrived. A flowVar only supports one call to 'awaitAny' at ## the same time. That means if you await([a,b]) and await([b,c]) the second diff --git a/lib/system.nim b/lib/system.nim index dcfe42f2c..816995057 100644 --- a/lib/system.nim +++ b/lib/system.nim @@ -2942,6 +2942,10 @@ proc locals*(): TObject {.magic: "Locals", noSideEffect.} = ## # -> B is 1 discard +proc deepCopy*[T](x: T): T {.magic: "DeepCopy", noSideEffect.} + ## performs a deep copy of `x`. This is also used by the code generator + ## for the implementation of ``spawn``. 
+ when not defined(booting): type semistatic*[T] = static[T] | T diff --git a/tests/parallel/tpi.nim b/tests/parallel/tpi.nim index 1ef5c6aea..dcb9b8fc5 100644 --- a/tests/parallel/tpi.nim +++ b/tests/parallel/tpi.nim @@ -8,7 +8,7 @@ import strutils, math, threadpool proc term(k: float): float = 4 * math.pow(-1, k) / (2*k + 1) proc piU(n: int): float = - var ch = newSeq[Promise[float]](n+1) + var ch = newSeq[FlowVar[float]](n+1) for k in 0..n: ch[k] = spawn term(float(k)) for k in 0..n: diff --git a/todo.txt b/todo.txt index 7d4eac1ad..11fc2ef81 100644 --- a/todo.txt +++ b/todo.txt @@ -4,15 +4,15 @@ version 0.9.6 Concurrency ----------- -- document the new 'spawn' and 'parallel' statements +- the disjoint checker needs to deal with 'a = spawn f(); g = spawn f()' - implement 'deepCopy' builtin - implement 'foo[1..4] = spawn(f[4..7])' -- the disjoint checker needs to deal with 'a = spawn f(); g = spawn f()' - support for exception propagation - Minor: The copying of the 'ref Promise' into the thead local storage only happens to work due to the write barrier's implementation - 'gcsafe' inferrence needs to be fixed - implement lock levels --> first without the more complex race avoidance +- document the new 'spawn' and 'parallel' statements Misc |