summary refs log tree commit diff stats
path: root/lib/pure/concurrency
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/concurrency')
-rw-r--r--lib/pure/concurrency/cpuinfo.nim11
-rw-r--r--lib/pure/concurrency/threadpool.nim24
2 files changed, 26 insertions, 9 deletions
diff --git a/lib/pure/concurrency/cpuinfo.nim b/lib/pure/concurrency/cpuinfo.nim
index dfa819f64..8d7f28f8e 100644
--- a/lib/pure/concurrency/cpuinfo.nim
+++ b/lib/pure/concurrency/cpuinfo.nim
@@ -18,15 +18,24 @@ when not defined(windows):
 
 when defined(linux):
   import linux
+  
+when defined(freebsd) or defined(macosx):
+  {.emit:"#include <sys/types.h>".}
+
+when defined(openbsd) or defined(netbsd):
+  {.emit:"#include <sys/param.h>".}
 
 when defined(macosx) or defined(bsd):
+  # we HAVE to emit param.h before sysctl.h so we cannot use .header here
+  # either. The amount of archaic bullshit in Poonix based OSes is just insane.
+  {.emit:"#include <sys/sysctl.h>".}
   const
     CTL_HW = 6
     HW_AVAILCPU = 25
     HW_NCPU = 3
   proc sysctl(x: ptr array[0..3, cint], y: cint, z: pointer,
               a: var csize, b: pointer, c: int): cint {.
-             importc: "sysctl", header: "<sys/sysctl.h>".}
+              importc: "sysctl", nodecl.}
 
 proc countProcessors*(): int {.rtl, extern: "ncpi$1".} =
   ## returns the numer of the processors/cores the machine has.
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim
index fd1041918..f46822d94 100644
--- a/lib/pure/concurrency/threadpool.nim
+++ b/lib/pure/concurrency/threadpool.nim
@@ -9,6 +9,9 @@
 
 ## Implements Nimrod's 'spawn'.
 
+when not compileOption("threads"):
+  {.error: "Threadpool requires --threads:on option.".}
+
 import cpuinfo, cpuload, locks
 
 {.push stackTrace:off.}
@@ -92,7 +95,7 @@ type
 
   FlowVarBase* = ref FlowVarBaseObj ## untyped base class for 'FlowVar[T]'
   FlowVarBaseObj = object of TObject
-    ready, usesCondVar: bool
+    ready, usesCondVar, awaited: bool
     cv: CondVar #\
     # for 'awaitAny' support
     ai: ptr AwaitInfo
@@ -126,15 +129,15 @@ type
 proc await*(fv: FlowVarBase) =
   ## waits until the value for the flowVar arrives. Usually it is not necessary
   ## to call this explicitly.
-  if fv.usesCondVar:
-    fv.usesCondVar = false
+  if fv.usesCondVar and not fv.awaited:
+    fv.awaited = true
     await(fv.cv)
     destroyCondVar(fv.cv)
 
 proc finished(fv: FlowVarBase) =
   doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'"
   # we have to protect against the rare cases where the owner of the flowVar
-  # simply disregards the flowVar and yet the "flowVarr" has not yet written
+  # simply disregards the flowVar and yet the "flowVar" has not yet written
   # anything to it:
   await(fv)
   if fv.data.isNil: return
@@ -207,6 +210,7 @@ proc `^`*[T](fv: FlowVar[T]): T =
   ## blocks until the value is available and then returns this value.
   await(fv)
   when T is string or T is seq:
+    # XXX closures? deepCopy?
     result = cast[T](fv.data)
   else:
     result = fv.blob
@@ -264,6 +268,10 @@ proc slave(w: ptr Worker) {.thread.} =
       w.shutdown = false
       atomicDec currentPoolSize
 
+var
+  workers: array[MaxThreadPoolSize, TThread[ptr Worker]]
+  workersData: array[MaxThreadPoolSize, Worker]
+
 proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) =
   ## sets the minimal thread pool size. The default value of this is 4.
   minPoolSize = size
@@ -272,10 +280,10 @@ proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) =
   ## sets the minimal thread pool size. The default value of this
   ## is ``MaxThreadPoolSize``.
   maxPoolSize = size
-
-var
-  workers: array[MaxThreadPoolSize, TThread[ptr Worker]]
-  workersData: array[MaxThreadPoolSize, Worker]
+  if currentPoolSize > maxPoolSize:
+    for i in maxPoolSize..currentPoolSize-1:
+      let w = addr(workersData[i])
+      w.shutdown = true
 
 proc activateThread(i: int) {.noinline.} =
   workersData[i].taskArrived = createCondVar()