diff options
author | Dominik Picheta <dominikpicheta@gmail.com> | 2016-09-06 20:10:19 +0200 |
---|---|---|
committer | Dominik Picheta <dominikpicheta@gmail.com> | 2016-09-06 20:10:19 +0200 |
commit | 3760a8b973781ac3a9b06fa1c054c489f9c7ce3e (patch) | |
tree | 0344713953563b0ef66b91738e92e5b8a5cf1efd /lib/upcoming | |
parent | e554c1d7cfa8dfded932aee631f873d359f84972 (diff) | |
parent | 60dbfb2ec90d8ad080ae1b202fcc086cb2cf911c (diff) | |
download | Nim-3760a8b973781ac3a9b06fa1c054c489f9c7ce3e.tar.gz |
Merge branch 'async_buffers_v2' of https://github.com/vegansk/Nim into vegansk-async_buffers_v2
Merges #4683.
Diffstat (limited to 'lib/upcoming')
-rw-r--r-- | lib/upcoming/asyncdispatch.nim | 90 |
1 files changed, 82 insertions, 8 deletions
diff --git a/lib/upcoming/asyncdispatch.nim b/lib/upcoming/asyncdispatch.nim index 72ba4efd6..52e7ed552 100644 --- a/lib/upcoming/asyncdispatch.nim +++ b/lib/upcoming/asyncdispatch.nim @@ -718,7 +718,7 @@ when defined(windows) or defined(nimdoc): retFuture.complete("") return retFuture - proc recvInto*(socket: AsyncFD, buf: cstring, size: int, + proc recvInto*(socket: AsyncFD, buf: pointer, size: int, flags = {SocketFlag.SafeDisconn}): Future[int] = ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``, which must ## at least be of that size. Returned future will complete once all the @@ -742,7 +742,7 @@ when defined(windows) or defined(nimdoc): #buf[] = '\0' var dataBuf: TWSABuf - dataBuf.buf = buf + dataBuf.buf = cast[cstring](buf) dataBuf.len = size.ULONG var bytesReceived: Dword @@ -753,10 +753,7 @@ when defined(windows) or defined(nimdoc): proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = if not retFuture.finished: if errcode == OSErrorCode(-1): - if bytesCount == 0 and dataBuf.buf[0] == '\0': - retFuture.complete(0) - else: - retFuture.complete(bytesCount) + retFuture.complete(bytesCount) else: if flags.isDisconnectionError(errcode): retFuture.complete(0) @@ -788,6 +785,51 @@ when defined(windows) or defined(nimdoc): retFuture.complete(bytesReceived) return retFuture + proc send*(socket: AsyncFD, buf: pointer, size: int, + flags = {SocketFlag.SafeDisconn}): Future[void] = + ## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future will complete once all + ## data has been sent. + ## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object, you must use GC_ref/GC_unref calls + ## to avoid early freeing of the buffer + verifyPresence(socket) + var retFuture = newFuture[void]("send") + + var dataBuf: TWSABuf + dataBuf.buf = cast[cstring](buf) + dataBuf.len = size.ULONG + + var bytesReceived, lowFlags: Dword + var ol = PCustomOverlapped() + GC_ref(ol) + ol.data = CompletionData(fd: socket, cb: + proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + retFuture.complete() + else: + if flags.isDisconnectionError(errcode): + retFuture.complete() + else: + retFuture.fail(newException(OSError, osErrorMsg(errcode))) + ) + + let ret = WSASend(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived, + lowFlags, cast[POVERLAPPED](ol), nil) + if ret == -1: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + GC_unref(ol) + if flags.isDisconnectionError(err): + retFuture.complete() + else: + retFuture.fail(newException(OSError, osErrorMsg(err))) + else: + retFuture.complete() + # We don't deallocate ``ol`` here because even though this completed + # immediately poll will still be notified about its completion and it will + # free ``ol``. + return retFuture + proc send*(socket: AsyncFD, data: string, flags = {SocketFlag.SafeDisconn}): Future[void] = ## Sends ``data`` to ``socket``. The returned future will complete once all @@ -797,9 +839,9 @@ when defined(windows) or defined(nimdoc): var dataBuf: TWSABuf dataBuf.buf = data - dataBuf.len = data.len.ULONG GC_ref(data) # we need to protect data until send operation is completed # or failed. + dataBuf.len = data.len.ULONG var bytesReceived, lowFlags: Dword var ol = PCustomOverlapped() @@ -1537,7 +1579,7 @@ else: addRead(socket, cb) return retFuture - proc recvInto*(socket: AsyncFD, buf: cstring, size: int, + proc recvInto*(socket: AsyncFD, buf: pointer, size: int, flags = {SocketFlag.SafeDisconn}): Future[int] = var retFuture = newFuture[int]("recvInto") @@ -1561,6 +1603,38 @@ else: addRead(socket, cb) return retFuture + proc send*(socket: AsyncFD, buf: pointer, size: int, + flags = {SocketFlag.SafeDisconn}): Future[void] = + var retFuture = newFuture[void]("send") + + var written = 0 + + proc cb(sock: AsyncFD): bool = + result = true + let netSize = size-written + var d = cast[cstring](buf) + let res = send(sock.SocketHandle, addr d[written], netSize.cint, + MSG_NOSIGNAL) + if res < 0: + let lastError = osLastError() + if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: + if flags.isDisconnectionError(lastError): + retFuture.complete() + else: + retFuture.fail(newException(OSError, osErrorMsg(lastError))) + else: + result = false # We still want this callback to be called. + else: + written.inc(res) + if res != netSize: + result = false # We still have data to send. + else: + retFuture.complete() + # TODO: The following causes crashes. + #if not cb(socket): + addWrite(socket, cb) + return retFuture + proc send*(socket: AsyncFD, data: string, flags = {SocketFlag.SafeDisconn}): Future[void] = var retFuture = newFuture[void]("send") |