diff options
Diffstat (limited to 'lib/pure/concurrency')
-rw-r--r-- | lib/pure/concurrency/atomics.nim | 433 | ||||
-rw-r--r-- | lib/pure/concurrency/cpuinfo.nim | 139 | ||||
-rw-r--r-- | lib/pure/concurrency/cpuload.nim | 56 | ||||
-rw-r--r-- | lib/pure/concurrency/threadpool.nim | 453 |
4 files changed, 886 insertions, 195 deletions
diff --git a/lib/pure/concurrency/atomics.nim b/lib/pure/concurrency/atomics.nim new file mode 100644 index 000000000..818f1b37a --- /dev/null +++ b/lib/pure/concurrency/atomics.nim @@ -0,0 +1,433 @@ +# +# +# Nim's Runtime Library +# (c) Copyright 2018 Jörg Wollenschläger +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +## Types and operations for atomic operations and lockless algorithms. +## +## Unstable API. +## +## By default, C++ uses C11 atomic primitives. To use C++ `std::atomic`, +## `-d:nimUseCppAtomics` can be defined. + +runnableExamples: + # Atomic + var loc: Atomic[int] + loc.store(4) + assert loc.load == 4 + loc.store(2) + assert loc.load(moRelaxed) == 2 + loc.store(9) + assert loc.load(moAcquire) == 9 + loc.store(0, moRelease) + assert loc.load == 0 + + assert loc.exchange(7) == 0 + assert loc.load == 7 + + var expected = 7 + assert loc.compareExchange(expected, 5, moRelaxed, moRelaxed) + assert expected == 7 + assert loc.load == 5 + + assert not loc.compareExchange(expected, 12, moRelaxed, moRelaxed) + assert expected == 5 + assert loc.load == 5 + + assert loc.fetchAdd(1) == 5 + assert loc.fetchAdd(2) == 6 + assert loc.fetchSub(3) == 8 + + loc.atomicInc(1) + assert loc.load == 6 + + # AtomicFlag + var flag: AtomicFlag + + assert not flag.testAndSet + assert flag.testAndSet + flag.clear(moRelaxed) + assert not flag.testAndSet + +when (defined(cpp) and defined(nimUseCppAtomics)) or defined(nimdoc): + # For the C++ backend, types and operations map directly to C++11 atomics. + + {.push, header: "<atomic>".} + + type + MemoryOrder* {.importcpp: "std::memory_order".} = enum + ## Specifies how non-atomic operations can be reordered around atomic + ## operations. + + moRelaxed + ## No ordering constraints. Only the atomicity and ordering against + ## other atomic operations is guaranteed. + + moConsume + ## This ordering is currently discouraged as it's semantics are + ## being revised. Acquire operations should be preferred. + + moAcquire + ## When applied to a load operation, no reads or writes in the + ## current thread can be reordered before this operation. + + moRelease + ## When applied to a store operation, no reads or writes in the + ## current thread can be reorderd after this operation. + + moAcquireRelease + ## When applied to a read-modify-write operation, this behaves like + ## both an acquire and a release operation. + + moSequentiallyConsistent + ## Behaves like Acquire when applied to load, like Release when + ## applied to a store and like AcquireRelease when applied to a + ## read-modify-write operation. + ## Also guarantees that all threads observe the same total ordering + ## with other moSequentiallyConsistent operations. + + type + Atomic*[T] {.importcpp: "std::atomic", completeStruct.} = object + ## An atomic object with underlying type `T`. + raw: T + + AtomicFlag* {.importcpp: "std::atomic_flag", size: 1.} = object + ## An atomic boolean state. + + # Access operations + + proc load*[T](location: var Atomic[T]; order: MemoryOrder = moSequentiallyConsistent): T {.importcpp: "#.load(@)".} + ## Atomically obtains the value of the atomic object. + + proc store*[T](location: var Atomic[T]; desired: T; order: MemoryOrder = moSequentiallyConsistent) {.importcpp: "#.store(@)".} + ## Atomically replaces the value of the atomic object with the `desired` + ## value. + + proc exchange*[T](location: var Atomic[T]; desired: T; order: MemoryOrder = moSequentiallyConsistent): T {.importcpp: "#.exchange(@)".} + ## Atomically replaces the value of the atomic object with the `desired` + ## value and returns the old value. + + proc compareExchange*[T](location: var Atomic[T]; expected: var T; desired: T; order: MemoryOrder = moSequentiallyConsistent): bool {.importcpp: "#.compare_exchange_strong(@)".} + ## Atomically compares the value of the atomic object with the `expected` + ## value and performs exchange with the `desired` one if equal or load if + ## not. Returns true if the exchange was successful. + + proc compareExchange*[T](location: var Atomic[T]; expected: var T; desired: T; success, failure: MemoryOrder): bool {.importcpp: "#.compare_exchange_strong(@)".} + ## Same as above, but allows for different memory orders for success and + ## failure. + + proc compareExchangeWeak*[T](location: var Atomic[T]; expected: var T; desired: T; order: MemoryOrder = moSequentiallyConsistent): bool {.importcpp: "#.compare_exchange_weak(@)".} + ## Same as above, but is allowed to fail spuriously. + + proc compareExchangeWeak*[T](location: var Atomic[T]; expected: var T; desired: T; success, failure: MemoryOrder): bool {.importcpp: "#.compare_exchange_weak(@)".} + ## Same as above, but allows for different memory orders for success and + ## failure. + + # Numerical operations + + proc fetchAdd*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importcpp: "#.fetch_add(@)".} + ## Atomically adds a `value` to the atomic integer and returns the + ## original value. + + proc fetchSub*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importcpp: "#.fetch_sub(@)".} + ## Atomically subtracts a `value` to the atomic integer and returns the + ## original value. + + proc fetchAnd*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importcpp: "#.fetch_and(@)".} + ## Atomically replaces the atomic integer with it's bitwise AND + ## with the specified `value` and returns the original value. + + proc fetchOr*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importcpp: "#.fetch_or(@)".} + ## Atomically replaces the atomic integer with it's bitwise OR + ## with the specified `value` and returns the original value. + + proc fetchXor*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importcpp: "#.fetch_xor(@)".} + ## Atomically replaces the atomic integer with it's bitwise XOR + ## with the specified `value` and returns the original value. + + # Flag operations + + proc testAndSet*(location: var AtomicFlag; order: MemoryOrder = moSequentiallyConsistent): bool {.importcpp: "#.test_and_set(@)".} + ## Atomically sets the atomic flag to true and returns the original value. + + proc clear*(location: var AtomicFlag; order: MemoryOrder = moSequentiallyConsistent) {.importcpp: "#.clear(@)".} + ## Atomically sets the value of the atomic flag to false. + + proc fence*(order: MemoryOrder) {.importcpp: "std::atomic_thread_fence(@)".} + ## Ensures memory ordering without using atomic operations. + + proc signalFence*(order: MemoryOrder) {.importcpp: "std::atomic_signal_fence(@)".} + ## Prevents reordering of accesses by the compiler as would fence, but + ## inserts no CPU instructions for memory ordering. + + {.pop.} + +else: + # For the C backend, atomics map to C11 built-ins on GCC and Clang for + # trivial Nim types. Other types are implemented using spin locks. + # This could be overcome by supporting advanced importc-patterns. + + # Since MSVC does not implement C11, we fall back to MS intrinsics + # where available. + + type + Trivial = SomeNumber | bool | enum | ptr | pointer + # A type that is known to be atomic and whose size is known at + # compile time to be 8 bytes or less + + template nonAtomicType*(T: typedesc[Trivial]): untyped = + # Maps types to integers of the same size + when sizeof(T) == 1: int8 + elif sizeof(T) == 2: int16 + elif sizeof(T) == 4: int32 + elif sizeof(T) == 8: int64 + + when defined(vcc): + + # TODO: Trivial types should be volatile and use VC's special volatile + # semantics for store and loads. + + type + MemoryOrder* = enum + moRelaxed + moConsume + moAcquire + moRelease + moAcquireRelease + moSequentiallyConsistent + + Atomic*[T] = object + when T is Trivial: + value: T.nonAtomicType + else: + nonAtomicValue: T + guard: AtomicFlag + + AtomicFlag* = distinct int8 + + {.push header: "<intrin.h>".} + + # MSVC intrinsics + proc interlockedExchange(location: pointer; desired: int8): int8 {.importc: "_InterlockedExchange8".} + proc interlockedExchange(location: pointer; desired: int16): int16 {.importc: "_InterlockedExchange16".} + proc interlockedExchange(location: pointer; desired: int32): int32 {.importc: "_InterlockedExchange".} + proc interlockedExchange(location: pointer; desired: int64): int64 {.importc: "_InterlockedExchange64".} + + proc interlockedCompareExchange(location: pointer; desired, expected: int8): int8 {.importc: "_InterlockedCompareExchange8".} + proc interlockedCompareExchange(location: pointer; desired, expected: int16): int16 {.importc: "_InterlockedCompareExchange16".} + proc interlockedCompareExchange(location: pointer; desired, expected: int32): int32 {.importc: "_InterlockedCompareExchange".} + proc interlockedCompareExchange(location: pointer; desired, expected: int64): int64 {.importc: "_InterlockedCompareExchange64".} + + proc interlockedAnd(location: pointer; value: int8): int8 {.importc: "_InterlockedAnd8".} + proc interlockedAnd(location: pointer; value: int16): int16 {.importc: "_InterlockedAnd16".} + proc interlockedAnd(location: pointer; value: int32): int32 {.importc: "_InterlockedAnd".} + proc interlockedAnd(location: pointer; value: int64): int64 {.importc: "_InterlockedAnd64".} + + proc interlockedOr(location: pointer; value: int8): int8 {.importc: "_InterlockedOr8".} + proc interlockedOr(location: pointer; value: int16): int16 {.importc: "_InterlockedOr16".} + proc interlockedOr(location: pointer; value: int32): int32 {.importc: "_InterlockedOr".} + proc interlockedOr(location: pointer; value: int64): int64 {.importc: "_InterlockedOr64".} + + proc interlockedXor(location: pointer; value: int8): int8 {.importc: "_InterlockedXor8".} + proc interlockedXor(location: pointer; value: int16): int16 {.importc: "_InterlockedXor16".} + proc interlockedXor(location: pointer; value: int32): int32 {.importc: "_InterlockedXor".} + proc interlockedXor(location: pointer; value: int64): int64 {.importc: "_InterlockedXor64".} + + proc fence(order: MemoryOrder): int64 {.importc: "_ReadWriteBarrier()".} + proc signalFence(order: MemoryOrder): int64 {.importc: "_ReadWriteBarrier()".} + + {.pop.} + + proc testAndSet*(location: var AtomicFlag; order: MemoryOrder = moSequentiallyConsistent): bool = + interlockedOr(addr(location), 1'i8) == 1'i8 + proc clear*(location: var AtomicFlag; order: MemoryOrder = moSequentiallyConsistent) = + discard interlockedAnd(addr(location), 0'i8) + + proc load*[T: Trivial](location: var Atomic[T]; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} = + cast[T](interlockedOr(addr(location.value), (nonAtomicType(T))0)) + proc store*[T: Trivial](location: var Atomic[T]; desired: T; order: MemoryOrder = moSequentiallyConsistent) {.inline.} = + discard interlockedExchange(addr(location.value), cast[nonAtomicType(T)](desired)) + + proc exchange*[T: Trivial](location: var Atomic[T]; desired: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} = + cast[T](interlockedExchange(addr(location.value), cast[int64](desired))) + proc compareExchange*[T: Trivial](location: var Atomic[T]; expected: var T; desired: T; success, failure: MemoryOrder): bool {.inline.} = + cast[T](interlockedCompareExchange(addr(location.value), cast[nonAtomicType(T)](desired), cast[nonAtomicType(T)](expected))) == expected + proc compareExchange*[T: Trivial](location: var Atomic[T]; expected: var T; desired: T; order: MemoryOrder = moSequentiallyConsistent): bool {.inline.} = + compareExchange(location, expected, desired, order, order) + proc compareExchangeWeak*[T: Trivial](location: var Atomic[T]; expected: var T; desired: T; success, failure: MemoryOrder): bool {.inline.} = + compareExchange(location, expected, desired, success, failure) + proc compareExchangeWeak*[T: Trivial](location: var Atomic[T]; expected: var T; desired: T; order: MemoryOrder = moSequentiallyConsistent): bool {.inline.} = + compareExchangeWeak(location, expected, desired, order, order) + + proc fetchAdd*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} = + var currentValue = location.load() + while not compareExchangeWeak(location, currentValue, currentValue + value): discard + proc fetchSub*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} = + fetchAdd(location, -value, order) + proc fetchAnd*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} = + cast[T](interlockedAnd(addr(location.value), cast[nonAtomicType(T)](value))) + proc fetchOr*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} = + cast[T](interlockedOr(addr(location.value), cast[nonAtomicType(T)](value))) + proc fetchXor*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} = + cast[T](interlockedXor(addr(location.value), cast[nonAtomicType(T)](value))) + + else: + when defined(cpp): + {.push, header: "<atomic>".} + template maybeWrapStd(x: string): string = + "std::" & x + else: + {.push, header: "<stdatomic.h>".} + template maybeWrapStd(x: string): string = + x + + type + MemoryOrder* {.importc: "memory_order".maybeWrapStd.} = enum + moRelaxed + moConsume + moAcquire + moRelease + moAcquireRelease + moSequentiallyConsistent + + when defined(cpp): + type + # Atomic*[T] {.importcpp: "_Atomic('0)".} = object + + AtomicInt8 {.importc: "std::atomic<NI8>".} = int8 + AtomicInt16 {.importc: "std::atomic<NI16>".} = int16 + AtomicInt32 {.importc: "std::atomic<NI32>".} = int32 + AtomicInt64 {.importc: "std::atomic<NI64>".} = int64 + else: + type + # Atomic*[T] {.importcpp: "_Atomic('0)".} = object + + AtomicInt8 {.importc: "_Atomic NI8".} = int8 + AtomicInt16 {.importc: "_Atomic NI16".} = int16 + AtomicInt32 {.importc: "_Atomic NI32".} = int32 + AtomicInt64 {.importc: "_Atomic NI64".} = int64 + + type + AtomicFlag* {.importc: "atomic_flag".maybeWrapStd, size: 1.} = object + + Atomic*[T] = object + when T is Trivial: + # Maps the size of a trivial type to it's internal atomic type + when sizeof(T) == 1: value: AtomicInt8 + elif sizeof(T) == 2: value: AtomicInt16 + elif sizeof(T) == 4: value: AtomicInt32 + elif sizeof(T) == 8: value: AtomicInt64 + else: + nonAtomicValue: T + guard: AtomicFlag + + #proc init*[T](location: var Atomic[T]; value: T): T {.importcpp: "atomic_init(@)".} + proc atomic_load_explicit[T, A](location: ptr A; order: MemoryOrder): T {.importc: "atomic_load_explicit".maybeWrapStd.} + proc atomic_store_explicit[T, A](location: ptr A; desired: T; order: MemoryOrder = moSequentiallyConsistent) {.importc: "atomic_store_explicit".maybeWrapStd.} + proc atomic_exchange_explicit[T, A](location: ptr A; desired: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc: "atomic_exchange_explicit".maybeWrapStd.} + proc atomic_compare_exchange_strong_explicit[T, A](location: ptr A; expected: ptr T; desired: T; success, failure: MemoryOrder): bool {.importc: "atomic_compare_exchange_strong_explicit".maybeWrapStd.} + proc atomic_compare_exchange_weak_explicit[T, A](location: ptr A; expected: ptr T; desired: T; success, failure: MemoryOrder): bool {.importc: "atomic_compare_exchange_weak_explicit".maybeWrapStd.} + + # Numerical operations + proc atomic_fetch_add_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc: "atomic_fetch_add_explicit".maybeWrapStd.} + proc atomic_fetch_sub_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc: "atomic_fetch_sub_explicit".maybeWrapStd.} + proc atomic_fetch_and_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc: "atomic_fetch_and_explicit".maybeWrapStd.} + proc atomic_fetch_or_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc: "atomic_fetch_or_explicit".maybeWrapStd.} + proc atomic_fetch_xor_explicit[T, A](location: ptr A; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.importc: "atomic_fetch_xor_explicit".maybeWrapStd.} + + # Flag operations + # var ATOMIC_FLAG_INIT {.importc, nodecl.}: AtomicFlag + # proc init*(location: var AtomicFlag) {.inline.} = location = ATOMIC_FLAG_INIT + proc testAndSet*(location: var AtomicFlag; order: MemoryOrder = moSequentiallyConsistent): bool {.importc: "atomic_flag_test_and_set_explicit".maybeWrapStd.} + proc clear*(location: var AtomicFlag; order: MemoryOrder = moSequentiallyConsistent) {.importc: "atomic_flag_clear_explicit".maybeWrapStd.} + + proc fence*(order: MemoryOrder) {.importc: "atomic_thread_fence".maybeWrapStd.} + proc signalFence*(order: MemoryOrder) {.importc: "atomic_signal_fence".maybeWrapStd.} + + {.pop.} + + proc load*[T: Trivial](location: var Atomic[T]; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} = + cast[T](atomic_load_explicit[nonAtomicType(T), typeof(location.value)](addr(location.value), order)) + proc store*[T: Trivial](location: var Atomic[T]; desired: T; order: MemoryOrder = moSequentiallyConsistent) {.inline.} = + atomic_store_explicit(addr(location.value), cast[nonAtomicType(T)](desired), order) + proc exchange*[T: Trivial](location: var Atomic[T]; desired: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} = + cast[T](atomic_exchange_explicit(addr(location.value), cast[nonAtomicType(T)](desired), order)) + proc compareExchange*[T: Trivial](location: var Atomic[T]; expected: var T; desired: T; success, failure: MemoryOrder): bool {.inline.} = + atomic_compare_exchange_strong_explicit(addr(location.value), cast[ptr nonAtomicType(T)](addr(expected)), cast[nonAtomicType(T)](desired), success, failure) + proc compareExchange*[T: Trivial](location: var Atomic[T]; expected: var T; desired: T; order: MemoryOrder = moSequentiallyConsistent): bool {.inline.} = + compareExchange(location, expected, desired, order, order) + + proc compareExchangeWeak*[T: Trivial](location: var Atomic[T]; expected: var T; desired: T; success, failure: MemoryOrder): bool {.inline.} = + atomic_compare_exchange_weak_explicit(addr(location.value), cast[ptr nonAtomicType(T)](addr(expected)), cast[nonAtomicType(T)](desired), success, failure) + proc compareExchangeWeak*[T: Trivial](location: var Atomic[T]; expected: var T; desired: T; order: MemoryOrder = moSequentiallyConsistent): bool {.inline.} = + compareExchangeWeak(location, expected, desired, order, order) + + # Numerical operations + proc fetchAdd*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} = + cast[T](atomic_fetch_add_explicit(addr(location.value), cast[nonAtomicType(T)](value), order)) + proc fetchSub*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} = + cast[T](atomic_fetch_sub_explicit(addr(location.value), cast[nonAtomicType(T)](value), order)) + proc fetchAnd*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} = + cast[T](atomic_fetch_and_explicit(addr(location.value), cast[nonAtomicType(T)](value), order)) + proc fetchOr*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} = + cast[T](atomic_fetch_or_explicit(addr(location.value), cast[nonAtomicType(T)](value), order)) + proc fetchXor*[T: SomeInteger](location: var Atomic[T]; value: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} = + cast[T](atomic_fetch_xor_explicit(addr(location.value), cast[nonAtomicType(T)](value), order)) + + template withLock[T: not Trivial](location: var Atomic[T]; order: MemoryOrder; body: untyped): untyped = + while testAndSet(location.guard, moAcquire): discard + try: + body + finally: + clear(location.guard, moRelease) + + proc load*[T: not Trivial](location: var Atomic[T]; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} = + withLock(location, order): + result = location.nonAtomicValue + + proc store*[T: not Trivial](location: var Atomic[T]; desired: T; order: MemoryOrder = moSequentiallyConsistent) {.inline.} = + withLock(location, order): + location.nonAtomicValue = desired + + proc exchange*[T: not Trivial](location: var Atomic[T]; desired: T; order: MemoryOrder = moSequentiallyConsistent): T {.inline.} = + withLock(location, order): + result = location.nonAtomicValue + location.nonAtomicValue = desired + + proc compareExchange*[T: not Trivial](location: var Atomic[T]; expected: var T; desired: T; success, failure: MemoryOrder): bool {.inline.} = + withLock(location, success): + if location.nonAtomicValue != expected: + expected = location.nonAtomicValue + return false + expected = desired + swap(location.nonAtomicValue, expected) + return true + + proc compareExchangeWeak*[T: not Trivial](location: var Atomic[T]; expected: var T; desired: T; success, failure: MemoryOrder): bool {.inline.} = + compareExchange(location, expected, desired, success, failure) + + proc compareExchange*[T: not Trivial](location: var Atomic[T]; expected: var T; desired: T; order: MemoryOrder = moSequentiallyConsistent): bool {.inline.} = + compareExchange(location, expected, desired, order, order) + + proc compareExchangeWeak*[T: not Trivial](location: var Atomic[T]; expected: var T; desired: T; order: MemoryOrder = moSequentiallyConsistent): bool {.inline.} = + compareExchangeWeak(location, expected, desired, order, order) + +proc atomicInc*[T: SomeInteger](location: var Atomic[T]; value: T = 1) {.inline.} = + ## Atomically increments the atomic integer by some `value`. + discard location.fetchAdd(value) + +proc atomicDec*[T: SomeInteger](location: var Atomic[T]; value: T = 1) {.inline.} = + ## Atomically decrements the atomic integer by some `value`. + discard location.fetchSub(value) + +proc `+=`*[T: SomeInteger](location: var Atomic[T]; value: T) {.inline.} = + ## Atomically increments the atomic integer by some `value`. + discard location.fetchAdd(value) + +proc `-=`*[T: SomeInteger](location: var Atomic[T]; value: T) {.inline.} = + ## Atomically decrements the atomic integer by some `value`. + discard location.fetchSub(value) diff --git a/lib/pure/concurrency/cpuinfo.nim b/lib/pure/concurrency/cpuinfo.nim index 6f2bc4491..9bc3fd579 100644 --- a/lib/pure/concurrency/cpuinfo.nim +++ b/lib/pure/concurrency/cpuinfo.nim @@ -7,61 +7,104 @@ # distribution, for details about the copyright. # -## This module implements procs to determine the number of CPUs / cores. +## This module implements a proc to determine the number of CPUs / cores. + +runnableExamples: + doAssert countProcessors() > 0 + include "system/inclrtl" -import strutils, os +when defined(js): + import std/jsffi + proc countProcessorsImpl(): int = + when defined(nodejs): + let jsOs = require("os") + let jsObj = jsOs.cpus().length + else: + # `navigator.hardwareConcurrency` + # works on browser as well as deno. + let navigator{.importcpp.}: JsObject + let jsObj = navigator.hardwareConcurrency + result = jsObj.to int +else: + when defined(posix) and not (defined(macosx) or defined(bsd)): + import std/posix + + when defined(windows): + import std/private/win_getsysteminfo + + when defined(freebsd) or defined(macosx): + {.emit: "#include <sys/types.h>".} + + when defined(openbsd) or defined(netbsd): + {.emit: "#include <sys/param.h>".} -when not defined(windows): - import posix + when defined(macosx) or defined(bsd): + # we HAVE to emit param.h before sysctl.h so we cannot use .header here + # either. The amount of archaic bullshit in Poonix based OSes is just insane. + {.emit: "#include <sys/sysctl.h>".} + {.push nodecl.} + when defined(macosx): + proc sysctlbyname(name: cstring, + oldp: pointer, oldlenp: var csize_t, + newp: pointer, newlen: csize_t): cint {.importc.} + let + CTL_HW{.importc.}: cint + HW_NCPU{.importc.}: cint + proc sysctl[I: static[int]](name: var array[I, cint], namelen: cuint, + oldp: pointer, oldlenp: var csize_t, + newp: pointer, newlen: csize_t): cint {.importc.} + {.pop.} -when defined(linux): - import linux - -when defined(freebsd) or defined(macosx): - {.emit:"#include <sys/types.h>".} + when defined(genode): + import genode/env + + proc affinitySpaceTotal(env: GenodeEnvPtr): cuint {. + importcpp: "@->cpu().affinity_space().total()".} + + when defined(haiku): + type + SystemInfo {.importc: "system_info", header: "<OS.h>".} = object + cpuCount {.importc: "cpu_count".}: uint32 + + proc getSystemInfo(info: ptr SystemInfo): int32 {.importc: "get_system_info", + header: "<OS.h>".} + + proc countProcessorsImpl(): int {.inline.} = + when defined(windows): + var + si: SystemInfo + getSystemInfo(addr si) + result = int(si.dwNumberOfProcessors) + elif defined(macosx) or defined(bsd): + let dest = addr result + var len = sizeof(result).csize_t + when defined(macosx): + # alias of "hw.activecpu" + if sysctlbyname("hw.logicalcpu", dest, len, nil, 0) == 0: + return + var mib = [CTL_HW, HW_NCPU] + if sysctl(mib, 2, dest, len, nil, 0) == 0: + return + elif defined(hpux): + result = mpctl(MPC_GETNUMSPUS, nil, nil) + elif defined(irix): + var SC_NPROC_ONLN {.importc: "_SC_NPROC_ONLN", header: "<unistd.h>".}: cint + result = sysconf(SC_NPROC_ONLN) + elif defined(genode): + result = runtimeEnv.affinitySpaceTotal().int + elif defined(haiku): + var sysinfo: SystemInfo + if getSystemInfo(addr sysinfo) == 0: + result = sysinfo.cpuCount.int + else: + result = sysconf(SC_NPROCESSORS_ONLN) + if result < 0: result = 0 -when defined(openbsd) or defined(netbsd): - {.emit:"#include <sys/param.h>".} -when defined(macosx) or defined(bsd): - # we HAVE to emit param.h before sysctl.h so we cannot use .header here - # either. The amount of archaic bullshit in Poonix based OSes is just insane. - {.emit:"#include <sys/sysctl.h>".} - const - CTL_HW = 6 - HW_AVAILCPU = 25 - HW_NCPU = 3 - proc sysctl(x: ptr array[0..3, cint], y: cint, z: pointer, - a: var csize, b: pointer, c: int): cint {. - importc: "sysctl", nodecl.} proc countProcessors*(): int {.rtl, extern: "ncpi$1".} = - ## returns the numer of the processors/cores the machine has. + ## Returns the number of the processors/cores the machine has. ## Returns 0 if it cannot be detected. - when defined(windows): - var x = getEnv("NUMBER_OF_PROCESSORS") - if x.len > 0: result = parseInt(x.string) - elif defined(macosx) or defined(bsd): - var - mib: array[0..3, cint] - numCPU: int - len: csize - mib[0] = CTL_HW - mib[1] = HW_AVAILCPU - len = sizeof(numCPU) - discard sysctl(addr(mib), 2, addr(numCPU), len, nil, 0) - if numCPU < 1: - mib[1] = HW_NCPU - discard sysctl(addr(mib), 2, addr(numCPU), len, nil, 0) - result = numCPU - elif defined(hpux): - result = mpctl(MPC_GETNUMSPUS, nil, nil) - elif defined(irix): - var SC_NPROC_ONLN {.importc: "_SC_NPROC_ONLN", header: "<unistd.h>".}: cint - result = sysconf(SC_NPROC_ONLN) - else: - result = sysconf(SC_NPROCESSORS_ONLN) - if result <= 0: result = 1 - + countProcessorsImpl() diff --git a/lib/pure/concurrency/cpuload.nim b/lib/pure/concurrency/cpuload.nim index c1796089a..bfbf16721 100644 --- a/lib/pure/concurrency/cpuload.nim +++ b/lib/pure/concurrency/cpuload.nim @@ -9,13 +9,18 @@ ## This module implements a helper for a thread pool to determine whether ## creating a thread is a good idea. +## +## Unstable API. when defined(windows): - import winlean, os, strutils, math + import std/[winlean, os, strutils, math] - proc `-`(a, b: TFILETIME): int64 = a.rdFileTime - b.rdFileTime + proc `-`(a, b: FILETIME): int64 = a.rdFileTime - b.rdFileTime elif defined(linux): - from cpuinfo import countProcessors + from std/cpuinfo import countProcessors + +when defined(nimPreviewSlimSystem): + import std/syncio type ThreadPoolAdvice* = enum @@ -25,16 +30,16 @@ type ThreadPoolState* = object when defined(windows): - prevSysKernel, prevSysUser, prevProcKernel, prevProcUser: TFILETIME + prevSysKernel, prevSysUser, prevProcKernel, prevProcUser: FILETIME calls*: int proc advice*(s: var ThreadPoolState): ThreadPoolAdvice = when defined(windows): var sysIdle, sysKernel, sysUser, - procCreation, procExit, procKernel, procUser: TFILETIME + procCreation, procExit, procKernel, procUser: FILETIME if getSystemTimes(sysIdle, sysKernel, sysUser) == 0 or - getProcessTimes(THandle(-1), procCreation, procExit, + getProcessTimes(Handle(-1), procCreation, procExit, procKernel, procUser) == 0: return doNothing if s.calls > 0: @@ -45,32 +50,35 @@ proc advice*(s: var ThreadPoolState): ThreadPoolAdvice = procKernelDiff = procKernel - s.prevProcKernel procUserDiff = procUser - s.prevProcUser - sysTotal = int(sysKernelDiff + sysUserDiff) - procTotal = int(procKernelDiff + procUserDiff) + sysTotal = sysKernelDiff + sysUserDiff + procTotal = procKernelDiff + procUserDiff # total CPU usage < 85% --> create a new worker thread. # Measurements show that 100% and often even 90% is not reached even # if all my cores are busy. - if sysTotal == 0 or procTotal / sysTotal < 0.85: + if sysTotal == 0 or procTotal.float / sysTotal.float < 0.85: result = doCreateThread s.prevSysKernel = sysKernel s.prevSysUser = sysUser s.prevProcKernel = procKernel s.prevProcUser = procUser elif defined(linux): - proc fscanf(c: File, frmt: cstring) {.varargs, importc, + proc fscanf(c: File, frmt: cstring) {.varargs, importc, header: "<stdio.h>".} - var f = open("/proc/loadavg") - var b: float - var busy, total: int - fscanf(f,"%lf %lf %lf %ld/%ld", - addr b, addr b, addr b, addr busy, addr total) - f.close() - let cpus = countProcessors() - if busy-1 < cpus: - result = doCreateThread - elif busy-1 >= cpus*2: - result = doShutdownThread + var f: File + if f.open("/proc/loadavg"): + var b: float + var busy, total: int + fscanf(f,"%lf %lf %lf %ld/%ld", + addr b, addr b, addr b, addr busy, addr total) + f.close() + let cpus = countProcessors() + if busy-1 < cpus: + result = doCreateThread + elif busy-1 >= cpus*2: + result = doShutdownThread + else: + result = doNothing else: result = doNothing else: @@ -78,10 +86,12 @@ proc advice*(s: var ThreadPoolState): ThreadPoolAdvice = result = doNothing inc s.calls -when isMainModule: +when not defined(testing) and isMainModule and not defined(nimdoc): + import std/random + proc busyLoop() = while true: - discard random(80) + discard rand(80) os.sleep(100) spawn busyLoop() diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim index 9f1e53fb8..06ed2fe54 100644 --- a/lib/pure/concurrency/threadpool.nim +++ b/lib/pure/concurrency/threadpool.nim @@ -7,30 +7,43 @@ # distribution, for details about the copyright. # -## Implements Nim's 'spawn'. +{.deprecated: "use the nimble packages `malebolgia`, `taskpools` or `weave` instead".} + +## Implements Nim's `parallel & spawn statements <manual_experimental.html#parallel-amp-spawn>`_. +## +## Unstable API. +## +## See also +## ======== +## * `threads module <typedthreads.html>`_ for basic thread support +## * `locks module <locks.html>`_ for locks and condition variables +## * `asyncdispatch module <asyncdispatch.html>`_ for asynchronous IO when not compileOption("threads"): {.error: "Threadpool requires --threads:on option.".} -import cpuinfo, cpuload, locks +import std/[cpuinfo, cpuload, locks, os] + +when defined(nimPreviewSlimSystem): + import std/[assertions, typedthreads, sysatomics] {.push stackTrace:off.} type Semaphore = object - c: TCond - L: TLock + c: Cond + L: Lock counter: int -proc createSemaphore(): Semaphore = - initCond(result.c) - initLock(result.L) +proc initSemaphore(cv: var Semaphore) = + initCond(cv.c) + initLock(cv.L) proc destroySemaphore(cv: var Semaphore) {.inline.} = deinitCond(cv.c) deinitLock(cv.L) -proc await(cv: var Semaphore) = +proc blockUntil(cv: var Semaphore) = acquire(cv.L) while cv.counter <= 0: wait(cv.c, cv.L) @@ -43,45 +56,42 @@ proc signal(cv: var Semaphore) = release(cv.L) signal(cv.c) -const CacheLineSize = 32 # true for most archs +const CacheLineSize = 64 # true for most archs type - Barrier {.compilerProc.} = object + Barrier {.compilerproc.} = object entered: int cv: Semaphore # Semaphore takes 3 words at least - when sizeof(int) < 8: - cacheAlign: array[CacheLineSize-4*sizeof(int), byte] - left: int - cacheAlign2: array[CacheLineSize-sizeof(int), byte] - interest: bool ## wether the master is interested in the "all done" event + left {.align(CacheLineSize).}: int + interest {.align(CacheLineSize).} : bool # whether the master is interested in the "all done" event -proc barrierEnter(b: ptr Barrier) {.compilerProc, inline.} = +proc barrierEnter(b: ptr Barrier) {.compilerproc, inline.} = # due to the signaling between threads, it is ensured we are the only # one with access to 'entered' so we don't need 'atomicInc' here: inc b.entered # also we need no 'fence' instructions here as soon 'nimArgsPassingDone' # will be called which already will perform a fence for us. -proc barrierLeave(b: ptr Barrier) {.compilerProc, inline.} = +proc barrierLeave(b: ptr Barrier) {.compilerproc, inline.} = atomicInc b.left when not defined(x86): fence() # We may not have seen the final value of b.entered yet, # so we need to check for >= instead of ==. if b.interest and b.left >= b.entered: signal(b.cv) -proc openBarrier(b: ptr Barrier) {.compilerProc, inline.} = +proc openBarrier(b: ptr Barrier) {.compilerproc, inline.} = b.entered = 0 b.left = 0 b.interest = false -proc closeBarrier(b: ptr Barrier) {.compilerProc.} = +proc closeBarrier(b: ptr Barrier) {.compilerproc.} = fence() if b.left != b.entered: - b.cv = createSemaphore() + b.cv.initSemaphore() fence() b.interest = true fence() - while b.left != b.entered: await(b.cv) + while b.left != b.entered: blockUntil(b.cv) destroySemaphore(b.cv) {.pop.} @@ -89,31 +99,28 @@ proc closeBarrier(b: ptr Barrier) {.compilerProc.} = # ---------------------------------------------------------------------------- type - foreign* = object ## a region that indicates the pointer comes from a - ## foreign thread heap. AwaitInfo = object cv: Semaphore idx: int - FlowVarBase* = ref FlowVarBaseObj ## untyped base class for 'FlowVar[T]' - FlowVarBaseObj = object of RootObj + FlowVarBase* = ref FlowVarBaseObj ## Untyped base class for `FlowVar[T] <#FlowVar>`_. + FlowVarBaseObj {.acyclic.} = object of RootObj ready, usesSemaphore, awaited: bool - cv: Semaphore #\ - # for 'awaitAny' support + cv: Semaphore # for 'blockUntilAny' support ai: ptr AwaitInfo idx: int data: pointer # we incRef and unref it to keep it alive; note this MUST NOT # be RootRef here otherwise the wrong GC keeps track of it! owner: pointer # ptr Worker - FlowVarObj[T] = object of FlowVarBaseObj + FlowVarObj[T] {.acyclic.} = object of FlowVarBaseObj blob: T - FlowVar*{.compilerProc.}[T] = ref FlowVarObj[T] ## a data flow variable + FlowVar*[T] {.compilerproc.} = ref FlowVarObj[T] ## A data flow variable. ToFreeQueue = object len: int - lock: TLock + lock: Lock empty: Semaphore data: array[128, pointer] @@ -128,13 +135,17 @@ type initialized: bool # whether it has even been initialized shutdown: bool # the pool requests to shut down this worker thread q: ToFreeQueue + readyForTask: Semaphore + +const threadpoolWaitMs {.intdefine.}: int = 100 -proc await*(fv: FlowVarBase) = - ## waits until the value for the flowVar arrives. Usually it is not necessary - ## to call this explicitly. +proc blockUntil*(fv: var FlowVarBaseObj) = + ## Waits until the value for `fv` arrives. + ## + ## Usually it is not necessary to call this explicitly. if fv.usesSemaphore and not fv.awaited: fv.awaited = true - await(fv.cv) + blockUntil(fv.cv) destroySemaphore(fv.cv) proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool = @@ -142,13 +153,13 @@ proc selectWorker(w: ptr Worker; fn: WorkerProc; data: pointer): bool = w.data = data w.f = fn signal(w.taskArrived) - await(w.taskStarted) + blockUntil(w.taskStarted) result = true proc cleanFlowVars(w: ptr Worker) = let q = addr(w.q) acquire(q.lock) - for i in 0 .. <q.len: + for i in 0 ..< q.len: GC_unref(cast[RootRef](q.data[i])) #echo "GC_unref" q.len = 0 @@ -167,12 +178,21 @@ proc wakeupWorkerToProcessQueue(w: ptr Worker) = signal(w.q.empty) signal(w.taskArrived) -proc finished(fv: FlowVarBase) = - doAssert fv.ai.isNil, "flowVar is still attached to an 'awaitAny'" +proc attach(fv: FlowVarBase; i: int): bool = + acquire(fv.cv.L) + if fv.cv.counter <= 0: + fv.idx = i + result = true + else: + result = false + release(fv.cv.L) + +proc finished(fv: var FlowVarBaseObj) = + doAssert fv.ai.isNil, "flowVar is still attached to an 'blockUntilAny'" # we have to protect against the rare cases where the owner of the flowVar # simply disregards the flowVar and yet the "flowVar" has not yet written # anything to it: - await(fv) + blockUntil(fv) if fv.data.isNil: return let owner = cast[ptr Worker](fv.owner) let q = addr(owner.q) @@ -181,23 +201,27 @@ proc finished(fv: FlowVarBase) = #echo "EXHAUSTED!" release(q.lock) wakeupWorkerToProcessQueue(owner) - await(q.empty) + blockUntil(q.empty) acquire(q.lock) q.data[q.len] = cast[pointer](fv.data) inc q.len release(q.lock) fv.data = nil + # the worker thread waits for "data" to be set to nil before shutting down + owner.data = nil -proc fvFinalizer[T](fv: FlowVar[T]) = finished(fv) +proc `=destroy`[T](fv: var FlowVarObj[T]) = + finished(fv) + `=destroy`(fv.blob) -proc nimCreateFlowVar[T](): FlowVar[T] {.compilerProc.} = - new(result, fvFinalizer) +proc nimCreateFlowVar[T](): FlowVar[T] {.compilerproc.} = + new(result) -proc nimFlowVarCreateSemaphore(fv: FlowVarBase) {.compilerProc.} = - fv.cv = createSemaphore() +proc nimFlowVarCreateSemaphore(fv: FlowVarBase) {.compilerproc.} = + fv.cv.initSemaphore() fv.usesSemaphore = true -proc nimFlowVarSignal(fv: FlowVarBase) {.compilerProc.} = +proc nimFlowVarSignal(fv: FlowVarBase) {.compilerproc.} = if fv.ai != nil: acquire(fv.ai.cv.L) fv.ai.idx = fv.idx @@ -208,169 +232,295 @@ proc nimFlowVarSignal(fv: FlowVarBase) {.compilerProc.} = signal(fv.cv) proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) = - ## blocks until the ``fv`` is available and then passes its value - ## to ``action``. Note that due to Nim's parameter passing semantics this - ## means that ``T`` doesn't need to be copied and so ``awaitAndThen`` can - ## sometimes be more efficient than ``^``. - await(fv) - when T is string or T is seq: + ## Blocks until `fv` is available and then passes its value + ## to `action`. + ## + ## Note that due to Nim's parameter passing semantics, this + ## means that `T` doesn't need to be copied, so `awaitAndThen` can + ## sometimes be more efficient than the `^ proc <#^,FlowVar[T]>`_. + blockUntil(fv[]) + when defined(nimV2): + action(fv.blob) + elif T is string or T is seq: action(cast[T](fv.data)) elif T is ref: {.error: "'awaitAndThen' not available for FlowVar[ref]".} else: action(fv.blob) - finished(fv) + finished(fv[]) -proc `^`*[T](fv: FlowVar[ref T]): foreign ptr T = - ## blocks until the value is available and then returns this value. - await(fv) - result = cast[foreign ptr T](fv.data) +proc unsafeRead*[T](fv: FlowVar[ref T]): ptr T = + ## Blocks until the value is available and then returns this value. + blockUntil(fv[]) + when defined(nimV2): + result = cast[ptr T](fv.blob) + else: + result = cast[ptr T](fv.data) + finished(fv[]) proc `^`*[T](fv: FlowVar[T]): T = - ## blocks until the value is available and then returns this value. - await(fv) - when T is string or T is seq: - # XXX closures? deepCopy? - result = cast[T](fv.data) + ## Blocks until the value is available and then returns this value. + blockUntil(fv[]) + when not defined(nimV2) and (T is string or T is seq or T is ref): + deepCopy result, cast[T](fv.data) else: result = fv.blob - -proc awaitAny*(flowVars: openArray[FlowVarBase]): int = - ## awaits any of the given flowVars. Returns the index of one flowVar for - ## which a value arrived. A flowVar only supports one call to 'awaitAny' at - ## the same time. That means if you await([a,b]) and await([b,c]) the second - ## call will only await 'c'. If there is no flowVar left to be able to wait - ## on, -1 is returned. - ## **Note**: This results in non-deterministic behaviour and so should be - ## avoided. + finished(fv[]) + +proc blockUntilAny*(flowVars: openArray[FlowVarBase]): int = + ## Awaits any of the given `flowVars`. Returns the index of one `flowVar` + ## for which a value arrived. + ## + ## A `flowVar` only supports one call to `blockUntilAny` at the same time. + ## That means if you `blockUntilAny([a,b])` and `blockUntilAny([b,c])` + ## the second call will only block until `c`. If there is no `flowVar` left + ## to be able to wait on, -1 is returned. + ## + ## **Note:** This results in non-deterministic behaviour and should be avoided. var ai: AwaitInfo - ai.cv = createSemaphore() + ai.cv.initSemaphore() var conflicts = 0 + result = -1 for i in 0 .. flowVars.high: if cas(addr flowVars[i].ai, nil, addr ai): - flowVars[i].idx = i + if not attach(flowVars[i], i): + result = i + break else: inc conflicts if conflicts < flowVars.len: - await(ai.cv) - result = ai.idx + if result < 0: + blockUntil(ai.cv) + result = ai.idx for i in 0 .. flowVars.high: discard cas(addr flowVars[i].ai, addr ai, nil) - else: - result = -1 destroySemaphore(ai.cv) -proc nimArgsPassingDone(p: pointer) {.compilerProc.} = +proc isReady*(fv: FlowVarBase): bool = + ## Determines whether the specified `FlowVarBase`'s value is available. + ## + ## If `true`, awaiting `fv` will not block. + if fv.usesSemaphore and not fv.awaited: + acquire(fv.cv.L) + result = fv.cv.counter > 0 + release(fv.cv.L) + else: + result = true + +proc nimArgsPassingDone(p: pointer) {.compilerproc.} = let w = cast[ptr Worker](p) signal(w.taskStarted) const - MaxThreadPoolSize* = 256 ## maximal size of the thread pool. 256 threads - ## should be good enough for anybody ;-) + MaxThreadPoolSize* {.intdefine.} = 256 ## Maximum size of the thread pool. 256 threads + ## should be good enough for anybody ;-) + MaxDistinguishedThread* {.intdefine.} = 32 ## Maximum number of "distinguished" threads. + +type + ThreadId* = range[0..MaxDistinguishedThread-1] ## A thread identifier. var currentPoolSize: int maxPoolSize = MaxThreadPoolSize minPoolSize = 4 - gSomeReady = createSemaphore() + gSomeReady: Semaphore readyWorker: ptr Worker +# A workaround for recursion deadlock issue +# https://github.com/nim-lang/Nim/issues/4597 +var + numSlavesLock: Lock + numSlavesRunning {.guard: numSlavesLock.}: int + numSlavesWaiting {.guard: numSlavesLock.}: int + isSlave {.threadvar.}: bool + +numSlavesLock.initLock + +gSomeReady.initSemaphore() + proc slave(w: ptr Worker) {.thread.} = + isSlave = true while true: + if w.shutdown: + w.shutdown = false + atomicDec currentPoolSize + while true: + if w.data != nil: + sleep(threadpoolWaitMs) + else: + # The flowvar finalizer ("finished()") set w.data to nil, so we can + # safely terminate the thread. + # + # TODO: look for scenarios in which the flowvar is never finalized, so + # a shut down thread gets stuck in this loop until the main thread exits. + break + break when declared(atomicStoreN): atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST) else: w.ready = true readyWorker = w signal(gSomeReady) - await(w.taskArrived) + blockUntil(w.taskArrived) + # XXX Somebody needs to look into this (why does this assertion fail + # in Visual Studio?) + when not defined(vcc) and not defined(tcc): assert(not w.ready) + + withLock numSlavesLock: + inc numSlavesRunning + + w.f(w, w.data) + + withLock numSlavesLock: + dec numSlavesRunning + + if w.q.len != 0: w.cleanFlowVars + +proc distinguishedSlave(w: ptr Worker) {.thread.} = + while true: + when declared(atomicStoreN): + atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST) + else: + w.ready = true + signal(w.readyForTask) + blockUntil(w.taskArrived) assert(not w.ready) w.f(w, w.data) if w.q.len != 0: w.cleanFlowVars - if w.shutdown: - w.shutdown = false - atomicDec currentPoolSize var - workers: array[MaxThreadPoolSize, TThread[ptr Worker]] + workers: array[MaxThreadPoolSize, Thread[ptr Worker]] workersData: array[MaxThreadPoolSize, Worker] + distinguished: array[MaxDistinguishedThread, Thread[ptr Worker]] + distinguishedData: array[MaxDistinguishedThread, Worker] + +when defined(nimPinToCpu): + var gCpus: Natural + proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) = - ## sets the minimal thread pool size. The default value of this is 4. + ## Sets the minimum thread pool size. The default value of this is 4. minPoolSize = size proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) = - ## sets the minimal thread pool size. The default value of this - ## is ``MaxThreadPoolSize``. + ## Sets the maximum thread pool size. The default value of this + ## is `MaxThreadPoolSize <#MaxThreadPoolSize>`_. maxPoolSize = size if currentPoolSize > maxPoolSize: for i in maxPoolSize..currentPoolSize-1: let w = addr(workersData[i]) w.shutdown = true -proc activateThread(i: int) {.noinline.} = - workersData[i].taskArrived = createSemaphore() - workersData[i].taskStarted = createSemaphore() +when defined(nimRecursiveSpawn): + var localThreadId {.threadvar.}: int + +proc activateWorkerThread(i: int) {.noinline.} = + workersData[i].taskArrived.initSemaphore() + workersData[i].taskStarted.initSemaphore() workersData[i].initialized = true - workersData[i].q.empty = createSemaphore() + workersData[i].q.empty.initSemaphore() initLock(workersData[i].q.lock) createThread(workers[i], slave, addr(workersData[i])) + when defined(nimRecursiveSpawn): + localThreadId = i+1 + when defined(nimPinToCpu): + if gCpus > 0: pinToCpu(workers[i], i mod gCpus) + +proc activateDistinguishedThread(i: int) {.noinline.} = + distinguishedData[i].taskArrived.initSemaphore() + distinguishedData[i].taskStarted.initSemaphore() + distinguishedData[i].initialized = true + distinguishedData[i].q.empty.initSemaphore() + initLock(distinguishedData[i].q.lock) + distinguishedData[i].readyForTask.initSemaphore() + createThread(distinguished[i], distinguishedSlave, addr(distinguishedData[i])) proc setup() = - currentPoolSize = min(countProcessors(), MaxThreadPoolSize) + let p = countProcessors() + when defined(nimPinToCpu): + gCpus = p + currentPoolSize = min(p, MaxThreadPoolSize) readyWorker = addr(workersData[0]) - for i in 0.. <currentPoolSize: activateThread(i) + for i in 0..<currentPoolSize: activateWorkerThread(i) proc preferSpawn*(): bool = - ## Use this proc to determine quickly if a 'spawn' or a direct call is - ## preferable. If it returns 'true' a 'spawn' may make sense. In general - ## it is not necessary to call this directly; use 'spawnX' instead. + ## Use this proc to determine quickly if a `spawn` or a direct call is + ## preferable. + ## + ## If it returns `true`, a `spawn` may make sense. In general + ## it is not necessary to call this directly; use the `spawnX template + ## <#spawnX.t>`_ instead. result = gSomeReady.counter > 0 -proc spawn*(call: expr): expr {.magic: "Spawn".} - ## always spawns a new task, so that the 'call' is never executed on - ## the calling thread. 'call' has to be proc call 'p(...)' where 'p' - ## is gcsafe and has a return type that is either 'void' or compatible - ## with ``FlowVar[T]``. - -template spawnX*(call: expr): expr = - ## spawns a new task if a CPU core is ready, otherwise executes the - ## call in the calling thread. Usually it is advised to - ## use 'spawn' in order to not block the producer for an unknown - ## amount of time. 'call' has to be proc call 'p(...)' where 'p' - ## is gcsafe and has a return type that is either 'void' or compatible - ## with ``FlowVar[T]``. +proc spawn*(call: sink typed) {.magic: "Spawn".} = + ## Always spawns a new task, so that the `call` is never executed on + ## the calling thread. + ## + ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a + ## return type that is either `void` or compatible with `FlowVar[T]`. + discard "It uses `nimSpawn3` internally" + +proc pinnedSpawn*(id: ThreadId; call: sink typed) {.magic: "Spawn".} = + ## Always spawns a new task on the worker thread with `id`, so that + ## the `call` is **always** executed on the thread. + ## + ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a + ## return type that is either `void` or compatible with `FlowVar[T]`. + discard "It uses `nimSpawn4` internally" + +template spawnX*(call) = + ## Spawns a new task if a CPU core is ready, otherwise executes the + ## call in the calling thread. + ## + ## Usually, it is advised to use the `spawn proc <#spawn,sinktyped>`_ + ## in order to not block the producer for an unknown amount of time. + ## + ## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a + ## return type that is either 'void' or compatible with `FlowVar[T]`. (if preferSpawn(): spawn call else: call) -proc parallel*(body: stmt) {.magic: "Parallel".} - ## a parallel section can be used to execute a block in parallel. ``body`` - ## has to be in a DSL that is a particular subset of the language. Please - ## refer to the manual for further information. +proc parallel*(body: untyped) {.magic: "Parallel".} + ## A parallel section can be used to execute a block in parallel. + ## + ## `body` has to be in a DSL that is a particular subset of the language. + ## + ## Please refer to `the manual <manual_experimental.html#parallel-amp-spawn>`_ + ## for further information. var state: ThreadPoolState - stateLock: TLock + stateLock: Lock initLock stateLock -proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} = +proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerproc.} = # implementation of 'spawn' that is used by the code generator. while true: if selectWorker(readyWorker, fn, data): return - for i in 0.. <currentPoolSize: + for i in 0..<currentPoolSize: if selectWorker(addr(workersData[i]), fn, data): return + # determine what to do, but keep in mind this is expensive too: # state.calls < maxPoolSize: warmup phase # (state.calls and 127) == 0: periodic check if state.calls < maxPoolSize or (state.calls and 127) == 0: # ensure the call to 'advice' is atomic: if tryAcquire(stateLock): + if currentPoolSize < minPoolSize: + if not workersData[currentPoolSize].initialized: + activateWorkerThread(currentPoolSize) + let w = addr(workersData[currentPoolSize]) + atomicInc currentPoolSize + if selectWorker(w, fn, data): + release(stateLock) + return + case advice(state) of doNothing: discard of doCreateThread: if currentPoolSize < maxPoolSize: if not workersData[currentPoolSize].initialized: - activateThread(currentPoolSize) + activateWorkerThread(currentPoolSize) let w = addr(workersData[currentPoolSize]) atomicInc currentPoolSize if selectWorker(w, fn, data): @@ -385,17 +535,72 @@ proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} = release(stateLock) # else the acquire failed, but this means some # other thread succeeded, so we don't need to do anything here. - await(gSomeReady) + when defined(nimRecursiveSpawn): + if localThreadId > 0: + # we are a worker thread, so instead of waiting for something which + # might as well never happen (see tparallel_quicksort), we run the task + # on the current thread instead. + var self = addr(workersData[localThreadId-1]) + fn(self, data) + blockUntil(self.taskStarted) + return + + if isSlave: + # Run under lock until `numSlavesWaiting` increment to avoid a + # race (otherwise two last threads might start waiting together) + withLock numSlavesLock: + if numSlavesRunning <= numSlavesWaiting + 1: + # All the other slaves are waiting + # If we wait now, we-re deadlocked until + # an external spawn happens ! + if currentPoolSize < maxPoolSize: + if not workersData[currentPoolSize].initialized: + activateWorkerThread(currentPoolSize) + let w = addr(workersData[currentPoolSize]) + atomicInc currentPoolSize + if selectWorker(w, fn, data): + return + else: + # There is no place in the pool. We're deadlocked. + # echo "Deadlock!" + discard + + inc numSlavesWaiting + + blockUntil(gSomeReady) + + if isSlave: + withLock numSlavesLock: + dec numSlavesWaiting + +var + distinguishedLock: Lock + +initLock distinguishedLock + +proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerproc.} = + acquire(distinguishedLock) + if not distinguishedData[id].initialized: + activateDistinguishedThread(id) + release(distinguishedLock) + while true: + if selectWorker(addr(distinguishedData[id]), fn, data): break + blockUntil(distinguishedData[id].readyForTask) + proc sync*() = - ## a simple barrier to wait for all spawn'ed tasks. If you need more elaborate - ## waiting, you have to use an explicit barrier. + ## A simple barrier to wait for all `spawn`ed tasks. + ## + ## If you need more elaborate waiting, you have to use an explicit barrier. while true: var allReady = true - for i in 0 .. <currentPoolSize: + for i in 0 ..< currentPoolSize: if not allReady: break allReady = allReady and workersData[i].ready if allReady: break - await(gSomeReady) + sleep(threadpoolWaitMs) + # We cannot "blockUntil(gSomeReady)" because workers may be shut down between + # the time we establish that some are not "ready" and the time we wait for a + # "signal(gSomeReady)" from inside "slave()" that can never come. setup() |