summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--lib/pure/concurrency/threadpool.nim50
-rw-r--r--tests/parallel/tconvexhull.nim62
2 files changed, 91 insertions, 21 deletions
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim
index fbd344e4e..727bbffa2 100644
--- a/lib/pure/concurrency/threadpool.nim
+++ b/lib/pure/concurrency/threadpool.nim
@@ -100,7 +100,7 @@ type
     # for 'awaitAny' support
     ai: ptr AwaitInfo
     idx: int
-    data: pointer  # we incRef and unref it to keep it alive
+    data: RootRef  # we incRef and unref it to keep it alive
     owner: pointer # ptr Worker
 
   FlowVarObj[T] = object of FlowVarBaseObj
@@ -112,7 +112,7 @@ type
     len: int
     lock: TLock
     empty: TCond
-    data: array[512, pointer]
+    data: array[2, pointer]
 
   WorkerProc = proc (thread, args: pointer) {.nimcall, gcsafe.}
   Worker = object
@@ -134,6 +134,22 @@ proc await*(fv: FlowVarBase) =
     await(fv.cv)
     destroyCondVar(fv.cv)
 
+proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool =
+  if cas(addr w.ready, true, false):
+    w.data = data
+    w.f = fn
+    signal(w.taskArrived)
+    await(w.taskStarted)
+    result = true
+
+proc wakeupWorkerToProcessQueue(w: ptr Worker) =
+  # Note that if this fails somebody else already woke up the thread so it's
+  # perfectly fine to do nothing:
+  if cas(addr w.ready, true, false):
+    w.data = nil
+    w.f = proc (t, a: pointer) {.nimcall.} = discard
+    signal(w.taskArrived)
+
 proc finished(fv: FlowVarBase) =
   doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'"
   # we have to protect against the rare cases where the owner of the flowVar
@@ -144,18 +160,17 @@ proc finished(fv: FlowVarBase) =
   let owner = cast[ptr Worker](fv.owner)
   let q = addr(owner.q)
   var waited = false
-  while true:
+  acquire(q.lock)
+  while not (q.len < q.data.len):
+    echo "EXHAUSTED!"
+    release(q.lock)
+    wakeupWorkerToProcessQueue(owner)
     acquire(q.lock)
-    if q.len < q.data.len:
-      q.data[q.len] = fv.data
-      inc q.len
-      release(q.lock)
-      break
-    else:
-      # the queue is exhausted! We block until it has been cleaned:
-      release(q.lock)
-      wait(q.empty, q.lock)
-      waited = true
+    wait(q.empty, q.lock)
+    waited = true
+  q.data[q.len] = cast[pointer](fv.data)
+  inc q.len
+  release(q.lock)
   fv.data = nil
   # wakeup other potentially waiting threads:
   if waited: signal(q.empty)
@@ -165,6 +180,7 @@ proc cleanFlowVars(w: ptr Worker) =
   acquire(q.lock)
   for i in 0 .. <q.len:
     GC_unref(cast[RootRef](q.data[i]))
+    echo "GC_unref"
   q.len = 0
   release(q.lock)
   signal(q.empty)
@@ -331,14 +347,6 @@ var
 
 initLock stateLock
 
-proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool =
-  if cas(addr w.ready, true, false):
-    w.data = data
-    w.f = fn
-    signal(w.taskArrived)
-    await(w.taskStarted)
-    result = true
-
 proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} =
   # implementation of 'spawn' that is used by the code generator.
   while true:
diff --git a/tests/parallel/tconvexhull.nim b/tests/parallel/tconvexhull.nim
new file mode 100644
index 000000000..c97fed39b
--- /dev/null
+++ b/tests/parallel/tconvexhull.nim
@@ -0,0 +1,62 @@
+discard """
+  output: '''true
+true
+true
+true
+true
+true'''
+"""
+
+# parallel convex hull for Nim bigbreak
+# nim c --threads:on -d:release pconvex_hull.nim
+import algorithm, sequtils, threadpool
+
+type Point = tuple[x, y: float]
+
+proc cmpPoint(a, b: Point): int =
+  result = cmp(a.x, b.x)
+  if result == 0:
+    result = cmp(a.y, b.y)
+
+template cross[T](o, a, b: T): expr =
+  (a.x - o.x) * (b.y - o.y) - (a.y - o.y) * (b.x - o.x)
+
+template pro(): expr =
+  while lr1 > 0 and cross(result[lr1 - 1], result[lr1], p[i]) <= 0:
+    discard result.pop
+    lr1 -= 1
+  result.add(p[i])
+  lr1 += 1
+
+proc half[T](p: seq[T]; upper: bool): seq[T] =
+  var i, lr1: int
+  result = @[]
+  lr1 = -1
+  if upper:
+    i = 0
+    while i <= high(p):
+      pro()
+      i += 1
+  else:
+    i = high(p)
+    while i >= low(p):
+      pro()
+      i -= 1
+  discard result.pop
+
+proc convex_hull[T](points: var seq[T], cmp: proc(x, y: T): int {.closure.}) : seq[T] =
+  if len(points) < 2: return points
+  points.sort(cmp)
+  var ul: array[2, FlowVar[seq[T]]]
+  parallel:
+    for k in 0..ul.high:
+      ul[k] = spawn half[T](points, k == 0)
+  result = concat(^ul[0], ^ul[1])
+
+var s = map(toSeq(0..999999), proc(x: int): Point = (float(x div 1000), float(x mod 1000)))
+setMaxPoolSize 2
+
+#echo convex_hull[Point](s, cmpPoint)
+for i in 0..5:
+  echo convex_hull[Point](s, cmpPoint) ==
+      @[(0.0, 0.0), (999.0, 0.0), (999.0, 999.0), (0.0, 999.0)]