diff options
-rw-r--r-- | lib/pure/concurrency/threadpool.nim | 50 | ||||
-rw-r--r-- | tests/parallel/tconvexhull.nim | 62 |
2 files changed, 91 insertions, 21 deletions
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index fbd344e4e..727bbffa2 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -100,7 +100,7 @@ type # for 'awaitAny' support ai: ptr AwaitInfo idx: int - data: pointer # we incRef and unref it to keep it alive + data: RootRef # we incRef and unref it to keep it alive owner: pointer # ptr Worker FlowVarObj[T] = object of FlowVarBaseObj @@ -112,7 +112,7 @@ type len: int lock: TLock empty: TCond - data: array[512, pointer] + data: array[2, pointer] WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.} Worker = object @@ -134,6 +134,22 @@ proc await*(fv: FlowVarBase) = await(fv.cv) destroyCondVar(fv.cv) +proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool = + if cas(addr w.ready, true, false): + w.data = data + w.f = fn + signal(w.taskArrived) + await(w.taskStarted) + result = true + +proc wakeupWorkerToProcessQueue(w: ptr Worker) = + # Note that if this fails somebody else already woke up the thread so it's + # perfectly fine to do nothing: + if cas(addr w.ready, true, false): + w.data = nil + w.f = proc (t, a: pointer) {.nimcall.} = discard + signal(w.taskArrived) + proc finished(fv: FlowVarBase) = doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'" # we have to protect against the rare cases where the owner of the flowVar @@ -144,18 +160,17 @@ proc finished(fv: FlowVarBase) = let owner = cast[ptr Worker](fv.owner) let q = addr(owner.q) var waited = false - while true: + acquire(q.lock) + while not (q.len < q.data.len): + echo "EXHAUSTED!" + release(q.lock) + wakeupWorkerToProcessQueue(owner) acquire(q.lock) - if q.len < q.data.len: - q.data[q.len] = fv.data - inc q.len - release(q.lock) - break - else: - # the queue is exhausted! We block until it has been cleaned: - release(q.lock) - wait(q.empty, q.lock) - waited = true + wait(q.empty, q.lock) + waited = true + q.data[q.len] = cast[pointer](fv.data) + inc q.len + release(q.lock) fv.data = nil # wakeup other potentially waiting threads: if waited: signal(q.empty) @@ -165,6 +180,7 @@ proc cleanFlowVars(w: ptr Worker) = acquire(q.lock) for i in 0 .. <q.len: GC_unref(cast[RootRef](q.data[i])) + echo "GC_unref" q.len = 0 release(q.lock) signal(q.empty) @@ -331,14 +347,6 @@ var initLock stateLock -proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool = - if cas(addr w.ready, true, false): - w.data = data - w.f = fn - signal(w.taskArrived) - await(w.taskStarted) - result = true - proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} = # implementation of 'spawn' that is used by the code generator. while true: diff --git a/tests/parallel/tconvexhull.nim b/tests/parallel/tconvexhull.nim new file mode 100644 index 000000000..c97fed39b --- /dev/null +++ b/tests/parallel/tconvexhull.nim @@ -0,0 +1,62 @@ +discard """ + output: '''true +true +true +true +true +true''' +""" + +# parallel convex hull for Nim bigbreak +# nim c --threads:on -d:release pconvex_hull.nim +import algorithm, sequtils, threadpool + +type Point = tuple[x, y: float] + +proc cmpPoint(a, b: Point): int = + result = cmp(a.x, b.x) + if result == 0: + result = cmp(a.y, b.y) + +template cross[T](o, a, b: T): expr = + (a.x - o.x) * (b.y - o.y) - (a.y - o.y) * (b.x - o.x) + +template pro(): expr = + while lr1 > 0 and cross(result[lr1 - 1], result[lr1], p[i]) <= 0: + discard result.pop + lr1 -= 1 + result.add(p[i]) + lr1 += 1 + +proc half[T](p: seq[T]; upper: bool): seq[T] = + var i, lr1: int + result = @[] + lr1 = -1 + if upper: + i = 0 + while i <= high(p): + pro() + i += 1 + else: + i = high(p) + while i >= low(p): + pro() + i -= 1 + discard result.pop + +proc convex_hull[T](points: var seq[T], cmp: proc(x, y: T): int {.closure.}) : seq[T] = + if len(points) < 2: return points + points.sort(cmp) + var ul: array[2, FlowVar[seq[T]]] + parallel: + for k in 0..ul.high: + ul[k] = spawn half[T](points, k == 0) + result = concat(^ul[0], ^ul[1]) + +var s = map(toSeq(0..999999), proc(x: int): Point = (float(x div 1000), float(x mod 1000))) +setMaxPoolSize 2 + +#echo convex_hull[Point](s, cmpPoint) +for i in 0..5: + echo convex_hull[Point](s, cmpPoint) == + @[(0.0, 0.0), (999.0, 0.0), (999.0, 999.0), (0.0, 999.0)] |