diff options
Diffstat (limited to 'lib/pure/concurrency')
-rw-r--r-- | lib/pure/concurrency/atomics.nim | 148 | ||||
-rw-r--r-- | lib/pure/concurrency/cpuinfo.nim | 163 | ||||
-rw-r--r-- | lib/pure/concurrency/cpuload.nim | 11 | ||||
-rw-r--r-- | lib/pure/concurrency/threadpool.nim | 187 |
4 files changed, 288 insertions, 221 deletions
diff --git a/lib/pure/concurrency/atomics.nim b/lib/pure/concurrency/atomics.nim index 29491f68c..818f1b37a 100644 --- a/lib/pure/concurrency/atomics.nim +++ b/lib/pure/concurrency/atomics.nim @@ -10,10 +10,50 @@ ## Types and operations for atomic operations and lockless algorithms. ## ## Unstable API. - -import macros - -when defined(cpp) or defined(nimdoc): +## +## By default, C++ uses C11 atomic primitives. To use C++ `std::atomic`, +## `-d:nimUseCppAtomics` can be defined. + +runnableExamples: + # Atomic + var loc: Atomic[int] + loc.store(4) + assert loc.load == 4 + loc.store(2) + assert loc.load(moRelaxed) == 2 + loc.store(9) + assert loc.load(moAcquire) == 9 + loc.store(0, moRelease) + assert loc.load == 0 + + assert loc.exchange(7) == 0 + assert loc.load == 7 + + var expected = 7 + assert loc.compareExchange(expected, 5, moRelaxed, moRelaxed) + assert expected == 7 + assert loc.load == 5 + + assert not loc.compareExchange(expected, 12, moRelaxed, moRelaxed) + assert expected == 5 + assert loc.load == 5 + + assert loc.fetchAdd(1) == 5 + assert loc.fetchAdd(2) == 6 + assert loc.fetchSub(3) == 8 + + loc.atomicInc(1) + assert loc.load == 6 + + # AtomicFlag + var flag: AtomicFlag + + assert not flag.testAndSet + assert flag.testAndSet + flag.clear(moRelaxed) + assert not flag.testAndSet + +when (defined(cpp) and defined(nimUseCppAtomics)) or defined(nimdoc): # For the C++ backend, types and operations map directly to C++11 atomics. {.push, header: "<atomic>".} @@ -51,10 +91,11 @@ when defined(cpp) or defined(nimdoc): ## with other moSequentiallyConsistent operations. type - Atomic* {.importcpp: "std::atomic".} [T] = object + Atomic*[T] {.importcpp: "std::atomic", completeStruct.} = object ## An atomic object with underlying type `T`. + raw: T - AtomicFlag* {.importcpp: "std::atomic_flag".} = object + AtomicFlag* {.importcpp: "std::atomic_flag", size: 1.} = object ## An atomic boolean state. # Access operations @@ -172,8 +213,8 @@ else: # MSVC intrinsics proc interlockedExchange(location: pointer; desired: int8): int8 {.importc: "_InterlockedExchange8".} - proc interlockedExchange(location: pointer; desired: int16): int16 {.importc: "_InterlockedExchange".} - proc interlockedExchange(location: pointer; desired: int32): int32 {.importc: "_InterlockedExchange16".} + proc interlockedExchange(location: pointer; desired: int16): int16 {.importc: "_InterlockedExchange16".} + proc interlockedExchange(location: pointer; desired: int32): int32 {.importc: "_InterlockedExchange".} proc interlockedExchange(location: pointer; desired: int64): int64 {.importc: "_InterlockedExchange64".} proc interlockedCompareExchange(location: pointer; desired, expected: int8): int8 {.importc: "_InterlockedCompareExchange8".} @@ -235,10 +276,17 @@ else: cast[T](interlockedXor(addr(location.value), cast[nonAtomicType(T)](value))) else: - {.push, header: "<stdatomic.h>".} + when defined(cpp): + {.push, header: "<atomic>".} + template maybeWrapStd(x: string): string = + "std::" & x + else: + {.push, header: "<stdatomic.h>".} + template maybeWrapStd(x: string): string = + x type - MemoryOrder* {.importc: "memory_order".} = enum + MemoryOrder* {.importc: "memory_order".maybeWrapStd.} = enum moRelaxed moConsume moAcquire @@ -246,58 +294,64 @@ else: moAcquireRelease moSequentiallyConsistent - type - # Atomic* {.importcpp: "_Atomic('0)".} [T] = object + when defined(cpp): + type + # Atomic*[T] {.importcpp: "_Atomic('0)".} = object - AtomicInt8 {.importc: "_Atomic NI8".} = object - AtomicInt16 {.importc: "_Atomic NI16".} = object - AtomicInt32 {.importc: "_Atomic NI32".} = object - AtomicInt64 {.importc: "_Atomic NI64".} = object + AtomicInt8 {.importc: "std::atomic<NI8>".} = int8 + AtomicInt16 {.importc: "std::atomic<NI16>".} = int16 + AtomicInt32 {.importc: "std::atomic<NI32>".} = int32 + AtomicInt64 {.importc: "std::atomic<NI64>".} = int64 + else: + type + # Atomic*[T] {.importcpp: "_Atomic('0)".} = object - template atomicType*(T: typedesc[Trivial]): untyped = - # Maps the size of a trivial type to it's internal atomic type - when sizeof(T) == 1: AtomicInt8 - elif sizeof(T) == 2: AtomicInt16 - elif sizeof(T) == 4: AtomicInt32 - elif sizeof(T) == 8: AtomicInt64 + AtomicInt8 {.importc: "_Atomic NI8".} = int8 + AtomicInt16 {.importc: "_Atomic NI16".} = int16 + AtomicInt32 {.importc: "_Atomic NI32".} = int32 + AtomicInt64 {.importc: "_Atomic NI64".} = int64 type - AtomicFlag* {.importc: "atomic_flag".} = object + AtomicFlag* {.importc: "atomic_flag".maybeWrapStd, size: 1.} = object Atomic*[T] = object when T is Trivial: - value: T.atomicType + # Maps the size of a trivial type to it's internal atomic type + when sizeof(T) == 1: value: AtomicInt8 + elif sizeof(T) == 2: value: AtomicInt16 + elif sizeof(T) == 4: value: AtomicInt32 + elif sizeof(T) == 8: value: AtomicInt64 else: nonAtomicValue: T guard: AtomicFlag #proc init*[T](location: var Atomic[T]; value: T): T {.importcpp: "atomic_init(@)".} - proc atomic_load_explicit[T, A](location: ptr A; order: MemoryOrder): T {.importc.} - proc atomic_store_explicit[T, A](location: ptr A; desired: T; order: MemoryOrder = moSequentiallyConsistent) {.importc.} - proc atomic_exchange_explicit[T, A](location: ptr A; desired: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc.} - proc atomic_compare_exchange_strong_explicit[T, A](location: ptr A; expected: ptr T; desired: T; success, failure: MemoryOrder): bool {.importc.} - proc atomic_compare_exchange_weak_explicit[T, A](location: ptr A; expected: ptr T; desired: T; success, failure: MemoryOrder): bool {.importc.} + proc atomic_load_explicit[T, A](location: ptr A; order: MemoryOrder): T {.importc: "atomic_load_explicit".maybeWrapStd.} + proc atomic_store_explicit[T, A](location: ptr A; desired: T; order: MemoryOrder = moSequentiallyConsistent) {.importc: "atomic_store_explicit".maybeWrapStd.} + proc atomic_exchange_explicit[T, A](location: ptr A; desired: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc: "atomic_exchange_explicit".maybeWrapStd.} + proc atomic_compare_exchange_strong_explicit[T, A](location: ptr A; expected: ptr T; desired: T; success, failure: MemoryOrder): bool {.importc: "atomic_compare_exchange_strong_explicit".maybeWrapStd.} + proc atomic_compare_exchange_weak_explicit[T, A](location: ptr A; expected: ptr T; desired: T; success, failure: MemoryOrder): bool {.importc: "atomic_compare_exchange_weak_explicit".maybeWrapStd.} # Numerical operations - proc atomic_fetch_add_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc.} - proc atomic_fetch_sub_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc.} - proc atomic_fetch_and_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc.} - proc atomic_fetch_or_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc.} - proc atomic_fetch_xor_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc.} + proc atomic_fetch_add_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc: "atomic_fetch_add_explicit".maybeWrapStd.} + proc atomic_fetch_sub_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc: "atomic_fetch_sub_explicit".maybeWrapStd.} + proc atomic_fetch_and_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc: "atomic_fetch_and_explicit".maybeWrapStd.} + proc atomic_fetch_or_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc: "atomic_fetch_or_explicit".maybeWrapStd.} + proc atomic_fetch_xor_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc: "atomic_fetch_xor_explicit".maybeWrapStd.} # Flag operations # var ATOMIC_FLAG_INIT {.importc, nodecl.}: AtomicFlag # proc init*(location: var AtomicFlag) {.inline.} = location = ATOMIC_FLAG_INIT - proc testAndSet*(location: var AtomicFlag; order: MemoryOrder = moSequentiallyConsistent): bool {.importc: "atomic_flag_test_and_set_explicit".} - proc clear*(location: var AtomicFlag; order: MemoryOrder = moSequentiallyConsistent) {.importc: "atomic_flag_clear_explicit".} + proc testAndSet*(location: var AtomicFlag; order: MemoryOrder = moSequentiallyConsistent): bool {.importc: "atomic_flag_test_and_set_explicit".maybeWrapStd.} + proc clear*(location: var AtomicFlag; order: MemoryOrder = moSequentiallyConsistent) {.importc: "atomic_flag_clear_explicit".maybeWrapStd.} - proc fence*(order: MemoryOrder) {.importc: "atomic_thread_fence".} - proc signalFence*(order: MemoryOrder) {.importc: "atomic_signal_fence".} + proc fence*(order: MemoryOrder) {.importc: "atomic_thread_fence".maybeWrapStd.} + proc signalFence*(order: MemoryOrder) {.importc: "atomic_signal_fence".maybeWrapStd.} {.pop.} proc load*[T: Trivial](location: var Atomic[T]; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} = - cast[T](atomic_load_explicit[nonAtomicType(T), type(location.value)](addr(location.value), order)) + cast[T](atomic_load_explicit[nonAtomicType(T), typeof(location.value)](addr(location.value), order)) proc store*[T: Trivial](location: var Atomic[T]; desired: T; order: MemoryOrder = moSequentiallyConsistent) {.inline.} = atomic_store_explicit(addr(location.value), cast[nonAtomicType(T)](desired), order) proc exchange*[T: Trivial](location: var Atomic[T]; desired: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} = @@ -325,9 +379,11 @@ else: cast[T](atomic_fetch_xor_explicit(addr(location.value), cast[nonAtomicType(T)](value), order)) template withLock[T: not Trivial](location: var Atomic[T]; order: MemoryOrder; body: untyped): untyped = - while location.guard.testAndSet(moAcquire): discard - body - location.guard.clear(moRelease) + while testAndSet(location.guard, moAcquire): discard + try: + body + finally: + clear(location.guard, moRelease) proc load*[T: not Trivial](location: var Atomic[T]; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} = withLock(location, order): @@ -345,16 +401,14 @@ else: proc compareExchange*[T: not Trivial](location: var Atomic[T]; expected: var T; desired: T; success, failure: MemoryOrder): bool {.inline.} = withLock(location, success): if location.nonAtomicValue != expected: + expected = location.nonAtomicValue return false + expected = desired swap(location.nonAtomicValue, expected) return true proc compareExchangeWeak*[T: not Trivial](location: var Atomic[T]; expected: var T; desired: T; success, failure: MemoryOrder): bool {.inline.} = - withLock(location, success): - if location.nonAtomicValue != expected: - return false - swap(location.nonAtomicValue, expected) - return true + compareExchange(location, expected, desired, success, failure) proc compareExchange*[T: not Trivial](location: var Atomic[T]; expected: var T; desired: T; order: MemoryOrder = moSequentiallyConsistent): bool {.inline.} = compareExchange(location, expected, desired, order, order) diff --git a/lib/pure/concurrency/cpuinfo.nim b/lib/pure/concurrency/cpuinfo.nim index 515d7e2da..9bc3fd579 100644 --- a/lib/pure/concurrency/cpuinfo.nim +++ b/lib/pure/concurrency/cpuinfo.nim @@ -7,91 +7,104 @@ # distribution, for details about the copyright. # -## This module implements procs to determine the number of CPUs / cores. +## This module implements a proc to determine the number of CPUs / cores. + +runnableExamples: + doAssert countProcessors() > 0 + include "system/inclrtl" -when not defined(windows): - import posix +when defined(js): + import std/jsffi + proc countProcessorsImpl(): int = + when defined(nodejs): + let jsOs = require("os") + let jsObj = jsOs.cpus().length + else: + # `navigator.hardwareConcurrency` + # works on browser as well as deno. + let navigator{.importcpp.}: JsObject + let jsObj = navigator.hardwareConcurrency + result = jsObj.to int +else: + when defined(posix) and not (defined(macosx) or defined(bsd)): + import std/posix + + when defined(windows): + import std/private/win_getsysteminfo + + when defined(freebsd) or defined(macosx): + {.emit: "#include <sys/types.h>".} -when defined(freebsd) or defined(macosx): - {.emit:"#include <sys/types.h>".} + when defined(openbsd) or defined(netbsd): + {.emit: "#include <sys/param.h>".} -when defined(openbsd) or defined(netbsd): - {.emit:"#include <sys/param.h>".} + when defined(macosx) or defined(bsd): + # we HAVE to emit param.h before sysctl.h so we cannot use .header here + # either. The amount of archaic bullshit in Poonix based OSes is just insane. + {.emit: "#include <sys/sysctl.h>".} + {.push nodecl.} + when defined(macosx): + proc sysctlbyname(name: cstring, + oldp: pointer, oldlenp: var csize_t, + newp: pointer, newlen: csize_t): cint {.importc.} + let + CTL_HW{.importc.}: cint + HW_NCPU{.importc.}: cint + proc sysctl[I: static[int]](name: var array[I, cint], namelen: cuint, + oldp: pointer, oldlenp: var csize_t, + newp: pointer, newlen: csize_t): cint {.importc.} + {.pop.} -when defined(macosx) or defined(bsd): - # we HAVE to emit param.h before sysctl.h so we cannot use .header here - # either. The amount of archaic bullshit in Poonix based OSes is just insane. - {.emit:"#include <sys/sysctl.h>".} - const - CTL_HW = 6 - HW_AVAILCPU = 25 - HW_NCPU = 3 - proc sysctl(x: ptr array[0..3, cint], y: cint, z: pointer, - a: var csize_t, b: pointer, c: csize_t): cint {. - importc: "sysctl", nodecl.} + when defined(genode): + import genode/env -when defined(genode): - include genode/env + proc affinitySpaceTotal(env: GenodeEnvPtr): cuint {. + importcpp: "@->cpu().affinity_space().total()".} - proc affinitySpaceTotal(env: GenodeEnvPtr): cuint {. - importcpp: "@->cpu().affinity_space().total()".} + when defined(haiku): + type + SystemInfo {.importc: "system_info", header: "<OS.h>".} = object + cpuCount {.importc: "cpu_count".}: uint32 + + proc getSystemInfo(info: ptr SystemInfo): int32 {.importc: "get_system_info", + header: "<OS.h>".} + + proc countProcessorsImpl(): int {.inline.} = + when defined(windows): + var + si: SystemInfo + getSystemInfo(addr si) + result = int(si.dwNumberOfProcessors) + elif defined(macosx) or defined(bsd): + let dest = addr result + var len = sizeof(result).csize_t + when defined(macosx): + # alias of "hw.activecpu" + if sysctlbyname("hw.logicalcpu", dest, len, nil, 0) == 0: + return + var mib = [CTL_HW, HW_NCPU] + if sysctl(mib, 2, dest, len, nil, 0) == 0: + return + elif defined(hpux): + result = mpctl(MPC_GETNUMSPUS, nil, nil) + elif defined(irix): + var SC_NPROC_ONLN {.importc: "_SC_NPROC_ONLN", header: "<unistd.h>".}: cint + result = sysconf(SC_NPROC_ONLN) + elif defined(genode): + result = runtimeEnv.affinitySpaceTotal().int + elif defined(haiku): + var sysinfo: SystemInfo + if getSystemInfo(addr sysinfo) == 0: + result = sysinfo.cpuCount.int + else: + result = sysconf(SC_NPROCESSORS_ONLN) + if result < 0: result = 0 -when defined(haiku): - type - SystemInfo {.importc: "system_info", header: "<OS.h>".} = object - cpuCount {.importc: "cpu_count".}: uint32 - proc getSystemInfo(info: ptr SystemInfo): int32 {.importc: "get_system_info", - header: "<OS.h>".} proc countProcessors*(): int {.rtl, extern: "ncpi$1".} = - ## returns the number of the processors/cores the machine has. + ## Returns the number of the processors/cores the machine has. ## Returns 0 if it cannot be detected. - when defined(windows): - type - SYSTEM_INFO {.final, pure.} = object - u1: int32 - dwPageSize: int32 - lpMinimumApplicationAddress: pointer - lpMaximumApplicationAddress: pointer - dwActiveProcessorMask: ptr int32 - dwNumberOfProcessors: int32 - dwProcessorType: int32 - dwAllocationGranularity: int32 - wProcessorLevel: int16 - wProcessorRevision: int16 - - proc GetSystemInfo(lpSystemInfo: var SYSTEM_INFO) {.stdcall, dynlib: "kernel32", importc: "GetSystemInfo".} - - var - si: SYSTEM_INFO - GetSystemInfo(si) - result = si.dwNumberOfProcessors - elif defined(macosx) or defined(bsd): - var - mib: array[0..3, cint] - numCPU: int - mib[0] = CTL_HW - mib[1] = HW_AVAILCPU - var len = sizeof(numCPU).csize_t - discard sysctl(addr(mib), 2, addr(numCPU), len, nil, 0) - if numCPU < 1: - mib[1] = HW_NCPU - discard sysctl(addr(mib), 2, addr(numCPU), len, nil, 0) - result = numCPU - elif defined(hpux): - result = mpctl(MPC_GETNUMSPUS, nil, nil) - elif defined(irix): - var SC_NPROC_ONLN {.importc: "_SC_NPROC_ONLN", header: "<unistd.h>".}: cint - result = sysconf(SC_NPROC_ONLN) - elif defined(genode): - result = runtimeEnv.affinitySpaceTotal().int - elif defined(haiku): - var sysinfo: SystemInfo - if getSystemInfo(addr sysinfo) == 0: - result = sysinfo.cpuCount.int - else: - result = sysconf(SC_NPROCESSORS_ONLN) - if result <= 0: result = 0 + countProcessorsImpl() diff --git a/lib/pure/concurrency/cpuload.nim b/lib/pure/concurrency/cpuload.nim index 3ee7336f0..bfbf16721 100644 --- a/lib/pure/concurrency/cpuload.nim +++ b/lib/pure/concurrency/cpuload.nim @@ -13,11 +13,14 @@ ## Unstable API. when defined(windows): - import winlean, os, strutils, math + import std/[winlean, os, strutils, math] proc `-`(a, b: FILETIME): int64 = a.rdFileTime - b.rdFileTime elif defined(linux): - from cpuinfo import countProcessors + from std/cpuinfo import countProcessors + +when defined(nimPreviewSlimSystem): + import std/syncio type ThreadPoolAdvice* = enum @@ -84,11 +87,11 @@ proc advice*(s: var ThreadPoolState): ThreadPoolAdvice = inc s.calls when not defined(testing) and isMainModule and not defined(nimdoc): - import random + import std/random proc busyLoop() = while true: - discard random(80) + discard rand(80) os.sleep(100) spawn busyLoop() diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index 1f7df7c00..06ed2fe54 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -7,20 +7,25 @@ # distribution, for details about the copyright. # -## Implements Nim's `spawn <manual_experimental.html#parallel-amp-spawn>`_. -## -## **See also:** -## * `threads module <threads.html>`_ -## * `channels module <channels.html>`_ -## * `locks module <locks.html>`_ -## * `asyncdispatch module <asyncdispatch.html>`_ +{.deprecated: "use the nimble packages `malebolgia`, `taskpools` or `weave` instead".} + +## Implements Nim's `parallel & spawn statements <manual_experimental.html#parallel-amp-spawn>`_. ## ## Unstable API. +## +## See also +## ======== +## * `threads module <typedthreads.html>`_ for basic thread support +## * `locks module <locks.html>`_ for locks and condition variables +## * `asyncdispatch module <asyncdispatch.html>`_ for asynchronous IO when not compileOption("threads"): {.error: "Threadpool requires --threads:on option.".} -import cpuinfo, cpuload, locks, os +import std/[cpuinfo, cpuload, locks, os] + +when defined(nimPreviewSlimSystem): + import std/[assertions, typedthreads, sysatomics] {.push stackTrace:off.} @@ -51,38 +56,35 @@ proc signal(cv: var Semaphore) = release(cv.L) signal(cv.c) -const CacheLineSize = 32 # true for most archs +const CacheLineSize = 64 # true for most archs type - Barrier {.compilerProc.} = object + Barrier {.compilerproc.} = object entered: int cv: Semaphore # Semaphore takes 3 words at least - when sizeof(int) < 8: - cacheAlign: array[CacheLineSize-4*sizeof(int), byte] - left: int - cacheAlign2: array[CacheLineSize-sizeof(int), byte] - interest: bool # whether the master is interested in the "all done" event + left {.align(CacheLineSize).}: int + interest {.align(CacheLineSize).} : bool # whether the master is interested in the "all done" event -proc barrierEnter(b: ptr Barrier) {.compilerProc, inline.} = +proc barrierEnter(b: ptr Barrier) {.compilerproc, inline.} = # due to the signaling between threads, it is ensured we are the only # one with access to 'entered' so we don't need 'atomicInc' here: inc b.entered # also we need no 'fence' instructions here as soon 'nimArgsPassingDone' # will be called which already will perform a fence for us. -proc barrierLeave(b: ptr Barrier) {.compilerProc, inline.} = +proc barrierLeave(b: ptr Barrier) {.compilerproc, inline.} = atomicInc b.left when not defined(x86): fence() # We may not have seen the final value of b.entered yet, # so we need to check for >= instead of ==. if b.interest and b.left >= b.entered: signal(b.cv) -proc openBarrier(b: ptr Barrier) {.compilerProc, inline.} = +proc openBarrier(b: ptr Barrier) {.compilerproc, inline.} = b.entered = 0 b.left = 0 b.interest = false -proc closeBarrier(b: ptr Barrier) {.compilerProc.} = +proc closeBarrier(b: ptr Barrier) {.compilerproc.} = fence() if b.left != b.entered: b.cv.initSemaphore() @@ -101,8 +103,8 @@ type cv: Semaphore idx: int - FlowVarBase* = ref FlowVarBaseObj ## Untyped base class for ``FlowVar[T]``. - FlowVarBaseObj = object of RootObj + FlowVarBase* = ref FlowVarBaseObj ## Untyped base class for `FlowVar[T] <#FlowVar>`_. + FlowVarBaseObj {.acyclic.} = object of RootObj ready, usesSemaphore, awaited: bool cv: Semaphore # for 'blockUntilAny' support ai: ptr AwaitInfo @@ -111,10 +113,10 @@ type # be RootRef here otherwise the wrong GC keeps track of it! owner: pointer # ptr Worker - FlowVarObj[T] = object of FlowVarBaseObj + FlowVarObj[T] {.acyclic.} = object of FlowVarBaseObj blob: T - FlowVar*{.compilerProc.}[T] = ref FlowVarObj[T] ## A data flow variable. + FlowVar*[T] {.compilerproc.} = ref FlowVarObj[T] ## A data flow variable. ToFreeQueue = object len: int @@ -137,8 +139,8 @@ type const threadpoolWaitMs {.intdefine.}: int = 100 -proc blockUntil*(fv: FlowVarBase) = - ## Waits until the value for the ``fv`` arrives. +proc blockUntil*(fv: var FlowVarBaseObj) = + ## Waits until the value for `fv` arrives. ## ## Usually it is not necessary to call this explicitly. if fv.usesSemaphore and not fv.awaited: @@ -185,7 +187,7 @@ proc attach(fv: FlowVarBase; i: int): bool = result = false release(fv.cv.L) -proc finished(fv: FlowVarBase) = +proc finished(fv: var FlowVarBaseObj) = doAssert fv.ai.isNil, "flowVar is still attached to an 'blockUntilAny'" # we have to protect against the rare cases where the owner of the flowVar # simply disregards the flowVar and yet the "flowVar" has not yet written @@ -208,16 +210,18 @@ proc finished(fv: FlowVarBase) = # the worker thread waits for "data" to be set to nil before shutting down owner.data = nil -proc fvFinalizer[T](fv: FlowVar[T]) = finished(fv) +proc `=destroy`[T](fv: var FlowVarObj[T]) = + finished(fv) + `=destroy`(fv.blob) -proc nimCreateFlowVar[T](): FlowVar[T] {.compilerProc.} = - new(result, fvFinalizer) +proc nimCreateFlowVar[T](): FlowVar[T] {.compilerproc.} = + new(result) -proc nimFlowVarCreateSemaphore(fv: FlowVarBase) {.compilerProc.} = +proc nimFlowVarCreateSemaphore(fv: FlowVarBase) {.compilerproc.} = fv.cv.initSemaphore() fv.usesSemaphore = true -proc nimFlowVarSignal(fv: FlowVarBase) {.compilerProc.} = +proc nimFlowVarSignal(fv: FlowVarBase) {.compilerproc.} = if fv.ai != nil: acquire(fv.ai.cv.L) fv.ai.idx = fv.idx @@ -228,60 +232,51 @@ proc nimFlowVarSignal(fv: FlowVarBase) {.compilerProc.} = signal(fv.cv) proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) = - ## Blocks until the ``fv`` is available and then passes its value - ## to ``action``. + ## Blocks until `fv` is available and then passes its value + ## to `action`. ## - ## Note that due to Nim's parameter passing semantics this - ## means that ``T`` doesn't need to be copied so ``awaitAndThen`` can - ## sometimes be more efficient than `^ proc <#^,FlowVar[T]>`_. - blockUntil(fv) - when T is string or T is seq: + ## Note that due to Nim's parameter passing semantics, this + ## means that `T` doesn't need to be copied, so `awaitAndThen` can + ## sometimes be more efficient than the `^ proc <#^,FlowVar[T]>`_. + blockUntil(fv[]) + when defined(nimV2): + action(fv.blob) + elif T is string or T is seq: action(cast[T](fv.data)) elif T is ref: {.error: "'awaitAndThen' not available for FlowVar[ref]".} else: action(fv.blob) - finished(fv) + finished(fv[]) proc unsafeRead*[T](fv: FlowVar[ref T]): ptr T = ## Blocks until the value is available and then returns this value. - blockUntil(fv) - result = cast[ptr T](fv.data) - finished(fv) - -proc `^`*[T](fv: FlowVar[ref T]): ref T = - ## Blocks until the value is available and then returns this value. - blockUntil(fv) - let src = cast[ref T](fv.data) + blockUntil(fv[]) when defined(nimV2): - result = src + result = cast[ptr T](fv.blob) else: - deepCopy result, src - finished(fv) + result = cast[ptr T](fv.data) + finished(fv[]) proc `^`*[T](fv: FlowVar[T]): T = ## Blocks until the value is available and then returns this value. - blockUntil(fv) - when T is string or T is seq: - let src = cast[T](fv.data) - when defined(nimV2): - result = src - else: - deepCopy result, src + blockUntil(fv[]) + when not defined(nimV2) and (T is string or T is seq or T is ref): + deepCopy result, cast[T](fv.data) else: result = fv.blob - finished(fv) + finished(fv[]) proc blockUntilAny*(flowVars: openArray[FlowVarBase]): int = - ## Awaits any of the given ``flowVars``. Returns the index of one ``flowVar`` + ## Awaits any of the given `flowVars`. Returns the index of one `flowVar` ## for which a value arrived. ## - ## A ``flowVar`` only supports one call to ``blockUntilAny`` at the same time. - ## That means if you ``blockUntilAny([a,b])`` and ``blockUntilAny([b,c])`` - ## the second call will only block until ``c``. If there is no ``flowVar`` left + ## A `flowVar` only supports one call to `blockUntilAny` at the same time. + ## That means if you `blockUntilAny([a,b])` and `blockUntilAny([b,c])` + ## the second call will only block until `c`. If there is no `flowVar` left ## to be able to wait on, -1 is returned. ## - ## **Note**: This results in non-deterministic behaviour and should be avoided. + ## **Note:** This results in non-deterministic behaviour and should be avoided. var ai: AwaitInfo ai.cv.initSemaphore() var conflicts = 0 @@ -302,9 +297,9 @@ proc blockUntilAny*(flowVars: openArray[FlowVarBase]): int = destroySemaphore(ai.cv) proc isReady*(fv: FlowVarBase): bool = - ## Determines whether the specified ``FlowVarBase``'s value is available. + ## Determines whether the specified `FlowVarBase`'s value is available. ## - ## If ``true``, awaiting ``fv`` will not block. + ## If `true`, awaiting `fv` will not block. if fv.usesSemaphore and not fv.awaited: acquire(fv.cv.L) result = fv.cv.counter > 0 @@ -312,31 +307,31 @@ proc isReady*(fv: FlowVarBase): bool = else: result = true -proc nimArgsPassingDone(p: pointer) {.compilerProc.} = +proc nimArgsPassingDone(p: pointer) {.compilerproc.} = let w = cast[ptr Worker](p) signal(w.taskStarted) const - MaxThreadPoolSize* = 256 ## Maximum size of the thread pool. 256 threads - ## should be good enough for anybody ;-) - MaxDistinguishedThread* = 32 ## Maximum number of "distinguished" threads. + MaxThreadPoolSize* {.intdefine.} = 256 ## Maximum size of the thread pool. 256 threads + ## should be good enough for anybody ;-) + MaxDistinguishedThread* {.intdefine.} = 32 ## Maximum number of "distinguished" threads. type - ThreadId* = range[0..MaxDistinguishedThread-1] + ThreadId* = range[0..MaxDistinguishedThread-1] ## A thread identifier. var currentPoolSize: int maxPoolSize = MaxThreadPoolSize minPoolSize = 4 - gSomeReady : Semaphore + gSomeReady: Semaphore readyWorker: ptr Worker # A workaround for recursion deadlock issue # https://github.com/nim-lang/Nim/issues/4597 var numSlavesLock: Lock - numSlavesRunning {.guard: numSlavesLock}: int - numSlavesWaiting {.guard: numSlavesLock}: int + numSlavesRunning {.guard: numSlavesLock.}: int + numSlavesWaiting {.guard: numSlavesLock.}: int isSlave {.threadvar.}: bool numSlavesLock.initLock @@ -409,7 +404,7 @@ proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) = proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) = ## Sets the maximum thread pool size. The default value of this - ## is ``MaxThreadPoolSize`` (256). + ## is `MaxThreadPoolSize <#MaxThreadPoolSize>`_. maxPoolSize = size if currentPoolSize > maxPoolSize: for i in maxPoolSize..currentPoolSize-1: @@ -449,43 +444,45 @@ proc setup() = for i in 0..<currentPoolSize: activateWorkerThread(i) proc preferSpawn*(): bool = - ## Use this proc to determine quickly if a ``spawn`` or a direct call is + ## Use this proc to determine quickly if a `spawn` or a direct call is ## preferable. ## - ## If it returns ``true``, a ``spawn`` may make sense. In general - ## it is not necessary to call this directly; use `spawnX template + ## If it returns `true`, a `spawn` may make sense. In general + ## it is not necessary to call this directly; use the `spawnX template ## <#spawnX.t>`_ instead. result = gSomeReady.counter > 0 -proc spawn*(call: typed): void {.magic: "Spawn".} - ## Always spawns a new task, so that the ``call`` is never executed on +proc spawn*(call: sink typed) {.magic: "Spawn".} = + ## Always spawns a new task, so that the `call` is never executed on ## the calling thread. ## - ## ``call`` has to be proc call ``p(...)`` where ``p`` is gcsafe and has a - ## return type that is either ``void`` or compatible with ``FlowVar[T]``. + ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a + ## return type that is either `void` or compatible with `FlowVar[T]`. + discard "It uses `nimSpawn3` internally" -proc pinnedSpawn*(id: ThreadId; call: typed): void {.magic: "Spawn".} - ## Always spawns a new task on the worker thread with ``id``, so that - ## the ``call`` is **always** executed on the thread. +proc pinnedSpawn*(id: ThreadId; call: sink typed) {.magic: "Spawn".} = + ## Always spawns a new task on the worker thread with `id`, so that + ## the `call` is **always** executed on the thread. ## - ## ``call`` has to be proc call ``p(...)`` where ``p`` is gcsafe and has a - ## return type that is either ``void`` or compatible with ``FlowVar[T]``. + ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a + ## return type that is either `void` or compatible with `FlowVar[T]`. + discard "It uses `nimSpawn4` internally" -template spawnX*(call): void = +template spawnX*(call) = ## Spawns a new task if a CPU core is ready, otherwise executes the ## call in the calling thread. ## - ## Usually it is advised to use `spawn proc <#spawn,typed>`_ in order to - ## not block the producer for an unknown amount of time. + ## Usually, it is advised to use the `spawn proc <#spawn,sinktyped>`_ + ## in order to not block the producer for an unknown amount of time. ## - ## ``call`` has to be proc call ``p(...)`` where ``p`` is gcsafe and has a - ## return type that is either 'void' or compatible with ``FlowVar[T]``. + ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a + ## return type that is either 'void' or compatible with `FlowVar[T]`. (if preferSpawn(): spawn call else: call) proc parallel*(body: untyped) {.magic: "Parallel".} ## A parallel section can be used to execute a block in parallel. ## - ## ``body`` has to be in a DSL that is a particular subset of the language. + ## `body` has to be in a DSL that is a particular subset of the language. ## ## Please refer to `the manual <manual_experimental.html#parallel-amp-spawn>`_ ## for further information. @@ -496,7 +493,7 @@ var initLock stateLock -proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerProc.} = +proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerproc.} = # implementation of 'spawn' that is used by the code generator. while true: if selectWorker(readyWorker, fn, data): return @@ -581,7 +578,7 @@ var initLock distinguishedLock -proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerProc.} = +proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerproc.} = acquire(distinguishedLock) if not distinguishedData[id].initialized: activateDistinguishedThread(id) @@ -592,7 +589,7 @@ proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerProc.} = proc sync*() = - ## A simple barrier to wait for all ``spawn``'ed tasks. + ## A simple barrier to wait for all `spawn`ed tasks. ## ## If you need more elaborate waiting, you have to use an explicit barrier. while true: |