diff options
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r-- | lib/pure/asyncdispatch.nim | 279 |
1 files changed, 223 insertions, 56 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index 4bdce9cfb..a4d7a1632 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -1,7 +1,7 @@ # # # Nim's Runtime Library -# (c) Copyright 2014 Dominik Picheta +# (c) Copyright 2015 Dominik Picheta # # See the file "copying.txt", included in this # distribution, for details about the copyright. @@ -121,7 +121,7 @@ export Port, SocketFlag ## ## Limitations/Bugs ## ---------------- -## +## ## * ``except`` statement (without `try`) does not work inside async procedures. ## * The effect system (``raises: []``) does not work with async procedures. ## * Can't await in a ``except`` body @@ -379,7 +379,7 @@ when defined(windows) or defined(nimdoc): if p.handles.len == 0 and p.timers.len == 0: raise newException(ValueError, "No handles or timers registered in dispatcher.") - + let llTimeout = if timeout == -1: winlean.INFINITE else: timeout.int32 @@ -436,12 +436,12 @@ when defined(windows) or defined(nimdoc): if not initPointer(dummySock, getAcceptExSockAddrsPtr, WSAID_GETACCEPTEXSOCKADDRS): raiseOSError(osLastError()) - proc connectEx(s: SocketHandle, name: ptr SockAddr, namelen: cint, + proc connectEx(s: SocketHandle, name: ptr SockAddr, namelen: cint, lpSendBuffer: pointer, dwSendDataLength: Dword, lpdwBytesSent: PDword, lpOverlapped: POVERLAPPED): bool = if connectExPtr.isNil: raise newException(ValueError, "Need to initialise ConnectEx().") let fun = - cast[proc (s: SocketHandle, name: ptr SockAddr, namelen: cint, + cast[proc (s: SocketHandle, name: ptr SockAddr, namelen: cint, lpSendBuffer: pointer, dwSendDataLength: Dword, lpdwBytesSent: PDword, lpOverlapped: POVERLAPPED): bool {.stdcall,gcsafe.}](connectExPtr) @@ -475,7 +475,7 @@ when defined(windows) or defined(nimdoc): dwRemoteAddressLength: Dword, LocalSockaddr: ptr ptr SockAddr, LocalSockaddrLength: LPInt, RemoteSockaddr: ptr ptr SockAddr, RemoteSockaddrLength: LPInt) {.stdcall,gcsafe.}](getAcceptExSockAddrsPtr) - + fun(lpOutputBuffer, dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, LocalSockaddr, LocalSockaddrLength, RemoteSockaddr, RemoteSockaddrLength) @@ -514,7 +514,7 @@ when defined(windows) or defined(nimdoc): else: retFuture.fail(newException(OSError, osErrorMsg(errcode))) ) - + var ret = connectEx(socket.SocketHandle, it.ai_addr, sizeof(Sockaddr_in).cint, nil, 0, nil, cast[POVERLAPPED](ol)) @@ -565,7 +565,7 @@ when defined(windows) or defined(nimdoc): var dataBuf: TWSABuf dataBuf.buf = cast[cstring](alloc0(size)) dataBuf.len = size - + var bytesReceived: Dword var flagsio = flags.toOSFlags().Dword var ol = PCustomOverlapped() @@ -606,15 +606,15 @@ when defined(windows) or defined(nimdoc): retFuture.fail(newException(OSError, osErrorMsg(err))) elif ret == 0 and bytesReceived == 0 and dataBuf.buf[0] == '\0': # We have to ensure that the buffer is empty because WSARecv will tell - # us immediatelly when it was disconnected, even when there is still + # us immediately when it was disconnected, even when there is still # data in the buffer. # We want to give the user as much data as we can. So we only return # the empty string (which signals a disconnection) when there is # nothing left to read. retFuture.complete("") - # TODO: "For message-oriented sockets, where a zero byte message is often - # allowable, a failure with an error code of WSAEDISCON is used to - # indicate graceful closure." + # TODO: "For message-oriented sockets, where a zero byte message is often + # allowable, a failure with an error code of WSAEDISCON is used to + # indicate graceful closure." # ~ http://msdn.microsoft.com/en-us/library/ms741688%28v=vs.85%29.aspx else: # Request to read completed immediately. @@ -634,6 +634,93 @@ when defined(windows) or defined(nimdoc): # free ``ol``. return retFuture + proc recvInto*(socket: TAsyncFD, buf: cstring, size: int, + flags = {SocketFlag.SafeDisconn}): Future[int] = + ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``, which must + ## at least be of that size. Returned future will complete once all the + ## data requested is read, a part of the data has been read, or the socket + ## has disconnected in which case the future will complete with a value of + ## ``0``. + ## + ## **Warning**: The ``Peek`` socket flag is not supported on Windows. + + + # Things to note: + # * When WSARecv completes immediately then ``bytesReceived`` is very + # unreliable. + # * Still need to implement message-oriented socket disconnection, + # '\0' in the message currently signifies a socket disconnect. Who + # knows what will happen when someone sends that to our socket. + verifyPresence(socket) + assert SocketFlag.Peek notin flags, "Peek not supported on Windows." + + var retFuture = newFuture[int]("recvInto") + + #buf[] = '\0' + var dataBuf: TWSABuf + dataBuf.buf = buf + dataBuf.len = size + + var bytesReceived: Dword + var flagsio = flags.toOSFlags().Dword + var ol = PCustomOverlapped() + GC_ref(ol) + ol.data = TCompletionData(fd: socket, cb: + proc (fd: TAsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + if bytesCount == 0 and dataBuf.buf[0] == '\0': + retFuture.complete(0) + else: + retFuture.complete(bytesCount) + else: + if flags.isDisconnectionError(errcode): + retFuture.complete(0) + else: + retFuture.fail(newException(OSError, osErrorMsg(errcode))) + if dataBuf.buf != nil: + dataBuf.buf = nil + ) + + let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived, + addr flagsio, cast[POVERLAPPED](ol), nil) + if ret == -1: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + if dataBuf.buf != nil: + dataBuf.buf = nil + GC_unref(ol) + if flags.isDisconnectionError(err): + retFuture.complete(0) + else: + retFuture.fail(newException(OSError, osErrorMsg(err))) + elif ret == 0 and bytesReceived == 0 and dataBuf.buf[0] == '\0': + # We have to ensure that the buffer is empty because WSARecv will tell + # us immediately when it was disconnected, even when there is still + # data in the buffer. + # We want to give the user as much data as we can. So we only return + # the empty string (which signals a disconnection) when there is + # nothing left to read. + retFuture.complete(0) + # TODO: "For message-oriented sockets, where a zero byte message is often + # allowable, a failure with an error code of WSAEDISCON is used to + # indicate graceful closure." + # ~ http://msdn.microsoft.com/en-us/library/ms741688%28v=vs.85%29.aspx + else: + # Request to read completed immediately. + # From my tests bytesReceived isn't reliable. + let realSize = + if bytesReceived == 0: + size + else: + bytesReceived + assert realSize <= size + retFuture.complete(realSize) + # We don't deallocate ``ol`` here because even though this completed + # immediately poll will still be notified about its completion and it will + # free ``ol``. + return retFuture + proc send*(socket: TAsyncFD, data: string, flags = {SocketFlag.SafeDisconn}): Future[void] = ## Sends ``data`` to ``socket``. The returned future will complete once all @@ -748,7 +835,7 @@ when defined(windows) or defined(nimdoc): # http://msdn.microsoft.com/en-us/library/windows/desktop/ms737524%28v=vs.85%29.aspx let ret = acceptEx(socket.SocketHandle, clientSock, addr lpOutputBuf[0], - dwReceiveDataLength, + dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, addr dwBytesReceived, cast[POVERLAPPED](ol)) @@ -803,7 +890,7 @@ else: else: from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK, MSG_NOSIGNAL - + type TAsyncFD* = distinct cint TCallback = proc (fd: TAsyncFD): bool {.closure,gcsafe.} @@ -841,6 +928,8 @@ else: proc newAsyncRawSocket*(domain: cint, typ: cint, protocol: cint): TAsyncFD = result = newRawSocket(domain, typ, protocol).TAsyncFD result.SocketHandle.setBlocking(false) + when defined(macosx): + result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1) register(result) proc newAsyncRawSocket*(domain: Domain = AF_INET, @@ -848,8 +937,10 @@ else: protocol: Protocol = IPPROTO_TCP): TAsyncFD = result = newRawSocket(domain, typ, protocol).TAsyncFD result.SocketHandle.setBlocking(false) + when defined(macosx): + result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1) register(result) - + proc closeSocket*(sock: TAsyncFD) = let disp = getGlobalDispatcher() sock.SocketHandle.close() @@ -864,20 +955,24 @@ else: raise newException(ValueError, "File descriptor not registered.") p.selector[fd.SocketHandle].data.PData.readCBs.add(cb) update(fd, p.selector[fd.SocketHandle].events + {EvRead}) - + proc addWrite*(fd: TAsyncFD, cb: TCallback) = let p = getGlobalDispatcher() if fd.SocketHandle notin p.selector: raise newException(ValueError, "File descriptor not registered.") p.selector[fd.SocketHandle].data.PData.writeCBs.add(cb) update(fd, p.selector[fd.SocketHandle].events + {EvWrite}) - + proc poll*(timeout = 500) = let p = getGlobalDispatcher() for info in p.selector.select(timeout): let data = PData(info.key.data) assert data.fd == info.key.fd.TAsyncFD #echo("In poll ", data.fd.cint) + if EvError in info.events: + closeSocket(data.fd) + continue + if EvRead in info.events: # Callback may add items to ``data.readCBs`` which causes issues if # we are iterating over ``data.readCBs`` at the same time. We therefore @@ -888,7 +983,7 @@ else: if not cb(data.fd): # Callback wants to be called again. data.readCBs.add(cb) - + if EvWrite in info.events: let currentCBs = data.writeCBs data.writeCBs = @[] @@ -896,7 +991,7 @@ else: if not cb(data.fd): # Callback wants to be called again. data.writeCBs.add(cb) - + if info.key in p.selector: var newEvents: set[Event] if data.readCBs.len != 0: newEvents = {EvRead} @@ -909,16 +1004,16 @@ else: discard processTimers(p) - + proc connect*(socket: TAsyncFD, address: string, port: Port, af = AF_INET): Future[void] = var retFuture = newFuture[void]("connect") - + proc cb(fd: TAsyncFD): bool = # We have connected. retFuture.complete() return true - + var aiList = getAddrInfo(address, port, af) var success = false var lastError: OSErrorCode @@ -948,14 +1043,13 @@ else: proc recv*(socket: TAsyncFD, size: int, flags = {SocketFlag.SafeDisconn}): Future[string] = var retFuture = newFuture[string]("recv") - + var readBuffer = newString(size) proc cb(sock: TAsyncFD): bool = result = true let res = recv(sock.SocketHandle, addr readBuffer[0], size.cint, flags.toOSFlags()) - #echo("recv cb res: ", res) if res < 0: let lastError = osLastError() if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: @@ -976,12 +1070,36 @@ else: addRead(socket, cb) return retFuture + proc recvInto*(socket: TAsyncFD, buf: cstring, size: int, + flags = {SocketFlag.SafeDisconn}): Future[int] = + var retFuture = newFuture[int]("recvInto") + + proc cb(sock: TAsyncFD): bool = + result = true + let res = recv(sock.SocketHandle, buf, size.cint, + flags.toOSFlags()) + if res < 0: + let lastError = osLastError() + if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: + if flags.isDisconnectionError(lastError): + retFuture.complete(0) + else: + retFuture.fail(newException(OSError, osErrorMsg(lastError))) + else: + result = false # We still want this callback to be called. + else: + retFuture.complete(res) + # TODO: The following causes a massive slowdown. + #if not cb(socket): + addRead(socket, cb) + return retFuture + proc send*(socket: TAsyncFD, data: string, flags = {SocketFlag.SafeDisconn}): Future[void] = var retFuture = newFuture[void]("send") - + var written = 0 - + proc cb(sock: TAsyncFD): bool = result = true let netSize = data.len-written @@ -1060,6 +1178,17 @@ proc accept*(socket: TAsyncFD, # -- Await Macro +proc skipUntilStmtList(node: NimNode): NimNode {.compileTime.} = + # Skips a nest of StmtList's. + result = node + if node[0].kind == nnkStmtList: + result = skipUntilStmtList(node[0]) + +proc skipStmtList(node: NimNode): NimNode {.compileTime.} = + result = node + if node[0].kind == nnkStmtList: + result = node[0] + template createCb(retFutureSym, iteratorNameSym, name: expr): stmt {.immediate.} = var nameIterVar = iteratorNameSym @@ -1083,11 +1212,11 @@ template createCb(retFutureSym, iteratorNameSym, cb() #{.pop.} proc generateExceptionCheck(futSym, - tryStmt, rootReceiver, fromNode: PNimrodNode): PNimrodNode {.compileTime.} = + tryStmt, rootReceiver, fromNode: NimNode): NimNode {.compileTime.} = if tryStmt.kind == nnkNilLit: result = rootReceiver else: - var exceptionChecks: seq[tuple[cond, body: PNimrodNode]] = @[] + var exceptionChecks: seq[tuple[cond, body: NimNode]] = @[] let errorNode = newDotExpr(futSym, newIdentNode("error")) for i in 1 .. <tryStmt.len: let exceptBranch = tryStmt[i] @@ -1095,7 +1224,7 @@ proc generateExceptionCheck(futSym, exceptionChecks.add((newIdentNode("true"), exceptBranch[0])) else: var exceptIdentCount = 0 - var ifCond: PNimrodNode + var ifCond: NimNode for i in 0 .. <exceptBranch.len: let child = exceptBranch[i] if child.kind == nnkIdent: @@ -1129,10 +1258,10 @@ proc generateExceptionCheck(futSym, ) result.add elseNode -template createVar(result: var PNimrodNode, futSymName: string, - asyncProc: PNimrodNode, +template createVar(result: var NimNode, futSymName: string, + asyncProc: NimNode, valueReceiver, rootReceiver: expr, - fromNode: PNimrodNode) = + fromNode: NimNode) = result = newNimNode(nnkStmtList, fromNode) var futSym = genSym(nskVar, "future") result.add newVarStmt(futSym, asyncProc) # -> var future<x> = y @@ -1140,9 +1269,9 @@ template createVar(result: var PNimrodNode, futSymName: string, valueReceiver = newDotExpr(futSym, newIdentNode("read")) # -> future<x>.read result.add generateExceptionCheck(futSym, tryStmt, rootReceiver, fromNode) -proc processBody(node, retFutureSym: PNimrodNode, +proc processBody(node, retFutureSym: NimNode, subTypeIsVoid: bool, - tryStmt: PNimrodNode): PNimrodNode {.compileTime.} = + tryStmt: NimNode): NimNode {.compileTime.} = #echo(node.treeRepr) result = node case node.kind @@ -1168,7 +1297,7 @@ proc processBody(node, retFutureSym: PNimrodNode, result = newNimNode(nnkYieldStmt, node).add(node[1]) # -> yield x of nnkCall, nnkCommand: # await foo(p, x) - var futureValue: PNimrodNode + var futureValue: NimNode result.createVar("future" & $node[1][0].toStrLit, node[1], futureValue, futureValue, node) else: @@ -1207,33 +1336,60 @@ proc processBody(node, retFutureSym: PNimrodNode, of nnkTryStmt: # try: await x; except: ... result = newNimNode(nnkStmtList, node) - proc processForTry(n: PNimrodNode, i: var int, - res: PNimrodNode): bool {.compileTime.} = + template wrapInTry(n, tryBody: expr) = + var temp = n + n[0] = tryBody + tryBody = temp + + # Transform ``except`` body. + # TODO: Could we perform some ``await`` transformation here to get it + # working in ``except``? + tryBody[1] = processBody(n[1], retFutureSym, subTypeIsVoid, nil) + + proc processForTry(n: NimNode, i: var int, + res: NimNode): bool {.compileTime.} = + ## Transforms the body of the tryStmt. Does not transform the + ## body in ``except``. + ## Returns true if the tryStmt node was transformed into an ifStmt. result = false - while i < n[0].len: - var processed = processBody(n[0][i], retFutureSym, subTypeIsVoid, n) - if processed.kind != n[0][i].kind or processed.len != n[0][i].len: + var skipped = n.skipStmtList() + while i < skipped.len: + var processed = processBody(skipped[i], retFutureSym, + subTypeIsVoid, n) + + # Check if we transformed the node into an exception check. + # This suggests skipped[i] contains ``await``. + if processed.kind != skipped[i].kind or processed.len != skipped[i].len: + processed = processed.skipUntilStmtList() expectKind(processed, nnkStmtList) expectKind(processed[2][1], nnkElse) i.inc - discard processForTry(n, i, processed[2][1][0]) + + if not processForTry(n, i, processed[2][1][0]): + # We need to wrap the nnkElse nodes back into a tryStmt. + # As they are executed if an exception does not happen + # inside the awaited future. + # The following code will wrap the nodes inside the + # original tryStmt. + wrapInTry(n, processed[2][1][0]) + res.add processed result = true else: - res.add n[0][i] + res.add skipped[i] i.inc var i = 0 if not processForTry(node, i, result): - var temp = node - temp[0] = result - result = temp + # If the tryStmt hasn't been transformed we can just put the body + # back into it. + wrapInTry(node, result) return else: discard for i in 0 .. <result.len: - result[i] = processBody(result[i], retFutureSym, subTypeIsVoid, tryStmt) + result[i] = processBody(result[i], retFutureSym, subTypeIsVoid, nil) -proc getName(node: PNimrodNode): string {.compileTime.} = +proc getName(node: NimNode): string {.compileTime.} = case node.kind of nnkPostfix: return $node[1].ident @@ -1273,29 +1429,40 @@ macro async*(prc: stmt): stmt {.immediate.} = if returnType.kind == nnkEmpty: newIdentNode("void") else: returnType[1] outerProcBody.add( - newVarStmt(retFutureSym, + newVarStmt(retFutureSym, newCall( newNimNode(nnkBracketExpr, prc[6]).add( newIdentNode(!"newFuture"), # TODO: Strange bug here? Remove the `!`. subRetType), newLit(prc[0].getName)))) # Get type from return type of this proc - - # -> iterator nameIter(): FutureBase {.closure.} = + + # -> iterator nameIter(): FutureBase {.closure.} = + # -> {.push warning[resultshadowed]: off.} # -> var result: T + # -> {.pop.} # -> <proc_body> # -> complete(retFuture, result) var iteratorNameSym = genSym(nskIterator, $prc[0].getName & "Iter") var procBody = prc[6].processBody(retFutureSym, subtypeIsVoid, nil) if not subtypeIsVoid: - procBody.insert(0, newNimNode(nnkVarSection, prc[6]).add( + procBody.insert(0, newNimNode(nnkPragma).add(newIdentNode("push"), + newNimNode(nnkExprColonExpr).add(newNimNode(nnkBracketExpr).add( + newIdentNode("warning"), newIdentNode("resultshadowed")), + newIdentNode("off")))) # -> {.push warning[resultshadowed]: off.} + + procBody.insert(1, newNimNode(nnkVarSection, prc[6]).add( newIdentDefs(newIdentNode("result"), returnType[1]))) # -> var result: T + + procBody.insert(2, newNimNode(nnkPragma).add( + newIdentNode("pop"))) # -> {.pop.}) + procBody.add( newCall(newIdentNode("complete"), retFutureSym, newIdentNode("result"))) # -> complete(retFuture, result) else: # -> complete(retFuture) procBody.add(newCall(newIdentNode("complete"), retFutureSym)) - + var closureIterator = newProc(iteratorNameSym, [newIdentNode("FutureBase")], procBody, nnkIteratorDef) closureIterator[4] = newNimNode(nnkPragma, prc[6]).add(newIdentNode("closure")) @@ -1309,7 +1476,7 @@ macro async*(prc: stmt): stmt {.immediate.} = # -> return retFuture outerProcBody.add newNimNode(nnkReturnStmt, prc[6][prc[6].len-1]).add(retFutureSym) - + result = prc # Remove the 'async' pragma. @@ -1325,7 +1492,7 @@ macro async*(prc: stmt): stmt {.immediate.} = result[6] = outerProcBody #echo(treeRepr(result)) - #if prc[0].getName == "catch": + #if prc[0].getName == "test": # echo(toStrLit(result)) proc recvLine*(socket: TAsyncFD): Future[string] {.async.} = @@ -1335,7 +1502,7 @@ proc recvLine*(socket: TAsyncFD): Future[string] {.async.} = ## If a full line is read ``\r\L`` is not ## added to ``line``, however if solely ``\r\L`` is read then ``line`` ## will be set to it. - ## + ## ## If the socket is disconnected, ``line`` will be set to ``""``. ## ## If the socket is disconnected in the middle of a line (before ``\r\L`` @@ -1346,7 +1513,7 @@ proc recvLine*(socket: TAsyncFD): Future[string] {.async.} = ## ## **Note**: This procedure is mostly used for testing. You likely want to ## use ``asyncnet.recvLine`` instead. - + template addNLIfEmpty(): stmt = if result.len == 0: result.add("\c\L") |