diff options
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r-- | lib/pure/asyncdispatch.nim | 212 |
1 files changed, 134 insertions, 78 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index 41b20cb35..27f77cef2 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -1,7 +1,7 @@ # # # Nim's Runtime Library -# (c) Copyright 2014 Dominik Picheta +# (c) Copyright 2015 Dominik Picheta # # See the file "copying.txt", included in this # distribution, for details about the copyright. @@ -121,7 +121,7 @@ export Port, SocketFlag ## ## Limitations/Bugs ## ---------------- -## +## ## * ``except`` statement (without `try`) does not work inside async procedures. ## * The effect system (``raises: []``) does not work with async procedures. ## * Can't await in a ``except`` body @@ -379,7 +379,7 @@ when defined(windows) or defined(nimdoc): if p.handles.len == 0 and p.timers.len == 0: raise newException(ValueError, "No handles or timers registered in dispatcher.") - + let llTimeout = if timeout == -1: winlean.INFINITE else: timeout.int32 @@ -419,12 +419,12 @@ when defined(windows) or defined(nimdoc): var acceptExPtr: pointer = nil var getAcceptExSockAddrsPtr: pointer = nil - proc initPointer(s: SocketHandle, func: var pointer, guid: var TGUID): bool = + proc initPointer(s: SocketHandle, fun: var pointer, guid: var TGUID): bool = # Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c var bytesRet: Dword - func = nil + fun = nil result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid, - sizeof(TGUID).Dword, addr func, sizeof(pointer).Dword, + sizeof(TGUID).Dword, addr fun, sizeof(pointer).Dword, addr bytesRet, nil, nil) == 0 proc initAll() = @@ -436,16 +436,16 @@ when defined(windows) or defined(nimdoc): if not initPointer(dummySock, getAcceptExSockAddrsPtr, WSAID_GETACCEPTEXSOCKADDRS): raiseOSError(osLastError()) - proc connectEx(s: SocketHandle, name: ptr TSockAddr, namelen: cint, + proc connectEx(s: SocketHandle, name: ptr SockAddr, namelen: cint, lpSendBuffer: pointer, dwSendDataLength: Dword, lpdwBytesSent: PDword, lpOverlapped: POVERLAPPED): bool = if connectExPtr.isNil: raise newException(ValueError, "Need to initialise ConnectEx().") - let func = - cast[proc (s: SocketHandle, name: ptr TSockAddr, namelen: cint, + let fun = + cast[proc (s: SocketHandle, name: ptr SockAddr, namelen: cint, lpSendBuffer: pointer, dwSendDataLength: Dword, lpdwBytesSent: PDword, lpOverlapped: POVERLAPPED): bool {.stdcall,gcsafe.}](connectExPtr) - result = func(s, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent, + result = fun(s, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent, lpOverlapped) proc acceptEx(listenSock, acceptSock: SocketHandle, lpOutputBuffer: pointer, @@ -453,30 +453,30 @@ when defined(windows) or defined(nimdoc): dwRemoteAddressLength: Dword, lpdwBytesReceived: PDword, lpOverlapped: POVERLAPPED): bool = if acceptExPtr.isNil: raise newException(ValueError, "Need to initialise AcceptEx().") - let func = + let fun = cast[proc (listenSock, acceptSock: SocketHandle, lpOutputBuffer: pointer, dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength: Dword, lpdwBytesReceived: PDword, lpOverlapped: POVERLAPPED): bool {.stdcall,gcsafe.}](acceptExPtr) - result = func(listenSock, acceptSock, lpOutputBuffer, dwReceiveDataLength, + result = fun(listenSock, acceptSock, lpOutputBuffer, dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, lpdwBytesReceived, lpOverlapped) proc getAcceptExSockaddrs(lpOutputBuffer: pointer, dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength: Dword, - LocalSockaddr: ptr ptr TSockAddr, LocalSockaddrLength: LPInt, - RemoteSockaddr: ptr ptr TSockAddr, RemoteSockaddrLength: LPInt) = + LocalSockaddr: ptr ptr SockAddr, LocalSockaddrLength: LPInt, + RemoteSockaddr: ptr ptr SockAddr, RemoteSockaddrLength: LPInt) = if getAcceptExSockAddrsPtr.isNil: raise newException(ValueError, "Need to initialise getAcceptExSockAddrs().") - let func = + let fun = cast[proc (lpOutputBuffer: pointer, dwReceiveDataLength, dwLocalAddressLength, - dwRemoteAddressLength: Dword, LocalSockaddr: ptr ptr TSockAddr, - LocalSockaddrLength: LPInt, RemoteSockaddr: ptr ptr TSockAddr, + dwRemoteAddressLength: Dword, LocalSockaddr: ptr ptr SockAddr, + LocalSockaddrLength: LPInt, RemoteSockaddr: ptr ptr SockAddr, RemoteSockaddrLength: LPInt) {.stdcall,gcsafe.}](getAcceptExSockAddrsPtr) - - func(lpOutputBuffer, dwReceiveDataLength, dwLocalAddressLength, + + fun(lpOutputBuffer, dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, LocalSockaddr, LocalSockaddrLength, RemoteSockaddr, RemoteSockaddrLength) @@ -489,12 +489,12 @@ when defined(windows) or defined(nimdoc): verifyPresence(socket) var retFuture = newFuture[void]("connect") # Apparently ``ConnectEx`` expects the socket to be initially bound: - var saddr: Tsockaddr_in + var saddr: Sockaddr_in saddr.sin_family = int16(toInt(af)) saddr.sin_port = 0 saddr.sin_addr.s_addr = INADDR_ANY - if bindAddr(socket.SocketHandle, cast[ptr TSockAddr](addr(saddr)), - sizeof(saddr).TSockLen) < 0'i32: + if bindAddr(socket.SocketHandle, cast[ptr SockAddr](addr(saddr)), + sizeof(saddr).SockLen) < 0'i32: raiseOSError(osLastError()) var aiList = getAddrInfo(address, port, af) @@ -514,9 +514,9 @@ when defined(windows) or defined(nimdoc): else: retFuture.fail(newException(OSError, osErrorMsg(errcode))) ) - + var ret = connectEx(socket.SocketHandle, it.ai_addr, - sizeof(Tsockaddr_in).cint, nil, 0, nil, + sizeof(Sockaddr_in).cint, nil, 0, nil, cast[POVERLAPPED](ol)) if ret: # Request to connect completed immediately. @@ -565,7 +565,7 @@ when defined(windows) or defined(nimdoc): var dataBuf: TWSABuf dataBuf.buf = cast[cstring](alloc0(size)) dataBuf.len = size - + var bytesReceived: Dword var flagsio = flags.toOSFlags().Dword var ol = PCustomOverlapped() @@ -606,15 +606,15 @@ when defined(windows) or defined(nimdoc): retFuture.fail(newException(OSError, osErrorMsg(err))) elif ret == 0 and bytesReceived == 0 and dataBuf.buf[0] == '\0': # We have to ensure that the buffer is empty because WSARecv will tell - # us immediatelly when it was disconnected, even when there is still + # us immediately when it was disconnected, even when there is still # data in the buffer. # We want to give the user as much data as we can. So we only return # the empty string (which signals a disconnection) when there is # nothing left to read. retFuture.complete("") - # TODO: "For message-oriented sockets, where a zero byte message is often - # allowable, a failure with an error code of WSAEDISCON is used to - # indicate graceful closure." + # TODO: "For message-oriented sockets, where a zero byte message is often + # allowable, a failure with an error code of WSAEDISCON is used to + # indicate graceful closure." # ~ http://msdn.microsoft.com/en-us/library/ms741688%28v=vs.85%29.aspx else: # Request to read completed immediately. @@ -700,17 +700,17 @@ when defined(windows) or defined(nimdoc): var lpOutputBuf = newString(lpOutputLen) var dwBytesReceived: Dword let dwReceiveDataLength = 0.Dword # We don't want any data to be read. - let dwLocalAddressLength = Dword(sizeof (Tsockaddr_in) + 16) - let dwRemoteAddressLength = Dword(sizeof(Tsockaddr_in) + 16) + let dwLocalAddressLength = Dword(sizeof (Sockaddr_in) + 16) + let dwRemoteAddressLength = Dword(sizeof(Sockaddr_in) + 16) template completeAccept(): stmt {.immediate, dirty.} = var listenSock = socket let setoptRet = setsockopt(clientSock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, addr listenSock, - sizeof(listenSock).TSockLen) + sizeof(listenSock).SockLen) if setoptRet != 0: raiseOSError(osLastError()) - var localSockaddr, remoteSockaddr: ptr TSockAddr + var localSockaddr, remoteSockaddr: ptr SockAddr var localLen, remoteLen: int32 getAcceptExSockaddrs(addr lpOutputBuf[0], dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, @@ -719,7 +719,7 @@ when defined(windows) or defined(nimdoc): register(clientSock.TAsyncFD) # TODO: IPv6. Check ``sa_family``. http://stackoverflow.com/a/9212542/492186 retFuture.complete( - (address: $inet_ntoa(cast[ptr Tsockaddr_in](remoteSockAddr).sin_addr), + (address: $inet_ntoa(cast[ptr Sockaddr_in](remoteSockAddr).sin_addr), client: clientSock.TAsyncFD) ) @@ -748,7 +748,7 @@ when defined(windows) or defined(nimdoc): # http://msdn.microsoft.com/en-us/library/windows/desktop/ms737524%28v=vs.85%29.aspx let ret = acceptEx(socket.SocketHandle, clientSock, addr lpOutputBuf[0], - dwReceiveDataLength, + dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, addr dwBytesReceived, cast[POVERLAPPED](ol)) @@ -803,7 +803,7 @@ else: else: from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK, MSG_NOSIGNAL - + type TAsyncFD* = distinct cint TCallback = proc (fd: TAsyncFD): bool {.closure,gcsafe.} @@ -841,6 +841,8 @@ else: proc newAsyncRawSocket*(domain: cint, typ: cint, protocol: cint): TAsyncFD = result = newRawSocket(domain, typ, protocol).TAsyncFD result.SocketHandle.setBlocking(false) + when defined(macosx): + result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1) register(result) proc newAsyncRawSocket*(domain: Domain = AF_INET, @@ -848,8 +850,10 @@ else: protocol: Protocol = IPPROTO_TCP): TAsyncFD = result = newRawSocket(domain, typ, protocol).TAsyncFD result.SocketHandle.setBlocking(false) + when defined(macosx): + result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1) register(result) - + proc closeSocket*(sock: TAsyncFD) = let disp = getGlobalDispatcher() sock.SocketHandle.close() @@ -864,20 +868,24 @@ else: raise newException(ValueError, "File descriptor not registered.") p.selector[fd.SocketHandle].data.PData.readCBs.add(cb) update(fd, p.selector[fd.SocketHandle].events + {EvRead}) - + proc addWrite*(fd: TAsyncFD, cb: TCallback) = let p = getGlobalDispatcher() if fd.SocketHandle notin p.selector: raise newException(ValueError, "File descriptor not registered.") p.selector[fd.SocketHandle].data.PData.writeCBs.add(cb) update(fd, p.selector[fd.SocketHandle].events + {EvWrite}) - + proc poll*(timeout = 500) = let p = getGlobalDispatcher() for info in p.selector.select(timeout): let data = PData(info.key.data) assert data.fd == info.key.fd.TAsyncFD #echo("In poll ", data.fd.cint) + if EvError in info.events: + closeSocket(data.fd) + continue + if EvRead in info.events: # Callback may add items to ``data.readCBs`` which causes issues if # we are iterating over ``data.readCBs`` at the same time. We therefore @@ -888,7 +896,7 @@ else: if not cb(data.fd): # Callback wants to be called again. data.readCBs.add(cb) - + if EvWrite in info.events: let currentCBs = data.writeCBs data.writeCBs = @[] @@ -896,7 +904,7 @@ else: if not cb(data.fd): # Callback wants to be called again. data.writeCBs.add(cb) - + if info.key in p.selector: var newEvents: set[Event] if data.readCBs.len != 0: newEvents = {EvRead} @@ -909,16 +917,16 @@ else: discard processTimers(p) - + proc connect*(socket: TAsyncFD, address: string, port: Port, af = AF_INET): Future[void] = var retFuture = newFuture[void]("connect") - + proc cb(fd: TAsyncFD): bool = # We have connected. retFuture.complete() return true - + var aiList = getAddrInfo(address, port, af) var success = false var lastError: OSErrorCode @@ -948,14 +956,13 @@ else: proc recv*(socket: TAsyncFD, size: int, flags = {SocketFlag.SafeDisconn}): Future[string] = var retFuture = newFuture[string]("recv") - + var readBuffer = newString(size) proc cb(sock: TAsyncFD): bool = result = true let res = recv(sock.SocketHandle, addr readBuffer[0], size.cint, flags.toOSFlags()) - #echo("recv cb res: ", res) if res < 0: let lastError = osLastError() if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: @@ -979,9 +986,9 @@ else: proc send*(socket: TAsyncFD, data: string, flags = {SocketFlag.SafeDisconn}): Future[void] = var retFuture = newFuture[void]("send") - + var written = 0 - + proc cb(sock: TAsyncFD): bool = result = true let netSize = data.len-written @@ -1060,6 +1067,17 @@ proc accept*(socket: TAsyncFD, # -- Await Macro +proc skipUntilStmtList(node: NimNode): NimNode {.compileTime.} = + # Skips a nest of StmtList's. + result = node + if node[0].kind == nnkStmtList: + result = skipUntilStmtList(node[0]) + +proc skipStmtList(node: NimNode): NimNode {.compileTime.} = + result = node + if node[0].kind == nnkStmtList: + result = node[0] + template createCb(retFutureSym, iteratorNameSym, name: expr): stmt {.immediate.} = var nameIterVar = iteratorNameSym @@ -1083,11 +1101,11 @@ template createCb(retFutureSym, iteratorNameSym, cb() #{.pop.} proc generateExceptionCheck(futSym, - tryStmt, rootReceiver, fromNode: PNimrodNode): PNimrodNode {.compileTime.} = + tryStmt, rootReceiver, fromNode: NimNode): NimNode {.compileTime.} = if tryStmt.kind == nnkNilLit: result = rootReceiver else: - var exceptionChecks: seq[tuple[cond, body: PNimrodNode]] = @[] + var exceptionChecks: seq[tuple[cond, body: NimNode]] = @[] let errorNode = newDotExpr(futSym, newIdentNode("error")) for i in 1 .. <tryStmt.len: let exceptBranch = tryStmt[i] @@ -1095,7 +1113,7 @@ proc generateExceptionCheck(futSym, exceptionChecks.add((newIdentNode("true"), exceptBranch[0])) else: var exceptIdentCount = 0 - var ifCond: PNimrodNode + var ifCond: NimNode for i in 0 .. <exceptBranch.len: let child = exceptBranch[i] if child.kind == nnkIdent: @@ -1129,10 +1147,10 @@ proc generateExceptionCheck(futSym, ) result.add elseNode -template createVar(result: var PNimrodNode, futSymName: string, - asyncProc: PNimrodNode, +template createVar(result: var NimNode, futSymName: string, + asyncProc: NimNode, valueReceiver, rootReceiver: expr, - fromNode: PNimrodNode) = + fromNode: NimNode) = result = newNimNode(nnkStmtList, fromNode) var futSym = genSym(nskVar, "future") result.add newVarStmt(futSym, asyncProc) # -> var future<x> = y @@ -1140,9 +1158,9 @@ template createVar(result: var PNimrodNode, futSymName: string, valueReceiver = newDotExpr(futSym, newIdentNode("read")) # -> future<x>.read result.add generateExceptionCheck(futSym, tryStmt, rootReceiver, fromNode) -proc processBody(node, retFutureSym: PNimrodNode, +proc processBody(node, retFutureSym: NimNode, subTypeIsVoid: bool, - tryStmt: PNimrodNode): PNimrodNode {.compileTime.} = + tryStmt: NimNode): NimNode {.compileTime.} = #echo(node.treeRepr) result = node case node.kind @@ -1168,7 +1186,7 @@ proc processBody(node, retFutureSym: PNimrodNode, result = newNimNode(nnkYieldStmt, node).add(node[1]) # -> yield x of nnkCall, nnkCommand: # await foo(p, x) - var futureValue: PNimrodNode + var futureValue: NimNode result.createVar("future" & $node[1][0].toStrLit, node[1], futureValue, futureValue, node) else: @@ -1207,33 +1225,60 @@ proc processBody(node, retFutureSym: PNimrodNode, of nnkTryStmt: # try: await x; except: ... result = newNimNode(nnkStmtList, node) - proc processForTry(n: PNimrodNode, i: var int, - res: PNimrodNode): bool {.compileTime.} = + template wrapInTry(n, tryBody: expr) = + var temp = n + n[0] = tryBody + tryBody = temp + + # Transform ``except`` body. + # TODO: Could we perform some ``await`` transformation here to get it + # working in ``except``? + tryBody[1] = processBody(n[1], retFutureSym, subTypeIsVoid, nil) + + proc processForTry(n: NimNode, i: var int, + res: NimNode): bool {.compileTime.} = + ## Transforms the body of the tryStmt. Does not transform the + ## body in ``except``. + ## Returns true if the tryStmt node was transformed into an ifStmt. result = false - while i < n[0].len: - var processed = processBody(n[0][i], retFutureSym, subTypeIsVoid, n) - if processed.kind != n[0][i].kind or processed.len != n[0][i].len: + var skipped = n.skipStmtList() + while i < skipped.len: + var processed = processBody(skipped[i], retFutureSym, + subTypeIsVoid, n) + + # Check if we transformed the node into an exception check. + # This suggests skipped[i] contains ``await``. + if processed.kind != skipped[i].kind or processed.len != skipped[i].len: + processed = processed.skipUntilStmtList() expectKind(processed, nnkStmtList) expectKind(processed[2][1], nnkElse) i.inc - discard processForTry(n, i, processed[2][1][0]) + + if not processForTry(n, i, processed[2][1][0]): + # We need to wrap the nnkElse nodes back into a tryStmt. + # As they are executed if an exception does not happen + # inside the awaited future. + # The following code will wrap the nodes inside the + # original tryStmt. + wrapInTry(n, processed[2][1][0]) + res.add processed result = true else: - res.add n[0][i] + res.add skipped[i] i.inc var i = 0 if not processForTry(node, i, result): - var temp = node - temp[0] = result - result = temp + # If the tryStmt hasn't been transformed we can just put the body + # back into it. + wrapInTry(node, result) return else: discard for i in 0 .. <result.len: - result[i] = processBody(result[i], retFutureSym, subTypeIsVoid, tryStmt) + result[i] = processBody(result[i], retFutureSym, subTypeIsVoid, nil) -proc getName(node: PNimrodNode): string {.compileTime.} = +proc getName(node: NimNode): string {.compileTime.} = case node.kind of nnkPostfix: return $node[1].ident @@ -1273,29 +1318,40 @@ macro async*(prc: stmt): stmt {.immediate.} = if returnType.kind == nnkEmpty: newIdentNode("void") else: returnType[1] outerProcBody.add( - newVarStmt(retFutureSym, + newVarStmt(retFutureSym, newCall( newNimNode(nnkBracketExpr, prc[6]).add( newIdentNode(!"newFuture"), # TODO: Strange bug here? Remove the `!`. subRetType), newLit(prc[0].getName)))) # Get type from return type of this proc - - # -> iterator nameIter(): FutureBase {.closure.} = + + # -> iterator nameIter(): FutureBase {.closure.} = + # -> {.push warning[resultshadowed]: off.} # -> var result: T + # -> {.pop.} # -> <proc_body> # -> complete(retFuture, result) var iteratorNameSym = genSym(nskIterator, $prc[0].getName & "Iter") var procBody = prc[6].processBody(retFutureSym, subtypeIsVoid, nil) if not subtypeIsVoid: - procBody.insert(0, newNimNode(nnkVarSection, prc[6]).add( + procBody.insert(0, newNimNode(nnkPragma).add(newIdentNode("push"), + newNimNode(nnkExprColonExpr).add(newNimNode(nnkBracketExpr).add( + newIdentNode("warning"), newIdentNode("resultshadowed")), + newIdentNode("off")))) # -> {.push warning[resultshadowed]: off.} + + procBody.insert(1, newNimNode(nnkVarSection, prc[6]).add( newIdentDefs(newIdentNode("result"), returnType[1]))) # -> var result: T + + procBody.insert(2, newNimNode(nnkPragma).add( + newIdentNode("pop"))) # -> {.pop.}) + procBody.add( newCall(newIdentNode("complete"), retFutureSym, newIdentNode("result"))) # -> complete(retFuture, result) else: # -> complete(retFuture) procBody.add(newCall(newIdentNode("complete"), retFutureSym)) - + var closureIterator = newProc(iteratorNameSym, [newIdentNode("FutureBase")], procBody, nnkIteratorDef) closureIterator[4] = newNimNode(nnkPragma, prc[6]).add(newIdentNode("closure")) @@ -1309,7 +1365,7 @@ macro async*(prc: stmt): stmt {.immediate.} = # -> return retFuture outerProcBody.add newNimNode(nnkReturnStmt, prc[6][prc[6].len-1]).add(retFutureSym) - + result = prc # Remove the 'async' pragma. @@ -1325,7 +1381,7 @@ macro async*(prc: stmt): stmt {.immediate.} = result[6] = outerProcBody #echo(treeRepr(result)) - #if prc[0].getName == "catch": + #if prc[0].getName == "test": # echo(toStrLit(result)) proc recvLine*(socket: TAsyncFD): Future[string] {.async.} = @@ -1335,7 +1391,7 @@ proc recvLine*(socket: TAsyncFD): Future[string] {.async.} = ## If a full line is read ``\r\L`` is not ## added to ``line``, however if solely ``\r\L`` is read then ``line`` ## will be set to it. - ## + ## ## If the socket is disconnected, ``line`` will be set to ``""``. ## ## If the socket is disconnected in the middle of a line (before ``\r\L`` @@ -1346,7 +1402,7 @@ proc recvLine*(socket: TAsyncFD): Future[string] {.async.} = ## ## **Note**: This procedure is mostly used for testing. You likely want to ## use ``asyncnet.recvLine`` instead. - + template addNLIfEmpty(): stmt = if result.len == 0: result.add("\c\L") |