diff options
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r-- | lib/pure/asyncdispatch.nim | 1394 |
1 files changed, 902 insertions, 492 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index dfc7201b8..126db7a7f 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -7,41 +7,26 @@ # distribution, for details about the copyright. # -include "system/inclrtl" - -import os, tables, strutils, times, heapqueue, lists, options, asyncstreams -import asyncfutures except callSoon - -import nativesockets, net, deques - -export Port, SocketFlag -export asyncfutures, asyncstreams - -#{.injectStmt: newGcInvariant().} - -## AsyncDispatch -## ************* -## ## This module implements asynchronous IO. This includes a dispatcher, -## a ``Future`` type implementation, and an ``async`` macro which allows -## asynchronous code to be written in a synchronous style with the ``await`` +## a `Future` type implementation, and an `async` macro which allows +## asynchronous code to be written in a synchronous style with the `await` ## keyword. ## -## The dispatcher acts as a kind of event loop. You must call ``poll`` on it -## (or a function which does so for you such as ``waitFor`` or ``runForever``) +## The dispatcher acts as a kind of event loop. You must call `poll` on it +## (or a function which does so for you such as `waitFor` or `runForever`) ## in order to poll for any outstanding events. The underlying implementation ## is based on epoll on Linux, IO Completion Ports on Windows and select on ## other operating systems. ## -## The ``poll`` function will not, on its own, return any events. Instead -## an appropriate ``Future`` object will be completed. A ``Future`` is a +## The `poll` function will not, on its own, return any events. Instead +## an appropriate `Future` object will be completed. A `Future` is a ## type which holds a value which is not yet available, but which *may* be ## available in the future. You can check whether a future is finished -## by using the ``finished`` function. When a future is finished it means that +## by using the `finished` function. When a future is finished it means that ## either the value that it holds is now available or it holds an error instead. ## The latter situation occurs when the operation to complete a future fails ## with an exception. You can distinguish between the two situations with the -## ``failed`` function. +## `failed` function. ## ## Future objects can also store a callback procedure which will be called ## automatically once the future completes. @@ -50,50 +35,50 @@ export asyncfutures, asyncstreams ## pattern. In this ## pattern you make a request for an action, and once that action is fulfilled ## a future is completed with the result of that action. Requests can be -## made by calling the appropriate functions. For example: calling the ``recv`` +## made by calling the appropriate functions. For example: calling the `recv` ## function will create a request for some data to be read from a socket. The -## future which the ``recv`` function returns will then complete once the +## future which the `recv` function returns will then complete once the ## requested amount of data is read **or** an exception occurs. ## ## Code to read some data from a socket may look something like this: +## ```Nim +## var future = socket.recv(100) +## future.addCallback( +## proc () = +## echo(future.read) +## ) +## ``` ## -## .. code-block::nim -## var future = socket.recv(100) -## future.addCallback( -## proc () = -## echo(future.read) -## ) -## -## All asynchronous functions returning a ``Future`` will not block. They +## All asynchronous functions returning a `Future` will not block. They ## will not however return immediately. An asynchronous function will have ## code which will be executed before an asynchronous request is made, in most ## cases this code sets up the request. ## -## In the above example, the ``recv`` function will return a brand new -## ``Future`` instance once the request for data to be read from the socket -## is made. This ``Future`` instance will complete once the requested amount +## In the above example, the `recv` function will return a brand new +## `Future` instance once the request for data to be read from the socket +## is made. This `Future` instance will complete once the requested amount ## of data is read, in this case it is 100 bytes. The second line sets a ## callback on this future which will be called once the future completes. -## All the callback does is write the data stored in the future to ``stdout``. -## The ``read`` function is used for this and it checks whether the future -## completes with an error for you (if it did it will simply raise the -## error), if there is no error however it returns the value of the future. +## All the callback does is write the data stored in the future to `stdout`. +## The `read` function is used for this and it checks whether the future +## completes with an error for you (if it did, it will simply raise the +## error), if there is no error, however, it returns the value of the future. ## ## Asynchronous procedures -## ----------------------- +## ======================= ## ## Asynchronous procedures remove the pain of working with callbacks. They do ## this by allowing you to write asynchronous code the same way as you would ## write synchronous code. ## -## An asynchronous procedure is marked using the ``{.async.}`` pragma. -## When marking a procedure with the ``{.async.}`` pragma it must have a -## ``Future[T]`` return type or no return type at all. If you do not specify -## a return type then ``Future[void]`` is assumed. +## An asynchronous procedure is marked using the `{.async.}` pragma. +## When marking a procedure with the `{.async.}` pragma it must have a +## `Future[T]` return type or no return type at all. If you do not specify +## a return type then `Future[void]` is assumed. ## -## Inside asynchronous procedures ``await`` can be used to call any +## Inside asynchronous procedures `await` can be used to call any ## procedures which return a -## ``Future``; this includes asynchronous procedures. When a procedure is +## `Future`; this includes asynchronous procedures. When a procedure is ## "awaited", the asynchronous procedure it is awaited in will ## suspend its execution ## until the awaited procedure's Future completes. At which point the @@ -101,126 +86,242 @@ export asyncfutures, asyncstreams ## when an asynchronous procedure is suspended other asynchronous procedures ## will be run by the dispatcher. ## -## The ``await`` call may be used in many contexts. It can be used on the right -## hand side of a variable declaration: ``var data = await socket.recv(100)``, +## The `await` call may be used in many contexts. It can be used on the right +## hand side of a variable declaration: `var data = await socket.recv(100)`, ## in which case the variable will be set to the value of the future -## automatically. It can be used to await a ``Future`` object, and it can -## be used to await a procedure returning a ``Future[void]``: -## ``await socket.send("foobar")``. +## automatically. It can be used to await a `Future` object, and it can +## be used to await a procedure returning a `Future[void]`: +## `await socket.send("foobar")`. ## -## If an awaited future completes with an error, then ``await`` will re-raise -## this error. To avoid this, you can use the ``yield`` keyword instead of -## ``await``. The following section shows different ways that you can handle +## If an awaited future completes with an error, then `await` will re-raise +## this error. To avoid this, you can use the `yield` keyword instead of +## `await`. The following section shows different ways that you can handle ## exceptions in async procs. ## -## Handling Exceptions -## ~~~~~~~~~~~~~~~~~~~ +## .. caution:: +## Procedures marked {.async.} do not support mutable parameters such +## as `var int`. References such as `ref int` should be used instead. ## -## The most reliable way to handle exceptions is to use ``yield`` on a future -## then check the future's ``failed`` property. For example: +## Handling Exceptions +## ------------------- ## -## .. code-block:: Nim -## var future = sock.recv(100) -## yield future -## if future.failed: -## # Handle exception +## You can handle exceptions in the same way as in ordinary Nim code; +## by using the try statement: ## -## The ``async`` procedures also offer limited support for the try statement. +## ```Nim +## try: +## let data = await sock.recv(100) +## echo("Received ", data) +## except: +## # Handle exception +## ``` ## -## .. code-block:: Nim -## try: -## let data = await sock.recv(100) -## echo("Received ", data) -## except: -## # Handle exception +## An alternative approach to handling exceptions is to use `yield` on a future +## then check the future's `failed` property. For example: ## -## Unfortunately the semantics of the try statement may not always be correct, -## and occasionally the compilation may fail altogether. -## As such it is better to use the former style when possible. +## ```Nim +## var future = sock.recv(100) +## yield future +## if future.failed: +## # Handle exception +## ``` ## ## ## Discarding futures -## ------------------ +## ================== +## +## Futures should **never** be discarded directly because they may contain +## errors. If you do not care for the result of a Future then you should use +## the `asyncCheck` procedure instead of the `discard` keyword. Note that this +## does not wait for completion, and you should use `waitFor` or `await` for that purpose. +## +## .. note:: `await` also checks if the future fails, so you can safely discard +## its result. +## +## Handling futures +## ================ +## +## There are many different operations that apply to a future. +## The three primary high-level operations are `asyncCheck`, +## `waitFor`, and `await`. ## -## Futures should **never** be discarded. This is because they may contain -## errors. If you do not care for the result of a Future then you should -## use the ``asyncCheck`` procedure instead of the ``discard`` keyword. +## * `asyncCheck`: Raises an exception if the future fails. It neither waits +## for the future to finish nor returns the result of the future. +## * `waitFor`: Polls the event loop and blocks the current thread until the +## future finishes. This is often used to call an async procedure from a +## synchronous context and should never be used in an `async` proc. +## * `await`: Pauses execution in the current async procedure until the future +## finishes. While the current procedure is paused, other async procedures will +## continue running. Should be used instead of `waitFor` in an async +## procedure. +## +## Here is a handy quick reference chart showing their high-level differences: +## ============== ===================== ======================= +## Procedure Context Blocking +## ============== ===================== ======================= +## `asyncCheck` non-async and async non-blocking +## `waitFor` non-async blocks current thread +## `await` async suspends current proc +## ============== ===================== ======================= ## ## Examples -## -------- +## ======== ## ## For examples take a look at the documentation for the modules implementing ## asynchronous IO. A good place to start is the ## `asyncnet module <asyncnet.html>`_. ## +## Investigating pending futures +## ============================= +## +## It's possible to get into a situation where an async proc, or more accurately +## a `Future[T]` gets stuck and +## never completes. This can happen for various reasons and can cause serious +## memory leaks. When this occurs it's hard to identify the procedure that is +## stuck. +## +## Thankfully there is a mechanism which tracks the count of each pending future. +## All you need to do to enable it is compile with `-d:futureLogging` and +## use the `getFuturesInProgress` procedure to get the list of pending futures +## together with the stack traces to the moment of their creation. +## +## You may also find it useful to use this +## `prometheus package <https://github.com/dom96/prometheus>`_ which will log +## the pending futures into prometheus, allowing you to analyse them via a nice +## graph. +## +## +## ## Limitations/Bugs -## ---------------- +## ================ +## +## * The effect system (`raises: []`) does not work with async procedures. +## * Mutable parameters are not supported by async procedures. +## +## +## Multiple async backend support +## ============================== +## +## Thanks to its powerful macro support, Nim allows ``async``/``await`` to be +## implemented in libraries with only minimal support from the language - as +## such, multiple ``async`` libraries exist, including ``asyncdispatch`` and +## ``chronos``, and more may come to be developed in the future. +## +## Libraries built on top of async/await may wish to support multiple async +## backends - the best way to do so is to create separate modules for each backend +## that may be imported side-by-side. +## +## An alternative way is to select backend using a global compile flag - this +## method makes it difficult to compose applications that use both backends as may +## happen with transitive dependencies, but may be appropriate in some cases - +## libraries choosing this path should call the flag `asyncBackend`, allowing +## applications to choose the backend with `-d:asyncBackend=<backend_name>`. ## -## * The effect system (``raises: []``) does not work with async procedures. -## * Can't await in a ``except`` body -## * Forward declarations for async procs are broken, -## link includes workaround: https://github.com/nim-lang/Nim/issues/3182. +## Known `async` backends include: +## +## * `-d:asyncBackend=none`: disable `async` support completely +## * `-d:asyncBackend=asyncdispatch`: https://nim-lang.org/docs/asyncdispatch.html +## * `-d:asyncBackend=chronos`: https://github.com/status-im/nim-chronos/ +## +## ``none`` can be used when a library supports both a synchronous and +## asynchronous API, to disable the latter. + +import std/[os, tables, strutils, times, heapqueue, options, asyncstreams] +import std/[math, monotimes] +import std/asyncfutures except callSoon + +import std/[nativesockets, net, deques] + +when defined(nimPreviewSlimSystem): + import std/[assertions, syncio] + +export Port, SocketFlag +export asyncfutures except callSoon +export asyncstreams # TODO: Check if yielded future is nil and throw a more meaningful exception type PDispatcherBase = ref object of RootRef - timers*: HeapQueue[tuple[finishAt: float, fut: Future[void]]] - callbacks*: Deque[proc ()] + timers*: HeapQueue[tuple[finishAt: MonoTime, fut: Future[void]]] + callbacks*: Deque[proc () {.gcsafe.}] -proc processTimers(p: PDispatcherBase; didSomeWork: var bool) {.inline.} = - #Process just part if timers at a step +proc processTimers( + p: PDispatcherBase, didSomeWork: var bool +): Option[int] {.inline.} = + # Pop the timers in the order in which they will expire (smaller `finishAt`). var count = p.timers.len - let t = epochTime() + let t = getMonoTime() while count > 0 and t >= p.timers[0].finishAt: p.timers.pop().fut.complete() dec count didSomeWork = true + # Return the number of milliseconds in which the next timer will expire. + if p.timers.len == 0: return + + let millisecs = (p.timers[0].finishAt - getMonoTime()).inMilliseconds + return some(millisecs.int + 1) + proc processPendingCallbacks(p: PDispatcherBase; didSomeWork: var bool) = while p.callbacks.len > 0: var cb = p.callbacks.popFirst() cb() didSomeWork = true -proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} = - # If dispatcher has active timers this proc returns the timeout - # of the nearest timer. Returns `timeout` otherwise. - result = timeout - if p.timers.len > 0: - let timerTimeout = p.timers[0].finishAt - let curTime = epochTime() - if timeout == -1 or (curTime + (timeout / 1000)) > timerTimeout: - result = int((timerTimeout - curTime) * 1000) - if result < 0: result = 0 +proc adjustTimeout( + p: PDispatcherBase, pollTimeout: int, nextTimer: Option[int] +): int {.inline.} = + if p.callbacks.len != 0: + return 0 -proc callSoon(cbproc: proc ()) {.gcsafe.} + if nextTimer.isNone() or pollTimeout == -1: + return pollTimeout + + result = max(nextTimer.get(), 0) + result = min(pollTimeout, result) + +proc runOnce(timeout: int): bool {.gcsafe.} + +proc callSoon*(cbproc: proc () {.gcsafe.}) {.gcsafe.} + ## Schedule `cbproc` to be called as soon as possible. + ## The callback is called when control returns to the event loop. proc initCallSoonProc = if asyncfutures.getCallSoonProc().isNil: asyncfutures.setCallSoonProc(callSoon) +template implementSetInheritable() {.dirty.} = + when declared(setInheritable): + proc setInheritable*(fd: AsyncFD, inheritable: bool): bool = + ## Control whether a file handle can be inherited by child processes. + ## Returns `true` on success. + ## + ## This procedure is not guaranteed to be available for all platforms. + ## Test for availability with `declared() <system.html#declared,untyped>`_. + fd.FileHandle.setInheritable(inheritable) + when defined(windows) or defined(nimdoc): - import winlean, sets, hashes + import std/[winlean, sets, hashes] type CompletionKey = ULONG_PTR CompletionData* = object - fd*: AsyncFD # TODO: Rename this. - cb*: proc (fd: AsyncFD, bytesTransferred: Dword, - errcode: OSErrorCode) {.closure,gcsafe.} + fd*: AsyncFD # TODO: Rename this. + cb*: owned(proc (fd: AsyncFD, bytesTransferred: DWORD, + errcode: OSErrorCode) {.closure, gcsafe.}) cell*: ForeignCell # we need this `cell` to protect our `cb` environment, # when using RegisterWaitForSingleObject, because # waiting is done in different thread. PDispatcher* = ref object of PDispatcherBase ioPort: Handle - handles: HashSet[AsyncFD] + handles*: HashSet[AsyncFD] # Export handles so that an external library can register them. - CustomOverlapped = object of OVERLAPPED + CustomObj = object of OVERLAPPED data*: CompletionData - PCustomOverlapped* = ref CustomOverlapped + CustomRef* = ref CustomObj AsyncFD* = distinct int @@ -228,7 +329,7 @@ when defined(windows) or defined(nimdoc): ioPort: Handle handleFd: AsyncFD waitFd: Handle - ovl: PCustomOverlapped + ovl: owned CustomRef PostCallbackDataPtr = ptr PostCallbackData AsyncEventImpl = object @@ -237,24 +338,22 @@ when defined(windows) or defined(nimdoc): pcd: PostCallbackDataPtr AsyncEvent* = ptr AsyncEventImpl - Callback = proc (fd: AsyncFD): bool {.closure,gcsafe.} - {.deprecated: [TCompletionKey: CompletionKey, TAsyncFD: AsyncFD, - TCustomOverlapped: CustomOverlapped, TCompletionData: CompletionData].} + Callback* = proc (fd: AsyncFD): bool {.closure, gcsafe.} proc hash(x: AsyncFD): Hash {.borrow.} proc `==`*(x: AsyncFD, y: AsyncFD): bool {.borrow.} - proc newDispatcher*(): PDispatcher = + proc newDispatcher*(): owned PDispatcher = ## Creates a new Dispatcher instance. new result result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) - result.handles = initSet[AsyncFD]() - result.timers.newHeapQueue() - result.callbacks = initDeque[proc ()](64) + result.handles = initHashSet[AsyncFD]() + result.timers.clear() + result.callbacks = initDeque[proc () {.closure, gcsafe.}](64) - var gDisp{.threadvar.}: PDispatcher ## Global dispatcher + var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher - proc setGlobalDispatcher*(disp: PDispatcher) = + proc setGlobalDispatcher*(disp: sink PDispatcher) = if not gDisp.isNil: assert gDisp.callbacks.len == 0 gDisp = disp @@ -271,7 +370,7 @@ when defined(windows) or defined(nimdoc): return disp.ioPort proc register*(fd: AsyncFD) = - ## Registers ``fd`` with the dispatcher. + ## Registers `fd` with the dispatcher. let p = getGlobalDispatcher() if createIoCompletionPort(fd.Handle, p.ioPort, @@ -281,6 +380,7 @@ when defined(windows) or defined(nimdoc): proc verifyPresence(fd: AsyncFD) = ## Ensures that file descriptor has been registered with the dispatcher. + ## Raises ValueError if `fd` has not been registered. let p = getGlobalDispatcher() if fd notin p.handles: raise newException(ValueError, @@ -292,60 +392,66 @@ when defined(windows) or defined(nimdoc): let p = getGlobalDispatcher() p.handles.len != 0 or p.timers.len != 0 or p.callbacks.len != 0 - proc runOnce(timeout = 500): bool = + proc runOnce(timeout: int): bool = let p = getGlobalDispatcher() if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0: raise newException(ValueError, "No handles or timers registered in dispatcher.") result = false - if p.handles.len != 0: - let at = p.adjustedTimeout(timeout) - var llTimeout = - if at == -1: winlean.INFINITE - else: at.int32 - - var lpNumberOfBytesTransferred: Dword - var lpCompletionKey: ULONG_PTR - var customOverlapped: PCustomOverlapped - let res = getQueuedCompletionStatus(p.ioPort, - addr lpNumberOfBytesTransferred, addr lpCompletionKey, - cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool - result = true - - # http://stackoverflow.com/a/12277264/492186 - # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html - if res: - # This is useful for ensuring the reliability of the overlapped struct. + let nextTimer = processTimers(p, result) + let at = adjustTimeout(p, timeout, nextTimer) + var llTimeout = + if at == -1: winlean.INFINITE + else: at.int32 + + var lpNumberOfBytesTransferred: DWORD + var lpCompletionKey: ULONG_PTR + var customOverlapped: CustomRef + let res = getQueuedCompletionStatus(p.ioPort, + addr lpNumberOfBytesTransferred, addr lpCompletionKey, + cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool + result = true + # For 'gcDestructors' the destructor of 'customOverlapped' will + # be called at the end and we are the only owner here. This means + # We do not have to 'GC_unref(customOverlapped)' because the destructor + # does that for us. + + # http://stackoverflow.com/a/12277264/492186 + # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html + if res: + # This is useful for ensuring the reliability of the overlapped struct. + assert customOverlapped.data.fd == lpCompletionKey.AsyncFD + + customOverlapped.data.cb(customOverlapped.data.fd, + lpNumberOfBytesTransferred, OSErrorCode(-1)) + + # If cell.data != nil, then system.protect(rawEnv(cb)) was called, + # so we need to dispose our `cb` environment, because it is not needed + # anymore. + if customOverlapped.data.cell.data != nil: + system.dispose(customOverlapped.data.cell) + + when not defined(gcDestructors): + GC_unref(customOverlapped) + else: + let errCode = osLastError() + if customOverlapped != nil: assert customOverlapped.data.fd == lpCompletionKey.AsyncFD - customOverlapped.data.cb(customOverlapped.data.fd, - lpNumberOfBytesTransferred, OSErrorCode(-1)) - - # If cell.data != nil, then system.protect(rawEnv(cb)) was called, - # so we need to dispose our `cb` environment, because it is not needed - # anymore. + lpNumberOfBytesTransferred, errCode) if customOverlapped.data.cell.data != nil: system.dispose(customOverlapped.data.cell) - - GC_unref(customOverlapped) - else: - let errCode = osLastError() - if customOverlapped != nil: - assert customOverlapped.data.fd == lpCompletionKey.AsyncFD - customOverlapped.data.cb(customOverlapped.data.fd, - lpNumberOfBytesTransferred, errCode) - if customOverlapped.data.cell.data != nil: - system.dispose(customOverlapped.data.cell) + when not defined(gcDestructors): GC_unref(customOverlapped) - else: - if errCode.int32 == WAIT_TIMEOUT: - # Timed out - result = false - else: raiseOSError(errCode) + else: + if errCode.int32 == WAIT_TIMEOUT: + # Timed out + result = false + else: raiseOSError(errCode) # Timer processing. - processTimers(p, result) + discard processTimers(p, result) # Callback queue processing processPendingCallbacks(p, result) @@ -356,14 +462,14 @@ when defined(windows) or defined(nimdoc): proc initPointer(s: SocketHandle, fun: var pointer, guid: var GUID): bool = # Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c - var bytesRet: Dword + var bytesRet: DWORD fun = nil result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid, - sizeof(GUID).Dword, addr fun, sizeof(pointer).Dword, + sizeof(GUID).DWORD, addr fun, sizeof(pointer).DWORD, addr bytesRet, nil, nil) == 0 proc initAll() = - let dummySock = newNativeSocket() + let dummySock = createNativeSocket() if dummySock == INVALID_SOCKET: raiseOSError(osLastError()) var fun: pointer = nil @@ -378,18 +484,25 @@ when defined(windows) or defined(nimdoc): getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](fun) close(dummySock) + proc newCustom*(): CustomRef = + result = CustomRef() # 0 + GC_ref(result) # 1 prevent destructor from doing a premature free. + # destructor of newCustom's caller --> 0. This means + # Windows holds a ref for us with RC == 0 (single owner). + # This is passed back to us in the IO completion port. + proc recv*(socket: AsyncFD, size: int, - flags = {SocketFlag.SafeDisconn}): Future[string] = - ## Reads **up to** ``size`` bytes from ``socket``. Returned future will + flags = {SocketFlag.SafeDisconn}): owned(Future[string]) = + ## Reads **up to** `size` bytes from `socket`. Returned future will ## complete once all the data requested is read, a part of the data has been ## read, or the socket has disconnected in which case the future will - ## complete with a value of ``""``. + ## complete with a value of `""`. ## - ## **Warning**: The ``Peek`` socket flag is not supported on Windows. + ## .. warning:: The `Peek` socket flag is not supported on Windows. # Things to note: - # * When WSARecv completes immediately then ``bytesReceived`` is very + # * When WSARecv completes immediately then `bytesReceived` is very # unreliable. # * Still need to implement message-oriented socket disconnection, # '\0' in the message currently signifies a socket disconnect. Who @@ -402,12 +515,11 @@ when defined(windows) or defined(nimdoc): dataBuf.buf = cast[cstring](alloc0(size)) dataBuf.len = size.ULONG - var bytesReceived: Dword - var flagsio = flags.toOSFlags().Dword - var ol = PCustomOverlapped() - GC_ref(ol) + var bytesReceived: DWORD + var flagsio = flags.toOSFlags().DWORD + var ol = newCustom() ol.data = CompletionData(fd: socket, cb: - proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = if not retFuture.finished: if errcode == OSErrorCode(-1): if bytesCount == 0 and dataBuf.buf[0] == '\0': @@ -421,7 +533,7 @@ when defined(windows) or defined(nimdoc): if flags.isDisconnectionError(errcode): retFuture.complete("") else: - retFuture.fail(newException(OSError, osErrorMsg(errcode))) + retFuture.fail(newOSError(errcode)) if dataBuf.buf != nil: dealloc dataBuf.buf dataBuf.buf = nil @@ -439,7 +551,7 @@ when defined(windows) or defined(nimdoc): if flags.isDisconnectionError(err): retFuture.complete("") else: - retFuture.fail(newException(OSError, osErrorMsg(err))) + retFuture.fail(newOSError(err)) elif ret == 0: # Request completed immediately. if bytesReceived != 0: @@ -453,18 +565,18 @@ when defined(windows) or defined(nimdoc): return retFuture proc recvInto*(socket: AsyncFD, buf: pointer, size: int, - flags = {SocketFlag.SafeDisconn}): Future[int] = - ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``, which must + flags = {SocketFlag.SafeDisconn}): owned(Future[int]) = + ## Reads **up to** `size` bytes from `socket` into `buf`, which must ## at least be of that size. Returned future will complete once all the ## data requested is read, a part of the data has been read, or the socket ## has disconnected in which case the future will complete with a value of - ## ``0``. + ## `0`. ## - ## **Warning**: The ``Peek`` socket flag is not supported on Windows. + ## .. warning:: The `Peek` socket flag is not supported on Windows. # Things to note: - # * When WSARecv completes immediately then ``bytesReceived`` is very + # * When WSARecv completes immediately then `bytesReceived` is very # unreliable. # * Still need to implement message-oriented socket disconnection, # '\0' in the message currently signifies a socket disconnect. Who @@ -479,12 +591,11 @@ when defined(windows) or defined(nimdoc): dataBuf.buf = cast[cstring](buf) dataBuf.len = size.ULONG - var bytesReceived: Dword - var flagsio = flags.toOSFlags().Dword - var ol = PCustomOverlapped() - GC_ref(ol) + var bytesReceived: DWORD + var flagsio = flags.toOSFlags().DWORD + var ol = newCustom() ol.data = CompletionData(fd: socket, cb: - proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = if not retFuture.finished: if errcode == OSErrorCode(-1): retFuture.complete(bytesCount) @@ -492,7 +603,7 @@ when defined(windows) or defined(nimdoc): if flags.isDisconnectionError(errcode): retFuture.complete(0) else: - retFuture.fail(newException(OSError, osErrorMsg(errcode))) + retFuture.fail(newOSError(errcode)) if dataBuf.buf != nil: dataBuf.buf = nil ) @@ -508,7 +619,7 @@ when defined(windows) or defined(nimdoc): if flags.isDisconnectionError(err): retFuture.complete(0) else: - retFuture.fail(newException(OSError, osErrorMsg(err))) + retFuture.fail(newOSError(err)) elif ret == 0: # Request completed immediately. if bytesReceived != 0: @@ -520,12 +631,12 @@ when defined(windows) or defined(nimdoc): return retFuture proc send*(socket: AsyncFD, buf: pointer, size: int, - flags = {SocketFlag.SafeDisconn}): Future[void] = - ## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future + flags = {SocketFlag.SafeDisconn}): owned(Future[void]) = + ## Sends `size` bytes from `buf` to `socket`. The returned future ## will complete once all data has been sent. ## - ## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object, - ## you must use GC_ref/GC_unref calls to avoid early freeing of the buffer. + ## .. warning:: Use it with caution. If `buf` refers to GC'ed object, + ## you must use GC_ref/GC_unref calls to avoid early freeing of the buffer. verifyPresence(socket) var retFuture = newFuture[void]("send") @@ -533,11 +644,10 @@ when defined(windows) or defined(nimdoc): dataBuf.buf = cast[cstring](buf) dataBuf.len = size.ULONG - var bytesReceived, lowFlags: Dword - var ol = PCustomOverlapped() - GC_ref(ol) + var bytesReceived, lowFlags: DWORD + var ol = newCustom() ol.data = CompletionData(fd: socket, cb: - proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = if not retFuture.finished: if errcode == OSErrorCode(-1): retFuture.complete() @@ -545,7 +655,7 @@ when defined(windows) or defined(nimdoc): if flags.isDisconnectionError(errcode): retFuture.complete() else: - retFuture.fail(newException(OSError, osErrorMsg(errcode))) + retFuture.fail(newOSError(errcode)) ) let ret = WSASend(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived, @@ -557,27 +667,27 @@ when defined(windows) or defined(nimdoc): if flags.isDisconnectionError(err): retFuture.complete() else: - retFuture.fail(newException(OSError, osErrorMsg(err))) + retFuture.fail(newOSError(err)) else: retFuture.complete() - # We don't deallocate ``ol`` here because even though this completed + # We don't deallocate `ol` here because even though this completed # immediately poll will still be notified about its completion and it will - # free ``ol``. + # free `ol`. return retFuture proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr, - saddrLen: Socklen, - flags = {SocketFlag.SafeDisconn}): Future[void] = - ## Sends ``data`` to specified destination ``saddr``, using - ## socket ``socket``. The returned future will complete once all data + saddrLen: SockLen, + flags = {SocketFlag.SafeDisconn}): owned(Future[void]) = + ## Sends `data` to specified destination `saddr`, using + ## socket `socket`. The returned future will complete once all data ## has been sent. verifyPresence(socket) var retFuture = newFuture[void]("sendTo") var dataBuf: TWSABuf dataBuf.buf = cast[cstring](data) dataBuf.len = size.ULONG - var bytesSent = 0.Dword - var lowFlags = 0.Dword + var bytesSent = 0.DWORD + var lowFlags = 0.DWORD # we will preserve address in our stack var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes @@ -585,15 +695,14 @@ when defined(windows) or defined(nimdoc): zeroMem(addr(staddr[0]), 128) copyMem(addr(staddr[0]), saddr, saddrLen) - var ol = PCustomOverlapped() - GC_ref(ol) + var ol = newCustom() ol.data = CompletionData(fd: socket, cb: - proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = if not retFuture.finished: if errcode == OSErrorCode(-1): retFuture.complete() else: - retFuture.fail(newException(OSError, osErrorMsg(errcode))) + retFuture.fail(newOSError(errcode)) ) let ret = WSASendTo(socket.SocketHandle, addr dataBuf, 1, addr bytesSent, @@ -603,20 +712,20 @@ when defined(windows) or defined(nimdoc): let err = osLastError() if err.int32 != ERROR_IO_PENDING: GC_unref(ol) - retFuture.fail(newException(OSError, osErrorMsg(err))) + retFuture.fail(newOSError(err)) else: retFuture.complete() - # We don't deallocate ``ol`` here because even though this completed + # We don't deallocate `ol` here because even though this completed # immediately poll will still be notified about its completion and it will - # free ``ol``. + # free `ol`. return retFuture proc recvFromInto*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr, saddrLen: ptr SockLen, - flags = {SocketFlag.SafeDisconn}): Future[int] = - ## Receives a datagram data from ``socket`` into ``buf``, which must - ## be at least of size ``size``, address of datagram's sender will be - ## stored into ``saddr`` and ``saddrLen``. Returned future will complete + flags = {SocketFlag.SafeDisconn}): owned(Future[int]) = + ## Receives a datagram data from `socket` into `buf`, which must + ## be at least of size `size`, address of datagram's sender will be + ## stored into `saddr` and `saddrLen`. Returned future will complete ## once one datagram has been received, and will return size of packet ## received. verifyPresence(socket) @@ -624,13 +733,12 @@ when defined(windows) or defined(nimdoc): var dataBuf = TWSABuf(buf: cast[cstring](data), len: size.ULONG) - var bytesReceived = 0.Dword - var lowFlags = 0.Dword + var bytesReceived = 0.DWORD + var lowFlags = 0.DWORD - var ol = PCustomOverlapped() - GC_ref(ol) + var ol = newCustom() ol.data = CompletionData(fd: socket, cb: - proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = if not retFuture.finished: if errcode == OSErrorCode(-1): assert bytesCount <= size @@ -638,7 +746,7 @@ when defined(windows) or defined(nimdoc): else: # datagram sockets don't have disconnection, # so we can just raise an exception - retFuture.fail(newException(OSError, osErrorMsg(errcode))) + retFuture.fail(newOSError(errcode)) ) let res = WSARecvFrom(socket.SocketHandle, addr dataBuf, 1, @@ -649,7 +757,7 @@ when defined(windows) or defined(nimdoc): let err = osLastError() if err.int32 != ERROR_IO_PENDING: GC_unref(ol) - retFuture.fail(newException(OSError, osErrorMsg(err))) + retFuture.fail(newOSError(err)) else: # Request completed immediately. if bytesReceived != 0: @@ -660,8 +768,9 @@ when defined(windows) or defined(nimdoc): retFuture.complete(bytesReceived) return retFuture - proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}): - Future[tuple[address: string, client: AsyncFD]] = + proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}, + inheritable = defined(nimInheritHandles)): + owned(Future[tuple[address: string, client: AsyncFD]]) {.gcsafe.} = ## Accepts a new connection. Returns a future containing the client socket ## corresponding to that connection and the remote address of the client. ## The future will complete when the connection is successfully accepted. @@ -669,22 +778,25 @@ when defined(windows) or defined(nimdoc): ## The resulting client socket is automatically registered to the ## dispatcher. ## - ## The ``accept`` call may result in an error if the connecting socket - ## disconnects during the duration of the ``accept``. If the ``SafeDisconn`` + ## If `inheritable` is false (the default), the resulting client socket will + ## not be inheritable by child processes. + ## + ## The `accept` call may result in an error if the connecting socket + ## disconnects during the duration of the `accept`. If the `SafeDisconn` ## flag is specified then this error will not be raised and instead ## accept will be called again. verifyPresence(socket) var retFuture = newFuture[tuple[address: string, client: AsyncFD]]("acceptAddr") - var clientSock = newNativeSocket() + var clientSock = createNativeSocket(inheritable = inheritable) if clientSock == osInvalidSocket: raiseOSError(osLastError()) const lpOutputLen = 1024 var lpOutputBuf = newString(lpOutputLen) - var dwBytesReceived: Dword - let dwReceiveDataLength = 0.Dword # We don't want any data to be read. - let dwLocalAddressLength = Dword(sizeof(Sockaddr_in6) + 16) - let dwRemoteAddressLength = Dword(sizeof(Sockaddr_in6) + 16) + var dwBytesReceived: DWORD + let dwReceiveDataLength = 0.DWORD # We don't want any data to be read. + let dwLocalAddressLength = DWORD(sizeof(Sockaddr_in6) + 16) + let dwRemoteAddressLength = DWORD(sizeof(Sockaddr_in6) + 16) template failAccept(errcode) = if flags.isDisconnectionError(errcode): @@ -696,7 +808,7 @@ when defined(windows) or defined(nimdoc): else: retFuture.complete(newAcceptFut.read) else: - retFuture.fail(newException(OSError, osErrorMsg(errcode))) + retFuture.fail(newOSError(errcode)) template completeAccept() {.dirty.} = var listenSock = socket @@ -705,17 +817,17 @@ when defined(windows) or defined(nimdoc): sizeof(listenSock).SockLen) if setoptRet != 0: let errcode = osLastError() - discard clientSock.closeSocket() + discard clientSock.closesocket() failAccept(errcode) else: var localSockaddr, remoteSockaddr: ptr SockAddr var localLen, remoteLen: int32 - getAcceptExSockaddrs(addr lpOutputBuf[0], dwReceiveDataLength, + getAcceptExSockAddrs(addr lpOutputBuf[0], dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength, addr localSockaddr, addr localLen, addr remoteSockaddr, addr remoteLen) try: - let address = getAddrString(remoteSockAddr) + let address = getAddrString(remoteSockaddr) register(clientSock.AsyncFD) retFuture.complete((address: address, client: clientSock.AsyncFD)) except: @@ -723,10 +835,9 @@ when defined(windows) or defined(nimdoc): clientSock.close() retFuture.fail(getCurrentException()) - var ol = PCustomOverlapped() - GC_ref(ol) + var ol = newCustom() ol.data = CompletionData(fd: socket, cb: - proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) {.gcsafe.} = if not retFuture.finished: if errcode == OSErrorCode(-1): completeAccept() @@ -748,47 +859,48 @@ when defined(windows) or defined(nimdoc): GC_unref(ol) else: completeAccept() - # We don't deallocate ``ol`` here because even though this completed + # We don't deallocate `ol` here because even though this completed # immediately poll will still be notified about its completion and it will - # free ``ol``. + # free `ol`. return retFuture + implementSetInheritable() + proc closeSocket*(socket: AsyncFD) = ## Closes a socket and ensures that it is unregistered. socket.SocketHandle.close() getGlobalDispatcher().handles.excl(socket) proc unregister*(fd: AsyncFD) = - ## Unregisters ``fd``. + ## Unregisters `fd`. getGlobalDispatcher().handles.excl(fd) proc contains*(disp: PDispatcher, fd: AsyncFD): bool = return fd in disp.handles - {.push stackTrace:off.} + {.push stackTrace: off.} proc waitableCallback(param: pointer, - timerOrWaitFired: WINBOOL): void {.stdcall.} = + timerOrWaitFired: WINBOOL) {.stdcall.} = var p = cast[PostCallbackDataPtr](param) - discard postQueuedCompletionStatus(p.ioPort, timerOrWaitFired.Dword, + discard postQueuedCompletionStatus(p.ioPort, timerOrWaitFired.DWORD, ULONG_PTR(p.handleFd), cast[pointer](p.ovl)) {.pop.} - proc registerWaitableEvent(fd: AsyncFD, cb: Callback; mask: Dword) = + proc registerWaitableEvent(fd: AsyncFD, cb: Callback; mask: DWORD) = let p = getGlobalDispatcher() - var flags = (WT_EXECUTEINWAITTHREAD or WT_EXECUTEONLYONCE).Dword + var flags = (WT_EXECUTEINWAITTHREAD or WT_EXECUTEONLYONCE).DWORD var hEvent = wsaCreateEvent() if hEvent == 0: raiseOSError(osLastError()) var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData))) pcd.ioPort = p.ioPort pcd.handleFd = fd - var ol = PCustomOverlapped() - GC_ref(ol) + var ol = newCustom() ol.data = CompletionData(fd: fd, cb: - proc(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + proc(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) {.gcsafe.} = # we excluding our `fd` because cb(fd) can register own handler # for this `fd` p.handles.excl(fd) @@ -861,9 +973,9 @@ when defined(windows) or defined(nimdoc): proc addRead*(fd: AsyncFD, cb: Callback) = ## Start watching the file descriptor for read availability and then call - ## the callback ``cb``. + ## the callback `cb`. ## - ## This is not ``pure`` mechanism for Windows Completion Ports (IOCP), + ## This is not `pure` mechanism for Windows Completion Ports (IOCP), ## so if you can avoid it, please do it. Use `addRead` only if really ## need it (main usecase is adaptation of unix-like libraries to be ## asynchronous on Windows). @@ -872,16 +984,16 @@ when defined(windows) or defined(nimdoc): ## or asyncdispatch.accept(), because they are using IOCP, please use ## nativesockets.recv() and nativesockets.accept() instead. ## - ## Be sure your callback ``cb`` returns ``true``, if you want to remove - ## watch of `read` notifications, and ``false``, if you want to continue + ## Be sure your callback `cb` returns `true`, if you want to remove + ## watch of `read` notifications, and `false`, if you want to continue ## receiving notifications. registerWaitableEvent(fd, cb, FD_READ or FD_ACCEPT or FD_OOB or FD_CLOSE) proc addWrite*(fd: AsyncFD, cb: Callback) = ## Start watching the file descriptor for write availability and then call - ## the callback ``cb``. + ## the callback `cb`. ## - ## This is not ``pure`` mechanism for Windows Completion Ports (IOCP), + ## This is not `pure` mechanism for Windows Completion Ports (IOCP), ## so if you can avoid it, please do it. Use `addWrite` only if really ## need it (main usecase is adaptation of unix-like libraries to be ## asynchronous on Windows). @@ -890,8 +1002,8 @@ when defined(windows) or defined(nimdoc): ## or asyncdispatch.connect(), because they are using IOCP, please use ## nativesockets.send() and nativesockets.connect() instead. ## - ## Be sure your callback ``cb`` returns ``true``, if you want to remove - ## watch of `write` notifications, and ``false``, if you want to continue + ## Be sure your callback `cb` returns `true`, if you want to remove + ## watch of `write` notifications, and `false`, if you want to continue ## receiving notifications. registerWaitableEvent(fd, cb, FD_WRITE or FD_CONNECT or FD_CLOSE) @@ -900,8 +1012,7 @@ when defined(windows) or defined(nimdoc): let handleFD = AsyncFD(hEvent) pcd.ioPort = p.ioPort pcd.handleFd = handleFD - var ol = PCustomOverlapped() - GC_ref(ol) + var ol = newCustom() ol.data.fd = handleFD ol.data.cb = handleCallback # We need to protect our callback environment value, so GC will not free it @@ -911,7 +1022,7 @@ when defined(windows) or defined(nimdoc): pcd.ovl = ol if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent, cast[WAITORTIMERCALLBACK](waitableCallback), - cast[pointer](pcd), timeout.Dword, flags): + cast[pointer](pcd), timeout.DWORD, flags): let err = osLastError() GC_unref(ol) deallocShared(cast[pointer](pcd)) @@ -924,20 +1035,20 @@ when defined(windows) or defined(nimdoc): deallocShared(cast[pointer](pcd)) p.handles.excl(fd) if unregisterWait(waitFd) == 0: - let err = osLastError() - if err.int32 != ERROR_IO_PENDING: - discard closeHandle(handle) - raiseOSError(err) + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + discard closeHandle(handle) + raiseOSError(err) if closeHandle(handle) == 0: raiseOSError(osLastError()) proc addTimer*(timeout: int, oneshot: bool, cb: Callback) = - ## Registers callback ``cb`` to be called when timer expired. + ## Registers callback `cb` to be called when timer expired. ## ## Parameters: ## - ## * ``timeout`` - timeout value in milliseconds. - ## * ``oneshot`` + ## * `timeout` - timeout value in milliseconds. + ## * `oneshot` ## * `true` - generate only one timeout event ## * `false` - generate timeout events periodically @@ -949,10 +1060,10 @@ when defined(windows) or defined(nimdoc): raiseOSError(osLastError()) var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData))) - var flags = WT_EXECUTEINWAITTHREAD.Dword + var flags = WT_EXECUTEINWAITTHREAD.DWORD if oneshot: flags = flags or WT_EXECUTEONLYONCE - proc timercb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + proc timercb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = let res = cb(fd) if res or oneshot: closeWaitable(hEvent) @@ -966,28 +1077,29 @@ when defined(windows) or defined(nimdoc): registerWaitableHandle(p, hEvent, flags, pcd, timeout, timercb) proc addProcess*(pid: int, cb: Callback) = - ## Registers callback ``cb`` to be called when process with process ID - ## ``pid`` exited. + ## Registers callback `cb` to be called when process with process ID + ## `pid` exited. + const NULL = Handle(0) let p = getGlobalDispatcher() let procFlags = SYNCHRONIZE - var hProcess = openProcess(procFlags, 0, pid.Dword) - if hProcess == INVALID_HANDLE_VALUE: + var hProcess = openProcess(procFlags, 0, pid.DWORD) + if hProcess == NULL: raiseOSError(osLastError()) var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData))) - var flags = WT_EXECUTEINWAITTHREAD.Dword + var flags = WT_EXECUTEINWAITTHREAD.DWORD or WT_EXECUTEONLYONCE.DWORD - proc proccb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + proc proccb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = closeWaitable(hProcess) discard cb(fd) registerWaitableHandle(p, hProcess, flags, pcd, INFINITE, proccb) proc newAsyncEvent*(): AsyncEvent = - ## Creates a new thread-safe ``AsyncEvent`` object. + ## Creates a new thread-safe `AsyncEvent` object. ## - ## New ``AsyncEvent`` object is not automatically registered with - ## dispatcher like ``AsyncSocket``. + ## New `AsyncEvent` object is not automatically registered with + ## dispatcher like `AsyncSocket`. var sa = SECURITY_ATTRIBUTES( nLength: sizeof(SECURITY_ATTRIBUTES).cint, bInheritHandle: 1 @@ -999,12 +1111,12 @@ when defined(windows) or defined(nimdoc): result.hEvent = event proc trigger*(ev: AsyncEvent) = - ## Set event ``ev`` to signaled state. + ## Set event `ev` to signaled state. if setEvent(ev.hEvent) == 0: raiseOSError(osLastError()) proc unregister*(ev: AsyncEvent) = - ## Unregisters event ``ev``. + ## Unregisters event `ev`. doAssert(ev.hWaiter != 0, "Event is not registered in the queue!") let p = getGlobalDispatcher() p.handles.excl(AsyncFD(ev.hEvent)) @@ -1015,23 +1127,23 @@ when defined(windows) or defined(nimdoc): ev.hWaiter = 0 proc close*(ev: AsyncEvent) = - ## Closes event ``ev``. + ## Closes event `ev`. let res = closeHandle(ev.hEvent) deallocShared(cast[pointer](ev)) if res == 0: raiseOSError(osLastError()) proc addEvent*(ev: AsyncEvent, cb: Callback) = - ## Registers callback ``cb`` to be called when ``ev`` will be signaled + ## Registers callback `cb` to be called when `ev` will be signaled doAssert(ev.hWaiter == 0, "Event is already registered in the queue!") let p = getGlobalDispatcher() let hEvent = ev.hEvent var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData))) - var flags = WT_EXECUTEINWAITTHREAD.Dword + var flags = WT_EXECUTEINWAITTHREAD.DWORD - proc eventcb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + proc eventcb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = if ev.hWaiter != 0: if cb(fd): # we need this check to avoid exception, if `unregister(event)` was @@ -1054,9 +1166,15 @@ when defined(windows) or defined(nimdoc): initAll() else: - import selectors - from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK, + import std/selectors + from std/posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK, MSG_NOSIGNAL + when declared(posix.accept4): + from std/posix import accept4, SOCK_CLOEXEC + when defined(genode): + import genode/env # get the implicit Genode env + import genode/signals + const InitCallbackListSize = 4 # initial size of callbacks sequence, # associated with file/socket descriptor. @@ -1064,7 +1182,7 @@ else: # queue. type AsyncFD* = distinct cint - Callback = proc (fd: AsyncFD): bool {.closure,gcsafe.} + Callback* = proc (fd: AsyncFD): bool {.closure, gcsafe.} AsyncData = object readList: seq[Callback] @@ -1074,7 +1192,8 @@ else: PDispatcher* = ref object of PDispatcherBase selector: Selector[AsyncData] - {.deprecated: [TAsyncFD: AsyncFD, TCallback: Callback].} + when defined(genode): + signalHandler: SignalHandler proc `==`*(x, y: AsyncFD): bool {.borrow.} proc `==`*(x, y: AsyncEvent): bool {.borrow.} @@ -1085,15 +1204,28 @@ else: writeList: newSeqOfCap[Callback](InitCallbackListSize) ) - proc newDispatcher*(): PDispatcher = + proc newDispatcher*(): owned(PDispatcher) = new result result.selector = newSelector[AsyncData]() - result.timers.newHeapQueue() - result.callbacks = initDeque[proc ()](InitDelayedCallbackListSize) + result.timers.clear() + result.callbacks = initDeque[proc () {.closure, gcsafe.}](InitDelayedCallbackListSize) + when defined(genode): + let entrypoint = ep(cast[GenodeEnv](runtimeEnv)) + result.signalHandler = newSignalHandler(entrypoint): + discard runOnce(0) + + var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher - var gDisp{.threadvar.}: PDispatcher ## Global dispatcher + when defined(nuttx): + import std/exitprocs - proc setGlobalDispatcher*(disp: PDispatcher) = + proc cleanDispatcher() {.noconv.} = + gDisp = nil + + proc addFinalyzer() = + addExitProc(cleanDispatcher) + + proc setGlobalDispatcher*(disp: owned PDispatcher) = if not gDisp.isNil: assert gDisp.callbacks.len == 0 gDisp = disp @@ -1102,6 +1234,8 @@ else: proc getGlobalDispatcher*(): PDispatcher = if gDisp.isNil: setGlobalDispatcher(newDispatcher()) + when defined(nuttx): + addFinalyzer() result = gDisp proc getIoHandler*(disp: PDispatcher): Selector[AsyncData] = @@ -1112,18 +1246,13 @@ else: var data = newAsyncData() p.selector.registerHandle(fd.SocketHandle, {}, data) - proc closeSocket*(sock: AsyncFD) = - let disp = getGlobalDispatcher() - disp.selector.unregister(sock.SocketHandle) - sock.SocketHandle.close() - proc unregister*(fd: AsyncFD) = getGlobalDispatcher().selector.unregister(fd.SocketHandle) proc unregister*(ev: AsyncEvent) = getGlobalDispatcher().selector.unregister(SelectEvent(ev)) - - proc contains*(disp: PDispatcher, fd: AsyncFd): bool = + + proc contains*(disp: PDispatcher, fd: AsyncFD): bool = return fd.SocketHandle in disp.selector proc addRead*(fd: AsyncFD, cb: Callback) = @@ -1152,7 +1281,15 @@ else: let p = getGlobalDispatcher() not p.selector.isEmpty() or p.timers.len != 0 or p.callbacks.len != 0 - template processBasicCallbacks(ident, rwlist: untyped) = + proc prependSeq(dest: var seq[Callback]; src: sink seq[Callback]) = + var old = move dest + dest = src + for i in 0..high(old): + dest.add(move old[i]) + + proc processBasicCallbacks( + fd: AsyncFD, event: Event + ): tuple[readCbListCount, writeCbListCount: int] = # Process pending descriptor and AsyncEvent callbacks. # # Invoke every callback stored in `rwlist`, until one @@ -1165,119 +1302,157 @@ else: # or it can be possible to fall into endless cycle. var curList: seq[Callback] - withData(p.selector, ident, adata) do: - shallowCopy(curList, adata.rwlist) - adata.rwlist = newSeqOfCap[Callback](InitCallbackListSize) + let selector = getGlobalDispatcher().selector + withData(selector, fd.int, fdData): + case event + of Event.Read: + #shallowCopy(curList, fdData.readList) + curList = move fdData.readList + fdData.readList = newSeqOfCap[Callback](InitCallbackListSize) + of Event.Write: + #shallowCopy(curList, fdData.writeList) + curList = move fdData.writeList + fdData.writeList = newSeqOfCap[Callback](InitCallbackListSize) + else: + assert false, "Cannot process callbacks for " & $event let newLength = max(len(curList), InitCallbackListSize) var newList = newSeqOfCap[Callback](newLength) + var eventsExtinguished = false for cb in curList: - if len(newList) > 0: - # A callback has already returned with EAGAIN, don't call any others - # until next `poll`. + if eventsExtinguished: newList.add(cb) + elif not cb(fd): + # Callback wants to be called again. + newList.add(cb) + # This callback has returned with EAGAIN, so we don't need to + # call any other callbacks as they are all waiting for the same event + # on the same fd. + # We do need to ensure they are called again though. + eventsExtinguished = true + + withData(selector, fd.int, fdData) do: + # Descriptor is still present in the queue. + case event + of Event.Read: prependSeq(fdData.readList, newList) + of Event.Write: prependSeq(fdData.writeList, newList) else: - if not cb(fd.AsyncFD): - # Callback wants to be called again. - newList.add(cb) + assert false, "Cannot process callbacks for " & $event - withData(p.selector, ident, adata) do: - # descriptor still present in queue. - adata.rwlist = newList & adata.rwlist - rLength = len(adata.readList) - wLength = len(adata.writeList) + result.readCbListCount = len(fdData.readList) + result.writeCbListCount = len(fdData.writeList) do: - # descriptor was unregistered in callback via `unregister()`. - rLength = -1 - wLength = -1 + # Descriptor was unregistered in callback via `unregister()`. + result.readCbListCount = -1 + result.writeCbListCount = -1 - template processCustomCallbacks(ident: untyped) = + proc processCustomCallbacks(p: PDispatcher; fd: AsyncFD) = # Process pending custom event callbacks. Custom events are # {Event.Timer, Event.Signal, Event.Process, Event.Vnode}. # There can be only one callback registered with one descriptor, # so there is no need to iterate over list. var curList: seq[Callback] - withData(p.selector, ident, adata) do: - shallowCopy(curList, adata.readList) + withData(p.selector, fd.int, adata) do: + curList = move adata.readList adata.readList = newSeqOfCap[Callback](InitCallbackListSize) let newLength = len(curList) var newList = newSeqOfCap[Callback](newLength) var cb = curList[0] - if not cb(fd.AsyncFD): + if not cb(fd): newList.add(cb) - withData(p.selector, ident, adata) do: + withData(p.selector, fd.int, adata) do: # descriptor still present in queue. adata.readList = newList & adata.readList if len(adata.readList) == 0: # if no callbacks registered with descriptor, unregister it. - p.selector.unregister(fd) + p.selector.unregister(fd.int) do: # descriptor was unregistered in callback via `unregister()`. discard - proc runOnce(timeout = 500): bool = - let p = getGlobalDispatcher() - when ioselSupportedPlatform: - let customSet = {Event.Timer, Event.Signal, Event.Process, - Event.Vnode} + implementSetInheritable() + proc closeSocket*(sock: AsyncFD) = + let selector = getGlobalDispatcher().selector + if sock.SocketHandle notin selector: + raise newException(ValueError, "File descriptor not registered.") + + let data = selector.getData(sock.SocketHandle) + sock.unregister() + sock.SocketHandle.close() + # We need to unblock the read and write callbacks which could still be + # waiting for the socket to become readable and/or writeable. + for cb in data.readList & data.writeList: + if not cb(sock): + raise newException( + ValueError, "Expecting async operations to stop when fd has closed." + ) + + proc runOnce(timeout: int): bool = + let p = getGlobalDispatcher() if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0: + when defined(genode): + if timeout == 0: return raise newException(ValueError, "No handles or timers registered in dispatcher.") result = false - if not p.selector.isEmpty(): - var keys: array[64, ReadyKey] - var count = p.selector.selectInto(p.adjustedTimeout(timeout), keys) - for i in 0..<count: - var custom = false - let fd = keys[i].fd - let events = keys[i].events - var rLength = 0 # len(data.readList) after callback - var wLength = 0 # len(data.writeList) after callback - - if Event.Read in events or events == {Event.Error}: - processBasicCallbacks(fd, readList) + var keys: array[64, ReadyKey] + let nextTimer = processTimers(p, result) + var count = + p.selector.selectInto(adjustTimeout(p, timeout, nextTimer), keys) + for i in 0..<count: + let fd = keys[i].fd.AsyncFD + let events = keys[i].events + var (readCbListCount, writeCbListCount) = (0, 0) + + if Event.Read in events or events == {Event.Error}: + (readCbListCount, writeCbListCount) = + processBasicCallbacks(fd, Event.Read) + result = true + + if Event.Write in events or events == {Event.Error}: + (readCbListCount, writeCbListCount) = + processBasicCallbacks(fd, Event.Write) + result = true + + var isCustomEvent = false + if Event.User in events: + (readCbListCount, writeCbListCount) = + processBasicCallbacks(fd, Event.Read) + isCustomEvent = true + if readCbListCount == 0: + p.selector.unregister(fd.int) + result = true + + when ioselSupportedPlatform: + const customSet = {Event.Timer, Event.Signal, Event.Process, + Event.Vnode} + if (customSet * events) != {}: + isCustomEvent = true + processCustomCallbacks(p, fd) result = true - if Event.Write in events or events == {Event.Error}: - processBasicCallbacks(fd, writeList) - result = true - - if Event.User in events: - processBasicCallbacks(fd, readList) - custom = true - if rLength == 0: - p.selector.unregister(fd) - result = true - - when ioselSupportedPlatform: - if (customSet * events) != {}: - custom = true - processCustomCallbacks(fd) - result = true - - # because state `data` can be modified in callback we need to update - # descriptor events with currently registered callbacks. - if not custom: - var newEvents: set[Event] = {} - if rLength != -1 and wLength != -1: - if rLength > 0: incl(newEvents, Event.Read) - if wLength > 0: incl(newEvents, Event.Write) - p.selector.updateHandle(SocketHandle(fd), newEvents) + # because state `data` can be modified in callback we need to update + # descriptor events with currently registered callbacks. + if not isCustomEvent and (readCbListCount != -1 and writeCbListCount != -1): + var newEvents: set[Event] = {} + if readCbListCount > 0: incl(newEvents, Event.Read) + if writeCbListCount > 0: incl(newEvents, Event.Write) + p.selector.updateHandle(SocketHandle(fd), newEvents) # Timer processing. - processTimers(p, result) + discard processTimers(p, result) # Callback queue processing processPendingCallbacks(p, result) proc recv*(socket: AsyncFD, size: int, - flags = {SocketFlag.SafeDisconn}): Future[string] = + flags = {SocketFlag.SafeDisconn}): owned(Future[string]) = var retFuture = newFuture[string]("recv") var readBuffer = newString(size) @@ -1288,11 +1463,12 @@ else: flags.toOSFlags()) if res < 0: let lastError = osLastError() - if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: + if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and + lastError.int32 != EAGAIN: if flags.isDisconnectionError(lastError): retFuture.complete("") else: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) + retFuture.fail(newOSError(lastError)) else: result = false # We still want this callback to be called. elif res == 0: @@ -1307,7 +1483,7 @@ else: return retFuture proc recvInto*(socket: AsyncFD, buf: pointer, size: int, - flags = {SocketFlag.SafeDisconn}): Future[int] = + flags = {SocketFlag.SafeDisconn}): owned(Future[int]) = var retFuture = newFuture[int]("recvInto") proc cb(sock: AsyncFD): bool = @@ -1316,11 +1492,12 @@ else: flags.toOSFlags()) if res < 0: let lastError = osLastError() - if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: + if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and + lastError.int32 != EAGAIN: if flags.isDisconnectionError(lastError): retFuture.complete(0) else: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) + retFuture.fail(newOSError(lastError)) else: result = false # We still want this callback to be called. else: @@ -1331,7 +1508,7 @@ else: return retFuture proc send*(socket: AsyncFD, buf: pointer, size: int, - flags = {SocketFlag.SafeDisconn}): Future[void] = + flags = {SocketFlag.SafeDisconn}): owned(Future[void]) = var retFuture = newFuture[void]("send") var written = 0 @@ -1344,11 +1521,13 @@ else: MSG_NOSIGNAL) if res < 0: let lastError = osLastError() - if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: + if lastError.int32 != EINTR and + lastError.int32 != EWOULDBLOCK and + lastError.int32 != EAGAIN: if flags.isDisconnectionError(lastError): retFuture.complete() else: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) + retFuture.fail(newOSError(lastError)) else: result = false # We still want this callback to be called. else: @@ -1364,9 +1543,9 @@ else: proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr, saddrLen: SockLen, - flags = {SocketFlag.SafeDisconn}): Future[void] = - ## Sends ``data`` of size ``size`` in bytes to specified destination - ## (``saddr`` of size ``saddrLen`` in bytes, using socket ``socket``. + flags = {SocketFlag.SafeDisconn}): owned(Future[void]) = + ## Sends `data` of size `size` in bytes to specified destination + ## (`saddr` of size `saddrLen` in bytes, using socket `socket`. ## The returned future will complete once all data has been sent. var retFuture = newFuture[void]("sendTo") @@ -1382,8 +1561,9 @@ else: cast[ptr SockAddr](addr(staddr[0])), stalen) if res < 0: let lastError = osLastError() - if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) + if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and + lastError.int32 != EAGAIN: + retFuture.fail(newOSError(lastError)) else: result = false # We still want this callback to be called. else: @@ -1394,10 +1574,10 @@ else: proc recvFromInto*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr, saddrLen: ptr SockLen, - flags = {SocketFlag.SafeDisconn}): Future[int] = - ## Receives a datagram data from ``socket`` into ``data``, which must - ## be at least of size ``size`` in bytes, address of datagram's sender - ## will be stored into ``saddr`` and ``saddrLen``. Returned future will + flags = {SocketFlag.SafeDisconn}): owned(Future[int]) = + ## Receives a datagram data from `socket` into `data`, which must + ## be at least of size `size` in bytes, address of datagram's sender + ## will be stored into `saddr` and `saddrLen`. Returned future will ## complete once one datagram has been received, and will return size ## of packet received. var retFuture = newFuture[int]("recvFromInto") @@ -1407,8 +1587,9 @@ else: saddr, saddrLen) if res < 0: let lastError = osLastError() - if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) + if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and + lastError.int32 != EAGAIN: + retFuture.fail(newOSError(lastError)) else: result = false else: @@ -1416,26 +1597,40 @@ else: addRead(socket, cb) return retFuture - proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}): - Future[tuple[address: string, client: AsyncFD]] = + proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}, + inheritable = defined(nimInheritHandles)): + owned(Future[tuple[address: string, client: AsyncFD]]) = var retFuture = newFuture[tuple[address: string, client: AsyncFD]]("acceptAddr") - proc cb(sock: AsyncFD): bool = + proc cb(sock: AsyncFD): bool {.gcsafe.} = result = true var sockAddress: Sockaddr_storage - var addrLen = sizeof(sockAddress).Socklen - var client = accept(sock.SocketHandle, - cast[ptr SockAddr](addr(sockAddress)), addr(addrLen)) + var addrLen = sizeof(sockAddress).SockLen + var client = + when declared(accept4): + accept4(sock.SocketHandle, cast[ptr SockAddr](addr(sockAddress)), + addr(addrLen), if inheritable: 0 else: SOCK_CLOEXEC) + else: + accept(sock.SocketHandle, cast[ptr SockAddr](addr(sockAddress)), + addr(addrLen)) + when declared(setInheritable) and not declared(accept4): + if client != osInvalidSocket and not setInheritable(client, inheritable): + # Set failure first because close() itself can fail, + # altering osLastError(). + retFuture.fail(newOSError(osLastError())) + close client + return false + if client == osInvalidSocket: let lastError = osLastError() - assert lastError.int32 notin {EWOULDBLOCK, EAGAIN} + assert lastError.int32 != EWOULDBLOCK and lastError.int32 != EAGAIN if lastError.int32 == EINTR: return false else: if flags.isDisconnectionError(lastError): return false else: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) + retFuture.fail(newOSError(lastError)) else: try: let address = getAddrString(cast[ptr SockAddr](addr sockAddress)) @@ -1452,45 +1647,45 @@ else: proc addTimer*(timeout: int, oneshot: bool, cb: Callback) = ## Start watching for timeout expiration, and then call the - ## callback ``cb``. - ## ``timeout`` - time in milliseconds, - ## ``oneshot`` - if ``true`` only one event will be dispatched, - ## if ``false`` continuous events every ``timeout`` milliseconds. + ## callback `cb`. + ## `timeout` - time in milliseconds, + ## `oneshot` - if `true` only one event will be dispatched, + ## if `false` continuous events every `timeout` milliseconds. let p = getGlobalDispatcher() var data = newAsyncData() data.readList.add(cb) p.selector.registerTimer(timeout, oneshot, data) proc addSignal*(signal: int, cb: Callback) = - ## Start watching signal ``signal``, and when signal appears, call the - ## callback ``cb``. + ## Start watching signal `signal`, and when signal appears, call the + ## callback `cb`. let p = getGlobalDispatcher() var data = newAsyncData() data.readList.add(cb) p.selector.registerSignal(signal, data) proc addProcess*(pid: int, cb: Callback) = - ## Start watching for process exit with pid ``pid``, and then call - ## the callback ``cb``. + ## Start watching for process exit with pid `pid`, and then call + ## the callback `cb`. let p = getGlobalDispatcher() var data = newAsyncData() data.readList.add(cb) p.selector.registerProcess(pid, data) proc newAsyncEvent*(): AsyncEvent = - ## Creates new ``AsyncEvent``. + ## Creates new `AsyncEvent`. result = AsyncEvent(newSelectEvent()) proc trigger*(ev: AsyncEvent) = - ## Sets new ``AsyncEvent`` to signaled state. + ## Sets new `AsyncEvent` to signaled state. trigger(SelectEvent(ev)) proc close*(ev: AsyncEvent) = - ## Closes ``AsyncEvent`` + ## Closes `AsyncEvent` close(SelectEvent(ev)) proc addEvent*(ev: AsyncEvent, cb: Callback) = - ## Start watching for event ``ev``, and call callback ``cb``, when + ## Start watching for event `ev`, and call callback `cb`, when ## ev will be set to signaled state. let p = getGlobalDispatcher() var data = newAsyncData() @@ -1498,54 +1693,275 @@ else: p.selector.registerEvent(SelectEvent(ev), data) proc drain*(timeout = 500) = - ## Waits for completion events and processes them. Raises ``ValueError`` - ## if there are no pending operations. In contrast to ``poll`` this - ## processes as many events as are available. - if runOnce(timeout): - while hasPendingOperations() and runOnce(0): discard + ## Waits for completion of **all** events and processes them. Raises `ValueError` + ## if there are no pending operations. In contrast to `poll` this + ## processes as many events as are available until the timeout has elapsed. + var curTimeout = timeout + let start = now() + while hasPendingOperations(): + discard runOnce(curTimeout) + curTimeout -= (now() - start).inMilliseconds.int + if curTimeout < 0: + break proc poll*(timeout = 500) = - ## Waits for completion events and processes them. Raises ``ValueError`` + ## Waits for completion events and processes them. Raises `ValueError` ## if there are no pending operations. This runs the underlying OS ## `epoll`:idx: or `kqueue`:idx: primitive only once. discard runOnce(timeout) -# Common procedures between current and upcoming asyncdispatch -include includes.asynccommon +template createAsyncNativeSocketImpl(domain, sockType, protocol: untyped, + inheritable = defined(nimInheritHandles)) = + let handle = createNativeSocket(domain, sockType, protocol, inheritable) + if handle == osInvalidSocket: + return osInvalidSocket.AsyncFD + handle.setBlocking(false) + when defined(macosx) and not defined(nimdoc): + handle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1) + result = handle.AsyncFD + register(result) + +proc createAsyncNativeSocket*(domain: cint, sockType: cint, + protocol: cint, + inheritable = defined(nimInheritHandles)): AsyncFD = + createAsyncNativeSocketImpl(domain, sockType, protocol, inheritable) + +proc createAsyncNativeSocket*(domain: Domain = Domain.AF_INET, + sockType: SockType = SOCK_STREAM, + protocol: Protocol = IPPROTO_TCP, + inheritable = defined(nimInheritHandles)): AsyncFD = + createAsyncNativeSocketImpl(domain, sockType, protocol, inheritable) -proc sleepAsync*(ms: int | float): Future[void] = +when defined(windows) or defined(nimdoc): + proc bindToDomain(handle: SocketHandle, domain: Domain) = + # Extracted into a separate proc, because connect() on Windows requires + # the socket to be initially bound. + template doBind(saddr) = + if bindAddr(handle, cast[ptr SockAddr](addr(saddr)), + sizeof(saddr).SockLen) < 0'i32: + raiseOSError(osLastError()) + + if domain == Domain.AF_INET6: + var saddr: Sockaddr_in6 + saddr.sin6_family = uint16(toInt(domain)) + doBind(saddr) + else: + var saddr: Sockaddr_in + saddr.sin_family = uint16(toInt(domain)) + doBind(saddr) + + proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): owned(Future[void]) = + let retFuture = newFuture[void]("doConnect") + result = retFuture + + var ol = newCustom() + ol.data = CompletionData(fd: socket, cb: + proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + const SO_UPDATE_CONNECT_CONTEXT = 0x7010 + socket.SocketHandle.setSockOptInt(SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, 1) # 15022 + retFuture.complete() + else: + retFuture.fail(newOSError(errcode)) + ) + + let ret = connectEx(socket.SocketHandle, addrInfo.ai_addr, + cint(addrInfo.ai_addrlen), nil, 0, nil, + cast[POVERLAPPED](ol)) + if ret: + # Request to connect completed immediately. + retFuture.complete() + # We don't deallocate `ol` here because even though this completed + # immediately poll will still be notified about its completion and it + # will free `ol`. + else: + let lastError = osLastError() + if lastError.int32 != ERROR_IO_PENDING: + # With ERROR_IO_PENDING `ol` will be deallocated in `poll`, + # and the future will be completed/failed there, too. + GC_unref(ol) + retFuture.fail(newOSError(lastError)) +else: + proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): owned(Future[void]) = + let retFuture = newFuture[void]("doConnect") + result = retFuture + + proc cb(fd: AsyncFD): bool = + let ret = SocketHandle(fd).getSockOptInt( + cint(SOL_SOCKET), cint(SO_ERROR)) + if ret == 0: + # We have connected. + retFuture.complete() + return true + elif ret == EINTR: + # interrupted, keep waiting + return false + else: + retFuture.fail(newOSError(OSErrorCode(ret))) + return true + + let ret = connect(socket.SocketHandle, + addrInfo.ai_addr, + addrInfo.ai_addrlen.SockLen) + if ret == 0: + # Request to connect completed immediately. + retFuture.complete() + else: + let lastError = osLastError() + if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS: + addWrite(socket, cb) + else: + retFuture.fail(newOSError(lastError)) + +template asyncAddrInfoLoop(addrInfo: ptr AddrInfo, fd: untyped, + protocol: Protocol = IPPROTO_RAW) = + ## Iterates through the AddrInfo linked list asynchronously + ## until the connection can be established. + const shouldCreateFd = not declared(fd) + + when shouldCreateFd: + let sockType = protocol.toSockType() + + var fdPerDomain: array[low(Domain).ord..high(Domain).ord, AsyncFD] + for i in low(fdPerDomain)..high(fdPerDomain): + fdPerDomain[i] = osInvalidSocket.AsyncFD + template closeUnusedFds(domainToKeep = -1) {.dirty.} = + for i, fd in fdPerDomain: + if fd != osInvalidSocket.AsyncFD and i != domainToKeep: + fd.closeSocket() + + var lastException: ref Exception + var curAddrInfo = addrInfo + var domain: Domain + when shouldCreateFd: + var curFd: AsyncFD + else: + var curFd = fd + proc tryNextAddrInfo(fut: Future[void]) {.gcsafe.} = + if fut == nil or fut.failed: + if fut != nil: + lastException = fut.readError() + + while curAddrInfo != nil: + let domainOpt = curAddrInfo.ai_family.toKnownDomain() + if domainOpt.isSome: + domain = domainOpt.unsafeGet() + break + curAddrInfo = curAddrInfo.ai_next + + if curAddrInfo == nil: + freeAddrInfo(addrInfo) + when shouldCreateFd: + closeUnusedFds() + if lastException != nil: + retFuture.fail(lastException) + else: + retFuture.fail(newException( + IOError, "Couldn't resolve address: " & address)) + return + + when shouldCreateFd: + curFd = fdPerDomain[ord(domain)] + if curFd == osInvalidSocket.AsyncFD: + try: + curFd = createAsyncNativeSocket(domain, sockType, protocol) + except: + freeAddrInfo(addrInfo) + closeUnusedFds() + raise getCurrentException() + when defined(windows): + curFd.SocketHandle.bindToDomain(domain) + fdPerDomain[ord(domain)] = curFd + + doConnect(curFd, curAddrInfo).callback = tryNextAddrInfo + curAddrInfo = curAddrInfo.ai_next + else: + freeAddrInfo(addrInfo) + when shouldCreateFd: + closeUnusedFds(ord(domain)) + retFuture.complete(curFd) + else: + retFuture.complete() + + tryNextAddrInfo(nil) + +proc dial*(address: string, port: Port, + protocol: Protocol = IPPROTO_TCP): owned(Future[AsyncFD]) = + ## Establishes connection to the specified `address`:`port` pair via the + ## specified protocol. The procedure iterates through possible + ## resolutions of the `address` until it succeeds, meaning that it + ## seamlessly works with both IPv4 and IPv6. + ## Returns the async file descriptor, registered in the dispatcher of + ## the current thread, ready to send or receive data. + let retFuture = newFuture[AsyncFD]("dial") + result = retFuture + let sockType = protocol.toSockType() + + let aiList = getAddrInfo(address, port, Domain.AF_UNSPEC, sockType, protocol) + asyncAddrInfoLoop(aiList, noFD, protocol) + +proc connect*(socket: AsyncFD, address: string, port: Port, + domain = Domain.AF_INET): owned(Future[void]) = + let retFuture = newFuture[void]("connect") + result = retFuture + + when defined(windows): + verifyPresence(socket) + else: + assert getSockDomain(socket.SocketHandle) == domain + + let aiList = getAddrInfo(address, port, domain) + when defined(windows): + socket.SocketHandle.bindToDomain(domain) + asyncAddrInfoLoop(aiList, socket) + +proc sleepAsync*(ms: int | float): owned(Future[void]) = ## Suspends the execution of the current async procedure for the next - ## ``ms`` milliseconds. + ## `ms` milliseconds. var retFuture = newFuture[void]("sleepAsync") let p = getGlobalDispatcher() - p.timers.push((epochTime() + (ms / 1000), retFuture)) + when ms is int: + p.timers.push((getMonoTime() + initDuration(milliseconds = ms), retFuture)) + elif ms is float: + let ns = (ms * 1_000_000).int64 + p.timers.push((getMonoTime() + initDuration(nanoseconds = ns), retFuture)) return retFuture -proc withTimeout*[T](fut: Future[T], timeout: int): Future[bool] = - ## Returns a future which will complete once ``fut`` completes or after - ## ``timeout`` milliseconds has elapsed. +proc withTimeout*[T](fut: Future[T], timeout: int): owned(Future[bool]) = + ## Returns a future which will complete once `fut` completes or after + ## `timeout` milliseconds has elapsed. ## - ## If ``fut`` completes first the returned future will hold true, - ## otherwise, if ``timeout`` milliseconds has elapsed first, the returned + ## If `fut` completes first the returned future will hold true, + ## otherwise, if `timeout` milliseconds has elapsed first, the returned ## future will hold false. var retFuture = newFuture[bool]("asyncdispatch.`withTimeout`") var timeoutFuture = sleepAsync(timeout) fut.callback = proc () = - if not retFuture.finished: retFuture.complete(true) + if not retFuture.finished: + if fut.failed: + retFuture.fail(fut.error) + else: + retFuture.complete(true) timeoutFuture.callback = proc () = if not retFuture.finished: retFuture.complete(false) return retFuture proc accept*(socket: AsyncFD, - flags = {SocketFlag.SafeDisconn}): Future[AsyncFD] = + flags = {SocketFlag.SafeDisconn}, + inheritable = defined(nimInheritHandles)): owned(Future[AsyncFD]) = ## Accepts a new connection. Returns a future containing the client socket ## corresponding to that connection. + ## + ## If `inheritable` is false (the default), the resulting client socket + ## will not be inheritable by child processes. + ## ## The future will complete when the connection is successfully accepted. var retFut = newFuture[AsyncFD]("accept") - var fut = acceptAddr(socket, flags) + var fut = acceptAddr(socket, flags, inheritable) fut.callback = proc (future: Future[tuple[address: string, client: AsyncFD]]) = assert future.finished @@ -1555,31 +1971,33 @@ proc accept*(socket: AsyncFD, retFut.complete(future.read.client) return retFut +proc keepAlive(x: string) = + discard "mark 'x' as escaping so that it is put into a closure for us to keep the data alive" + proc send*(socket: AsyncFD, data: string, - flags = {SocketFlag.SafeDisconn}): Future[void] = - ## Sends ``data`` to ``socket``. The returned future will complete once all + flags = {SocketFlag.SafeDisconn}): owned(Future[void]) = + ## Sends `data` to `socket`. The returned future will complete once all ## data has been sent. var retFuture = newFuture[void]("send") - - var copiedData = data - GC_ref(copiedData) # we need to protect data until send operation is completed - # or failed. - - let sendFut = socket.send(addr copiedData[0], data.len, flags) - sendFut.callback = - proc () = - GC_unref(copiedData) - if sendFut.failed: - retFuture.fail(sendFut.error) - else: - retFuture.complete() + if data.len > 0: + let sendFut = socket.send(unsafeAddr data[0], data.len, flags) + sendFut.callback = + proc () = + keepAlive(data) + if sendFut.failed: + retFuture.fail(sendFut.error) + else: + retFuture.complete() + else: + retFuture.complete() return retFuture # -- Await Macro -include asyncmacro +import std/asyncmacro +export asyncmacro -proc readAll*(future: FutureStream[string]): Future[string] {.async.} = +proc readAll*(future: FutureStream[string]): owned(Future[string]) {.async.} = ## Returns a future that will complete when all the string data from the ## specified future stream is retrieved. result = "" @@ -1590,50 +2008,7 @@ proc readAll*(future: FutureStream[string]): Future[string] {.async.} = else: break -proc recvLine*(socket: AsyncFD): Future[string] {.async, deprecated.} = - ## Reads a line of data from ``socket``. Returned future will complete once - ## a full line is read or an error occurs. - ## - ## If a full line is read ``\r\L`` is not - ## added to ``line``, however if solely ``\r\L`` is read then ``line`` - ## will be set to it. - ## - ## If the socket is disconnected, ``line`` will be set to ``""``. - ## - ## If the socket is disconnected in the middle of a line (before ``\r\L`` - ## is read) then line will be set to ``""``. - ## The partial line **will be lost**. - ## - ## **Warning**: This assumes that lines are delimited by ``\r\L``. - ## - ## **Note**: This procedure is mostly used for testing. You likely want to - ## use ``asyncnet.recvLine`` instead. - ## - ## **Deprecated since version 0.15.0**: Use ``asyncnet.recvLine()`` instead. - - template addNLIfEmpty(): typed = - if result.len == 0: - result.add("\c\L") - - result = "" - var c = "" - while true: - c = await recv(socket, 1) - if c.len == 0: - return "" - if c == "\r": - c = await recv(socket, 1) - assert c == "\l" - addNLIfEmpty() - return - elif c == "\L": - addNLIfEmpty() - return - add(result, c) - -proc callSoon(cbproc: proc ()) = - ## Schedule `cbproc` to be called as soon as possible. - ## The callback is called when control returns to the event loop. +proc callSoon(cbproc: proc () {.gcsafe.}) = getGlobalDispatcher().callbacks.addLast(cbproc) proc runForever*() = @@ -1648,8 +2023,43 @@ proc waitFor*[T](fut: Future[T]): T = fut.read -proc setEvent*(ev: AsyncEvent) {.deprecated.} = - ## Set event ``ev`` to signaled state. - ## - ## **Deprecated since v0.18.0:** Use ``trigger`` instead. - ev.trigger() \ No newline at end of file +proc activeDescriptors*(): int {.inline.} = + ## Returns the current number of active file descriptors for the current + ## event loop. This is a cheap operation that does not involve a system call. + when defined(windows): + result = getGlobalDispatcher().handles.len + elif not defined(nimdoc): + result = getGlobalDispatcher().selector.count + +when defined(posix): + import std/posix + +when defined(linux) or defined(windows) or defined(macosx) or defined(bsd) or + defined(solaris) or defined(zephyr) or defined(freertos) or defined(nuttx) or defined(haiku): + proc maxDescriptors*(): int {.raises: OSError.} = + ## Returns the maximum number of active file descriptors for the current + ## process. This involves a system call. For now `maxDescriptors` is + ## supported on the following OSes: Windows, Linux, OSX, BSD, Solaris. + when defined(windows): + result = 16_700_000 + elif defined(zephyr) or defined(freertos): + result = FD_MAX + else: + var fdLim: RLimit + if getrlimit(RLIMIT_NOFILE, fdLim) < 0: + raiseOSError(osLastError()) + result = int(fdLim.rlim_cur) - 1 + +when defined(genode): + proc scheduleCallbacks*(): bool {.discardable.} = + ## *Genode only.* + ## Schedule callback processing and return immediately. + ## Returns `false` if there is nothing to schedule. + ## RPC servers should call this to dispatch `callSoon` + ## bodies after retiring an RPC to its client. + ## This is effectively a non-blocking `poll(…)` and is + ## equivalent to scheduling a momentary no-op timeout + ## but faster and with less overhead. + let dis = getGlobalDispatcher() + result = dis.callbacks.len > 0 + if result: submit(dis.signalHandler.cap) |