diff options
author | Dominik Picheta <dominikpicheta@googlemail.com> | 2014-02-15 21:13:23 +0000 |
---|---|---|
committer | Dominik Picheta <dominikpicheta@googlemail.com> | 2014-02-15 21:13:23 +0000 |
commit | fb7598d25adee57f33549db97bb5de023f40a234 (patch) | |
tree | d8047ca0ee52a334868c3edf6858c4172acfb268 /lib/pure | |
parent | de8d47330152d3d973c6ff52b89c9d79b42d590c (diff) | |
download | Nim-fb7598d25adee57f33549db97bb5de023f40a234.tar.gz |
Async readLine now works. Fixes recv issues.
When using MSG_PEEK and data is retrieved ``lpNumberOfBytesRecvd`` will not be set to the number of bytes read by WSARecv. The buffer must therefore be checked to ensure it's empty when determining whether ``recv`` shall return "" to signal disconnection as we want to read as much data as has been received by the system.
Diffstat (limited to 'lib/pure')
-rw-r--r-- | lib/pure/asyncio2.nim | 74 |
1 files changed, 50 insertions, 24 deletions
diff --git a/lib/pure/asyncio2.nim b/lib/pure/asyncio2.nim index 1cf2292de..00cb60d57 100644 --- a/lib/pure/asyncio2.nim +++ b/lib/pure/asyncio2.nim @@ -99,7 +99,8 @@ when defined(windows): TCompletionData* = object sock: TSocketHandle - cb: proc (sock: TSocketHandle, errcode: TOSErrorCode) {.closure.} + cb: proc (sock: TSocketHandle, bytesTransferred: DWORD, + errcode: TOSErrorCode) {.closure.} PDispatcher* = ref object ioPort: THandle @@ -143,13 +144,15 @@ when defined(windows): # This is useful for ensuring the reliability of the overlapped struct. assert customOverlapped.data.sock == lpCompletionKey.TSocketHandle - customOverlapped.data.cb(customOverlapped.data.sock, TOSErrorCode(-1)) + customOverlapped.data.cb(customOverlapped.data.sock, + lpNumberOfBytesTransferred, TOSErrorCode(-1)) dealloc(customOverlapped) else: let errCode = OSLastError() if lpOverlapped != nil: assert customOverlapped.data.sock == lpCompletionKey.TSocketHandle - customOverlapped.data.cb(customOverlapped.data.sock, errCode) + customOverlapped.data.cb(customOverlapped.data.sock, + lpNumberOfBytesTransferred, errCode) dealloc(customOverlapped) else: if errCode.int32 == WAIT_TIMEOUT: @@ -248,7 +251,7 @@ when defined(windows): # http://blogs.msdn.com/b/oldnewthing/archive/2011/02/02/10123392.aspx var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped))) ol.data = TCompletionData(sock: socket, cb: - proc (sock: TSocketHandle, errcode: TOSErrorCode) = + proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) = if not retFuture.finished: if errcode == TOSErrorCode(-1): retFuture.complete(0) @@ -297,16 +300,19 @@ when defined(windows): var flagsio = flags.dword var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped))) ol.data = TCompletionData(sock: socket, cb: - proc (sock: TSocketHandle, errcode: TOSErrorCode) = + proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) = if not retFuture.finished: if errcode == TOSErrorCode(-1): - var data = newString(size) - copyMem(addr data[0], addr dataBuf.buf[0], size) - retFuture.complete($data) + if bytesCount == 0 and dataBuf.buf[0] == '\0': + retFuture.complete("") + else: + var data = newString(size) + copyMem(addr data[0], addr dataBuf.buf[0], size) + retFuture.complete($data) else: retFuture.fail(newException(EOS, osErrorMsg(errcode))) ) - + let ret = WSARecv(socket, addr dataBuf, 1, addr bytesReceived, addr flagsio, cast[POverlapped](ol), nil) if ret == -1: @@ -314,8 +320,13 @@ when defined(windows): if err.int32 != ERROR_IO_PENDING: retFuture.fail(newException(EOS, osErrorMsg(err))) dealloc(ol) - elif ret == 0 and bytesReceived == 0: - # Disconnected + elif ret == 0 and bytesReceived == 0 and dataBuf.buf[0] == '\0': + # We have to ensure that the buffer is empty because WSARecv will tell + # us immediatelly when it was disconnected, even when there is still + # data in the buffer. + # We want to give the user as much data as we can. So we only return + # the empty string (which signals a disconnection) when there is + # nothing left to read. retFuture.complete("") # TODO: "For message-oriented sockets, where a zero byte message is often # allowable, a failure with an error code of WSAEDISCON is used to @@ -343,7 +354,7 @@ when defined(windows): var bytesReceived, flags: DWord var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped))) ol.data = TCompletionData(sock: socket, cb: - proc (sock: TSocketHandle, errcode: TOSErrorCode) = + proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) = if not retFuture.finished: if errcode == TOSErrorCode(-1): retFuture.complete(0) @@ -404,7 +415,7 @@ when defined(windows): var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped))) ol.data = TCompletionData(sock: socket, cb: - proc (sock: TSocketHandle, errcode: TOSErrorCode) = + proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) = if not retFuture.finished: if errcode == TOSErrorCode(-1): completeAccept() @@ -525,6 +536,15 @@ proc processBody(node, retFutureSym: PNimrodNode): PNimrodNode {.compileTime.} = for i in 0 .. <node.len: result[i] = processBody(node[i], retFutureSym) +proc getName(node: PNimrodNode): string {.compileTime.} = + case node.kind + of nnkPostfix: + return $node[1].ident + of nnkIdent: + return $node.ident + else: + assert false + macro async*(prc: stmt): stmt {.immediate.} = expectKind(prc, nnkProcDef) @@ -553,7 +573,7 @@ macro async*(prc: stmt): stmt {.immediate.} = # -> var result: T # -> <proc_body> # -> complete(retFuture, result) - var iteratorNameSym = newIdentNode($prc[0].ident & "Iter") #genSym(nskIterator, $prc[0].ident & "Iter") + var iteratorNameSym = newIdentNode($prc[0].getName & "Iter") #genSym(nskIterator, $prc[0].ident & "Iter") var procBody = prc[6].processBody(retFutureSym) procBody.insert(0, newNimNode(nnkVarSection).add( newIdentDefs(newIdentNode("result"), prc[3][0][1]))) # -> var result: T @@ -568,7 +588,7 @@ macro async*(prc: stmt): stmt {.immediate.} = # -> var nameIterVar = nameIter # -> var first = nameIterVar() - var varNameIterSym = newIdentNode($prc[0].ident & "IterVar") #genSym(nskVar, $prc[0].ident & "IterVar") + var varNameIterSym = newIdentNode($prc[0].getName & "IterVar") #genSym(nskVar, $prc[0].ident & "IterVar") var varNameIter = newVarStmt(varNameIterSym, iteratorNameSym) outerProcBody.add varNameIter var varFirstSym = genSym(nskVar, "first") @@ -600,18 +620,14 @@ macro async*(prc: stmt): stmt {.immediate.} = echo(toStrLit(result)) proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.} = - ## Reads a line of data from ``socket``. + ## Reads a line of data from ``socket``. Returned future will complete once + ## a full line is read or an error occurs. ## ## If a full line is read ``\r\L`` is not ## added to ``line``, however if solely ``\r\L`` is read then ``line`` ## will be set to it. ## ## If the socket is disconnected, ``line`` will be set to ``""``. - ## - ## An EOS exception will be raised in the case of a socket error. - ## - ## A timeout can be specified in miliseconds, if data is not received within - ## the specified time an ETimeout exception will be raised. template addNLIfEmpty(): stmt = if result.len == 0: @@ -629,7 +645,7 @@ proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.} discard await p.recv(socket, 1) addNLIfEmpty() return - elif c == "\L": + elif c == "\L": addNLIfEmpty() return add(result.string, c) @@ -645,14 +661,24 @@ when isMainModule: when true: # Await tests proc main(p: PDispatcher): PFuture[int] {.async.} = - discard await p.connect(sock, "localhost", TPort(6667)) + discard await p.connect(sock, "irc.freenode.net", TPort(6667)) while true: var line = await p.recvLine(sock) echo("Line is: ", line.repr) if line == "": echo "Disconnected" break - + + proc peekTest(p: PDispatcher): PFuture[int] {.async.} = + discard await p.connect(sock, "localhost", TPort(6667)) + while true: + var line = await p.recv(sock, 1, MSG_PEEK) + var line2 = await p.recv(sock, 1) + echo(line.repr) + echo(line2.repr) + echo("---") + if line2 == "": break + sleep(500) var f = main(p) |