summary refs log tree commit diff stats
path: root/tests
diff options
context:
space:
mode:
authorflywind <xzsflywind@gmail.com>2021-03-12 20:33:47 +0800
committerGitHub <noreply@github.com>2021-03-12 13:33:47 +0100
commita0b8a3d920e7d869523b1182e846ae333fcb8844 (patch)
tree45f5e43f6fce4e8c62352938a13d5949fd15799a /tests
parent2e730f1452254ae7c86d89b167f0fe9e002f57b4 (diff)
downloadNim-a0b8a3d920e7d869523b1182e846ae333fcb8844.tar.gz
New channels implementation for ORC (#17305)
* Update lib/std/channels.nim
* Rename tchannel_pthread.nim to tchannels_pthread.nim
* Rename tchannel_simple.nim to tchannels_simple.nim

Co-authored-by: Mamy Ratsimbazafy <mamy_github@numforge.co>
Diffstat (limited to 'tests')
-rw-r--r--tests/stdlib/tchannels_pthread.nim321
-rw-r--r--tests/stdlib/tchannels_simple.nim68
2 files changed, 389 insertions, 0 deletions
diff --git a/tests/stdlib/tchannels_pthread.nim b/tests/stdlib/tchannels_pthread.nim
new file mode 100644
index 000000000..a9708d8fb
--- /dev/null
+++ b/tests/stdlib/tchannels_pthread.nim
@@ -0,0 +1,321 @@
+discard """
+  targets: "c cpp"
+  matrix: "--gc:orc --threads:on; --gc:orc --threads:on -d:blockingTest"
+  disabled: "windows"
+  disabled: "bsd"
+  disabled: "osx"
+"""
+
+include std/channels
+
+import std/unittest
+
+
+type
+  ChannelBufKind = enum
+    Unbuffered # Unbuffered (blocking) channel
+    Buffered   # Buffered (non-blocking channel)
+
+
+proc capacity(chan: ChannelRaw): int {.inline.} = chan.size
+func isBuffered(chan: ChannelRaw): bool =
+  chan.size - 1 > 0
+
+when defined(blockingTest):
+  const nonBlocking = false
+else:
+  const nonBlocking = true
+
+type
+  Pthread {.importc: "pthread_t", header: "<sys/types.h>".} = distinct culong
+  PthreadAttr* {.byref, importc: "pthread_attr_t", header: "<sys/types.h>".} = object
+  Errno* = distinct cint
+
+proc pthread_create[T](
+      thread: var Pthread,
+      attr: ptr PthreadAttr, # In Nim this is a var and how Nim sets a custom stack
+      fn: proc (x: ptr T): pointer {.thread, noconv.},
+      arg: ptr T
+  ): Errno {.header: "<sys/types.h>".}
+
+proc pthread_join(
+      thread: Pthread,
+      thread_exit_status: ptr pointer
+    ): Errno {.header: "<pthread.h>".}
+
+template channel_send_loop(chan: ChannelRaw,
+                      data: sink pointer,
+                      size: int,
+                      body: untyped): untyped =
+  while not sendMpmc(chan, data, size, nonBlocking):
+    body
+
+template channel_receive_loop(chan: ChannelRaw,
+                        data: pointer,
+                        size: int,
+                        body: untyped): untyped =
+  while not recvMpmc(chan, data, size, nonBlocking):
+    body
+
+
+# Without threads:on or release,
+# worker threads will crash on popFrame
+
+import std/unittest
+
+type ThreadArgs = object
+  ID: int
+  chan: ChannelRaw
+
+template Worker(id: int, body: untyped): untyped {.dirty.} =
+  if args.ID == id:
+    body
+
+
+const Sender = 1
+const Receiver = 0
+
+proc runSuite(
+            name: string,
+            fn: proc(args: ptr ThreadArgs): pointer {.noconv, gcsafe.}
+          ) =
+  var chan: ChannelRaw
+
+  for i in Unbuffered .. Buffered:
+    if i == Unbuffered:
+      chan = allocChannel(size = 32, n = 1)
+      check:
+        peek(chan) == 0
+        capacity(chan) == 1
+        isBuffered(chan) == false
+        isUnbuffered(chan) == true
+    else:
+      chan = allocChannel(size = int.sizeof.int, n = 7)
+      check:
+        peek(chan) == 0
+        capacity(chan) == 7
+        isBuffered(chan) == true
+        isUnbuffered(chan) == false
+
+      var threads: array[2, Pthread]
+      var args = [
+        ThreadArgs(ID: 0, chan: chan),
+        ThreadArgs(ID: 1, chan: chan)
+      ]
+
+      discard pthread_create(threads[0], nil, fn, args[0].addr)
+      discard pthread_create(threads[1], nil, fn, args[1].addr)
+
+      discard pthread_join(threads[0], nil)
+      discard pthread_join(threads[1], nil)
+
+      freeChannel(chan)
+
+# ----------------------------------------------------------------------------------
+
+proc thread_func(args: ptr ThreadArgs): pointer {.noconv.} =
+
+  # Worker RECEIVER:
+  # ---------
+  # <- chan
+  # <- chan
+  # <- chan
+  #
+  # Worker SENDER:
+  # ---------
+  # chan <- 42
+  # chan <- 53
+  # chan <- 64
+  #
+
+  Worker(Receiver):
+    var val: int
+    for j in 0 ..< 3:
+      channel_receive_loop(args.chan, val.addr, val.sizeof.int):
+        # Busy loop, normally it should yield
+        discard
+      check: val == 42 + j*11
+
+  Worker(Sender):
+    var val: int
+    check: peek(args.chan) == 0
+    for j in 0 ..< 3:
+      val = 42 + j*11
+      channel_send_loop(args.chan, val.addr, val.sizeof.int):
+        # Busy loop, normally it should yield
+        discard
+
+  return nil
+
+runSuite("[ChannelRaw] 2 threads can send data", thread_func)
+
+# ----------------------------------------------------------------------------------
+
+iterator pairs(chan: ChannelRaw, T: typedesc): (int, T) =
+  var i = 0
+  var x: T
+  while not isClosed(chan) or peek(chan) > 0:
+    let r = recvMpmc(chan, x.addr, x.sizeof.int, true)
+    # printf("x: %d, r: %d\n", x, r)
+    if r:
+      yield (i, x)
+      inc i
+
+proc thread_func_2(args: ptr ThreadArgs): pointer {.noconv.} =
+  # Worker RECEIVER:
+  # ---------
+  # <- chan until closed and empty
+  #
+  # Worker SENDER:
+  # ---------
+  # chan <- 42, 53, 64, ...
+
+  const N = 100
+
+  Worker(Receiver):
+    for j, val in pairs(args.chan, int):
+      # TODO: Need special handling that doesn't allocate
+      #       in thread with no GC
+      #       when check fails
+      #
+      check: val == 42 + j*11
+
+  Worker(Sender):
+    var val: int
+    check: peek(args.chan) == 0
+    for j in 0 ..< N:
+      val = 42 + j*11
+      channel_send_loop(args.chan, val.addr, int.sizeof.int):
+        discard
+    discard channelCloseMpmc(args.chan)
+
+  return nil
+
+runSuite("[ChannelRaw] channel_close, freeChannel, channelCache", thread_func_2)
+
+# ----------------------------------------------------------------------------------
+
+proc isCached(chan: ChannelRaw): bool =
+  assert not chan.isNil
+
+  var p = channelCache
+  while not p.isNil:
+    if chan.itemsize == p.chanSize and
+        chan.size == p.chanN:
+      for i in 0 ..< p.numCached:
+        if chan == p.cache[i]:
+          return true
+      # No more channel in cache can match
+      return false
+    p = p.next
+  return false
+
+block: # [ChannelRaw] ChannelRaw caching implementation
+
+  # Start from clean cache slate
+  freeChannelCache()
+
+  block: # Explicit caches allocation
+    check:
+      allocChannelCache(int sizeof(char), 4)
+      allocChannelCache(int sizeof(int), 8)
+      allocChannelCache(int sizeof(ptr float64), 16)
+
+      # Don't create existing channel cache
+      not allocChannelCache(int sizeof(char), 4)
+      not allocChannelCache(int sizeof(int), 8)
+      not allocChannelCache(int sizeof(ptr float64), 16)
+
+    check:
+      channelCacheLen == 3
+
+  # ---------------------------------
+  var chan, stash: array[10, ChannelRaw]
+
+  block: # Implicit caches allocation
+
+    chan[0] = allocChannel(sizeof(char), 4)
+    chan[1] = allocChannel(sizeof(int32), 8)
+    chan[2] = allocChannel(sizeof(ptr float64), 16)
+
+    chan[3] = allocChannel(sizeof(char), 5)
+    chan[4] = allocChannel(sizeof(int64), 8)
+    chan[5] = allocChannel(sizeof(ptr float32), 24)
+
+    # We have caches ready to store specific channel kinds
+    check: channelCacheLen == 6 # Cumulated with previous test
+    # But they are not in cache while in use
+    check:
+      not chan[0].isCached
+      not chan[1].isCached
+      not chan[2].isCached
+      not chan[3].isCached
+      not chan[4].isCached
+      not chan[5].isCached
+
+  block: # Freed channels are returned to cache
+    stash[0..5] = chan.toOpenArray(0, 5)
+    for i in 0 .. 5:
+      # Free the channels
+      freeChannel(chan[i])
+
+    check:
+      stash[0].isCached
+      stash[1].isCached
+      stash[2].isCached
+      stash[3].isCached
+      stash[4].isCached
+      stash[5].isCached
+
+  block: # Cached channels are being reused
+
+    chan[6] = allocChannel(sizeof(char), 4)
+    chan[7] = allocChannel(sizeof(int32), 8)
+    chan[8] = allocChannel(sizeof(ptr float32), 16)
+    chan[9] = allocChannel(sizeof(ptr float64), 16)
+
+    # All (itemsize, queue size, implementation) were already allocated
+    check: channelCacheLen == 6
+
+    # We reused old channels from cache
+    check:
+      chan[6] == stash[0]
+      chan[7] == stash[1]
+      chan[8] == stash[2]
+      # chan[9] - required a fresh alloc
+
+  block: # Clearing the cache
+
+    stash[6..9] = chan.toOpenArray(6, 9)
+
+    for i in 6 .. 9:
+      freeChannel(chan[i])
+
+    check:
+      stash[6].isCached
+      stash[7].isCached
+      stash[8].isCached
+      stash[9].isCached
+
+    freeChannelCache()
+
+    # Check that nothing is cached anymore
+    for i in 0 .. 9:
+      check: not stash[i].isCached
+    # And length is reset to 0
+    check: channelCacheLen == 0
+
+    # Cache can grow again
+    chan[0] = allocChannel(sizeof((int, float, int32, uint)), 1)
+    chan[1] = allocChannel(sizeof(int32), 0)
+    chan[2] = allocChannel(sizeof(int32), 0)
+
+    check: channelCacheLen == 2
+
+    # Interleave cache clear and channel free
+    freeChannelCache()
+    check: channelCacheLen == 0
+
+    freeChannel(chan[0])
+    freeChannel(chan[1])
+    freeChannel(chan[2])
diff --git a/tests/stdlib/tchannels_simple.nim b/tests/stdlib/tchannels_simple.nim
new file mode 100644
index 000000000..dc4857c3e
--- /dev/null
+++ b/tests/stdlib/tchannels_simple.nim
@@ -0,0 +1,68 @@
+discard """
+  matrix: "--threads:on --gc:orc; --threads:on --gc:arc"
+  disabled: "freebsd"
+"""
+
+import std/channels
+import std/os
+
+var chan = newChannel[string]()
+
+# This proc will be run in another thread using the threads module.
+proc firstWorker() =
+  chan.send("Hello World!")
+
+# This is another proc to run in a background thread. This proc takes a while
+# to send the message since it sleeps for 2 seconds (or 2000 milliseconds).
+proc secondWorker() =
+  sleep(2000)
+  chan.send("Another message")
+
+
+# Launch the worker.
+var worker1: Thread[void]
+createThread(worker1, firstWorker)
+
+# Block until the message arrives, then print it out.
+var dest = ""
+chan.recv(dest)
+doAssert dest == "Hello World!"
+
+# Wait for the thread to exit before moving on to the next example.
+worker1.joinThread()
+
+# Launch the other worker.
+var worker2: Thread[void]
+createThread(worker2, secondWorker)
+# This time, use a non-blocking approach with tryRecv.
+# Since the main thread is not blocked, it could be used to perform other
+# useful work while it waits for data to arrive on the channel.
+
+var messages: seq[string]
+var msg = ""
+while true:
+  let tried = chan.tryRecv(msg)
+  if tried:
+    messages.add move(msg)
+    break
+  
+  messages.add "Pretend I'm doing useful work..."
+  # For this example, sleep in order not to flood stdout with the above
+  # message.
+  sleep(400)
+
+# Wait for the second thread to exit before cleaning up the channel.
+worker2.joinThread()
+
+# Clean up the channel.
+doAssert chan.close()
+doAssert messages[^1] == "Another message"
+doAssert messages.len >= 2
+
+
+block:
+  let chan0 = newChannel[int]()
+  let chan1 = chan0
+  block:
+    let chan3 = chan0
+    let chan4 = chan0