diff options
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r-- | lib/pure/asyncdispatch.nim | 608 |
1 files changed, 489 insertions, 119 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index cc337452f..79bc1b96d 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -9,9 +9,9 @@ include "system/inclrtl" -import os, oids, tables, strutils, macros, times +import os, oids, tables, strutils, macros, times, heapqueue -import nativesockets, net +import nativesockets, net, queues export Port, SocketFlag @@ -155,6 +155,9 @@ type when not defined(release): var currentID = 0 + +proc callSoon*(cbproc: proc ()) {.gcsafe.} + proc newFuture*[T](fromProc: string = "unspecified"): Future[T] = ## Creates a new future. ## @@ -257,7 +260,7 @@ proc `callback=`*(future: FutureBase, cb: proc () {.closure,gcsafe.}) = ## passes ``future`` as a param to the callback. future.cb = cb if future.finished: - future.cb() + callSoon(future.cb) proc `callback=`*[T](future: Future[T], cb: proc (future: Future[T]) {.closure,gcsafe.}) = @@ -352,28 +355,88 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = fut2.callback = cb return retFuture +proc all*[T](futs: varargs[Future[T]]): auto = + ## Returns a future which will complete once + ## all futures in ``futs`` complete. + ## + ## If the awaited futures are not ``Future[void]``, the returned future + ## will hold the values of all awaited futures in a sequence. + ## + ## If the awaited futures *are* ``Future[void]``, + ## this proc returns ``Future[void]``. + + when T is void: + var + retFuture = newFuture[void]("asyncdispatch.all") + completedFutures = 0 + + let totalFutures = len(futs) + + for fut in futs: + fut.callback = proc(f: Future[T]) = + inc(completedFutures) + + if completedFutures == totalFutures: + retFuture.complete() + + return retFuture + + else: + var + retFuture = newFuture[seq[T]]("asyncdispatch.all") + retValues = newSeq[T](len(futs)) + completedFutures = 0 + + for i, fut in futs: + proc setCallback(i: int) = + fut.callback = proc(f: Future[T]) = + retValues[i] = f.read() + inc(completedFutures) + + if completedFutures == len(retValues): + retFuture.complete(retValues) + + setCallback(i) + + return retFuture + type PDispatcherBase = ref object of RootRef - timers: seq[tuple[finishAt: float, fut: Future[void]]] - -proc processTimers(p: PDispatcherBase) = - var oldTimers = p.timers - p.timers = @[] - for t in oldTimers: - if epochTime() >= t.finishAt: - t.fut.complete() - else: - p.timers.add(t) + timers: HeapQueue[tuple[finishAt: float, fut: Future[void]]] + callbacks: Queue[proc ()] + +proc processTimers(p: PDispatcherBase) {.inline.} = + while p.timers.len > 0 and epochTime() >= p.timers[0].finishAt: + p.timers.pop().fut.complete() + +proc processPendingCallbacks(p: PDispatcherBase) = + while p.callbacks.len > 0: + var cb = p.callbacks.dequeue() + cb() + +proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} = + # If dispatcher has active timers this proc returns the timeout + # of the nearest timer. Returns `timeout` otherwise. + result = timeout + if p.timers.len > 0: + let timerTimeout = p.timers[0].finishAt + let curTime = epochTime() + if timeout == -1 or (curTime + (timeout / 1000)) > timerTimeout: + result = int((timerTimeout - curTime) * 1000) + if result < 0: result = 0 when defined(windows) or defined(nimdoc): import winlean, sets, hashes type - CompletionKey = Dword + CompletionKey = ULONG_PTR CompletionData* = object fd*: AsyncFD # TODO: Rename this. cb*: proc (fd: AsyncFD, bytesTransferred: Dword, errcode: OSErrorCode) {.closure,gcsafe.} + cell*: ForeignCell # we need this `cell` to protect our `cb` environment, + # when using RegisterWaitForSingleObject, because + # waiting is done in different thread. PDispatcher* = ref object of PDispatcherBase ioPort: Handle @@ -385,6 +448,15 @@ when defined(windows) or defined(nimdoc): PCustomOverlapped* = ref CustomOverlapped AsyncFD* = distinct int + + PostCallbackData = object + ioPort: Handle + handleFd: AsyncFD + waitFd: Handle + ovl: PCustomOverlapped + PostCallbackDataPtr = ptr PostCallbackData + + Callback = proc (fd: AsyncFD): bool {.closure,gcsafe.} {.deprecated: [TCompletionKey: CompletionKey, TAsyncFD: AsyncFD, TCustomOverlapped: CustomOverlapped, TCompletionData: CompletionData].} @@ -396,7 +468,8 @@ when defined(windows) or defined(nimdoc): new result result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) result.handles = initSet[AsyncFD]() - result.timers = @[] + result.timers.newHeapQueue() + result.callbacks = initQueue[proc ()](64) var gDisp{.threadvar.}: PDispatcher ## Global dispatcher proc getGlobalDispatcher*(): PDispatcher = @@ -423,15 +496,17 @@ when defined(windows) or defined(nimdoc): proc poll*(timeout = 500) = ## Waits for completion events and processes them. let p = getGlobalDispatcher() - if p.handles.len == 0 and p.timers.len == 0: + if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0: raise newException(ValueError, "No handles or timers registered in dispatcher.") - let llTimeout = - if timeout == -1: winlean.INFINITE - else: timeout.int32 + let at = p.adjustedTimeout(timeout) + var llTimeout = + if at == -1: winlean.INFINITE + else: at.int32 + var lpNumberOfBytesTransferred: Dword - var lpCompletionKey: ULONG + var lpCompletionKey: ULONG_PTR var customOverlapped: PCustomOverlapped let res = getQueuedCompletionStatus(p.ioPort, addr lpNumberOfBytesTransferred, addr lpCompletionKey, @@ -445,6 +520,13 @@ when defined(windows) or defined(nimdoc): customOverlapped.data.cb(customOverlapped.data.fd, lpNumberOfBytesTransferred, OSErrorCode(-1)) + + # If cell.data != nil, then system.protect(rawEnv(cb)) was called, + # so we need to dispose our `cb` environment, because it is not needed + # anymore. + if customOverlapped.data.cell.data != nil: + system.dispose(customOverlapped.data.cell) + GC_unref(customOverlapped) else: let errCode = osLastError() @@ -452,6 +534,8 @@ when defined(windows) or defined(nimdoc): assert customOverlapped.data.fd == lpCompletionKey.AsyncFD customOverlapped.data.cb(customOverlapped.data.fd, lpNumberOfBytesTransferred, errCode) + if customOverlapped.data.cell.data != nil: + system.dispose(customOverlapped.data.cell) GC_unref(customOverlapped) else: if errCode.int32 == WAIT_TIMEOUT: @@ -461,6 +545,8 @@ when defined(windows) or defined(nimdoc): # Timer processing. processTimers(p) + # Callback queue processing + processPendingCallbacks(p) var connectExPtr: pointer = nil var acceptExPtr: pointer = nil @@ -651,34 +737,16 @@ when defined(windows) or defined(nimdoc): retFuture.complete("") else: retFuture.fail(newException(OSError, osErrorMsg(err))) - elif ret == 0 and bytesReceived == 0 and dataBuf.buf[0] == '\0': - # We have to ensure that the buffer is empty because WSARecv will tell - # us immediately when it was disconnected, even when there is still - # data in the buffer. - # We want to give the user as much data as we can. So we only return - # the empty string (which signals a disconnection) when there is - # nothing left to read. - retFuture.complete("") - # TODO: "For message-oriented sockets, where a zero byte message is often - # allowable, a failure with an error code of WSAEDISCON is used to - # indicate graceful closure." - # ~ http://msdn.microsoft.com/en-us/library/ms741688%28v=vs.85%29.aspx - else: - # Request to read completed immediately. - # From my tests bytesReceived isn't reliable. - let realSize = - if bytesReceived == 0: - size - else: - bytesReceived - var data = newString(realSize) - assert realSize <= size - copyMem(addr data[0], addr dataBuf.buf[0], realSize) - #dealloc dataBuf.buf - retFuture.complete($data) - # We don't deallocate ``ol`` here because even though this completed - # immediately poll will still be notified about its completion and it will - # free ``ol``. + elif ret == 0: + # Request completed immediately. + if bytesReceived != 0: + var data = newString(bytesReceived) + assert bytesReceived <= size + copyMem(addr data[0], addr dataBuf.buf[0], bytesReceived) + retFuture.complete($data) + else: + if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)): + retFuture.complete("") return retFuture proc recvInto*(socket: AsyncFD, buf: cstring, size: int, @@ -741,31 +809,14 @@ when defined(windows) or defined(nimdoc): retFuture.complete(0) else: retFuture.fail(newException(OSError, osErrorMsg(err))) - elif ret == 0 and bytesReceived == 0 and dataBuf.buf[0] == '\0': - # We have to ensure that the buffer is empty because WSARecv will tell - # us immediately when it was disconnected, even when there is still - # data in the buffer. - # We want to give the user as much data as we can. So we only return - # the empty string (which signals a disconnection) when there is - # nothing left to read. - retFuture.complete(0) - # TODO: "For message-oriented sockets, where a zero byte message is often - # allowable, a failure with an error code of WSAEDISCON is used to - # indicate graceful closure." - # ~ http://msdn.microsoft.com/en-us/library/ms741688%28v=vs.85%29.aspx - else: - # Request to read completed immediately. - # From my tests bytesReceived isn't reliable. - let realSize = - if bytesReceived == 0: - size - else: - bytesReceived - assert realSize <= size - retFuture.complete(realSize) - # We don't deallocate ``ol`` here because even though this completed - # immediately poll will still be notified about its completion and it will - # free ``ol``. + elif ret == 0: + # Request completed immediately. + if bytesReceived != 0: + assert bytesReceived <= size + retFuture.complete(bytesReceived) + else: + if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)): + retFuture.complete(bytesReceived) return retFuture proc send*(socket: AsyncFD, data: string, @@ -811,6 +862,101 @@ when defined(windows) or defined(nimdoc): # free ``ol``. return retFuture + proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr, + saddrLen: Socklen, + flags = {SocketFlag.SafeDisconn}): Future[void] = + ## Sends ``data`` to specified destination ``saddr``, using + ## socket ``socket``. The returned future will complete once all data + ## has been sent. + verifyPresence(socket) + var retFuture = newFuture[void]("sendTo") + var dataBuf: TWSABuf + dataBuf.buf = cast[cstring](data) + dataBuf.len = size.ULONG + var bytesSent = 0.Dword + var lowFlags = 0.Dword + + # we will preserve address in our stack + var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes + var stalen: cint = cint(saddrLen) + zeroMem(addr(staddr[0]), 128) + copyMem(addr(staddr[0]), saddr, saddrLen) + + var ol = PCustomOverlapped() + GC_ref(ol) + ol.data = CompletionData(fd: socket, cb: + proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + retFuture.complete() + else: + retFuture.fail(newException(OSError, osErrorMsg(errcode))) + ) + + let ret = WSASendTo(socket.SocketHandle, addr dataBuf, 1, addr bytesSent, + lowFlags, cast[ptr SockAddr](addr(staddr[0])), + stalen, cast[POVERLAPPED](ol), nil) + if ret == -1: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + GC_unref(ol) + retFuture.fail(newException(OSError, osErrorMsg(err))) + else: + retFuture.complete() + # We don't deallocate ``ol`` here because even though this completed + # immediately poll will still be notified about its completion and it will + # free ``ol``. + return retFuture + + proc recvFromInto*(socket: AsyncFD, data: pointer, size: int, + saddr: ptr SockAddr, saddrLen: ptr SockLen, + flags = {SocketFlag.SafeDisconn}): Future[int] = + ## Receives a datagram data from ``socket`` into ``buf``, which must + ## be at least of size ``size``, address of datagram's sender will be + ## stored into ``saddr`` and ``saddrLen``. Returned future will complete + ## once one datagram has been received, and will return size of packet + ## received. + verifyPresence(socket) + var retFuture = newFuture[int]("recvFromInto") + + var dataBuf = TWSABuf(buf: cast[cstring](data), len: size.ULONG) + + var bytesReceived = 0.Dword + var lowFlags = 0.Dword + + var ol = PCustomOverlapped() + GC_ref(ol) + ol.data = CompletionData(fd: socket, cb: + proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + assert bytesCount <= size + retFuture.complete(bytesCount) + else: + # datagram sockets don't have disconnection, + # so we can just raise an exception + retFuture.fail(newException(OSError, osErrorMsg(errcode))) + ) + + let res = WSARecvFrom(socket.SocketHandle, addr dataBuf, 1, + addr bytesReceived, addr lowFlags, + saddr, cast[ptr cint](saddrLen), + cast[POVERLAPPED](ol), nil) + if res == -1: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + GC_unref(ol) + retFuture.fail(newException(OSError, osErrorMsg(err))) + else: + # Request completed immediately. + if bytesReceived != 0: + assert bytesReceived <= size + retFuture.complete(bytesReceived) + else: + if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)): + retFuture.complete(bytesReceived) + return retFuture + proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}): Future[tuple[address: string, client: AsyncFD]] = ## Accepts a new connection. Returns a future containing the client socket @@ -837,7 +983,7 @@ when defined(windows) or defined(nimdoc): let dwLocalAddressLength = Dword(sizeof (Sockaddr_in) + 16) let dwRemoteAddressLength = Dword(sizeof(Sockaddr_in) + 16) - template completeAccept(): stmt {.immediate, dirty.} = + template completeAccept() {.dirty.} = var listenSock = socket let setoptRet = setsockopt(clientSock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, addr listenSock, @@ -857,7 +1003,7 @@ when defined(windows) or defined(nimdoc): client: clientSock.AsyncFD) ) - template failAccept(errcode): stmt = + template failAccept(errcode) = if flags.isDisconnectionError(errcode): var newAcceptFut = acceptAddr(socket, flags) newAcceptFut.callback = @@ -923,6 +1069,126 @@ when defined(windows) or defined(nimdoc): ## Unregisters ``fd``. getGlobalDispatcher().handles.excl(fd) + {.push stackTrace:off.} + proc waitableCallback(param: pointer, + timerOrWaitFired: WINBOOL): void {.stdcall.} = + var p = cast[PostCallbackDataPtr](param) + discard postQueuedCompletionStatus(p.ioPort, timerOrWaitFired.Dword, + ULONG_PTR(p.handleFd), + cast[pointer](p.ovl)) + {.pop.} + + template registerWaitableEvent(mask) = + let p = getGlobalDispatcher() + var flags = (WT_EXECUTEINWAITTHREAD or WT_EXECUTEONLYONCE).Dword + var hEvent = wsaCreateEvent() + if hEvent == 0: + raiseOSError(osLastError()) + var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData))) + pcd.ioPort = p.ioPort + pcd.handleFd = fd + var ol = PCustomOverlapped() + GC_ref(ol) + + ol.data = CompletionData(fd: fd, cb: + proc(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + # we excluding our `fd` because cb(fd) can register own handler + # for this `fd` + p.handles.excl(fd) + # unregisterWait() is called before callback, because appropriate + # winsockets function can re-enable event. + # https://msdn.microsoft.com/en-us/library/windows/desktop/ms741576(v=vs.85).aspx + if unregisterWait(pcd.waitFd) == 0: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + raiseOSError(osLastError()) + if cb(fd): + # callback returned `true`, so we free all allocated resources + deallocShared(cast[pointer](pcd)) + if not wsaCloseEvent(hEvent): + raiseOSError(osLastError()) + # pcd.ovl will be unrefed in poll(). + else: + # callback returned `false` we need to continue + if p.handles.contains(fd): + # new callback was already registered with `fd`, so we free all + # allocated resources. This happens because in callback `cb` + # addRead/addWrite was called with same `fd`. + deallocShared(cast[pointer](pcd)) + if not wsaCloseEvent(hEvent): + raiseOSError(osLastError()) + else: + # we need to include `fd` again + p.handles.incl(fd) + # and register WaitForSingleObject again + if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent, + cast[WAITORTIMERCALLBACK](waitableCallback), + cast[pointer](pcd), INFINITE, flags): + # pcd.ovl will be unrefed in poll() + discard wsaCloseEvent(hEvent) + deallocShared(cast[pointer](pcd)) + raiseOSError(osLastError()) + else: + # we ref pcd.ovl one more time, because it will be unrefed in + # poll() + GC_ref(pcd.ovl) + ) + # We need to protect our callback environment value, so GC will not free it + # accidentally. + ol.data.cell = system.protect(rawEnv(ol.data.cb)) + + # This is main part of `hacky way` is using WSAEventSelect, so `hEvent` + # will be signaled when appropriate `mask` events will be triggered. + if wsaEventSelect(fd.SocketHandle, hEvent, mask) != 0: + GC_unref(ol) + deallocShared(cast[pointer](pcd)) + discard wsaCloseEvent(hEvent) + raiseOSError(osLastError()) + + pcd.ovl = ol + if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent, + cast[WAITORTIMERCALLBACK](waitableCallback), + cast[pointer](pcd), INFINITE, flags): + GC_unref(ol) + deallocShared(cast[pointer](pcd)) + discard wsaCloseEvent(hEvent) + raiseOSError(osLastError()) + p.handles.incl(fd) + + proc addRead*(fd: AsyncFD, cb: Callback) = + ## Start watching the file descriptor for read availability and then call + ## the callback ``cb``. + ## + ## This is not ``pure`` mechanism for Windows Completion Ports (IOCP), + ## so if you can avoid it, please do it. Use `addRead` only if really + ## need it (main usecase is adaptation of `unix like` libraries to be + ## asynchronous on Windows). + ## If you use this function, you dont need to use asyncdispatch.recv() + ## or asyncdispatch.accept(), because they are using IOCP, please use + ## nativesockets.recv() and nativesockets.accept() instead. + ## + ## Be sure your callback ``cb`` returns ``true``, if you want to remove + ## watch of `read` notifications, and ``false``, if you want to continue + ## receiving notifies. + registerWaitableEvent(FD_READ or FD_ACCEPT or FD_OOB or FD_CLOSE) + + proc addWrite*(fd: AsyncFD, cb: Callback) = + ## Start watching the file descriptor for write availability and then call + ## the callback ``cb``. + ## + ## This is not ``pure`` mechanism for Windows Completion Ports (IOCP), + ## so if you can avoid it, please do it. Use `addWrite` only if really + ## need it (main usecase is adaptation of `unix like` libraries to be + ## asynchronous on Windows). + ## If you use this function, you dont need to use asyncdispatch.send() + ## or asyncdispatch.connect(), because they are using IOCP, please use + ## nativesockets.send() and nativesockets.connect() instead. + ## + ## Be sure your callback ``cb`` returns ``true``, if you want to remove + ## watch of `write` notifications, and ``false``, if you want to continue + ## receiving notifies. + registerWaitableEvent(FD_WRITE or FD_CONNECT or FD_CLOSE) + initAll() else: import selectors @@ -956,7 +1222,8 @@ else: proc newDispatcher*(): PDispatcher = new result result.selector = newSelector() - result.timers = @[] + result.timers.newHeapQueue() + result.callbacks = initQueue[proc ()](64) var gDisp{.threadvar.}: PDispatcher ## Global dispatcher proc getGlobalDispatcher*(): PDispatcher = @@ -1014,7 +1281,7 @@ else: proc poll*(timeout = 500) = let p = getGlobalDispatcher() - for info in p.selector.select(timeout): + for info in p.selector.select(p.adjustedTimeout(timeout)): let data = PData(info.key.data) assert data.fd == info.key.fd.AsyncFD #echo("In poll ", data.fd.cint) @@ -1052,7 +1319,10 @@ else: # (e.g. socket disconnected). discard + # Timer processing. processTimers(p) + # Callback queue processing + processPendingCallbacks(p) proc connect*(socket: AsyncFD, address: string, port: Port, domain = AF_INET): Future[void] = @@ -1184,6 +1454,60 @@ else: addWrite(socket, cb) return retFuture + proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr, + saddrLen: SockLen, + flags = {SocketFlag.SafeDisconn}): Future[void] = + ## Sends ``data`` of size ``size`` in bytes to specified destination + ## (``saddr`` of size ``saddrLen`` in bytes, using socket ``socket``. + ## The returned future will complete once all data has been sent. + var retFuture = newFuture[void]("sendTo") + + # we will preserve address in our stack + var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes + var stalen = saddrLen + zeroMem(addr(staddr[0]), 128) + copyMem(addr(staddr[0]), saddr, saddrLen) + + proc cb(sock: AsyncFD): bool = + result = true + let res = sendto(sock.SocketHandle, data, size, MSG_NOSIGNAL, + cast[ptr SockAddr](addr(staddr[0])), stalen) + if res < 0: + let lastError = osLastError() + if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: + retFuture.fail(newException(OSError, osErrorMsg(lastError))) + else: + result = false # We still want this callback to be called. + else: + retFuture.complete() + + addWrite(socket, cb) + return retFuture + + proc recvFromInto*(socket: AsyncFD, data: pointer, size: int, + saddr: ptr SockAddr, saddrLen: ptr SockLen, + flags = {SocketFlag.SafeDisconn}): Future[int] = + ## Receives a datagram data from ``socket`` into ``data``, which must + ## be at least of size ``size`` in bytes, address of datagram's sender + ## will be stored into ``saddr`` and ``saddrLen``. Returned future will + ## complete once one datagram has been received, and will return size + ## of packet received. + var retFuture = newFuture[int]("recvFromInto") + proc cb(sock: AsyncFD): bool = + result = true + let res = recvfrom(sock.SocketHandle, data, size.cint, flags.toOSFlags(), + saddr, saddrLen) + if res < 0: + let lastError = osLastError() + if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: + retFuture.fail(newException(OSError, osErrorMsg(lastError))) + else: + result = false + else: + retFuture.complete(res) + addRead(socket, cb) + return retFuture + proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}): Future[tuple[address: string, client: AsyncFD]] = var retFuture = newFuture[tuple[address: string, @@ -1215,7 +1539,25 @@ proc sleepAsync*(ms: int): Future[void] = ## ``ms`` milliseconds. var retFuture = newFuture[void]("sleepAsync") let p = getGlobalDispatcher() - p.timers.add((epochTime() + (ms / 1000), retFuture)) + p.timers.push((epochTime() + (ms / 1000), retFuture)) + return retFuture + +proc withTimeout*[T](fut: Future[T], timeout: int): Future[bool] = + ## Returns a future which will complete once ``fut`` completes or after + ## ``timeout`` milliseconds has elapsed. + ## + ## If ``fut`` completes first the returned future will hold true, + ## otherwise, if ``timeout`` milliseconds has elapsed first, the returned + ## future will hold false. + + var retFuture = newFuture[bool]("asyncdispatch.`withTimeout`") + var timeoutFuture = sleepAsync(timeout) + fut.callback = + proc () = + if not retFuture.finished: retFuture.complete(true) + timeoutFuture.callback = + proc () = + if not retFuture.finished: retFuture.complete(false) return retFuture proc accept*(socket: AsyncFD, @@ -1248,7 +1590,7 @@ proc skipStmtList(node: NimNode): NimNode {.compileTime.} = result = node[0] template createCb(retFutureSym, iteratorNameSym, - name: expr): stmt {.immediate.} = + name: untyped) = var nameIterVar = iteratorNameSym #{.push stackTrace: off.} proc cb {.closure,gcsafe.} = @@ -1316,6 +1658,23 @@ proc generateExceptionCheck(futSym, ) result.add elseNode +template useVar(result: var NimNode, futureVarNode: NimNode, valueReceiver, + rootReceiver: expr, fromNode: NimNode) = + ## Params: + ## futureVarNode: The NimNode which is a symbol identifying the Future[T] + ## variable to yield. + ## fromNode: Used for better debug information (to give context). + ## valueReceiver: The node which defines an expression that retrieves the + ## future's value. + ## + ## rootReceiver: ??? TODO + # -> yield future<x> + result.add newNimNode(nnkYieldStmt, fromNode).add(futureVarNode) + # -> future<x>.read + valueReceiver = newDotExpr(futureVarNode, newIdentNode("read")) + result.add generateExceptionCheck(futureVarNode, tryStmt, rootReceiver, + fromNode) + template createVar(result: var NimNode, futSymName: string, asyncProc: NimNode, valueReceiver, rootReceiver: expr, @@ -1323,9 +1682,7 @@ template createVar(result: var NimNode, futSymName: string, result = newNimNode(nnkStmtList, fromNode) var futSym = genSym(nskVar, "future") result.add newVarStmt(futSym, asyncProc) # -> var future<x> = y - result.add newNimNode(nnkYieldStmt, fromNode).add(futSym) # -> yield future<x> - valueReceiver = newDotExpr(futSym, newIdentNode("read")) # -> future<x>.read - result.add generateExceptionCheck(futSym, tryStmt, rootReceiver, fromNode) + useVar(result, futSym, valueReceiver, rootReceiver, fromNode) proc processBody(node, retFutureSym: NimNode, subTypeIsVoid: bool, @@ -1342,19 +1699,23 @@ proc processBody(node, retFutureSym: NimNode, else: result.add newCall(newIdentNode("complete"), retFutureSym) else: - result.add newCall(newIdentNode("complete"), retFutureSym, - node[0].processBody(retFutureSym, subTypeIsVoid, tryStmt)) + let x = node[0].processBody(retFutureSym, subTypeIsVoid, tryStmt) + if x.kind == nnkYieldStmt: result.add x + else: + result.add newCall(newIdentNode("complete"), retFutureSym, x) result.add newNimNode(nnkReturnStmt, node).add(newNilLit()) return # Don't process the children of this return stmt of nnkCommand, nnkCall: if node[0].kind == nnkIdent and node[0].ident == !"await": case node[1].kind - of nnkIdent, nnkInfix: + of nnkIdent, nnkInfix, nnkDotExpr: # await x + # await x or y result = newNimNode(nnkYieldStmt, node).add(node[1]) # -> yield x of nnkCall, nnkCommand: # await foo(p, x) + # await foo p, x var futureValue: NimNode result.createVar("future" & $node[1][0].toStrLit, node[1], futureValue, futureValue, node) @@ -1511,38 +1872,40 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} = # -> complete(retFuture, result) var iteratorNameSym = genSym(nskIterator, $prc[0].getName & "Iter") var procBody = prc[6].processBody(retFutureSym, subtypeIsVoid, nil) - if not subtypeIsVoid: - procBody.insert(0, newNimNode(nnkPragma).add(newIdentNode("push"), - newNimNode(nnkExprColonExpr).add(newNimNode(nnkBracketExpr).add( - newIdentNode("warning"), newIdentNode("resultshadowed")), - newIdentNode("off")))) # -> {.push warning[resultshadowed]: off.} - - procBody.insert(1, newNimNode(nnkVarSection, prc[6]).add( - newIdentDefs(newIdentNode("result"), baseType))) # -> var result: T - - procBody.insert(2, newNimNode(nnkPragma).add( - newIdentNode("pop"))) # -> {.pop.}) - - procBody.add( - newCall(newIdentNode("complete"), - retFutureSym, newIdentNode("result"))) # -> complete(retFuture, result) - else: - # -> complete(retFuture) - procBody.add(newCall(newIdentNode("complete"), retFutureSym)) + # don't do anything with forward bodies (empty) + if procBody.kind != nnkEmpty: + if not subtypeIsVoid: + procBody.insert(0, newNimNode(nnkPragma).add(newIdentNode("push"), + newNimNode(nnkExprColonExpr).add(newNimNode(nnkBracketExpr).add( + newIdentNode("warning"), newIdentNode("resultshadowed")), + newIdentNode("off")))) # -> {.push warning[resultshadowed]: off.} + + procBody.insert(1, newNimNode(nnkVarSection, prc[6]).add( + newIdentDefs(newIdentNode("result"), baseType))) # -> var result: T + + procBody.insert(2, newNimNode(nnkPragma).add( + newIdentNode("pop"))) # -> {.pop.}) + + procBody.add( + newCall(newIdentNode("complete"), + retFutureSym, newIdentNode("result"))) # -> complete(retFuture, result) + else: + # -> complete(retFuture) + procBody.add(newCall(newIdentNode("complete"), retFutureSym)) - var closureIterator = newProc(iteratorNameSym, [newIdentNode("FutureBase")], - procBody, nnkIteratorDef) - closureIterator[4] = newNimNode(nnkPragma, prc[6]).add(newIdentNode("closure")) - outerProcBody.add(closureIterator) + var closureIterator = newProc(iteratorNameSym, [newIdentNode("FutureBase")], + procBody, nnkIteratorDef) + closureIterator[4] = newNimNode(nnkPragma, prc[6]).add(newIdentNode("closure")) + outerProcBody.add(closureIterator) - # -> createCb(retFuture) - #var cbName = newIdentNode("cb") - var procCb = newCall(bindSym"createCb", retFutureSym, iteratorNameSym, - newStrLitNode(prc[0].getName)) - outerProcBody.add procCb + # -> createCb(retFuture) + #var cbName = newIdentNode("cb") + var procCb = getAst createCb(retFutureSym, iteratorNameSym, + newStrLitNode(prc[0].getName)) + outerProcBody.add procCb - # -> return retFuture - outerProcBody.add newNimNode(nnkReturnStmt, prc[6][prc[6].len-1]).add(retFutureSym) + # -> return retFuture + outerProcBody.add newNimNode(nnkReturnStmt, prc[6][prc[6].len-1]).add(retFutureSym) result = prc @@ -1550,19 +1913,19 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} = for i in 0 .. <result[4].len: if result[4][i].kind == nnkIdent and result[4][i].ident == !"async": result[4].del(i) + result[4] = newEmptyNode() if subtypeIsVoid: # Add discardable pragma. if returnType.kind == nnkEmpty: # Add Future[void] result[3][0] = parseExpr("Future[void]") - - result[6] = outerProcBody - + if procBody.kind != nnkEmpty: + result[6] = outerProcBody #echo(treeRepr(result)) - #if prc[0].getName == "hubConnectionLoop": + #if prc[0].getName == "testInfix": # echo(toStrLit(result)) -macro async*(prc: stmt): stmt {.immediate.} = +macro async*(prc: untyped): untyped = ## Macro which processes async procedures into the appropriate ## iterators and yield statements. if prc.kind == nnkStmtList: @@ -1571,6 +1934,8 @@ macro async*(prc: stmt): stmt {.immediate.} = result.add asyncSingleProc(oneProc) else: result = asyncSingleProc(prc) + when defined(nimDumpAsync): + echo repr result proc recvLine*(socket: AsyncFD): Future[string] {.async.} = ## Reads a line of data from ``socket``. Returned future will complete once @@ -1611,6 +1976,11 @@ proc recvLine*(socket: AsyncFD): Future[string] {.async.} = return add(result, c) +proc callSoon*(cbproc: proc ()) = + ## Schedule `cbproc` to be called as soon as possible. + ## The callback is called when control returns to the event loop. + getGlobalDispatcher().callbacks.enqueue(cbproc) + proc runForever*() = ## Begins a never ending global dispatcher poll loop. while true: |