summary refs log tree commit diff stats
path: root/tests/async
diff options
context:
space:
mode:
Diffstat (limited to 'tests/async')
-rw-r--r--tests/async/tasync_forward.nim9
-rw-r--r--tests/async/tasyncall.nim68
-rw-r--r--tests/async/tasyncdiscard.nim2
-rw-r--r--tests/async/tasynceverror.nim6
-rw-r--r--tests/async/tasyncexceptions.nim2
-rw-r--r--tests/async/tasyncfile.nim2
-rw-r--r--tests/async/tasynctry.nim10
-rw-r--r--tests/async/tawaitsemantics.nim59
-rw-r--r--tests/async/tgeneric_async.nim9
-rw-r--r--tests/async/tioselectors.nim408
-rw-r--r--tests/async/tioselectors.nim.cfg1
-rw-r--r--tests/async/tnewasyncudp.nim102
-rw-r--r--tests/async/treturn_await.nim23
-rw-r--r--tests/async/twinasyncrw.nim257
14 files changed, 948 insertions, 10 deletions
diff --git a/tests/async/tasync_forward.nim b/tests/async/tasync_forward.nim
new file mode 100644
index 000000000..ffb7acafd
--- /dev/null
+++ b/tests/async/tasync_forward.nim
@@ -0,0 +1,9 @@
+
+import asyncdispatch
+
+# bug #1970
+
+proc foo {.async.}
+
+proc foo {.async.} =
+  discard
diff --git a/tests/async/tasyncall.nim b/tests/async/tasyncall.nim
new file mode 100644
index 000000000..60ba557cc
--- /dev/null
+++ b/tests/async/tasyncall.nim
@@ -0,0 +1,68 @@
+discard """
+  file: "tasyncall.nim"
+  exitcode: 0
+"""
+import times, sequtils
+import asyncdispatch
+
+const
+  taskCount = 10
+  sleepDuration = 500
+
+proc futureWithValue(x: int): Future[int] {.async.} =
+  await sleepAsync(sleepDuration)
+  return x
+
+proc futureWithoutValue() {.async.} =
+  await sleepAsync(1000)
+
+proc testFuturesWithValue(x: int): seq[int] =
+  var tasks = newSeq[Future[int]](taskCount)
+
+  for i in 0..<taskCount:
+    tasks[i] = futureWithValue(x)
+
+  result = waitFor all(tasks)
+
+proc testFuturesWithoutValues() =
+  var tasks = newSeq[Future[void]](taskCount)
+
+  for i in 0..<taskCount:
+    tasks[i] = futureWithoutValue()
+
+  waitFor all(tasks)
+
+proc testVarargs(x, y, z: int): seq[int] =
+  let
+    a = futureWithValue(x)
+    b = futureWithValue(y)
+    c = futureWithValue(z)
+
+  result = waitFor all(a, b, c)
+
+block:
+  let
+    startTime = cpuTime()
+    results = testFuturesWithValue(42)
+    expected = repeat(42, taskCount)
+    execTime = cpuTime() - startTime
+
+  doAssert execTime * 1000 < taskCount * sleepDuration
+  doAssert results == expected
+
+block:
+  let startTime = cpuTime()
+  testFuturesWithoutValues()
+  let execTime = cpuTime() - startTime
+
+  doAssert execTime * 1000 < taskCount * sleepDuration
+
+block:
+  let
+    startTime = cpuTime()
+    results = testVarargs(1, 2, 3)
+    expected = @[1, 2, 3]
+    execTime = cpuTime() - startTime
+
+  doAssert execTime * 100 < taskCount * sleepDuration
+  doAssert results == expected
diff --git a/tests/async/tasyncdiscard.nim b/tests/async/tasyncdiscard.nim
index 71aba29e2..e7c87ad42 100644
--- a/tests/async/tasyncdiscard.nim
+++ b/tests/async/tasyncdiscard.nim
@@ -36,4 +36,4 @@ proc main {.async.} =
   discard await g()
   echo 6
 
-asyncCheck main()
+waitFor(main())
diff --git a/tests/async/tasynceverror.nim b/tests/async/tasynceverror.nim
index 22b4fe9a7..dd05c831b 100644
--- a/tests/async/tasynceverror.nim
+++ b/tests/async/tasynceverror.nim
@@ -1,9 +1,9 @@
 discard """
   file: "tasynceverror.nim"
   exitcode: 1
-  outputsub: "Error: unhandled exception: Connection reset by peer"
+  outputsub: "Error: unhandled exception: "
 """
-
+# error message is actually different on OSX
 import
     asyncdispatch,
     asyncnet,
@@ -43,7 +43,7 @@ else:
         await s.connect(testHost, testPort)
 
         var ps = await ls.accept()
-        SocketHandle(ls).close()
+        closeSocket(ls)
 
         await ps.send("test 1", flags={})
         s.close()
diff --git a/tests/async/tasyncexceptions.nim b/tests/async/tasyncexceptions.nim
index aab08e30f..efe31ef27 100644
--- a/tests/async/tasyncexceptions.nim
+++ b/tests/async/tasyncexceptions.nim
@@ -5,6 +5,8 @@ discard """
 """
 import asyncdispatch
 
+# Note: This is a test case for a bug.
+
 proc accept(): Future[int] {.async.} =
   await sleepAsync(100)
   result = 4
diff --git a/tests/async/tasyncfile.nim b/tests/async/tasyncfile.nim
index 05cda5e5f..26a9bb391 100644
--- a/tests/async/tasyncfile.nim
+++ b/tests/async/tasyncfile.nim
@@ -24,7 +24,7 @@ proc main() {.async.} =
     var file = openAsync(fn, fmAppend)
     await file.write("\ntest2")
     let errorTest = file.readAll()
-    await errorTest
+    echo await errorTest
     doAssert errorTest.failed
     file.close()
     file = openAsync(fn, fmRead)
diff --git a/tests/async/tasynctry.nim b/tests/async/tasynctry.nim
index f77198e2e..5930f296f 100644
--- a/tests/async/tasynctry.nim
+++ b/tests/async/tasynctry.nim
@@ -48,7 +48,7 @@ proc catch() {.async.} =
   except OSError, EInvalidField:
     assert false
 
-asyncCheck catch()
+waitFor catch()
 
 proc test(): Future[bool] {.async.} =
   result = false
@@ -92,13 +92,13 @@ proc test4(): Future[int] {.async.} =
     result = 2
 
 var x = test()
-assert x.read
+assert x.waitFor()
 
 x = test2()
-assert x.read
+assert x.waitFor()
 
 var y = test3()
-assert y.read == 2
+assert y.waitFor() == 2
 
 y = test4()
-assert y.read == 2
+assert y.waitFor() == 2
diff --git a/tests/async/tawaitsemantics.nim b/tests/async/tawaitsemantics.nim
new file mode 100644
index 000000000..3e0c3903e
--- /dev/null
+++ b/tests/async/tawaitsemantics.nim
@@ -0,0 +1,59 @@
+discard """
+  file: "tawaitsemantics.nim"
+  exitcode: 0
+  output: '''
+Error caught
+Test infix
+Test call
+'''
+"""
+
+import asyncdispatch
+
+# This tests the behaviour of 'await' under different circumstances.
+# For example, when awaiting Future variable and this future has failed the
+# exception shouldn't be raised as described here
+# https://github.com/nim-lang/Nim/issues/4170
+
+proc thrower(): Future[void] =
+  result = newFuture[void]()
+  result.fail(newException(Exception, "Test"))
+
+proc dummy: Future[void] =
+  result = newFuture[void]()
+  result.complete()
+
+proc testInfix() {.async.} =
+  # Test the infix operator semantics.
+  var fut = thrower()
+  var fut2 = dummy()
+  await fut or fut2 # Shouldn't raise.
+  # TODO: what about: await thrower() or fut2?
+
+proc testCall() {.async.} =
+  await thrower()
+
+proc tester() {.async.} =
+  # Test that we can handle exceptions without 'try'
+  var fut = thrower()
+  doAssert fut.finished
+  doAssert fut.failed
+  doAssert fut.error.msg == "Test"
+  await fut # We are awaiting a 'Future', so no `read` occurs.
+  doAssert fut.finished
+  doAssert fut.failed
+  doAssert fut.error.msg == "Test"
+  echo("Error caught")
+
+  fut = testInfix()
+  await fut
+  doAssert fut.finished
+  doAssert(not fut.failed)
+  echo("Test infix")
+
+  fut = testCall()
+  await fut
+  doAssert fut.failed
+  echo("Test call")
+
+waitFor(tester())
diff --git a/tests/async/tgeneric_async.nim b/tests/async/tgeneric_async.nim
new file mode 100644
index 000000000..af6370181
--- /dev/null
+++ b/tests/async/tgeneric_async.nim
@@ -0,0 +1,9 @@
+
+import asyncdispatch
+
+when true:
+  # bug #2377
+  proc test[T](v: T) {.async.} =
+    echo $v
+
+  asyncCheck test[int](1)
diff --git a/tests/async/tioselectors.nim b/tests/async/tioselectors.nim
new file mode 100644
index 000000000..2237de01a
--- /dev/null
+++ b/tests/async/tioselectors.nim
@@ -0,0 +1,408 @@
+discard """
+  file: "tioselectors.nim"
+  output: "All tests passed!"
+"""
+import ioselectors
+
+const hasThreadSupport = compileOption("threads")
+
+template processTest(t, x: untyped) =
+  #stdout.write(t)
+  #stdout.flushFile()
+  if not x: echo(t & " FAILED\r\n")
+
+when not defined(windows):
+  import os, posix, osproc, nativesockets, times
+
+  const supportedPlatform = defined(macosx) or defined(freebsd) or
+                            defined(netbsd) or defined(openbsd) or
+                            defined(linux)
+
+  proc socket_notification_test(): bool =
+    proc create_test_socket(): SocketHandle =
+      var sock = posix.socket(posix.AF_INET, posix.SOCK_STREAM,
+                              posix.IPPROTO_TCP)
+      var x: int = fcntl(sock, F_GETFL, 0)
+      if x == -1: raiseOSError(osLastError())
+      else:
+        var mode = x or O_NONBLOCK
+        if fcntl(sock, F_SETFL, mode) == -1:
+          raiseOSError(osLastError())
+      result = sock
+
+    var client_message = "SERVER HELLO =>"
+    var server_message = "CLIENT HELLO"
+    var buffer : array[128, char]
+
+    var selector = newSelector[int]()
+    var client_socket = create_test_socket()
+    var server_socket = create_test_socket()
+
+    registerHandle(selector, server_socket, {Event.Read}, 0)
+    registerHandle(selector, client_socket, {Event.Write}, 0)
+
+    var option : int32 = 1
+    if setsockopt(server_socket, cint(SOL_SOCKET), cint(SO_REUSEADDR),
+                  addr(option), sizeof(option).SockLen) < 0:
+      raiseOSError(osLastError())
+
+    var aiList = getAddrInfo("0.0.0.0", Port(13337))
+    if bindAddr(server_socket, aiList.ai_addr,
+                aiList.ai_addrlen.Socklen) < 0'i32:
+      dealloc(aiList)
+      raiseOSError(osLastError())
+    discard server_socket.listen()
+    dealloc(aiList)
+
+    aiList = getAddrInfo("127.0.0.1", Port(13337))
+    discard posix.connect(client_socket, aiList.ai_addr,
+                          aiList.ai_addrlen.Socklen)
+    dealloc(aiList)
+    discard selector.select(100)
+    var rc1 = selector.select(100)
+    assert(len(rc1) == 2)
+
+    var sockAddress: SockAddr
+    var addrLen = sizeof(sockAddress).Socklen
+    var server2_socket = accept(server_socket,
+                                cast[ptr SockAddr](addr(sockAddress)),
+                                addr(addrLen))
+    assert(server2_socket != osInvalidSocket)
+    selector.registerHandle(server2_socket, {Event.Read}, 0)
+
+    if posix.send(client_socket, addr(client_message[0]),
+                  len(client_message), 0) == -1:
+      raiseOSError(osLastError())
+
+    selector.updateHandle(client_socket, {Event.Read})
+
+    var rc2 = selector.select(100)
+    assert(len(rc2) == 1)
+
+    var read_count = posix.recv(server2_socket, addr buffer[0], 128, 0)
+    if read_count == -1:
+      raiseOSError(osLastError())
+
+    assert(read_count == len(client_message))
+    var test1 = true
+    for i in 0..<read_count:
+      if client_message[i] != buffer[i]:
+        test1 = false
+        break
+    assert(test1)
+
+    selector.updateHandle(server2_socket, {Event.Write})
+    var rc3 = selector.select(0)
+    assert(len(rc3) == 1)
+    if posix.send(server2_socket, addr(server_message[0]),
+                  len(server_message), 0) == -1:
+      raiseOSError(osLastError())
+    selector.updateHandle(server2_socket, {Event.Read})
+
+    var rc4 = selector.select(100)
+    assert(len(rc4) == 1)
+    read_count = posix.recv(client_socket, addr(buffer[0]), 128, 0)
+    if read_count == -1:
+      raiseOSError(osLastError())
+
+    assert(read_count == len(server_message))
+    var test2 = true
+    for i in 0..<read_count:
+      if server_message[i] != buffer[i]:
+        test2 = false
+        break
+    assert(test2)
+
+    selector.unregister(server_socket)
+    selector.unregister(server2_socket)
+    selector.unregister(client_socket)
+    discard posix.close(server_socket)
+    discard posix.close(server2_socket)
+    discard posix.close(client_socket)
+    assert(selector.isEmpty())
+    close(selector)
+    result = true
+
+  proc event_notification_test(): bool =
+    var selector = newSelector[int]()
+    var event = newSelectEvent()
+    selector.registerEvent(event, 1)
+    selector.flush()
+    event.setEvent()
+    var rc1 = selector.select(0)
+    event.setEvent()
+    var rc2 = selector.select(0)
+    var rc3 = selector.select(0)
+    assert(len(rc1) == 1 and len(rc2) == 1 and len(rc3) == 0)
+    var ev1 = rc1[0].data
+    var ev2 = rc2[0].data
+    assert(ev1 == 1 and ev2 == 1)
+    selector.unregister(event)
+    event.close()
+    assert(selector.isEmpty())
+    selector.close()
+    result = true
+
+  when supportedPlatform:
+    proc timer_notification_test(): bool =
+      var selector = newSelector[int]()
+      var timer = selector.registerTimer(100, false, 0)
+      var rc1 = selector.select(140)
+      var rc2 = selector.select(140)
+      assert(len(rc1) == 1 and len(rc2) == 1)
+      selector.unregister(timer)
+      selector.flush()
+      selector.registerTimer(100, true, 0)
+      var rc3 = selector.select(120)
+      var rc4 = selector.select(120)
+      assert(len(rc3) == 1 and len(rc4) == 0)
+      assert(selector.isEmpty())
+      selector.close()
+      result = true
+
+    proc process_notification_test(): bool =
+      var selector = newSelector[int]()
+      var process2 = startProcess("/bin/sleep", "", ["2"], nil,
+                           {poStdErrToStdOut, poUsePath})
+      discard startProcess("/bin/sleep", "", ["1"], nil,
+                           {poStdErrToStdOut, poUsePath})
+
+      selector.registerProcess(process2.processID, 0)
+      var rc1 = selector.select(1500)
+      var rc2 = selector.select(1500)
+      var r = len(rc1) + len(rc2)
+      assert(r == 1)
+      result = true
+
+    proc signal_notification_test(): bool =
+      var sigset1n, sigset1o, sigset2n, sigset2o: Sigset
+      var pid = posix.getpid()
+
+      discard sigemptyset(sigset1n)
+      discard sigemptyset(sigset1o)
+      discard sigemptyset(sigset2n)
+      discard sigemptyset(sigset2o)
+
+      when hasThreadSupport:
+        if pthread_sigmask(SIG_BLOCK, sigset1n, sigset1o) == -1:
+          raiseOSError(osLastError())
+      else:
+        if sigprocmask(SIG_BLOCK, sigset1n, sigset1o) == -1:
+          raiseOSError(osLastError())
+
+      var selector = newSelector[int]()
+      var s1 = selector.registerSignal(SIGUSR1, 1)
+      var s2 = selector.registerSignal(SIGUSR2, 2)
+      var s3 = selector.registerSignal(SIGTERM, 3)
+      selector.flush()
+
+      discard posix.kill(pid, SIGUSR1)
+      discard posix.kill(pid, SIGUSR2)
+      discard posix.kill(pid, SIGTERM)
+      var rc = selector.select(0)
+      selector.unregister(s1)
+      selector.unregister(s2)
+      selector.unregister(s3)
+
+      when hasThreadSupport:
+        if pthread_sigmask(SIG_BLOCK, sigset2n, sigset2o) == -1:
+          raiseOSError(osLastError())
+      else:
+        if sigprocmask(SIG_BLOCK, sigset2n, sigset2o) == -1:
+          raiseOSError(osLastError())
+
+      assert(len(rc) == 3)
+      assert(rc[0].data + rc[1].data + rc[2].data == 6) # 1 + 2 + 3
+      assert(equalMem(addr sigset1o, addr sigset2o, sizeof(Sigset)))
+      assert(selector.isEmpty())
+      result = true
+
+  when hasThreadSupport:
+
+    var counter = 0
+
+    proc event_wait_thread(event: SelectEvent) {.thread.} =
+      var selector = newSelector[int]()
+      selector.registerEvent(event, 1)
+      selector.flush()
+      var rc = selector.select(1000)
+      if len(rc) == 1:
+        inc(counter)
+      selector.unregister(event)
+      assert(selector.isEmpty())
+
+    proc mt_event_test(): bool =
+      var
+        thr: array[0..7, Thread[SelectEvent]]
+      var selector = newSelector[int]()
+      var sock = newNativeSocket()
+      var event = newSelectEvent()
+      for i in 0..high(thr):
+        createThread(thr[i], event_wait_thread, event)
+      selector.registerHandle(sock, {Event.Read}, 1)
+      discard selector.select(500)
+      selector.unregister(sock)
+      event.setEvent()
+      joinThreads(thr)
+      assert(counter == 1)
+      result = true
+
+  processTest("Socket notification test...", socket_notification_test())
+  processTest("User event notification test...", event_notification_test())
+  when hasThreadSupport:
+    processTest("Multithreaded user event notification test...",
+                mt_event_test())
+  when supportedPlatform:
+    processTest("Timer notification test...", timer_notification_test())
+    processTest("Process notification test...", process_notification_test())
+    processTest("Signal notification test...", signal_notification_test())
+  echo("All tests passed!")
+else:
+  import nativesockets, winlean, os, osproc
+
+  proc socket_notification_test(): bool =
+    proc create_test_socket(): SocketHandle =
+      var sock = newNativeSocket()
+      setBlocking(sock, false)
+      result = sock
+
+    var client_message = "SERVER HELLO =>"
+    var server_message = "CLIENT HELLO"
+    var buffer : array[128, char]
+
+    var selector = newSelector[int]()
+    var client_socket = create_test_socket()
+    var server_socket = create_test_socket()
+
+    selector.registerHandle(server_socket, {Event.Read}, 0)
+    selector.registerHandle(client_socket, {Event.Write}, 0)
+
+    var option : int32 = 1
+    if setsockopt(server_socket, cint(SOL_SOCKET), cint(SO_REUSEADDR),
+                  addr(option), sizeof(option).SockLen) < 0:
+      raiseOSError(osLastError())
+
+    var aiList = getAddrInfo("0.0.0.0", Port(13337))
+    if bindAddr(server_socket, aiList.ai_addr,
+                aiList.ai_addrlen.Socklen) < 0'i32:
+      dealloc(aiList)
+      raiseOSError(osLastError())
+    discard server_socket.listen()
+    dealloc(aiList)
+
+    aiList = getAddrInfo("127.0.0.1", Port(13337))
+    discard connect(client_socket, aiList.ai_addr,
+                    aiList.ai_addrlen.Socklen)
+    dealloc(aiList)
+    # for some reason Windows select doesn't return both
+    # descriptors from first call, so we need to make 2 calls
+    discard selector.select(100)
+    var rcm = selector.select(100)
+    assert(len(rcm) == 2)
+
+    var sockAddress = SockAddr()
+    var addrLen = sizeof(sockAddress).Socklen
+    var server2_socket = accept(server_socket,
+                                cast[ptr SockAddr](addr(sockAddress)),
+                                addr(addrLen))
+    assert(server2_socket != osInvalidSocket)
+    selector.registerHandle(server2_socket, {Event.Read}, 0)
+
+    if send(client_socket, cast[pointer](addr(client_message[0])),
+            cint(len(client_message)), 0) == -1:
+      raiseOSError(osLastError())
+
+    selector.updateHandle(client_socket, {Event.Read})
+
+    var rc2 = selector.select(100)
+    assert(len(rc2) == 1)
+
+    var read_count = recv(server2_socket, addr buffer[0], 128, 0)
+    if read_count == -1:
+      raiseOSError(osLastError())
+
+    assert(read_count == len(client_message))
+    var test1 = true
+    for i in 0..<read_count:
+      if client_message[i] != buffer[i]:
+        test1 = false
+        break
+    assert(test1)
+
+    if send(server2_socket, cast[pointer](addr(server_message[0])),
+                  cint(len(server_message)), 0) == -1:
+      raiseOSError(osLastError())
+
+    var rc3 = selector.select(0)
+    assert(len(rc3) == 1)
+    read_count = recv(client_socket, addr(buffer[0]), 128, 0)
+    if read_count == -1:
+      raiseOSError(osLastError())
+
+    assert(read_count == len(server_message))
+    var test2 = true
+    for i in 0..<read_count:
+      if server_message[i] != buffer[i]:
+        test2 = false
+        break
+    assert(test2)
+
+    selector.unregister(server_socket)
+    selector.unregister(server2_socket)
+    selector.unregister(client_socket)
+    close(server_socket)
+    close(server2_socket)
+    close(client_socket)
+    assert(selector.isEmpty())
+    close(selector)
+    result = true
+
+  proc event_notification_test(): bool =
+    var selector = newSelector[int]()
+    var event = newSelectEvent()
+    selector.registerEvent(event, 1)
+    selector.flush()
+    event.setEvent()
+    var rc1 = selector.select(0)
+    event.setEvent()
+    var rc2 = selector.select(0)
+    var rc3 = selector.select(0)
+    assert(len(rc1) == 1 and len(rc2) == 1 and len(rc3) == 0)
+    var ev1 = rc1[0].data
+    var ev2 = rc2[0].data
+    assert(ev1 == 1 and ev2 == 1)
+    selector.unregister(event)
+    event.close()
+    assert(selector.isEmpty())
+    selector.close()
+    result = true
+
+  when hasThreadSupport:
+    var counter = 0
+
+    proc event_wait_thread(event: SelectEvent) {.thread.} =
+      var selector = newSelector[int]()
+      selector.registerEvent(event, 1)
+      selector.flush()
+      var rc = selector.select(500)
+      if len(rc) == 1:
+        inc(counter)
+      selector.unregister(event)
+      assert(selector.isEmpty())
+
+    proc mt_event_test(): bool =
+      var thr: array[0..7, Thread[SelectEvent]]
+      var event = newSelectEvent()
+      for i in 0..high(thr):
+        createThread(thr[i], event_wait_thread, event)
+      event.setEvent()
+      joinThreads(thr)
+      assert(counter == 1)
+      result = true
+
+  processTest("Socket notification test...", socket_notification_test())
+  processTest("User event notification test...", event_notification_test())
+  when hasThreadSupport:
+    processTest("Multithreaded user event notification test...",
+                 mt_event_test())
+  echo("All tests passed!")
diff --git a/tests/async/tioselectors.nim.cfg b/tests/async/tioselectors.nim.cfg
new file mode 100644
index 000000000..b1b896858
--- /dev/null
+++ b/tests/async/tioselectors.nim.cfg
@@ -0,0 +1 @@
+threads:on -d:threadsafe
diff --git a/tests/async/tnewasyncudp.nim b/tests/async/tnewasyncudp.nim
new file mode 100644
index 000000000..7025fa20d
--- /dev/null
+++ b/tests/async/tnewasyncudp.nim
@@ -0,0 +1,102 @@
+discard """
+  file: "tnewasyncudp.nim"
+  output: "5000"
+"""
+import asyncdispatch, nativesockets, net, strutils, os
+
+when defined(windows):
+  import winlean
+else:
+  import posix
+
+var msgCount = 0
+var recvCount = 0
+
+const
+  messagesToSend = 100
+  swarmSize = 50
+  serverPort = 10333
+
+var
+  sendports = 0
+  recvports = 0
+
+proc saveSendingPort(port: int) =
+  sendports = sendports + port
+
+proc saveReceivedPort(port: int) =
+  recvports = recvports + port
+
+proc prepareAddress(intaddr: uint32, intport: uint16): ptr Sockaddr_in =
+  result = cast[ptr Sockaddr_in](alloc0(sizeof(Sockaddr_in)))
+  when defined(windows):
+    result.sin_family = toInt(nativesockets.AF_INET).int16
+  else:
+    result.sin_family = toInt(nativesockets.AF_INET)
+  result.sin_port = htons(intport)
+  result.sin_addr.s_addr = htonl(intaddr)
+
+proc launchSwarm(name: ptr SockAddr) {.async.} =
+  var i = 0
+  var k = 0
+  while i < swarmSize:
+    var peeraddr = prepareAddress(INADDR_ANY, 0)
+    var sock = newAsyncNativeSocket(nativesockets.AF_INET,
+                                    nativesockets.SOCK_DGRAM,
+                                    Protocol.IPPROTO_UDP)
+    if bindAddr(sock.SocketHandle, cast[ptr SockAddr](peeraddr),
+              sizeof(Sockaddr_in).Socklen) < 0'i32:
+      raiseOSError(osLastError())
+    let sockport = getSockName(sock.SocketHandle).int
+    k = 0
+    while k < messagesToSend:
+      var message = "Message " & $(i * messagesToSend + k)
+      await sendTo(sock, addr message[0], len(message),
+                   name, sizeof(Sockaddr_in).SockLen)
+      saveSendingPort(sockport)
+      inc(k)
+    closeSocket(sock)
+    inc(i)
+
+proc readMessages(server: AsyncFD) {.async.} =
+  var buffer: array[16384, char]
+  var slen = sizeof(Sockaddr_in).SockLen
+  var saddr = Sockaddr_in()
+  var maxResponses = (swarmSize * messagesToSend)
+
+  var i = 0
+  while i < maxResponses:
+    zeroMem(addr(buffer[0]), 16384)
+    zeroMem(cast[pointer](addr(saddr)), sizeof(Sockaddr_in))
+    var size = await recvFromInto(server, cast[cstring](addr buffer[0]),
+                                  16384, cast[ptr SockAddr](addr(saddr)),
+                                  addr(slen))
+    size = 0
+    var grammString = $buffer
+    if grammString.startswith("Message ") and
+       saddr.sin_addr.s_addr == 0x100007F:
+      inc(msgCount)
+      saveReceivedPort(ntohs(saddr.sin_port).int)
+      inc(recvCount)
+    inc(i)
+
+proc createServer() {.async.} =
+  var name = prepareAddress(INADDR_ANY, serverPort)
+  var server = newAsyncNativeSocket(nativesockets.AF_INET,
+                                    nativesockets.SOCK_DGRAM,
+                                    Protocol.IPPROTO_UDP)
+  if bindAddr(server.SocketHandle, cast[ptr SockAddr](name),
+              sizeof(Sockaddr_in).Socklen) < 0'i32:
+    raiseOSError(osLastError())
+  asyncCheck readMessages(server)
+
+var name = prepareAddress(0x7F000001, serverPort) # 127.0.0.1
+asyncCheck createServer()
+asyncCheck launchSwarm(cast[ptr SockAddr](name))
+while true:
+  poll()
+  if recvCount == swarmSize * messagesToSend:
+    break
+assert msgCount == swarmSize * messagesToSend
+assert sendports == recvports
+echo msgCount
diff --git a/tests/async/treturn_await.nim b/tests/async/treturn_await.nim
new file mode 100644
index 000000000..8d266d665
--- /dev/null
+++ b/tests/async/treturn_await.nim
@@ -0,0 +1,23 @@
+
+# bug #4371
+
+import strutils, asyncdispatch, asynchttpserver
+
+type
+  List[A] = ref object
+    value: A
+    next: List[A]
+  StrPair* = tuple[k, v: string]
+  Context* = object
+    position*: int
+    accept*: bool
+    headers*: List[StrPair]
+  Handler* = proc(req: ref Request, ctx: Context): Future[Context]
+
+proc logging*(handler: Handler): auto =
+  proc h(req: ref Request, ctx: Context): Future[Context] {.async.} =
+    let ret = handler(req, ctx)
+    debugEcho "$3 $1 $2".format(req.reqMethod, req.url.path, req.hostname)
+    return await ret
+
+  return h
diff --git a/tests/async/twinasyncrw.nim b/tests/async/twinasyncrw.nim
new file mode 100644
index 000000000..17b7d1cf5
--- /dev/null
+++ b/tests/async/twinasyncrw.nim
@@ -0,0 +1,257 @@
+discard """
+  file: "twinasyncrw.nim"
+  output: "5000"
+"""
+when defined(windows):
+  import asyncdispatch, nativesockets, net, strutils, os, winlean
+
+  var msgCount = 0
+
+  const
+    swarmSize = 50
+    messagesToSend = 100
+
+  var clientCount = 0
+
+  proc winConnect*(socket: AsyncFD, address: string, port: Port,
+    domain = Domain.AF_INET): Future[void] =
+    var retFuture = newFuture[void]("winConnect")
+    proc cb(fd: AsyncFD): bool =
+      var ret = SocketHandle(fd).getSockOptInt(cint(SOL_SOCKET), cint(SO_ERROR))
+      if ret == 0:
+          # We have connected.
+          retFuture.complete()
+          return true
+      else:
+          retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret))))
+          return true
+
+    var aiList = getAddrInfo(address, port, domain)
+    var success = false
+    var lastError: OSErrorCode = OSErrorCode(0)
+    var it = aiList
+    while it != nil:
+      var ret = nativesockets.connect(socket.SocketHandle, it.ai_addr, it.ai_addrlen.Socklen)
+      if ret == 0:
+        # Request to connect completed immediately.
+        success = true
+        retFuture.complete()
+        break
+      else:
+        lastError = osLastError()
+        if lastError.int32 == WSAEWOULDBLOCK:
+          success = true
+          addWrite(socket, cb)
+          break
+        else:
+          success = false
+      it = it.ai_next
+
+    dealloc(aiList)
+    if not success:
+      retFuture.fail(newException(OSError, osErrorMsg(lastError)))
+    return retFuture
+
+  proc winRecv*(socket: AsyncFD, size: int,
+             flags = {SocketFlag.SafeDisconn}): Future[string] =
+    var retFuture = newFuture[string]("recv")
+
+    var readBuffer = newString(size)
+
+    proc cb(sock: AsyncFD): bool =
+      result = true
+      let res = recv(sock.SocketHandle, addr readBuffer[0], size.cint,
+                     flags.toOSFlags())
+      if res < 0:
+        let lastError = osLastError()
+        if flags.isDisconnectionError(lastError):
+          retFuture.complete("")
+        else:
+          retFuture.fail(newException(OSError, osErrorMsg(lastError)))
+      elif res == 0:
+        # Disconnected
+        retFuture.complete("")
+      else:
+        readBuffer.setLen(res)
+        retFuture.complete(readBuffer)
+    # TODO: The following causes a massive slowdown.
+    #if not cb(socket):
+    addRead(socket, cb)
+    return retFuture
+
+  proc winRecvInto*(socket: AsyncFD, buf: cstring, size: int,
+                  flags = {SocketFlag.SafeDisconn}): Future[int] =
+    var retFuture = newFuture[int]("winRecvInto")
+
+    proc cb(sock: AsyncFD): bool =
+      result = true
+      let res = nativesockets.recv(sock.SocketHandle, buf, size.cint,
+                                   flags.toOSFlags())
+      if res < 0:
+        let lastError = osLastError()
+        if flags.isDisconnectionError(lastError):
+          retFuture.complete(0)
+        else:
+          retFuture.fail(newException(OSError, osErrorMsg(lastError)))
+      else:
+        retFuture.complete(res)
+    # TODO: The following causes a massive slowdown.
+    #if not cb(socket):
+    addRead(socket, cb)
+    return retFuture
+
+  proc winSend*(socket: AsyncFD, data: string,
+             flags = {SocketFlag.SafeDisconn}): Future[void] =
+    var retFuture = newFuture[void]("winSend")
+
+    var written = 0
+
+    proc cb(sock: AsyncFD): bool =
+      result = true
+      let netSize = data.len-written
+      var d = data.cstring
+      let res = nativesockets.send(sock.SocketHandle, addr d[written], netSize.cint, 0)
+      if res < 0:
+        let lastError = osLastError()
+        if flags.isDisconnectionError(lastError):
+          retFuture.complete()
+        else:
+          retFuture.fail(newException(OSError, osErrorMsg(lastError)))
+      else:
+        written.inc(res)
+        if res != netSize:
+          result = false # We still have data to send.
+        else:
+          retFuture.complete()
+    # TODO: The following causes crashes.
+    #if not cb(socket):
+    addWrite(socket, cb)
+    return retFuture
+
+  proc winAcceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}):
+      Future[tuple[address: string, client: AsyncFD]] =
+    var retFuture = newFuture[tuple[address: string,
+        client: AsyncFD]]("winAcceptAddr")
+    proc cb(sock: AsyncFD): bool =
+      result = true
+      if not retFuture.finished:
+        var sockAddress = Sockaddr()
+        var addrLen = sizeof(sockAddress).Socklen
+        var client = nativesockets.accept(sock.SocketHandle,
+                                          cast[ptr SockAddr](addr(sockAddress)), addr(addrLen))
+        if client == osInvalidSocket:
+          retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
+        else:
+          retFuture.complete((getAddrString(cast[ptr SockAddr](addr sockAddress)), client.AsyncFD))
+
+    addRead(socket, cb)
+    return retFuture
+
+  proc winAccept*(socket: AsyncFD,
+      flags = {SocketFlag.SafeDisconn}): Future[AsyncFD] =
+    ## Accepts a new connection. Returns a future containing the client socket
+    ## corresponding to that connection.
+    ## The future will complete when the connection is successfully accepted.
+    var retFut = newFuture[AsyncFD]("winAccept")
+    var fut = winAcceptAddr(socket, flags)
+    fut.callback =
+      proc (future: Future[tuple[address: string, client: AsyncFD]]) =
+        assert future.finished
+        if future.failed:
+          retFut.fail(future.error)
+        else:
+          retFut.complete(future.read.client)
+    return retFut
+
+
+  proc winRecvLine*(socket: AsyncFD): Future[string] {.async.} =
+    ## Reads a line of data from ``socket``. Returned future will complete once
+    ## a full line is read or an error occurs.
+    ##
+    ## If a full line is read ``\r\L`` is not
+    ## added to ``line``, however if solely ``\r\L`` is read then ``line``
+    ## will be set to it.
+    ##
+    ## If the socket is disconnected, ``line`` will be set to ``""``.
+    ##
+    ## If the socket is disconnected in the middle of a line (before ``\r\L``
+    ## is read) then line will be set to ``""``.
+    ## The partial line **will be lost**.
+    ##
+    ## **Warning**: This assumes that lines are delimited by ``\r\L``.
+    ##
+    ## **Note**: This procedure is mostly used for testing. You likely want to
+    ## use ``asyncnet.recvLine`` instead.
+
+    template addNLIfEmpty(): stmt =
+      if result.len == 0:
+        result.add("\c\L")
+
+    result = ""
+    var c = ""
+    while true:
+      c = await winRecv(socket, 1)
+      if c.len == 0:
+        return ""
+      if c == "\r":
+        c = await winRecv(socket, 1)
+        assert c == "\l"
+        addNLIfEmpty()
+        return
+      elif c == "\L":
+        addNLIfEmpty()
+        return
+      add(result, c)
+
+  proc sendMessages(client: AsyncFD) {.async.} =
+    for i in 0 .. <messagesToSend:
+      await winSend(client, "Message " & $i & "\c\L")
+
+  proc launchSwarm(port: Port) {.async.} =
+    for i in 0 .. <swarmSize:
+      var sock = newNativeSocket()
+      setBlocking(sock, false)
+
+      await winConnect(AsyncFD(sock), "localhost", port)
+      await sendMessages(AsyncFD(sock))
+      discard closeSocket(sock)
+
+  proc readMessages(client: AsyncFD) {.async.} =
+    while true:
+      var line = await winRecvLine(client)
+      if line == "":
+        closeSocket(client)
+        clientCount.inc
+        break
+      else:
+        if line.startswith("Message "):
+          msgCount.inc
+        else:
+          doAssert false
+
+  proc createServer(port: Port) {.async.} =
+    var server = newNativeSocket()
+    setBlocking(server, false)
+    block:
+      var name = Sockaddr_in()
+      name.sin_family = toInt(Domain.AF_INET).int16
+      name.sin_port = htons(uint16(port))
+      name.sin_addr.s_addr = htonl(INADDR_ANY)
+      if bindAddr(server, cast[ptr SockAddr](addr(name)),
+                  sizeof(name).Socklen) < 0'i32:
+        raiseOSError(osLastError())
+
+    discard server.listen()
+    while true:
+      asyncCheck readMessages(await winAccept(AsyncFD(server)))
+
+  asyncCheck createServer(Port(10335))
+  asyncCheck launchSwarm(Port(10335))
+  while true:
+    poll()
+    if clientCount == swarmSize: break
+
+  assert msgCount == swarmSize * messagesToSend
+  echo msgCount
+else:
+  echo(5000)