diff options
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r-- | lib/pure/asyncdispatch.nim | 484 |
1 files changed, 294 insertions, 190 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index 1b3548268..126db7a7f 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -8,25 +8,25 @@ # ## This module implements asynchronous IO. This includes a dispatcher, -## a ``Future`` type implementation, and an ``async`` macro which allows -## asynchronous code to be written in a synchronous style with the ``await`` +## a `Future` type implementation, and an `async` macro which allows +## asynchronous code to be written in a synchronous style with the `await` ## keyword. ## -## The dispatcher acts as a kind of event loop. You must call ``poll`` on it -## (or a function which does so for you such as ``waitFor`` or ``runForever``) +## The dispatcher acts as a kind of event loop. You must call `poll` on it +## (or a function which does so for you such as `waitFor` or `runForever`) ## in order to poll for any outstanding events. The underlying implementation ## is based on epoll on Linux, IO Completion Ports on Windows and select on ## other operating systems. ## -## The ``poll`` function will not, on its own, return any events. Instead -## an appropriate ``Future`` object will be completed. A ``Future`` is a +## The `poll` function will not, on its own, return any events. Instead +## an appropriate `Future` object will be completed. A `Future` is a ## type which holds a value which is not yet available, but which *may* be ## available in the future. You can check whether a future is finished -## by using the ``finished`` function. When a future is finished it means that +## by using the `finished` function. When a future is finished it means that ## either the value that it holds is now available or it holds an error instead. ## The latter situation occurs when the operation to complete a future fails ## with an exception. You can distinguish between the two situations with the -## ``failed`` function. +## `failed` function. ## ## Future objects can also store a callback procedure which will be called ## automatically once the future completes. @@ -35,34 +35,34 @@ ## pattern. In this ## pattern you make a request for an action, and once that action is fulfilled ## a future is completed with the result of that action. Requests can be -## made by calling the appropriate functions. For example: calling the ``recv`` +## made by calling the appropriate functions. For example: calling the `recv` ## function will create a request for some data to be read from a socket. The -## future which the ``recv`` function returns will then complete once the +## future which the `recv` function returns will then complete once the ## requested amount of data is read **or** an exception occurs. ## ## Code to read some data from a socket may look something like this: +## ```Nim +## var future = socket.recv(100) +## future.addCallback( +## proc () = +## echo(future.read) +## ) +## ``` ## -## .. code-block::nim -## var future = socket.recv(100) -## future.addCallback( -## proc () = -## echo(future.read) -## ) -## -## All asynchronous functions returning a ``Future`` will not block. They +## All asynchronous functions returning a `Future` will not block. They ## will not however return immediately. An asynchronous function will have ## code which will be executed before an asynchronous request is made, in most ## cases this code sets up the request. ## -## In the above example, the ``recv`` function will return a brand new -## ``Future`` instance once the request for data to be read from the socket -## is made. This ``Future`` instance will complete once the requested amount +## In the above example, the `recv` function will return a brand new +## `Future` instance once the request for data to be read from the socket +## is made. This `Future` instance will complete once the requested amount ## of data is read, in this case it is 100 bytes. The second line sets a ## callback on this future which will be called once the future completes. -## All the callback does is write the data stored in the future to ``stdout``. -## The ``read`` function is used for this and it checks whether the future -## completes with an error for you (if it did it will simply raise the -## error), if there is no error however it returns the value of the future. +## All the callback does is write the data stored in the future to `stdout`. +## The `read` function is used for this and it checks whether the future +## completes with an error for you (if it did, it will simply raise the +## error), if there is no error, however, it returns the value of the future. ## ## Asynchronous procedures ## ======================= @@ -71,14 +71,14 @@ ## this by allowing you to write asynchronous code the same way as you would ## write synchronous code. ## -## An asynchronous procedure is marked using the ``{.async.}`` pragma. -## When marking a procedure with the ``{.async.}`` pragma it must have a -## ``Future[T]`` return type or no return type at all. If you do not specify -## a return type then ``Future[void]`` is assumed. +## An asynchronous procedure is marked using the `{.async.}` pragma. +## When marking a procedure with the `{.async.}` pragma it must have a +## `Future[T]` return type or no return type at all. If you do not specify +## a return type then `Future[void]` is assumed. ## -## Inside asynchronous procedures ``await`` can be used to call any +## Inside asynchronous procedures `await` can be used to call any ## procedures which return a -## ``Future``; this includes asynchronous procedures. When a procedure is +## `Future`; this includes asynchronous procedures. When a procedure is ## "awaited", the asynchronous procedure it is awaited in will ## suspend its execution ## until the awaited procedure's Future completes. At which point the @@ -86,52 +86,83 @@ ## when an asynchronous procedure is suspended other asynchronous procedures ## will be run by the dispatcher. ## -## The ``await`` call may be used in many contexts. It can be used on the right -## hand side of a variable declaration: ``var data = await socket.recv(100)``, +## The `await` call may be used in many contexts. It can be used on the right +## hand side of a variable declaration: `var data = await socket.recv(100)`, ## in which case the variable will be set to the value of the future -## automatically. It can be used to await a ``Future`` object, and it can -## be used to await a procedure returning a ``Future[void]``: -## ``await socket.send("foobar")``. +## automatically. It can be used to await a `Future` object, and it can +## be used to await a procedure returning a `Future[void]`: +## `await socket.send("foobar")`. ## -## If an awaited future completes with an error, then ``await`` will re-raise -## this error. To avoid this, you can use the ``yield`` keyword instead of -## ``await``. The following section shows different ways that you can handle +## If an awaited future completes with an error, then `await` will re-raise +## this error. To avoid this, you can use the `yield` keyword instead of +## `await`. The following section shows different ways that you can handle ## exceptions in async procs. ## +## .. caution:: +## Procedures marked {.async.} do not support mutable parameters such +## as `var int`. References such as `ref int` should be used instead. +## ## Handling Exceptions ## ------------------- ## -## The most reliable way to handle exceptions is to use ``yield`` on a future -## then check the future's ``failed`` property. For example: -## -## .. code-block:: Nim -## var future = sock.recv(100) -## yield future -## if future.failed: -## # Handle exception +## You can handle exceptions in the same way as in ordinary Nim code; +## by using the try statement: ## -## The ``async`` procedures also offer limited support for the try statement. +## ```Nim +## try: +## let data = await sock.recv(100) +## echo("Received ", data) +## except: +## # Handle exception +## ``` ## -## .. code-block:: Nim -## try: -## let data = await sock.recv(100) -## echo("Received ", data) -## except: -## # Handle exception +## An alternative approach to handling exceptions is to use `yield` on a future +## then check the future's `failed` property. For example: ## -## Unfortunately the semantics of the try statement may not always be correct, -## and occasionally the compilation may fail altogether. -## As such it is better to use the former style when possible. +## ```Nim +## var future = sock.recv(100) +## yield future +## if future.failed: +## # Handle exception +## ``` ## ## ## Discarding futures ## ================== ## -## Futures should **never** be discarded. This is because they may contain -## errors. If you do not care for the result of a Future then you should -## use the ``asyncCheck`` procedure instead of the ``discard`` keyword. Note -## however that this does not wait for completion, and you should use -## ``waitFor`` for that purpose. +## Futures should **never** be discarded directly because they may contain +## errors. If you do not care for the result of a Future then you should use +## the `asyncCheck` procedure instead of the `discard` keyword. Note that this +## does not wait for completion, and you should use `waitFor` or `await` for that purpose. +## +## .. note:: `await` also checks if the future fails, so you can safely discard +## its result. +## +## Handling futures +## ================ +## +## There are many different operations that apply to a future. +## The three primary high-level operations are `asyncCheck`, +## `waitFor`, and `await`. +## +## * `asyncCheck`: Raises an exception if the future fails. It neither waits +## for the future to finish nor returns the result of the future. +## * `waitFor`: Polls the event loop and blocks the current thread until the +## future finishes. This is often used to call an async procedure from a +## synchronous context and should never be used in an `async` proc. +## * `await`: Pauses execution in the current async procedure until the future +## finishes. While the current procedure is paused, other async procedures will +## continue running. Should be used instead of `waitFor` in an async +## procedure. +## +## Here is a handy quick reference chart showing their high-level differences: +## ============== ===================== ======================= +## Procedure Context Blocking +## ============== ===================== ======================= +## `asyncCheck` non-async and async non-blocking +## `waitFor` non-async blocks current thread +## `await` async suspends current proc +## ============== ===================== ======================= ## ## Examples ## ======== @@ -144,14 +175,14 @@ ## ============================= ## ## It's possible to get into a situation where an async proc, or more accurately -## a ``Future[T]`` gets stuck and +## a `Future[T]` gets stuck and ## never completes. This can happen for various reasons and can cause serious ## memory leaks. When this occurs it's hard to identify the procedure that is ## stuck. ## ## Thankfully there is a mechanism which tracks the count of each pending future. -## All you need to do to enable it is compile with ``-d:futureLogging`` and -## use the ``getFuturesInProgress`` procedure to get the list of pending futures +## All you need to do to enable it is compile with `-d:futureLogging` and +## use the `getFuturesInProgress` procedure to get the list of pending futures ## together with the stack traces to the moment of their creation. ## ## You may also find it useful to use this @@ -164,20 +195,50 @@ ## Limitations/Bugs ## ================ ## -## * The effect system (``raises: []``) does not work with async procedures. +## * The effect system (`raises: []`) does not work with async procedures. +## * Mutable parameters are not supported by async procedures. +## +## +## Multiple async backend support +## ============================== +## +## Thanks to its powerful macro support, Nim allows ``async``/``await`` to be +## implemented in libraries with only minimal support from the language - as +## such, multiple ``async`` libraries exist, including ``asyncdispatch`` and +## ``chronos``, and more may come to be developed in the future. +## +## Libraries built on top of async/await may wish to support multiple async +## backends - the best way to do so is to create separate modules for each backend +## that may be imported side-by-side. +## +## An alternative way is to select backend using a global compile flag - this +## method makes it difficult to compose applications that use both backends as may +## happen with transitive dependencies, but may be appropriate in some cases - +## libraries choosing this path should call the flag `asyncBackend`, allowing +## applications to choose the backend with `-d:asyncBackend=<backend_name>`. +## +## Known `async` backends include: +## +## * `-d:asyncBackend=none`: disable `async` support completely +## * `-d:asyncBackend=asyncdispatch`: https://nim-lang.org/docs/asyncdispatch.html +## * `-d:asyncBackend=chronos`: https://github.com/status-im/nim-chronos/ +## +## ``none`` can be used when a library supports both a synchronous and +## asynchronous API, to disable the latter. -import os, tables, strutils, times, heapqueue, options, asyncstreams -import options, math, std/monotimes -import asyncfutures except callSoon +import std/[os, tables, strutils, times, heapqueue, options, asyncstreams] +import std/[math, monotimes] +import std/asyncfutures except callSoon -import nativesockets, net, deques +import std/[nativesockets, net, deques] + +when defined(nimPreviewSlimSystem): + import std/[assertions, syncio] export Port, SocketFlag export asyncfutures except callSoon export asyncstreams -#{.injectStmt: newGcInvariant().} - # TODO: Check if yielded future is nil and throw a more meaningful exception type @@ -220,6 +281,8 @@ proc adjustTimeout( result = max(nextTimer.get(), 0) result = min(pollTimeout, result) +proc runOnce(timeout: int): bool {.gcsafe.} + proc callSoon*(cbproc: proc () {.gcsafe.}) {.gcsafe.} ## Schedule `cbproc` to be called as soon as possible. ## The callback is called when control returns to the event loop. @@ -232,14 +295,14 @@ template implementSetInheritable() {.dirty.} = when declared(setInheritable): proc setInheritable*(fd: AsyncFD, inheritable: bool): bool = ## Control whether a file handle can be inherited by child processes. - ## Returns ``true`` on success. + ## Returns `true` on success. ## ## This procedure is not guaranteed to be available for all platforms. ## Test for availability with `declared() <system.html#declared,untyped>`_. fd.FileHandle.setInheritable(inheritable) when defined(windows) or defined(nimdoc): - import winlean, sets, hashes + import std/[winlean, sets, hashes] type CompletionKey = ULONG_PTR @@ -307,7 +370,7 @@ when defined(windows) or defined(nimdoc): return disp.ioPort proc register*(fd: AsyncFD) = - ## Registers ``fd`` with the dispatcher. + ## Registers `fd` with the dispatcher. let p = getGlobalDispatcher() if createIoCompletionPort(fd.Handle, p.ioPort, @@ -329,7 +392,7 @@ when defined(windows) or defined(nimdoc): let p = getGlobalDispatcher() p.handles.len != 0 or p.timers.len != 0 or p.callbacks.len != 0 - proc runOnce(timeout = 500): bool = + proc runOnce(timeout: int): bool = let p = getGlobalDispatcher() if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0: raise newException(ValueError, @@ -430,16 +493,16 @@ when defined(windows) or defined(nimdoc): proc recv*(socket: AsyncFD, size: int, flags = {SocketFlag.SafeDisconn}): owned(Future[string]) = - ## Reads **up to** ``size`` bytes from ``socket``. Returned future will + ## Reads **up to** `size` bytes from `socket`. Returned future will ## complete once all the data requested is read, a part of the data has been ## read, or the socket has disconnected in which case the future will - ## complete with a value of ``""``. + ## complete with a value of `""`. ## - ## **Warning**: The ``Peek`` socket flag is not supported on Windows. + ## .. warning:: The `Peek` socket flag is not supported on Windows. # Things to note: - # * When WSARecv completes immediately then ``bytesReceived`` is very + # * When WSARecv completes immediately then `bytesReceived` is very # unreliable. # * Still need to implement message-oriented socket disconnection, # '\0' in the message currently signifies a socket disconnect. Who @@ -470,7 +533,7 @@ when defined(windows) or defined(nimdoc): if flags.isDisconnectionError(errcode): retFuture.complete("") else: - retFuture.fail(newException(OSError, osErrorMsg(errcode))) + retFuture.fail(newOSError(errcode)) if dataBuf.buf != nil: dealloc dataBuf.buf dataBuf.buf = nil @@ -488,7 +551,7 @@ when defined(windows) or defined(nimdoc): if flags.isDisconnectionError(err): retFuture.complete("") else: - retFuture.fail(newException(OSError, osErrorMsg(err))) + retFuture.fail(newOSError(err)) elif ret == 0: # Request completed immediately. if bytesReceived != 0: @@ -503,17 +566,17 @@ when defined(windows) or defined(nimdoc): proc recvInto*(socket: AsyncFD, buf: pointer, size: int, flags = {SocketFlag.SafeDisconn}): owned(Future[int]) = - ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``, which must + ## Reads **up to** `size` bytes from `socket` into `buf`, which must ## at least be of that size. Returned future will complete once all the ## data requested is read, a part of the data has been read, or the socket ## has disconnected in which case the future will complete with a value of - ## ``0``. + ## `0`. ## - ## **Warning**: The ``Peek`` socket flag is not supported on Windows. + ## .. warning:: The `Peek` socket flag is not supported on Windows. # Things to note: - # * When WSARecv completes immediately then ``bytesReceived`` is very + # * When WSARecv completes immediately then `bytesReceived` is very # unreliable. # * Still need to implement message-oriented socket disconnection, # '\0' in the message currently signifies a socket disconnect. Who @@ -540,7 +603,7 @@ when defined(windows) or defined(nimdoc): if flags.isDisconnectionError(errcode): retFuture.complete(0) else: - retFuture.fail(newException(OSError, osErrorMsg(errcode))) + retFuture.fail(newOSError(errcode)) if dataBuf.buf != nil: dataBuf.buf = nil ) @@ -556,7 +619,7 @@ when defined(windows) or defined(nimdoc): if flags.isDisconnectionError(err): retFuture.complete(0) else: - retFuture.fail(newException(OSError, osErrorMsg(err))) + retFuture.fail(newOSError(err)) elif ret == 0: # Request completed immediately. if bytesReceived != 0: @@ -569,11 +632,11 @@ when defined(windows) or defined(nimdoc): proc send*(socket: AsyncFD, buf: pointer, size: int, flags = {SocketFlag.SafeDisconn}): owned(Future[void]) = - ## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future + ## Sends `size` bytes from `buf` to `socket`. The returned future ## will complete once all data has been sent. ## - ## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object, - ## you must use GC_ref/GC_unref calls to avoid early freeing of the buffer. + ## .. warning:: Use it with caution. If `buf` refers to GC'ed object, + ## you must use GC_ref/GC_unref calls to avoid early freeing of the buffer. verifyPresence(socket) var retFuture = newFuture[void]("send") @@ -604,19 +667,19 @@ when defined(windows) or defined(nimdoc): if flags.isDisconnectionError(err): retFuture.complete() else: - retFuture.fail(newException(OSError, osErrorMsg(err))) + retFuture.fail(newOSError(err)) else: retFuture.complete() - # We don't deallocate ``ol`` here because even though this completed + # We don't deallocate `ol` here because even though this completed # immediately poll will still be notified about its completion and it will - # free ``ol``. + # free `ol`. return retFuture proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr, saddrLen: SockLen, flags = {SocketFlag.SafeDisconn}): owned(Future[void]) = - ## Sends ``data`` to specified destination ``saddr``, using - ## socket ``socket``. The returned future will complete once all data + ## Sends `data` to specified destination `saddr`, using + ## socket `socket`. The returned future will complete once all data ## has been sent. verifyPresence(socket) var retFuture = newFuture[void]("sendTo") @@ -639,7 +702,7 @@ when defined(windows) or defined(nimdoc): if errcode == OSErrorCode(-1): retFuture.complete() else: - retFuture.fail(newException(OSError, osErrorMsg(errcode))) + retFuture.fail(newOSError(errcode)) ) let ret = WSASendTo(socket.SocketHandle, addr dataBuf, 1, addr bytesSent, @@ -649,20 +712,20 @@ when defined(windows) or defined(nimdoc): let err = osLastError() if err.int32 != ERROR_IO_PENDING: GC_unref(ol) - retFuture.fail(newException(OSError, osErrorMsg(err))) + retFuture.fail(newOSError(err)) else: retFuture.complete() - # We don't deallocate ``ol`` here because even though this completed + # We don't deallocate `ol` here because even though this completed # immediately poll will still be notified about its completion and it will - # free ``ol``. + # free `ol`. return retFuture proc recvFromInto*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr, saddrLen: ptr SockLen, flags = {SocketFlag.SafeDisconn}): owned(Future[int]) = - ## Receives a datagram data from ``socket`` into ``buf``, which must - ## be at least of size ``size``, address of datagram's sender will be - ## stored into ``saddr`` and ``saddrLen``. Returned future will complete + ## Receives a datagram data from `socket` into `buf`, which must + ## be at least of size `size`, address of datagram's sender will be + ## stored into `saddr` and `saddrLen`. Returned future will complete ## once one datagram has been received, and will return size of packet ## received. verifyPresence(socket) @@ -683,7 +746,7 @@ when defined(windows) or defined(nimdoc): else: # datagram sockets don't have disconnection, # so we can just raise an exception - retFuture.fail(newException(OSError, osErrorMsg(errcode))) + retFuture.fail(newOSError(errcode)) ) let res = WSARecvFrom(socket.SocketHandle, addr dataBuf, 1, @@ -694,7 +757,7 @@ when defined(windows) or defined(nimdoc): let err = osLastError() if err.int32 != ERROR_IO_PENDING: GC_unref(ol) - retFuture.fail(newException(OSError, osErrorMsg(err))) + retFuture.fail(newOSError(err)) else: # Request completed immediately. if bytesReceived != 0: @@ -707,7 +770,7 @@ when defined(windows) or defined(nimdoc): proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}, inheritable = defined(nimInheritHandles)): - owned(Future[tuple[address: string, client: AsyncFD]]) = + owned(Future[tuple[address: string, client: AsyncFD]]) {.gcsafe.} = ## Accepts a new connection. Returns a future containing the client socket ## corresponding to that connection and the remote address of the client. ## The future will complete when the connection is successfully accepted. @@ -715,11 +778,11 @@ when defined(windows) or defined(nimdoc): ## The resulting client socket is automatically registered to the ## dispatcher. ## - ## If ``inheritable`` is false (the default), the resulting client socket will + ## If `inheritable` is false (the default), the resulting client socket will ## not be inheritable by child processes. ## - ## The ``accept`` call may result in an error if the connecting socket - ## disconnects during the duration of the ``accept``. If the ``SafeDisconn`` + ## The `accept` call may result in an error if the connecting socket + ## disconnects during the duration of the `accept`. If the `SafeDisconn` ## flag is specified then this error will not be raised and instead ## accept will be called again. verifyPresence(socket) @@ -745,7 +808,7 @@ when defined(windows) or defined(nimdoc): else: retFuture.complete(newAcceptFut.read) else: - retFuture.fail(newException(OSError, osErrorMsg(errcode))) + retFuture.fail(newOSError(errcode)) template completeAccept() {.dirty.} = var listenSock = socket @@ -774,7 +837,7 @@ when defined(windows) or defined(nimdoc): var ol = newCustom() ol.data = CompletionData(fd: socket, cb: - proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = + proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) {.gcsafe.} = if not retFuture.finished: if errcode == OSErrorCode(-1): completeAccept() @@ -796,9 +859,9 @@ when defined(windows) or defined(nimdoc): GC_unref(ol) else: completeAccept() - # We don't deallocate ``ol`` here because even though this completed + # We don't deallocate `ol` here because even though this completed # immediately poll will still be notified about its completion and it will - # free ``ol``. + # free `ol`. return retFuture @@ -810,7 +873,7 @@ when defined(windows) or defined(nimdoc): getGlobalDispatcher().handles.excl(socket) proc unregister*(fd: AsyncFD) = - ## Unregisters ``fd``. + ## Unregisters `fd`. getGlobalDispatcher().handles.excl(fd) proc contains*(disp: PDispatcher, fd: AsyncFD): bool = @@ -910,9 +973,9 @@ when defined(windows) or defined(nimdoc): proc addRead*(fd: AsyncFD, cb: Callback) = ## Start watching the file descriptor for read availability and then call - ## the callback ``cb``. + ## the callback `cb`. ## - ## This is not ``pure`` mechanism for Windows Completion Ports (IOCP), + ## This is not `pure` mechanism for Windows Completion Ports (IOCP), ## so if you can avoid it, please do it. Use `addRead` only if really ## need it (main usecase is adaptation of unix-like libraries to be ## asynchronous on Windows). @@ -921,16 +984,16 @@ when defined(windows) or defined(nimdoc): ## or asyncdispatch.accept(), because they are using IOCP, please use ## nativesockets.recv() and nativesockets.accept() instead. ## - ## Be sure your callback ``cb`` returns ``true``, if you want to remove - ## watch of `read` notifications, and ``false``, if you want to continue + ## Be sure your callback `cb` returns `true`, if you want to remove + ## watch of `read` notifications, and `false`, if you want to continue ## receiving notifications. registerWaitableEvent(fd, cb, FD_READ or FD_ACCEPT or FD_OOB or FD_CLOSE) proc addWrite*(fd: AsyncFD, cb: Callback) = ## Start watching the file descriptor for write availability and then call - ## the callback ``cb``. + ## the callback `cb`. ## - ## This is not ``pure`` mechanism for Windows Completion Ports (IOCP), + ## This is not `pure` mechanism for Windows Completion Ports (IOCP), ## so if you can avoid it, please do it. Use `addWrite` only if really ## need it (main usecase is adaptation of unix-like libraries to be ## asynchronous on Windows). @@ -939,8 +1002,8 @@ when defined(windows) or defined(nimdoc): ## or asyncdispatch.connect(), because they are using IOCP, please use ## nativesockets.send() and nativesockets.connect() instead. ## - ## Be sure your callback ``cb`` returns ``true``, if you want to remove - ## watch of `write` notifications, and ``false``, if you want to continue + ## Be sure your callback `cb` returns `true`, if you want to remove + ## watch of `write` notifications, and `false`, if you want to continue ## receiving notifications. registerWaitableEvent(fd, cb, FD_WRITE or FD_CONNECT or FD_CLOSE) @@ -980,12 +1043,12 @@ when defined(windows) or defined(nimdoc): raiseOSError(osLastError()) proc addTimer*(timeout: int, oneshot: bool, cb: Callback) = - ## Registers callback ``cb`` to be called when timer expired. + ## Registers callback `cb` to be called when timer expired. ## ## Parameters: ## - ## * ``timeout`` - timeout value in milliseconds. - ## * ``oneshot`` + ## * `timeout` - timeout value in milliseconds. + ## * `oneshot` ## * `true` - generate only one timeout event ## * `false` - generate timeout events periodically @@ -1014,8 +1077,8 @@ when defined(windows) or defined(nimdoc): registerWaitableHandle(p, hEvent, flags, pcd, timeout, timercb) proc addProcess*(pid: int, cb: Callback) = - ## Registers callback ``cb`` to be called when process with process ID - ## ``pid`` exited. + ## Registers callback `cb` to be called when process with process ID + ## `pid` exited. const NULL = Handle(0) let p = getGlobalDispatcher() let procFlags = SYNCHRONIZE @@ -1024,7 +1087,7 @@ when defined(windows) or defined(nimdoc): raiseOSError(osLastError()) var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData))) - var flags = WT_EXECUTEINWAITTHREAD.DWORD + var flags = WT_EXECUTEINWAITTHREAD.DWORD or WT_EXECUTEONLYONCE.DWORD proc proccb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = closeWaitable(hProcess) @@ -1033,10 +1096,10 @@ when defined(windows) or defined(nimdoc): registerWaitableHandle(p, hProcess, flags, pcd, INFINITE, proccb) proc newAsyncEvent*(): AsyncEvent = - ## Creates a new thread-safe ``AsyncEvent`` object. + ## Creates a new thread-safe `AsyncEvent` object. ## - ## New ``AsyncEvent`` object is not automatically registered with - ## dispatcher like ``AsyncSocket``. + ## New `AsyncEvent` object is not automatically registered with + ## dispatcher like `AsyncSocket`. var sa = SECURITY_ATTRIBUTES( nLength: sizeof(SECURITY_ATTRIBUTES).cint, bInheritHandle: 1 @@ -1048,12 +1111,12 @@ when defined(windows) or defined(nimdoc): result.hEvent = event proc trigger*(ev: AsyncEvent) = - ## Set event ``ev`` to signaled state. + ## Set event `ev` to signaled state. if setEvent(ev.hEvent) == 0: raiseOSError(osLastError()) proc unregister*(ev: AsyncEvent) = - ## Unregisters event ``ev``. + ## Unregisters event `ev`. doAssert(ev.hWaiter != 0, "Event is not registered in the queue!") let p = getGlobalDispatcher() p.handles.excl(AsyncFD(ev.hEvent)) @@ -1064,14 +1127,14 @@ when defined(windows) or defined(nimdoc): ev.hWaiter = 0 proc close*(ev: AsyncEvent) = - ## Closes event ``ev``. + ## Closes event `ev`. let res = closeHandle(ev.hEvent) deallocShared(cast[pointer](ev)) if res == 0: raiseOSError(osLastError()) proc addEvent*(ev: AsyncEvent, cb: Callback) = - ## Registers callback ``cb`` to be called when ``ev`` will be signaled + ## Registers callback `cb` to be called when `ev` will be signaled doAssert(ev.hWaiter == 0, "Event is already registered in the queue!") let p = getGlobalDispatcher() @@ -1103,11 +1166,14 @@ when defined(windows) or defined(nimdoc): initAll() else: - import selectors - from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK, + import std/selectors + from std/posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK, MSG_NOSIGNAL when declared(posix.accept4): - from posix import accept4, SOCK_CLOEXEC + from std/posix import accept4, SOCK_CLOEXEC + when defined(genode): + import genode/env # get the implicit Genode env + import genode/signals const InitCallbackListSize = 4 # initial size of callbacks sequence, @@ -1126,6 +1192,8 @@ else: PDispatcher* = ref object of PDispatcherBase selector: Selector[AsyncData] + when defined(genode): + signalHandler: SignalHandler proc `==`*(x, y: AsyncFD): bool {.borrow.} proc `==`*(x, y: AsyncEvent): bool {.borrow.} @@ -1141,9 +1209,22 @@ else: result.selector = newSelector[AsyncData]() result.timers.clear() result.callbacks = initDeque[proc () {.closure, gcsafe.}](InitDelayedCallbackListSize) + when defined(genode): + let entrypoint = ep(cast[GenodeEnv](runtimeEnv)) + result.signalHandler = newSignalHandler(entrypoint): + discard runOnce(0) var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher + when defined(nuttx): + import std/exitprocs + + proc cleanDispatcher() {.noconv.} = + gDisp = nil + + proc addFinalyzer() = + addExitProc(cleanDispatcher) + proc setGlobalDispatcher*(disp: owned PDispatcher) = if not gDisp.isNil: assert gDisp.callbacks.len == 0 @@ -1153,6 +1234,8 @@ else: proc getGlobalDispatcher*(): PDispatcher = if gDisp.isNil: setGlobalDispatcher(newDispatcher()) + when defined(nuttx): + addFinalyzer() result = gDisp proc getIoHandler*(disp: PDispatcher): Selector[AsyncData] = @@ -1279,7 +1362,7 @@ else: var newList = newSeqOfCap[Callback](newLength) var cb = curList[0] - if not cb(fd.AsyncFD): + if not cb(fd): newList.add(cb) withData(p.selector, fd.int, adata) do: @@ -1310,10 +1393,11 @@ else: ValueError, "Expecting async operations to stop when fd has closed." ) - - proc runOnce(timeout = 500): bool = + proc runOnce(timeout: int): bool = let p = getGlobalDispatcher() if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0: + when defined(genode): + if timeout == 0: return raise newException(ValueError, "No handles or timers registered in dispatcher.") @@ -1384,7 +1468,7 @@ else: if flags.isDisconnectionError(lastError): retFuture.complete("") else: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) + retFuture.fail(newOSError(lastError)) else: result = false # We still want this callback to be called. elif res == 0: @@ -1413,7 +1497,7 @@ else: if flags.isDisconnectionError(lastError): retFuture.complete(0) else: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) + retFuture.fail(newOSError(lastError)) else: result = false # We still want this callback to be called. else: @@ -1460,8 +1544,8 @@ else: proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr, saddrLen: SockLen, flags = {SocketFlag.SafeDisconn}): owned(Future[void]) = - ## Sends ``data`` of size ``size`` in bytes to specified destination - ## (``saddr`` of size ``saddrLen`` in bytes, using socket ``socket``. + ## Sends `data` of size `size` in bytes to specified destination + ## (`saddr` of size `saddrLen` in bytes, using socket `socket`. ## The returned future will complete once all data has been sent. var retFuture = newFuture[void]("sendTo") @@ -1479,7 +1563,7 @@ else: let lastError = osLastError() if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and lastError.int32 != EAGAIN: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) + retFuture.fail(newOSError(lastError)) else: result = false # We still want this callback to be called. else: @@ -1491,9 +1575,9 @@ else: proc recvFromInto*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr, saddrLen: ptr SockLen, flags = {SocketFlag.SafeDisconn}): owned(Future[int]) = - ## Receives a datagram data from ``socket`` into ``data``, which must - ## be at least of size ``size`` in bytes, address of datagram's sender - ## will be stored into ``saddr`` and ``saddrLen``. Returned future will + ## Receives a datagram data from `socket` into `data`, which must + ## be at least of size `size` in bytes, address of datagram's sender + ## will be stored into `saddr` and `saddrLen`. Returned future will ## complete once one datagram has been received, and will return size ## of packet received. var retFuture = newFuture[int]("recvFromInto") @@ -1505,7 +1589,7 @@ else: let lastError = osLastError() if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and lastError.int32 != EAGAIN: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) + retFuture.fail(newOSError(lastError)) else: result = false else: @@ -1518,7 +1602,7 @@ else: owned(Future[tuple[address: string, client: AsyncFD]]) = var retFuture = newFuture[tuple[address: string, client: AsyncFD]]("acceptAddr") - proc cb(sock: AsyncFD): bool = + proc cb(sock: AsyncFD): bool {.gcsafe.} = result = true var sockAddress: Sockaddr_storage var addrLen = sizeof(sockAddress).SockLen @@ -1546,7 +1630,7 @@ else: if flags.isDisconnectionError(lastError): return false else: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) + retFuture.fail(newOSError(lastError)) else: try: let address = getAddrString(cast[ptr SockAddr](addr sockAddress)) @@ -1563,45 +1647,45 @@ else: proc addTimer*(timeout: int, oneshot: bool, cb: Callback) = ## Start watching for timeout expiration, and then call the - ## callback ``cb``. - ## ``timeout`` - time in milliseconds, - ## ``oneshot`` - if ``true`` only one event will be dispatched, - ## if ``false`` continuous events every ``timeout`` milliseconds. + ## callback `cb`. + ## `timeout` - time in milliseconds, + ## `oneshot` - if `true` only one event will be dispatched, + ## if `false` continuous events every `timeout` milliseconds. let p = getGlobalDispatcher() var data = newAsyncData() data.readList.add(cb) p.selector.registerTimer(timeout, oneshot, data) proc addSignal*(signal: int, cb: Callback) = - ## Start watching signal ``signal``, and when signal appears, call the - ## callback ``cb``. + ## Start watching signal `signal`, and when signal appears, call the + ## callback `cb`. let p = getGlobalDispatcher() var data = newAsyncData() data.readList.add(cb) p.selector.registerSignal(signal, data) proc addProcess*(pid: int, cb: Callback) = - ## Start watching for process exit with pid ``pid``, and then call - ## the callback ``cb``. + ## Start watching for process exit with pid `pid`, and then call + ## the callback `cb`. let p = getGlobalDispatcher() var data = newAsyncData() data.readList.add(cb) p.selector.registerProcess(pid, data) proc newAsyncEvent*(): AsyncEvent = - ## Creates new ``AsyncEvent``. + ## Creates new `AsyncEvent`. result = AsyncEvent(newSelectEvent()) proc trigger*(ev: AsyncEvent) = - ## Sets new ``AsyncEvent`` to signaled state. + ## Sets new `AsyncEvent` to signaled state. trigger(SelectEvent(ev)) proc close*(ev: AsyncEvent) = - ## Closes ``AsyncEvent`` + ## Closes `AsyncEvent` close(SelectEvent(ev)) proc addEvent*(ev: AsyncEvent, cb: Callback) = - ## Start watching for event ``ev``, and call callback ``cb``, when + ## Start watching for event `ev`, and call callback `cb`, when ## ev will be set to signaled state. let p = getGlobalDispatcher() var data = newAsyncData() @@ -1609,8 +1693,8 @@ else: p.selector.registerEvent(SelectEvent(ev), data) proc drain*(timeout = 500) = - ## Waits for completion of **all** events and processes them. Raises ``ValueError`` - ## if there are no pending operations. In contrast to ``poll`` this + ## Waits for completion of **all** events and processes them. Raises `ValueError` + ## if there are no pending operations. In contrast to `poll` this ## processes as many events as are available until the timeout has elapsed. var curTimeout = timeout let start = now() @@ -1621,7 +1705,7 @@ proc drain*(timeout = 500) = break proc poll*(timeout = 500) = - ## Waits for completion events and processes them. Raises ``ValueError`` + ## Waits for completion events and processes them. Raises `ValueError` ## if there are no pending operations. This runs the underlying OS ## `epoll`:idx: or `kqueue`:idx: primitive only once. discard runOnce(timeout) @@ -1675,9 +1759,11 @@ when defined(windows) or defined(nimdoc): proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = if not retFuture.finished: if errcode == OSErrorCode(-1): + const SO_UPDATE_CONNECT_CONTEXT = 0x7010 + socket.SocketHandle.setSockOptInt(SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, 1) # 15022 retFuture.complete() else: - retFuture.fail(newException(OSError, osErrorMsg(errcode))) + retFuture.fail(newOSError(errcode)) ) let ret = connectEx(socket.SocketHandle, addrInfo.ai_addr, @@ -1686,16 +1772,16 @@ when defined(windows) or defined(nimdoc): if ret: # Request to connect completed immediately. retFuture.complete() - # We don't deallocate ``ol`` here because even though this completed + # We don't deallocate `ol` here because even though this completed # immediately poll will still be notified about its completion and it - # will free ``ol``. + # will free `ol`. else: let lastError = osLastError() if lastError.int32 != ERROR_IO_PENDING: - # With ERROR_IO_PENDING ``ol`` will be deallocated in ``poll``, + # With ERROR_IO_PENDING `ol` will be deallocated in `poll`, # and the future will be completed/failed there, too. GC_unref(ol) - retFuture.fail(newException(OSError, osErrorMsg(lastError))) + retFuture.fail(newOSError(lastError)) else: proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): owned(Future[void]) = let retFuture = newFuture[void]("doConnect") @@ -1712,7 +1798,7 @@ else: # interrupted, keep waiting return false else: - retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret)))) + retFuture.fail(newOSError(OSErrorCode(ret))) return true let ret = connect(socket.SocketHandle, @@ -1726,7 +1812,7 @@ else: if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS: addWrite(socket, cb) else: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) + retFuture.fail(newOSError(lastError)) template asyncAddrInfoLoop(addrInfo: ptr AddrInfo, fd: untyped, protocol: Protocol = IPPROTO_RAW) = @@ -1765,7 +1851,7 @@ template asyncAddrInfoLoop(addrInfo: ptr AddrInfo, fd: untyped, curAddrInfo = curAddrInfo.ai_next if curAddrInfo == nil: - freeaddrinfo(addrInfo) + freeAddrInfo(addrInfo) when shouldCreateFd: closeUnusedFds() if lastException != nil: @@ -1781,7 +1867,7 @@ template asyncAddrInfoLoop(addrInfo: ptr AddrInfo, fd: untyped, try: curFd = createAsyncNativeSocket(domain, sockType, protocol) except: - freeaddrinfo(addrInfo) + freeAddrInfo(addrInfo) closeUnusedFds() raise getCurrentException() when defined(windows): @@ -1791,7 +1877,7 @@ template asyncAddrInfoLoop(addrInfo: ptr AddrInfo, fd: untyped, doConnect(curFd, curAddrInfo).callback = tryNextAddrInfo curAddrInfo = curAddrInfo.ai_next else: - freeaddrinfo(addrInfo) + freeAddrInfo(addrInfo) when shouldCreateFd: closeUnusedFds(ord(domain)) retFuture.complete(curFd) @@ -1802,9 +1888,9 @@ template asyncAddrInfoLoop(addrInfo: ptr AddrInfo, fd: untyped, proc dial*(address: string, port: Port, protocol: Protocol = IPPROTO_TCP): owned(Future[AsyncFD]) = - ## Establishes connection to the specified ``address``:``port`` pair via the + ## Establishes connection to the specified `address`:`port` pair via the ## specified protocol. The procedure iterates through possible - ## resolutions of the ``address`` until it succeeds, meaning that it + ## resolutions of the `address` until it succeeds, meaning that it ## seamlessly works with both IPv4 and IPv6. ## Returns the async file descriptor, registered in the dispatcher of ## the current thread, ready to send or receive data. @@ -1832,7 +1918,7 @@ proc connect*(socket: AsyncFD, address: string, port: Port, proc sleepAsync*(ms: int | float): owned(Future[void]) = ## Suspends the execution of the current async procedure for the next - ## ``ms`` milliseconds. + ## `ms` milliseconds. var retFuture = newFuture[void]("sleepAsync") let p = getGlobalDispatcher() when ms is int: @@ -1843,11 +1929,11 @@ proc sleepAsync*(ms: int | float): owned(Future[void]) = return retFuture proc withTimeout*[T](fut: Future[T], timeout: int): owned(Future[bool]) = - ## Returns a future which will complete once ``fut`` completes or after - ## ``timeout`` milliseconds has elapsed. + ## Returns a future which will complete once `fut` completes or after + ## `timeout` milliseconds has elapsed. ## - ## If ``fut`` completes first the returned future will hold true, - ## otherwise, if ``timeout`` milliseconds has elapsed first, the returned + ## If `fut` completes first the returned future will hold true, + ## otherwise, if `timeout` milliseconds has elapsed first, the returned ## future will hold false. var retFuture = newFuture[bool]("asyncdispatch.`withTimeout`") @@ -1870,7 +1956,7 @@ proc accept*(socket: AsyncFD, ## Accepts a new connection. Returns a future containing the client socket ## corresponding to that connection. ## - ## If ``inheritable`` is false (the default), the resulting client socket + ## If `inheritable` is false (the default), the resulting client socket ## will not be inheritable by child processes. ## ## The future will complete when the connection is successfully accepted. @@ -1890,7 +1976,7 @@ proc keepAlive(x: string) = proc send*(socket: AsyncFD, data: string, flags = {SocketFlag.SafeDisconn}): owned(Future[void]) = - ## Sends ``data`` to ``socket``. The returned future will complete once all + ## Sends `data` to `socket`. The returned future will complete once all ## data has been sent. var retFuture = newFuture[void]("send") if data.len > 0: @@ -1908,7 +1994,8 @@ proc send*(socket: AsyncFD, data: string, return retFuture # -- Await Macro -include asyncmacro +import std/asyncmacro +export asyncmacro proc readAll*(future: FutureStream[string]): owned(Future[string]) {.async.} = ## Returns a future that will complete when all the string data from the @@ -1945,17 +2032,34 @@ proc activeDescriptors*(): int {.inline.} = result = getGlobalDispatcher().selector.count when defined(posix): - import posix + import std/posix -when defined(linux) or defined(windows) or defined(macosx) or defined(bsd): +when defined(linux) or defined(windows) or defined(macosx) or defined(bsd) or + defined(solaris) or defined(zephyr) or defined(freertos) or defined(nuttx) or defined(haiku): proc maxDescriptors*(): int {.raises: OSError.} = ## Returns the maximum number of active file descriptors for the current ## process. This involves a system call. For now `maxDescriptors` is - ## supported on the following OSes: Windows, Linux, OSX, BSD. + ## supported on the following OSes: Windows, Linux, OSX, BSD, Solaris. when defined(windows): result = 16_700_000 + elif defined(zephyr) or defined(freertos): + result = FD_MAX else: var fdLim: RLimit if getrlimit(RLIMIT_NOFILE, fdLim) < 0: raiseOSError(osLastError()) result = int(fdLim.rlim_cur) - 1 + +when defined(genode): + proc scheduleCallbacks*(): bool {.discardable.} = + ## *Genode only.* + ## Schedule callback processing and return immediately. + ## Returns `false` if there is nothing to schedule. + ## RPC servers should call this to dispatch `callSoon` + ## bodies after retiring an RPC to its client. + ## This is effectively a non-blocking `poll(…)` and is + ## equivalent to scheduling a momentary no-op timeout + ## but faster and with less overhead. + let dis = getGlobalDispatcher() + result = dis.callbacks.len > 0 + if result: submit(dis.signalHandler.cap) |