diff options
author | rockcavera <rockcavera@gmail.com> | 2020-04-26 05:16:10 -0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-26 10:16:10 +0200 |
commit | d23446c6baea34438be44e7dad5a467c0a17cb27 (patch) | |
tree | e34572ceaadc376d97c7484fb0ec5867f5397568 | |
parent | 76ffa4fa25ebf8f03a399a343088fa46e4ee13cb (diff) | |
download | Nim-d23446c6baea34438be44e7dad5a467c0a17cb27.tar.gz |
added high level sendTo and recvFrom to std/asyncnet (UDP functionality) (#14109)
* added high level sendTo and recvFrom to std/asyncnet; tests were also added. * add .since annotation, a changelog entry and fixed to standard library style guide. * Improved asserts msgs and added notes for use with UDP sockets
-rw-r--r-- | changelog.md | 2 | ||||
-rw-r--r-- | lib/pure/asyncnet.nim | 127 | ||||
-rw-r--r-- | tests/async/tasyncnetudp.nim | 99 |
3 files changed, 228 insertions, 0 deletions
diff --git a/changelog.md b/changelog.md index 73a6ffe24..80f3c8e72 100644 --- a/changelog.md +++ b/changelog.md @@ -41,6 +41,8 @@ accept an existing string to modify, which avoids memory allocations, similar to `streams.readLine` (#13857). +- Added high-level `asyncnet.sendTo` and `asyncnet.recvFrom`. UDP functionality. + ## Language changes - In newruntime it is now allowed to assign discriminator field without restrictions as long as case object doesn't have custom destructor. Discriminator value doesn't have to be a constant either. If you have custom destructor for case object and you do want to freely assign discriminator fields, it is recommended to refactor object into 2 objects like this: ```nim diff --git a/lib/pure/asyncnet.nim b/lib/pure/asyncnet.nim index 8bdab88b1..ddfab3416 100644 --- a/lib/pure/asyncnet.nim +++ b/lib/pure/asyncnet.nim @@ -780,6 +780,133 @@ proc isClosed*(socket: AsyncSocket): bool = ## Determines whether the socket has been closed. return socket.closed +proc sendTo*(socket: AsyncSocket, data: pointer, dataSize: int, address: string, + port: Port, flags = {SocketFlag.SafeDisconn}): owned(Future[void]) + {.async, since: (1, 3).} = + ## This proc sends ``data`` to the specified ``address``, which may be an IP + ## address or a hostname. If a hostname is specified this function will try + ## each IP of that hostname. The returned future will complete once all data + ## has been sent. + ## + ## If an error occurs an OSError exception will be raised. + ## + ## This proc is normally used with connectionless sockets (UDP sockets). + assert(socket.protocol != IPPROTO_TCP, + "Cannot `sendTo` on a TCP socket. Use `send` instead.") + assert(not socket.closed, "Cannot `sendTo` on a closed socket.") + + let + aiList = getAddrInfo(address, port, socket.domain, socket.sockType, + socket.protocol) + retFuture = newFuture[void]("sendTo") + + var + it = aiList + success = false + lastException: ref Exception + + while it != nil: + let fut = sendTo(socket.fd.AsyncFD, data, dataSize, it.ai_addr, + it.ai_addrlen.SockLen, flags) + + yield fut + + if not fut.failed: + success = true + + break + + lastException = fut.readError() + + it = it.ai_next + + freeaddrinfo(aiList) + + if not success: + if lastException != nil: + retFuture.fail(lastException) + + else: + retFuture.fail(newException( + IOError, "Couldn't resolve address: " & address)) + + else: + retFuture.complete() + +proc sendTo*(socket: AsyncSocket, data, address: string, port: Port): + owned(Future[void]) {.async, since: (1, 3).} = + ## This proc sends ``data`` to the specified ``address``, which may be an IP + ## address or a hostname. If a hostname is specified this function will try + ## each IP of that hostname. The returned future will complete once all data + ## has been sent. + ## + ## If an error occurs an OSError exception will be raised. + ## + ## This proc is normally used with connectionless sockets (UDP sockets). + await sendTo(socket, cstring(data), len(data), address, port) + +proc recvFrom*(socket: AsyncSocket, data: pointer, size: int, + address: FutureVar[string], port: FutureVar[Port], + flags = {SocketFlag.SafeDisconn}): owned(Future[int]) + {.async, since: (1, 3).} = + ## Receives a datagram data from ``socket`` into ``data``, which must be at + ## least of size ``size``. The address and port of datagram's sender will be + ## stored into ``address`` and ``port``, respectively. Returned future will + ## complete once one datagram has been received, and will return size of + ## packet received. + ## + ## If an error occurs an OSError exception will be raised. + ## + ## This proc is normally used with connectionless sockets (UDP sockets). + template awaitRecvFromInto() = + var lAddr = sizeof(sAddr).SockLen + + result = await recvFromInto(AsyncFD(getFd(socket)), data, size, + cast[ptr SockAddr](addr sAddr), addr lAddr) + + address.mget.add(getAddrString(cast[ptr SockAddr](addr sAddr))) + + address.complete() + + assert(socket.protocol != IPPROTO_TCP, + "Cannot `recvFrom` on a TCP socket. Use `recv` or `recvInto` instead.") + assert(not socket.closed, "Cannot `recvFrom` on a closed socket.") + + var readSize: int + + if socket.domain == AF_INET6: + var sAddr: Sockaddr_in6 + + awaitRecvFromInto() + + port.complete(ntohs(sAddr.sin6_port).Port) + + else: + var sAddr: Sockaddr_in + + awaitRecvFromInto() + + port.complete(ntohs(sAddr.sin_port).Port) + +proc recvFrom*(socket: AsyncSocket, data: pointer, size: int): + owned(Future[tuple[size: int, address: string, port: Port]]) + {.async, since: (1, 3).} = + ## Receives a datagram data from ``socket`` into ``data``, which must be at + ## least of size ``size``. Returned future will complete once one datagram has + ## been received and will return tuple with: size of packet received; and + ## address and port of datagram's sender. + ## + ## If an error occurs an OSError exception will be raised. + ## + ## This proc is normally used with connectionless sockets (UDP sockets). + var + fromIp = newFutureVar[string]() + fromPort = newFutureVar[Port]() + + result.size = await recvFrom(socket, data, size, fromIp, fromPort) + result.address = fromIp.mget() + result.port = fromPort.mget() + when not defined(testing) and isMainModule: type TestCases = enum diff --git a/tests/async/tasyncnetudp.nim b/tests/async/tasyncnetudp.nim new file mode 100644 index 000000000..bb0f244e5 --- /dev/null +++ b/tests/async/tasyncnetudp.nim @@ -0,0 +1,99 @@ +# It is a reproduction of the 'tnewasyncudp' test code, but using a high level +# of asynchronous procedures. Output: "5000" +import asyncdispatch, asyncnet, nativesockets, net, strutils + +var msgCount = 0 +var recvCount = 0 + +const + messagesToSend = 100 + swarmSize = 50 + serverPort = 10333 + +var + sendports = 0 + recvports = 0 + +proc saveSendingPort(port: Port) = + sendports = sendports + int(port) + +proc saveReceivedPort(port: Port) = + recvports = recvports + int(port) + +proc launchSwarm(serverIp: string, serverPort: Port) {.async.} = + var + buffer = newString(16384) + i = 0 + + while i < swarmSize: + var sock = newAsyncSocket(nativesockets.AF_INET, nativesockets.SOCK_DGRAM, + Protocol.IPPROTO_UDP, false) + + bindAddr(sock, address = "127.0.0.1") + + let (null, localPort) = getLocalAddr(sock) + + var k = 0 + + while k < messagesToSend: + zeroMem(addr(buffer[0]), 16384) + + let message = "Message " & $(i * messagesToSend + k) + + await sendTo(sock, message, serverIp, serverPort) + + let (size, fromIp, fromPort) = await recvFrom(sock, addr buffer[0], + 16384) + + if buffer[0 .. (size - 1)] == message: + saveSendingPort(localPort) + + inc(recvCount) + + inc(k) + + close(sock) + + inc(i) + +proc readMessages(server: AsyncSocket) {.async.} = + let maxResponses = (swarmSize * messagesToSend) + + var + buffer = newString(16384) + i = 0 + + while i < maxResponses: + zeroMem(addr(buffer[0]), 16384) + + let (size, fromIp, fromPort) = await recvFrom(server, addr buffer[0], 16384) + + if buffer.startswith("Message ") and fromIp == "127.0.0.1": + await sendTo(server, buffer[0 .. (size - 1)], fromIp, fromPort) + + inc(msgCount) + + saveReceivedPort(fromPort) + + inc(i) + +proc createServer() {.async.} = + var server = newAsyncSocket(nativesockets.AF_INET, nativesockets.SOCK_DGRAM, Protocol.IPPROTO_UDP, false) + + bindAddr(server, Port(serverPort), "127.0.0.1") + + asyncCheck readMessages(server) + +asyncCheck createServer() +asyncCheck launchSwarm("127.0.0.1", Port(serverPort)) + +while true: + poll() + + if recvCount == swarmSize * messagesToSend: + break + +assert msgCount == swarmSize * messagesToSend +assert sendports == recvports + +echo msgCount \ No newline at end of file |