# # # Nimrod's Runtime Library # (c) Copyright 2014 Dominik Picheta # # See the file "copying.txt", included in this # distribution, for details about the copyright. # import os, oids, tables, strutils, macros import sockets2, net ## Asyncio2 ## -------- ## ## This module implements a brand new asyncio module based on Futures. ## IOCP is used under the hood on Windows and the selectors module is used for ## other operating systems. # -- Futures type PFutureBase* = ref object of PObject cb: proc () {.closure.} finished: bool PFuture*[T] = ref object of PFutureBase value: T error: ref EBase proc newFuture*[T](): PFuture[T] = ## Creates a new future. new(result) result.finished = false proc complete*[T](future: PFuture[T], val: T) = ## Completes ``future`` with value ``val``. assert(not future.finished, "Future already finished, cannot finish twice.") assert(future.error == nil) future.value = val future.finished = true if future.cb != nil: future.cb() proc fail*[T](future: PFuture[T], error: ref EBase) = ## Completes ``future`` with ``error``. assert(not future.finished, "Future already finished, cannot finish twice.") future.finished = true future.error = error if future.cb != nil: future.cb() proc `callback=`*(future: PFutureBase, cb: proc () {.closure.}) = ## Sets the callback proc to be called when the future completes. ## ## If future has already completed then ``cb`` will be called immediately. ## ## **Note**: You most likely want the other ``callback`` setter which ## passes ``future`` as a param to the callback. future.cb = cb if future.finished: future.cb() proc `callback=`*[T](future: PFuture[T], cb: proc (future: PFuture[T]) {.closure.}) = ## Sets the callback proc to be called when the future completes. ## ## If future has already completed then ``cb`` will be called immediately. future.callback = proc () = cb(future) proc read*[T](future: PFuture[T]): T = ## Retrieves the value of ``future``. Future must be finished otherwise ## this function will fail with a ``EInvalidValue`` exception. ## ## If the result of the future is an error then that error will be raised. if future.finished: if future.error != nil: raise future.error return future.value else: # TODO: Make a custom exception type for this? raise newException(EInvalidValue, "Future still in progress.") proc finished*[T](future: PFuture[T]): bool = ## Determines whether ``future`` has completed. ## ## ``True`` may indicate an error or a value. Use ``hasError`` to distinguish. future.finished proc failed*[T](future: PFuture[T]): bool = ## Determines whether ``future`` completed with an error. future.error != nil # TODO: Get rid of register. Do it implicitly. when defined(windows) or defined(nimdoc): import winlean type TCompletionKey = dword TCompletionData* = object sock: TSocketHandle cb: proc (sock: TSocketHandle, bytesTransferred: DWORD, errcode: TOSErrorCode) {.closure.} PDispatcher* = ref object ioPort: THandle hasHandles: bool TCustomOverlapped = object Internal*: DWORD InternalHigh*: DWORD Offset*: DWORD OffsetHigh*: DWORD hEvent*: THANDLE data*: TCompletionData PCustomOverlapped = ptr TCustomOverlapped proc newDispatcher*(): PDispatcher = ## Creates a new Dispatcher instance. new result result.ioPort = CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) proc register*(p: PDispatcher, sock: TSocketHandle) = ## Registers ``sock`` with the dispatcher ``p``. if CreateIOCompletionPort(sock.THandle, p.ioPort, cast[TCompletionKey](sock), 1) == 0: OSError(OSLastError()) p.hasHandles = true proc poll*(p: PDispatcher, timeout = 500) = ## Waits for completion events and processes them. if not p.hasHandles: raise newException(EInvalidValue, "No handles registered in dispatcher.") let llTimeout = if timeout == -1: winlean.INFINITE else: timeout.int32 var lpNumberOfBytesTransferred: DWORD var lpCompletionKey: ULONG var lpOverlapped: POverlapped let res = GetQueuedCompletionStatus(p.ioPort, addr lpNumberOfBytesTransferred, addr lpCompletionKey, addr lpOverlapped, llTimeout).bool # http://stackoverflow.com/a/12277264/492186 # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html var customOverlapped = cast[PCustomOverlapped](lpOverlapped) if res: # This is useful for ensuring the reliability of the overlapped struct. assert customOverlapped.data.sock == lpCompletionKey.TSocketHandle customOverlapped.data.cb(customOverlapped.data.sock, lpNumberOfBytesTransferred, TOSErrorCode(-1)) dealloc(customOverlapped) else: let errCode = OSLastError() if lpOverlapped != nil: assert customOverlapped.data.sock == lpCompletionKey.TSocketHandle customOverlapped.data.cb(customOverlapped.data.sock, lpNumberOfBytesTransferred, errCode) dealloc(customOverlapped) else: if errCode.int32 == WAIT_TIMEOUT: # Timed out discard else: OSError(errCode) var connectExPtr: pointer = nil var acceptExPtr: pointer = nil var getAcceptExSockAddrsPtr: pointer = nil proc initPointer(s: TSocketHandle, func: var pointer, guid: var TGUID): bool = # Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c var bytesRet: DWord func = nil result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid, sizeof(TGUID).dword, addr func, sizeof(pointer).DWORD, addr bytesRet, nil, nil) == 0 proc initAll() = let dummySock = socket() if not initPointer(dummySock, connectExPtr, WSAID_CONNECTEX): OSError(OSLastError()) if not initPointer(dummySock, acceptExPtr, WSAID_ACCEPTEX): OSError(OSLastError()) if not initPointer(dummySock, getAcceptExSockAddrsPtr, WSAID_GETACCEPTEXSOCKADDRS): OSError(OSLastError()) proc connectEx(s: TSocketHandle, name: ptr TSockAddr, namelen: cint, lpSendBuffer: pointer, dwSendDataLength: dword, lpdwBytesSent: PDWORD, lpOverlapped: POverlapped): bool = if connectExPtr.isNil: raise newException(EInvalidValue, "Need to initialise ConnectEx().") let func = cast[proc (s: TSocketHandle, name: ptr TSockAddr, namelen: cint, lpSendBuffer: pointer, dwSendDataLength: dword, lpdwBytesSent: PDWORD, lpOverlapped: POverlapped): bool {.stdcall.}](connectExPtr) result = func(s, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent, lpOverlapped) proc acceptEx(listenSock, acceptSock: TSocketHandle, lpOutputBuffer: pointer, dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength: DWORD, lpdwBytesReceived: PDWORD, lpOverlapped: POverlapped): bool = if acceptExPtr.isNil: raise newException(EInvalidValue, "Need to initialise AcceptEx().") let func = cast[proc (listenSock, acceptSock: TSocketHandle, lpOutputBuffer: pointer, dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength: DWORD, lpdwBytesReceived: PDWORD, lpOverlapped: POverlapped): bool {.stdcall.}](acceptExPtr) result = func(listenSock, acceptSock, lpOutputBuffer, dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, lpdwBytesReceived, lpOverlapped) proc getAcceptExSockaddrs(lpOutputBuffer: pointer, dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength: DWORD, LocalSockaddr: ptr ptr TSockAddr, LocalSockaddrLength: lpint, RemoteSockaddr: ptr ptr TSockAddr, RemoteSockaddrLength: lpint) = if getAcceptExSockAddrsPtr.isNil: raise newException(EInvalidValue, "Need to initialise getAcceptExSockAddrs().") let func = cast[proc (lpOutputBuffer: pointer, dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength: DWORD, LocalSockaddr: ptr ptr TSockAddr, LocalSockaddrLength: lpint, RemoteSockaddr: ptr ptr TSockAddr, RemoteSockaddrLength: lpint) {.stdcall.}](getAcceptExSockAddrsPtr) func(lpOutputBuffer, dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, LocalSockaddr, LocalSockaddrLength, RemoteSockaddr, RemoteSockaddrLength) proc connect*(p: PDispatcher, socket: TSocketHandle, address: string, port: TPort, af = AF_INET): PFuture[int] = ## Connects ``socket`` to server at ``address:port``. ## ## Returns a ``PFuture`` which will complete when the connection succeeds ## or an error occurs. var retFuture = newFuture[int]()# TODO: Change to void when that regression is fixed. # Apparently ``ConnectEx`` expects the socket to be initially bound: var saddr: Tsockaddr_in saddr.sin_family = int16(toInt(af)) saddr.sin_port = 0 saddr.sin_addr.s_addr = INADDR_ANY if bindAddr(socket, cast[ptr TSockAddr](addr(saddr)), sizeof(saddr).TSockLen) < 0'i32: OSError(OSLastError()) var aiList = getAddrInfo(address, port, af) var success = false var lastError: TOSErrorCode var it = aiList while it != nil: # "the OVERLAPPED structure must remain valid until the I/O completes" # http://blogs.msdn.com/b/oldnewthing/archive/2011/02/02/10123392.aspx var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped))) ol.data = TCompletionData(sock: socket, cb: proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) = if not retFuture.finished: if errcode == TOSErrorCode(-1): retFuture.complete(0) else: retFuture.fail(newException(EOS, osErrorMsg(errcode))) ) var ret = connectEx(socket, it.ai_addr, sizeof(TSockAddrIn).cint, nil, 0, nil, cast[POverlapped](ol)) if ret: # Request to connect completed immediately. success = true retFuture.complete(0) # We don't deallocate ``ol`` here because even though this completed # immediately poll will still be notified about its completion and it will # free ``ol``. break else: lastError = OSLastError() if lastError.int32 == ERROR_IO_PENDING: # In this case ``ol`` will be deallocated in ``poll``. success = true break else: dealloc(ol) success = false it = it.ai_next dealloc(aiList) if not success: retFuture.fail(newException(EOS, osErrorMsg(lastError))) return retFuture proc recv*(p: PDispatcher, socket: TSocketHandle, size: int, flags: int = 0): PFuture[string] = ## Reads ``size`` bytes from ``socket``. Returned future will complete once ## all of the requested data is read. If socket is disconnected during the ## recv operation then the future may complete with only a part of the ## requested data read. If socket is disconnected and no data is available ## to be read then the future will complete with a value of ``""``. var retFuture = newFuture[string]() var dataBuf: TWSABuf dataBuf.buf = newString(size) dataBuf.len = size var bytesReceived: DWord var flagsio = flags.dword var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped))) ol.data = TCompletionData(sock: socket, cb: proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) = if not retFuture.finished: if errcode == TOSErrorCode(-1): if bytesCount == 0 and dataBuf.buf[0] == '\0': retFuture.complete("") else: var data = newString(size) copyMem(addr data[0], addr dataBuf.buf[0], size) retFuture.complete($data) else: retFuture.fail(newException(EOS, osErrorMsg(errcode))) ) let ret = WSARecv(socket, addr dataBuf, 1, addr bytesReceived, addr flagsio, cast[POverlapped](ol), nil) if ret == -1: let err = OSLastError() if err.int32 != ERROR_IO_PENDING: retFuture.fail(newException(EOS, osErrorMsg(err))) dealloc(ol) elif ret == 0 and bytesReceived == 0 and dataBuf.buf[0] == '\0': # We have to ensure that the buffer is empty because WSARecv will tell # us immediatelly when it was disconnected, even when there is still # data in the buffer. # We want to give the user as much data as we can. So we only return # the empty string (which signals a disconnection) when there is # nothing left to read. retFuture.complete("") # TODO: "For message-oriented sockets, where a zero byte message is often # allowable, a failure with an error code of WSAEDISCON is used to # indicate graceful closure." # ~ http://msdn.microsoft.com/en-us/library/ms741688%28v=vs.85%29.aspx else: # Request to read completed immediately. var data = newString(size) copyMem(addr data[0], addr dataBuf.buf[0], size) retFuture.complete($data) # We don't deallocate ``ol`` here because even though this completed # immediately poll will still be notified about its completion and it will # free ``ol``. return retFuture proc send*(p: PDispatcher, socket: TSocketHandle, data: string): PFuture[int] = ## Sends ``data`` to ``socket``. The returned future will complete once all ## data has been sent. var retFuture = newFuture[int]() var dataBuf: TWSABuf dataBuf.buf = data dataBuf.len = data.len var bytesReceived, flags: DWord var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped))) ol.data = TCompletionData(sock: socket, cb: proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) = if not retFuture.finished: if errcode == TOSErrorCode(-1): retFuture.complete(0) else: retFuture.fail(newException(EOS, osErrorMsg(errcode))) ) let ret = WSASend(socket, addr dataBuf, 1, addr bytesReceived, flags, cast[POverlapped](ol), nil) if ret == -1: let err = osLastError() if err.int32 != ERROR_IO_PENDING: retFuture.fail(newException(EOS, osErrorMsg(err))) dealloc(ol) else: retFuture.complete(0) # We don't deallocate ``ol`` here because even though this completed # immediately poll will still be notified about its completion and it will # free ``ol``. return retFuture proc acceptAddr*(p: PDispatcher, socket: TSocketHandle): PFuture[tuple[address: string, client: TSocketHandle]] = ## Accepts a new connection. Returns a future containing the client socket ## corresponding to that connection and the remote address of the client. ## The future will complete when the connection is successfully accepted. var retFuture = newFuture[tuple[address: string, client: TSocketHandle]]() var clientSock = socket() if clientSock == OSInvalidSocket: osError(osLastError()) const lpOutputLen = 1024 var lpOutputBuf = newString(lpOutputLen) var dwBytesReceived: DWORD let dwReceiveDataLength = 0.DWORD # We don't want any data to be read. let dwLocalAddressLength = DWORD(sizeof (TSockaddr_in) + 16) let dwRemoteAddressLength = DWORD(sizeof(TSockaddr_in) + 16) template completeAccept(): stmt {.immediate, dirty.} = var listenSock = socket let setoptRet = setsockopt(clientSock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, addr listenSock, sizeof(listenSock).TSockLen) if setoptRet != 0: osError(osLastError()) var LocalSockaddr, RemoteSockaddr: ptr TSockAddr var localLen, remoteLen: int32 getAcceptExSockaddrs(addr lpOutputBuf[0], dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, addr LocalSockaddr, addr localLen, addr RemoteSockaddr, addr remoteLen) # TODO: IPv6. Check ``sa_family``. http://stackoverflow.com/a/9212542/492186 retFuture.complete( (address: $inet_ntoa(cast[ptr Tsockaddr_in](remoteSockAddr).sin_addr), client: clientSock) ) var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped))) ol.data = TCompletionData(sock: socket, cb: proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) = if not retFuture.finished: if errcode == TOSErrorCode(-1): completeAccept() else: retFuture.fail(newException(EOS, osErrorMsg(errcode))) ) # http://msdn.microsoft.com/en-us/library/windows/desktop/ms737524%28v=vs.85%29.aspx let ret = acceptEx(socket, clientSock, addr lpOutputBuf[0], dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, addr dwBytesReceived, cast[POverlapped](ol)) if not ret: let err = osLastError() if err.int32 != ERROR_IO_PENDING: retFuture.fail(newException(EOS, osErrorMsg(err))) dealloc(ol) else: completeAccept() # We don't deallocate ``ol`` here because even though this completed # immediately poll will still be notified about its completion and it will # free ``ol``. return retFuture initAll() else: import selectors from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK type TCallback = proc (sock: TSocketHandle): bool {.closure.} PData* = ref object of PObject sock: TSocketHandle readCBs: seq[TCallback] writeCBs: seq[TCallback] PDispatcher* = ref object selector: PSelector proc newDispatcher*(): PDispatcher = new result result.selector = newSelector() proc update(p: PDispatcher, sock: TSocketHandle, events: set[TEvent]) = assert sock in p.selector echo("Update: ", events) if events == {}: discard p.selector.unregister(sock) else: discard p.selector.update(sock, events) proc addRead(p: PDispatcher, sock: TSocketHandle, cb: TCallback) = if sock notin p.selector: var data = PData(sock: sock, readCBs: @[cb], writeCBs: @[]) p.selector.register(sock, {EvRead}, data.PObject) else: p.selector[sock].data.PData.readCBs.add(cb) p.update(sock, p.selector[sock].events + {EvRead}) proc addWrite(p: PDispatcher, sock: TSocketHandle, cb: TCallback) = if sock notin p.selector: var data = PData(sock: sock, readCBs: @[], writeCBs: @[cb]) p.selector.register(sock, {EvWrite}, data.PObject) else: p.selector[sock].data.PData.writeCBs.add(cb) p.update(sock, p.selector[sock].events + {EvWrite}) proc poll*(p: PDispatcher, timeout = 500) = for info in p.selector.select(timeout): let data = PData(info.key.data) assert data.sock == info.key.fd echo("R: ", data.readCBs.len, " W: ", data.writeCBs.len, ". ", info.events) if EvRead in info.events: var newReadCBs: seq[TCallback] = @[] for cb in data.readCBs: if not cb(data.sock): # Callback wants to be called again. newReadCBs.add(cb) data.readCBs = newReadCBs if EvWrite in info.events: var newWriteCBs: seq[TCallback] = @[] for cb in data.writeCBs: if not cb(data.sock): # Callback wants to be called again. newWriteCBs.add(cb) data.writeCBs = newWriteCBs var newEvents: set[TEvent] if data.readCBs.len != 0: newEvents = {EvRead} if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite} p.update(data.sock, newEvents) proc connect*(p: PDispatcher, socket: TSocketHandle, address: string, port: TPort, af = AF_INET): PFuture[int] = var retFuture = newFuture[int]() proc cb(sock: TSocketHandle): bool = # We have connected. retFuture.complete(0) return true var aiList = getAddrInfo(address, port, af) var success = false var lastError: TOSErrorCode var it = aiList while it != nil: var ret = connect(socket, it.ai_addr, it.ai_addrlen.TSocklen) if ret == 0: # Request to connect completed immediately. success = true retFuture.complete(0) break else: lastError = osLastError() if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS: success = true addWrite(p, socket, cb) break else: success = false it = it.ai_next dealloc(aiList) if not success: retFuture.fail(newException(EOS, osErrorMsg(lastError))) return retFuture proc recv*(p: PDispatcher, socket: TSocketHandle, size: int, flags: int = 0): PFuture[string] = var retFuture = newFuture[string]() var readBuffer = newString(size) var sizeRead = 0 proc cb(sock: TSocketHandle): bool = result = true let netSize = size - sizeRead let res = recv(sock, addr readBuffer[sizeRead], netSize, flags.cint) if res < 0: let lastError = osLastError() if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: retFuture.fail(newException(EOS, osErrorMsg(lastError))) else: result = false # We still want this callback to be called. elif res == 0: # Disconnected if sizeRead == 0: retFuture.complete("") else: readBuffer.setLen(sizeRead) retFuture.complete(readBuffer) else: sizeRead.inc(res) if res != netSize: result = false # We want to read all the data requested. else: retFuture.complete(readBuffer) addRead(p, socket, cb) return retFuture proc send*(p: PDispatcher, socket: TSocketHandle, data: string): PFuture[int] = var retFuture = newFuture[int]() var written = 0 proc cb(sock: TSocketHandle): bool = result = true let netSize = data.len-written var d = data.cstring let res = send(sock, addr d[written], netSize, 0.cint) if res < 0: let lastError = osLastError() if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: retFuture.fail(newException(EOS, osErrorMsg(lastError))) else: result = false # We still want this callback to be called. else: written.inc(res) if res != netSize: result = false # We still have data to send. else: retFuture.complete(0) addWrite(p, socket, cb) return retFuture proc acceptAddr*(p: PDispatcher, socket: TSocketHandle): PFuture[tuple[address: string, client: TSocketHandle]] = var retFuture = newFuture[tuple[address: string, client: TSocketHandle]]() proc cb(sock: TSocketHandle): bool = result = true var sockAddress: Tsockaddr_in var addrLen = sizeof(sockAddress).TSocklen var client = accept(sock, cast[ptr TSockAddr](addr(sockAddress)), addr(addrLen)) if client == osInvalidSocket: let lastError = osLastError() assert lastError.int32 notin {EWOULDBLOCK, EAGAIN} if lastError.int32 == EINTR: return false else: retFuture.fail(newException(EOS, osErrorMsg(lastError))) else: retFuture.complete(($inet_ntoa(sockAddress.sin_addr), client)) addRead(p, socket, cb) return retFuture proc accept*(p: PDispatcher, socket: TSocketHandle): PFuture[TSocketHandle] = ## Accepts a new connection. Returns a future containing the client socket ## corresponding to that connection. ## The future will complete when the connection is successfully accepted. var retFut = newFuture[TSocketHandle]() var fut = p.acceptAddr(socket) fut.callback = proc (future: PFuture[tuple[address: string, client: TSocketHandle]]) = assert future.finished if future.failed: retFut.fail(future.error) else: retFut.complete(future.read.client) return retFut # -- Await Macro template createCb*(cbName, varNameIterSym, retFutureSym: expr): stmt {.immediate, dirty.} = proc cbName {.closure.} = if not varNameIterSym.finished: var next = varNameIterSym() if next == nil: assert retFutureSym.finished, "Async procedure's return Future was not finished." else: next.callback = cbName template createVar(futSymName: string, asyncProc: PNimrodNode, valueReceiver: expr) {.immediate, dirty.} = # TODO: Used template here due to bug #926 result = newNimNode(nnkStmtList) var futSym = newIdentNode(futSymName) #genSym(nskVar, "future") result.add newVarStmt(futSym, asyncProc) # -> var future = y result.add newNimNode(nnkYieldStmt).add(futSym) # -> yield future valueReceiver = newDotExpr(futSym, newIdentNode("read")) # -> future.read proc processBody(node, retFutureSym: PNimrodNode): PNimrodNode {.compileTime.} = result = node case node.kind of nnkReturnStmt: result = newNimNode(nnkStmtList) result.add newCall(newIdentNode("complete"), retFutureSym, if node[0].kind == nnkEmpty: newIdentNode("result") else: node[0]) result.add newNimNode(nnkYieldStmt).add(newNilLit()) of nnkCommand: if node[0].ident == !"await": case node[1].kind of nnkIdent: # await x result = newNimNode(nnkYieldStmt).add(node[1]) # -> yield x of nnkCall: # await foo(p, x) var futureValue: PNimrodNode createVar("future" & $node[1][0].toStrLit, node[1], futureValue) result.add futureValue else: error("Invalid node kind in 'await', got: " & $node[1].kind) elif node[1].kind == nnkCommand and node[1][0].kind == nnkIdent and node[1][0].ident == !"await": # foo await x var newCommand = node createVar("future" & $node[0].ident, node[1][0], newCommand[1]) result.add newCommand of nnkVarSection, nnkLetSection: case node[0][2].kind of nnkCommand: if node[0][2][0].ident == !"await": # var x = await y var newVarSection = node # TODO: Should this use copyNimNode? createVar("future" & $node[0][0].ident, node[0][2][1], newVarSection[0][2]) result.add newVarSection else: discard of nnkAsgn: case node[1].kind of nnkCommand: if node[1][0].ident == !"await": # x = await y var newAsgn = node createVar("future" & $node[0].ident, node[1][1], newAsgn[1]) result.add newAsgn else: discard of nnkDiscardStmt: # discard await x if node[0][0].ident == !"await": var dummy = newNimNode(nnkStmtList) createVar("futureDiscard_" & $toStrLit(node[0][1]), node[0][1], dummy) else: discard for i in 0 .. var retFuture = newFuture[T]() var retFutureSym = newIdentNode("retFuture") #genSym(nskVar, "retFuture") outerProcBody.add( newVarStmt(retFutureSym, newCall( newNimNode(nnkBracketExpr).add( newIdentNode("newFuture"), prc[3][0][1])))) # Get type from return type of this proc. # -> iterator nameIter(): PFutureBase {.closure.} = # -> var result: T # -> # -> complete(retFuture, result) var iteratorNameSym = newIdentNode($prc[0].getName & "Iter") #genSym(nskIterator, $prc[0].ident & "Iter") var procBody = prc[6].processBody(retFutureSym) procBody.insert(0, newNimNode(nnkVarSection).add( newIdentDefs(newIdentNode("result"), prc[3][0][1]))) # -> var result: T procBody.add( newCall(newIdentNode("complete"), retFutureSym, newIdentNode("result"))) # -> complete(retFuture, result) var closureIterator = newProc(iteratorNameSym, [newIdentNode("PFutureBase")], procBody, nnkIteratorDef) closureIterator[4] = newNimNode(nnkPragma).add(newIdentNode("closure")) outerProcBody.add(closureIterator) # -> var nameIterVar = nameIter # -> var first = nameIterVar() var varNameIterSym = newIdentNode($prc[0].getName & "IterVar") #genSym(nskVar, $prc[0].ident & "IterVar") var varNameIter = newVarStmt(varNameIterSym, iteratorNameSym) outerProcBody.add varNameIter var varFirstSym = genSym(nskVar, "first") var varFirst = newVarStmt(varFirstSym, newCall(varNameIterSym)) outerProcBody.add varFirst # -> createCb(cb, nameIter, retFuture) var cbName = newIdentNode("cb") var procCb = newCall("createCb", cbName, varNameIterSym, retFutureSym) outerProcBody.add procCb # -> first.callback = cb outerProcBody.add newAssignment( newDotExpr(varFirstSym, newIdentNode("callback")), cbName) # -> return retFuture outerProcBody.add newNimNode(nnkReturnStmt).add(retFutureSym) result = prc # Remove the 'async' pragma. for i in 0 .. 0 and c == "\L": discard await p.recv(socket, 1) addNLIfEmpty() return elif c == "\L": addNLIfEmpty() return add(result.string, c) when isMainModule: var p = newDispatcher() var sock = socket() sock.setBlocking false when false: # Await tests proc main(p: PDispatcher): PFuture[int] {.async.} = discard await p.connect(sock, "irc.freenode.net", TPort(6667)) while true: var line = await p.recvLine(sock) echo("Line is: ", line.repr) if line == "": echo "Disconnected" break proc peekTest(p: PDispatcher): PFuture[int] {.async.} = discard await p.connect(sock, "localhost", TPort(6667)) while true: var line = await p.recv(sock, 1, MSG_PEEK) var line2 = await p.recv(sock, 1) echo(line.repr) echo(line2.repr) echo("---") if line2 == "": break sleep(500) var f = main(p) else: when false: var f = p.connect(sock, "irc.freenode.org", TPort(6667)) f.callback = proc (future: PFuture[int]) = echo("Connected in future!") echo(future.read) for i in 0 .. 50: var recvF = p.recv(sock, 10) recvF.callback = proc (future: PFuture[string]) = echo("Read ", future.read.len, ": ", future.read.repr) else: sock.bindAddr(TPort(6667)) sock.listen() proc onAccept(future: PFuture[TSocketHandle]) = echo "Accepted" var t = p.send(future.read, "test\c\L") t.callback = proc (future: PFuture[int]) = echo(future.read) var f = p.accept(sock) f.callback = onAccept var f = p.accept(sock) f.callback = onAccept while true: p.poll()