summary refs log tree commit diff stats
path: root/lib/pure
diff options
context:
space:
mode:
authorDominik Picheta <dominikpicheta@googlemail.com>2016-06-17 12:28:17 +0100
committerGitHub <noreply@github.com>2016-06-17 12:28:17 +0100
commit99abfcef35ee8cc819fcb3f63d48d8e11dd116d9 (patch)
tree5546a40b8ee73e212bddcd80c652bee9bb5857bb /lib/pure
parent9837b12ee6063ccbee20d50aef21d1916b5af07d (diff)
parenta93ae860887ade8c8fd9fbef23304e4e9032390e (diff)
downloadNim-99abfcef35ee8cc819fcb3f63d48d8e11dd116d9.tar.gz
Merge pull request #4356 from cheatfate/asyncudpv3
Async UDP support (hardcore version)
Diffstat (limited to 'lib/pure')
-rw-r--r--lib/pure/asyncdispatch.nim149
1 files changed, 149 insertions, 0 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim
index 455bebc7f..bb19f87ef 100644
--- a/lib/pure/asyncdispatch.nim
+++ b/lib/pure/asyncdispatch.nim
@@ -862,6 +862,101 @@ when defined(windows) or defined(nimdoc):
       # free ``ol``.
     return retFuture
 
+  proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr,
+               saddrLen: Socklen,
+               flags = {SocketFlag.SafeDisconn}): Future[void] =
+    ## Sends ``data`` to specified destination ``saddr``, using
+    ## socket ``socket``. The returned future will complete once all data
+    ## has been sent.
+    verifyPresence(socket)
+    var retFuture = newFuture[void]("sendTo")
+    var dataBuf: TWSABuf
+    dataBuf.buf = cast[cstring](data)
+    dataBuf.len = size.ULONG
+    var bytesSent = 0.Dword
+    var lowFlags = 0.Dword
+
+    # we will preserve address in our stack
+    var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes
+    var stalen: cint = cint(saddrLen)
+    zeroMem(addr(staddr[0]), 128)
+    copyMem(addr(staddr[0]), saddr, saddrLen)
+
+    var ol = PCustomOverlapped()
+    GC_ref(ol)
+    ol.data = CompletionData(fd: socket, cb:
+      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
+        if not retFuture.finished:
+          if errcode == OSErrorCode(-1):
+            retFuture.complete()
+          else:
+            retFuture.fail(newException(OSError, osErrorMsg(errcode)))
+    )
+
+    let ret = WSASendTo(socket.SocketHandle, addr dataBuf, 1, addr bytesSent,
+                        lowFlags, cast[ptr SockAddr](addr(staddr[0])),
+                        stalen, cast[POVERLAPPED](ol), nil)
+    if ret == -1:
+      let err = osLastError()
+      if err.int32 != ERROR_IO_PENDING:
+        GC_unref(ol)
+        retFuture.fail(newException(OSError, osErrorMsg(err)))
+    else:
+      retFuture.complete()
+      # We don't deallocate ``ol`` here because even though this completed
+      # immediately poll will still be notified about its completion and it will
+      # free ``ol``.
+    return retFuture
+
+  proc recvFromInto*(socket: AsyncFD, data: pointer, size: int,
+                     saddr: ptr SockAddr, saddrLen: ptr SockLen,
+                     flags = {SocketFlag.SafeDisconn}): Future[int] =
+    ## Receives a datagram data from ``socket`` into ``buf``, which must
+    ## be at least of size ``size``, address of datagram's sender will be
+    ## stored into ``saddr`` and ``saddrLen``. Returned future will complete
+    ## once one datagram has been received, and will return size of packet
+    ## received.
+    verifyPresence(socket)
+    var retFuture = newFuture[int]("recvFromInto")
+
+    var dataBuf = TWSABuf(buf: cast[cstring](data), len: size.ULONG)
+
+    var bytesReceived = 0.Dword
+    var lowFlags = 0.Dword
+
+    var ol = PCustomOverlapped()
+    GC_ref(ol)
+    ol.data = CompletionData(fd: socket, cb:
+      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
+        if not retFuture.finished:
+          if errcode == OSErrorCode(-1):
+            assert bytesCount <= size
+            retFuture.complete(bytesCount)
+          else:
+            # datagram sockets don't have disconnection,
+            # so we can just raise an exception
+            retFuture.fail(newException(OSError, osErrorMsg(errcode)))
+    )
+
+    let res = WSARecvFrom(socket.SocketHandle, addr dataBuf, 1,
+                          addr bytesReceived, addr lowFlags,
+                          saddr, cast[ptr cint](saddrLen),
+                          cast[POVERLAPPED](ol), nil)
+    if res == -1:
+      let err = osLastError()
+      if err.int32 != ERROR_IO_PENDING:
+        GC_unref(ol)
+        retFuture.fail(newException(OSError, osErrorMsg(err)))
+    else:
+      # Request completed immediately.
+      if bytesReceived != 0:
+        assert bytesReceived <= size
+        retFuture.complete(bytesReceived)
+      else:
+        if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
+          retFuture.complete(bytesReceived)
+    return retFuture
+
   proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}):
       Future[tuple[address: string, client: AsyncFD]] =
     ## Accepts a new connection. Returns a future containing the client socket
@@ -1359,6 +1454,60 @@ else:
     addWrite(socket, cb)
     return retFuture
 
+  proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr,
+               saddrLen: SockLen,
+               flags = {SocketFlag.SafeDisconn}): Future[void] =
+    ## Sends ``data`` of size ``size`` in bytes to specified destination
+    ## (``saddr`` of size ``saddrLen`` in bytes, using socket ``socket``.
+    ## The returned future will complete once all data has been sent.
+    var retFuture = newFuture[void]("sendTo")
+
+    # we will preserve address in our stack
+    var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes
+    var stalen = saddrLen
+    zeroMem(addr(staddr[0]), 128)
+    copyMem(addr(staddr[0]), saddr, saddrLen)
+
+    proc cb(sock: AsyncFD): bool =
+      result = true
+      let res = sendto(sock.SocketHandle, data, size, MSG_NOSIGNAL,
+                       cast[ptr SockAddr](addr(staddr[0])), stalen)
+      if res < 0:
+        let lastError = osLastError()
+        if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
+          retFuture.fail(newException(OSError, osErrorMsg(lastError)))
+        else:
+          result = false # We still want this callback to be called.
+      else:
+        retFuture.complete()
+
+    addWrite(socket, cb)
+    return retFuture
+
+  proc recvFromInto*(socket: AsyncFD, data: pointer, size: int,
+                     saddr: ptr SockAddr, saddrLen: ptr SockLen,
+                     flags = {SocketFlag.SafeDisconn}): Future[int] =
+    ## Receives a datagram data from ``socket`` into ``data``, which must
+    ## be at least of size ``size`` in bytes, address of datagram's sender
+    ## will be stored into ``saddr`` and ``saddrLen``. Returned future will
+    ## complete once one datagram has been received, and will return size
+    ## of packet received.
+    var retFuture = newFuture[int]("recvFromInto")
+    proc cb(sock: AsyncFD): bool =
+      result = true
+      let res = recvfrom(sock.SocketHandle, data, size.cint, flags.toOSFlags(),
+                         saddr, saddrLen)
+      if res < 0:
+        let lastError = osLastError()
+        if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
+          retFuture.fail(newException(OSError, osErrorMsg(lastError)))
+        else:
+          result = false
+      else:
+        retFuture.complete(res)
+    addRead(socket, cb)
+    return retFuture
+
   proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}):
       Future[tuple[address: string, client: AsyncFD]] =
     var retFuture = newFuture[tuple[address: string,