summary refs log tree commit diff stats
path: root/lib/pure/concurrency
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/concurrency')
-rw-r--r--lib/pure/concurrency/atomics.nim433
-rw-r--r--lib/pure/concurrency/cpuinfo.nim139
-rw-r--r--lib/pure/concurrency/cpuload.nim56
-rw-r--r--lib/pure/concurrency/threadpool.nim453
4 files changed, 886 insertions, 195 deletions
diff --git a/lib/pure/concurrency/atomics.nim b/lib/pure/concurrency/atomics.nim
new file mode 100644
index 000000000..818f1b37a
--- /dev/null
+++ b/lib/pure/concurrency/atomics.nim
@@ -0,0 +1,433 @@
+#
+#
+#            Nim's Runtime Library
+#        (c) Copyright 2018 Jörg Wollenschläger
+#
+#    See the file "copying.txt", included in this
+#    distribution, for details about the copyright.
+#
+
+## Types and operations for atomic operations and lockless algorithms.
+##
+## Unstable API.
+## 
+## By default, C++ uses C11 atomic primitives. To use C++ `std::atomic`,
+## `-d:nimUseCppAtomics` can be defined.
+
+runnableExamples:
+  # Atomic
+  var loc: Atomic[int]
+  loc.store(4)
+  assert loc.load == 4
+  loc.store(2)
+  assert loc.load(moRelaxed) == 2
+  loc.store(9)
+  assert loc.load(moAcquire) == 9
+  loc.store(0, moRelease)
+  assert loc.load == 0
+
+  assert loc.exchange(7) == 0
+  assert loc.load == 7
+
+  var expected = 7
+  assert loc.compareExchange(expected, 5, moRelaxed, moRelaxed)
+  assert expected == 7
+  assert loc.load == 5
+
+  assert not loc.compareExchange(expected, 12, moRelaxed, moRelaxed)
+  assert expected == 5
+  assert loc.load == 5
+
+  assert loc.fetchAdd(1) == 5
+  assert loc.fetchAdd(2) == 6
+  assert loc.fetchSub(3) == 8
+
+  loc.atomicInc(1)
+  assert loc.load == 6
+
+  # AtomicFlag
+  var flag: AtomicFlag
+
+  assert not flag.testAndSet
+  assert flag.testAndSet
+  flag.clear(moRelaxed)
+  assert not flag.testAndSet
+
+when (defined(cpp) and defined(nimUseCppAtomics)) or defined(nimdoc):
+  # For the C++ backend, types and operations map directly to C++11 atomics.
+
+  {.push, header: "<atomic>".}
+
+  type
+    MemoryOrder* {.importcpp: "std::memory_order".} = enum
+      ## Specifies how non-atomic operations can be reordered around atomic
+      ## operations.
+
+      moRelaxed
+        ## No ordering constraints. Only the atomicity and ordering against
+        ## other atomic operations is guaranteed.
+
+      moConsume
+        ## This ordering is currently discouraged as it's semantics are
+        ## being revised. Acquire operations should be preferred.
+
+      moAcquire
+        ## When applied to a load operation, no reads or writes in the
+        ## current thread can be reordered before this operation.
+
+      moRelease
+        ## When applied to a store operation, no reads or writes in the
+        ## current thread can be reorderd after this operation.
+
+      moAcquireRelease
+        ## When applied to a read-modify-write operation, this behaves like
+        ## both an acquire and a release operation.
+
+      moSequentiallyConsistent
+        ## Behaves like Acquire when applied to load, like Release when
+        ## applied to a store and like AcquireRelease when applied to a
+        ## read-modify-write operation.
+        ## Also guarantees that all threads observe the same total ordering
+        ## with other moSequentiallyConsistent operations.
+
+  type
+    Atomic*[T] {.importcpp: "std::atomic", completeStruct.} = object
+      ## An atomic object with underlying type `T`.
+      raw: T
+
+    AtomicFlag* {.importcpp: "std::atomic_flag", size: 1.} = object
+      ## An atomic boolean state.
+
+  # Access operations
+
+  proc load*[T](location: var Atomic[T]; order: MemoryOrder = moSequentiallyConsistent): T {.importcpp: "#.load(@)".}
+    ## Atomically obtains the value of the atomic object.
+
+  proc store*[T](location: var Atomic[T]; desired: T; order: MemoryOrder = moSequentiallyConsistent) {.importcpp: "#.store(@)".}
+    ## Atomically replaces the value of the atomic object with the `desired`
+    ## value.
+
+  proc exchange*[T](location: var Atomic[T]; desired: T; order: MemoryOrder = moSequentiallyConsistent): T {.importcpp: "#.exchange(@)".}
+    ## Atomically replaces the value of the atomic object with the `desired`
+    ## value and returns the old value.
+
+  proc compareExchange*[T](location: var Atomic[T]; expected: var T; desired: T; order: MemoryOrder = moSequentiallyConsistent): bool {.importcpp: "#.compare_exchange_strong(@)".}
+    ## Atomically compares the value of the atomic object with the `expected`
+    ## value and performs exchange with the `desired` one if equal or load if
+    ## not. Returns true if the exchange was successful.
+
+  proc compareExchange*[T](location: var Atomic[T]; expected: var T; desired: T; success, failure: MemoryOrder): bool {.importcpp: "#.compare_exchange_strong(@)".}
+    ## Same as above, but allows for different memory orders for success and
+    ## failure.
+
+  proc compareExchangeWeak*[T](location: var Atomic[T]; expected: var T; desired: T; order: MemoryOrder = moSequentiallyConsistent): bool {.importcpp: "#.compare_exchange_weak(@)".}
+    ## Same as above, but is allowed to fail spuriously.
+
+  proc compareExchangeWeak*[T](location: var Atomic[T]; expected: var T; desired: T; success, failure: MemoryOrder): bool {.importcpp: "#.compare_exchange_weak(@)".}
+    ## Same as above, but allows for different memory orders for success and
+    ## failure.
+
+  # Numerical operations
+
+  proc fetchAdd*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importcpp: "#.fetch_add(@)".}
+    ## Atomically adds a `value` to the atomic integer and returns the
+    ## original value.
+
+  proc fetchSub*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importcpp: "#.fetch_sub(@)".}
+    ## Atomically subtracts a `value` to the atomic integer and returns the
+    ## original value.
+
+  proc fetchAnd*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importcpp: "#.fetch_and(@)".}
+    ## Atomically replaces the atomic integer with it's bitwise AND
+    ## with the specified `value` and returns the original value.
+
+  proc fetchOr*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importcpp: "#.fetch_or(@)".}
+    ## Atomically replaces the atomic integer with it's bitwise OR
+    ## with the specified `value` and returns the original value.
+
+  proc fetchXor*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importcpp: "#.fetch_xor(@)".}
+    ## Atomically replaces the atomic integer with it's bitwise XOR
+    ## with the specified `value` and returns the original value.
+
+  # Flag operations
+
+  proc testAndSet*(location: var AtomicFlag; order: MemoryOrder = moSequentiallyConsistent): bool {.importcpp: "#.test_and_set(@)".}
+    ## Atomically sets the atomic flag to true and returns the original value.
+
+  proc clear*(location: var AtomicFlag; order: MemoryOrder = moSequentiallyConsistent) {.importcpp: "#.clear(@)".}
+    ## Atomically sets the value of the atomic flag to false.
+
+  proc fence*(order: MemoryOrder) {.importcpp: "std::atomic_thread_fence(@)".}
+    ## Ensures memory ordering without using atomic operations.
+
+  proc signalFence*(order: MemoryOrder) {.importcpp: "std::atomic_signal_fence(@)".}
+    ## Prevents reordering of accesses by the compiler as would fence, but
+    ## inserts no CPU instructions for memory ordering.
+
+  {.pop.}
+
+else:
+  # For the C backend, atomics map to C11 built-ins on GCC and Clang for
+  # trivial Nim types. Other types are implemented using spin locks.
+  # This could be overcome by supporting advanced importc-patterns.
+
+  # Since MSVC does not implement C11, we fall back to MS intrinsics
+  # where available.
+
+  type
+    Trivial = SomeNumber | bool | enum | ptr | pointer
+      # A type that is known to be atomic and whose size is known at
+      # compile time to be 8 bytes or less
+
+  template nonAtomicType*(T: typedesc[Trivial]): untyped =
+    # Maps types to integers of the same size
+    when sizeof(T) == 1: int8
+    elif sizeof(T) == 2: int16
+    elif sizeof(T) == 4: int32
+    elif sizeof(T) == 8: int64
+
+  when defined(vcc):
+
+    # TODO: Trivial types should be volatile and use VC's special volatile
+    # semantics for store and loads.
+
+    type
+      MemoryOrder* = enum
+        moRelaxed
+        moConsume
+        moAcquire
+        moRelease
+        moAcquireRelease
+        moSequentiallyConsistent
+
+      Atomic*[T] = object
+        when T is Trivial:
+          value: T.nonAtomicType
+        else:
+          nonAtomicValue: T
+          guard: AtomicFlag
+
+      AtomicFlag* = distinct int8
+
+    {.push header: "<intrin.h>".}
+
+    # MSVC intrinsics
+    proc interlockedExchange(location: pointer; desired: int8): int8 {.importc: "_InterlockedExchange8".}
+    proc interlockedExchange(location: pointer; desired: int16): int16 {.importc: "_InterlockedExchange16".}
+    proc interlockedExchange(location: pointer; desired: int32): int32 {.importc: "_InterlockedExchange".}
+    proc interlockedExchange(location: pointer; desired: int64): int64 {.importc: "_InterlockedExchange64".}
+
+    proc interlockedCompareExchange(location: pointer; desired, expected: int8): int8 {.importc: "_InterlockedCompareExchange8".}
+    proc interlockedCompareExchange(location: pointer; desired, expected: int16): int16 {.importc: "_InterlockedCompareExchange16".}
+    proc interlockedCompareExchange(location: pointer; desired, expected: int32): int32 {.importc: "_InterlockedCompareExchange".}
+    proc interlockedCompareExchange(location: pointer; desired, expected: int64): int64 {.importc: "_InterlockedCompareExchange64".}
+
+    proc interlockedAnd(location: pointer; value: int8): int8 {.importc: "_InterlockedAnd8".}
+    proc interlockedAnd(location: pointer; value: int16): int16 {.importc: "_InterlockedAnd16".}
+    proc interlockedAnd(location: pointer; value: int32): int32 {.importc: "_InterlockedAnd".}
+    proc interlockedAnd(location: pointer; value: int64): int64 {.importc: "_InterlockedAnd64".}
+
+    proc interlockedOr(location: pointer; value: int8): int8 {.importc: "_InterlockedOr8".}
+    proc interlockedOr(location: pointer; value: int16): int16 {.importc: "_InterlockedOr16".}
+    proc interlockedOr(location: pointer; value: int32): int32 {.importc: "_InterlockedOr".}
+    proc interlockedOr(location: pointer; value: int64): int64 {.importc: "_InterlockedOr64".}
+
+    proc interlockedXor(location: pointer; value: int8): int8 {.importc: "_InterlockedXor8".}
+    proc interlockedXor(location: pointer; value: int16): int16 {.importc: "_InterlockedXor16".}
+    proc interlockedXor(location: pointer; value: int32): int32 {.importc: "_InterlockedXor".}
+    proc interlockedXor(location: pointer; value: int64): int64 {.importc: "_InterlockedXor64".}
+
+    proc fence(order: MemoryOrder): int64 {.importc: "_ReadWriteBarrier()".}
+    proc signalFence(order: MemoryOrder): int64 {.importc: "_ReadWriteBarrier()".}
+
+    {.pop.}
+
+    proc testAndSet*(location: var AtomicFlag; order: MemoryOrder = moSequentiallyConsistent): bool =
+      interlockedOr(addr(location), 1'i8) == 1'i8
+    proc clear*(location: var AtomicFlag; order: MemoryOrder = moSequentiallyConsistent) =
+      discard interlockedAnd(addr(location), 0'i8)
+
+    proc load*[T: Trivial](location: var Atomic[T]; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} =
+      cast[T](interlockedOr(addr(location.value), (nonAtomicType(T))0))
+    proc store*[T: Trivial](location: var Atomic[T]; desired: T; order: MemoryOrder = moSequentiallyConsistent) {.inline.} =
+      discard interlockedExchange(addr(location.value), cast[nonAtomicType(T)](desired))
+
+    proc exchange*[T: Trivial](location: var Atomic[T]; desired: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} =
+      cast[T](interlockedExchange(addr(location.value), cast[int64](desired)))
+    proc compareExchange*[T: Trivial](location: var Atomic[T]; expected: var T; desired: T; success, failure: MemoryOrder): bool {.inline.} =
+      cast[T](interlockedCompareExchange(addr(location.value), cast[nonAtomicType(T)](desired), cast[nonAtomicType(T)](expected))) == expected
+    proc compareExchange*[T: Trivial](location: var Atomic[T]; expected: var T; desired: T; order: MemoryOrder = moSequentiallyConsistent): bool {.inline.} =
+      compareExchange(location, expected, desired, order, order)
+    proc compareExchangeWeak*[T: Trivial](location: var Atomic[T]; expected: var T; desired: T; success, failure: MemoryOrder): bool {.inline.} =
+      compareExchange(location, expected, desired, success, failure)
+    proc compareExchangeWeak*[T: Trivial](location: var Atomic[T]; expected: var T; desired: T; order: MemoryOrder = moSequentiallyConsistent): bool {.inline.} =
+      compareExchangeWeak(location, expected, desired, order, order)
+
+    proc fetchAdd*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} =
+      var currentValue = location.load()
+      while not compareExchangeWeak(location, currentValue, currentValue + value): discard
+    proc fetchSub*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} =
+      fetchAdd(location, -value, order)
+    proc fetchAnd*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} =
+      cast[T](interlockedAnd(addr(location.value), cast[nonAtomicType(T)](value)))
+    proc fetchOr*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} =
+      cast[T](interlockedOr(addr(location.value), cast[nonAtomicType(T)](value)))
+    proc fetchXor*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} =
+      cast[T](interlockedXor(addr(location.value), cast[nonAtomicType(T)](value)))
+
+  else:
+    when defined(cpp):
+      {.push, header: "<atomic>".}
+      template maybeWrapStd(x: string): string =
+        "std::" & x
+    else:
+      {.push, header: "<stdatomic.h>".}
+      template maybeWrapStd(x: string): string =
+        x
+
+    type
+      MemoryOrder* {.importc: "memory_order".maybeWrapStd.} = enum
+        moRelaxed
+        moConsume
+        moAcquire
+        moRelease
+        moAcquireRelease
+        moSequentiallyConsistent
+
+    when defined(cpp):
+      type
+        # Atomic*[T] {.importcpp: "_Atomic('0)".} = object
+
+        AtomicInt8 {.importc: "std::atomic<NI8>".} = int8
+        AtomicInt16 {.importc: "std::atomic<NI16>".} = int16
+        AtomicInt32 {.importc: "std::atomic<NI32>".} = int32
+        AtomicInt64 {.importc: "std::atomic<NI64>".} = int64
+    else:
+      type
+        # Atomic*[T] {.importcpp: "_Atomic('0)".} = object
+
+        AtomicInt8 {.importc: "_Atomic NI8".} = int8
+        AtomicInt16 {.importc: "_Atomic NI16".} = int16
+        AtomicInt32 {.importc: "_Atomic NI32".} = int32
+        AtomicInt64 {.importc: "_Atomic NI64".} = int64
+
+    type
+      AtomicFlag* {.importc: "atomic_flag".maybeWrapStd, size: 1.} = object
+
+      Atomic*[T] = object
+        when T is Trivial:
+          # Maps the size of a trivial type to it's internal atomic type
+          when sizeof(T) == 1: value: AtomicInt8
+          elif sizeof(T) == 2: value: AtomicInt16
+          elif sizeof(T) == 4: value: AtomicInt32
+          elif sizeof(T) == 8: value: AtomicInt64
+        else:
+          nonAtomicValue: T
+          guard: AtomicFlag
+
+    #proc init*[T](location: var Atomic[T]; value: T): T {.importcpp: "atomic_init(@)".}
+    proc atomic_load_explicit[T, A](location: ptr A; order: MemoryOrder): T {.importc: "atomic_load_explicit".maybeWrapStd.}
+    proc atomic_store_explicit[T, A](location: ptr A; desired: T; order: MemoryOrder = moSequentiallyConsistent) {.importc: "atomic_store_explicit".maybeWrapStd.}
+    proc atomic_exchange_explicit[T, A](location: ptr A; desired: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc: "atomic_exchange_explicit".maybeWrapStd.}
+    proc atomic_compare_exchange_strong_explicit[T, A](location: ptr A; expected: ptr T; desired: T; success, failure: MemoryOrder): bool {.importc: "atomic_compare_exchange_strong_explicit".maybeWrapStd.}
+    proc atomic_compare_exchange_weak_explicit[T, A](location: ptr A; expected: ptr T; desired: T; success, failure: MemoryOrder): bool {.importc: "atomic_compare_exchange_weak_explicit".maybeWrapStd.}
+
+    # Numerical operations
+    proc atomic_fetch_add_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc: "atomic_fetch_add_explicit".maybeWrapStd.}
+    proc atomic_fetch_sub_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc: "atomic_fetch_sub_explicit".maybeWrapStd.}
+    proc atomic_fetch_and_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc: "atomic_fetch_and_explicit".maybeWrapStd.}
+    proc atomic_fetch_or_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc: "atomic_fetch_or_explicit".maybeWrapStd.}
+    proc atomic_fetch_xor_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc: "atomic_fetch_xor_explicit".maybeWrapStd.}
+
+    # Flag operations
+    # var ATOMIC_FLAG_INIT {.importc, nodecl.}: AtomicFlag
+    # proc init*(location: var AtomicFlag) {.inline.} = location = ATOMIC_FLAG_INIT
+    proc testAndSet*(location: var AtomicFlag; order: MemoryOrder = moSequentiallyConsistent): bool {.importc: "atomic_flag_test_and_set_explicit".maybeWrapStd.}
+    proc clear*(location: var AtomicFlag; order: MemoryOrder = moSequentiallyConsistent) {.importc: "atomic_flag_clear_explicit".maybeWrapStd.}
+
+    proc fence*(order: MemoryOrder) {.importc: "atomic_thread_fence".maybeWrapStd.}
+    proc signalFence*(order: MemoryOrder) {.importc: "atomic_signal_fence".maybeWrapStd.}
+
+    {.pop.}
+
+    proc load*[T: Trivial](location: var Atomic[T]; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} =
+      cast[T](atomic_load_explicit[nonAtomicType(T), typeof(location.value)](addr(location.value), order))
+    proc store*[T: Trivial](location: var Atomic[T]; desired: T; order: MemoryOrder = moSequentiallyConsistent) {.inline.} =
+      atomic_store_explicit(addr(location.value), cast[nonAtomicType(T)](desired), order)
+    proc exchange*[T: Trivial](location: var Atomic[T]; desired: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} =
+      cast[T](atomic_exchange_explicit(addr(location.value), cast[nonAtomicType(T)](desired), order))
+    proc compareExchange*[T: Trivial](location: var Atomic[T]; expected: var T; desired: T; success, failure: MemoryOrder): bool {.inline.} =
+      atomic_compare_exchange_strong_explicit(addr(location.value), cast[ptr nonAtomicType(T)](addr(expected)), cast[nonAtomicType(T)](desired), success, failure)
+    proc compareExchange*[T: Trivial](location: var Atomic[T]; expected: var T; desired: T; order: MemoryOrder = moSequentiallyConsistent): bool {.inline.} =
+      compareExchange(location, expected, desired, order, order)
+
+    proc compareExchangeWeak*[T: Trivial](location: var Atomic[T]; expected: var T; desired: T; success, failure: MemoryOrder): bool {.inline.} =
+      atomic_compare_exchange_weak_explicit(addr(location.value), cast[ptr nonAtomicType(T)](addr(expected)), cast[nonAtomicType(T)](desired), success, failure)
+    proc compareExchangeWeak*[T: Trivial](location: var Atomic[T]; expected: var T; desired: T; order: MemoryOrder = moSequentiallyConsistent): bool {.inline.} =
+      compareExchangeWeak(location, expected, desired, order, order)
+
+    # Numerical operations
+    proc fetchAdd*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} =
+      cast[T](atomic_fetch_add_explicit(addr(location.value), cast[nonAtomicType(T)](value), order))
+    proc fetchSub*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} =
+      cast[T](atomic_fetch_sub_explicit(addr(location.value), cast[nonAtomicType(T)](value), order))
+    proc fetchAnd*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} =
+      cast[T](atomic_fetch_and_explicit(addr(location.value), cast[nonAtomicType(T)](value), order))
+    proc fetchOr*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} =
+      cast[T](atomic_fetch_or_explicit(addr(location.value), cast[nonAtomicType(T)](value), order))
+    proc fetchXor*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} =
+      cast[T](atomic_fetch_xor_explicit(addr(location.value), cast[nonAtomicType(T)](value), order))
+
+  template withLock[T: not Trivial](location: var Atomic[T]; order: MemoryOrder; body: untyped): untyped =
+    while testAndSet(location.guard, moAcquire): discard
+    try:
+      body
+    finally:
+      clear(location.guard, moRelease)
+
+  proc load*[T: not Trivial](location: var Atomic[T]; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} =
+    withLock(location, order):
+      result = location.nonAtomicValue
+
+  proc store*[T: not Trivial](location: var Atomic[T]; desired: T; order: MemoryOrder = moSequentiallyConsistent) {.inline.} =
+    withLock(location, order):
+      location.nonAtomicValue = desired
+
+  proc exchange*[T: not Trivial](location: var Atomic[T]; desired: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} =
+    withLock(location, order):
+      result = location.nonAtomicValue
+      location.nonAtomicValue = desired
+
+  proc compareExchange*[T: not Trivial](location: var Atomic[T]; expected: var T; desired: T; success, failure: MemoryOrder): bool {.inline.} =
+    withLock(location, success):
+      if location.nonAtomicValue != expected:
+        expected = location.nonAtomicValue
+        return false
+      expected = desired
+      swap(location.nonAtomicValue, expected)
+      return true
+
+  proc compareExchangeWeak*[T: not Trivial](location: var Atomic[T]; expected: var T; desired: T; success, failure: MemoryOrder): bool {.inline.} =
+    compareExchange(location, expected, desired, success, failure)
+
+  proc compareExchange*[T: not Trivial](location: var Atomic[T]; expected: var T; desired: T; order: MemoryOrder = moSequentiallyConsistent): bool {.inline.} =
+    compareExchange(location, expected, desired, order, order)
+
+  proc compareExchangeWeak*[T: not Trivial](location: var Atomic[T]; expected: var T; desired: T; order: MemoryOrder = moSequentiallyConsistent): bool {.inline.} =
+    compareExchangeWeak(location, expected, desired, order, order)
+
+proc atomicInc*[T: SomeInteger](location: var Atomic[T]; value: T = 1) {.inline.} =
+  ## Atomically increments the atomic integer by some `value`.
+  discard location.fetchAdd(value)
+
+proc atomicDec*[T: SomeInteger](location: var Atomic[T]; value: T = 1) {.inline.} =
+  ## Atomically decrements the atomic integer by some `value`.
+  discard location.fetchSub(value)
+
+proc `+=`*[T: SomeInteger](location: var Atomic[T]; value: T) {.inline.} =
+  ## Atomically increments the atomic integer by some `value`.
+  discard location.fetchAdd(value)
+
+proc `-=`*[T: SomeInteger](location: var Atomic[T]; value: T) {.inline.} =
+  ## Atomically decrements the atomic integer by some `value`.
+  discard location.fetchSub(value)
diff --git a/lib/pure/concurrency/cpuinfo.nim b/lib/pure/concurrency/cpuinfo.nim
index 6f2bc4491..9bc3fd579 100644
--- a/lib/pure/concurrency/cpuinfo.nim
+++ b/lib/pure/concurrency/cpuinfo.nim
@@ -7,61 +7,104 @@
 #    distribution, for details about the copyright.
 #
 
-## This module implements procs to determine the number of CPUs / cores.
+## This module implements a proc to determine the number of CPUs / cores.
+
+runnableExamples:
+  doAssert countProcessors() > 0
+
 
 include "system/inclrtl"
 
-import strutils, os
+when defined(js):
+  import std/jsffi
+  proc countProcessorsImpl(): int =
+    when defined(nodejs):
+      let jsOs = require("os")
+      let jsObj = jsOs.cpus().length
+    else:
+      # `navigator.hardwareConcurrency`
+      # works on browser as well as deno.
+      let navigator{.importcpp.}: JsObject
+      let jsObj = navigator.hardwareConcurrency
+    result = jsObj.to int
+else:
+  when defined(posix) and not (defined(macosx) or defined(bsd)):
+    import std/posix
+
+  when defined(windows):
+    import std/private/win_getsysteminfo
+
+  when defined(freebsd) or defined(macosx):
+    {.emit: "#include <sys/types.h>".}
+
+  when defined(openbsd) or defined(netbsd):
+    {.emit: "#include <sys/param.h>".}
 
-when not defined(windows):
-  import posix
+  when defined(macosx) or defined(bsd):
+    # we HAVE to emit param.h before sysctl.h so we cannot use .header here
+    # either. The amount of archaic bullshit in Poonix based OSes is just insane.
+    {.emit: "#include <sys/sysctl.h>".}
+    {.push nodecl.}
+    when defined(macosx):
+      proc sysctlbyname(name: cstring,
+        oldp: pointer, oldlenp: var csize_t,
+        newp: pointer, newlen: csize_t): cint {.importc.}
+    let
+      CTL_HW{.importc.}: cint
+      HW_NCPU{.importc.}: cint
+    proc sysctl[I: static[int]](name: var array[I, cint], namelen: cuint,
+      oldp: pointer, oldlenp: var csize_t,
+      newp: pointer, newlen: csize_t): cint {.importc.}
+    {.pop.}
 
-when defined(linux):
-  import linux
-  
-when defined(freebsd) or defined(macosx):
-  {.emit:"#include <sys/types.h>".}
+  when defined(genode):
+    import genode/env
+
+    proc affinitySpaceTotal(env: GenodeEnvPtr): cuint {.
+      importcpp: "@->cpu().affinity_space().total()".}
+
+  when defined(haiku):
+    type
+      SystemInfo {.importc: "system_info", header: "<OS.h>".} = object
+        cpuCount {.importc: "cpu_count".}: uint32
+
+    proc getSystemInfo(info: ptr SystemInfo): int32 {.importc: "get_system_info",
+                                                      header: "<OS.h>".}
+
+  proc countProcessorsImpl(): int {.inline.} =
+    when defined(windows):
+      var
+        si: SystemInfo
+      getSystemInfo(addr si)
+      result = int(si.dwNumberOfProcessors)
+    elif defined(macosx) or defined(bsd):
+      let dest = addr result
+      var len = sizeof(result).csize_t
+      when defined(macosx):
+        # alias of "hw.activecpu"
+        if sysctlbyname("hw.logicalcpu", dest, len, nil, 0) == 0:
+          return
+      var mib = [CTL_HW, HW_NCPU]
+      if sysctl(mib, 2, dest, len, nil, 0) == 0:
+        return
+    elif defined(hpux):
+      result = mpctl(MPC_GETNUMSPUS, nil, nil)
+    elif defined(irix):
+      var SC_NPROC_ONLN {.importc: "_SC_NPROC_ONLN", header: "<unistd.h>".}: cint
+      result = sysconf(SC_NPROC_ONLN)
+    elif defined(genode):
+      result = runtimeEnv.affinitySpaceTotal().int
+    elif defined(haiku):
+      var sysinfo: SystemInfo
+      if getSystemInfo(addr sysinfo) == 0:
+        result = sysinfo.cpuCount.int
+    else:
+      result = sysconf(SC_NPROCESSORS_ONLN)
+    if result < 0: result = 0
 
-when defined(openbsd) or defined(netbsd):
-  {.emit:"#include <sys/param.h>".}
 
-when defined(macosx) or defined(bsd):
-  # we HAVE to emit param.h before sysctl.h so we cannot use .header here
-  # either. The amount of archaic bullshit in Poonix based OSes is just insane.
-  {.emit:"#include <sys/sysctl.h>".}
-  const
-    CTL_HW = 6
-    HW_AVAILCPU = 25
-    HW_NCPU = 3
-  proc sysctl(x: ptr array[0..3, cint], y: cint, z: pointer,
-              a: var csize, b: pointer, c: int): cint {.
-              importc: "sysctl", nodecl.}
 
 proc countProcessors*(): int {.rtl, extern: "ncpi$1".} =
-  ## returns the numer of the processors/cores the machine has.
+  ## Returns the number of the processors/cores the machine has.
   ## Returns 0 if it cannot be detected.
-  when defined(windows):
-    var x = getEnv("NUMBER_OF_PROCESSORS")
-    if x.len > 0: result = parseInt(x.string)
-  elif defined(macosx) or defined(bsd):
-    var
-      mib: array[0..3, cint]
-      numCPU: int
-      len: csize
-    mib[0] = CTL_HW
-    mib[1] = HW_AVAILCPU
-    len = sizeof(numCPU)
-    discard sysctl(addr(mib), 2, addr(numCPU), len, nil, 0)
-    if numCPU < 1:
-      mib[1] = HW_NCPU
-      discard sysctl(addr(mib), 2, addr(numCPU), len, nil, 0)
-    result = numCPU
-  elif defined(hpux):
-    result = mpctl(MPC_GETNUMSPUS, nil, nil)
-  elif defined(irix):
-    var SC_NPROC_ONLN {.importc: "_SC_NPROC_ONLN", header: "<unistd.h>".}: cint
-    result = sysconf(SC_NPROC_ONLN)
-  else:
-    result = sysconf(SC_NPROCESSORS_ONLN)
-  if result <= 0: result = 1
-
+  countProcessorsImpl()
diff --git a/lib/pure/concurrency/cpuload.nim b/lib/pure/concurrency/cpuload.nim
index c1796089a..bfbf16721 100644
--- a/lib/pure/concurrency/cpuload.nim
+++ b/lib/pure/concurrency/cpuload.nim
@@ -9,13 +9,18 @@
 
 ## This module implements a helper for a thread pool to determine whether
 ## creating a thread is a good idea.
+##
+## Unstable API.
 
 when defined(windows):
-  import winlean, os, strutils, math
+  import std/[winlean, os, strutils, math]
 
-  proc `-`(a, b: TFILETIME): int64 = a.rdFileTime - b.rdFileTime
+  proc `-`(a, b: FILETIME): int64 = a.rdFileTime - b.rdFileTime
 elif defined(linux):
-  from cpuinfo import countProcessors
+  from std/cpuinfo import countProcessors
+
+when defined(nimPreviewSlimSystem):
+  import std/syncio
 
 type
   ThreadPoolAdvice* = enum
@@ -25,16 +30,16 @@ type
 
   ThreadPoolState* = object
     when defined(windows):
-      prevSysKernel, prevSysUser, prevProcKernel, prevProcUser: TFILETIME
+      prevSysKernel, prevSysUser, prevProcKernel, prevProcUser: FILETIME
     calls*: int
 
 proc advice*(s: var ThreadPoolState): ThreadPoolAdvice =
   when defined(windows):
     var
       sysIdle, sysKernel, sysUser,
-        procCreation, procExit, procKernel, procUser: TFILETIME
+        procCreation, procExit, procKernel, procUser: FILETIME
     if getSystemTimes(sysIdle, sysKernel, sysUser) == 0 or
-        getProcessTimes(THandle(-1), procCreation, procExit, 
+        getProcessTimes(Handle(-1), procCreation, procExit,
                         procKernel, procUser) == 0:
       return doNothing
     if s.calls > 0:
@@ -45,32 +50,35 @@ proc advice*(s: var ThreadPoolState): ThreadPoolAdvice =
         procKernelDiff = procKernel - s.prevProcKernel
         procUserDiff = procUser - s.prevProcUser
 
-        sysTotal = int(sysKernelDiff + sysUserDiff)
-        procTotal = int(procKernelDiff + procUserDiff)
+        sysTotal = sysKernelDiff + sysUserDiff
+        procTotal = procKernelDiff + procUserDiff
       # total CPU usage < 85% --> create a new worker thread.
       # Measurements show that 100% and often even 90% is not reached even
       # if all my cores are busy.
-      if sysTotal == 0 or procTotal / sysTotal < 0.85:
+      if sysTotal == 0 or procTotal.float / sysTotal.float < 0.85:
         result = doCreateThread
     s.prevSysKernel = sysKernel
     s.prevSysUser = sysUser
     s.prevProcKernel = procKernel
     s.prevProcUser = procUser
   elif defined(linux):
-    proc fscanf(c: File, frmt: cstring) {.varargs, importc, 
+    proc fscanf(c: File, frmt: cstring) {.varargs, importc,
       header: "<stdio.h>".}
 
-    var f = open("/proc/loadavg")
-    var b: float
-    var busy, total: int
-    fscanf(f,"%lf %lf %lf %ld/%ld",
-           addr b, addr b, addr b, addr busy, addr total)
-    f.close()
-    let cpus = countProcessors()
-    if busy-1 < cpus:
-      result = doCreateThread
-    elif busy-1 >= cpus*2:
-      result = doShutdownThread
+    var f: File
+    if f.open("/proc/loadavg"):
+      var b: float
+      var busy, total: int
+      fscanf(f,"%lf %lf %lf %ld/%ld",
+            addr b, addr b, addr b, addr busy, addr total)
+      f.close()
+      let cpus = countProcessors()
+      if busy-1 < cpus:
+        result = doCreateThread
+      elif busy-1 >= cpus*2:
+        result = doShutdownThread
+      else:
+        result = doNothing
     else:
       result = doNothing
   else:
@@ -78,10 +86,12 @@ proc advice*(s: var ThreadPoolState): ThreadPoolAdvice =
     result = doNothing
   inc s.calls
 
-when isMainModule:
+when not defined(testing) and isMainModule and not defined(nimdoc):
+  import std/random
+
   proc busyLoop() =
     while true:
-      discard random(80)
+      discard rand(80)
       os.sleep(100)
 
   spawn busyLoop()
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim
index 9f1e53fb8..06ed2fe54 100644
--- a/lib/pure/concurrency/threadpool.nim
+++ b/lib/pure/concurrency/threadpool.nim
@@ -7,30 +7,43 @@
 #    distribution, for details about the copyright.
 #
 
-## Implements Nim's 'spawn'.
+{.deprecated: "use the nimble packages `malebolgia`, `taskpools` or `weave` instead".}
+
+## Implements Nim's `parallel & spawn statements <manual_experimental.html#parallel-amp-spawn>`_.
+##
+## Unstable API.
+##
+## See also
+## ========
+## * `threads module <typedthreads.html>`_ for basic thread support
+## * `locks module <locks.html>`_ for locks and condition variables
+## * `asyncdispatch module <asyncdispatch.html>`_ for asynchronous IO
 
 when not compileOption("threads"):
   {.error: "Threadpool requires --threads:on option.".}
 
-import cpuinfo, cpuload, locks
+import std/[cpuinfo, cpuload, locks, os]
+
+when defined(nimPreviewSlimSystem):
+  import std/[assertions, typedthreads, sysatomics]
 
 {.push stackTrace:off.}
 
 type
   Semaphore = object
-    c: TCond
-    L: TLock
+    c: Cond
+    L: Lock
     counter: int
 
-proc createSemaphore(): Semaphore =
-  initCond(result.c)
-  initLock(result.L)
+proc initSemaphore(cv: var Semaphore) =
+  initCond(cv.c)
+  initLock(cv.L)
 
 proc destroySemaphore(cv: var Semaphore) {.inline.} =
   deinitCond(cv.c)
   deinitLock(cv.L)
 
-proc await(cv: var Semaphore) =
+proc blockUntil(cv: var Semaphore) =
   acquire(cv.L)
   while cv.counter <= 0:
     wait(cv.c, cv.L)
@@ -43,45 +56,42 @@ proc signal(cv: var Semaphore) =
   release(cv.L)
   signal(cv.c)
 
-const CacheLineSize = 32 # true for most archs
+const CacheLineSize = 64 # true for most archs
 
 type
-  Barrier {.compilerProc.} = object
+  Barrier {.compilerproc.} = object
     entered: int
     cv: Semaphore # Semaphore takes 3 words at least
-    when sizeof(int) < 8:
-      cacheAlign: array[CacheLineSize-4*sizeof(int), byte]
-    left: int
-    cacheAlign2: array[CacheLineSize-sizeof(int), byte]
-    interest: bool ## wether the master is interested in the "all done" event
+    left {.align(CacheLineSize).}: int
+    interest {.align(CacheLineSize).} : bool # whether the master is interested in the "all done" event
 
-proc barrierEnter(b: ptr Barrier) {.compilerProc, inline.} =
+proc barrierEnter(b: ptr Barrier) {.compilerproc, inline.} =
   # due to the signaling between threads, it is ensured we are the only
   # one with access to 'entered' so we don't need 'atomicInc' here:
   inc b.entered
   # also we need no 'fence' instructions here as soon 'nimArgsPassingDone'
   # will be called which already will perform a fence for us.
 
-proc barrierLeave(b: ptr Barrier) {.compilerProc, inline.} =
+proc barrierLeave(b: ptr Barrier) {.compilerproc, inline.} =
   atomicInc b.left
   when not defined(x86): fence()
   # We may not have seen the final value of b.entered yet,
   # so we need to check for >= instead of ==.
   if b.interest and b.left >= b.entered: signal(b.cv)
 
-proc openBarrier(b: ptr Barrier) {.compilerProc, inline.} =
+proc openBarrier(b: ptr Barrier) {.compilerproc, inline.} =
   b.entered = 0
   b.left = 0
   b.interest = false
 
-proc closeBarrier(b: ptr Barrier) {.compilerProc.} =
+proc closeBarrier(b: ptr Barrier) {.compilerproc.} =
   fence()
   if b.left != b.entered:
-    b.cv = createSemaphore()
+    b.cv.initSemaphore()
     fence()
     b.interest = true
     fence()
-    while b.left != b.entered: await(b.cv)
+    while b.left != b.entered: blockUntil(b.cv)
     destroySemaphore(b.cv)
 
 {.pop.}
@@ -89,31 +99,28 @@ proc closeBarrier(b: ptr Barrier) {.compilerProc.} =
 # ----------------------------------------------------------------------------
 
 type
-  foreign* = object ## a region that indicates the pointer comes from a
-                    ## foreign thread heap.
   AwaitInfo = object
     cv: Semaphore
     idx: int
 
-  FlowVarBase* = ref FlowVarBaseObj ## untyped base class for 'FlowVar[T]'
-  FlowVarBaseObj = object of RootObj
+  FlowVarBase* = ref FlowVarBaseObj ## Untyped base class for `FlowVar[T] <#FlowVar>`_.
+  FlowVarBaseObj {.acyclic.} = object of RootObj
     ready, usesSemaphore, awaited: bool
-    cv: Semaphore #\
-    # for 'awaitAny' support
+    cv: Semaphore  # for 'blockUntilAny' support
     ai: ptr AwaitInfo
     idx: int
     data: pointer  # we incRef and unref it to keep it alive; note this MUST NOT
                    # be RootRef here otherwise the wrong GC keeps track of it!
     owner: pointer # ptr Worker
 
-  FlowVarObj[T] = object of FlowVarBaseObj
+  FlowVarObj[T] {.acyclic.} = object of FlowVarBaseObj
     blob: T
 
-  FlowVar*{.compilerProc.}[T] = ref FlowVarObj[T] ## a data flow variable
+  FlowVar*[T] {.compilerproc.} = ref FlowVarObj[T] ## A data flow variable.
 
   ToFreeQueue = object
     len: int
-    lock: TLock
+    lock: Lock
     empty: Semaphore
     data: array[128, pointer]
 
@@ -128,13 +135,17 @@ type
     initialized: bool # whether it has even been initialized
     shutdown: bool # the pool requests to shut down this worker thread
     q: ToFreeQueue
+    readyForTask: Semaphore
+
+const threadpoolWaitMs {.intdefine.}: int = 100
 
-proc await*(fv: FlowVarBase) =
-  ## waits until the value for the flowVar arrives. Usually it is not necessary
-  ## to call this explicitly.
+proc blockUntil*(fv: var FlowVarBaseObj) =
+  ## Waits until the value for `fv` arrives.
+  ##
+  ## Usually it is not necessary to call this explicitly.
   if fv.usesSemaphore and not fv.awaited:
     fv.awaited = true
-    await(fv.cv)
+    blockUntil(fv.cv)
     destroySemaphore(fv.cv)
 
 proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool =
@@ -142,13 +153,13 @@ proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool =
     w.data = data
     w.f = fn
     signal(w.taskArrived)
-    await(w.taskStarted)
+    blockUntil(w.taskStarted)
     result = true
 
 proc cleanFlowVars(w: ptr Worker) =
   let q = addr(w.q)
   acquire(q.lock)
-  for i in 0 .. <q.len:
+  for i in 0 ..< q.len:
     GC_unref(cast[RootRef](q.data[i]))
     #echo "GC_unref"
   q.len = 0
@@ -167,12 +178,21 @@ proc wakeupWorkerToProcessQueue(w: ptr Worker) =
     signal(w.q.empty)
   signal(w.taskArrived)
 
-proc finished(fv: FlowVarBase) =
-  doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'"
+proc attach(fv: FlowVarBase; i: int): bool =
+  acquire(fv.cv.L)
+  if fv.cv.counter <= 0:
+    fv.idx = i
+    result = true
+  else:
+    result = false
+  release(fv.cv.L)
+
+proc finished(fv: var FlowVarBaseObj) =
+  doAssert fv.ai.isNil, "flowVar is still attached to an 'blockUntilAny'"
   # we have to protect against the rare cases where the owner of the flowVar
   # simply disregards the flowVar and yet the "flowVar" has not yet written
   # anything to it:
-  await(fv)
+  blockUntil(fv)
   if fv.data.isNil: return
   let owner = cast[ptr Worker](fv.owner)
   let q = addr(owner.q)
@@ -181,23 +201,27 @@ proc finished(fv: FlowVarBase) =
     #echo "EXHAUSTED!"
     release(q.lock)
     wakeupWorkerToProcessQueue(owner)
-    await(q.empty)
+    blockUntil(q.empty)
     acquire(q.lock)
   q.data[q.len] = cast[pointer](fv.data)
   inc q.len
   release(q.lock)
   fv.data = nil
+  # the worker thread waits for "data" to be set to nil before shutting down
+  owner.data = nil
 
-proc fvFinalizer[T](fv: FlowVar[T]) = finished(fv)
+proc `=destroy`[T](fv: var FlowVarObj[T]) =
+  finished(fv)
+  `=destroy`(fv.blob)
 
-proc nimCreateFlowVar[T](): FlowVar[T] {.compilerProc.} =
-  new(result, fvFinalizer)
+proc nimCreateFlowVar[T](): FlowVar[T] {.compilerproc.} =
+  new(result)
 
-proc nimFlowVarCreateSemaphore(fv: FlowVarBase) {.compilerProc.} =
-  fv.cv = createSemaphore()
+proc nimFlowVarCreateSemaphore(fv: FlowVarBase) {.compilerproc.} =
+  fv.cv.initSemaphore()
   fv.usesSemaphore = true
 
-proc nimFlowVarSignal(fv: FlowVarBase) {.compilerProc.} =
+proc nimFlowVarSignal(fv: FlowVarBase) {.compilerproc.} =
   if fv.ai != nil:
     acquire(fv.ai.cv.L)
     fv.ai.idx = fv.idx
@@ -208,169 +232,295 @@ proc nimFlowVarSignal(fv: FlowVarBase) {.compilerProc.} =
     signal(fv.cv)
 
 proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) =
-  ## blocks until the ``fv`` is available and then passes its value
-  ## to ``action``. Note that due to Nim's parameter passing semantics this
-  ## means that ``T`` doesn't need to be copied and so ``awaitAndThen`` can
-  ## sometimes be more efficient than ``^``.
-  await(fv)
-  when T is string or T is seq:
+  ## Blocks until `fv` is available and then passes its value
+  ## to `action`.
+  ##
+  ## Note that due to Nim's parameter passing semantics, this
+  ## means that `T` doesn't need to be copied, so `awaitAndThen` can
+  ## sometimes be more efficient than the `^ proc <#^,FlowVar[T]>`_.
+  blockUntil(fv[])
+  when defined(nimV2):
+    action(fv.blob)
+  elif T is string or T is seq:
     action(cast[T](fv.data))
   elif T is ref:
     {.error: "'awaitAndThen' not available for FlowVar[ref]".}
   else:
     action(fv.blob)
-  finished(fv)
+  finished(fv[])
 
-proc `^`*[T](fv: FlowVar[ref T]): foreign ptr T =
-  ## blocks until the value is available and then returns this value.
-  await(fv)
-  result = cast[foreign ptr T](fv.data)
+proc unsafeRead*[T](fv: FlowVar[ref T]): ptr T =
+  ## Blocks until the value is available and then returns this value.
+  blockUntil(fv[])
+  when defined(nimV2):
+    result = cast[ptr T](fv.blob)
+  else:
+    result = cast[ptr T](fv.data)
+  finished(fv[])
 
 proc `^`*[T](fv: FlowVar[T]): T =
-  ## blocks until the value is available and then returns this value.
-  await(fv)
-  when T is string or T is seq:
-    # XXX closures? deepCopy?
-    result = cast[T](fv.data)
+  ## Blocks until the value is available and then returns this value.
+  blockUntil(fv[])
+  when not defined(nimV2) and (T is string or T is seq or T is ref):
+    deepCopy result, cast[T](fv.data)
   else:
     result = fv.blob
-
-proc awaitAny*(flowVars: openArray[FlowVarBase]): int =
-  ## awaits any of the given flowVars. Returns the index of one flowVar for
-  ## which a value arrived. A flowVar only supports one call to 'awaitAny' at
-  ## the same time. That means if you await([a,b]) and await([b,c]) the second
-  ## call will only await 'c'. If there is no flowVar left to be able to wait
-  ## on, -1 is returned.
-  ## **Note**: This results in non-deterministic behaviour and so should be
-  ## avoided.
+  finished(fv[])
+
+proc blockUntilAny*(flowVars: openArray[FlowVarBase]): int =
+  ## Awaits any of the given `flowVars`. Returns the index of one `flowVar`
+  ## for which a value arrived.
+  ##
+  ## A `flowVar` only supports one call to `blockUntilAny` at the same time.
+  ## That means if you `blockUntilAny([a,b])` and `blockUntilAny([b,c])`
+  ## the second call will only block until `c`. If there is no `flowVar` left
+  ## to be able to wait on, -1 is returned.
+  ##
+  ## **Note:** This results in non-deterministic behaviour and should be avoided.
   var ai: AwaitInfo
-  ai.cv = createSemaphore()
+  ai.cv.initSemaphore()
   var conflicts = 0
+  result = -1
   for i in 0 .. flowVars.high:
     if cas(addr flowVars[i].ai, nil, addr ai):
-      flowVars[i].idx = i
+      if not attach(flowVars[i], i):
+        result = i
+        break
     else:
       inc conflicts
   if conflicts < flowVars.len:
-    await(ai.cv)
-    result = ai.idx
+    if result < 0:
+      blockUntil(ai.cv)
+      result = ai.idx
     for i in 0 .. flowVars.high:
       discard cas(addr flowVars[i].ai, addr ai, nil)
-  else:
-    result = -1
   destroySemaphore(ai.cv)
 
-proc nimArgsPassingDone(p: pointer) {.compilerProc.} =
+proc isReady*(fv: FlowVarBase): bool =
+  ## Determines whether the specified `FlowVarBase`'s value is available.
+  ##
+  ## If `true`, awaiting `fv` will not block.
+  if fv.usesSemaphore and not fv.awaited:
+    acquire(fv.cv.L)
+    result = fv.cv.counter > 0
+    release(fv.cv.L)
+  else:
+    result = true
+
+proc nimArgsPassingDone(p: pointer) {.compilerproc.} =
   let w = cast[ptr Worker](p)
   signal(w.taskStarted)
 
 const
-  MaxThreadPoolSize* = 256 ## maximal size of the thread pool. 256 threads
-                           ## should be good enough for anybody ;-)
+  MaxThreadPoolSize* {.intdefine.} = 256 ## Maximum size of the thread pool. 256 threads
+                                         ## should be good enough for anybody ;-)
+  MaxDistinguishedThread* {.intdefine.} = 32 ## Maximum number of "distinguished" threads.
+
+type
+  ThreadId* = range[0..MaxDistinguishedThread-1] ## A thread identifier.
 
 var
   currentPoolSize: int
   maxPoolSize = MaxThreadPoolSize
   minPoolSize = 4
-  gSomeReady = createSemaphore()
+  gSomeReady: Semaphore
   readyWorker: ptr Worker
 
+# A workaround for recursion deadlock issue
+# https://github.com/nim-lang/Nim/issues/4597
+var
+  numSlavesLock: Lock
+  numSlavesRunning {.guard: numSlavesLock.}: int
+  numSlavesWaiting {.guard: numSlavesLock.}: int
+  isSlave {.threadvar.}: bool
+
+numSlavesLock.initLock
+
+gSomeReady.initSemaphore()
+
 proc slave(w: ptr Worker) {.thread.} =
+  isSlave = true
   while true:
+    if w.shutdown:
+      w.shutdown = false
+      atomicDec currentPoolSize
+      while true:
+        if w.data != nil:
+          sleep(threadpoolWaitMs)
+        else:
+          # The flowvar finalizer ("finished()") set w.data to nil, so we can
+          # safely terminate the thread.
+          #
+          # TODO: look for scenarios in which the flowvar is never finalized, so
+          # a shut down thread gets stuck in this loop until the main thread exits.
+          break
+      break
     when declared(atomicStoreN):
       atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST)
     else:
       w.ready = true
     readyWorker = w
     signal(gSomeReady)
-    await(w.taskArrived)
+    blockUntil(w.taskArrived)
+    # XXX Somebody needs to look into this (why does this assertion fail
+    # in Visual Studio?)
+    when not defined(vcc) and not defined(tcc): assert(not w.ready)
+
+    withLock numSlavesLock:
+      inc numSlavesRunning
+
+    w.f(w, w.data)
+
+    withLock numSlavesLock:
+      dec numSlavesRunning
+
+    if w.q.len != 0: w.cleanFlowVars
+
+proc distinguishedSlave(w: ptr Worker) {.thread.} =
+  while true:
+    when declared(atomicStoreN):
+      atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST)
+    else:
+      w.ready = true
+    signal(w.readyForTask)
+    blockUntil(w.taskArrived)
     assert(not w.ready)
     w.f(w, w.data)
     if w.q.len != 0: w.cleanFlowVars
-    if w.shutdown:
-      w.shutdown = false
-      atomicDec currentPoolSize
 
 var
-  workers: array[MaxThreadPoolSize, TThread[ptr Worker]]
+  workers: array[MaxThreadPoolSize, Thread[ptr Worker]]
   workersData: array[MaxThreadPoolSize, Worker]
 
+  distinguished: array[MaxDistinguishedThread, Thread[ptr Worker]]
+  distinguishedData: array[MaxDistinguishedThread, Worker]
+
+when defined(nimPinToCpu):
+  var gCpus: Natural
+
 proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) =
-  ## sets the minimal thread pool size. The default value of this is 4.
+  ## Sets the minimum thread pool size. The default value of this is 4.
   minPoolSize = size
 
 proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) =
-  ## sets the minimal thread pool size. The default value of this
-  ## is ``MaxThreadPoolSize``.
+  ## Sets the maximum thread pool size. The default value of this
+  ## is `MaxThreadPoolSize <#MaxThreadPoolSize>`_.
   maxPoolSize = size
   if currentPoolSize > maxPoolSize:
     for i in maxPoolSize..currentPoolSize-1:
       let w = addr(workersData[i])
       w.shutdown = true
 
-proc activateThread(i: int) {.noinline.} =
-  workersData[i].taskArrived = createSemaphore()
-  workersData[i].taskStarted = createSemaphore()
+when defined(nimRecursiveSpawn):
+  var localThreadId {.threadvar.}: int
+
+proc activateWorkerThread(i: int) {.noinline.} =
+  workersData[i].taskArrived.initSemaphore()
+  workersData[i].taskStarted.initSemaphore()
   workersData[i].initialized = true
-  workersData[i].q.empty = createSemaphore()
+  workersData[i].q.empty.initSemaphore()
   initLock(workersData[i].q.lock)
   createThread(workers[i], slave, addr(workersData[i]))
+  when defined(nimRecursiveSpawn):
+    localThreadId = i+1
+  when defined(nimPinToCpu):
+    if gCpus > 0: pinToCpu(workers[i], i mod gCpus)
+
+proc activateDistinguishedThread(i: int) {.noinline.} =
+  distinguishedData[i].taskArrived.initSemaphore()
+  distinguishedData[i].taskStarted.initSemaphore()
+  distinguishedData[i].initialized = true
+  distinguishedData[i].q.empty.initSemaphore()
+  initLock(distinguishedData[i].q.lock)
+  distinguishedData[i].readyForTask.initSemaphore()
+  createThread(distinguished[i], distinguishedSlave, addr(distinguishedData[i]))
 
 proc setup() =
-  currentPoolSize = min(countProcessors(), MaxThreadPoolSize)
+  let p = countProcessors()
+  when defined(nimPinToCpu):
+    gCpus = p
+  currentPoolSize = min(p, MaxThreadPoolSize)
   readyWorker = addr(workersData[0])
-  for i in 0.. <currentPoolSize: activateThread(i)
+  for i in 0..<currentPoolSize: activateWorkerThread(i)
 
 proc preferSpawn*(): bool =
-  ## Use this proc to determine quickly if a 'spawn' or a direct call is
-  ## preferable. If it returns 'true' a 'spawn' may make sense. In general
-  ## it is not necessary to call this directly; use 'spawnX' instead.
+  ## Use this proc to determine quickly if a `spawn` or a direct call is
+  ## preferable.
+  ##
+  ## If it returns `true`, a `spawn` may make sense. In general
+  ## it is not necessary to call this directly; use the `spawnX template
+  ## <#spawnX.t>`_ instead.
   result = gSomeReady.counter > 0
 
-proc spawn*(call: expr): expr {.magic: "Spawn".}
-  ## always spawns a new task, so that the 'call' is never executed on
-  ## the calling thread. 'call' has to be proc call 'p(...)' where 'p'
-  ## is gcsafe and has a return type that is either 'void' or compatible
-  ## with ``FlowVar[T]``.
-
-template spawnX*(call: expr): expr =
-  ## spawns a new task if a CPU core is ready, otherwise executes the
-  ## call in the calling thread. Usually it is advised to
-  ## use 'spawn' in order to not block the producer for an unknown
-  ## amount of time. 'call' has to be proc call 'p(...)' where 'p'
-  ## is gcsafe and has a return type that is either 'void' or compatible
-  ## with ``FlowVar[T]``.
+proc spawn*(call: sink typed) {.magic: "Spawn".} =
+  ## Always spawns a new task, so that the `call` is never executed on
+  ## the calling thread.
+  ##
+  ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a
+  ## return type that is either `void` or compatible with `FlowVar[T]`.
+  discard "It uses `nimSpawn3` internally"
+
+proc pinnedSpawn*(id: ThreadId; call: sink typed) {.magic: "Spawn".} =
+  ## Always spawns a new task on the worker thread with `id`, so that
+  ## the `call` is **always** executed on the thread.
+  ##
+  ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a
+  ## return type that is either `void` or compatible with `FlowVar[T]`.
+  discard "It uses `nimSpawn4` internally"
+
+template spawnX*(call) =
+  ## Spawns a new task if a CPU core is ready, otherwise executes the
+  ## call in the calling thread.
+  ##
+  ## Usually, it is advised to use the `spawn proc <#spawn,sinktyped>`_
+  ## in order to not block the producer for an unknown amount of time.
+  ##
+  ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a
+  ## return type that is either 'void' or compatible with `FlowVar[T]`.
   (if preferSpawn(): spawn call else: call)
 
-proc parallel*(body: stmt) {.magic: "Parallel".}
-  ## a parallel section can be used to execute a block in parallel. ``body``
-  ## has to be in a DSL that is a particular subset of the language. Please
-  ## refer to the manual for further information.
+proc parallel*(body: untyped) {.magic: "Parallel".}
+  ## A parallel section can be used to execute a block in parallel.
+  ##
+  ## `body` has to be in a DSL that is a particular subset of the language.
+  ##
+  ## Please refer to `the manual <manual_experimental.html#parallel-amp-spawn>`_
+  ## for further information.
 
 var
   state: ThreadPoolState
-  stateLock: TLock
+  stateLock: Lock
 
 initLock stateLock
 
-proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} =
+proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerproc.} =
   # implementation of 'spawn' that is used by the code generator.
   while true:
     if selectWorker(readyWorker, fn, data): return
-    for i in 0.. <currentPoolSize:
+    for i in 0..<currentPoolSize:
       if selectWorker(addr(workersData[i]), fn, data): return
+
     # determine what to do, but keep in mind this is expensive too:
     # state.calls < maxPoolSize: warmup phase
     # (state.calls and 127) == 0: periodic check
     if state.calls < maxPoolSize or (state.calls and 127) == 0:
       # ensure the call to 'advice' is atomic:
       if tryAcquire(stateLock):
+        if currentPoolSize < minPoolSize:
+          if not workersData[currentPoolSize].initialized:
+            activateWorkerThread(currentPoolSize)
+          let w = addr(workersData[currentPoolSize])
+          atomicInc currentPoolSize
+          if selectWorker(w, fn, data):
+            release(stateLock)
+            return
+
         case advice(state)
         of doNothing: discard
         of doCreateThread:
           if currentPoolSize < maxPoolSize:
             if not workersData[currentPoolSize].initialized:
-              activateThread(currentPoolSize)
+              activateWorkerThread(currentPoolSize)
             let w = addr(workersData[currentPoolSize])
             atomicInc currentPoolSize
             if selectWorker(w, fn, data):
@@ -385,17 +535,72 @@ proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} =
         release(stateLock)
       # else the acquire failed, but this means some
       # other thread succeeded, so we don't need to do anything here.
-    await(gSomeReady)
+    when defined(nimRecursiveSpawn):
+      if localThreadId > 0:
+        # we are a worker thread, so instead of waiting for something which
+        # might as well never happen (see tparallel_quicksort), we run the task
+        # on the current thread instead.
+        var self = addr(workersData[localThreadId-1])
+        fn(self, data)
+        blockUntil(self.taskStarted)
+        return
+
+    if isSlave:
+      # Run under lock until `numSlavesWaiting` increment to avoid a
+      # race (otherwise two last threads might start waiting together)
+      withLock numSlavesLock:
+        if numSlavesRunning <= numSlavesWaiting + 1:
+          # All the other slaves are waiting
+          # If we wait now, we-re deadlocked until
+          # an external spawn happens !
+          if currentPoolSize < maxPoolSize:
+            if not workersData[currentPoolSize].initialized:
+              activateWorkerThread(currentPoolSize)
+            let w = addr(workersData[currentPoolSize])
+            atomicInc currentPoolSize
+            if selectWorker(w, fn, data):
+              return
+          else:
+            # There is no place in the pool. We're deadlocked.
+            # echo "Deadlock!"
+            discard
+
+        inc numSlavesWaiting
+
+    blockUntil(gSomeReady)
+
+    if isSlave:
+      withLock numSlavesLock:
+        dec numSlavesWaiting
+
+var
+  distinguishedLock: Lock
+
+initLock distinguishedLock
+
+proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerproc.} =
+  acquire(distinguishedLock)
+  if not distinguishedData[id].initialized:
+    activateDistinguishedThread(id)
+  release(distinguishedLock)
+  while true:
+    if selectWorker(addr(distinguishedData[id]), fn, data): break
+    blockUntil(distinguishedData[id].readyForTask)
+
 
 proc sync*() =
-  ## a simple barrier to wait for all spawn'ed tasks. If you need more elaborate
-  ## waiting, you have to use an explicit barrier.
+  ## A simple barrier to wait for all `spawn`ed tasks.
+  ##
+  ## If you need more elaborate waiting, you have to use an explicit barrier.
   while true:
     var allReady = true
-    for i in 0 .. <currentPoolSize:
+    for i in 0 ..< currentPoolSize:
       if not allReady: break
       allReady = allReady and workersData[i].ready
     if allReady: break
-    await(gSomeReady)
+    sleep(threadpoolWaitMs)
+    # We cannot "blockUntil(gSomeReady)" because workers may be shut down between
+    # the time we establish that some are not "ready" and the time we wait for a
+    # "signal(gSomeReady)" from inside "slave()" that can never come.
 
 setup()