summary refs log tree commit diff stats
path: root/lib/pure/concurrency
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/concurrency')
-rw-r--r--lib/pure/concurrency/atomics.nim148
-rw-r--r--lib/pure/concurrency/cpuinfo.nim163
-rw-r--r--lib/pure/concurrency/cpuload.nim11
-rw-r--r--lib/pure/concurrency/threadpool.nim187
4 files changed, 288 insertions, 221 deletions
diff --git a/lib/pure/concurrency/atomics.nim b/lib/pure/concurrency/atomics.nim
index 29491f68c..818f1b37a 100644
--- a/lib/pure/concurrency/atomics.nim
+++ b/lib/pure/concurrency/atomics.nim
@@ -10,10 +10,50 @@
 ## Types and operations for atomic operations and lockless algorithms.
 ##
 ## Unstable API.
-
-import macros
-
-when defined(cpp) or defined(nimdoc):
+## 
+## By default, C++ uses C11 atomic primitives. To use C++ `std::atomic`,
+## `-d:nimUseCppAtomics` can be defined.
+
+runnableExamples:
+  # Atomic
+  var loc: Atomic[int]
+  loc.store(4)
+  assert loc.load == 4
+  loc.store(2)
+  assert loc.load(moRelaxed) == 2
+  loc.store(9)
+  assert loc.load(moAcquire) == 9
+  loc.store(0, moRelease)
+  assert loc.load == 0
+
+  assert loc.exchange(7) == 0
+  assert loc.load == 7
+
+  var expected = 7
+  assert loc.compareExchange(expected, 5, moRelaxed, moRelaxed)
+  assert expected == 7
+  assert loc.load == 5
+
+  assert not loc.compareExchange(expected, 12, moRelaxed, moRelaxed)
+  assert expected == 5
+  assert loc.load == 5
+
+  assert loc.fetchAdd(1) == 5
+  assert loc.fetchAdd(2) == 6
+  assert loc.fetchSub(3) == 8
+
+  loc.atomicInc(1)
+  assert loc.load == 6
+
+  # AtomicFlag
+  var flag: AtomicFlag
+
+  assert not flag.testAndSet
+  assert flag.testAndSet
+  flag.clear(moRelaxed)
+  assert not flag.testAndSet
+
+when (defined(cpp) and defined(nimUseCppAtomics)) or defined(nimdoc):
   # For the C++ backend, types and operations map directly to C++11 atomics.
 
   {.push, header: "<atomic>".}
@@ -51,10 +91,11 @@ when defined(cpp) or defined(nimdoc):
         ## with other moSequentiallyConsistent operations.
 
   type
-    Atomic* {.importcpp: "std::atomic".} [T] = object
+    Atomic*[T] {.importcpp: "std::atomic", completeStruct.} = object
       ## An atomic object with underlying type `T`.
+      raw: T
 
-    AtomicFlag* {.importcpp: "std::atomic_flag".} = object
+    AtomicFlag* {.importcpp: "std::atomic_flag", size: 1.} = object
       ## An atomic boolean state.
 
   # Access operations
@@ -172,8 +213,8 @@ else:
 
     # MSVC intrinsics
     proc interlockedExchange(location: pointer; desired: int8): int8 {.importc: "_InterlockedExchange8".}
-    proc interlockedExchange(location: pointer; desired: int16): int16 {.importc: "_InterlockedExchange".}
-    proc interlockedExchange(location: pointer; desired: int32): int32 {.importc: "_InterlockedExchange16".}
+    proc interlockedExchange(location: pointer; desired: int16): int16 {.importc: "_InterlockedExchange16".}
+    proc interlockedExchange(location: pointer; desired: int32): int32 {.importc: "_InterlockedExchange".}
     proc interlockedExchange(location: pointer; desired: int64): int64 {.importc: "_InterlockedExchange64".}
 
     proc interlockedCompareExchange(location: pointer; desired, expected: int8): int8 {.importc: "_InterlockedCompareExchange8".}
@@ -235,10 +276,17 @@ else:
       cast[T](interlockedXor(addr(location.value), cast[nonAtomicType(T)](value)))
 
   else:
-    {.push, header: "<stdatomic.h>".}
+    when defined(cpp):
+      {.push, header: "<atomic>".}
+      template maybeWrapStd(x: string): string =
+        "std::" & x
+    else:
+      {.push, header: "<stdatomic.h>".}
+      template maybeWrapStd(x: string): string =
+        x
 
     type
-      MemoryOrder* {.importc: "memory_order".} = enum
+      MemoryOrder* {.importc: "memory_order".maybeWrapStd.} = enum
         moRelaxed
         moConsume
         moAcquire
@@ -246,58 +294,64 @@ else:
         moAcquireRelease
         moSequentiallyConsistent
 
-    type
-      # Atomic* {.importcpp: "_Atomic('0)".} [T] = object
+    when defined(cpp):
+      type
+        # Atomic*[T] {.importcpp: "_Atomic('0)".} = object
 
-      AtomicInt8 {.importc: "_Atomic NI8".} = object
-      AtomicInt16 {.importc: "_Atomic NI16".} = object
-      AtomicInt32 {.importc: "_Atomic NI32".} = object
-      AtomicInt64 {.importc: "_Atomic NI64".} = object
+        AtomicInt8 {.importc: "std::atomic<NI8>".} = int8
+        AtomicInt16 {.importc: "std::atomic<NI16>".} = int16
+        AtomicInt32 {.importc: "std::atomic<NI32>".} = int32
+        AtomicInt64 {.importc: "std::atomic<NI64>".} = int64
+    else:
+      type
+        # Atomic*[T] {.importcpp: "_Atomic('0)".} = object
 
-    template atomicType*(T: typedesc[Trivial]): untyped =
-      # Maps the size of a trivial type to it's internal atomic type
-      when sizeof(T) == 1: AtomicInt8
-      elif sizeof(T) == 2: AtomicInt16
-      elif sizeof(T) == 4: AtomicInt32
-      elif sizeof(T) == 8: AtomicInt64
+        AtomicInt8 {.importc: "_Atomic NI8".} = int8
+        AtomicInt16 {.importc: "_Atomic NI16".} = int16
+        AtomicInt32 {.importc: "_Atomic NI32".} = int32
+        AtomicInt64 {.importc: "_Atomic NI64".} = int64
 
     type
-      AtomicFlag* {.importc: "atomic_flag".} = object
+      AtomicFlag* {.importc: "atomic_flag".maybeWrapStd, size: 1.} = object
 
       Atomic*[T] = object
         when T is Trivial:
-          value: T.atomicType
+          # Maps the size of a trivial type to it's internal atomic type
+          when sizeof(T) == 1: value: AtomicInt8
+          elif sizeof(T) == 2: value: AtomicInt16
+          elif sizeof(T) == 4: value: AtomicInt32
+          elif sizeof(T) == 8: value: AtomicInt64
         else:
           nonAtomicValue: T
           guard: AtomicFlag
 
     #proc init*[T](location: var Atomic[T]; value: T): T {.importcpp: "atomic_init(@)".}
-    proc atomic_load_explicit[T, A](location: ptr A; order: MemoryOrder): T {.importc.}
-    proc atomic_store_explicit[T, A](location: ptr A; desired: T; order: MemoryOrder = moSequentiallyConsistent) {.importc.}
-    proc atomic_exchange_explicit[T, A](location: ptr A; desired: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc.}
-    proc atomic_compare_exchange_strong_explicit[T, A](location: ptr A; expected: ptr T; desired: T; success, failure: MemoryOrder): bool {.importc.}
-    proc atomic_compare_exchange_weak_explicit[T, A](location: ptr A; expected: ptr T; desired: T; success, failure: MemoryOrder): bool {.importc.}
+    proc atomic_load_explicit[T, A](location: ptr A; order: MemoryOrder): T {.importc: "atomic_load_explicit".maybeWrapStd.}
+    proc atomic_store_explicit[T, A](location: ptr A; desired: T; order: MemoryOrder = moSequentiallyConsistent) {.importc: "atomic_store_explicit".maybeWrapStd.}
+    proc atomic_exchange_explicit[T, A](location: ptr A; desired: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc: "atomic_exchange_explicit".maybeWrapStd.}
+    proc atomic_compare_exchange_strong_explicit[T, A](location: ptr A; expected: ptr T; desired: T; success, failure: MemoryOrder): bool {.importc: "atomic_compare_exchange_strong_explicit".maybeWrapStd.}
+    proc atomic_compare_exchange_weak_explicit[T, A](location: ptr A; expected: ptr T; desired: T; success, failure: MemoryOrder): bool {.importc: "atomic_compare_exchange_weak_explicit".maybeWrapStd.}
 
     # Numerical operations
-    proc atomic_fetch_add_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc.}
-    proc atomic_fetch_sub_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc.}
-    proc atomic_fetch_and_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc.}
-    proc atomic_fetch_or_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc.}
-    proc atomic_fetch_xor_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc.}
+    proc atomic_fetch_add_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc: "atomic_fetch_add_explicit".maybeWrapStd.}
+    proc atomic_fetch_sub_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc: "atomic_fetch_sub_explicit".maybeWrapStd.}
+    proc atomic_fetch_and_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc: "atomic_fetch_and_explicit".maybeWrapStd.}
+    proc atomic_fetch_or_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc: "atomic_fetch_or_explicit".maybeWrapStd.}
+    proc atomic_fetch_xor_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc: "atomic_fetch_xor_explicit".maybeWrapStd.}
 
     # Flag operations
     # var ATOMIC_FLAG_INIT {.importc, nodecl.}: AtomicFlag
     # proc init*(location: var AtomicFlag) {.inline.} = location = ATOMIC_FLAG_INIT
-    proc testAndSet*(location: var AtomicFlag; order: MemoryOrder = moSequentiallyConsistent): bool {.importc: "atomic_flag_test_and_set_explicit".}
-    proc clear*(location: var AtomicFlag; order: MemoryOrder = moSequentiallyConsistent) {.importc: "atomic_flag_clear_explicit".}
+    proc testAndSet*(location: var AtomicFlag; order: MemoryOrder = moSequentiallyConsistent): bool {.importc: "atomic_flag_test_and_set_explicit".maybeWrapStd.}
+    proc clear*(location: var AtomicFlag; order: MemoryOrder = moSequentiallyConsistent) {.importc: "atomic_flag_clear_explicit".maybeWrapStd.}
 
-    proc fence*(order: MemoryOrder) {.importc: "atomic_thread_fence".}
-    proc signalFence*(order: MemoryOrder) {.importc: "atomic_signal_fence".}
+    proc fence*(order: MemoryOrder) {.importc: "atomic_thread_fence".maybeWrapStd.}
+    proc signalFence*(order: MemoryOrder) {.importc: "atomic_signal_fence".maybeWrapStd.}
 
     {.pop.}
 
     proc load*[T: Trivial](location: var Atomic[T]; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} =
-      cast[T](atomic_load_explicit[nonAtomicType(T), type(location.value)](addr(location.value), order))
+      cast[T](atomic_load_explicit[nonAtomicType(T), typeof(location.value)](addr(location.value), order))
     proc store*[T: Trivial](location: var Atomic[T]; desired: T; order: MemoryOrder = moSequentiallyConsistent) {.inline.} =
       atomic_store_explicit(addr(location.value), cast[nonAtomicType(T)](desired), order)
     proc exchange*[T: Trivial](location: var Atomic[T]; desired: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} =
@@ -325,9 +379,11 @@ else:
       cast[T](atomic_fetch_xor_explicit(addr(location.value), cast[nonAtomicType(T)](value), order))
 
   template withLock[T: not Trivial](location: var Atomic[T]; order: MemoryOrder; body: untyped): untyped =
-    while location.guard.testAndSet(moAcquire): discard
-    body
-    location.guard.clear(moRelease)
+    while testAndSet(location.guard, moAcquire): discard
+    try:
+      body
+    finally:
+      clear(location.guard, moRelease)
 
   proc load*[T: not Trivial](location: var Atomic[T]; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} =
     withLock(location, order):
@@ -345,16 +401,14 @@ else:
   proc compareExchange*[T: not Trivial](location: var Atomic[T]; expected: var T; desired: T; success, failure: MemoryOrder): bool {.inline.} =
     withLock(location, success):
       if location.nonAtomicValue != expected:
+        expected = location.nonAtomicValue
         return false
+      expected = desired
       swap(location.nonAtomicValue, expected)
       return true
 
   proc compareExchangeWeak*[T: not Trivial](location: var Atomic[T]; expected: var T; desired: T; success, failure: MemoryOrder): bool {.inline.} =
-    withLock(location, success):
-      if location.nonAtomicValue != expected:
-        return false
-      swap(location.nonAtomicValue, expected)
-      return true
+    compareExchange(location, expected, desired, success, failure)
 
   proc compareExchange*[T: not Trivial](location: var Atomic[T]; expected: var T; desired: T; order: MemoryOrder = moSequentiallyConsistent): bool {.inline.} =
     compareExchange(location, expected, desired, order, order)
diff --git a/lib/pure/concurrency/cpuinfo.nim b/lib/pure/concurrency/cpuinfo.nim
index 515d7e2da..9bc3fd579 100644
--- a/lib/pure/concurrency/cpuinfo.nim
+++ b/lib/pure/concurrency/cpuinfo.nim
@@ -7,91 +7,104 @@
 #    distribution, for details about the copyright.
 #
 
-## This module implements procs to determine the number of CPUs / cores.
+## This module implements a proc to determine the number of CPUs / cores.
+
+runnableExamples:
+  doAssert countProcessors() > 0
+
 
 include "system/inclrtl"
 
-when not defined(windows):
-  import posix
+when defined(js):
+  import std/jsffi
+  proc countProcessorsImpl(): int =
+    when defined(nodejs):
+      let jsOs = require("os")
+      let jsObj = jsOs.cpus().length
+    else:
+      # `navigator.hardwareConcurrency`
+      # works on browser as well as deno.
+      let navigator{.importcpp.}: JsObject
+      let jsObj = navigator.hardwareConcurrency
+    result = jsObj.to int
+else:
+  when defined(posix) and not (defined(macosx) or defined(bsd)):
+    import std/posix
+
+  when defined(windows):
+    import std/private/win_getsysteminfo
+
+  when defined(freebsd) or defined(macosx):
+    {.emit: "#include <sys/types.h>".}
 
-when defined(freebsd) or defined(macosx):
-  {.emit:"#include <sys/types.h>".}
+  when defined(openbsd) or defined(netbsd):
+    {.emit: "#include <sys/param.h>".}
 
-when defined(openbsd) or defined(netbsd):
-  {.emit:"#include <sys/param.h>".}
+  when defined(macosx) or defined(bsd):
+    # we HAVE to emit param.h before sysctl.h so we cannot use .header here
+    # either. The amount of archaic bullshit in Poonix based OSes is just insane.
+    {.emit: "#include <sys/sysctl.h>".}
+    {.push nodecl.}
+    when defined(macosx):
+      proc sysctlbyname(name: cstring,
+        oldp: pointer, oldlenp: var csize_t,
+        newp: pointer, newlen: csize_t): cint {.importc.}
+    let
+      CTL_HW{.importc.}: cint
+      HW_NCPU{.importc.}: cint
+    proc sysctl[I: static[int]](name: var array[I, cint], namelen: cuint,
+      oldp: pointer, oldlenp: var csize_t,
+      newp: pointer, newlen: csize_t): cint {.importc.}
+    {.pop.}
 
-when defined(macosx) or defined(bsd):
-  # we HAVE to emit param.h before sysctl.h so we cannot use .header here
-  # either. The amount of archaic bullshit in Poonix based OSes is just insane.
-  {.emit:"#include <sys/sysctl.h>".}
-  const
-    CTL_HW = 6
-    HW_AVAILCPU = 25
-    HW_NCPU = 3
-  proc sysctl(x: ptr array[0..3, cint], y: cint, z: pointer,
-              a: var csize_t, b: pointer, c: csize_t): cint {.
-              importc: "sysctl", nodecl.}
+  when defined(genode):
+    import genode/env
 
-when defined(genode):
-  include genode/env
+    proc affinitySpaceTotal(env: GenodeEnvPtr): cuint {.
+      importcpp: "@->cpu().affinity_space().total()".}
 
-  proc affinitySpaceTotal(env: GenodeEnvPtr): cuint {.
-    importcpp: "@->cpu().affinity_space().total()".}
+  when defined(haiku):
+    type
+      SystemInfo {.importc: "system_info", header: "<OS.h>".} = object
+        cpuCount {.importc: "cpu_count".}: uint32
+
+    proc getSystemInfo(info: ptr SystemInfo): int32 {.importc: "get_system_info",
+                                                      header: "<OS.h>".}
+
+  proc countProcessorsImpl(): int {.inline.} =
+    when defined(windows):
+      var
+        si: SystemInfo
+      getSystemInfo(addr si)
+      result = int(si.dwNumberOfProcessors)
+    elif defined(macosx) or defined(bsd):
+      let dest = addr result
+      var len = sizeof(result).csize_t
+      when defined(macosx):
+        # alias of "hw.activecpu"
+        if sysctlbyname("hw.logicalcpu", dest, len, nil, 0) == 0:
+          return
+      var mib = [CTL_HW, HW_NCPU]
+      if sysctl(mib, 2, dest, len, nil, 0) == 0:
+        return
+    elif defined(hpux):
+      result = mpctl(MPC_GETNUMSPUS, nil, nil)
+    elif defined(irix):
+      var SC_NPROC_ONLN {.importc: "_SC_NPROC_ONLN", header: "<unistd.h>".}: cint
+      result = sysconf(SC_NPROC_ONLN)
+    elif defined(genode):
+      result = runtimeEnv.affinitySpaceTotal().int
+    elif defined(haiku):
+      var sysinfo: SystemInfo
+      if getSystemInfo(addr sysinfo) == 0:
+        result = sysinfo.cpuCount.int
+    else:
+      result = sysconf(SC_NPROCESSORS_ONLN)
+    if result < 0: result = 0
 
-when defined(haiku):
-  type
-    SystemInfo {.importc: "system_info", header: "<OS.h>".} = object
-      cpuCount {.importc: "cpu_count".}: uint32
 
-  proc getSystemInfo(info: ptr SystemInfo): int32 {.importc: "get_system_info",
-                                                    header: "<OS.h>".}
 
 proc countProcessors*(): int {.rtl, extern: "ncpi$1".} =
-  ## returns the number of the processors/cores the machine has.
+  ## Returns the number of the processors/cores the machine has.
   ## Returns 0 if it cannot be detected.
-  when defined(windows):
-    type
-      SYSTEM_INFO {.final, pure.} = object
-        u1: int32
-        dwPageSize: int32
-        lpMinimumApplicationAddress: pointer
-        lpMaximumApplicationAddress: pointer
-        dwActiveProcessorMask: ptr int32
-        dwNumberOfProcessors: int32
-        dwProcessorType: int32
-        dwAllocationGranularity: int32
-        wProcessorLevel: int16
-        wProcessorRevision: int16
-
-    proc GetSystemInfo(lpSystemInfo: var SYSTEM_INFO) {.stdcall, dynlib: "kernel32", importc: "GetSystemInfo".}
-
-    var
-      si: SYSTEM_INFO
-    GetSystemInfo(si)
-    result = si.dwNumberOfProcessors
-  elif defined(macosx) or defined(bsd):
-    var
-      mib: array[0..3, cint]
-      numCPU: int
-    mib[0] = CTL_HW
-    mib[1] = HW_AVAILCPU
-    var len = sizeof(numCPU).csize_t
-    discard sysctl(addr(mib), 2, addr(numCPU), len, nil, 0)
-    if numCPU < 1:
-      mib[1] = HW_NCPU
-      discard sysctl(addr(mib), 2, addr(numCPU), len, nil, 0)
-    result = numCPU
-  elif defined(hpux):
-    result = mpctl(MPC_GETNUMSPUS, nil, nil)
-  elif defined(irix):
-    var SC_NPROC_ONLN {.importc: "_SC_NPROC_ONLN", header: "<unistd.h>".}: cint
-    result = sysconf(SC_NPROC_ONLN)
-  elif defined(genode):
-    result = runtimeEnv.affinitySpaceTotal().int
-  elif defined(haiku):
-    var sysinfo: SystemInfo
-    if getSystemInfo(addr sysinfo) == 0:
-      result = sysinfo.cpuCount.int
-  else:
-    result = sysconf(SC_NPROCESSORS_ONLN)
-  if result <= 0: result = 0
+  countProcessorsImpl()
diff --git a/lib/pure/concurrency/cpuload.nim b/lib/pure/concurrency/cpuload.nim
index 3ee7336f0..bfbf16721 100644
--- a/lib/pure/concurrency/cpuload.nim
+++ b/lib/pure/concurrency/cpuload.nim
@@ -13,11 +13,14 @@
 ## Unstable API.
 
 when defined(windows):
-  import winlean, os, strutils, math
+  import std/[winlean, os, strutils, math]
 
   proc `-`(a, b: FILETIME): int64 = a.rdFileTime - b.rdFileTime
 elif defined(linux):
-  from cpuinfo import countProcessors
+  from std/cpuinfo import countProcessors
+
+when defined(nimPreviewSlimSystem):
+  import std/syncio
 
 type
   ThreadPoolAdvice* = enum
@@ -84,11 +87,11 @@ proc advice*(s: var ThreadPoolState): ThreadPoolAdvice =
   inc s.calls
 
 when not defined(testing) and isMainModule and not defined(nimdoc):
-  import random
+  import std/random
 
   proc busyLoop() =
     while true:
-      discard random(80)
+      discard rand(80)
       os.sleep(100)
 
   spawn busyLoop()
diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim
index 1f7df7c00..06ed2fe54 100644
--- a/lib/pure/concurrency/threadpool.nim
+++ b/lib/pure/concurrency/threadpool.nim
@@ -7,20 +7,25 @@
 #    distribution, for details about the copyright.
 #
 
-## Implements Nim's `spawn <manual_experimental.html#parallel-amp-spawn>`_.
-##
-## **See also:**
-## * `threads module <threads.html>`_
-## * `channels module <channels.html>`_
-## * `locks module <locks.html>`_
-## * `asyncdispatch module <asyncdispatch.html>`_
+{.deprecated: "use the nimble packages `malebolgia`, `taskpools` or `weave` instead".}
+
+## Implements Nim's `parallel & spawn statements <manual_experimental.html#parallel-amp-spawn>`_.
 ##
 ## Unstable API.
+##
+## See also
+## ========
+## * `threads module <typedthreads.html>`_ for basic thread support
+## * `locks module <locks.html>`_ for locks and condition variables
+## * `asyncdispatch module <asyncdispatch.html>`_ for asynchronous IO
 
 when not compileOption("threads"):
   {.error: "Threadpool requires --threads:on option.".}
 
-import cpuinfo, cpuload, locks, os
+import std/[cpuinfo, cpuload, locks, os]
+
+when defined(nimPreviewSlimSystem):
+  import std/[assertions, typedthreads, sysatomics]
 
 {.push stackTrace:off.}
 
@@ -51,38 +56,35 @@ proc signal(cv: var Semaphore) =
   release(cv.L)
   signal(cv.c)
 
-const CacheLineSize = 32 # true for most archs
+const CacheLineSize = 64 # true for most archs
 
 type
-  Barrier {.compilerProc.} = object
+  Barrier {.compilerproc.} = object
     entered: int
     cv: Semaphore # Semaphore takes 3 words at least
-    when sizeof(int) < 8:
-      cacheAlign: array[CacheLineSize-4*sizeof(int), byte]
-    left: int
-    cacheAlign2: array[CacheLineSize-sizeof(int), byte]
-    interest: bool # whether the master is interested in the "all done" event
+    left {.align(CacheLineSize).}: int
+    interest {.align(CacheLineSize).} : bool # whether the master is interested in the "all done" event
 
-proc barrierEnter(b: ptr Barrier) {.compilerProc, inline.} =
+proc barrierEnter(b: ptr Barrier) {.compilerproc, inline.} =
   # due to the signaling between threads, it is ensured we are the only
   # one with access to 'entered' so we don't need 'atomicInc' here:
   inc b.entered
   # also we need no 'fence' instructions here as soon 'nimArgsPassingDone'
   # will be called which already will perform a fence for us.
 
-proc barrierLeave(b: ptr Barrier) {.compilerProc, inline.} =
+proc barrierLeave(b: ptr Barrier) {.compilerproc, inline.} =
   atomicInc b.left
   when not defined(x86): fence()
   # We may not have seen the final value of b.entered yet,
   # so we need to check for >= instead of ==.
   if b.interest and b.left >= b.entered: signal(b.cv)
 
-proc openBarrier(b: ptr Barrier) {.compilerProc, inline.} =
+proc openBarrier(b: ptr Barrier) {.compilerproc, inline.} =
   b.entered = 0
   b.left = 0
   b.interest = false
 
-proc closeBarrier(b: ptr Barrier) {.compilerProc.} =
+proc closeBarrier(b: ptr Barrier) {.compilerproc.} =
   fence()
   if b.left != b.entered:
     b.cv.initSemaphore()
@@ -101,8 +103,8 @@ type
     cv: Semaphore
     idx: int
 
-  FlowVarBase* = ref FlowVarBaseObj ## Untyped base class for ``FlowVar[T]``.
-  FlowVarBaseObj = object of RootObj
+  FlowVarBase* = ref FlowVarBaseObj ## Untyped base class for `FlowVar[T] <#FlowVar>`_.
+  FlowVarBaseObj {.acyclic.} = object of RootObj
     ready, usesSemaphore, awaited: bool
     cv: Semaphore  # for 'blockUntilAny' support
     ai: ptr AwaitInfo
@@ -111,10 +113,10 @@ type
                    # be RootRef here otherwise the wrong GC keeps track of it!
     owner: pointer # ptr Worker
 
-  FlowVarObj[T] = object of FlowVarBaseObj
+  FlowVarObj[T] {.acyclic.} = object of FlowVarBaseObj
     blob: T
 
-  FlowVar*{.compilerProc.}[T] = ref FlowVarObj[T] ## A data flow variable.
+  FlowVar*[T] {.compilerproc.} = ref FlowVarObj[T] ## A data flow variable.
 
   ToFreeQueue = object
     len: int
@@ -137,8 +139,8 @@ type
 
 const threadpoolWaitMs {.intdefine.}: int = 100
 
-proc blockUntil*(fv: FlowVarBase) =
-  ## Waits until the value for the ``fv`` arrives.
+proc blockUntil*(fv: var FlowVarBaseObj) =
+  ## Waits until the value for `fv` arrives.
   ##
   ## Usually it is not necessary to call this explicitly.
   if fv.usesSemaphore and not fv.awaited:
@@ -185,7 +187,7 @@ proc attach(fv: FlowVarBase; i: int): bool =
     result = false
   release(fv.cv.L)
 
-proc finished(fv: FlowVarBase) =
+proc finished(fv: var FlowVarBaseObj) =
   doAssert fv.ai.isNil, "flowVar is still attached to an 'blockUntilAny'"
   # we have to protect against the rare cases where the owner of the flowVar
   # simply disregards the flowVar and yet the "flowVar" has not yet written
@@ -208,16 +210,18 @@ proc finished(fv: FlowVarBase) =
   # the worker thread waits for "data" to be set to nil before shutting down
   owner.data = nil
 
-proc fvFinalizer[T](fv: FlowVar[T]) = finished(fv)
+proc `=destroy`[T](fv: var FlowVarObj[T]) =
+  finished(fv)
+  `=destroy`(fv.blob)
 
-proc nimCreateFlowVar[T](): FlowVar[T] {.compilerProc.} =
-  new(result, fvFinalizer)
+proc nimCreateFlowVar[T](): FlowVar[T] {.compilerproc.} =
+  new(result)
 
-proc nimFlowVarCreateSemaphore(fv: FlowVarBase) {.compilerProc.} =
+proc nimFlowVarCreateSemaphore(fv: FlowVarBase) {.compilerproc.} =
   fv.cv.initSemaphore()
   fv.usesSemaphore = true
 
-proc nimFlowVarSignal(fv: FlowVarBase) {.compilerProc.} =
+proc nimFlowVarSignal(fv: FlowVarBase) {.compilerproc.} =
   if fv.ai != nil:
     acquire(fv.ai.cv.L)
     fv.ai.idx = fv.idx
@@ -228,60 +232,51 @@ proc nimFlowVarSignal(fv: FlowVarBase) {.compilerProc.} =
     signal(fv.cv)
 
 proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) =
-  ## Blocks until the ``fv`` is available and then passes its value
-  ## to ``action``.
+  ## Blocks until `fv` is available and then passes its value
+  ## to `action`.
   ##
-  ## Note that due to Nim's parameter passing semantics this
-  ## means that ``T`` doesn't need to be copied so ``awaitAndThen`` can
-  ## sometimes be more efficient than `^ proc <#^,FlowVar[T]>`_.
-  blockUntil(fv)
-  when T is string or T is seq:
+  ## Note that due to Nim's parameter passing semantics, this
+  ## means that `T` doesn't need to be copied, so `awaitAndThen` can
+  ## sometimes be more efficient than the `^ proc <#^,FlowVar[T]>`_.
+  blockUntil(fv[])
+  when defined(nimV2):
+    action(fv.blob)
+  elif T is string or T is seq:
     action(cast[T](fv.data))
   elif T is ref:
     {.error: "'awaitAndThen' not available for FlowVar[ref]".}
   else:
     action(fv.blob)
-  finished(fv)
+  finished(fv[])
 
 proc unsafeRead*[T](fv: FlowVar[ref T]): ptr T =
   ## Blocks until the value is available and then returns this value.
-  blockUntil(fv)
-  result = cast[ptr T](fv.data)
-  finished(fv)
-
-proc `^`*[T](fv: FlowVar[ref T]): ref T =
-  ## Blocks until the value is available and then returns this value.
-  blockUntil(fv)
-  let src = cast[ref T](fv.data)
+  blockUntil(fv[])
   when defined(nimV2):
-    result = src
+    result = cast[ptr T](fv.blob)
   else:
-    deepCopy result, src
-  finished(fv)
+    result = cast[ptr T](fv.data)
+  finished(fv[])
 
 proc `^`*[T](fv: FlowVar[T]): T =
   ## Blocks until the value is available and then returns this value.
-  blockUntil(fv)
-  when T is string or T is seq:
-    let src = cast[T](fv.data)
-    when defined(nimV2):
-      result = src
-    else:
-      deepCopy result, src
+  blockUntil(fv[])
+  when not defined(nimV2) and (T is string or T is seq or T is ref):
+    deepCopy result, cast[T](fv.data)
   else:
     result = fv.blob
-  finished(fv)
+  finished(fv[])
 
 proc blockUntilAny*(flowVars: openArray[FlowVarBase]): int =
-  ## Awaits any of the given ``flowVars``. Returns the index of one ``flowVar``
+  ## Awaits any of the given `flowVars`. Returns the index of one `flowVar`
   ## for which a value arrived.
   ##
-  ## A ``flowVar`` only supports one call to ``blockUntilAny`` at the same time.
-  ## That means if you ``blockUntilAny([a,b])`` and ``blockUntilAny([b,c])``
-  ## the second call will only block until ``c``. If there is no ``flowVar`` left
+  ## A `flowVar` only supports one call to `blockUntilAny` at the same time.
+  ## That means if you `blockUntilAny([a,b])` and `blockUntilAny([b,c])`
+  ## the second call will only block until `c`. If there is no `flowVar` left
   ## to be able to wait on, -1 is returned.
   ##
-  ## **Note**: This results in non-deterministic behaviour and should be avoided.
+  ## **Note:** This results in non-deterministic behaviour and should be avoided.
   var ai: AwaitInfo
   ai.cv.initSemaphore()
   var conflicts = 0
@@ -302,9 +297,9 @@ proc blockUntilAny*(flowVars: openArray[FlowVarBase]): int =
   destroySemaphore(ai.cv)
 
 proc isReady*(fv: FlowVarBase): bool =
-  ## Determines whether the specified ``FlowVarBase``'s value is available.
+  ## Determines whether the specified `FlowVarBase`'s value is available.
   ##
-  ## If ``true``, awaiting ``fv`` will not block.
+  ## If `true`, awaiting `fv` will not block.
   if fv.usesSemaphore and not fv.awaited:
     acquire(fv.cv.L)
     result = fv.cv.counter > 0
@@ -312,31 +307,31 @@ proc isReady*(fv: FlowVarBase): bool =
   else:
     result = true
 
-proc nimArgsPassingDone(p: pointer) {.compilerProc.} =
+proc nimArgsPassingDone(p: pointer) {.compilerproc.} =
   let w = cast[ptr Worker](p)
   signal(w.taskStarted)
 
 const
-  MaxThreadPoolSize* = 256 ## Maximum size of the thread pool. 256 threads
-                           ## should be good enough for anybody ;-)
-  MaxDistinguishedThread* = 32 ## Maximum number of "distinguished" threads.
+  MaxThreadPoolSize* {.intdefine.} = 256 ## Maximum size of the thread pool. 256 threads
+                                         ## should be good enough for anybody ;-)
+  MaxDistinguishedThread* {.intdefine.} = 32 ## Maximum number of "distinguished" threads.
 
 type
-  ThreadId* = range[0..MaxDistinguishedThread-1]
+  ThreadId* = range[0..MaxDistinguishedThread-1] ## A thread identifier.
 
 var
   currentPoolSize: int
   maxPoolSize = MaxThreadPoolSize
   minPoolSize = 4
-  gSomeReady : Semaphore
+  gSomeReady: Semaphore
   readyWorker: ptr Worker
 
 # A workaround for recursion deadlock issue
 # https://github.com/nim-lang/Nim/issues/4597
 var
   numSlavesLock: Lock
-  numSlavesRunning {.guard: numSlavesLock}: int
-  numSlavesWaiting {.guard: numSlavesLock}: int
+  numSlavesRunning {.guard: numSlavesLock.}: int
+  numSlavesWaiting {.guard: numSlavesLock.}: int
   isSlave {.threadvar.}: bool
 
 numSlavesLock.initLock
@@ -409,7 +404,7 @@ proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) =
 
 proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) =
   ## Sets the maximum thread pool size. The default value of this
-  ## is ``MaxThreadPoolSize`` (256).
+  ## is `MaxThreadPoolSize <#MaxThreadPoolSize>`_.
   maxPoolSize = size
   if currentPoolSize > maxPoolSize:
     for i in maxPoolSize..currentPoolSize-1:
@@ -449,43 +444,45 @@ proc setup() =
   for i in 0..<currentPoolSize: activateWorkerThread(i)
 
 proc preferSpawn*(): bool =
-  ## Use this proc to determine quickly if a ``spawn`` or a direct call is
+  ## Use this proc to determine quickly if a `spawn` or a direct call is
   ## preferable.
   ##
-  ## If it returns ``true``, a ``spawn`` may make sense. In general
-  ## it is not necessary to call this directly; use `spawnX template
+  ## If it returns `true`, a `spawn` may make sense. In general
+  ## it is not necessary to call this directly; use the `spawnX template
   ## <#spawnX.t>`_ instead.
   result = gSomeReady.counter > 0
 
-proc spawn*(call: typed): void {.magic: "Spawn".}
-  ## Always spawns a new task, so that the ``call`` is never executed on
+proc spawn*(call: sink typed) {.magic: "Spawn".} =
+  ## Always spawns a new task, so that the `call` is never executed on
   ## the calling thread.
   ##
-  ## ``call`` has to be proc call ``p(...)`` where ``p`` is gcsafe and has a
-  ## return type that is either ``void`` or compatible with ``FlowVar[T]``.
+  ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a
+  ## return type that is either `void` or compatible with `FlowVar[T]`.
+  discard "It uses `nimSpawn3` internally"
 
-proc pinnedSpawn*(id: ThreadId; call: typed): void {.magic: "Spawn".}
-  ## Always spawns a new task on the worker thread with ``id``, so that
-  ## the ``call`` is **always** executed on the thread.
+proc pinnedSpawn*(id: ThreadId; call: sink typed) {.magic: "Spawn".} =
+  ## Always spawns a new task on the worker thread with `id`, so that
+  ## the `call` is **always** executed on the thread.
   ##
-  ## ``call`` has to be proc call ``p(...)`` where ``p`` is gcsafe and has a
-  ## return type that is either ``void`` or compatible with ``FlowVar[T]``.
+  ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a
+  ## return type that is either `void` or compatible with `FlowVar[T]`.
+  discard "It uses `nimSpawn4` internally"
 
-template spawnX*(call): void =
+template spawnX*(call) =
   ## Spawns a new task if a CPU core is ready, otherwise executes the
   ## call in the calling thread.
   ##
-  ## Usually it is advised to use `spawn proc <#spawn,typed>`_ in order to
-  ## not block the producer for an unknown amount of time.
+  ## Usually, it is advised to use the `spawn proc <#spawn,sinktyped>`_
+  ## in order to not block the producer for an unknown amount of time.
   ##
-  ## ``call`` has to be proc call ``p(...)`` where ``p`` is gcsafe and has a
-  ## return type that is either 'void' or compatible with ``FlowVar[T]``.
+  ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a
+  ## return type that is either 'void' or compatible with `FlowVar[T]`.
   (if preferSpawn(): spawn call else: call)
 
 proc parallel*(body: untyped) {.magic: "Parallel".}
   ## A parallel section can be used to execute a block in parallel.
   ##
-  ## ``body`` has to be in a DSL that is a particular subset of the language.
+  ## `body` has to be in a DSL that is a particular subset of the language.
   ##
   ## Please refer to `the manual <manual_experimental.html#parallel-amp-spawn>`_
   ## for further information.
@@ -496,7 +493,7 @@ var
 
 initLock stateLock
 
-proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerProc.} =
+proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerproc.} =
   # implementation of 'spawn' that is used by the code generator.
   while true:
     if selectWorker(readyWorker, fn, data): return
@@ -581,7 +578,7 @@ var
 
 initLock distinguishedLock
 
-proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerProc.} =
+proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerproc.} =
   acquire(distinguishedLock)
   if not distinguishedData[id].initialized:
     activateDistinguishedThread(id)
@@ -592,7 +589,7 @@ proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerProc.} =
 
 
 proc sync*() =
-  ## A simple barrier to wait for all ``spawn``'ed tasks.
+  ## A simple barrier to wait for all `spawn`ed tasks.
   ##
   ## If you need more elaborate waiting, you have to use an explicit barrier.
   while true: