diff options
author | flywind <xzsflywind@gmail.com> | 2021-03-12 20:33:47 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-03-12 13:33:47 +0100 |
commit | a0b8a3d920e7d869523b1182e846ae333fcb8844 (patch) | |
tree | 45f5e43f6fce4e8c62352938a13d5949fd15799a /tests | |
parent | 2e730f1452254ae7c86d89b167f0fe9e002f57b4 (diff) | |
download | Nim-a0b8a3d920e7d869523b1182e846ae333fcb8844.tar.gz |
New channels implementation for ORC (#17305)
* Update lib/std/channels.nim * Rename tchannel_pthread.nim to tchannels_pthread.nim * Rename tchannel_simple.nim to tchannels_simple.nim Co-authored-by: Mamy Ratsimbazafy <mamy_github@numforge.co>
Diffstat (limited to 'tests')
-rw-r--r-- | tests/stdlib/tchannels_pthread.nim | 321 | ||||
-rw-r--r-- | tests/stdlib/tchannels_simple.nim | 68 |
2 files changed, 389 insertions, 0 deletions
diff --git a/tests/stdlib/tchannels_pthread.nim b/tests/stdlib/tchannels_pthread.nim new file mode 100644 index 000000000..a9708d8fb --- /dev/null +++ b/tests/stdlib/tchannels_pthread.nim @@ -0,0 +1,321 @@ +discard """ + targets: "c cpp" + matrix: "--gc:orc --threads:on; --gc:orc --threads:on -d:blockingTest" + disabled: "windows" + disabled: "bsd" + disabled: "osx" +""" + +include std/channels + +import std/unittest + + +type + ChannelBufKind = enum + Unbuffered # Unbuffered (blocking) channel + Buffered # Buffered (non-blocking channel) + + +proc capacity(chan: ChannelRaw): int {.inline.} = chan.size +func isBuffered(chan: ChannelRaw): bool = + chan.size - 1 > 0 + +when defined(blockingTest): + const nonBlocking = false +else: + const nonBlocking = true + +type + Pthread {.importc: "pthread_t", header: "<sys/types.h>".} = distinct culong + PthreadAttr* {.byref, importc: "pthread_attr_t", header: "<sys/types.h>".} = object + Errno* = distinct cint + +proc pthread_create[T]( + thread: var Pthread, + attr: ptr PthreadAttr, # In Nim this is a var and how Nim sets a custom stack + fn: proc (x: ptr T): pointer {.thread, noconv.}, + arg: ptr T + ): Errno {.header: "<sys/types.h>".} + +proc pthread_join( + thread: Pthread, + thread_exit_status: ptr pointer + ): Errno {.header: "<pthread.h>".} + +template channel_send_loop(chan: ChannelRaw, + data: sink pointer, + size: int, + body: untyped): untyped = + while not sendMpmc(chan, data, size, nonBlocking): + body + +template channel_receive_loop(chan: ChannelRaw, + data: pointer, + size: int, + body: untyped): untyped = + while not recvMpmc(chan, data, size, nonBlocking): + body + + +# Without threads:on or release, +# worker threads will crash on popFrame + +import std/unittest + +type ThreadArgs = object + ID: int + chan: ChannelRaw + +template Worker(id: int, body: untyped): untyped {.dirty.} = + if args.ID == id: + body + + +const Sender = 1 +const Receiver = 0 + +proc runSuite( + name: string, + fn: proc(args: ptr ThreadArgs): pointer {.noconv, gcsafe.} + ) = + var chan: ChannelRaw + + for i in Unbuffered .. Buffered: + if i == Unbuffered: + chan = allocChannel(size = 32, n = 1) + check: + peek(chan) == 0 + capacity(chan) == 1 + isBuffered(chan) == false + isUnbuffered(chan) == true + else: + chan = allocChannel(size = int.sizeof.int, n = 7) + check: + peek(chan) == 0 + capacity(chan) == 7 + isBuffered(chan) == true + isUnbuffered(chan) == false + + var threads: array[2, Pthread] + var args = [ + ThreadArgs(ID: 0, chan: chan), + ThreadArgs(ID: 1, chan: chan) + ] + + discard pthread_create(threads[0], nil, fn, args[0].addr) + discard pthread_create(threads[1], nil, fn, args[1].addr) + + discard pthread_join(threads[0], nil) + discard pthread_join(threads[1], nil) + + freeChannel(chan) + +# ---------------------------------------------------------------------------------- + +proc thread_func(args: ptr ThreadArgs): pointer {.noconv.} = + + # Worker RECEIVER: + # --------- + # <- chan + # <- chan + # <- chan + # + # Worker SENDER: + # --------- + # chan <- 42 + # chan <- 53 + # chan <- 64 + # + + Worker(Receiver): + var val: int + for j in 0 ..< 3: + channel_receive_loop(args.chan, val.addr, val.sizeof.int): + # Busy loop, normally it should yield + discard + check: val == 42 + j*11 + + Worker(Sender): + var val: int + check: peek(args.chan) == 0 + for j in 0 ..< 3: + val = 42 + j*11 + channel_send_loop(args.chan, val.addr, val.sizeof.int): + # Busy loop, normally it should yield + discard + + return nil + +runSuite("[ChannelRaw] 2 threads can send data", thread_func) + +# ---------------------------------------------------------------------------------- + +iterator pairs(chan: ChannelRaw, T: typedesc): (int, T) = + var i = 0 + var x: T + while not isClosed(chan) or peek(chan) > 0: + let r = recvMpmc(chan, x.addr, x.sizeof.int, true) + # printf("x: %d, r: %d\n", x, r) + if r: + yield (i, x) + inc i + +proc thread_func_2(args: ptr ThreadArgs): pointer {.noconv.} = + # Worker RECEIVER: + # --------- + # <- chan until closed and empty + # + # Worker SENDER: + # --------- + # chan <- 42, 53, 64, ... + + const N = 100 + + Worker(Receiver): + for j, val in pairs(args.chan, int): + # TODO: Need special handling that doesn't allocate + # in thread with no GC + # when check fails + # + check: val == 42 + j*11 + + Worker(Sender): + var val: int + check: peek(args.chan) == 0 + for j in 0 ..< N: + val = 42 + j*11 + channel_send_loop(args.chan, val.addr, int.sizeof.int): + discard + discard channelCloseMpmc(args.chan) + + return nil + +runSuite("[ChannelRaw] channel_close, freeChannel, channelCache", thread_func_2) + +# ---------------------------------------------------------------------------------- + +proc isCached(chan: ChannelRaw): bool = + assert not chan.isNil + + var p = channelCache + while not p.isNil: + if chan.itemsize == p.chanSize and + chan.size == p.chanN: + for i in 0 ..< p.numCached: + if chan == p.cache[i]: + return true + # No more channel in cache can match + return false + p = p.next + return false + +block: # [ChannelRaw] ChannelRaw caching implementation + + # Start from clean cache slate + freeChannelCache() + + block: # Explicit caches allocation + check: + allocChannelCache(int sizeof(char), 4) + allocChannelCache(int sizeof(int), 8) + allocChannelCache(int sizeof(ptr float64), 16) + + # Don't create existing channel cache + not allocChannelCache(int sizeof(char), 4) + not allocChannelCache(int sizeof(int), 8) + not allocChannelCache(int sizeof(ptr float64), 16) + + check: + channelCacheLen == 3 + + # --------------------------------- + var chan, stash: array[10, ChannelRaw] + + block: # Implicit caches allocation + + chan[0] = allocChannel(sizeof(char), 4) + chan[1] = allocChannel(sizeof(int32), 8) + chan[2] = allocChannel(sizeof(ptr float64), 16) + + chan[3] = allocChannel(sizeof(char), 5) + chan[4] = allocChannel(sizeof(int64), 8) + chan[5] = allocChannel(sizeof(ptr float32), 24) + + # We have caches ready to store specific channel kinds + check: channelCacheLen == 6 # Cumulated with previous test + # But they are not in cache while in use + check: + not chan[0].isCached + not chan[1].isCached + not chan[2].isCached + not chan[3].isCached + not chan[4].isCached + not chan[5].isCached + + block: # Freed channels are returned to cache + stash[0..5] = chan.toOpenArray(0, 5) + for i in 0 .. 5: + # Free the channels + freeChannel(chan[i]) + + check: + stash[0].isCached + stash[1].isCached + stash[2].isCached + stash[3].isCached + stash[4].isCached + stash[5].isCached + + block: # Cached channels are being reused + + chan[6] = allocChannel(sizeof(char), 4) + chan[7] = allocChannel(sizeof(int32), 8) + chan[8] = allocChannel(sizeof(ptr float32), 16) + chan[9] = allocChannel(sizeof(ptr float64), 16) + + # All (itemsize, queue size, implementation) were already allocated + check: channelCacheLen == 6 + + # We reused old channels from cache + check: + chan[6] == stash[0] + chan[7] == stash[1] + chan[8] == stash[2] + # chan[9] - required a fresh alloc + + block: # Clearing the cache + + stash[6..9] = chan.toOpenArray(6, 9) + + for i in 6 .. 9: + freeChannel(chan[i]) + + check: + stash[6].isCached + stash[7].isCached + stash[8].isCached + stash[9].isCached + + freeChannelCache() + + # Check that nothing is cached anymore + for i in 0 .. 9: + check: not stash[i].isCached + # And length is reset to 0 + check: channelCacheLen == 0 + + # Cache can grow again + chan[0] = allocChannel(sizeof((int, float, int32, uint)), 1) + chan[1] = allocChannel(sizeof(int32), 0) + chan[2] = allocChannel(sizeof(int32), 0) + + check: channelCacheLen == 2 + + # Interleave cache clear and channel free + freeChannelCache() + check: channelCacheLen == 0 + + freeChannel(chan[0]) + freeChannel(chan[1]) + freeChannel(chan[2]) diff --git a/tests/stdlib/tchannels_simple.nim b/tests/stdlib/tchannels_simple.nim new file mode 100644 index 000000000..dc4857c3e --- /dev/null +++ b/tests/stdlib/tchannels_simple.nim @@ -0,0 +1,68 @@ +discard """ + matrix: "--threads:on --gc:orc; --threads:on --gc:arc" + disabled: "freebsd" +""" + +import std/channels +import std/os + +var chan = newChannel[string]() + +# This proc will be run in another thread using the threads module. +proc firstWorker() = + chan.send("Hello World!") + +# This is another proc to run in a background thread. This proc takes a while +# to send the message since it sleeps for 2 seconds (or 2000 milliseconds). +proc secondWorker() = + sleep(2000) + chan.send("Another message") + + +# Launch the worker. +var worker1: Thread[void] +createThread(worker1, firstWorker) + +# Block until the message arrives, then print it out. +var dest = "" +chan.recv(dest) +doAssert dest == "Hello World!" + +# Wait for the thread to exit before moving on to the next example. +worker1.joinThread() + +# Launch the other worker. +var worker2: Thread[void] +createThread(worker2, secondWorker) +# This time, use a non-blocking approach with tryRecv. +# Since the main thread is not blocked, it could be used to perform other +# useful work while it waits for data to arrive on the channel. + +var messages: seq[string] +var msg = "" +while true: + let tried = chan.tryRecv(msg) + if tried: + messages.add move(msg) + break + + messages.add "Pretend I'm doing useful work..." + # For this example, sleep in order not to flood stdout with the above + # message. + sleep(400) + +# Wait for the second thread to exit before cleaning up the channel. +worker2.joinThread() + +# Clean up the channel. +doAssert chan.close() +doAssert messages[^1] == "Another message" +doAssert messages.len >= 2 + + +block: + let chan0 = newChannel[int]() + let chan1 = chan0 + block: + let chan3 = chan0 + let chan4 = chan0 |