diff options
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r-- | lib/pure/asyncdispatch.nim | 130 |
1 files changed, 84 insertions, 46 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index 92ad9c5ff..126db7a7f 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -41,13 +41,13 @@ ## requested amount of data is read **or** an exception occurs. ## ## Code to read some data from a socket may look something like this: -## -## .. code-block:: Nim -## var future = socket.recv(100) -## future.addCallback( -## proc () = -## echo(future.read) -## ) +## ```Nim +## var future = socket.recv(100) +## future.addCallback( +## proc () = +## echo(future.read) +## ) +## ``` ## ## All asynchronous functions returning a `Future` will not block. They ## will not however return immediately. An asynchronous function will have @@ -108,24 +108,24 @@ ## You can handle exceptions in the same way as in ordinary Nim code; ## by using the try statement: ## -## -## .. code-block:: Nim +## ```Nim ## try: ## let data = await sock.recv(100) ## echo("Received ", data) ## except: ## # Handle exception -## -## +## ``` ## ## An alternative approach to handling exceptions is to use `yield` on a future ## then check the future's `failed` property. For example: ## -## .. code-block:: Nim +## ```Nim ## var future = sock.recv(100) ## yield future ## if future.failed: ## # Handle exception +## ``` +## ## ## Discarding futures ## ================== @@ -226,11 +226,11 @@ ## ``none`` can be used when a library supports both a synchronous and ## asynchronous API, to disable the latter. -import os, tables, strutils, times, heapqueue, options, asyncstreams -import options, math, std/monotimes -import asyncfutures except callSoon +import std/[os, tables, strutils, times, heapqueue, options, asyncstreams] +import std/[math, monotimes] +import std/asyncfutures except callSoon -import nativesockets, net, deques +import std/[nativesockets, net, deques] when defined(nimPreviewSlimSystem): import std/[assertions, syncio] @@ -281,6 +281,8 @@ proc adjustTimeout( result = max(nextTimer.get(), 0) result = min(pollTimeout, result) +proc runOnce(timeout: int): bool {.gcsafe.} + proc callSoon*(cbproc: proc () {.gcsafe.}) {.gcsafe.} ## Schedule `cbproc` to be called as soon as possible. ## The callback is called when control returns to the event loop. @@ -300,7 +302,7 @@ template implementSetInheritable() {.dirty.} = fd.FileHandle.setInheritable(inheritable) when defined(windows) or defined(nimdoc): - import winlean, sets, hashes + import std/[winlean, sets, hashes] type CompletionKey = ULONG_PTR @@ -390,7 +392,7 @@ when defined(windows) or defined(nimdoc): let p = getGlobalDispatcher() p.handles.len != 0 or p.timers.len != 0 or p.callbacks.len != 0 - proc runOnce(timeout = 500): bool = + proc runOnce(timeout: int): bool = let p = getGlobalDispatcher() if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0: raise newException(ValueError, @@ -531,7 +533,7 @@ when defined(windows) or defined(nimdoc): if flags.isDisconnectionError(errcode): retFuture.complete("") else: - retFuture.fail(newException(OSError, osErrorMsg(errcode))) + retFuture.fail(newOSError(errcode)) if dataBuf.buf != nil: dealloc dataBuf.buf dataBuf.buf = nil @@ -549,7 +551,7 @@ when defined(windows) or defined(nimdoc): if flags.isDisconnectionError(err): retFuture.complete("") else: - retFuture.fail(newException(OSError, osErrorMsg(err))) + retFuture.fail(newOSError(err)) elif ret == 0: # Request completed immediately. if bytesReceived != 0: @@ -601,7 +603,7 @@ when defined(windows) or defined(nimdoc): if flags.isDisconnectionError(errcode): retFuture.complete(0) else: - retFuture.fail(newException(OSError, osErrorMsg(errcode))) + retFuture.fail(newOSError(errcode)) if dataBuf.buf != nil: dataBuf.buf = nil ) @@ -617,7 +619,7 @@ when defined(windows) or defined(nimdoc): if flags.isDisconnectionError(err): retFuture.complete(0) else: - retFuture.fail(newException(OSError, osErrorMsg(err))) + retFuture.fail(newOSError(err)) elif ret == 0: # Request completed immediately. if bytesReceived != 0: @@ -665,7 +667,7 @@ when defined(windows) or defined(nimdoc): if flags.isDisconnectionError(err): retFuture.complete() else: - retFuture.fail(newException(OSError, osErrorMsg(err))) + retFuture.fail(newOSError(err)) else: retFuture.complete() # We don't deallocate `ol` here because even though this completed @@ -700,7 +702,7 @@ when defined(windows) or defined(nimdoc): if errcode == OSErrorCode(-1): retFuture.complete() else: - retFuture.fail(newException(OSError, osErrorMsg(errcode))) + retFuture.fail(newOSError(errcode)) ) let ret = WSASendTo(socket.SocketHandle, addr dataBuf, 1, addr bytesSent, @@ -710,7 +712,7 @@ when defined(windows) or defined(nimdoc): let err = osLastError() if err.int32 != ERROR_IO_PENDING: GC_unref(ol) - retFuture.fail(newException(OSError, osErrorMsg(err))) + retFuture.fail(newOSError(err)) else: retFuture.complete() # We don't deallocate `ol` here because even though this completed @@ -744,7 +746,7 @@ when defined(windows) or defined(nimdoc): else: # datagram sockets don't have disconnection, # so we can just raise an exception - retFuture.fail(newException(OSError, osErrorMsg(errcode))) + retFuture.fail(newOSError(errcode)) ) let res = WSARecvFrom(socket.SocketHandle, addr dataBuf, 1, @@ -755,7 +757,7 @@ when defined(windows) or defined(nimdoc): let err = osLastError() if err.int32 != ERROR_IO_PENDING: GC_unref(ol) - retFuture.fail(newException(OSError, osErrorMsg(err))) + retFuture.fail(newOSError(err)) else: # Request completed immediately. if bytesReceived != 0: @@ -806,7 +808,7 @@ when defined(windows) or defined(nimdoc): else: retFuture.complete(newAcceptFut.read) else: - retFuture.fail(newException(OSError, osErrorMsg(errcode))) + retFuture.fail(newOSError(errcode)) template completeAccept() {.dirty.} = var listenSock = socket @@ -1164,11 +1166,14 @@ when defined(windows) or defined(nimdoc): initAll() else: - import selectors - from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK, + import std/selectors + from std/posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK, MSG_NOSIGNAL when declared(posix.accept4): - from posix import accept4, SOCK_CLOEXEC + from std/posix import accept4, SOCK_CLOEXEC + when defined(genode): + import genode/env # get the implicit Genode env + import genode/signals const InitCallbackListSize = 4 # initial size of callbacks sequence, @@ -1187,6 +1192,8 @@ else: PDispatcher* = ref object of PDispatcherBase selector: Selector[AsyncData] + when defined(genode): + signalHandler: SignalHandler proc `==`*(x, y: AsyncFD): bool {.borrow.} proc `==`*(x, y: AsyncEvent): bool {.borrow.} @@ -1202,9 +1209,22 @@ else: result.selector = newSelector[AsyncData]() result.timers.clear() result.callbacks = initDeque[proc () {.closure, gcsafe.}](InitDelayedCallbackListSize) + when defined(genode): + let entrypoint = ep(cast[GenodeEnv](runtimeEnv)) + result.signalHandler = newSignalHandler(entrypoint): + discard runOnce(0) var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher + when defined(nuttx): + import std/exitprocs + + proc cleanDispatcher() {.noconv.} = + gDisp = nil + + proc addFinalyzer() = + addExitProc(cleanDispatcher) + proc setGlobalDispatcher*(disp: owned PDispatcher) = if not gDisp.isNil: assert gDisp.callbacks.len == 0 @@ -1214,6 +1234,8 @@ else: proc getGlobalDispatcher*(): PDispatcher = if gDisp.isNil: setGlobalDispatcher(newDispatcher()) + when defined(nuttx): + addFinalyzer() result = gDisp proc getIoHandler*(disp: PDispatcher): Selector[AsyncData] = @@ -1371,10 +1393,11 @@ else: ValueError, "Expecting async operations to stop when fd has closed." ) - - proc runOnce(timeout = 500): bool = + proc runOnce(timeout: int): bool = let p = getGlobalDispatcher() if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0: + when defined(genode): + if timeout == 0: return raise newException(ValueError, "No handles or timers registered in dispatcher.") @@ -1445,7 +1468,7 @@ else: if flags.isDisconnectionError(lastError): retFuture.complete("") else: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) + retFuture.fail(newOSError(lastError)) else: result = false # We still want this callback to be called. elif res == 0: @@ -1474,7 +1497,7 @@ else: if flags.isDisconnectionError(lastError): retFuture.complete(0) else: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) + retFuture.fail(newOSError(lastError)) else: result = false # We still want this callback to be called. else: @@ -1540,7 +1563,7 @@ else: let lastError = osLastError() if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and lastError.int32 != EAGAIN: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) + retFuture.fail(newOSError(lastError)) else: result = false # We still want this callback to be called. else: @@ -1566,7 +1589,7 @@ else: let lastError = osLastError() if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and lastError.int32 != EAGAIN: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) + retFuture.fail(newOSError(lastError)) else: result = false else: @@ -1579,7 +1602,7 @@ else: owned(Future[tuple[address: string, client: AsyncFD]]) = var retFuture = newFuture[tuple[address: string, client: AsyncFD]]("acceptAddr") - proc cb(sock: AsyncFD): bool = + proc cb(sock: AsyncFD): bool {.gcsafe.} = result = true var sockAddress: Sockaddr_storage var addrLen = sizeof(sockAddress).SockLen @@ -1607,7 +1630,7 @@ else: if flags.isDisconnectionError(lastError): return false else: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) + retFuture.fail(newOSError(lastError)) else: try: let address = getAddrString(cast[ptr SockAddr](addr sockAddress)) @@ -1740,7 +1763,7 @@ when defined(windows) or defined(nimdoc): socket.SocketHandle.setSockOptInt(SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, 1) # 15022 retFuture.complete() else: - retFuture.fail(newException(OSError, osErrorMsg(errcode))) + retFuture.fail(newOSError(errcode)) ) let ret = connectEx(socket.SocketHandle, addrInfo.ai_addr, @@ -1758,7 +1781,7 @@ when defined(windows) or defined(nimdoc): # With ERROR_IO_PENDING `ol` will be deallocated in `poll`, # and the future will be completed/failed there, too. GC_unref(ol) - retFuture.fail(newException(OSError, osErrorMsg(lastError))) + retFuture.fail(newOSError(lastError)) else: proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): owned(Future[void]) = let retFuture = newFuture[void]("doConnect") @@ -1775,7 +1798,7 @@ else: # interrupted, keep waiting return false else: - retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret)))) + retFuture.fail(newOSError(OSErrorCode(ret))) return true let ret = connect(socket.SocketHandle, @@ -1789,7 +1812,7 @@ else: if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS: addWrite(socket, cb) else: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) + retFuture.fail(newOSError(lastError)) template asyncAddrInfoLoop(addrInfo: ptr AddrInfo, fd: untyped, protocol: Protocol = IPPROTO_RAW) = @@ -1971,7 +1994,8 @@ proc send*(socket: AsyncFD, data: string, return retFuture # -- Await Macro -include asyncmacro +import std/asyncmacro +export asyncmacro proc readAll*(future: FutureStream[string]): owned(Future[string]) {.async.} = ## Returns a future that will complete when all the string data from the @@ -2008,10 +2032,10 @@ proc activeDescriptors*(): int {.inline.} = result = getGlobalDispatcher().selector.count when defined(posix): - import posix + import std/posix when defined(linux) or defined(windows) or defined(macosx) or defined(bsd) or - defined(solaris) or defined(zephyr) or defined(freertos): + defined(solaris) or defined(zephyr) or defined(freertos) or defined(nuttx) or defined(haiku): proc maxDescriptors*(): int {.raises: OSError.} = ## Returns the maximum number of active file descriptors for the current ## process. This involves a system call. For now `maxDescriptors` is @@ -2025,3 +2049,17 @@ when defined(linux) or defined(windows) or defined(macosx) or defined(bsd) or if getrlimit(RLIMIT_NOFILE, fdLim) < 0: raiseOSError(osLastError()) result = int(fdLim.rlim_cur) - 1 + +when defined(genode): + proc scheduleCallbacks*(): bool {.discardable.} = + ## *Genode only.* + ## Schedule callback processing and return immediately. + ## Returns `false` if there is nothing to schedule. + ## RPC servers should call this to dispatch `callSoon` + ## bodies after retiring an RPC to its client. + ## This is effectively a non-blocking `poll(…)` and is + ## equivalent to scheduling a momentary no-op timeout + ## but faster and with less overhead. + let dis = getGlobalDispatcher() + result = dis.callbacks.len > 0 + if result: submit(dis.signalHandler.cap) |