diff options
author | Anatoly Galiulin <galiulin.anatoly@gmail.com> | 2016-08-31 11:18:36 +0700 |
---|---|---|
committer | Anatoly Galiulin <galiulin.anatoly@gmail.com> | 2016-09-06 09:31:13 +0700 |
commit | e4c46e6fba65303c7046db28d996df17ec805e4f (patch) | |
tree | a78b72acadcf9cf5dea5234556d31a649e984215 /lib/pure | |
parent | 41f6c08f92f9f0255330155f6fe8802e3fb2ba3a (diff) | |
download | Nim-e4c46e6fba65303c7046db28d996df17ec805e4f.tar.gz |
Add async IO operations with buffers on files and sockets
Diffstat (limited to 'lib/pure')
-rw-r--r-- | lib/pure/asyncdispatch.nim | 94 | ||||
-rw-r--r-- | lib/pure/asyncfile.nim | 141 | ||||
-rw-r--r-- | lib/pure/asyncnet.nim | 67 |
3 files changed, 289 insertions, 13 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index 6bea8e817..faac79d3b 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -749,7 +749,7 @@ when defined(windows) or defined(nimdoc): retFuture.complete("") return retFuture - proc recvInto*(socket: AsyncFD, buf: cstring, size: int, + proc recvBuffer*(socket: AsyncFD, buf: pointer, size: int, flags = {SocketFlag.SafeDisconn}): Future[int] = ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``, which must ## at least be of that size. Returned future will complete once all the @@ -769,11 +769,11 @@ when defined(windows) or defined(nimdoc): verifyPresence(socket) assert SocketFlag.Peek notin flags, "Peek not supported on Windows." - var retFuture = newFuture[int]("recvInto") + var retFuture = newFuture[int]("recvBuffer") #buf[] = '\0' var dataBuf: TWSABuf - dataBuf.buf = buf + dataBuf.buf = cast[cstring](buf) dataBuf.len = size.ULONG var bytesReceived: Dword @@ -784,10 +784,7 @@ when defined(windows) or defined(nimdoc): proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = if not retFuture.finished: if errcode == OSErrorCode(-1): - if bytesCount == 0 and dataBuf.buf[0] == '\0': - retFuture.complete(0) - else: - retFuture.complete(bytesCount) + retFuture.complete(bytesCount) else: if flags.isDisconnectionError(errcode): retFuture.complete(0) @@ -819,6 +816,51 @@ when defined(windows) or defined(nimdoc): retFuture.complete(bytesReceived) return retFuture + proc sendBuffer*(socket: AsyncFD, buf: pointer, size: int, + flags = {SocketFlag.SafeDisconn}): Future[void] = + ## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future will complete once all + ## data has been sent. + ## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object, you must use GC_ref/GC_unref calls + ## to avoid early freeing of the buffer + verifyPresence(socket) + var retFuture = newFuture[void]("send") + + var dataBuf: TWSABuf + dataBuf.buf = cast[cstring](buf) + dataBuf.len = size.ULONG + + var bytesReceived, lowFlags: Dword + var ol = PCustomOverlapped() + GC_ref(ol) + ol.data = CompletionData(fd: socket, cb: + proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + retFuture.complete() + else: + if flags.isDisconnectionError(errcode): + retFuture.complete() + else: + retFuture.fail(newException(OSError, osErrorMsg(errcode))) + ) + + let ret = WSASend(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived, + lowFlags, cast[POVERLAPPED](ol), nil) + if ret == -1: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + GC_unref(ol) + if flags.isDisconnectionError(err): + retFuture.complete() + else: + retFuture.fail(newException(OSError, osErrorMsg(err))) + else: + retFuture.complete() + # We don't deallocate ``ol`` here because even though this completed + # immediately poll will still be notified about its completion and it will + # free ``ol``. + return retFuture + proc send*(socket: AsyncFD, data: string, flags = {SocketFlag.SafeDisconn}): Future[void] = ## Sends ``data`` to ``socket``. The returned future will complete once all @@ -828,9 +870,9 @@ when defined(windows) or defined(nimdoc): var dataBuf: TWSABuf dataBuf.buf = data - dataBuf.len = data.len.ULONG GC_ref(data) # we need to protect data until send operation is completed # or failed. + dataBuf.len = data.len.ULONG var bytesReceived, lowFlags: Dword var ol = PCustomOverlapped() @@ -1403,9 +1445,9 @@ else: addRead(socket, cb) return retFuture - proc recvInto*(socket: AsyncFD, buf: cstring, size: int, + proc recvBuffer*(socket: AsyncFD, buf: pointer, size: int, flags = {SocketFlag.SafeDisconn}): Future[int] = - var retFuture = newFuture[int]("recvInto") + var retFuture = newFuture[int]("recvBuffer") proc cb(sock: AsyncFD): bool = result = true @@ -1427,6 +1469,38 @@ else: addRead(socket, cb) return retFuture + proc sendBuffer*(socket: AsyncFD, buf: pointer, size: int, + flags = {SocketFlag.SafeDisconn}): Future[void] = + var retFuture = newFuture[void]("send") + + var written = 0 + + proc cb(sock: AsyncFD): bool = + result = true + let netSize = size-written + var d = cast[cstring](buf) + let res = send(sock.SocketHandle, addr d[written], netSize.cint, + MSG_NOSIGNAL) + if res < 0: + let lastError = osLastError() + if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: + if flags.isDisconnectionError(lastError): + retFuture.complete() + else: + retFuture.fail(newException(OSError, osErrorMsg(lastError))) + else: + result = false # We still want this callback to be called. + else: + written.inc(res) + if res != netSize: + result = false # We still have data to send. + else: + retFuture.complete() + # TODO: The following causes crashes. + #if not cb(socket): + addWrite(socket, cb) + return retFuture + proc send*(socket: AsyncFD, data: string, flags = {SocketFlag.SafeDisconn}): Future[void] = var retFuture = newFuture[void]("send") diff --git a/lib/pure/asyncfile.nim b/lib/pure/asyncfile.nim index 5df606ea8..8801c64d0 100644 --- a/lib/pure/asyncfile.nim +++ b/lib/pure/asyncfile.nim @@ -112,6 +112,80 @@ proc openAsync*(filename: string, mode = fmRead): AsyncFile = register(result.fd) +proc readBuffer*(f: AsyncFile, buf: pointer, size: int): Future[int] = + ## Read ``size`` bytes from the specified file asynchronously starting at + ## the current position of the file pointer. + ## + ## If the file pointer is past the end of the file then an empty string is + ## returned. + var retFuture = newFuture[int]("asyncfile.readBuffer") + + when defined(windows) or defined(nimdoc): + var ol = PCustomOverlapped() + GC_ref(ol) + ol.data = CompletionData(fd: f.fd, cb: + proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + assert bytesCount > 0 + assert bytesCount <= size + f.offset.inc bytesCount + retFuture.complete(bytesCount) + else: + if errcode.int32 == ERROR_HANDLE_EOF: + retFuture.complete(0) + else: + retFuture.fail(newException(OSError, osErrorMsg(errcode))) + ) + ol.offset = DWord(f.offset and 0xffffffff) + ol.offsetHigh = DWord(f.offset shr 32) + + # According to MSDN we're supposed to pass nil to lpNumberOfBytesRead. + let ret = readFile(f.fd.Handle, buf, size.int32, nil, + cast[POVERLAPPED](ol)) + if not ret.bool: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + GC_unref(ol) + retFuture.fail(newException(OSError, osErrorMsg(err))) + else: + # Request completed immediately. + var bytesRead: DWord + let overlappedRes = getOverlappedResult(f.fd.Handle, + cast[POverlapped](ol), bytesRead, false.WinBool) + if not overlappedRes.bool: + let err = osLastError() + if err.int32 == ERROR_HANDLE_EOF: + retFuture.complete(0) + else: + retFuture.fail(newException(OSError, osErrorMsg(osLastError()))) + else: + assert bytesRead > 0 + assert bytesRead <= size + f.offset.inc bytesRead + retFuture.complete(bytesRead) + else: + proc cb(fd: AsyncFD): bool = + result = true + let res = read(fd.cint, cast[cstring](buf), size.cint) + if res < 0: + let lastError = osLastError() + if lastError.int32 != EAGAIN: + retFuture.fail(newException(OSError, osErrorMsg(lastError))) + else: + result = false # We still want this callback to be called. + elif res == 0: + # EOF + retFuture.complete(0) + else: + f.offset.inc(res) + retFuture.complete(res) + + if not cb(f.fd): + addRead(f.fd, cb) + + return retFuture + proc read*(f: AsyncFile, size: int): Future[string] = ## Read ``size`` bytes from the specified file asynchronously starting at ## the current position of the file pointer. @@ -238,6 +312,73 @@ proc readAll*(f: AsyncFile): Future[string] {.async.} = return result.add data +proc writeBuffer*(f: AsyncFile, buf: pointer, size: int): Future[void] = + ## Writes ``size`` bytes from ``buf`` to the file specified asynchronously. + ## + ## The returned Future will complete once all data has been written to the + ## specified file. + var retFuture = newFuture[void]("asyncfile.writeBuffer") + when defined(windows) or defined(nimdoc): + var ol = PCustomOverlapped() + GC_ref(ol) + ol.data = CompletionData(fd: f.fd, cb: + proc (fd: AsyncFD, bytesCount: DWord, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + assert bytesCount == size.int32 + f.offset.inc(size) + retFuture.complete() + else: + retFuture.fail(newException(OSError, osErrorMsg(errcode))) + ) + ol.offset = DWord(f.offset and 0xffffffff) + ol.offsetHigh = DWord(f.offset shr 32) + + # According to MSDN we're supposed to pass nil to lpNumberOfBytesWritten. + let ret = writeFile(f.fd.Handle, buf, size.int32, nil, + cast[POVERLAPPED](ol)) + if not ret.bool: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + GC_unref(ol) + retFuture.fail(newException(OSError, osErrorMsg(err))) + else: + # Request completed immediately. + var bytesWritten: DWord + let overlappedRes = getOverlappedResult(f.fd.Handle, + cast[POverlapped](ol), bytesWritten, false.WinBool) + if not overlappedRes.bool: + retFuture.fail(newException(OSError, osErrorMsg(osLastError()))) + else: + assert bytesWritten == size.int32 + f.offset.inc(size) + retFuture.complete() + else: + var written = 0 + + proc cb(fd: AsyncFD): bool = + result = true + let remainderSize = size-written + var cbuf = cast[cstring](buf) + let res = write(fd.cint, addr cbuf[written], remainderSize.cint) + if res < 0: + let lastError = osLastError() + if lastError.int32 != EAGAIN: + retFuture.fail(newException(OSError, osErrorMsg(lastError))) + else: + result = false # We still want this callback to be called. + else: + written.inc res + f.offset.inc res + if res != remainderSize: + result = false # We still have data to write. + else: + retFuture.complete() + + if not cb(f.fd): + addWrite(f.fd, cb) + return retFuture + proc write*(f: AsyncFile, data: string): Future[void] = ## Writes ``data`` to the file specified asynchronously. ## diff --git a/lib/pure/asyncnet.nim b/lib/pure/asyncnet.nim index a1988f4a6..d92100831 100644 --- a/lib/pure/asyncnet.nim +++ b/lib/pure/asyncnet.nim @@ -193,7 +193,7 @@ proc connect*(socket: AsyncSocket, address: string, port: Port) {.async.} = sslSetConnectState(socket.sslHandle) sslLoop(socket, flags, sslDoHandshake(socket.sslHandle)) -template readInto(buf: cstring, size: int, socket: AsyncSocket, +template readInto(buf: pointer, size: int, socket: AsyncSocket, flags: set[SocketFlag]): int = ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``. Note that ## this is a template and not a proc. @@ -202,10 +202,10 @@ template readInto(buf: cstring, size: int, socket: AsyncSocket, when defineSsl: # SSL mode. sslLoop(socket, flags, - sslRead(socket.sslHandle, buf, size.cint)) + sslRead(socket.sslHandle, cast[cstring](buf), size.cint)) res = opResult else: - var recvIntoFut = recvInto(socket.fd.AsyncFD, buf, size, flags) + var recvIntoFut = recvBuffer(socket.fd.AsyncFD, buf, size, flags) yield recvIntoFut # Not in SSL mode. res = recvIntoFut.read() @@ -218,6 +218,54 @@ template readIntoBuf(socket: AsyncSocket, socket.bufLen = size size +proc recvBuffer*(socket: AsyncSocket, buf: pointer, size: int, + flags = {SocketFlag.SafeDisconn}): Future[int] {.async.} = + ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``. + ## + ## For buffered sockets this function will attempt to read all the requested + ## data. It will read this data in ``BufferSize`` chunks. + ## + ## For unbuffered sockets this function makes no effort to read + ## all the data requested. It will return as much data as the operating system + ## gives it. + ## + ## If socket is disconnected during the + ## recv operation then the future may complete with only a part of the + ## requested data. + ## + ## If socket is disconnected and no data is available + ## to be read then the future will complete with a value of ``0``. + if socket.isBuffered: + let originalBufPos = socket.currPos + + if socket.bufLen == 0: + let res = socket.readIntoBuf(flags - {SocketFlag.Peek}) + if res == 0: + return 0 + + var read = 0 + var cbuf = cast[cstring](buf) + while read < size: + if socket.currPos >= socket.bufLen: + if SocketFlag.Peek in flags: + # We don't want to get another buffer if we're peeking. + break + let res = socket.readIntoBuf(flags - {SocketFlag.Peek}) + if res == 0: + break + + let chunk = min(socket.bufLen-socket.currPos, size-read) + copyMem(addr(cbuf[read]), addr(socket.buffer[socket.currPos]), chunk) + read.inc(chunk) + socket.currPos.inc(chunk) + + if SocketFlag.Peek in flags: + # Restore old buffer cursor position. + socket.currPos = originalBufPos + result = read + else: + result = readInto(buf, size, socket, flags) + proc recv*(socket: AsyncSocket, size: int, flags = {SocketFlag.SafeDisconn}): Future[string] {.async.} = ## Reads **up to** ``size`` bytes from ``socket``. @@ -270,6 +318,19 @@ proc recv*(socket: AsyncSocket, size: int, let read = readInto(addr result[0], size, socket, flags) result.setLen(read) +proc sendBuffer*(socket: AsyncSocket, buf: pointer, size: int, + flags = {SocketFlag.SafeDisconn}) {.async.} = + ## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future will complete once all + ## data has been sent. + assert socket != nil + if socket.isSsl: + when defineSsl: + sslLoop(socket, flags, + sslWrite(socket.sslHandle, cast[pointer](buf), size.cint)) + await sendPendingSslData(socket, flags) + else: + await sendBuffer(socket.fd.AsyncFD, buf, size, flags) + proc send*(socket: AsyncSocket, data: string, flags = {SocketFlag.SafeDisconn}) {.async.} = ## Sends ``data`` to ``socket``. The returned future will complete once all |