summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorrockcavera <rockcavera@gmail.com>2020-04-26 05:16:10 -0300
committerGitHub <noreply@github.com>2020-04-26 10:16:10 +0200
commitd23446c6baea34438be44e7dad5a467c0a17cb27 (patch)
treee34572ceaadc376d97c7484fb0ec5867f5397568
parent76ffa4fa25ebf8f03a399a343088fa46e4ee13cb (diff)
downloadNim-d23446c6baea34438be44e7dad5a467c0a17cb27.tar.gz
added high level sendTo and recvFrom to std/asyncnet (UDP functionality) (#14109)
* added high level sendTo and recvFrom to std/asyncnet; tests were also added.

* add .since annotation, a changelog entry and fixed to standard library style guide.

* Improved asserts msgs and added notes for use with UDP sockets
-rw-r--r--changelog.md2
-rw-r--r--lib/pure/asyncnet.nim127
-rw-r--r--tests/async/tasyncnetudp.nim99
3 files changed, 228 insertions, 0 deletions
diff --git a/changelog.md b/changelog.md
index 73a6ffe24..80f3c8e72 100644
--- a/changelog.md
+++ b/changelog.md
@@ -41,6 +41,8 @@
   accept an existing string to modify, which avoids memory
   allocations, similar to `streams.readLine` (#13857).
 
+- Added high-level `asyncnet.sendTo` and `asyncnet.recvFrom`. UDP functionality.
+
 ## Language changes
 - In newruntime it is now allowed to assign discriminator field without restrictions as long as case object doesn't have custom destructor. Discriminator value doesn't have to be a constant either. If you have custom destructor for case object and you do want to freely assign discriminator fields, it is recommended to refactor object into 2 objects like this:
   ```nim
diff --git a/lib/pure/asyncnet.nim b/lib/pure/asyncnet.nim
index 8bdab88b1..ddfab3416 100644
--- a/lib/pure/asyncnet.nim
+++ b/lib/pure/asyncnet.nim
@@ -780,6 +780,133 @@ proc isClosed*(socket: AsyncSocket): bool =
   ## Determines whether the socket has been closed.
   return socket.closed
 
+proc sendTo*(socket: AsyncSocket, data: pointer, dataSize: int, address: string,
+             port: Port, flags = {SocketFlag.SafeDisconn}): owned(Future[void])
+            {.async, since: (1, 3).} =
+  ## This proc sends ``data`` to the specified ``address``, which may be an IP
+  ## address or a hostname. If a hostname is specified this function will try
+  ## each IP of that hostname. The returned future will complete once all data
+  ## has been sent.
+  ## 
+  ## If an error occurs an OSError exception will be raised.
+  ## 
+  ## This proc is normally used with connectionless sockets (UDP sockets).
+  assert(socket.protocol != IPPROTO_TCP,
+         "Cannot `sendTo` on a TCP socket. Use `send` instead.")
+  assert(not socket.closed, "Cannot `sendTo` on a closed socket.")
+
+  let
+    aiList = getAddrInfo(address, port, socket.domain, socket.sockType,
+                         socket.protocol)
+    retFuture = newFuture[void]("sendTo")
+
+  var
+    it = aiList
+    success = false
+    lastException: ref Exception
+  
+  while it != nil:
+    let fut = sendTo(socket.fd.AsyncFD, data, dataSize, it.ai_addr,
+                     it.ai_addrlen.SockLen, flags)
+    
+    yield fut
+
+    if not fut.failed:
+      success = true
+
+      break
+    
+    lastException = fut.readError()
+
+    it = it.ai_next
+  
+  freeaddrinfo(aiList)
+
+  if not success:
+    if lastException != nil:
+      retFuture.fail(lastException)
+
+    else:
+      retFuture.fail(newException(
+            IOError, "Couldn't resolve address: " & address))
+
+  else:
+    retFuture.complete()
+
+proc sendTo*(socket: AsyncSocket, data, address: string, port: Port):
+            owned(Future[void]) {.async, since: (1, 3).} =
+  ## This proc sends ``data`` to the specified ``address``, which may be an IP
+  ## address or a hostname. If a hostname is specified this function will try
+  ## each IP of that hostname. The returned future will complete once all data
+  ## has been sent.
+  ## 
+  ## If an error occurs an OSError exception will be raised.
+  ## 
+  ## This proc is normally used with connectionless sockets (UDP sockets).
+  await sendTo(socket, cstring(data), len(data), address, port)
+
+proc recvFrom*(socket: AsyncSocket, data: pointer, size: int,
+               address: FutureVar[string], port: FutureVar[Port],
+               flags = {SocketFlag.SafeDisconn}): owned(Future[int])
+              {.async, since: (1, 3).} =
+  ## Receives a datagram data from ``socket`` into ``data``, which must be at
+  ## least of size ``size``. The address and port of datagram's sender will be
+  ## stored into ``address`` and ``port``, respectively. Returned future will
+  ## complete once one datagram has been received, and will return size of
+  ## packet received.
+  ## 
+  ## If an error occurs an OSError exception will be raised.
+  ## 
+  ## This proc is normally used with connectionless sockets (UDP sockets).
+  template awaitRecvFromInto() =
+    var lAddr = sizeof(sAddr).SockLen
+    
+    result = await recvFromInto(AsyncFD(getFd(socket)), data, size,
+                                cast[ptr SockAddr](addr sAddr), addr lAddr)
+                                
+    address.mget.add(getAddrString(cast[ptr SockAddr](addr sAddr)))
+
+    address.complete()
+    
+  assert(socket.protocol != IPPROTO_TCP,
+         "Cannot `recvFrom` on a TCP socket. Use `recv` or `recvInto` instead.")
+  assert(not socket.closed, "Cannot `recvFrom` on a closed socket.")
+
+  var readSize: int
+
+  if socket.domain == AF_INET6:
+    var sAddr: Sockaddr_in6
+
+    awaitRecvFromInto()
+
+    port.complete(ntohs(sAddr.sin6_port).Port)
+    
+  else:
+    var sAddr: Sockaddr_in
+
+    awaitRecvFromInto()
+
+    port.complete(ntohs(sAddr.sin_port).Port)
+
+proc recvFrom*(socket: AsyncSocket, data: pointer, size: int):
+              owned(Future[tuple[size: int, address: string, port: Port]])
+              {.async, since: (1, 3).} =
+  ## Receives a datagram data from ``socket`` into ``data``, which must be at
+  ## least of size ``size``. Returned future will complete once one datagram has
+  ## been received and will return tuple with: size of packet received; and
+  ## address and port of datagram's sender.
+  ## 
+  ## If an error occurs an OSError exception will be raised.
+  ## 
+  ## This proc is normally used with connectionless sockets (UDP sockets).
+  var
+    fromIp = newFutureVar[string]()
+    fromPort = newFutureVar[Port]()
+  
+  result.size = await recvFrom(socket, data, size, fromIp, fromPort)
+  result.address = fromIp.mget()
+  result.port = fromPort.mget()
+
 when not defined(testing) and isMainModule:
   type
     TestCases = enum
diff --git a/tests/async/tasyncnetudp.nim b/tests/async/tasyncnetudp.nim
new file mode 100644
index 000000000..bb0f244e5
--- /dev/null
+++ b/tests/async/tasyncnetudp.nim
@@ -0,0 +1,99 @@
+# It is a reproduction of the 'tnewasyncudp' test code, but using a high level
+# of asynchronous procedures. Output: "5000"
+import asyncdispatch, asyncnet, nativesockets, net, strutils
+
+var msgCount = 0
+var recvCount = 0
+
+const
+  messagesToSend = 100
+  swarmSize = 50
+  serverPort = 10333
+
+var
+  sendports = 0
+  recvports = 0
+
+proc saveSendingPort(port: Port) =
+  sendports = sendports + int(port)
+
+proc saveReceivedPort(port: Port) =
+  recvports = recvports + int(port)
+
+proc launchSwarm(serverIp: string, serverPort: Port) {.async.} =
+  var
+    buffer = newString(16384)
+    i = 0
+
+  while i < swarmSize:
+    var sock = newAsyncSocket(nativesockets.AF_INET, nativesockets.SOCK_DGRAM,
+                              Protocol.IPPROTO_UDP, false)
+
+    bindAddr(sock, address = "127.0.0.1")
+
+    let (null, localPort) = getLocalAddr(sock)
+
+    var k = 0
+    
+    while k < messagesToSend:
+      zeroMem(addr(buffer[0]), 16384)
+
+      let message = "Message " & $(i * messagesToSend + k)
+
+      await sendTo(sock, message, serverIp, serverPort)
+
+      let (size, fromIp, fromPort) = await recvFrom(sock, addr buffer[0],
+                                                    16384)
+
+      if buffer[0 .. (size - 1)] == message:
+        saveSendingPort(localPort)
+
+        inc(recvCount)
+
+      inc(k)
+    
+    close(sock)
+
+    inc(i)
+
+proc readMessages(server: AsyncSocket) {.async.} =
+  let maxResponses = (swarmSize * messagesToSend)
+
+  var
+    buffer = newString(16384)
+    i = 0
+  
+  while i < maxResponses:
+    zeroMem(addr(buffer[0]), 16384)
+    
+    let (size, fromIp, fromPort) = await recvFrom(server, addr buffer[0], 16384)
+
+    if buffer.startswith("Message ") and fromIp == "127.0.0.1":
+      await sendTo(server, buffer[0 .. (size - 1)], fromIp, fromPort)
+
+      inc(msgCount)
+
+      saveReceivedPort(fromPort)
+
+    inc(i)
+
+proc createServer() {.async.} =
+  var server = newAsyncSocket(nativesockets.AF_INET, nativesockets.SOCK_DGRAM, Protocol.IPPROTO_UDP, false)
+  
+  bindAddr(server, Port(serverPort), "127.0.0.1")
+
+  asyncCheck readMessages(server)
+
+asyncCheck createServer()
+asyncCheck launchSwarm("127.0.0.1", Port(serverPort))
+
+while true:
+  poll()
+
+  if recvCount == swarmSize * messagesToSend:
+    break
+
+assert msgCount == swarmSize * messagesToSend
+assert sendports == recvports
+
+echo msgCount
\ No newline at end of file