summary refs log tree commit diff stats
path: root/lib
diff options
context:
space:
mode:
authorAnatoly Galiulin <galiulin.anatoly@gmail.com>2016-08-31 11:18:36 +0700
committerAnatoly Galiulin <galiulin.anatoly@gmail.com>2016-09-06 09:31:13 +0700
commite4c46e6fba65303c7046db28d996df17ec805e4f (patch)
treea78b72acadcf9cf5dea5234556d31a649e984215 /lib
parent41f6c08f92f9f0255330155f6fe8802e3fb2ba3a (diff)
downloadNim-e4c46e6fba65303c7046db28d996df17ec805e4f.tar.gz
Add async IO operations with buffers on files and sockets
Diffstat (limited to 'lib')
-rw-r--r--lib/pure/asyncdispatch.nim94
-rw-r--r--lib/pure/asyncfile.nim141
-rw-r--r--lib/pure/asyncnet.nim67
-rw-r--r--lib/upcoming/asyncdispatch.nim94
4 files changed, 373 insertions, 23 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim
index 6bea8e817..faac79d3b 100644
--- a/lib/pure/asyncdispatch.nim
+++ b/lib/pure/asyncdispatch.nim
@@ -749,7 +749,7 @@ when defined(windows) or defined(nimdoc):
           retFuture.complete("")
     return retFuture
 
-  proc recvInto*(socket: AsyncFD, buf: cstring, size: int,
+  proc recvBuffer*(socket: AsyncFD, buf: pointer, size: int,
                 flags = {SocketFlag.SafeDisconn}): Future[int] =
     ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``, which must
     ## at least be of that size. Returned future will complete once all the
@@ -769,11 +769,11 @@ when defined(windows) or defined(nimdoc):
     verifyPresence(socket)
     assert SocketFlag.Peek notin flags, "Peek not supported on Windows."
 
-    var retFuture = newFuture[int]("recvInto")
+    var retFuture = newFuture[int]("recvBuffer")
 
     #buf[] = '\0'
     var dataBuf: TWSABuf
-    dataBuf.buf = buf
+    dataBuf.buf = cast[cstring](buf)
     dataBuf.len = size.ULONG
 
     var bytesReceived: Dword
@@ -784,10 +784,7 @@ when defined(windows) or defined(nimdoc):
       proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
         if not retFuture.finished:
           if errcode == OSErrorCode(-1):
-            if bytesCount == 0 and dataBuf.buf[0] == '\0':
-              retFuture.complete(0)
-            else:
-              retFuture.complete(bytesCount)
+            retFuture.complete(bytesCount)
           else:
             if flags.isDisconnectionError(errcode):
               retFuture.complete(0)
@@ -819,6 +816,51 @@ when defined(windows) or defined(nimdoc):
           retFuture.complete(bytesReceived)
     return retFuture
 
+  proc sendBuffer*(socket: AsyncFD, buf: pointer, size: int,
+             flags = {SocketFlag.SafeDisconn}): Future[void] =
+    ## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future will complete once all
+    ## data has been sent.
+    ## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object, you must use GC_ref/GC_unref calls
+    ## to avoid early freeing of the buffer
+    verifyPresence(socket)
+    var retFuture = newFuture[void]("send")
+
+    var dataBuf: TWSABuf
+    dataBuf.buf = cast[cstring](buf)
+    dataBuf.len = size.ULONG
+
+    var bytesReceived, lowFlags: Dword
+    var ol = PCustomOverlapped()
+    GC_ref(ol)
+    ol.data = CompletionData(fd: socket, cb:
+      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
+        if not retFuture.finished:
+          if errcode == OSErrorCode(-1):
+            retFuture.complete()
+          else:
+            if flags.isDisconnectionError(errcode):
+              retFuture.complete()
+            else:
+              retFuture.fail(newException(OSError, osErrorMsg(errcode)))
+    )
+
+    let ret = WSASend(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
+                      lowFlags, cast[POVERLAPPED](ol), nil)
+    if ret == -1:
+      let err = osLastError()
+      if err.int32 != ERROR_IO_PENDING:
+        GC_unref(ol)
+        if flags.isDisconnectionError(err):
+          retFuture.complete()
+        else:
+          retFuture.fail(newException(OSError, osErrorMsg(err)))
+    else:
+      retFuture.complete()
+      # We don't deallocate ``ol`` here because even though this completed
+      # immediately poll will still be notified about its completion and it will
+      # free ``ol``.
+    return retFuture
+
   proc send*(socket: AsyncFD, data: string,
              flags = {SocketFlag.SafeDisconn}): Future[void] =
     ## Sends ``data`` to ``socket``. The returned future will complete once all
@@ -828,9 +870,9 @@ when defined(windows) or defined(nimdoc):
 
     var dataBuf: TWSABuf
     dataBuf.buf = data
-    dataBuf.len = data.len.ULONG
     GC_ref(data) # we need to protect data until send operation is completed
                  # or failed.
+    dataBuf.len = data.len.ULONG
 
     var bytesReceived, lowFlags: Dword
     var ol = PCustomOverlapped()
@@ -1403,9 +1445,9 @@ else:
     addRead(socket, cb)
     return retFuture
 
-  proc recvInto*(socket: AsyncFD, buf: cstring, size: int,
+  proc recvBuffer*(socket: AsyncFD, buf: pointer, size: int,
                   flags = {SocketFlag.SafeDisconn}): Future[int] =
-    var retFuture = newFuture[int]("recvInto")
+    var retFuture = newFuture[int]("recvBuffer")
 
     proc cb(sock: AsyncFD): bool =
       result = true
@@ -1427,6 +1469,38 @@ else:
     addRead(socket, cb)
     return retFuture
 
+  proc sendBuffer*(socket: AsyncFD, buf: pointer, size: int,
+             flags = {SocketFlag.SafeDisconn}): Future[void] =
+    var retFuture = newFuture[void]("send")
+
+    var written = 0
+
+    proc cb(sock: AsyncFD): bool =
+      result = true
+      let netSize = size-written
+      var d = cast[cstring](buf)
+      let res = send(sock.SocketHandle, addr d[written], netSize.cint,
+                     MSG_NOSIGNAL)
+      if res < 0:
+        let lastError = osLastError()
+        if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
+          if flags.isDisconnectionError(lastError):
+            retFuture.complete()
+          else:
+            retFuture.fail(newException(OSError, osErrorMsg(lastError)))
+        else:
+          result = false # We still want this callback to be called.
+      else:
+        written.inc(res)
+        if res != netSize:
+          result = false # We still have data to send.
+        else:
+          retFuture.complete()
+    # TODO: The following causes crashes.
+    #if not cb(socket):
+    addWrite(socket, cb)
+    return retFuture
+
   proc send*(socket: AsyncFD, data: string,
              flags = {SocketFlag.SafeDisconn}): Future[void] =
     var retFuture = newFuture[void]("send")
diff --git a/lib/pure/asyncfile.nim b/lib/pure/asyncfile.nim
index 5df606ea8..8801c64d0 100644
--- a/lib/pure/asyncfile.nim
+++ b/lib/pure/asyncfile.nim
@@ -112,6 +112,80 @@ proc openAsync*(filename: string, mode = fmRead): AsyncFile =
 
     register(result.fd)
 
+proc readBuffer*(f: AsyncFile, buf: pointer, size: int): Future[int] =
+  ## Read ``size`` bytes from the specified file asynchronously starting at
+  ## the current position of the file pointer.
+  ##
+  ## If the file pointer is past the end of the file then an empty string is
+  ## returned.
+  var retFuture = newFuture[int]("asyncfile.readBuffer")
+
+  when defined(windows) or defined(nimdoc):
+    var ol = PCustomOverlapped()
+    GC_ref(ol)
+    ol.data = CompletionData(fd: f.fd, cb:
+      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
+        if not retFuture.finished:
+          if errcode == OSErrorCode(-1):
+            assert bytesCount > 0
+            assert bytesCount <= size
+            f.offset.inc bytesCount
+            retFuture.complete(bytesCount)
+          else:
+            if errcode.int32 == ERROR_HANDLE_EOF:
+              retFuture.complete(0)
+            else:
+              retFuture.fail(newException(OSError, osErrorMsg(errcode)))
+    )
+    ol.offset = DWord(f.offset and 0xffffffff)
+    ol.offsetHigh = DWord(f.offset shr 32)
+
+    # According to MSDN we're supposed to pass nil to lpNumberOfBytesRead.
+    let ret = readFile(f.fd.Handle, buf, size.int32, nil,
+                       cast[POVERLAPPED](ol))
+    if not ret.bool:
+      let err = osLastError()
+      if err.int32 != ERROR_IO_PENDING:
+        GC_unref(ol)
+        retFuture.fail(newException(OSError, osErrorMsg(err)))
+    else:
+      # Request completed immediately.
+      var bytesRead: DWord
+      let overlappedRes = getOverlappedResult(f.fd.Handle,
+          cast[POverlapped](ol), bytesRead, false.WinBool)
+      if not overlappedRes.bool:
+        let err = osLastError()
+        if err.int32 == ERROR_HANDLE_EOF:
+          retFuture.complete(0)
+        else:
+          retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
+      else:
+        assert bytesRead > 0
+        assert bytesRead <= size
+        f.offset.inc bytesRead
+        retFuture.complete(bytesRead)
+  else:
+    proc cb(fd: AsyncFD): bool =
+      result = true
+      let res = read(fd.cint, cast[cstring](buf), size.cint)
+      if res < 0:
+        let lastError = osLastError()
+        if lastError.int32 != EAGAIN:
+          retFuture.fail(newException(OSError, osErrorMsg(lastError)))
+        else:
+          result = false # We still want this callback to be called.
+      elif res == 0:
+        # EOF
+        retFuture.complete(0)
+      else:
+        f.offset.inc(res)
+        retFuture.complete(res)
+
+    if not cb(f.fd):
+      addRead(f.fd, cb)
+
+  return retFuture
+
 proc read*(f: AsyncFile, size: int): Future[string] =
   ## Read ``size`` bytes from the specified file asynchronously starting at
   ## the current position of the file pointer.
@@ -238,6 +312,73 @@ proc readAll*(f: AsyncFile): Future[string] {.async.} =
       return
     result.add data
 
+proc writeBuffer*(f: AsyncFile, buf: pointer, size: int): Future[void] =
+  ## Writes ``size`` bytes from ``buf`` to the file specified asynchronously.
+  ##
+  ## The returned Future will complete once all data has been written to the
+  ## specified file.
+  var retFuture = newFuture[void]("asyncfile.writeBuffer")
+  when defined(windows) or defined(nimdoc):
+    var ol = PCustomOverlapped()
+    GC_ref(ol)
+    ol.data = CompletionData(fd: f.fd, cb:
+      proc (fd: AsyncFD, bytesCount: DWord, errcode: OSErrorCode) =
+        if not retFuture.finished:
+          if errcode == OSErrorCode(-1):
+            assert bytesCount == size.int32
+            f.offset.inc(size)
+            retFuture.complete()
+          else:
+            retFuture.fail(newException(OSError, osErrorMsg(errcode)))
+    )
+    ol.offset = DWord(f.offset and 0xffffffff)
+    ol.offsetHigh = DWord(f.offset shr 32)
+
+    # According to MSDN we're supposed to pass nil to lpNumberOfBytesWritten.
+    let ret = writeFile(f.fd.Handle, buf, size.int32, nil,
+                       cast[POVERLAPPED](ol))
+    if not ret.bool:
+      let err = osLastError()
+      if err.int32 != ERROR_IO_PENDING:
+        GC_unref(ol)
+        retFuture.fail(newException(OSError, osErrorMsg(err)))
+    else:
+      # Request completed immediately.
+      var bytesWritten: DWord
+      let overlappedRes = getOverlappedResult(f.fd.Handle,
+          cast[POverlapped](ol), bytesWritten, false.WinBool)
+      if not overlappedRes.bool:
+        retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
+      else:
+        assert bytesWritten == size.int32
+        f.offset.inc(size)
+        retFuture.complete()
+  else:
+    var written = 0
+
+    proc cb(fd: AsyncFD): bool =
+      result = true
+      let remainderSize = size-written
+      var cbuf = cast[cstring](buf)
+      let res = write(fd.cint, addr cbuf[written], remainderSize.cint)
+      if res < 0:
+        let lastError = osLastError()
+        if lastError.int32 != EAGAIN:
+          retFuture.fail(newException(OSError, osErrorMsg(lastError)))
+        else:
+          result = false # We still want this callback to be called.
+      else:
+        written.inc res
+        f.offset.inc res
+        if res != remainderSize:
+          result = false # We still have data to write.
+        else:
+          retFuture.complete()
+
+    if not cb(f.fd):
+      addWrite(f.fd, cb)
+  return retFuture
+
 proc write*(f: AsyncFile, data: string): Future[void] =
   ## Writes ``data`` to the file specified asynchronously.
   ##
diff --git a/lib/pure/asyncnet.nim b/lib/pure/asyncnet.nim
index a1988f4a6..d92100831 100644
--- a/lib/pure/asyncnet.nim
+++ b/lib/pure/asyncnet.nim
@@ -193,7 +193,7 @@ proc connect*(socket: AsyncSocket, address: string, port: Port) {.async.} =
       sslSetConnectState(socket.sslHandle)
       sslLoop(socket, flags, sslDoHandshake(socket.sslHandle))
 
-template readInto(buf: cstring, size: int, socket: AsyncSocket,
+template readInto(buf: pointer, size: int, socket: AsyncSocket,
                   flags: set[SocketFlag]): int =
   ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``. Note that
   ## this is a template and not a proc.
@@ -202,10 +202,10 @@ template readInto(buf: cstring, size: int, socket: AsyncSocket,
     when defineSsl:
       # SSL mode.
       sslLoop(socket, flags,
-        sslRead(socket.sslHandle, buf, size.cint))
+        sslRead(socket.sslHandle, cast[cstring](buf), size.cint))
       res = opResult
   else:
-    var recvIntoFut = recvInto(socket.fd.AsyncFD, buf, size, flags)
+    var recvIntoFut = recvBuffer(socket.fd.AsyncFD, buf, size, flags)
     yield recvIntoFut
     # Not in SSL mode.
     res = recvIntoFut.read()
@@ -218,6 +218,54 @@ template readIntoBuf(socket: AsyncSocket,
   socket.bufLen = size
   size
 
+proc recvBuffer*(socket: AsyncSocket, buf: pointer, size: int,
+           flags = {SocketFlag.SafeDisconn}): Future[int] {.async.} =
+  ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``.
+  ##
+  ## For buffered sockets this function will attempt to read all the requested
+  ## data. It will read this data in ``BufferSize`` chunks.
+  ##
+  ## For unbuffered sockets this function makes no effort to read
+  ## all the data requested. It will return as much data as the operating system
+  ## gives it.
+  ##
+  ## If socket is disconnected during the
+  ## recv operation then the future may complete with only a part of the
+  ## requested data.
+  ##
+  ## If socket is disconnected and no data is available
+  ## to be read then the future will complete with a value of ``0``.
+  if socket.isBuffered:
+    let originalBufPos = socket.currPos
+
+    if socket.bufLen == 0:
+      let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
+      if res == 0:
+        return 0
+
+    var read = 0
+    var cbuf = cast[cstring](buf)
+    while read < size:
+      if socket.currPos >= socket.bufLen:
+        if SocketFlag.Peek in flags:
+          # We don't want to get another buffer if we're peeking.
+          break
+        let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
+        if res == 0:
+          break
+
+      let chunk = min(socket.bufLen-socket.currPos, size-read)
+      copyMem(addr(cbuf[read]), addr(socket.buffer[socket.currPos]), chunk)
+      read.inc(chunk)
+      socket.currPos.inc(chunk)
+
+    if SocketFlag.Peek in flags:
+      # Restore old buffer cursor position.
+      socket.currPos = originalBufPos
+    result = read
+  else:
+    result = readInto(buf, size, socket, flags)
+
 proc recv*(socket: AsyncSocket, size: int,
            flags = {SocketFlag.SafeDisconn}): Future[string] {.async.} =
   ## Reads **up to** ``size`` bytes from ``socket``.
@@ -270,6 +318,19 @@ proc recv*(socket: AsyncSocket, size: int,
     let read = readInto(addr result[0], size, socket, flags)
     result.setLen(read)
 
+proc sendBuffer*(socket: AsyncSocket, buf: pointer, size: int,
+            flags = {SocketFlag.SafeDisconn}) {.async.} =
+  ## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future will complete once all
+  ## data has been sent.
+  assert socket != nil
+  if socket.isSsl:
+    when defineSsl:
+      sslLoop(socket, flags,
+              sslWrite(socket.sslHandle, cast[pointer](buf), size.cint))
+      await sendPendingSslData(socket, flags)
+  else:
+    await sendBuffer(socket.fd.AsyncFD, buf, size, flags)
+
 proc send*(socket: AsyncSocket, data: string,
            flags = {SocketFlag.SafeDisconn}) {.async.} =
   ## Sends ``data`` to ``socket``. The returned future will complete once all
diff --git a/lib/upcoming/asyncdispatch.nim b/lib/upcoming/asyncdispatch.nim
index ce44e8a6a..1f66ef6c8 100644
--- a/lib/upcoming/asyncdispatch.nim
+++ b/lib/upcoming/asyncdispatch.nim
@@ -718,7 +718,7 @@ when defined(windows) or defined(nimdoc):
           retFuture.complete("")
     return retFuture
 
-  proc recvInto*(socket: AsyncFD, buf: cstring, size: int,
+  proc recvBuffer*(socket: AsyncFD, buf: pointer, size: int,
                 flags = {SocketFlag.SafeDisconn}): Future[int] =
     ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``, which must
     ## at least be of that size. Returned future will complete once all the
@@ -738,11 +738,11 @@ when defined(windows) or defined(nimdoc):
     verifyPresence(socket)
     assert SocketFlag.Peek notin flags, "Peek not supported on Windows."
 
-    var retFuture = newFuture[int]("recvInto")
+    var retFuture = newFuture[int]("recvBuffer")
 
     #buf[] = '\0'
     var dataBuf: TWSABuf
-    dataBuf.buf = buf
+    dataBuf.buf = cast[cstring](buf)
     dataBuf.len = size.ULONG
 
     var bytesReceived: Dword
@@ -753,10 +753,7 @@ when defined(windows) or defined(nimdoc):
       proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
         if not retFuture.finished:
           if errcode == OSErrorCode(-1):
-            if bytesCount == 0 and dataBuf.buf[0] == '\0':
-              retFuture.complete(0)
-            else:
-              retFuture.complete(bytesCount)
+            retFuture.complete(bytesCount)
           else:
             if flags.isDisconnectionError(errcode):
               retFuture.complete(0)
@@ -788,6 +785,51 @@ when defined(windows) or defined(nimdoc):
           retFuture.complete(bytesReceived)
     return retFuture
 
+  proc sendBuffer*(socket: AsyncFD, buf: pointer, size: int,
+             flags = {SocketFlag.SafeDisconn}): Future[void] =
+    ## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future will complete once all
+    ## data has been sent.
+    ## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object, you must use GC_ref/GC_unref calls
+    ## to avoid early freeing of the buffer
+    verifyPresence(socket)
+    var retFuture = newFuture[void]("send")
+
+    var dataBuf: TWSABuf
+    dataBuf.buf = cast[cstring](buf)
+    dataBuf.len = size.ULONG
+
+    var bytesReceived, lowFlags: Dword
+    var ol = PCustomOverlapped()
+    GC_ref(ol)
+    ol.data = CompletionData(fd: socket, cb:
+      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
+        if not retFuture.finished:
+          if errcode == OSErrorCode(-1):
+            retFuture.complete()
+          else:
+            if flags.isDisconnectionError(errcode):
+              retFuture.complete()
+            else:
+              retFuture.fail(newException(OSError, osErrorMsg(errcode)))
+    )
+
+    let ret = WSASend(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
+                      lowFlags, cast[POVERLAPPED](ol), nil)
+    if ret == -1:
+      let err = osLastError()
+      if err.int32 != ERROR_IO_PENDING:
+        GC_unref(ol)
+        if flags.isDisconnectionError(err):
+          retFuture.complete()
+        else:
+          retFuture.fail(newException(OSError, osErrorMsg(err)))
+    else:
+      retFuture.complete()
+      # We don't deallocate ``ol`` here because even though this completed
+      # immediately poll will still be notified about its completion and it will
+      # free ``ol``.
+    return retFuture
+
   proc send*(socket: AsyncFD, data: string,
              flags = {SocketFlag.SafeDisconn}): Future[void] =
     ## Sends ``data`` to ``socket``. The returned future will complete once all
@@ -797,9 +839,9 @@ when defined(windows) or defined(nimdoc):
 
     var dataBuf: TWSABuf
     dataBuf.buf = data
-    dataBuf.len = data.len.ULONG
     GC_ref(data) # we need to protect data until send operation is completed
                  # or failed.
+    dataBuf.len = data.len.ULONG
 
     var bytesReceived, lowFlags: Dword
     var ol = PCustomOverlapped()
@@ -1528,9 +1570,9 @@ else:
     addRead(socket, cb)
     return retFuture
 
-  proc recvInto*(socket: AsyncFD, buf: cstring, size: int,
+  proc recvBuffer*(socket: AsyncFD, buf: pointer, size: int,
                  flags = {SocketFlag.SafeDisconn}): Future[int] =
-    var retFuture = newFuture[int]("recvInto")
+    var retFuture = newFuture[int]("recvBuffer")
 
     proc cb(sock: AsyncFD): bool =
       result = true
@@ -1552,6 +1594,38 @@ else:
     addRead(socket, cb)
     return retFuture
 
+  proc sendBuffer*(socket: AsyncFD, buf: pointer, size: int,
+             flags = {SocketFlag.SafeDisconn}): Future[void] =
+    var retFuture = newFuture[void]("send")
+
+    var written = 0
+
+    proc cb(sock: AsyncFD): bool =
+      result = true
+      let netSize = size-written
+      var d = cast[cstring](buf)
+      let res = send(sock.SocketHandle, addr d[written], netSize.cint,
+                     MSG_NOSIGNAL)
+      if res < 0:
+        let lastError = osLastError()
+        if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
+          if flags.isDisconnectionError(lastError):
+            retFuture.complete()
+          else:
+            retFuture.fail(newException(OSError, osErrorMsg(lastError)))
+        else:
+          result = false # We still want this callback to be called.
+      else:
+        written.inc(res)
+        if res != netSize:
+          result = false # We still have data to send.
+        else:
+          retFuture.complete()
+    # TODO: The following causes crashes.
+    #if not cb(socket):
+    addWrite(socket, cb)
+    return retFuture
+
   proc send*(socket: AsyncFD, data: string,
              flags = {SocketFlag.SafeDisconn}): Future[void] =
     var retFuture = newFuture[void]("send")