diff options
author | flywind <xzsflywind@gmail.com> | 2021-03-12 20:33:47 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-03-12 13:33:47 +0100 |
commit | a0b8a3d920e7d869523b1182e846ae333fcb8844 (patch) | |
tree | 45f5e43f6fce4e8c62352938a13d5949fd15799a /lib/std | |
parent | 2e730f1452254ae7c86d89b167f0fe9e002f57b4 (diff) | |
download | Nim-a0b8a3d920e7d869523b1182e846ae333fcb8844.tar.gz |
New channels implementation for ORC (#17305)
* Update lib/std/channels.nim * Rename tchannel_pthread.nim to tchannels_pthread.nim * Rename tchannel_simple.nim to tchannels_simple.nim Co-authored-by: Mamy Ratsimbazafy <mamy_github@numforge.co>
Diffstat (limited to 'lib/std')
-rw-r--r-- | lib/std/channels.nim | 510 |
1 files changed, 510 insertions, 0 deletions
diff --git a/lib/std/channels.nim b/lib/std/channels.nim new file mode 100644 index 000000000..a212af0d3 --- /dev/null +++ b/lib/std/channels.nim @@ -0,0 +1,510 @@ +# +# +# The Nim Compiler +# (c) Copyright 2021 Andreas Prell, Mamy André-Ratsimbazafy & Nim Contributors +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + + +# Based on https://github.com/mratsim/weave/blob/5696d94e6358711e840f8c0b7c684fcc5cbd4472/unused/channels/channels_legacy.nim +# Those are translations of @aprell (Andreas Prell) original channels from C to Nim +# (https://github.com/aprell/tasking-2.0/blob/master/src/channel_shm/channel.c) +# And in turn they are an implementation of Michael & Scott lock-based queues +# (note the paper has 2 channels: lock-free and lock-based) with additional caching: +# Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms +# Maged M. Michael, Michael L. Scott, 1996 +# https://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf + +## This module only works with `--gc:arc` or `--gc:orc`. +## +## .. warning:: This module is experimental and its interface may change. +## +## The following is a simple example of two different ways to use channels: +## blocking and non-blocking. +## + +runnableExamples("--threads:on --gc:orc"): + import std/os + + # In this example a channel is declared at module scope. + # Channels are generic, and they include support for passing objects between + # threads. + # Note that isolated data passed through channels is moved around. + var chan = newChannel[string]() + + # This proc will be run in another thread using the threads module. + proc firstWorker() = + chan.send("Hello World!") + + # This is another proc to run in a background thread. This proc takes a while + # to send the message since it sleeps for 2 seconds (or 2000 milliseconds). + proc secondWorker() = + sleep(2000) + chan.send("Another message") + + # Launch the worker. + var worker1: Thread[void] + createThread(worker1, firstWorker) + + # Block until the message arrives, then print it out. + var dest = "" + chan.recv(dest) + assert dest == "Hello World!" + + # Wait for the thread to exit before moving on to the next example. + worker1.joinThread() + + # Launch the other worker. + var worker2: Thread[void] + createThread(worker2, secondWorker) + # This time, use a non-blocking approach with tryRecv. + # Since the main thread is not blocked, it could be used to perform other + # useful work while it waits for data to arrive on the channel. + var messages: seq[string] + while true: + var msg = "" + if chan.tryRecv(msg): + messages.add msg # "Another message" + break + + messages.add "Pretend I'm doing useful work..." + # For this example, sleep in order not to flood stdout with the above + # message. + sleep(400) + + # Wait for the second thread to exit before cleaning up the channel. + worker2.joinThread() + + # Clean up the channel. + assert chan.close() + + assert messages[^1] == "Another message" + assert messages.len >= 2 + + +when not defined(gcArc) and not defined(gcOrc) and not defined(nimdoc): + {.error: "This channel implementation requires --gc:arc or --gc:orc".} + +import std/[locks, atomics, isolation] +import system/ansi_c + +# Channel (Shared memory channels) +# ---------------------------------------------------------------------------------- + +const + cacheLineSize {.intdefine.} = 64 # TODO: some Samsung phone have 128 cache-line + nimChannelCacheSize* {.intdefine.} = 100 + +type + ChannelRaw = ptr ChannelObj + ChannelObj = object + headLock, tailLock: Lock + notFullCond, notEmptyCond: Cond + closed: Atomic[bool] + size: int + itemsize: int # up to itemsize bytes can be exchanged over this channel + head {.align: cacheLineSize.} : int # Items are taken from head and new items are inserted at tail + tail: int + buffer: ptr UncheckedArray[byte] + atomicCounter: Atomic[int] + + ChannelCache = ptr ChannelCacheObj + ChannelCacheObj = object + next: ChannelCache + chanSize: int + chanN: int + numCached: int + cache: array[nimChannelCacheSize, ChannelRaw] + +# ---------------------------------------------------------------------------------- + +proc numItems(chan: ChannelRaw): int {.inline.} = + result = chan.tail - chan.head + if result < 0: + inc(result, 2 * chan.size) + + assert result <= chan.size + +template isFull(chan: ChannelRaw): bool = + abs(chan.tail - chan.head) == chan.size + +template isEmpty(chan: ChannelRaw): bool = + chan.head == chan.tail + +# Unbuffered / synchronous channels +# ---------------------------------------------------------------------------------- + +template numItemsUnbuf(chan: ChannelRaw): int = + chan.head + +template isFullUnbuf(chan: ChannelRaw): bool = + chan.head == 1 + +template isEmptyUnbuf(chan: ChannelRaw): bool = + chan.head == 0 + +# ChannelRaw kinds +# ---------------------------------------------------------------------------------- + +func isUnbuffered(chan: ChannelRaw): bool = + chan.size - 1 == 0 + +# ChannelRaw status and properties +# ---------------------------------------------------------------------------------- + +proc isClosed(chan: ChannelRaw): bool {.inline.} = load(chan.closed, moRelaxed) + +proc peek(chan: ChannelRaw): int {.inline.} = + (if chan.isUnbuffered: numItemsUnbuf(chan) else: numItems(chan)) + +# Per-thread channel cache +# ---------------------------------------------------------------------------------- + +var channelCache {.threadvar.}: ChannelCache +var channelCacheLen {.threadvar.}: int + +proc allocChannelCache(size, n: int): bool = + ## Allocate a free list for storing channels of a given type + var p = channelCache + + # Avoid multiple free lists for the exact same type of channel + while not p.isNil: + if size == p.chanSize and n == p.chanN: + return false + p = p.next + + p = cast[ptr ChannelCacheObj](c_malloc(csize_t sizeof(ChannelCacheObj))) + if p.isNil: + raise newException(OutOfMemDefect, "Could not allocate memory") + + p.chanSize = size + p.chanN = n + p.numCached = 0 + + p.next = channelCache + channelCache = p + inc channelCacheLen + result = true + +proc freeChannelCache*() = + ## Frees the entire channel cache, including all channels + var p = channelCache + var q: ChannelCache + + while not p.isNil: + q = p.next + for i in 0 ..< p.numCached: + let chan = p.cache[i] + if not chan.buffer.isNil: + c_free(chan.buffer) + deinitLock(chan.headLock) + deinitLock(chan.tailLock) + deinitCond(chan.notFullCond) + deinitCond(chan.notEmptyCond) + c_free(chan) + c_free(p) + dec channelCacheLen + p = q + + assert(channelCacheLen == 0) + channelCache = nil + +# Channels memory ops +# ---------------------------------------------------------------------------------- + +proc allocChannel(size, n: int): ChannelRaw = + when nimChannelCacheSize > 0: + var p = channelCache + + while not p.isNil: + if size == p.chanSize and n == p.chanN: + # Check if free list contains channel + if p.numCached > 0: + dec p.numCached + result = p.cache[p.numCached] + assert(result.isEmpty) + return + else: + # All the other lists in cache won't match + break + p = p.next + + result = cast[ChannelRaw](c_malloc(csize_t sizeof(ChannelObj))) + if result.isNil: + raise newException(OutOfMemDefect, "Could not allocate memory") + + # To buffer n items, we allocate for n + result.buffer = cast[ptr UncheckedArray[byte]](c_malloc(csize_t n*size)) + if result.buffer.isNil: + raise newException(OutOfMemDefect, "Could not allocate memory") + + initLock(result.headLock) + initLock(result.tailLock) + initCond(result.notFullCond) + initCond(result.notEmptyCond) + + result.closed.store(false, moRelaxed) # We don't need atomic here, how to? + result.size = n + result.itemsize = size + result.head = 0 + result.tail = 0 + result.atomicCounter.store(0, moRelaxed) + + when nimChannelCacheSize > 0: + # Allocate a cache as well if one of the proper size doesn't exist + discard allocChannelCache(size, n) + +proc freeChannel(chan: ChannelRaw) = + if chan.isNil: + return + + when nimChannelCacheSize > 0: + var p = channelCache + while not p.isNil: + if chan.itemsize == p.chanSize and + chan.size == p.chanN: + if p.numCached < nimChannelCacheSize: + # If space left in cache, cache it + p.cache[p.numCached] = chan + inc p.numCached + return + else: + # All the other lists in cache won't match + break + p = p.next + + if not chan.buffer.isNil: + c_free(chan.buffer) + + deinitLock(chan.headLock) + deinitLock(chan.tailLock) + deinitCond(chan.notFullCond) + deinitCond(chan.notEmptyCond) + + c_free(chan) + +# MPMC Channels (Multi-Producer Multi-Consumer) +# ---------------------------------------------------------------------------------- + +proc sendUnbufferedMpmc(chan: ChannelRaw, data: sink pointer, size: int, nonBlocking: bool): bool = + if nonBlocking and chan.isFullUnbuf: + return false + + acquire(chan.headLock) + + if nonBlocking and chan.isFullUnbuf: + # Another thread was faster + release(chan.headLock) + return false + + while chan.isFullUnbuf: + wait(chan.notFullcond, chan.headLock) + + assert chan.isEmptyUnbuf + assert size <= chan.itemsize + copyMem(chan.buffer, data, size) + + chan.head = 1 + + release(chan.headLock) + signal(chan.notEmptyCond) + result = true + +proc sendMpmc(chan: ChannelRaw, data: sink pointer, size: int, nonBlocking: bool): bool = + assert not chan.isNil + assert not data.isNil + + if isUnbuffered(chan): + return sendUnbufferedMpmc(chan, data, size, nonBlocking) + + if nonBlocking and chan.isFull: + return false + + acquire(chan.tailLock) + + if nonBlocking and chan.isFull: + # Another thread was faster + release(chan.tailLock) + return false + + while chan.isFull: + wait(chan.notFullcond, chan.tailLock) + + assert not chan.isFull + assert size <= chan.itemsize + + let writeIdx = if chan.tail < chan.size: chan.tail + else: chan.tail - chan.size + + copyMem(chan.buffer[writeIdx * chan.itemsize].addr, data, size) + + inc chan.tail + if chan.tail == 2 * chan.size: + chan.tail = 0 + + release(chan.tailLock) + signal(chan.notEmptyCond) + result = true + +proc recvUnbufferedMpmc(chan: ChannelRaw, data: pointer, size: int, nonBlocking: bool): bool = + if nonBlocking and chan.isEmptyUnbuf: + return false + + acquire(chan.headLock) + + if nonBlocking and chan.isEmptyUnbuf: + # Another thread was faster + release(chan.headLock) + return false + + while chan.isEmptyUnbuf: + wait(chan.notEmptyCond, chan.headLock) + + assert chan.isFullUnbuf + assert size <= chan.itemsize + + copyMem(data, chan.buffer, size) + + chan.head = 0 + + release(chan.headLock) + signal(chan.notFullCond) + result = true + +proc recvMpmc(chan: ChannelRaw, data: pointer, size: int, nonBlocking: bool): bool = + assert not chan.isNil + assert not data.isNil + + if isUnbuffered(chan): + return recvUnbufferedMpmc(chan, data, size, nonBlocking) + + if nonBlocking and chan.isEmpty: + return false + + acquire(chan.headLock) + + if nonBlocking and chan.isEmpty: + # Another thread took the last data + release(chan.headLock) + return false + + while chan.isEmpty: + wait(chan.notEmptyCond, chan.headLock) + + assert not chan.isEmpty + assert size <= chan.itemsize + + let readIdx = if chan.head < chan.size: chan.head + else: chan.head - chan.size + + copyMem(data, chan.buffer[readIdx * chan.itemsize].addr, size) + + inc chan.head + if chan.head == 2 * chan.size: + chan.head = 0 + + release(chan.headLock) + signal(chan.notFullCond) + result = true + +proc channelCloseMpmc(chan: ChannelRaw): bool = + # Unsynchronized + + if chan.isClosed: + # ChannelRaw already closed + return false + + store(chan.closed, true, moRelaxed) + result = true + +proc channelOpenMpmc(chan: ChannelRaw): bool = + # Unsynchronized + + if not chan.isClosed: + # ChannelRaw already open + return false + + store(chan.closed, false, moRelaxed) + result = true + +# Public API +# ---------------------------------------------------------------------------------- + +type + Channel*[T] = object ## Typed channels + d: ChannelRaw + +proc `=destroy`*[T](c: var Channel[T]) = + if c.d != nil: + if load(c.d.atomicCounter, moAcquire) == 0: + if c.d.buffer != nil: + freeChannel(c.d) + else: + atomicDec(c.d.atomicCounter) + +proc `=`*[T](dest: var Channel[T], src: Channel[T]) = + ## Shares `Channel` by reference counting. + if src.d != nil: + atomicInc(src.d.atomicCounter) + + if dest.d != nil: + `=destroy`(dest) + dest.d = src.d + +proc channelSend[T](chan: Channel[T], data: sink T, size: int, nonBlocking: bool): bool {.inline.} = + ## Send item to the channel (FIFO queue) + ## (Insert at last) + sendMpmc(chan.d, data.unsafeAddr, size, nonBlocking) + +proc channelReceive[T](chan: Channel[T], data: ptr T, size: int, nonBlocking: bool): bool {.inline.} = + ## Receive an item from the channel + ## (Remove the first item) + recvMpmc(chan.d, data, size, nonBlocking) + +func trySend*[T](c: Channel[T], src: var Isolated[T]): bool {.inline.} = + ## Sends item to the channel(non blocking). + var data = src.extract + result = channelSend(c, data, sizeof(data), true) + if result: + wasMoved(data) + +template trySend*[T](c: Channel[T], src: T): bool = + ## Helper templates for `trySend`. + trySend(c, isolate(src)) + +func tryRecv*[T](c: Channel[T], dst: var T): bool {.inline.} = + ## Receives item from the channel(non blocking). + channelReceive(c, dst.addr, sizeof(dst), true) + +func send*[T](c: Channel[T], src: sink Isolated[T]) {.inline.} = + ## Sends item to the channel(blocking). + var data = src.extract + discard channelSend(c, data, sizeof(data), false) + wasMoved(data) + +template send*[T](c: var Channel[T]; src: T) = + ## Helper templates for `send`. + send(c, isolate(src)) + +func recv*[T](c: Channel[T], dst: var T) {.inline.} = + ## Receives item from the channel(blocking). + discard channelReceive(c, dst.addr, sizeof(dst), false) + +func recvIso*[T](c: Channel[T]): Isolated[T] {.inline.} = + var dst: T + discard channelReceive(c, dst.addr, sizeof(dst), false) + result = isolate(dst) + +func open*[T](c: Channel[T]): bool {.inline.} = + result = c.d.channelOpenMpmc() + +func close*[T](c: Channel[T]): bool {.inline.} = + result = c.d.channelCloseMpmc() + +func peek*[T](c: Channel[T]): int {.inline.} = peek(c.d) + +proc newChannel*[T](elements = 30): Channel[T] = + assert elements >= 1, "Elements must be positive!" + result = Channel[T](d: allocChannel(sizeof(T), elements)) |