diff options
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r-- | lib/pure/asyncdispatch.nim | 2065 |
1 files changed, 2065 insertions, 0 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim new file mode 100644 index 000000000..126db7a7f --- /dev/null +++ b/lib/pure/asyncdispatch.nim @@ -0,0 +1,2065 @@ +# +# +# Nim's Runtime Library +# (c) Copyright 2015 Dominik Picheta +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +## This module implements asynchronous IO. This includes a dispatcher, +## a `Future` type implementation, and an `async` macro which allows +## asynchronous code to be written in a synchronous style with the `await` +## keyword. +## +## The dispatcher acts as a kind of event loop. You must call `poll` on it +## (or a function which does so for you such as `waitFor` or `runForever`) +## in order to poll for any outstanding events. The underlying implementation +## is based on epoll on Linux, IO Completion Ports on Windows and select on +## other operating systems. +## +## The `poll` function will not, on its own, return any events. Instead +## an appropriate `Future` object will be completed. A `Future` is a +## type which holds a value which is not yet available, but which *may* be +## available in the future. You can check whether a future is finished +## by using the `finished` function. When a future is finished it means that +## either the value that it holds is now available or it holds an error instead. +## The latter situation occurs when the operation to complete a future fails +## with an exception. You can distinguish between the two situations with the +## `failed` function. +## +## Future objects can also store a callback procedure which will be called +## automatically once the future completes. +## +## Futures therefore can be thought of as an implementation of the proactor +## pattern. In this +## pattern you make a request for an action, and once that action is fulfilled +## a future is completed with the result of that action. Requests can be +## made by calling the appropriate functions. For example: calling the `recv` +## function will create a request for some data to be read from a socket. The +## future which the `recv` function returns will then complete once the +## requested amount of data is read **or** an exception occurs. +## +## Code to read some data from a socket may look something like this: +## ```Nim +## var future = socket.recv(100) +## future.addCallback( +## proc () = +## echo(future.read) +## ) +## ``` +## +## All asynchronous functions returning a `Future` will not block. They +## will not however return immediately. An asynchronous function will have +## code which will be executed before an asynchronous request is made, in most +## cases this code sets up the request. +## +## In the above example, the `recv` function will return a brand new +## `Future` instance once the request for data to be read from the socket +## is made. This `Future` instance will complete once the requested amount +## of data is read, in this case it is 100 bytes. The second line sets a +## callback on this future which will be called once the future completes. +## All the callback does is write the data stored in the future to `stdout`. +## The `read` function is used for this and it checks whether the future +## completes with an error for you (if it did, it will simply raise the +## error), if there is no error, however, it returns the value of the future. +## +## Asynchronous procedures +## ======================= +## +## Asynchronous procedures remove the pain of working with callbacks. They do +## this by allowing you to write asynchronous code the same way as you would +## write synchronous code. +## +## An asynchronous procedure is marked using the `{.async.}` pragma. +## When marking a procedure with the `{.async.}` pragma it must have a +## `Future[T]` return type or no return type at all. If you do not specify +## a return type then `Future[void]` is assumed. +## +## Inside asynchronous procedures `await` can be used to call any +## procedures which return a +## `Future`; this includes asynchronous procedures. When a procedure is +## "awaited", the asynchronous procedure it is awaited in will +## suspend its execution +## until the awaited procedure's Future completes. At which point the +## asynchronous procedure will resume its execution. During the period +## when an asynchronous procedure is suspended other asynchronous procedures +## will be run by the dispatcher. +## +## The `await` call may be used in many contexts. It can be used on the right +## hand side of a variable declaration: `var data = await socket.recv(100)`, +## in which case the variable will be set to the value of the future +## automatically. It can be used to await a `Future` object, and it can +## be used to await a procedure returning a `Future[void]`: +## `await socket.send("foobar")`. +## +## If an awaited future completes with an error, then `await` will re-raise +## this error. To avoid this, you can use the `yield` keyword instead of +## `await`. The following section shows different ways that you can handle +## exceptions in async procs. +## +## .. caution:: +## Procedures marked {.async.} do not support mutable parameters such +## as `var int`. References such as `ref int` should be used instead. +## +## Handling Exceptions +## ------------------- +## +## You can handle exceptions in the same way as in ordinary Nim code; +## by using the try statement: +## +## ```Nim +## try: +## let data = await sock.recv(100) +## echo("Received ", data) +## except: +## # Handle exception +## ``` +## +## An alternative approach to handling exceptions is to use `yield` on a future +## then check the future's `failed` property. For example: +## +## ```Nim +## var future = sock.recv(100) +## yield future +## if future.failed: +## # Handle exception +## ``` +## +## +## Discarding futures +## ================== +## +## Futures should **never** be discarded directly because they may contain +## errors. If you do not care for the result of a Future then you should use +## the `asyncCheck` procedure instead of the `discard` keyword. Note that this +## does not wait for completion, and you should use `waitFor` or `await` for that purpose. +## +## .. note:: `await` also checks if the future fails, so you can safely discard +## its result. +## +## Handling futures +## ================ +## +## There are many different operations that apply to a future. +## The three primary high-level operations are `asyncCheck`, +## `waitFor`, and `await`. +## +## * `asyncCheck`: Raises an exception if the future fails. It neither waits +## for the future to finish nor returns the result of the future. +## * `waitFor`: Polls the event loop and blocks the current thread until the +## future finishes. This is often used to call an async procedure from a +## synchronous context and should never be used in an `async` proc. +## * `await`: Pauses execution in the current async procedure until the future +## finishes. While the current procedure is paused, other async procedures will +## continue running. Should be used instead of `waitFor` in an async +## procedure. +## +## Here is a handy quick reference chart showing their high-level differences: +## ============== ===================== ======================= +## Procedure Context Blocking +## ============== ===================== ======================= +## `asyncCheck` non-async and async non-blocking +## `waitFor` non-async blocks current thread +## `await` async suspends current proc +## ============== ===================== ======================= +## +## Examples +## ======== +## +## For examples take a look at the documentation for the modules implementing +## asynchronous IO. A good place to start is the +## `asyncnet module <asyncnet.html>`_. +## +## Investigating pending futures +## ============================= +## +## It's possible to get into a situation where an async proc, or more accurately +## a `Future[T]` gets stuck and +## never completes. This can happen for various reasons and can cause serious +## memory leaks. When this occurs it's hard to identify the procedure that is +## stuck. +## +## Thankfully there is a mechanism which tracks the count of each pending future. +## All you need to do to enable it is compile with `-d:futureLogging` and +## use the `getFuturesInProgress` procedure to get the list of pending futures +## together with the stack traces to the moment of their creation. +## +## You may also find it useful to use this +## `prometheus package <https://github.com/dom96/prometheus>`_ which will log +## the pending futures into prometheus, allowing you to analyse them via a nice +## graph. +## +## +## +## Limitations/Bugs +## ================ +## +## * The effect system (`raises: []`) does not work with async procedures. +## * Mutable parameters are not supported by async procedures. +## +## +## Multiple async backend support +## ============================== +## +## Thanks to its powerful macro support, Nim allows ``async``/``await`` to be +## implemented in libraries with only minimal support from the language - as +## such, multiple ``async`` libraries exist, including ``asyncdispatch`` and +## ``chronos``, and more may come to be developed in the future. +## +## Libraries built on top of async/await may wish to support multiple async +## backends - the best way to do so is to create separate modules for each backend +## that may be imported side-by-side. +## +## An alternative way is to select backend using a global compile flag - this +## method makes it difficult to compose applications that use both backends as may +## happen with transitive dependencies, but may be appropriate in some cases - +## libraries choosing this path should call the flag `asyncBackend`, allowing +## applications to choose the backend with `-d:asyncBackend=<backend_name>`. +## +## Known `async` backends include: +## +## * `-d:asyncBackend=none`: disable `async` support completely +## * `-d:asyncBackend=asyncdispatch`: https://nim-lang.org/docs/asyncdispatch.html +## * `-d:asyncBackend=chronos`: https://github.com/status-im/nim-chronos/ +## +## ``none`` can be used when a library supports both a synchronous and +## asynchronous API, to disable the latter. + +import std/[os, tables, strutils, times, heapqueue, options, asyncstreams] +import std/[math, monotimes] +import std/asyncfutures except callSoon + +import std/[nativesockets, net, deques] + +when defined(nimPreviewSlimSystem): + import std/[assertions, syncio] + +export Port, SocketFlag +export asyncfutures except callSoon +export asyncstreams + +# TODO: Check if yielded future is nil and throw a more meaningful exception + +type + PDispatcherBase = ref object of RootRef + timers*: HeapQueue[tuple[finishAt: MonoTime, fut: Future[void]]] + callbacks*: Deque[proc () {.gcsafe.}] + +proc processTimers( + p: PDispatcherBase, didSomeWork: var bool +): Option[int] {.inline.} = + # Pop the timers in the order in which they will expire (smaller `finishAt`). + var count = p.timers.len + let t = getMonoTime() + while count > 0 and t >= p.timers[0].finishAt: + p.timers.pop().fut.complete() + dec count + didSomeWork = true + + # Return the number of milliseconds in which the next timer will expire. + if p.timers.len == 0: return + + let millisecs = (p.timers[0].finishAt - getMonoTime()).inMilliseconds + return some(millisecs.int + 1) + +proc processPendingCallbacks(p: PDispatcherBase; didSomeWork: var bool) = + while p.callbacks.len > 0: + var cb = p.callbacks.popFirst() + cb() + didSomeWork = true + +proc adjustTimeout( + p: PDispatcherBase, pollTimeout: int, nextTimer: Option[int] +): int {.inline.} = + if p.callbacks.len != 0: + return 0 + + if nextTimer.isNone() or pollTimeout == -1: + return pollTimeout + + result = max(nextTimer.get(), 0) + result = min(pollTimeout, result) + +proc runOnce(timeout: int): bool {.gcsafe.} + +proc callSoon*(cbproc: proc () {.gcsafe.}) {.gcsafe.} + ## Schedule `cbproc` to be called as soon as possible. + ## The callback is called when control returns to the event loop. + +proc initCallSoonProc = + if asyncfutures.getCallSoonProc().isNil: + asyncfutures.setCallSoonProc(callSoon) + +template implementSetInheritable() {.dirty.} = + when declared(setInheritable): + proc setInheritable*(fd: AsyncFD, inheritable: bool): bool = + ## Control whether a file handle can be inherited by child processes. + ## Returns `true` on success. + ## + ## This procedure is not guaranteed to be available for all platforms. + ## Test for availability with `declared() <system.html#declared,untyped>`_. + fd.FileHandle.setInheritable(inheritable) + +when defined(windows) or defined(nimdoc): + import std/[winlean, sets, hashes] + type + CompletionKey = ULONG_PTR + + CompletionData* = object + fd*: AsyncFD # TODO: Rename this. + cb*: owned(proc (fd: AsyncFD, bytesTransferred: DWORD, + errcode: OSErrorCode) {.closure, gcsafe.}) + cell*: ForeignCell # we need this `cell` to protect our `cb` environment, + # when using RegisterWaitForSingleObject, because + # waiting is done in different thread. + + PDispatcher* = ref object of PDispatcherBase + ioPort: Handle + handles*: HashSet[AsyncFD] # Export handles so that an external library can register them. + + CustomObj = object of OVERLAPPED + data*: CompletionData + + CustomRef* = ref CustomObj + + AsyncFD* = distinct int + + PostCallbackData = object + ioPort: Handle + handleFd: AsyncFD + waitFd: Handle + ovl: owned CustomRef + PostCallbackDataPtr = ptr PostCallbackData + + AsyncEventImpl = object + hEvent: Handle + hWaiter: Handle + pcd: PostCallbackDataPtr + AsyncEvent* = ptr AsyncEventImpl + + Callback* = proc (fd: AsyncFD): bool {.closure, gcsafe.} + + proc hash(x: AsyncFD): Hash {.borrow.} + proc `==`*(x: AsyncFD, y: AsyncFD): bool {.borrow.} + + proc newDispatcher*(): owned PDispatcher = + ## Creates a new Dispatcher instance. + new result + result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) + result.handles = initHashSet[AsyncFD]() + result.timers.clear() + result.callbacks = initDeque[proc () {.closure, gcsafe.}](64) + + var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher + + proc setGlobalDispatcher*(disp: sink PDispatcher) = + if not gDisp.isNil: + assert gDisp.callbacks.len == 0 + gDisp = disp + initCallSoonProc() + + proc getGlobalDispatcher*(): PDispatcher = + if gDisp.isNil: + setGlobalDispatcher(newDispatcher()) + result = gDisp + + proc getIoHandler*(disp: PDispatcher): Handle = + ## Returns the underlying IO Completion Port handle (Windows) or selector + ## (Unix) for the specified dispatcher. + return disp.ioPort + + proc register*(fd: AsyncFD) = + ## Registers `fd` with the dispatcher. + let p = getGlobalDispatcher() + + if createIoCompletionPort(fd.Handle, p.ioPort, + cast[CompletionKey](fd), 1) == 0: + raiseOSError(osLastError()) + p.handles.incl(fd) + + proc verifyPresence(fd: AsyncFD) = + ## Ensures that file descriptor has been registered with the dispatcher. + ## Raises ValueError if `fd` has not been registered. + let p = getGlobalDispatcher() + if fd notin p.handles: + raise newException(ValueError, + "Operation performed on a socket which has not been registered with" & + " the dispatcher yet.") + + proc hasPendingOperations*(): bool = + ## Returns `true` if the global dispatcher has pending operations. + let p = getGlobalDispatcher() + p.handles.len != 0 or p.timers.len != 0 or p.callbacks.len != 0 + + proc runOnce(timeout: int): bool = + let p = getGlobalDispatcher() + if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0: + raise newException(ValueError, + "No handles or timers registered in dispatcher.") + + result = false + let nextTimer = processTimers(p, result) + let at = adjustTimeout(p, timeout, nextTimer) + var llTimeout = + if at == -1: winlean.INFINITE + else: at.int32 + + var lpNumberOfBytesTransferred: DWORD + var lpCompletionKey: ULONG_PTR + var customOverlapped: CustomRef + let res = getQueuedCompletionStatus(p.ioPort, + addr lpNumberOfBytesTransferred, addr lpCompletionKey, + cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool + result = true + # For 'gcDestructors' the destructor of 'customOverlapped' will + # be called at the end and we are the only owner here. This means + # We do not have to 'GC_unref(customOverlapped)' because the destructor + # does that for us. + + # http://stackoverflow.com/a/12277264/492186 + # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html + if res: + # This is useful for ensuring the reliability of the overlapped struct. + assert customOverlapped.data.fd == lpCompletionKey.AsyncFD + + customOverlapped.data.cb(customOverlapped.data.fd, + lpNumberOfBytesTransferred, OSErrorCode(-1)) + + # If cell.data != nil, then system.protect(rawEnv(cb)) was called, + # so we need to dispose our `cb` environment, because it is not needed + # anymore. + if customOverlapped.data.cell.data != nil: + system.dispose(customOverlapped.data.cell) + + when not defined(gcDestructors): + GC_unref(customOverlapped) + else: + let errCode = osLastError() + if customOverlapped != nil: + assert customOverlapped.data.fd == lpCompletionKey.AsyncFD + customOverlapped.data.cb(customOverlapped.data.fd, + lpNumberOfBytesTransferred, errCode) + if customOverlapped.data.cell.data != nil: + system.dispose(customOverlapped.data.cell) + when not defined(gcDestructors): + GC_unref(customOverlapped) + else: + if errCode.int32 == WAIT_TIMEOUT: + # Timed out + result = false + else: raiseOSError(errCode) + + # Timer processing. + discard processTimers(p, result) + # Callback queue processing + processPendingCallbacks(p, result) + + + var acceptEx: WSAPROC_ACCEPTEX + var connectEx: WSAPROC_CONNECTEX + var getAcceptExSockAddrs: WSAPROC_GETACCEPTEXSOCKADDRS + + proc initPointer(s: SocketHandle, fun: var pointer, guid: var GUID): bool = + # Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c + var bytesRet: DWORD + fun = nil + result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid, + sizeof(GUID).DWORD, addr fun, sizeof(pointer).DWORD, + addr bytesRet, nil, nil) == 0 + + proc initAll() = + let dummySock = createNativeSocket() + if dummySock == INVALID_SOCKET: + raiseOSError(osLastError()) + var fun: pointer = nil + if not initPointer(dummySock, fun, WSAID_CONNECTEX): + raiseOSError(osLastError()) + connectEx = cast[WSAPROC_CONNECTEX](fun) + if not initPointer(dummySock, fun, WSAID_ACCEPTEX): + raiseOSError(osLastError()) + acceptEx = cast[WSAPROC_ACCEPTEX](fun) + if not initPointer(dummySock, fun, WSAID_GETACCEPTEXSOCKADDRS): + raiseOSError(osLastError()) + getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](fun) + close(dummySock) + + proc newCustom*(): CustomRef = + result = CustomRef() # 0 + GC_ref(result) # 1 prevent destructor from doing a premature free. + # destructor of newCustom's caller --> 0. This means + # Windows holds a ref for us with RC == 0 (single owner). + # This is passed back to us in the IO completion port. + + proc recv*(socket: AsyncFD, size: int, + flags = {SocketFlag.SafeDisconn}): owned(Future[string]) = + ## Reads **up to** `size` bytes from `socket`. Returned future will + ## complete once all the data requested is read, a part of the data has been + ## read, or the socket has disconnected in which case the future will + ## complete with a value of `""`. + ## + ## .. warning:: The `Peek` socket flag is not supported on Windows. + + + # Things to note: + # * When WSARecv completes immediately then `bytesReceived` is very + # unreliable. + # * Still need to implement message-oriented socket disconnection, + # '\0' in the message currently signifies a socket disconnect. Who + # knows what will happen when someone sends that to our socket. + verifyPresence(socket) + assert SocketFlag.Peek notin flags, "Peek not supported on Windows." + + var retFuture = newFuture[string]("recv") + var dataBuf: TWSABuf + dataBuf.buf = cast[cstring](alloc0(size)) + dataBuf.len = size.ULONG + + var bytesReceived: DWORD + var flagsio = flags.toOSFlags().DWORD + var ol = newCustom() + ol.data = CompletionData(fd: socket, cb: + proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + if bytesCount == 0 and dataBuf.buf[0] == '\0': + retFuture.complete("") + else: + var data = newString(bytesCount) + assert bytesCount <= size + copyMem(addr data[0], addr dataBuf.buf[0], bytesCount) + retFuture.complete($data) + else: + if flags.isDisconnectionError(errcode): + retFuture.complete("") + else: + retFuture.fail(newOSError(errcode)) + if dataBuf.buf != nil: + dealloc dataBuf.buf + dataBuf.buf = nil + ) + + let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived, + addr flagsio, cast[POVERLAPPED](ol), nil) + if ret == -1: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + if dataBuf.buf != nil: + dealloc dataBuf.buf + dataBuf.buf = nil + GC_unref(ol) + if flags.isDisconnectionError(err): + retFuture.complete("") + else: + retFuture.fail(newOSError(err)) + elif ret == 0: + # Request completed immediately. + if bytesReceived != 0: + var data = newString(bytesReceived) + assert bytesReceived <= size + copyMem(addr data[0], addr dataBuf.buf[0], bytesReceived) + retFuture.complete($data) + else: + if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)): + retFuture.complete("") + return retFuture + + proc recvInto*(socket: AsyncFD, buf: pointer, size: int, + flags = {SocketFlag.SafeDisconn}): owned(Future[int]) = + ## Reads **up to** `size` bytes from `socket` into `buf`, which must + ## at least be of that size. Returned future will complete once all the + ## data requested is read, a part of the data has been read, or the socket + ## has disconnected in which case the future will complete with a value of + ## `0`. + ## + ## .. warning:: The `Peek` socket flag is not supported on Windows. + + + # Things to note: + # * When WSARecv completes immediately then `bytesReceived` is very + # unreliable. + # * Still need to implement message-oriented socket disconnection, + # '\0' in the message currently signifies a socket disconnect. Who + # knows what will happen when someone sends that to our socket. + verifyPresence(socket) + assert SocketFlag.Peek notin flags, "Peek not supported on Windows." + + var retFuture = newFuture[int]("recvInto") + + #buf[] = '\0' + var dataBuf: TWSABuf + dataBuf.buf = cast[cstring](buf) + dataBuf.len = size.ULONG + + var bytesReceived: DWORD + var flagsio = flags.toOSFlags().DWORD + var ol = newCustom() + ol.data = CompletionData(fd: socket, cb: + proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + retFuture.complete(bytesCount) + else: + if flags.isDisconnectionError(errcode): + retFuture.complete(0) + else: + retFuture.fail(newOSError(errcode)) + if dataBuf.buf != nil: + dataBuf.buf = nil + ) + + let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived, + addr flagsio, cast[POVERLAPPED](ol), nil) + if ret == -1: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + if dataBuf.buf != nil: + dataBuf.buf = nil + GC_unref(ol) + if flags.isDisconnectionError(err): + retFuture.complete(0) + else: + retFuture.fail(newOSError(err)) + elif ret == 0: + # Request completed immediately. + if bytesReceived != 0: + assert bytesReceived <= size + retFuture.complete(bytesReceived) + else: + if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)): + retFuture.complete(bytesReceived) + return retFuture + + proc send*(socket: AsyncFD, buf: pointer, size: int, + flags = {SocketFlag.SafeDisconn}): owned(Future[void]) = + ## Sends `size` bytes from `buf` to `socket`. The returned future + ## will complete once all data has been sent. + ## + ## .. warning:: Use it with caution. If `buf` refers to GC'ed object, + ## you must use GC_ref/GC_unref calls to avoid early freeing of the buffer. + verifyPresence(socket) + var retFuture = newFuture[void]("send") + + var dataBuf: TWSABuf + dataBuf.buf = cast[cstring](buf) + dataBuf.len = size.ULONG + + var bytesReceived, lowFlags: DWORD + var ol = newCustom() + ol.data = CompletionData(fd: socket, cb: + proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + retFuture.complete() + else: + if flags.isDisconnectionError(errcode): + retFuture.complete() + else: + retFuture.fail(newOSError(errcode)) + ) + + let ret = WSASend(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived, + lowFlags, cast[POVERLAPPED](ol), nil) + if ret == -1: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + GC_unref(ol) + if flags.isDisconnectionError(err): + retFuture.complete() + else: + retFuture.fail(newOSError(err)) + else: + retFuture.complete() + # We don't deallocate `ol` here because even though this completed + # immediately poll will still be notified about its completion and it will + # free `ol`. + return retFuture + + proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr, + saddrLen: SockLen, + flags = {SocketFlag.SafeDisconn}): owned(Future[void]) = + ## Sends `data` to specified destination `saddr`, using + ## socket `socket`. The returned future will complete once all data + ## has been sent. + verifyPresence(socket) + var retFuture = newFuture[void]("sendTo") + var dataBuf: TWSABuf + dataBuf.buf = cast[cstring](data) + dataBuf.len = size.ULONG + var bytesSent = 0.DWORD + var lowFlags = 0.DWORD + + # we will preserve address in our stack + var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes + var stalen: cint = cint(saddrLen) + zeroMem(addr(staddr[0]), 128) + copyMem(addr(staddr[0]), saddr, saddrLen) + + var ol = newCustom() + ol.data = CompletionData(fd: socket, cb: + proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + retFuture.complete() + else: + retFuture.fail(newOSError(errcode)) + ) + + let ret = WSASendTo(socket.SocketHandle, addr dataBuf, 1, addr bytesSent, + lowFlags, cast[ptr SockAddr](addr(staddr[0])), + stalen, cast[POVERLAPPED](ol), nil) + if ret == -1: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + GC_unref(ol) + retFuture.fail(newOSError(err)) + else: + retFuture.complete() + # We don't deallocate `ol` here because even though this completed + # immediately poll will still be notified about its completion and it will + # free `ol`. + return retFuture + + proc recvFromInto*(socket: AsyncFD, data: pointer, size: int, + saddr: ptr SockAddr, saddrLen: ptr SockLen, + flags = {SocketFlag.SafeDisconn}): owned(Future[int]) = + ## Receives a datagram data from `socket` into `buf`, which must + ## be at least of size `size`, address of datagram's sender will be + ## stored into `saddr` and `saddrLen`. Returned future will complete + ## once one datagram has been received, and will return size of packet + ## received. + verifyPresence(socket) + var retFuture = newFuture[int]("recvFromInto") + + var dataBuf = TWSABuf(buf: cast[cstring](data), len: size.ULONG) + + var bytesReceived = 0.DWORD + var lowFlags = 0.DWORD + + var ol = newCustom() + ol.data = CompletionData(fd: socket, cb: + proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + assert bytesCount <= size + retFuture.complete(bytesCount) + else: + # datagram sockets don't have disconnection, + # so we can just raise an exception + retFuture.fail(newOSError(errcode)) + ) + + let res = WSARecvFrom(socket.SocketHandle, addr dataBuf, 1, + addr bytesReceived, addr lowFlags, + saddr, cast[ptr cint](saddrLen), + cast[POVERLAPPED](ol), nil) + if res == -1: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + GC_unref(ol) + retFuture.fail(newOSError(err)) + else: + # Request completed immediately. + if bytesReceived != 0: + assert bytesReceived <= size + retFuture.complete(bytesReceived) + else: + if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)): + retFuture.complete(bytesReceived) + return retFuture + + proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}, + inheritable = defined(nimInheritHandles)): + owned(Future[tuple[address: string, client: AsyncFD]]) {.gcsafe.} = + ## Accepts a new connection. Returns a future containing the client socket + ## corresponding to that connection and the remote address of the client. + ## The future will complete when the connection is successfully accepted. + ## + ## The resulting client socket is automatically registered to the + ## dispatcher. + ## + ## If `inheritable` is false (the default), the resulting client socket will + ## not be inheritable by child processes. + ## + ## The `accept` call may result in an error if the connecting socket + ## disconnects during the duration of the `accept`. If the `SafeDisconn` + ## flag is specified then this error will not be raised and instead + ## accept will be called again. + verifyPresence(socket) + var retFuture = newFuture[tuple[address: string, client: AsyncFD]]("acceptAddr") + + var clientSock = createNativeSocket(inheritable = inheritable) + if clientSock == osInvalidSocket: raiseOSError(osLastError()) + + const lpOutputLen = 1024 + var lpOutputBuf = newString(lpOutputLen) + var dwBytesReceived: DWORD + let dwReceiveDataLength = 0.DWORD # We don't want any data to be read. + let dwLocalAddressLength = DWORD(sizeof(Sockaddr_in6) + 16) + let dwRemoteAddressLength = DWORD(sizeof(Sockaddr_in6) + 16) + + template failAccept(errcode) = + if flags.isDisconnectionError(errcode): + var newAcceptFut = acceptAddr(socket, flags) + newAcceptFut.callback = + proc () = + if newAcceptFut.failed: + retFuture.fail(newAcceptFut.readError) + else: + retFuture.complete(newAcceptFut.read) + else: + retFuture.fail(newOSError(errcode)) + + template completeAccept() {.dirty.} = + var listenSock = socket + let setoptRet = setsockopt(clientSock, SOL_SOCKET, + SO_UPDATE_ACCEPT_CONTEXT, addr listenSock, + sizeof(listenSock).SockLen) + if setoptRet != 0: + let errcode = osLastError() + discard clientSock.closesocket() + failAccept(errcode) + else: + var localSockaddr, remoteSockaddr: ptr SockAddr + var localLen, remoteLen: int32 + getAcceptExSockAddrs(addr lpOutputBuf[0], dwReceiveDataLength, + dwLocalAddressLength, dwRemoteAddressLength, + addr localSockaddr, addr localLen, + addr remoteSockaddr, addr remoteLen) + try: + let address = getAddrString(remoteSockaddr) + register(clientSock.AsyncFD) + retFuture.complete((address: address, client: clientSock.AsyncFD)) + except: + # getAddrString may raise + clientSock.close() + retFuture.fail(getCurrentException()) + + var ol = newCustom() + ol.data = CompletionData(fd: socket, cb: + proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) {.gcsafe.} = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + completeAccept() + else: + failAccept(errcode) + ) + + # http://msdn.microsoft.com/en-us/library/windows/desktop/ms737524%28v=vs.85%29.aspx + let ret = acceptEx(socket.SocketHandle, clientSock, addr lpOutputBuf[0], + dwReceiveDataLength, + dwLocalAddressLength, + dwRemoteAddressLength, + addr dwBytesReceived, cast[POVERLAPPED](ol)) + + if not ret: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + failAccept(err) + GC_unref(ol) + else: + completeAccept() + # We don't deallocate `ol` here because even though this completed + # immediately poll will still be notified about its completion and it will + # free `ol`. + + return retFuture + + implementSetInheritable() + + proc closeSocket*(socket: AsyncFD) = + ## Closes a socket and ensures that it is unregistered. + socket.SocketHandle.close() + getGlobalDispatcher().handles.excl(socket) + + proc unregister*(fd: AsyncFD) = + ## Unregisters `fd`. + getGlobalDispatcher().handles.excl(fd) + + proc contains*(disp: PDispatcher, fd: AsyncFD): bool = + return fd in disp.handles + + {.push stackTrace: off.} + proc waitableCallback(param: pointer, + timerOrWaitFired: WINBOOL) {.stdcall.} = + var p = cast[PostCallbackDataPtr](param) + discard postQueuedCompletionStatus(p.ioPort, timerOrWaitFired.DWORD, + ULONG_PTR(p.handleFd), + cast[pointer](p.ovl)) + {.pop.} + + proc registerWaitableEvent(fd: AsyncFD, cb: Callback; mask: DWORD) = + let p = getGlobalDispatcher() + var flags = (WT_EXECUTEINWAITTHREAD or WT_EXECUTEONLYONCE).DWORD + var hEvent = wsaCreateEvent() + if hEvent == 0: + raiseOSError(osLastError()) + var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData))) + pcd.ioPort = p.ioPort + pcd.handleFd = fd + var ol = newCustom() + + ol.data = CompletionData(fd: fd, cb: + proc(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) {.gcsafe.} = + # we excluding our `fd` because cb(fd) can register own handler + # for this `fd` + p.handles.excl(fd) + # unregisterWait() is called before callback, because appropriate + # winsockets function can re-enable event. + # https://msdn.microsoft.com/en-us/library/windows/desktop/ms741576(v=vs.85).aspx + if unregisterWait(pcd.waitFd) == 0: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + deallocShared(cast[pointer](pcd)) + discard wsaCloseEvent(hEvent) + raiseOSError(err) + if cb(fd): + # callback returned `true`, so we free all allocated resources + deallocShared(cast[pointer](pcd)) + if not wsaCloseEvent(hEvent): + raiseOSError(osLastError()) + # pcd.ovl will be unrefed in poll(). + else: + # callback returned `false` we need to continue + if p.handles.contains(fd): + # new callback was already registered with `fd`, so we free all + # allocated resources. This happens because in callback `cb` + # addRead/addWrite was called with same `fd`. + deallocShared(cast[pointer](pcd)) + if not wsaCloseEvent(hEvent): + raiseOSError(osLastError()) + else: + # we need to include `fd` again + p.handles.incl(fd) + # and register WaitForSingleObject again + if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent, + cast[WAITORTIMERCALLBACK](waitableCallback), + cast[pointer](pcd), INFINITE, flags): + # pcd.ovl will be unrefed in poll() + let err = osLastError() + deallocShared(cast[pointer](pcd)) + discard wsaCloseEvent(hEvent) + raiseOSError(err) + else: + # we incref `pcd.ovl` and `protect` callback one more time, + # because it will be unrefed and disposed in `poll()` after + # callback finishes. + GC_ref(pcd.ovl) + pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb)) + ) + # We need to protect our callback environment value, so GC will not free it + # accidentally. + ol.data.cell = system.protect(rawEnv(ol.data.cb)) + + # This is main part of `hacky way` is using WSAEventSelect, so `hEvent` + # will be signaled when appropriate `mask` events will be triggered. + if wsaEventSelect(fd.SocketHandle, hEvent, mask) != 0: + let err = osLastError() + GC_unref(ol) + deallocShared(cast[pointer](pcd)) + discard wsaCloseEvent(hEvent) + raiseOSError(err) + + pcd.ovl = ol + if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent, + cast[WAITORTIMERCALLBACK](waitableCallback), + cast[pointer](pcd), INFINITE, flags): + let err = osLastError() + GC_unref(ol) + deallocShared(cast[pointer](pcd)) + discard wsaCloseEvent(hEvent) + raiseOSError(err) + p.handles.incl(fd) + + proc addRead*(fd: AsyncFD, cb: Callback) = + ## Start watching the file descriptor for read availability and then call + ## the callback `cb`. + ## + ## This is not `pure` mechanism for Windows Completion Ports (IOCP), + ## so if you can avoid it, please do it. Use `addRead` only if really + ## need it (main usecase is adaptation of unix-like libraries to be + ## asynchronous on Windows). + ## + ## If you use this function, you don't need to use asyncdispatch.recv() + ## or asyncdispatch.accept(), because they are using IOCP, please use + ## nativesockets.recv() and nativesockets.accept() instead. + ## + ## Be sure your callback `cb` returns `true`, if you want to remove + ## watch of `read` notifications, and `false`, if you want to continue + ## receiving notifications. + registerWaitableEvent(fd, cb, FD_READ or FD_ACCEPT or FD_OOB or FD_CLOSE) + + proc addWrite*(fd: AsyncFD, cb: Callback) = + ## Start watching the file descriptor for write availability and then call + ## the callback `cb`. + ## + ## This is not `pure` mechanism for Windows Completion Ports (IOCP), + ## so if you can avoid it, please do it. Use `addWrite` only if really + ## need it (main usecase is adaptation of unix-like libraries to be + ## asynchronous on Windows). + ## + ## If you use this function, you don't need to use asyncdispatch.send() + ## or asyncdispatch.connect(), because they are using IOCP, please use + ## nativesockets.send() and nativesockets.connect() instead. + ## + ## Be sure your callback `cb` returns `true`, if you want to remove + ## watch of `write` notifications, and `false`, if you want to continue + ## receiving notifications. + registerWaitableEvent(fd, cb, FD_WRITE or FD_CONNECT or FD_CLOSE) + + template registerWaitableHandle(p, hEvent, flags, pcd, timeout, + handleCallback) = + let handleFD = AsyncFD(hEvent) + pcd.ioPort = p.ioPort + pcd.handleFd = handleFD + var ol = newCustom() + ol.data.fd = handleFD + ol.data.cb = handleCallback + # We need to protect our callback environment value, so GC will not free it + # accidentally. + ol.data.cell = system.protect(rawEnv(ol.data.cb)) + + pcd.ovl = ol + if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent, + cast[WAITORTIMERCALLBACK](waitableCallback), + cast[pointer](pcd), timeout.DWORD, flags): + let err = osLastError() + GC_unref(ol) + deallocShared(cast[pointer](pcd)) + discard closeHandle(hEvent) + raiseOSError(err) + p.handles.incl(handleFD) + + template closeWaitable(handle: untyped) = + let waitFd = pcd.waitFd + deallocShared(cast[pointer](pcd)) + p.handles.excl(fd) + if unregisterWait(waitFd) == 0: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + discard closeHandle(handle) + raiseOSError(err) + if closeHandle(handle) == 0: + raiseOSError(osLastError()) + + proc addTimer*(timeout: int, oneshot: bool, cb: Callback) = + ## Registers callback `cb` to be called when timer expired. + ## + ## Parameters: + ## + ## * `timeout` - timeout value in milliseconds. + ## * `oneshot` + ## * `true` - generate only one timeout event + ## * `false` - generate timeout events periodically + + doAssert(timeout > 0) + let p = getGlobalDispatcher() + + var hEvent = createEvent(nil, 1, 0, nil) + if hEvent == INVALID_HANDLE_VALUE: + raiseOSError(osLastError()) + + var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData))) + var flags = WT_EXECUTEINWAITTHREAD.DWORD + if oneshot: flags = flags or WT_EXECUTEONLYONCE + + proc timercb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = + let res = cb(fd) + if res or oneshot: + closeWaitable(hEvent) + else: + # if callback returned `false`, then it wants to be called again, so + # we need to ref and protect `pcd.ovl` again, because it will be + # unrefed and disposed in `poll()`. + GC_ref(pcd.ovl) + pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb)) + + registerWaitableHandle(p, hEvent, flags, pcd, timeout, timercb) + + proc addProcess*(pid: int, cb: Callback) = + ## Registers callback `cb` to be called when process with process ID + ## `pid` exited. + const NULL = Handle(0) + let p = getGlobalDispatcher() + let procFlags = SYNCHRONIZE + var hProcess = openProcess(procFlags, 0, pid.DWORD) + if hProcess == NULL: + raiseOSError(osLastError()) + + var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData))) + var flags = WT_EXECUTEINWAITTHREAD.DWORD or WT_EXECUTEONLYONCE.DWORD + + proc proccb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = + closeWaitable(hProcess) + discard cb(fd) + + registerWaitableHandle(p, hProcess, flags, pcd, INFINITE, proccb) + + proc newAsyncEvent*(): AsyncEvent = + ## Creates a new thread-safe `AsyncEvent` object. + ## + ## New `AsyncEvent` object is not automatically registered with + ## dispatcher like `AsyncSocket`. + var sa = SECURITY_ATTRIBUTES( + nLength: sizeof(SECURITY_ATTRIBUTES).cint, + bInheritHandle: 1 + ) + var event = createEvent(addr(sa), 0'i32, 0'i32, nil) + if event == INVALID_HANDLE_VALUE: + raiseOSError(osLastError()) + result = cast[AsyncEvent](allocShared0(sizeof(AsyncEventImpl))) + result.hEvent = event + + proc trigger*(ev: AsyncEvent) = + ## Set event `ev` to signaled state. + if setEvent(ev.hEvent) == 0: + raiseOSError(osLastError()) + + proc unregister*(ev: AsyncEvent) = + ## Unregisters event `ev`. + doAssert(ev.hWaiter != 0, "Event is not registered in the queue!") + let p = getGlobalDispatcher() + p.handles.excl(AsyncFD(ev.hEvent)) + if unregisterWait(ev.hWaiter) == 0: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + raiseOSError(err) + ev.hWaiter = 0 + + proc close*(ev: AsyncEvent) = + ## Closes event `ev`. + let res = closeHandle(ev.hEvent) + deallocShared(cast[pointer](ev)) + if res == 0: + raiseOSError(osLastError()) + + proc addEvent*(ev: AsyncEvent, cb: Callback) = + ## Registers callback `cb` to be called when `ev` will be signaled + doAssert(ev.hWaiter == 0, "Event is already registered in the queue!") + + let p = getGlobalDispatcher() + let hEvent = ev.hEvent + + var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData))) + var flags = WT_EXECUTEINWAITTHREAD.DWORD + + proc eventcb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = + if ev.hWaiter != 0: + if cb(fd): + # we need this check to avoid exception, if `unregister(event)` was + # called in callback. + deallocShared(cast[pointer](pcd)) + if ev.hWaiter != 0: + unregister(ev) + else: + # if callback returned `false`, then it wants to be called again, so + # we need to ref and protect `pcd.ovl` again, because it will be + # unrefed and disposed in `poll()`. + GC_ref(pcd.ovl) + pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb)) + else: + # if ev.hWaiter == 0, then event was unregistered before `poll()` call. + deallocShared(cast[pointer](pcd)) + + registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb) + ev.hWaiter = pcd.waitFd + + initAll() +else: + import std/selectors + from std/posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK, + MSG_NOSIGNAL + when declared(posix.accept4): + from std/posix import accept4, SOCK_CLOEXEC + when defined(genode): + import genode/env # get the implicit Genode env + import genode/signals + + const + InitCallbackListSize = 4 # initial size of callbacks sequence, + # associated with file/socket descriptor. + InitDelayedCallbackListSize = 64 # initial size of delayed callbacks + # queue. + type + AsyncFD* = distinct cint + Callback* = proc (fd: AsyncFD): bool {.closure, gcsafe.} + + AsyncData = object + readList: seq[Callback] + writeList: seq[Callback] + + AsyncEvent* = distinct SelectEvent + + PDispatcher* = ref object of PDispatcherBase + selector: Selector[AsyncData] + when defined(genode): + signalHandler: SignalHandler + + proc `==`*(x, y: AsyncFD): bool {.borrow.} + proc `==`*(x, y: AsyncEvent): bool {.borrow.} + + template newAsyncData(): AsyncData = + AsyncData( + readList: newSeqOfCap[Callback](InitCallbackListSize), + writeList: newSeqOfCap[Callback](InitCallbackListSize) + ) + + proc newDispatcher*(): owned(PDispatcher) = + new result + result.selector = newSelector[AsyncData]() + result.timers.clear() + result.callbacks = initDeque[proc () {.closure, gcsafe.}](InitDelayedCallbackListSize) + when defined(genode): + let entrypoint = ep(cast[GenodeEnv](runtimeEnv)) + result.signalHandler = newSignalHandler(entrypoint): + discard runOnce(0) + + var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher + + when defined(nuttx): + import std/exitprocs + + proc cleanDispatcher() {.noconv.} = + gDisp = nil + + proc addFinalyzer() = + addExitProc(cleanDispatcher) + + proc setGlobalDispatcher*(disp: owned PDispatcher) = + if not gDisp.isNil: + assert gDisp.callbacks.len == 0 + gDisp = disp + initCallSoonProc() + + proc getGlobalDispatcher*(): PDispatcher = + if gDisp.isNil: + setGlobalDispatcher(newDispatcher()) + when defined(nuttx): + addFinalyzer() + result = gDisp + + proc getIoHandler*(disp: PDispatcher): Selector[AsyncData] = + return disp.selector + + proc register*(fd: AsyncFD) = + let p = getGlobalDispatcher() + var data = newAsyncData() + p.selector.registerHandle(fd.SocketHandle, {}, data) + + proc unregister*(fd: AsyncFD) = + getGlobalDispatcher().selector.unregister(fd.SocketHandle) + + proc unregister*(ev: AsyncEvent) = + getGlobalDispatcher().selector.unregister(SelectEvent(ev)) + + proc contains*(disp: PDispatcher, fd: AsyncFD): bool = + return fd.SocketHandle in disp.selector + + proc addRead*(fd: AsyncFD, cb: Callback) = + let p = getGlobalDispatcher() + var newEvents = {Event.Read} + withData(p.selector, fd.SocketHandle, adata) do: + adata.readList.add(cb) + newEvents.incl(Event.Read) + if len(adata.writeList) != 0: newEvents.incl(Event.Write) + do: + raise newException(ValueError, "File descriptor not registered.") + p.selector.updateHandle(fd.SocketHandle, newEvents) + + proc addWrite*(fd: AsyncFD, cb: Callback) = + let p = getGlobalDispatcher() + var newEvents = {Event.Write} + withData(p.selector, fd.SocketHandle, adata) do: + adata.writeList.add(cb) + newEvents.incl(Event.Write) + if len(adata.readList) != 0: newEvents.incl(Event.Read) + do: + raise newException(ValueError, "File descriptor not registered.") + p.selector.updateHandle(fd.SocketHandle, newEvents) + + proc hasPendingOperations*(): bool = + let p = getGlobalDispatcher() + not p.selector.isEmpty() or p.timers.len != 0 or p.callbacks.len != 0 + + proc prependSeq(dest: var seq[Callback]; src: sink seq[Callback]) = + var old = move dest + dest = src + for i in 0..high(old): + dest.add(move old[i]) + + proc processBasicCallbacks( + fd: AsyncFD, event: Event + ): tuple[readCbListCount, writeCbListCount: int] = + # Process pending descriptor and AsyncEvent callbacks. + # + # Invoke every callback stored in `rwlist`, until one + # returns `false` (which means callback wants to stay + # alive). In such case all remaining callbacks will be added + # to `rwlist` again, in the order they have been inserted. + # + # `rwlist` associated with file descriptor MUST BE emptied before + # dispatching callback (See https://github.com/nim-lang/Nim/issues/5128), + # or it can be possible to fall into endless cycle. + var curList: seq[Callback] + + let selector = getGlobalDispatcher().selector + withData(selector, fd.int, fdData): + case event + of Event.Read: + #shallowCopy(curList, fdData.readList) + curList = move fdData.readList + fdData.readList = newSeqOfCap[Callback](InitCallbackListSize) + of Event.Write: + #shallowCopy(curList, fdData.writeList) + curList = move fdData.writeList + fdData.writeList = newSeqOfCap[Callback](InitCallbackListSize) + else: + assert false, "Cannot process callbacks for " & $event + + let newLength = max(len(curList), InitCallbackListSize) + var newList = newSeqOfCap[Callback](newLength) + + var eventsExtinguished = false + for cb in curList: + if eventsExtinguished: + newList.add(cb) + elif not cb(fd): + # Callback wants to be called again. + newList.add(cb) + # This callback has returned with EAGAIN, so we don't need to + # call any other callbacks as they are all waiting for the same event + # on the same fd. + # We do need to ensure they are called again though. + eventsExtinguished = true + + withData(selector, fd.int, fdData) do: + # Descriptor is still present in the queue. + case event + of Event.Read: prependSeq(fdData.readList, newList) + of Event.Write: prependSeq(fdData.writeList, newList) + else: + assert false, "Cannot process callbacks for " & $event + + result.readCbListCount = len(fdData.readList) + result.writeCbListCount = len(fdData.writeList) + do: + # Descriptor was unregistered in callback via `unregister()`. + result.readCbListCount = -1 + result.writeCbListCount = -1 + + proc processCustomCallbacks(p: PDispatcher; fd: AsyncFD) = + # Process pending custom event callbacks. Custom events are + # {Event.Timer, Event.Signal, Event.Process, Event.Vnode}. + # There can be only one callback registered with one descriptor, + # so there is no need to iterate over list. + var curList: seq[Callback] + + withData(p.selector, fd.int, adata) do: + curList = move adata.readList + adata.readList = newSeqOfCap[Callback](InitCallbackListSize) + + let newLength = len(curList) + var newList = newSeqOfCap[Callback](newLength) + + var cb = curList[0] + if not cb(fd): + newList.add(cb) + + withData(p.selector, fd.int, adata) do: + # descriptor still present in queue. + adata.readList = newList & adata.readList + if len(adata.readList) == 0: + # if no callbacks registered with descriptor, unregister it. + p.selector.unregister(fd.int) + do: + # descriptor was unregistered in callback via `unregister()`. + discard + + implementSetInheritable() + + proc closeSocket*(sock: AsyncFD) = + let selector = getGlobalDispatcher().selector + if sock.SocketHandle notin selector: + raise newException(ValueError, "File descriptor not registered.") + + let data = selector.getData(sock.SocketHandle) + sock.unregister() + sock.SocketHandle.close() + # We need to unblock the read and write callbacks which could still be + # waiting for the socket to become readable and/or writeable. + for cb in data.readList & data.writeList: + if not cb(sock): + raise newException( + ValueError, "Expecting async operations to stop when fd has closed." + ) + + proc runOnce(timeout: int): bool = + let p = getGlobalDispatcher() + if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0: + when defined(genode): + if timeout == 0: return + raise newException(ValueError, + "No handles or timers registered in dispatcher.") + + result = false + var keys: array[64, ReadyKey] + let nextTimer = processTimers(p, result) + var count = + p.selector.selectInto(adjustTimeout(p, timeout, nextTimer), keys) + for i in 0..<count: + let fd = keys[i].fd.AsyncFD + let events = keys[i].events + var (readCbListCount, writeCbListCount) = (0, 0) + + if Event.Read in events or events == {Event.Error}: + (readCbListCount, writeCbListCount) = + processBasicCallbacks(fd, Event.Read) + result = true + + if Event.Write in events or events == {Event.Error}: + (readCbListCount, writeCbListCount) = + processBasicCallbacks(fd, Event.Write) + result = true + + var isCustomEvent = false + if Event.User in events: + (readCbListCount, writeCbListCount) = + processBasicCallbacks(fd, Event.Read) + isCustomEvent = true + if readCbListCount == 0: + p.selector.unregister(fd.int) + result = true + + when ioselSupportedPlatform: + const customSet = {Event.Timer, Event.Signal, Event.Process, + Event.Vnode} + if (customSet * events) != {}: + isCustomEvent = true + processCustomCallbacks(p, fd) + result = true + + # because state `data` can be modified in callback we need to update + # descriptor events with currently registered callbacks. + if not isCustomEvent and (readCbListCount != -1 and writeCbListCount != -1): + var newEvents: set[Event] = {} + if readCbListCount > 0: incl(newEvents, Event.Read) + if writeCbListCount > 0: incl(newEvents, Event.Write) + p.selector.updateHandle(SocketHandle(fd), newEvents) + + # Timer processing. + discard processTimers(p, result) + # Callback queue processing + processPendingCallbacks(p, result) + + proc recv*(socket: AsyncFD, size: int, + flags = {SocketFlag.SafeDisconn}): owned(Future[string]) = + var retFuture = newFuture[string]("recv") + + var readBuffer = newString(size) + + proc cb(sock: AsyncFD): bool = + result = true + let res = recv(sock.SocketHandle, addr readBuffer[0], size.cint, + flags.toOSFlags()) + if res < 0: + let lastError = osLastError() + if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and + lastError.int32 != EAGAIN: + if flags.isDisconnectionError(lastError): + retFuture.complete("") + else: + retFuture.fail(newOSError(lastError)) + else: + result = false # We still want this callback to be called. + elif res == 0: + # Disconnected + retFuture.complete("") + else: + readBuffer.setLen(res) + retFuture.complete(readBuffer) + # TODO: The following causes a massive slowdown. + #if not cb(socket): + addRead(socket, cb) + return retFuture + + proc recvInto*(socket: AsyncFD, buf: pointer, size: int, + flags = {SocketFlag.SafeDisconn}): owned(Future[int]) = + var retFuture = newFuture[int]("recvInto") + + proc cb(sock: AsyncFD): bool = + result = true + let res = recv(sock.SocketHandle, buf, size.cint, + flags.toOSFlags()) + if res < 0: + let lastError = osLastError() + if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and + lastError.int32 != EAGAIN: + if flags.isDisconnectionError(lastError): + retFuture.complete(0) + else: + retFuture.fail(newOSError(lastError)) + else: + result = false # We still want this callback to be called. + else: + retFuture.complete(res) + # TODO: The following causes a massive slowdown. + #if not cb(socket): + addRead(socket, cb) + return retFuture + + proc send*(socket: AsyncFD, buf: pointer, size: int, + flags = {SocketFlag.SafeDisconn}): owned(Future[void]) = + var retFuture = newFuture[void]("send") + + var written = 0 + + proc cb(sock: AsyncFD): bool = + result = true + let netSize = size-written + var d = cast[cstring](buf) + let res = send(sock.SocketHandle, addr d[written], netSize.cint, + MSG_NOSIGNAL) + if res < 0: + let lastError = osLastError() + if lastError.int32 != EINTR and + lastError.int32 != EWOULDBLOCK and + lastError.int32 != EAGAIN: + if flags.isDisconnectionError(lastError): + retFuture.complete() + else: + retFuture.fail(newOSError(lastError)) + else: + result = false # We still want this callback to be called. + else: + written.inc(res) + if res != netSize: + result = false # We still have data to send. + else: + retFuture.complete() + # TODO: The following causes crashes. + #if not cb(socket): + addWrite(socket, cb) + return retFuture + + proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr, + saddrLen: SockLen, + flags = {SocketFlag.SafeDisconn}): owned(Future[void]) = + ## Sends `data` of size `size` in bytes to specified destination + ## (`saddr` of size `saddrLen` in bytes, using socket `socket`. + ## The returned future will complete once all data has been sent. + var retFuture = newFuture[void]("sendTo") + + # we will preserve address in our stack + var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes + var stalen = saddrLen + zeroMem(addr(staddr[0]), 128) + copyMem(addr(staddr[0]), saddr, saddrLen) + + proc cb(sock: AsyncFD): bool = + result = true + let res = sendto(sock.SocketHandle, data, size, MSG_NOSIGNAL, + cast[ptr SockAddr](addr(staddr[0])), stalen) + if res < 0: + let lastError = osLastError() + if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and + lastError.int32 != EAGAIN: + retFuture.fail(newOSError(lastError)) + else: + result = false # We still want this callback to be called. + else: + retFuture.complete() + + addWrite(socket, cb) + return retFuture + + proc recvFromInto*(socket: AsyncFD, data: pointer, size: int, + saddr: ptr SockAddr, saddrLen: ptr SockLen, + flags = {SocketFlag.SafeDisconn}): owned(Future[int]) = + ## Receives a datagram data from `socket` into `data`, which must + ## be at least of size `size` in bytes, address of datagram's sender + ## will be stored into `saddr` and `saddrLen`. Returned future will + ## complete once one datagram has been received, and will return size + ## of packet received. + var retFuture = newFuture[int]("recvFromInto") + proc cb(sock: AsyncFD): bool = + result = true + let res = recvfrom(sock.SocketHandle, data, size.cint, flags.toOSFlags(), + saddr, saddrLen) + if res < 0: + let lastError = osLastError() + if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and + lastError.int32 != EAGAIN: + retFuture.fail(newOSError(lastError)) + else: + result = false + else: + retFuture.complete(res) + addRead(socket, cb) + return retFuture + + proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}, + inheritable = defined(nimInheritHandles)): + owned(Future[tuple[address: string, client: AsyncFD]]) = + var retFuture = newFuture[tuple[address: string, + client: AsyncFD]]("acceptAddr") + proc cb(sock: AsyncFD): bool {.gcsafe.} = + result = true + var sockAddress: Sockaddr_storage + var addrLen = sizeof(sockAddress).SockLen + var client = + when declared(accept4): + accept4(sock.SocketHandle, cast[ptr SockAddr](addr(sockAddress)), + addr(addrLen), if inheritable: 0 else: SOCK_CLOEXEC) + else: + accept(sock.SocketHandle, cast[ptr SockAddr](addr(sockAddress)), + addr(addrLen)) + when declared(setInheritable) and not declared(accept4): + if client != osInvalidSocket and not setInheritable(client, inheritable): + # Set failure first because close() itself can fail, + # altering osLastError(). + retFuture.fail(newOSError(osLastError())) + close client + return false + + if client == osInvalidSocket: + let lastError = osLastError() + assert lastError.int32 != EWOULDBLOCK and lastError.int32 != EAGAIN + if lastError.int32 == EINTR: + return false + else: + if flags.isDisconnectionError(lastError): + return false + else: + retFuture.fail(newOSError(lastError)) + else: + try: + let address = getAddrString(cast[ptr SockAddr](addr sockAddress)) + register(client.AsyncFD) + retFuture.complete((address, client.AsyncFD)) + except: + # getAddrString may raise + client.close() + retFuture.fail(getCurrentException()) + addRead(socket, cb) + return retFuture + + when ioselSupportedPlatform: + + proc addTimer*(timeout: int, oneshot: bool, cb: Callback) = + ## Start watching for timeout expiration, and then call the + ## callback `cb`. + ## `timeout` - time in milliseconds, + ## `oneshot` - if `true` only one event will be dispatched, + ## if `false` continuous events every `timeout` milliseconds. + let p = getGlobalDispatcher() + var data = newAsyncData() + data.readList.add(cb) + p.selector.registerTimer(timeout, oneshot, data) + + proc addSignal*(signal: int, cb: Callback) = + ## Start watching signal `signal`, and when signal appears, call the + ## callback `cb`. + let p = getGlobalDispatcher() + var data = newAsyncData() + data.readList.add(cb) + p.selector.registerSignal(signal, data) + + proc addProcess*(pid: int, cb: Callback) = + ## Start watching for process exit with pid `pid`, and then call + ## the callback `cb`. + let p = getGlobalDispatcher() + var data = newAsyncData() + data.readList.add(cb) + p.selector.registerProcess(pid, data) + + proc newAsyncEvent*(): AsyncEvent = + ## Creates new `AsyncEvent`. + result = AsyncEvent(newSelectEvent()) + + proc trigger*(ev: AsyncEvent) = + ## Sets new `AsyncEvent` to signaled state. + trigger(SelectEvent(ev)) + + proc close*(ev: AsyncEvent) = + ## Closes `AsyncEvent` + close(SelectEvent(ev)) + + proc addEvent*(ev: AsyncEvent, cb: Callback) = + ## Start watching for event `ev`, and call callback `cb`, when + ## ev will be set to signaled state. + let p = getGlobalDispatcher() + var data = newAsyncData() + data.readList.add(cb) + p.selector.registerEvent(SelectEvent(ev), data) + +proc drain*(timeout = 500) = + ## Waits for completion of **all** events and processes them. Raises `ValueError` + ## if there are no pending operations. In contrast to `poll` this + ## processes as many events as are available until the timeout has elapsed. + var curTimeout = timeout + let start = now() + while hasPendingOperations(): + discard runOnce(curTimeout) + curTimeout -= (now() - start).inMilliseconds.int + if curTimeout < 0: + break + +proc poll*(timeout = 500) = + ## Waits for completion events and processes them. Raises `ValueError` + ## if there are no pending operations. This runs the underlying OS + ## `epoll`:idx: or `kqueue`:idx: primitive only once. + discard runOnce(timeout) + +template createAsyncNativeSocketImpl(domain, sockType, protocol: untyped, + inheritable = defined(nimInheritHandles)) = + let handle = createNativeSocket(domain, sockType, protocol, inheritable) + if handle == osInvalidSocket: + return osInvalidSocket.AsyncFD + handle.setBlocking(false) + when defined(macosx) and not defined(nimdoc): + handle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1) + result = handle.AsyncFD + register(result) + +proc createAsyncNativeSocket*(domain: cint, sockType: cint, + protocol: cint, + inheritable = defined(nimInheritHandles)): AsyncFD = + createAsyncNativeSocketImpl(domain, sockType, protocol, inheritable) + +proc createAsyncNativeSocket*(domain: Domain = Domain.AF_INET, + sockType: SockType = SOCK_STREAM, + protocol: Protocol = IPPROTO_TCP, + inheritable = defined(nimInheritHandles)): AsyncFD = + createAsyncNativeSocketImpl(domain, sockType, protocol, inheritable) + +when defined(windows) or defined(nimdoc): + proc bindToDomain(handle: SocketHandle, domain: Domain) = + # Extracted into a separate proc, because connect() on Windows requires + # the socket to be initially bound. + template doBind(saddr) = + if bindAddr(handle, cast[ptr SockAddr](addr(saddr)), + sizeof(saddr).SockLen) < 0'i32: + raiseOSError(osLastError()) + + if domain == Domain.AF_INET6: + var saddr: Sockaddr_in6 + saddr.sin6_family = uint16(toInt(domain)) + doBind(saddr) + else: + var saddr: Sockaddr_in + saddr.sin_family = uint16(toInt(domain)) + doBind(saddr) + + proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): owned(Future[void]) = + let retFuture = newFuture[void]("doConnect") + result = retFuture + + var ol = newCustom() + ol.data = CompletionData(fd: socket, cb: + proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + const SO_UPDATE_CONNECT_CONTEXT = 0x7010 + socket.SocketHandle.setSockOptInt(SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, 1) # 15022 + retFuture.complete() + else: + retFuture.fail(newOSError(errcode)) + ) + + let ret = connectEx(socket.SocketHandle, addrInfo.ai_addr, + cint(addrInfo.ai_addrlen), nil, 0, nil, + cast[POVERLAPPED](ol)) + if ret: + # Request to connect completed immediately. + retFuture.complete() + # We don't deallocate `ol` here because even though this completed + # immediately poll will still be notified about its completion and it + # will free `ol`. + else: + let lastError = osLastError() + if lastError.int32 != ERROR_IO_PENDING: + # With ERROR_IO_PENDING `ol` will be deallocated in `poll`, + # and the future will be completed/failed there, too. + GC_unref(ol) + retFuture.fail(newOSError(lastError)) +else: + proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): owned(Future[void]) = + let retFuture = newFuture[void]("doConnect") + result = retFuture + + proc cb(fd: AsyncFD): bool = + let ret = SocketHandle(fd).getSockOptInt( + cint(SOL_SOCKET), cint(SO_ERROR)) + if ret == 0: + # We have connected. + retFuture.complete() + return true + elif ret == EINTR: + # interrupted, keep waiting + return false + else: + retFuture.fail(newOSError(OSErrorCode(ret))) + return true + + let ret = connect(socket.SocketHandle, + addrInfo.ai_addr, + addrInfo.ai_addrlen.SockLen) + if ret == 0: + # Request to connect completed immediately. + retFuture.complete() + else: + let lastError = osLastError() + if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS: + addWrite(socket, cb) + else: + retFuture.fail(newOSError(lastError)) + +template asyncAddrInfoLoop(addrInfo: ptr AddrInfo, fd: untyped, + protocol: Protocol = IPPROTO_RAW) = + ## Iterates through the AddrInfo linked list asynchronously + ## until the connection can be established. + const shouldCreateFd = not declared(fd) + + when shouldCreateFd: + let sockType = protocol.toSockType() + + var fdPerDomain: array[low(Domain).ord..high(Domain).ord, AsyncFD] + for i in low(fdPerDomain)..high(fdPerDomain): + fdPerDomain[i] = osInvalidSocket.AsyncFD + template closeUnusedFds(domainToKeep = -1) {.dirty.} = + for i, fd in fdPerDomain: + if fd != osInvalidSocket.AsyncFD and i != domainToKeep: + fd.closeSocket() + + var lastException: ref Exception + var curAddrInfo = addrInfo + var domain: Domain + when shouldCreateFd: + var curFd: AsyncFD + else: + var curFd = fd + proc tryNextAddrInfo(fut: Future[void]) {.gcsafe.} = + if fut == nil or fut.failed: + if fut != nil: + lastException = fut.readError() + + while curAddrInfo != nil: + let domainOpt = curAddrInfo.ai_family.toKnownDomain() + if domainOpt.isSome: + domain = domainOpt.unsafeGet() + break + curAddrInfo = curAddrInfo.ai_next + + if curAddrInfo == nil: + freeAddrInfo(addrInfo) + when shouldCreateFd: + closeUnusedFds() + if lastException != nil: + retFuture.fail(lastException) + else: + retFuture.fail(newException( + IOError, "Couldn't resolve address: " & address)) + return + + when shouldCreateFd: + curFd = fdPerDomain[ord(domain)] + if curFd == osInvalidSocket.AsyncFD: + try: + curFd = createAsyncNativeSocket(domain, sockType, protocol) + except: + freeAddrInfo(addrInfo) + closeUnusedFds() + raise getCurrentException() + when defined(windows): + curFd.SocketHandle.bindToDomain(domain) + fdPerDomain[ord(domain)] = curFd + + doConnect(curFd, curAddrInfo).callback = tryNextAddrInfo + curAddrInfo = curAddrInfo.ai_next + else: + freeAddrInfo(addrInfo) + when shouldCreateFd: + closeUnusedFds(ord(domain)) + retFuture.complete(curFd) + else: + retFuture.complete() + + tryNextAddrInfo(nil) + +proc dial*(address: string, port: Port, + protocol: Protocol = IPPROTO_TCP): owned(Future[AsyncFD]) = + ## Establishes connection to the specified `address`:`port` pair via the + ## specified protocol. The procedure iterates through possible + ## resolutions of the `address` until it succeeds, meaning that it + ## seamlessly works with both IPv4 and IPv6. + ## Returns the async file descriptor, registered in the dispatcher of + ## the current thread, ready to send or receive data. + let retFuture = newFuture[AsyncFD]("dial") + result = retFuture + let sockType = protocol.toSockType() + + let aiList = getAddrInfo(address, port, Domain.AF_UNSPEC, sockType, protocol) + asyncAddrInfoLoop(aiList, noFD, protocol) + +proc connect*(socket: AsyncFD, address: string, port: Port, + domain = Domain.AF_INET): owned(Future[void]) = + let retFuture = newFuture[void]("connect") + result = retFuture + + when defined(windows): + verifyPresence(socket) + else: + assert getSockDomain(socket.SocketHandle) == domain + + let aiList = getAddrInfo(address, port, domain) + when defined(windows): + socket.SocketHandle.bindToDomain(domain) + asyncAddrInfoLoop(aiList, socket) + +proc sleepAsync*(ms: int | float): owned(Future[void]) = + ## Suspends the execution of the current async procedure for the next + ## `ms` milliseconds. + var retFuture = newFuture[void]("sleepAsync") + let p = getGlobalDispatcher() + when ms is int: + p.timers.push((getMonoTime() + initDuration(milliseconds = ms), retFuture)) + elif ms is float: + let ns = (ms * 1_000_000).int64 + p.timers.push((getMonoTime() + initDuration(nanoseconds = ns), retFuture)) + return retFuture + +proc withTimeout*[T](fut: Future[T], timeout: int): owned(Future[bool]) = + ## Returns a future which will complete once `fut` completes or after + ## `timeout` milliseconds has elapsed. + ## + ## If `fut` completes first the returned future will hold true, + ## otherwise, if `timeout` milliseconds has elapsed first, the returned + ## future will hold false. + + var retFuture = newFuture[bool]("asyncdispatch.`withTimeout`") + var timeoutFuture = sleepAsync(timeout) + fut.callback = + proc () = + if not retFuture.finished: + if fut.failed: + retFuture.fail(fut.error) + else: + retFuture.complete(true) + timeoutFuture.callback = + proc () = + if not retFuture.finished: retFuture.complete(false) + return retFuture + +proc accept*(socket: AsyncFD, + flags = {SocketFlag.SafeDisconn}, + inheritable = defined(nimInheritHandles)): owned(Future[AsyncFD]) = + ## Accepts a new connection. Returns a future containing the client socket + ## corresponding to that connection. + ## + ## If `inheritable` is false (the default), the resulting client socket + ## will not be inheritable by child processes. + ## + ## The future will complete when the connection is successfully accepted. + var retFut = newFuture[AsyncFD]("accept") + var fut = acceptAddr(socket, flags, inheritable) + fut.callback = + proc (future: Future[tuple[address: string, client: AsyncFD]]) = + assert future.finished + if future.failed: + retFut.fail(future.error) + else: + retFut.complete(future.read.client) + return retFut + +proc keepAlive(x: string) = + discard "mark 'x' as escaping so that it is put into a closure for us to keep the data alive" + +proc send*(socket: AsyncFD, data: string, + flags = {SocketFlag.SafeDisconn}): owned(Future[void]) = + ## Sends `data` to `socket`. The returned future will complete once all + ## data has been sent. + var retFuture = newFuture[void]("send") + if data.len > 0: + let sendFut = socket.send(unsafeAddr data[0], data.len, flags) + sendFut.callback = + proc () = + keepAlive(data) + if sendFut.failed: + retFuture.fail(sendFut.error) + else: + retFuture.complete() + else: + retFuture.complete() + + return retFuture + +# -- Await Macro +import std/asyncmacro +export asyncmacro + +proc readAll*(future: FutureStream[string]): owned(Future[string]) {.async.} = + ## Returns a future that will complete when all the string data from the + ## specified future stream is retrieved. + result = "" + while true: + let (hasValue, value) = await future.read() + if hasValue: + result.add(value) + else: + break + +proc callSoon(cbproc: proc () {.gcsafe.}) = + getGlobalDispatcher().callbacks.addLast(cbproc) + +proc runForever*() = + ## Begins a never ending global dispatcher poll loop. + while true: + poll() + +proc waitFor*[T](fut: Future[T]): T = + ## **Blocks** the current thread until the specified future completes. + while not fut.finished: + poll() + + fut.read + +proc activeDescriptors*(): int {.inline.} = + ## Returns the current number of active file descriptors for the current + ## event loop. This is a cheap operation that does not involve a system call. + when defined(windows): + result = getGlobalDispatcher().handles.len + elif not defined(nimdoc): + result = getGlobalDispatcher().selector.count + +when defined(posix): + import std/posix + +when defined(linux) or defined(windows) or defined(macosx) or defined(bsd) or + defined(solaris) or defined(zephyr) or defined(freertos) or defined(nuttx) or defined(haiku): + proc maxDescriptors*(): int {.raises: OSError.} = + ## Returns the maximum number of active file descriptors for the current + ## process. This involves a system call. For now `maxDescriptors` is + ## supported on the following OSes: Windows, Linux, OSX, BSD, Solaris. + when defined(windows): + result = 16_700_000 + elif defined(zephyr) or defined(freertos): + result = FD_MAX + else: + var fdLim: RLimit + if getrlimit(RLIMIT_NOFILE, fdLim) < 0: + raiseOSError(osLastError()) + result = int(fdLim.rlim_cur) - 1 + +when defined(genode): + proc scheduleCallbacks*(): bool {.discardable.} = + ## *Genode only.* + ## Schedule callback processing and return immediately. + ## Returns `false` if there is nothing to schedule. + ## RPC servers should call this to dispatch `callSoon` + ## bodies after retiring an RPC to its client. + ## This is effectively a non-blocking `poll(…)` and is + ## equivalent to scheduling a momentary no-op timeout + ## but faster and with less overhead. + let dis = getGlobalDispatcher() + result = dis.callbacks.len > 0 + if result: submit(dis.signalHandler.cap) |