diff options
Diffstat (limited to 'tests/async')
-rw-r--r-- | tests/async/tasync_forward.nim | 9 | ||||
-rw-r--r-- | tests/async/tasyncall.nim | 68 | ||||
-rw-r--r-- | tests/async/tasyncdiscard.nim | 2 | ||||
-rw-r--r-- | tests/async/tasynceverror.nim | 6 | ||||
-rw-r--r-- | tests/async/tasyncexceptions.nim | 2 | ||||
-rw-r--r-- | tests/async/tasyncfile.nim | 2 | ||||
-rw-r--r-- | tests/async/tasynctry.nim | 10 | ||||
-rw-r--r-- | tests/async/tawaitsemantics.nim | 59 | ||||
-rw-r--r-- | tests/async/tgeneric_async.nim | 9 | ||||
-rw-r--r-- | tests/async/tioselectors.nim | 408 | ||||
-rw-r--r-- | tests/async/tioselectors.nim.cfg | 1 | ||||
-rw-r--r-- | tests/async/tnewasyncudp.nim | 102 | ||||
-rw-r--r-- | tests/async/treturn_await.nim | 23 | ||||
-rw-r--r-- | tests/async/twinasyncrw.nim | 257 |
14 files changed, 948 insertions, 10 deletions
diff --git a/tests/async/tasync_forward.nim b/tests/async/tasync_forward.nim new file mode 100644 index 000000000..ffb7acafd --- /dev/null +++ b/tests/async/tasync_forward.nim @@ -0,0 +1,9 @@ + +import asyncdispatch + +# bug #1970 + +proc foo {.async.} + +proc foo {.async.} = + discard diff --git a/tests/async/tasyncall.nim b/tests/async/tasyncall.nim new file mode 100644 index 000000000..60ba557cc --- /dev/null +++ b/tests/async/tasyncall.nim @@ -0,0 +1,68 @@ +discard """ + file: "tasyncall.nim" + exitcode: 0 +""" +import times, sequtils +import asyncdispatch + +const + taskCount = 10 + sleepDuration = 500 + +proc futureWithValue(x: int): Future[int] {.async.} = + await sleepAsync(sleepDuration) + return x + +proc futureWithoutValue() {.async.} = + await sleepAsync(1000) + +proc testFuturesWithValue(x: int): seq[int] = + var tasks = newSeq[Future[int]](taskCount) + + for i in 0..<taskCount: + tasks[i] = futureWithValue(x) + + result = waitFor all(tasks) + +proc testFuturesWithoutValues() = + var tasks = newSeq[Future[void]](taskCount) + + for i in 0..<taskCount: + tasks[i] = futureWithoutValue() + + waitFor all(tasks) + +proc testVarargs(x, y, z: int): seq[int] = + let + a = futureWithValue(x) + b = futureWithValue(y) + c = futureWithValue(z) + + result = waitFor all(a, b, c) + +block: + let + startTime = cpuTime() + results = testFuturesWithValue(42) + expected = repeat(42, taskCount) + execTime = cpuTime() - startTime + + doAssert execTime * 1000 < taskCount * sleepDuration + doAssert results == expected + +block: + let startTime = cpuTime() + testFuturesWithoutValues() + let execTime = cpuTime() - startTime + + doAssert execTime * 1000 < taskCount * sleepDuration + +block: + let + startTime = cpuTime() + results = testVarargs(1, 2, 3) + expected = @[1, 2, 3] + execTime = cpuTime() - startTime + + doAssert execTime * 100 < taskCount * sleepDuration + doAssert results == expected diff --git a/tests/async/tasyncdiscard.nim b/tests/async/tasyncdiscard.nim index 71aba29e2..e7c87ad42 100644 --- a/tests/async/tasyncdiscard.nim +++ b/tests/async/tasyncdiscard.nim @@ -36,4 +36,4 @@ proc main {.async.} = discard await g() echo 6 -asyncCheck main() +waitFor(main()) diff --git a/tests/async/tasynceverror.nim b/tests/async/tasynceverror.nim index 22b4fe9a7..dd05c831b 100644 --- a/tests/async/tasynceverror.nim +++ b/tests/async/tasynceverror.nim @@ -1,9 +1,9 @@ discard """ file: "tasynceverror.nim" exitcode: 1 - outputsub: "Error: unhandled exception: Connection reset by peer" + outputsub: "Error: unhandled exception: " """ - +# error message is actually different on OSX import asyncdispatch, asyncnet, @@ -43,7 +43,7 @@ else: await s.connect(testHost, testPort) var ps = await ls.accept() - SocketHandle(ls).close() + closeSocket(ls) await ps.send("test 1", flags={}) s.close() diff --git a/tests/async/tasyncexceptions.nim b/tests/async/tasyncexceptions.nim index aab08e30f..efe31ef27 100644 --- a/tests/async/tasyncexceptions.nim +++ b/tests/async/tasyncexceptions.nim @@ -5,6 +5,8 @@ discard """ """ import asyncdispatch +# Note: This is a test case for a bug. + proc accept(): Future[int] {.async.} = await sleepAsync(100) result = 4 diff --git a/tests/async/tasyncfile.nim b/tests/async/tasyncfile.nim index 05cda5e5f..26a9bb391 100644 --- a/tests/async/tasyncfile.nim +++ b/tests/async/tasyncfile.nim @@ -24,7 +24,7 @@ proc main() {.async.} = var file = openAsync(fn, fmAppend) await file.write("\ntest2") let errorTest = file.readAll() - await errorTest + echo await errorTest doAssert errorTest.failed file.close() file = openAsync(fn, fmRead) diff --git a/tests/async/tasynctry.nim b/tests/async/tasynctry.nim index f77198e2e..5930f296f 100644 --- a/tests/async/tasynctry.nim +++ b/tests/async/tasynctry.nim @@ -48,7 +48,7 @@ proc catch() {.async.} = except OSError, EInvalidField: assert false -asyncCheck catch() +waitFor catch() proc test(): Future[bool] {.async.} = result = false @@ -92,13 +92,13 @@ proc test4(): Future[int] {.async.} = result = 2 var x = test() -assert x.read +assert x.waitFor() x = test2() -assert x.read +assert x.waitFor() var y = test3() -assert y.read == 2 +assert y.waitFor() == 2 y = test4() -assert y.read == 2 +assert y.waitFor() == 2 diff --git a/tests/async/tawaitsemantics.nim b/tests/async/tawaitsemantics.nim new file mode 100644 index 000000000..3e0c3903e --- /dev/null +++ b/tests/async/tawaitsemantics.nim @@ -0,0 +1,59 @@ +discard """ + file: "tawaitsemantics.nim" + exitcode: 0 + output: ''' +Error caught +Test infix +Test call +''' +""" + +import asyncdispatch + +# This tests the behaviour of 'await' under different circumstances. +# For example, when awaiting Future variable and this future has failed the +# exception shouldn't be raised as described here +# https://github.com/nim-lang/Nim/issues/4170 + +proc thrower(): Future[void] = + result = newFuture[void]() + result.fail(newException(Exception, "Test")) + +proc dummy: Future[void] = + result = newFuture[void]() + result.complete() + +proc testInfix() {.async.} = + # Test the infix operator semantics. + var fut = thrower() + var fut2 = dummy() + await fut or fut2 # Shouldn't raise. + # TODO: what about: await thrower() or fut2? + +proc testCall() {.async.} = + await thrower() + +proc tester() {.async.} = + # Test that we can handle exceptions without 'try' + var fut = thrower() + doAssert fut.finished + doAssert fut.failed + doAssert fut.error.msg == "Test" + await fut # We are awaiting a 'Future', so no `read` occurs. + doAssert fut.finished + doAssert fut.failed + doAssert fut.error.msg == "Test" + echo("Error caught") + + fut = testInfix() + await fut + doAssert fut.finished + doAssert(not fut.failed) + echo("Test infix") + + fut = testCall() + await fut + doAssert fut.failed + echo("Test call") + +waitFor(tester()) diff --git a/tests/async/tgeneric_async.nim b/tests/async/tgeneric_async.nim new file mode 100644 index 000000000..af6370181 --- /dev/null +++ b/tests/async/tgeneric_async.nim @@ -0,0 +1,9 @@ + +import asyncdispatch + +when true: + # bug #2377 + proc test[T](v: T) {.async.} = + echo $v + + asyncCheck test[int](1) diff --git a/tests/async/tioselectors.nim b/tests/async/tioselectors.nim new file mode 100644 index 000000000..2237de01a --- /dev/null +++ b/tests/async/tioselectors.nim @@ -0,0 +1,408 @@ +discard """ + file: "tioselectors.nim" + output: "All tests passed!" +""" +import ioselectors + +const hasThreadSupport = compileOption("threads") + +template processTest(t, x: untyped) = + #stdout.write(t) + #stdout.flushFile() + if not x: echo(t & " FAILED\r\n") + +when not defined(windows): + import os, posix, osproc, nativesockets, times + + const supportedPlatform = defined(macosx) or defined(freebsd) or + defined(netbsd) or defined(openbsd) or + defined(linux) + + proc socket_notification_test(): bool = + proc create_test_socket(): SocketHandle = + var sock = posix.socket(posix.AF_INET, posix.SOCK_STREAM, + posix.IPPROTO_TCP) + var x: int = fcntl(sock, F_GETFL, 0) + if x == -1: raiseOSError(osLastError()) + else: + var mode = x or O_NONBLOCK + if fcntl(sock, F_SETFL, mode) == -1: + raiseOSError(osLastError()) + result = sock + + var client_message = "SERVER HELLO =>" + var server_message = "CLIENT HELLO" + var buffer : array[128, char] + + var selector = newSelector[int]() + var client_socket = create_test_socket() + var server_socket = create_test_socket() + + registerHandle(selector, server_socket, {Event.Read}, 0) + registerHandle(selector, client_socket, {Event.Write}, 0) + + var option : int32 = 1 + if setsockopt(server_socket, cint(SOL_SOCKET), cint(SO_REUSEADDR), + addr(option), sizeof(option).SockLen) < 0: + raiseOSError(osLastError()) + + var aiList = getAddrInfo("0.0.0.0", Port(13337)) + if bindAddr(server_socket, aiList.ai_addr, + aiList.ai_addrlen.Socklen) < 0'i32: + dealloc(aiList) + raiseOSError(osLastError()) + discard server_socket.listen() + dealloc(aiList) + + aiList = getAddrInfo("127.0.0.1", Port(13337)) + discard posix.connect(client_socket, aiList.ai_addr, + aiList.ai_addrlen.Socklen) + dealloc(aiList) + discard selector.select(100) + var rc1 = selector.select(100) + assert(len(rc1) == 2) + + var sockAddress: SockAddr + var addrLen = sizeof(sockAddress).Socklen + var server2_socket = accept(server_socket, + cast[ptr SockAddr](addr(sockAddress)), + addr(addrLen)) + assert(server2_socket != osInvalidSocket) + selector.registerHandle(server2_socket, {Event.Read}, 0) + + if posix.send(client_socket, addr(client_message[0]), + len(client_message), 0) == -1: + raiseOSError(osLastError()) + + selector.updateHandle(client_socket, {Event.Read}) + + var rc2 = selector.select(100) + assert(len(rc2) == 1) + + var read_count = posix.recv(server2_socket, addr buffer[0], 128, 0) + if read_count == -1: + raiseOSError(osLastError()) + + assert(read_count == len(client_message)) + var test1 = true + for i in 0..<read_count: + if client_message[i] != buffer[i]: + test1 = false + break + assert(test1) + + selector.updateHandle(server2_socket, {Event.Write}) + var rc3 = selector.select(0) + assert(len(rc3) == 1) + if posix.send(server2_socket, addr(server_message[0]), + len(server_message), 0) == -1: + raiseOSError(osLastError()) + selector.updateHandle(server2_socket, {Event.Read}) + + var rc4 = selector.select(100) + assert(len(rc4) == 1) + read_count = posix.recv(client_socket, addr(buffer[0]), 128, 0) + if read_count == -1: + raiseOSError(osLastError()) + + assert(read_count == len(server_message)) + var test2 = true + for i in 0..<read_count: + if server_message[i] != buffer[i]: + test2 = false + break + assert(test2) + + selector.unregister(server_socket) + selector.unregister(server2_socket) + selector.unregister(client_socket) + discard posix.close(server_socket) + discard posix.close(server2_socket) + discard posix.close(client_socket) + assert(selector.isEmpty()) + close(selector) + result = true + + proc event_notification_test(): bool = + var selector = newSelector[int]() + var event = newSelectEvent() + selector.registerEvent(event, 1) + selector.flush() + event.setEvent() + var rc1 = selector.select(0) + event.setEvent() + var rc2 = selector.select(0) + var rc3 = selector.select(0) + assert(len(rc1) == 1 and len(rc2) == 1 and len(rc3) == 0) + var ev1 = rc1[0].data + var ev2 = rc2[0].data + assert(ev1 == 1 and ev2 == 1) + selector.unregister(event) + event.close() + assert(selector.isEmpty()) + selector.close() + result = true + + when supportedPlatform: + proc timer_notification_test(): bool = + var selector = newSelector[int]() + var timer = selector.registerTimer(100, false, 0) + var rc1 = selector.select(140) + var rc2 = selector.select(140) + assert(len(rc1) == 1 and len(rc2) == 1) + selector.unregister(timer) + selector.flush() + selector.registerTimer(100, true, 0) + var rc3 = selector.select(120) + var rc4 = selector.select(120) + assert(len(rc3) == 1 and len(rc4) == 0) + assert(selector.isEmpty()) + selector.close() + result = true + + proc process_notification_test(): bool = + var selector = newSelector[int]() + var process2 = startProcess("/bin/sleep", "", ["2"], nil, + {poStdErrToStdOut, poUsePath}) + discard startProcess("/bin/sleep", "", ["1"], nil, + {poStdErrToStdOut, poUsePath}) + + selector.registerProcess(process2.processID, 0) + var rc1 = selector.select(1500) + var rc2 = selector.select(1500) + var r = len(rc1) + len(rc2) + assert(r == 1) + result = true + + proc signal_notification_test(): bool = + var sigset1n, sigset1o, sigset2n, sigset2o: Sigset + var pid = posix.getpid() + + discard sigemptyset(sigset1n) + discard sigemptyset(sigset1o) + discard sigemptyset(sigset2n) + discard sigemptyset(sigset2o) + + when hasThreadSupport: + if pthread_sigmask(SIG_BLOCK, sigset1n, sigset1o) == -1: + raiseOSError(osLastError()) + else: + if sigprocmask(SIG_BLOCK, sigset1n, sigset1o) == -1: + raiseOSError(osLastError()) + + var selector = newSelector[int]() + var s1 = selector.registerSignal(SIGUSR1, 1) + var s2 = selector.registerSignal(SIGUSR2, 2) + var s3 = selector.registerSignal(SIGTERM, 3) + selector.flush() + + discard posix.kill(pid, SIGUSR1) + discard posix.kill(pid, SIGUSR2) + discard posix.kill(pid, SIGTERM) + var rc = selector.select(0) + selector.unregister(s1) + selector.unregister(s2) + selector.unregister(s3) + + when hasThreadSupport: + if pthread_sigmask(SIG_BLOCK, sigset2n, sigset2o) == -1: + raiseOSError(osLastError()) + else: + if sigprocmask(SIG_BLOCK, sigset2n, sigset2o) == -1: + raiseOSError(osLastError()) + + assert(len(rc) == 3) + assert(rc[0].data + rc[1].data + rc[2].data == 6) # 1 + 2 + 3 + assert(equalMem(addr sigset1o, addr sigset2o, sizeof(Sigset))) + assert(selector.isEmpty()) + result = true + + when hasThreadSupport: + + var counter = 0 + + proc event_wait_thread(event: SelectEvent) {.thread.} = + var selector = newSelector[int]() + selector.registerEvent(event, 1) + selector.flush() + var rc = selector.select(1000) + if len(rc) == 1: + inc(counter) + selector.unregister(event) + assert(selector.isEmpty()) + + proc mt_event_test(): bool = + var + thr: array[0..7, Thread[SelectEvent]] + var selector = newSelector[int]() + var sock = newNativeSocket() + var event = newSelectEvent() + for i in 0..high(thr): + createThread(thr[i], event_wait_thread, event) + selector.registerHandle(sock, {Event.Read}, 1) + discard selector.select(500) + selector.unregister(sock) + event.setEvent() + joinThreads(thr) + assert(counter == 1) + result = true + + processTest("Socket notification test...", socket_notification_test()) + processTest("User event notification test...", event_notification_test()) + when hasThreadSupport: + processTest("Multithreaded user event notification test...", + mt_event_test()) + when supportedPlatform: + processTest("Timer notification test...", timer_notification_test()) + processTest("Process notification test...", process_notification_test()) + processTest("Signal notification test...", signal_notification_test()) + echo("All tests passed!") +else: + import nativesockets, winlean, os, osproc + + proc socket_notification_test(): bool = + proc create_test_socket(): SocketHandle = + var sock = newNativeSocket() + setBlocking(sock, false) + result = sock + + var client_message = "SERVER HELLO =>" + var server_message = "CLIENT HELLO" + var buffer : array[128, char] + + var selector = newSelector[int]() + var client_socket = create_test_socket() + var server_socket = create_test_socket() + + selector.registerHandle(server_socket, {Event.Read}, 0) + selector.registerHandle(client_socket, {Event.Write}, 0) + + var option : int32 = 1 + if setsockopt(server_socket, cint(SOL_SOCKET), cint(SO_REUSEADDR), + addr(option), sizeof(option).SockLen) < 0: + raiseOSError(osLastError()) + + var aiList = getAddrInfo("0.0.0.0", Port(13337)) + if bindAddr(server_socket, aiList.ai_addr, + aiList.ai_addrlen.Socklen) < 0'i32: + dealloc(aiList) + raiseOSError(osLastError()) + discard server_socket.listen() + dealloc(aiList) + + aiList = getAddrInfo("127.0.0.1", Port(13337)) + discard connect(client_socket, aiList.ai_addr, + aiList.ai_addrlen.Socklen) + dealloc(aiList) + # for some reason Windows select doesn't return both + # descriptors from first call, so we need to make 2 calls + discard selector.select(100) + var rcm = selector.select(100) + assert(len(rcm) == 2) + + var sockAddress = SockAddr() + var addrLen = sizeof(sockAddress).Socklen + var server2_socket = accept(server_socket, + cast[ptr SockAddr](addr(sockAddress)), + addr(addrLen)) + assert(server2_socket != osInvalidSocket) + selector.registerHandle(server2_socket, {Event.Read}, 0) + + if send(client_socket, cast[pointer](addr(client_message[0])), + cint(len(client_message)), 0) == -1: + raiseOSError(osLastError()) + + selector.updateHandle(client_socket, {Event.Read}) + + var rc2 = selector.select(100) + assert(len(rc2) == 1) + + var read_count = recv(server2_socket, addr buffer[0], 128, 0) + if read_count == -1: + raiseOSError(osLastError()) + + assert(read_count == len(client_message)) + var test1 = true + for i in 0..<read_count: + if client_message[i] != buffer[i]: + test1 = false + break + assert(test1) + + if send(server2_socket, cast[pointer](addr(server_message[0])), + cint(len(server_message)), 0) == -1: + raiseOSError(osLastError()) + + var rc3 = selector.select(0) + assert(len(rc3) == 1) + read_count = recv(client_socket, addr(buffer[0]), 128, 0) + if read_count == -1: + raiseOSError(osLastError()) + + assert(read_count == len(server_message)) + var test2 = true + for i in 0..<read_count: + if server_message[i] != buffer[i]: + test2 = false + break + assert(test2) + + selector.unregister(server_socket) + selector.unregister(server2_socket) + selector.unregister(client_socket) + close(server_socket) + close(server2_socket) + close(client_socket) + assert(selector.isEmpty()) + close(selector) + result = true + + proc event_notification_test(): bool = + var selector = newSelector[int]() + var event = newSelectEvent() + selector.registerEvent(event, 1) + selector.flush() + event.setEvent() + var rc1 = selector.select(0) + event.setEvent() + var rc2 = selector.select(0) + var rc3 = selector.select(0) + assert(len(rc1) == 1 and len(rc2) == 1 and len(rc3) == 0) + var ev1 = rc1[0].data + var ev2 = rc2[0].data + assert(ev1 == 1 and ev2 == 1) + selector.unregister(event) + event.close() + assert(selector.isEmpty()) + selector.close() + result = true + + when hasThreadSupport: + var counter = 0 + + proc event_wait_thread(event: SelectEvent) {.thread.} = + var selector = newSelector[int]() + selector.registerEvent(event, 1) + selector.flush() + var rc = selector.select(500) + if len(rc) == 1: + inc(counter) + selector.unregister(event) + assert(selector.isEmpty()) + + proc mt_event_test(): bool = + var thr: array[0..7, Thread[SelectEvent]] + var event = newSelectEvent() + for i in 0..high(thr): + createThread(thr[i], event_wait_thread, event) + event.setEvent() + joinThreads(thr) + assert(counter == 1) + result = true + + processTest("Socket notification test...", socket_notification_test()) + processTest("User event notification test...", event_notification_test()) + when hasThreadSupport: + processTest("Multithreaded user event notification test...", + mt_event_test()) + echo("All tests passed!") diff --git a/tests/async/tioselectors.nim.cfg b/tests/async/tioselectors.nim.cfg new file mode 100644 index 000000000..b1b896858 --- /dev/null +++ b/tests/async/tioselectors.nim.cfg @@ -0,0 +1 @@ +threads:on -d:threadsafe diff --git a/tests/async/tnewasyncudp.nim b/tests/async/tnewasyncudp.nim new file mode 100644 index 000000000..7025fa20d --- /dev/null +++ b/tests/async/tnewasyncudp.nim @@ -0,0 +1,102 @@ +discard """ + file: "tnewasyncudp.nim" + output: "5000" +""" +import asyncdispatch, nativesockets, net, strutils, os + +when defined(windows): + import winlean +else: + import posix + +var msgCount = 0 +var recvCount = 0 + +const + messagesToSend = 100 + swarmSize = 50 + serverPort = 10333 + +var + sendports = 0 + recvports = 0 + +proc saveSendingPort(port: int) = + sendports = sendports + port + +proc saveReceivedPort(port: int) = + recvports = recvports + port + +proc prepareAddress(intaddr: uint32, intport: uint16): ptr Sockaddr_in = + result = cast[ptr Sockaddr_in](alloc0(sizeof(Sockaddr_in))) + when defined(windows): + result.sin_family = toInt(nativesockets.AF_INET).int16 + else: + result.sin_family = toInt(nativesockets.AF_INET) + result.sin_port = htons(intport) + result.sin_addr.s_addr = htonl(intaddr) + +proc launchSwarm(name: ptr SockAddr) {.async.} = + var i = 0 + var k = 0 + while i < swarmSize: + var peeraddr = prepareAddress(INADDR_ANY, 0) + var sock = newAsyncNativeSocket(nativesockets.AF_INET, + nativesockets.SOCK_DGRAM, + Protocol.IPPROTO_UDP) + if bindAddr(sock.SocketHandle, cast[ptr SockAddr](peeraddr), + sizeof(Sockaddr_in).Socklen) < 0'i32: + raiseOSError(osLastError()) + let sockport = getSockName(sock.SocketHandle).int + k = 0 + while k < messagesToSend: + var message = "Message " & $(i * messagesToSend + k) + await sendTo(sock, addr message[0], len(message), + name, sizeof(Sockaddr_in).SockLen) + saveSendingPort(sockport) + inc(k) + closeSocket(sock) + inc(i) + +proc readMessages(server: AsyncFD) {.async.} = + var buffer: array[16384, char] + var slen = sizeof(Sockaddr_in).SockLen + var saddr = Sockaddr_in() + var maxResponses = (swarmSize * messagesToSend) + + var i = 0 + while i < maxResponses: + zeroMem(addr(buffer[0]), 16384) + zeroMem(cast[pointer](addr(saddr)), sizeof(Sockaddr_in)) + var size = await recvFromInto(server, cast[cstring](addr buffer[0]), + 16384, cast[ptr SockAddr](addr(saddr)), + addr(slen)) + size = 0 + var grammString = $buffer + if grammString.startswith("Message ") and + saddr.sin_addr.s_addr == 0x100007F: + inc(msgCount) + saveReceivedPort(ntohs(saddr.sin_port).int) + inc(recvCount) + inc(i) + +proc createServer() {.async.} = + var name = prepareAddress(INADDR_ANY, serverPort) + var server = newAsyncNativeSocket(nativesockets.AF_INET, + nativesockets.SOCK_DGRAM, + Protocol.IPPROTO_UDP) + if bindAddr(server.SocketHandle, cast[ptr SockAddr](name), + sizeof(Sockaddr_in).Socklen) < 0'i32: + raiseOSError(osLastError()) + asyncCheck readMessages(server) + +var name = prepareAddress(0x7F000001, serverPort) # 127.0.0.1 +asyncCheck createServer() +asyncCheck launchSwarm(cast[ptr SockAddr](name)) +while true: + poll() + if recvCount == swarmSize * messagesToSend: + break +assert msgCount == swarmSize * messagesToSend +assert sendports == recvports +echo msgCount diff --git a/tests/async/treturn_await.nim b/tests/async/treturn_await.nim new file mode 100644 index 000000000..8d266d665 --- /dev/null +++ b/tests/async/treturn_await.nim @@ -0,0 +1,23 @@ + +# bug #4371 + +import strutils, asyncdispatch, asynchttpserver + +type + List[A] = ref object + value: A + next: List[A] + StrPair* = tuple[k, v: string] + Context* = object + position*: int + accept*: bool + headers*: List[StrPair] + Handler* = proc(req: ref Request, ctx: Context): Future[Context] + +proc logging*(handler: Handler): auto = + proc h(req: ref Request, ctx: Context): Future[Context] {.async.} = + let ret = handler(req, ctx) + debugEcho "$3 $1 $2".format(req.reqMethod, req.url.path, req.hostname) + return await ret + + return h diff --git a/tests/async/twinasyncrw.nim b/tests/async/twinasyncrw.nim new file mode 100644 index 000000000..17b7d1cf5 --- /dev/null +++ b/tests/async/twinasyncrw.nim @@ -0,0 +1,257 @@ +discard """ + file: "twinasyncrw.nim" + output: "5000" +""" +when defined(windows): + import asyncdispatch, nativesockets, net, strutils, os, winlean + + var msgCount = 0 + + const + swarmSize = 50 + messagesToSend = 100 + + var clientCount = 0 + + proc winConnect*(socket: AsyncFD, address: string, port: Port, + domain = Domain.AF_INET): Future[void] = + var retFuture = newFuture[void]("winConnect") + proc cb(fd: AsyncFD): bool = + var ret = SocketHandle(fd).getSockOptInt(cint(SOL_SOCKET), cint(SO_ERROR)) + if ret == 0: + # We have connected. + retFuture.complete() + return true + else: + retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret)))) + return true + + var aiList = getAddrInfo(address, port, domain) + var success = false + var lastError: OSErrorCode = OSErrorCode(0) + var it = aiList + while it != nil: + var ret = nativesockets.connect(socket.SocketHandle, it.ai_addr, it.ai_addrlen.Socklen) + if ret == 0: + # Request to connect completed immediately. + success = true + retFuture.complete() + break + else: + lastError = osLastError() + if lastError.int32 == WSAEWOULDBLOCK: + success = true + addWrite(socket, cb) + break + else: + success = false + it = it.ai_next + + dealloc(aiList) + if not success: + retFuture.fail(newException(OSError, osErrorMsg(lastError))) + return retFuture + + proc winRecv*(socket: AsyncFD, size: int, + flags = {SocketFlag.SafeDisconn}): Future[string] = + var retFuture = newFuture[string]("recv") + + var readBuffer = newString(size) + + proc cb(sock: AsyncFD): bool = + result = true + let res = recv(sock.SocketHandle, addr readBuffer[0], size.cint, + flags.toOSFlags()) + if res < 0: + let lastError = osLastError() + if flags.isDisconnectionError(lastError): + retFuture.complete("") + else: + retFuture.fail(newException(OSError, osErrorMsg(lastError))) + elif res == 0: + # Disconnected + retFuture.complete("") + else: + readBuffer.setLen(res) + retFuture.complete(readBuffer) + # TODO: The following causes a massive slowdown. + #if not cb(socket): + addRead(socket, cb) + return retFuture + + proc winRecvInto*(socket: AsyncFD, buf: cstring, size: int, + flags = {SocketFlag.SafeDisconn}): Future[int] = + var retFuture = newFuture[int]("winRecvInto") + + proc cb(sock: AsyncFD): bool = + result = true + let res = nativesockets.recv(sock.SocketHandle, buf, size.cint, + flags.toOSFlags()) + if res < 0: + let lastError = osLastError() + if flags.isDisconnectionError(lastError): + retFuture.complete(0) + else: + retFuture.fail(newException(OSError, osErrorMsg(lastError))) + else: + retFuture.complete(res) + # TODO: The following causes a massive slowdown. + #if not cb(socket): + addRead(socket, cb) + return retFuture + + proc winSend*(socket: AsyncFD, data: string, + flags = {SocketFlag.SafeDisconn}): Future[void] = + var retFuture = newFuture[void]("winSend") + + var written = 0 + + proc cb(sock: AsyncFD): bool = + result = true + let netSize = data.len-written + var d = data.cstring + let res = nativesockets.send(sock.SocketHandle, addr d[written], netSize.cint, 0) + if res < 0: + let lastError = osLastError() + if flags.isDisconnectionError(lastError): + retFuture.complete() + else: + retFuture.fail(newException(OSError, osErrorMsg(lastError))) + else: + written.inc(res) + if res != netSize: + result = false # We still have data to send. + else: + retFuture.complete() + # TODO: The following causes crashes. + #if not cb(socket): + addWrite(socket, cb) + return retFuture + + proc winAcceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}): + Future[tuple[address: string, client: AsyncFD]] = + var retFuture = newFuture[tuple[address: string, + client: AsyncFD]]("winAcceptAddr") + proc cb(sock: AsyncFD): bool = + result = true + if not retFuture.finished: + var sockAddress = Sockaddr() + var addrLen = sizeof(sockAddress).Socklen + var client = nativesockets.accept(sock.SocketHandle, + cast[ptr SockAddr](addr(sockAddress)), addr(addrLen)) + if client == osInvalidSocket: + retFuture.fail(newException(OSError, osErrorMsg(osLastError()))) + else: + retFuture.complete((getAddrString(cast[ptr SockAddr](addr sockAddress)), client.AsyncFD)) + + addRead(socket, cb) + return retFuture + + proc winAccept*(socket: AsyncFD, + flags = {SocketFlag.SafeDisconn}): Future[AsyncFD] = + ## Accepts a new connection. Returns a future containing the client socket + ## corresponding to that connection. + ## The future will complete when the connection is successfully accepted. + var retFut = newFuture[AsyncFD]("winAccept") + var fut = winAcceptAddr(socket, flags) + fut.callback = + proc (future: Future[tuple[address: string, client: AsyncFD]]) = + assert future.finished + if future.failed: + retFut.fail(future.error) + else: + retFut.complete(future.read.client) + return retFut + + + proc winRecvLine*(socket: AsyncFD): Future[string] {.async.} = + ## Reads a line of data from ``socket``. Returned future will complete once + ## a full line is read or an error occurs. + ## + ## If a full line is read ``\r\L`` is not + ## added to ``line``, however if solely ``\r\L`` is read then ``line`` + ## will be set to it. + ## + ## If the socket is disconnected, ``line`` will be set to ``""``. + ## + ## If the socket is disconnected in the middle of a line (before ``\r\L`` + ## is read) then line will be set to ``""``. + ## The partial line **will be lost**. + ## + ## **Warning**: This assumes that lines are delimited by ``\r\L``. + ## + ## **Note**: This procedure is mostly used for testing. You likely want to + ## use ``asyncnet.recvLine`` instead. + + template addNLIfEmpty(): stmt = + if result.len == 0: + result.add("\c\L") + + result = "" + var c = "" + while true: + c = await winRecv(socket, 1) + if c.len == 0: + return "" + if c == "\r": + c = await winRecv(socket, 1) + assert c == "\l" + addNLIfEmpty() + return + elif c == "\L": + addNLIfEmpty() + return + add(result, c) + + proc sendMessages(client: AsyncFD) {.async.} = + for i in 0 .. <messagesToSend: + await winSend(client, "Message " & $i & "\c\L") + + proc launchSwarm(port: Port) {.async.} = + for i in 0 .. <swarmSize: + var sock = newNativeSocket() + setBlocking(sock, false) + + await winConnect(AsyncFD(sock), "localhost", port) + await sendMessages(AsyncFD(sock)) + discard closeSocket(sock) + + proc readMessages(client: AsyncFD) {.async.} = + while true: + var line = await winRecvLine(client) + if line == "": + closeSocket(client) + clientCount.inc + break + else: + if line.startswith("Message "): + msgCount.inc + else: + doAssert false + + proc createServer(port: Port) {.async.} = + var server = newNativeSocket() + setBlocking(server, false) + block: + var name = Sockaddr_in() + name.sin_family = toInt(Domain.AF_INET).int16 + name.sin_port = htons(uint16(port)) + name.sin_addr.s_addr = htonl(INADDR_ANY) + if bindAddr(server, cast[ptr SockAddr](addr(name)), + sizeof(name).Socklen) < 0'i32: + raiseOSError(osLastError()) + + discard server.listen() + while true: + asyncCheck readMessages(await winAccept(AsyncFD(server))) + + asyncCheck createServer(Port(10335)) + asyncCheck launchSwarm(Port(10335)) + while true: + poll() + if clientCount == swarmSize: break + + assert msgCount == swarmSize * messagesToSend + echo msgCount +else: + echo(5000) |