diff options
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r-- | lib/pure/asyncdispatch.nim | 122 |
1 files changed, 118 insertions, 4 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index 8e0ac8d21..a4d7a1632 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -634,6 +634,93 @@ when defined(windows) or defined(nimdoc): # free ``ol``. return retFuture + proc recvInto*(socket: TAsyncFD, buf: cstring, size: int, + flags = {SocketFlag.SafeDisconn}): Future[int] = + ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``, which must + ## at least be of that size. Returned future will complete once all the + ## data requested is read, a part of the data has been read, or the socket + ## has disconnected in which case the future will complete with a value of + ## ``0``. + ## + ## **Warning**: The ``Peek`` socket flag is not supported on Windows. + + + # Things to note: + # * When WSARecv completes immediately then ``bytesReceived`` is very + # unreliable. + # * Still need to implement message-oriented socket disconnection, + # '\0' in the message currently signifies a socket disconnect. Who + # knows what will happen when someone sends that to our socket. + verifyPresence(socket) + assert SocketFlag.Peek notin flags, "Peek not supported on Windows." + + var retFuture = newFuture[int]("recvInto") + + #buf[] = '\0' + var dataBuf: TWSABuf + dataBuf.buf = buf + dataBuf.len = size + + var bytesReceived: Dword + var flagsio = flags.toOSFlags().Dword + var ol = PCustomOverlapped() + GC_ref(ol) + ol.data = TCompletionData(fd: socket, cb: + proc (fd: TAsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + if bytesCount == 0 and dataBuf.buf[0] == '\0': + retFuture.complete(0) + else: + retFuture.complete(bytesCount) + else: + if flags.isDisconnectionError(errcode): + retFuture.complete(0) + else: + retFuture.fail(newException(OSError, osErrorMsg(errcode))) + if dataBuf.buf != nil: + dataBuf.buf = nil + ) + + let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived, + addr flagsio, cast[POVERLAPPED](ol), nil) + if ret == -1: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + if dataBuf.buf != nil: + dataBuf.buf = nil + GC_unref(ol) + if flags.isDisconnectionError(err): + retFuture.complete(0) + else: + retFuture.fail(newException(OSError, osErrorMsg(err))) + elif ret == 0 and bytesReceived == 0 and dataBuf.buf[0] == '\0': + # We have to ensure that the buffer is empty because WSARecv will tell + # us immediately when it was disconnected, even when there is still + # data in the buffer. + # We want to give the user as much data as we can. So we only return + # the empty string (which signals a disconnection) when there is + # nothing left to read. + retFuture.complete(0) + # TODO: "For message-oriented sockets, where a zero byte message is often + # allowable, a failure with an error code of WSAEDISCON is used to + # indicate graceful closure." + # ~ http://msdn.microsoft.com/en-us/library/ms741688%28v=vs.85%29.aspx + else: + # Request to read completed immediately. + # From my tests bytesReceived isn't reliable. + let realSize = + if bytesReceived == 0: + size + else: + bytesReceived + assert realSize <= size + retFuture.complete(realSize) + # We don't deallocate ``ol`` here because even though this completed + # immediately poll will still be notified about its completion and it will + # free ``ol``. + return retFuture + proc send*(socket: TAsyncFD, data: string, flags = {SocketFlag.SafeDisconn}): Future[void] = ## Sends ``data`` to ``socket``. The returned future will complete once all @@ -841,6 +928,8 @@ else: proc newAsyncRawSocket*(domain: cint, typ: cint, protocol: cint): TAsyncFD = result = newRawSocket(domain, typ, protocol).TAsyncFD result.SocketHandle.setBlocking(false) + when defined(macosx): + result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1) register(result) proc newAsyncRawSocket*(domain: Domain = AF_INET, @@ -848,6 +937,8 @@ else: protocol: Protocol = IPPROTO_TCP): TAsyncFD = result = newRawSocket(domain, typ, protocol).TAsyncFD result.SocketHandle.setBlocking(false) + when defined(macosx): + result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1) register(result) proc closeSocket*(sock: TAsyncFD) = @@ -959,7 +1050,6 @@ else: result = true let res = recv(sock.SocketHandle, addr readBuffer[0], size.cint, flags.toOSFlags()) - #echo("recv cb res: ", res) if res < 0: let lastError = osLastError() if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: @@ -980,6 +1070,30 @@ else: addRead(socket, cb) return retFuture + proc recvInto*(socket: TAsyncFD, buf: cstring, size: int, + flags = {SocketFlag.SafeDisconn}): Future[int] = + var retFuture = newFuture[int]("recvInto") + + proc cb(sock: TAsyncFD): bool = + result = true + let res = recv(sock.SocketHandle, buf, size.cint, + flags.toOSFlags()) + if res < 0: + let lastError = osLastError() + if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: + if flags.isDisconnectionError(lastError): + retFuture.complete(0) + else: + retFuture.fail(newException(OSError, osErrorMsg(lastError))) + else: + result = false # We still want this callback to be called. + else: + retFuture.complete(res) + # TODO: The following causes a massive slowdown. + #if not cb(socket): + addRead(socket, cb) + return retFuture + proc send*(socket: TAsyncFD, data: string, flags = {SocketFlag.SafeDisconn}): Future[void] = var retFuture = newFuture[void]("send") @@ -1273,7 +1387,7 @@ proc processBody(node, retFutureSym: NimNode, else: discard for i in 0 .. <result.len: - result[i] = processBody(result[i], retFutureSym, subTypeIsVoid, tryStmt) + result[i] = processBody(result[i], retFutureSym, subTypeIsVoid, nil) proc getName(node: NimNode): string {.compileTime.} = case node.kind @@ -1378,8 +1492,8 @@ macro async*(prc: stmt): stmt {.immediate.} = result[6] = outerProcBody #echo(treeRepr(result)) - if prc[0].getName == "test3": - echo(toStrLit(result)) + #if prc[0].getName == "test": + # echo(toStrLit(result)) proc recvLine*(socket: TAsyncFD): Future[string] {.async.} = ## Reads a line of data from ``socket``. Returned future will complete once |