summary refs log tree commit diff stats
path: root/lib/pure/asyncdispatch.nim
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r--lib/pure/asyncdispatch.nim130
1 files changed, 84 insertions, 46 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim
index 92ad9c5ff..126db7a7f 100644
--- a/lib/pure/asyncdispatch.nim
+++ b/lib/pure/asyncdispatch.nim
@@ -41,13 +41,13 @@
 ## requested amount of data is read **or** an exception occurs.
 ##
 ## Code to read some data from a socket may look something like this:
-##
-## .. code-block:: Nim
-##    var future = socket.recv(100)
-##    future.addCallback(
-##      proc () =
-##        echo(future.read)
-##    )
+##   ```Nim
+##   var future = socket.recv(100)
+##   future.addCallback(
+##     proc () =
+##       echo(future.read)
+##   )
+##   ```
 ##
 ## All asynchronous functions returning a `Future` will not block. They
 ## will not however return immediately. An asynchronous function will have
@@ -108,24 +108,24 @@
 ## You can handle exceptions in the same way as in ordinary Nim code;
 ## by using the try statement:
 ##
-##
-## .. code-block:: Nim
+##   ```Nim
 ##   try:
 ##     let data = await sock.recv(100)
 ##     echo("Received ", data)
 ##   except:
 ##     # Handle exception
-##
-##
+##   ```
 ##
 ## An alternative approach to handling exceptions is to use `yield` on a future
 ## then check the future's `failed` property. For example:
 ##
-## .. code-block:: Nim
+##   ```Nim
 ##   var future = sock.recv(100)
 ##   yield future
 ##   if future.failed:
 ##     # Handle exception
+##   ```
+##
 ##
 ## Discarding futures
 ## ==================
@@ -226,11 +226,11 @@
 ## ``none`` can be used when a library supports both a synchronous and
 ## asynchronous API, to disable the latter.
 
-import os, tables, strutils, times, heapqueue, options, asyncstreams
-import options, math, std/monotimes
-import asyncfutures except callSoon
+import std/[os, tables, strutils, times, heapqueue, options, asyncstreams]
+import std/[math, monotimes]
+import std/asyncfutures except callSoon
 
-import nativesockets, net, deques
+import std/[nativesockets, net, deques]
 
 when defined(nimPreviewSlimSystem):
   import std/[assertions, syncio]
@@ -281,6 +281,8 @@ proc adjustTimeout(
   result = max(nextTimer.get(), 0)
   result = min(pollTimeout, result)
 
+proc runOnce(timeout: int): bool {.gcsafe.}
+
 proc callSoon*(cbproc: proc () {.gcsafe.}) {.gcsafe.}
   ## Schedule `cbproc` to be called as soon as possible.
   ## The callback is called when control returns to the event loop.
@@ -300,7 +302,7 @@ template implementSetInheritable() {.dirty.} =
       fd.FileHandle.setInheritable(inheritable)
 
 when defined(windows) or defined(nimdoc):
-  import winlean, sets, hashes
+  import std/[winlean, sets, hashes]
   type
     CompletionKey = ULONG_PTR
 
@@ -390,7 +392,7 @@ when defined(windows) or defined(nimdoc):
     let p = getGlobalDispatcher()
     p.handles.len != 0 or p.timers.len != 0 or p.callbacks.len != 0
 
-  proc runOnce(timeout = 500): bool =
+  proc runOnce(timeout: int): bool =
     let p = getGlobalDispatcher()
     if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0:
       raise newException(ValueError,
@@ -531,7 +533,7 @@ when defined(windows) or defined(nimdoc):
             if flags.isDisconnectionError(errcode):
               retFuture.complete("")
             else:
-              retFuture.fail(newException(OSError, osErrorMsg(errcode)))
+              retFuture.fail(newOSError(errcode))
         if dataBuf.buf != nil:
           dealloc dataBuf.buf
           dataBuf.buf = nil
@@ -549,7 +551,7 @@ when defined(windows) or defined(nimdoc):
         if flags.isDisconnectionError(err):
           retFuture.complete("")
         else:
-          retFuture.fail(newException(OSError, osErrorMsg(err)))
+          retFuture.fail(newOSError(err))
     elif ret == 0:
       # Request completed immediately.
       if bytesReceived != 0:
@@ -601,7 +603,7 @@ when defined(windows) or defined(nimdoc):
             if flags.isDisconnectionError(errcode):
               retFuture.complete(0)
             else:
-              retFuture.fail(newException(OSError, osErrorMsg(errcode)))
+              retFuture.fail(newOSError(errcode))
         if dataBuf.buf != nil:
           dataBuf.buf = nil
     )
@@ -617,7 +619,7 @@ when defined(windows) or defined(nimdoc):
         if flags.isDisconnectionError(err):
           retFuture.complete(0)
         else:
-          retFuture.fail(newException(OSError, osErrorMsg(err)))
+          retFuture.fail(newOSError(err))
     elif ret == 0:
       # Request completed immediately.
       if bytesReceived != 0:
@@ -665,7 +667,7 @@ when defined(windows) or defined(nimdoc):
         if flags.isDisconnectionError(err):
           retFuture.complete()
         else:
-          retFuture.fail(newException(OSError, osErrorMsg(err)))
+          retFuture.fail(newOSError(err))
     else:
       retFuture.complete()
       # We don't deallocate `ol` here because even though this completed
@@ -700,7 +702,7 @@ when defined(windows) or defined(nimdoc):
           if errcode == OSErrorCode(-1):
             retFuture.complete()
           else:
-            retFuture.fail(newException(OSError, osErrorMsg(errcode)))
+            retFuture.fail(newOSError(errcode))
     )
 
     let ret = WSASendTo(socket.SocketHandle, addr dataBuf, 1, addr bytesSent,
@@ -710,7 +712,7 @@ when defined(windows) or defined(nimdoc):
       let err = osLastError()
       if err.int32 != ERROR_IO_PENDING:
         GC_unref(ol)
-        retFuture.fail(newException(OSError, osErrorMsg(err)))
+        retFuture.fail(newOSError(err))
     else:
       retFuture.complete()
       # We don't deallocate `ol` here because even though this completed
@@ -744,7 +746,7 @@ when defined(windows) or defined(nimdoc):
           else:
             # datagram sockets don't have disconnection,
             # so we can just raise an exception
-            retFuture.fail(newException(OSError, osErrorMsg(errcode)))
+            retFuture.fail(newOSError(errcode))
     )
 
     let res = WSARecvFrom(socket.SocketHandle, addr dataBuf, 1,
@@ -755,7 +757,7 @@ when defined(windows) or defined(nimdoc):
       let err = osLastError()
       if err.int32 != ERROR_IO_PENDING:
         GC_unref(ol)
-        retFuture.fail(newException(OSError, osErrorMsg(err)))
+        retFuture.fail(newOSError(err))
     else:
       # Request completed immediately.
       if bytesReceived != 0:
@@ -806,7 +808,7 @@ when defined(windows) or defined(nimdoc):
             else:
               retFuture.complete(newAcceptFut.read)
       else:
-        retFuture.fail(newException(OSError, osErrorMsg(errcode)))
+        retFuture.fail(newOSError(errcode))
 
     template completeAccept() {.dirty.} =
       var listenSock = socket
@@ -1164,11 +1166,14 @@ when defined(windows) or defined(nimdoc):
 
   initAll()
 else:
-  import selectors
-  from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK,
+  import std/selectors
+  from std/posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK,
                     MSG_NOSIGNAL
   when declared(posix.accept4):
-    from posix import accept4, SOCK_CLOEXEC
+    from std/posix import accept4, SOCK_CLOEXEC
+  when defined(genode):
+    import genode/env # get the implicit Genode env
+    import genode/signals
 
   const
     InitCallbackListSize = 4         # initial size of callbacks sequence,
@@ -1187,6 +1192,8 @@ else:
 
     PDispatcher* = ref object of PDispatcherBase
       selector: Selector[AsyncData]
+      when defined(genode):
+        signalHandler: SignalHandler
 
   proc `==`*(x, y: AsyncFD): bool {.borrow.}
   proc `==`*(x, y: AsyncEvent): bool {.borrow.}
@@ -1202,9 +1209,22 @@ else:
     result.selector = newSelector[AsyncData]()
     result.timers.clear()
     result.callbacks = initDeque[proc () {.closure, gcsafe.}](InitDelayedCallbackListSize)
+    when defined(genode):
+      let entrypoint = ep(cast[GenodeEnv](runtimeEnv))
+      result.signalHandler = newSignalHandler(entrypoint):
+        discard runOnce(0)
 
   var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher
 
+  when defined(nuttx):
+    import std/exitprocs
+
+    proc cleanDispatcher() {.noconv.} =
+      gDisp = nil
+
+    proc addFinalyzer() =
+      addExitProc(cleanDispatcher)
+
   proc setGlobalDispatcher*(disp: owned PDispatcher) =
     if not gDisp.isNil:
       assert gDisp.callbacks.len == 0
@@ -1214,6 +1234,8 @@ else:
   proc getGlobalDispatcher*(): PDispatcher =
     if gDisp.isNil:
       setGlobalDispatcher(newDispatcher())
+      when defined(nuttx):
+        addFinalyzer()
     result = gDisp
 
   proc getIoHandler*(disp: PDispatcher): Selector[AsyncData] =
@@ -1371,10 +1393,11 @@ else:
           ValueError, "Expecting async operations to stop when fd has closed."
         )
 
-
-  proc runOnce(timeout = 500): bool =
+  proc runOnce(timeout: int): bool =
     let p = getGlobalDispatcher()
     if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0:
+      when defined(genode):
+        if timeout == 0: return
       raise newException(ValueError,
         "No handles or timers registered in dispatcher.")
 
@@ -1445,7 +1468,7 @@ else:
           if flags.isDisconnectionError(lastError):
             retFuture.complete("")
           else:
-            retFuture.fail(newException(OSError, osErrorMsg(lastError)))
+            retFuture.fail(newOSError(lastError))
         else:
           result = false # We still want this callback to be called.
       elif res == 0:
@@ -1474,7 +1497,7 @@ else:
           if flags.isDisconnectionError(lastError):
             retFuture.complete(0)
           else:
-            retFuture.fail(newException(OSError, osErrorMsg(lastError)))
+            retFuture.fail(newOSError(lastError))
         else:
           result = false # We still want this callback to be called.
       else:
@@ -1540,7 +1563,7 @@ else:
         let lastError = osLastError()
         if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
            lastError.int32 != EAGAIN:
-          retFuture.fail(newException(OSError, osErrorMsg(lastError)))
+          retFuture.fail(newOSError(lastError))
         else:
           result = false # We still want this callback to be called.
       else:
@@ -1566,7 +1589,7 @@ else:
         let lastError = osLastError()
         if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
            lastError.int32 != EAGAIN:
-          retFuture.fail(newException(OSError, osErrorMsg(lastError)))
+          retFuture.fail(newOSError(lastError))
         else:
           result = false
       else:
@@ -1579,7 +1602,7 @@ else:
       owned(Future[tuple[address: string, client: AsyncFD]]) =
     var retFuture = newFuture[tuple[address: string,
         client: AsyncFD]]("acceptAddr")
-    proc cb(sock: AsyncFD): bool =
+    proc cb(sock: AsyncFD): bool {.gcsafe.} =
       result = true
       var sockAddress: Sockaddr_storage
       var addrLen = sizeof(sockAddress).SockLen
@@ -1607,7 +1630,7 @@ else:
           if flags.isDisconnectionError(lastError):
             return false
           else:
-            retFuture.fail(newException(OSError, osErrorMsg(lastError)))
+            retFuture.fail(newOSError(lastError))
       else:
         try:
           let address = getAddrString(cast[ptr SockAddr](addr sockAddress))
@@ -1740,7 +1763,7 @@ when defined(windows) or defined(nimdoc):
             socket.SocketHandle.setSockOptInt(SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, 1) # 15022
             retFuture.complete()
           else:
-            retFuture.fail(newException(OSError, osErrorMsg(errcode)))
+            retFuture.fail(newOSError(errcode))
     )
 
     let ret = connectEx(socket.SocketHandle, addrInfo.ai_addr,
@@ -1758,7 +1781,7 @@ when defined(windows) or defined(nimdoc):
         # With ERROR_IO_PENDING `ol` will be deallocated in `poll`,
         # and the future will be completed/failed there, too.
         GC_unref(ol)
-        retFuture.fail(newException(OSError, osErrorMsg(lastError)))
+        retFuture.fail(newOSError(lastError))
 else:
   proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): owned(Future[void]) =
     let retFuture = newFuture[void]("doConnect")
@@ -1775,7 +1798,7 @@ else:
         # interrupted, keep waiting
         return false
       else:
-        retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret))))
+        retFuture.fail(newOSError(OSErrorCode(ret)))
         return true
 
     let ret = connect(socket.SocketHandle,
@@ -1789,7 +1812,7 @@ else:
       if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS:
         addWrite(socket, cb)
       else:
-        retFuture.fail(newException(OSError, osErrorMsg(lastError)))
+        retFuture.fail(newOSError(lastError))
 
 template asyncAddrInfoLoop(addrInfo: ptr AddrInfo, fd: untyped,
                            protocol: Protocol = IPPROTO_RAW) =
@@ -1971,7 +1994,8 @@ proc send*(socket: AsyncFD, data: string,
   return retFuture
 
 # -- Await Macro
-include asyncmacro
+import std/asyncmacro
+export asyncmacro
 
 proc readAll*(future: FutureStream[string]): owned(Future[string]) {.async.} =
   ## Returns a future that will complete when all the string data from the
@@ -2008,10 +2032,10 @@ proc activeDescriptors*(): int {.inline.} =
     result = getGlobalDispatcher().selector.count
 
 when defined(posix):
-  import posix
+  import std/posix
 
 when defined(linux) or defined(windows) or defined(macosx) or defined(bsd) or
-       defined(solaris) or defined(zephyr) or defined(freertos):
+       defined(solaris) or defined(zephyr) or defined(freertos) or defined(nuttx) or defined(haiku):
   proc maxDescriptors*(): int {.raises: OSError.} =
     ## Returns the maximum number of active file descriptors for the current
     ## process. This involves a system call. For now `maxDescriptors` is
@@ -2025,3 +2049,17 @@ when defined(linux) or defined(windows) or defined(macosx) or defined(bsd) or
       if getrlimit(RLIMIT_NOFILE, fdLim) < 0:
         raiseOSError(osLastError())
       result = int(fdLim.rlim_cur) - 1
+
+when defined(genode):
+  proc scheduleCallbacks*(): bool {.discardable.} =
+    ## *Genode only.*
+    ## Schedule callback processing and return immediately.
+    ## Returns `false` if there is nothing to schedule.
+    ## RPC servers should call this to dispatch `callSoon`
+    ## bodies after retiring an RPC to its client.
+    ## This is effectively a non-blocking `poll(…)` and is
+    ## equivalent to scheduling a momentary no-op timeout
+    ## but faster and with less overhead.
+    let dis = getGlobalDispatcher()
+    result = dis.callbacks.len > 0
+    if result: submit(dis.signalHandler.cap)