diff options
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r-- | lib/pure/asyncdispatch.nim | 2667 |
1 files changed, 1554 insertions, 1113 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index cc337452f..126db7a7f 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -7,39 +7,26 @@ # distribution, for details about the copyright. # -include "system/inclrtl" - -import os, oids, tables, strutils, macros, times - -import nativesockets, net - -export Port, SocketFlag - -#{.injectStmt: newGcInvariant().} - -## AsyncDispatch -## ************* -## ## This module implements asynchronous IO. This includes a dispatcher, -## a ``Future`` type implementation, and an ``async`` macro which allows -## asynchronous code to be written in a synchronous style with the ``await`` +## a `Future` type implementation, and an `async` macro which allows +## asynchronous code to be written in a synchronous style with the `await` ## keyword. ## -## The dispatcher acts as a kind of event loop. You must call ``poll`` on it -## (or a function which does so for you such as ``waitFor`` or ``runForever``) +## The dispatcher acts as a kind of event loop. You must call `poll` on it +## (or a function which does so for you such as `waitFor` or `runForever`) ## in order to poll for any outstanding events. The underlying implementation ## is based on epoll on Linux, IO Completion Ports on Windows and select on ## other operating systems. ## -## The ``poll`` function will not, on its own, return any events. Instead -## an appropriate ``Future`` object will be completed. A ``Future`` is a +## The `poll` function will not, on its own, return any events. Instead +## an appropriate `Future` object will be completed. A `Future` is a ## type which holds a value which is not yet available, but which *may* be ## available in the future. You can check whether a future is finished -## by using the ``finished`` function. When a future is finished it means that +## by using the `finished` function. When a future is finished it means that ## either the value that it holds is now available or it holds an error instead. ## The latter situation occurs when the operation to complete a future fails ## with an exception. You can distinguish between the two situations with the -## ``failed`` function. +## `failed` function. ## ## Future objects can also store a callback procedure which will be called ## automatically once the future completes. @@ -48,49 +35,50 @@ export Port, SocketFlag ## pattern. In this ## pattern you make a request for an action, and once that action is fulfilled ## a future is completed with the result of that action. Requests can be -## made by calling the appropriate functions. For example: calling the ``recv`` +## made by calling the appropriate functions. For example: calling the `recv` ## function will create a request for some data to be read from a socket. The -## future which the ``recv`` function returns will then complete once the +## future which the `recv` function returns will then complete once the ## requested amount of data is read **or** an exception occurs. ## ## Code to read some data from a socket may look something like this: +## ```Nim +## var future = socket.recv(100) +## future.addCallback( +## proc () = +## echo(future.read) +## ) +## ``` ## -## .. code-block::nim -## var future = socket.recv(100) -## future.callback = -## proc () = -## echo(future.read) -## -## All asynchronous functions returning a ``Future`` will not block. They +## All asynchronous functions returning a `Future` will not block. They ## will not however return immediately. An asynchronous function will have ## code which will be executed before an asynchronous request is made, in most ## cases this code sets up the request. ## -## In the above example, the ``recv`` function will return a brand new -## ``Future`` instance once the request for data to be read from the socket -## is made. This ``Future`` instance will complete once the requested amount +## In the above example, the `recv` function will return a brand new +## `Future` instance once the request for data to be read from the socket +## is made. This `Future` instance will complete once the requested amount ## of data is read, in this case it is 100 bytes. The second line sets a ## callback on this future which will be called once the future completes. -## All the callback does is write the data stored in the future to ``stdout``. -## The ``read`` function is used for this and it checks whether the future -## completes with an error for you (if it did it will simply raise the -## error), if there is no error however it returns the value of the future. +## All the callback does is write the data stored in the future to `stdout`. +## The `read` function is used for this and it checks whether the future +## completes with an error for you (if it did, it will simply raise the +## error), if there is no error, however, it returns the value of the future. ## ## Asynchronous procedures -## ----------------------- +## ======================= ## ## Asynchronous procedures remove the pain of working with callbacks. They do ## this by allowing you to write asynchronous code the same way as you would ## write synchronous code. ## -## An asynchronous procedure is marked using the ``{.async.}`` pragma. -## When marking a procedure with the ``{.async.}`` pragma it must have a -## ``Future[T]`` return type or no return type at all. If you do not specify -## a return type then ``Future[void]`` is assumed. +## An asynchronous procedure is marked using the `{.async.}` pragma. +## When marking a procedure with the `{.async.}` pragma it must have a +## `Future[T]` return type or no return type at all. If you do not specify +## a return type then `Future[void]` is assumed. ## -## Inside asynchronous procedures ``await`` can be used to call any +## Inside asynchronous procedures `await` can be used to call any ## procedures which return a -## ``Future``; this includes asynchronous procedures. When a procedure is +## `Future`; this includes asynchronous procedures. When a procedure is ## "awaited", the asynchronous procedure it is awaited in will ## suspend its execution ## until the awaited procedure's Future completes. At which point the @@ -98,315 +86,293 @@ export Port, SocketFlag ## when an asynchronous procedure is suspended other asynchronous procedures ## will be run by the dispatcher. ## -## The ``await`` call may be used in many contexts. It can be used on the right -## hand side of a variable declaration: ``var data = await socket.recv(100)``, +## The `await` call may be used in many contexts. It can be used on the right +## hand side of a variable declaration: `var data = await socket.recv(100)`, ## in which case the variable will be set to the value of the future -## automatically. It can be used to await a ``Future`` object, and it can -## be used to await a procedure returning a ``Future[void]``: -## ``await socket.send("foobar")``. +## automatically. It can be used to await a `Future` object, and it can +## be used to await a procedure returning a `Future[void]`: +## `await socket.send("foobar")`. +## +## If an awaited future completes with an error, then `await` will re-raise +## this error. To avoid this, you can use the `yield` keyword instead of +## `await`. The following section shows different ways that you can handle +## exceptions in async procs. +## +## .. caution:: +## Procedures marked {.async.} do not support mutable parameters such +## as `var int`. References such as `ref int` should be used instead. +## +## Handling Exceptions +## ------------------- +## +## You can handle exceptions in the same way as in ordinary Nim code; +## by using the try statement: +## +## ```Nim +## try: +## let data = await sock.recv(100) +## echo("Received ", data) +## except: +## # Handle exception +## ``` +## +## An alternative approach to handling exceptions is to use `yield` on a future +## then check the future's `failed` property. For example: +## +## ```Nim +## var future = sock.recv(100) +## yield future +## if future.failed: +## # Handle exception +## ``` +## ## ## Discarding futures -## ------------------ +## ================== +## +## Futures should **never** be discarded directly because they may contain +## errors. If you do not care for the result of a Future then you should use +## the `asyncCheck` procedure instead of the `discard` keyword. Note that this +## does not wait for completion, and you should use `waitFor` or `await` for that purpose. +## +## .. note:: `await` also checks if the future fails, so you can safely discard +## its result. ## -## Futures should **never** be discarded. This is because they may contain -## errors. If you do not care for the result of a Future then you should -## use the ``asyncCheck`` procedure instead of the ``discard`` keyword. +## Handling futures +## ================ +## +## There are many different operations that apply to a future. +## The three primary high-level operations are `asyncCheck`, +## `waitFor`, and `await`. +## +## * `asyncCheck`: Raises an exception if the future fails. It neither waits +## for the future to finish nor returns the result of the future. +## * `waitFor`: Polls the event loop and blocks the current thread until the +## future finishes. This is often used to call an async procedure from a +## synchronous context and should never be used in an `async` proc. +## * `await`: Pauses execution in the current async procedure until the future +## finishes. While the current procedure is paused, other async procedures will +## continue running. Should be used instead of `waitFor` in an async +## procedure. +## +## Here is a handy quick reference chart showing their high-level differences: +## ============== ===================== ======================= +## Procedure Context Blocking +## ============== ===================== ======================= +## `asyncCheck` non-async and async non-blocking +## `waitFor` non-async blocks current thread +## `await` async suspends current proc +## ============== ===================== ======================= ## ## Examples -## -------- +## ======== ## ## For examples take a look at the documentation for the modules implementing ## asynchronous IO. A good place to start is the ## `asyncnet module <asyncnet.html>`_. ## +## Investigating pending futures +## ============================= +## +## It's possible to get into a situation where an async proc, or more accurately +## a `Future[T]` gets stuck and +## never completes. This can happen for various reasons and can cause serious +## memory leaks. When this occurs it's hard to identify the procedure that is +## stuck. +## +## Thankfully there is a mechanism which tracks the count of each pending future. +## All you need to do to enable it is compile with `-d:futureLogging` and +## use the `getFuturesInProgress` procedure to get the list of pending futures +## together with the stack traces to the moment of their creation. +## +## You may also find it useful to use this +## `prometheus package <https://github.com/dom96/prometheus>`_ which will log +## the pending futures into prometheus, allowing you to analyse them via a nice +## graph. +## +## +## ## Limitations/Bugs -## ---------------- +## ================ ## -## * The effect system (``raises: []``) does not work with async procedures. -## * Can't await in a ``except`` body -## * Forward declarations for async procs are broken, -## link includes workaround: https://github.com/nim-lang/Nim/issues/3182. -## * FutureVar[T] needs to be completed manually. - -# TODO: Check if yielded future is nil and throw a more meaningful exception - -# -- Futures - -type - FutureBase* = ref object of RootObj ## Untyped future. - cb: proc () {.closure,gcsafe.} - finished: bool - error*: ref Exception ## Stored exception - errorStackTrace*: string - when not defined(release): - stackTrace: string ## For debugging purposes only. - id: int - fromProc: string - - Future*[T] = ref object of FutureBase ## Typed future. - value: T ## Stored value - - FutureVar*[T] = distinct Future[T] - - FutureError* = object of Exception - cause*: FutureBase - -{.deprecated: [PFutureBase: FutureBase, PFuture: Future].} - -when not defined(release): - var currentID = 0 -proc newFuture*[T](fromProc: string = "unspecified"): Future[T] = - ## Creates a new future. - ## - ## Specifying ``fromProc``, which is a string specifying the name of the proc - ## that this future belongs to, is a good habit as it helps with debugging. - new(result) - result.finished = false - when not defined(release): - result.stackTrace = getStackTrace() - result.id = currentID - result.fromProc = fromProc - currentID.inc() - -proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] = - ## Create a new ``FutureVar``. This Future type is ideally suited for - ## situations where you want to avoid unnecessary allocations of Futures. - ## - ## Specifying ``fromProc``, which is a string specifying the name of the proc - ## that this future belongs to, is a good habit as it helps with debugging. - result = FutureVar[T](newFuture[T](fromProc)) - -proc clean*[T](future: FutureVar[T]) = - ## Resets the ``finished`` status of ``future``. - Future[T](future).finished = false - Future[T](future).error = nil - -proc checkFinished[T](future: Future[T]) = - ## Checks whether `future` is finished. If it is then raises a - ## ``FutureError``. - when not defined(release): - if future.finished: - var msg = "" - msg.add("An attempt was made to complete a Future more than once. ") - msg.add("Details:") - msg.add("\n Future ID: " & $future.id) - msg.add("\n Created in proc: " & future.fromProc) - msg.add("\n Stack trace to moment of creation:") - msg.add("\n" & indent(future.stackTrace.strip(), 4)) - when T is string: - msg.add("\n Contents (string): ") - msg.add("\n" & indent(future.value.repr, 4)) - msg.add("\n Stack trace to moment of secondary completion:") - msg.add("\n" & indent(getStackTrace().strip(), 4)) - var err = newException(FutureError, msg) - err.cause = future - raise err - -proc complete*[T](future: Future[T], val: T) = - ## Completes ``future`` with value ``val``. - #assert(not future.finished, "Future already finished, cannot finish twice.") - checkFinished(future) - assert(future.error == nil) - future.value = val - future.finished = true - if future.cb != nil: - future.cb() - -proc complete*(future: Future[void]) = - ## Completes a void ``future``. - #assert(not future.finished, "Future already finished, cannot finish twice.") - checkFinished(future) - assert(future.error == nil) - future.finished = true - if future.cb != nil: - future.cb() - -proc complete*[T](future: FutureVar[T]) = - ## Completes a ``FutureVar``. - template fut: expr = Future[T](future) - checkFinished(fut) - assert(fut.error == nil) - fut.finished = true - if fut.cb != nil: - fut.cb() - -proc fail*[T](future: Future[T], error: ref Exception) = - ## Completes ``future`` with ``error``. - #assert(not future.finished, "Future already finished, cannot finish twice.") - checkFinished(future) - future.finished = true - future.error = error - future.errorStackTrace = - if getStackTrace(error) == "": getStackTrace() else: getStackTrace(error) - if future.cb != nil: - future.cb() - else: - # This is to prevent exceptions from being silently ignored when a future - # is discarded. - # TODO: This may turn out to be a bad idea. - # Turns out this is a bad idea. - #raise error - discard - -proc `callback=`*(future: FutureBase, cb: proc () {.closure,gcsafe.}) = - ## Sets the callback proc to be called when the future completes. - ## - ## If future has already completed then ``cb`` will be called immediately. - ## - ## **Note**: You most likely want the other ``callback`` setter which - ## passes ``future`` as a param to the callback. - future.cb = cb - if future.finished: - future.cb() - -proc `callback=`*[T](future: Future[T], - cb: proc (future: Future[T]) {.closure,gcsafe.}) = - ## Sets the callback proc to be called when the future completes. - ## - ## If future has already completed then ``cb`` will be called immediately. - future.callback = proc () = cb(future) - -proc injectStacktrace[T](future: Future[T]) = - # TODO: Come up with something better. - when not defined(release): - var msg = "" - msg.add("\n " & future.fromProc & "'s lead up to read of failed Future:") - - if not future.errorStackTrace.isNil and future.errorStackTrace != "": - msg.add("\n" & indent(future.errorStackTrace.strip(), 4)) - else: - msg.add("\n Empty or nil stack trace.") - future.error.msg.add(msg) - -proc read*[T](future: Future[T]): T = - ## Retrieves the value of ``future``. Future must be finished otherwise - ## this function will fail with a ``ValueError`` exception. - ## - ## If the result of the future is an error then that error will be raised. - if future.finished: - if future.error != nil: - injectStacktrace(future) - raise future.error - when T isnot void: - return future.value - else: - # TODO: Make a custom exception type for this? - raise newException(ValueError, "Future still in progress.") - -proc readError*[T](future: Future[T]): ref Exception = - ## Retrieves the exception stored in ``future``. - ## - ## An ``ValueError`` exception will be thrown if no exception exists - ## in the specified Future. - if future.error != nil: return future.error - else: - raise newException(ValueError, "No error in future.") +## * The effect system (`raises: []`) does not work with async procedures. +## * Mutable parameters are not supported by async procedures. +## +## +## Multiple async backend support +## ============================== +## +## Thanks to its powerful macro support, Nim allows ``async``/``await`` to be +## implemented in libraries with only minimal support from the language - as +## such, multiple ``async`` libraries exist, including ``asyncdispatch`` and +## ``chronos``, and more may come to be developed in the future. +## +## Libraries built on top of async/await may wish to support multiple async +## backends - the best way to do so is to create separate modules for each backend +## that may be imported side-by-side. +## +## An alternative way is to select backend using a global compile flag - this +## method makes it difficult to compose applications that use both backends as may +## happen with transitive dependencies, but may be appropriate in some cases - +## libraries choosing this path should call the flag `asyncBackend`, allowing +## applications to choose the backend with `-d:asyncBackend=<backend_name>`. +## +## Known `async` backends include: +## +## * `-d:asyncBackend=none`: disable `async` support completely +## * `-d:asyncBackend=asyncdispatch`: https://nim-lang.org/docs/asyncdispatch.html +## * `-d:asyncBackend=chronos`: https://github.com/status-im/nim-chronos/ +## +## ``none`` can be used when a library supports both a synchronous and +## asynchronous API, to disable the latter. -proc mget*[T](future: FutureVar[T]): var T = - ## Returns a mutable value stored in ``future``. - ## - ## Unlike ``read``, this function will not raise an exception if the - ## Future has not been finished. - result = Future[T](future).value +import std/[os, tables, strutils, times, heapqueue, options, asyncstreams] +import std/[math, monotimes] +import std/asyncfutures except callSoon -proc finished*[T](future: Future[T]): bool = - ## Determines whether ``future`` has completed. - ## - ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish. - future.finished +import std/[nativesockets, net, deques] -proc failed*(future: FutureBase): bool = - ## Determines whether ``future`` completed with an error. - return future.error != nil +when defined(nimPreviewSlimSystem): + import std/[assertions, syncio] -proc asyncCheck*[T](future: Future[T]) = - ## Sets a callback on ``future`` which raises an exception if the future - ## finished with an error. - ## - ## This should be used instead of ``discard`` to discard void futures. - future.callback = - proc () = - if future.failed: - injectStacktrace(future) - raise future.error - -proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = - ## Returns a future which will complete once both ``fut1`` and ``fut2`` - ## complete. - var retFuture = newFuture[void]("asyncdispatch.`and`") - fut1.callback = - proc () = - if fut2.finished: retFuture.complete() - fut2.callback = - proc () = - if fut1.finished: retFuture.complete() - return retFuture +export Port, SocketFlag +export asyncfutures except callSoon +export asyncstreams -proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = - ## Returns a future which will complete once either ``fut1`` or ``fut2`` - ## complete. - var retFuture = newFuture[void]("asyncdispatch.`or`") - proc cb() = - if not retFuture.finished: retFuture.complete() - fut1.callback = cb - fut2.callback = cb - return retFuture +# TODO: Check if yielded future is nil and throw a more meaningful exception type PDispatcherBase = ref object of RootRef - timers: seq[tuple[finishAt: float, fut: Future[void]]] - -proc processTimers(p: PDispatcherBase) = - var oldTimers = p.timers - p.timers = @[] - for t in oldTimers: - if epochTime() >= t.finishAt: - t.fut.complete() - else: - p.timers.add(t) + timers*: HeapQueue[tuple[finishAt: MonoTime, fut: Future[void]]] + callbacks*: Deque[proc () {.gcsafe.}] + +proc processTimers( + p: PDispatcherBase, didSomeWork: var bool +): Option[int] {.inline.} = + # Pop the timers in the order in which they will expire (smaller `finishAt`). + var count = p.timers.len + let t = getMonoTime() + while count > 0 and t >= p.timers[0].finishAt: + p.timers.pop().fut.complete() + dec count + didSomeWork = true + + # Return the number of milliseconds in which the next timer will expire. + if p.timers.len == 0: return + + let millisecs = (p.timers[0].finishAt - getMonoTime()).inMilliseconds + return some(millisecs.int + 1) + +proc processPendingCallbacks(p: PDispatcherBase; didSomeWork: var bool) = + while p.callbacks.len > 0: + var cb = p.callbacks.popFirst() + cb() + didSomeWork = true + +proc adjustTimeout( + p: PDispatcherBase, pollTimeout: int, nextTimer: Option[int] +): int {.inline.} = + if p.callbacks.len != 0: + return 0 + + if nextTimer.isNone() or pollTimeout == -1: + return pollTimeout + + result = max(nextTimer.get(), 0) + result = min(pollTimeout, result) + +proc runOnce(timeout: int): bool {.gcsafe.} + +proc callSoon*(cbproc: proc () {.gcsafe.}) {.gcsafe.} + ## Schedule `cbproc` to be called as soon as possible. + ## The callback is called when control returns to the event loop. + +proc initCallSoonProc = + if asyncfutures.getCallSoonProc().isNil: + asyncfutures.setCallSoonProc(callSoon) + +template implementSetInheritable() {.dirty.} = + when declared(setInheritable): + proc setInheritable*(fd: AsyncFD, inheritable: bool): bool = + ## Control whether a file handle can be inherited by child processes. + ## Returns `true` on success. + ## + ## This procedure is not guaranteed to be available for all platforms. + ## Test for availability with `declared() <system.html#declared,untyped>`_. + fd.FileHandle.setInheritable(inheritable) when defined(windows) or defined(nimdoc): - import winlean, sets, hashes + import std/[winlean, sets, hashes] type - CompletionKey = Dword + CompletionKey = ULONG_PTR CompletionData* = object - fd*: AsyncFD # TODO: Rename this. - cb*: proc (fd: AsyncFD, bytesTransferred: Dword, - errcode: OSErrorCode) {.closure,gcsafe.} + fd*: AsyncFD # TODO: Rename this. + cb*: owned(proc (fd: AsyncFD, bytesTransferred: DWORD, + errcode: OSErrorCode) {.closure, gcsafe.}) + cell*: ForeignCell # we need this `cell` to protect our `cb` environment, + # when using RegisterWaitForSingleObject, because + # waiting is done in different thread. PDispatcher* = ref object of PDispatcherBase ioPort: Handle - handles: HashSet[AsyncFD] + handles*: HashSet[AsyncFD] # Export handles so that an external library can register them. - CustomOverlapped = object of OVERLAPPED + CustomObj = object of OVERLAPPED data*: CompletionData - PCustomOverlapped* = ref CustomOverlapped + CustomRef* = ref CustomObj AsyncFD* = distinct int - {.deprecated: [TCompletionKey: CompletionKey, TAsyncFD: AsyncFD, - TCustomOverlapped: CustomOverlapped, TCompletionData: CompletionData].} + + PostCallbackData = object + ioPort: Handle + handleFd: AsyncFD + waitFd: Handle + ovl: owned CustomRef + PostCallbackDataPtr = ptr PostCallbackData + + AsyncEventImpl = object + hEvent: Handle + hWaiter: Handle + pcd: PostCallbackDataPtr + AsyncEvent* = ptr AsyncEventImpl + + Callback* = proc (fd: AsyncFD): bool {.closure, gcsafe.} proc hash(x: AsyncFD): Hash {.borrow.} proc `==`*(x: AsyncFD, y: AsyncFD): bool {.borrow.} - proc newDispatcher*(): PDispatcher = + proc newDispatcher*(): owned PDispatcher = ## Creates a new Dispatcher instance. new result result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) - result.handles = initSet[AsyncFD]() - result.timers = @[] + result.handles = initHashSet[AsyncFD]() + result.timers.clear() + result.callbacks = initDeque[proc () {.closure, gcsafe.}](64) + + var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher + + proc setGlobalDispatcher*(disp: sink PDispatcher) = + if not gDisp.isNil: + assert gDisp.callbacks.len == 0 + gDisp = disp + initCallSoonProc() - var gDisp{.threadvar.}: PDispatcher ## Global dispatcher proc getGlobalDispatcher*(): PDispatcher = - ## Retrieves the global thread-local dispatcher. - if gDisp.isNil: gDisp = newDispatcher() + if gDisp.isNil: + setGlobalDispatcher(newDispatcher()) result = gDisp + proc getIoHandler*(disp: PDispatcher): Handle = + ## Returns the underlying IO Completion Port handle (Windows) or selector + ## (Unix) for the specified dispatcher. + return disp.ioPort + proc register*(fd: AsyncFD) = - ## Registers ``fd`` with the dispatcher. + ## Registers `fd` with the dispatcher. let p = getGlobalDispatcher() + if createIoCompletionPort(fd.Handle, p.ioPort, cast[CompletionKey](fd), 1) == 0: raiseOSError(osLastError()) @@ -414,28 +380,42 @@ when defined(windows) or defined(nimdoc): proc verifyPresence(fd: AsyncFD) = ## Ensures that file descriptor has been registered with the dispatcher. + ## Raises ValueError if `fd` has not been registered. let p = getGlobalDispatcher() if fd notin p.handles: raise newException(ValueError, "Operation performed on a socket which has not been registered with" & " the dispatcher yet.") - proc poll*(timeout = 500) = - ## Waits for completion events and processes them. + proc hasPendingOperations*(): bool = + ## Returns `true` if the global dispatcher has pending operations. + let p = getGlobalDispatcher() + p.handles.len != 0 or p.timers.len != 0 or p.callbacks.len != 0 + + proc runOnce(timeout: int): bool = let p = getGlobalDispatcher() - if p.handles.len == 0 and p.timers.len == 0: + if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0: raise newException(ValueError, "No handles or timers registered in dispatcher.") - let llTimeout = - if timeout == -1: winlean.INFINITE - else: timeout.int32 - var lpNumberOfBytesTransferred: Dword - var lpCompletionKey: ULONG - var customOverlapped: PCustomOverlapped + result = false + let nextTimer = processTimers(p, result) + let at = adjustTimeout(p, timeout, nextTimer) + var llTimeout = + if at == -1: winlean.INFINITE + else: at.int32 + + var lpNumberOfBytesTransferred: DWORD + var lpCompletionKey: ULONG_PTR + var customOverlapped: CustomRef let res = getQueuedCompletionStatus(p.ioPort, addr lpNumberOfBytesTransferred, addr lpCompletionKey, cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool + result = true + # For 'gcDestructors' the destructor of 'customOverlapped' will + # be called at the end and we are the only owner here. This means + # We do not have to 'GC_unref(customOverlapped)' because the destructor + # does that for us. # http://stackoverflow.com/a/12277264/492186 # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html @@ -445,162 +425,84 @@ when defined(windows) or defined(nimdoc): customOverlapped.data.cb(customOverlapped.data.fd, lpNumberOfBytesTransferred, OSErrorCode(-1)) - GC_unref(customOverlapped) + + # If cell.data != nil, then system.protect(rawEnv(cb)) was called, + # so we need to dispose our `cb` environment, because it is not needed + # anymore. + if customOverlapped.data.cell.data != nil: + system.dispose(customOverlapped.data.cell) + + when not defined(gcDestructors): + GC_unref(customOverlapped) else: let errCode = osLastError() if customOverlapped != nil: assert customOverlapped.data.fd == lpCompletionKey.AsyncFD customOverlapped.data.cb(customOverlapped.data.fd, lpNumberOfBytesTransferred, errCode) - GC_unref(customOverlapped) + if customOverlapped.data.cell.data != nil: + system.dispose(customOverlapped.data.cell) + when not defined(gcDestructors): + GC_unref(customOverlapped) else: if errCode.int32 == WAIT_TIMEOUT: # Timed out - discard + result = false else: raiseOSError(errCode) # Timer processing. - processTimers(p) + discard processTimers(p, result) + # Callback queue processing + processPendingCallbacks(p, result) - var connectExPtr: pointer = nil - var acceptExPtr: pointer = nil - var getAcceptExSockAddrsPtr: pointer = nil + + var acceptEx: WSAPROC_ACCEPTEX + var connectEx: WSAPROC_CONNECTEX + var getAcceptExSockAddrs: WSAPROC_GETACCEPTEXSOCKADDRS proc initPointer(s: SocketHandle, fun: var pointer, guid: var GUID): bool = # Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c - var bytesRet: Dword + var bytesRet: DWORD fun = nil result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid, - sizeof(GUID).Dword, addr fun, sizeof(pointer).Dword, + sizeof(GUID).DWORD, addr fun, sizeof(pointer).DWORD, addr bytesRet, nil, nil) == 0 proc initAll() = - let dummySock = newNativeSocket() - if not initPointer(dummySock, connectExPtr, WSAID_CONNECTEX): + let dummySock = createNativeSocket() + if dummySock == INVALID_SOCKET: raiseOSError(osLastError()) - if not initPointer(dummySock, acceptExPtr, WSAID_ACCEPTEX): + var fun: pointer = nil + if not initPointer(dummySock, fun, WSAID_CONNECTEX): raiseOSError(osLastError()) - if not initPointer(dummySock, getAcceptExSockAddrsPtr, WSAID_GETACCEPTEXSOCKADDRS): + connectEx = cast[WSAPROC_CONNECTEX](fun) + if not initPointer(dummySock, fun, WSAID_ACCEPTEX): raiseOSError(osLastError()) - - proc connectEx(s: SocketHandle, name: ptr SockAddr, namelen: cint, - lpSendBuffer: pointer, dwSendDataLength: Dword, - lpdwBytesSent: PDword, lpOverlapped: POVERLAPPED): bool = - if connectExPtr.isNil: raise newException(ValueError, "Need to initialise ConnectEx().") - let fun = - cast[proc (s: SocketHandle, name: ptr SockAddr, namelen: cint, - lpSendBuffer: pointer, dwSendDataLength: Dword, - lpdwBytesSent: PDword, lpOverlapped: POVERLAPPED): bool {.stdcall,gcsafe.}](connectExPtr) - - result = fun(s, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent, - lpOverlapped) - - proc acceptEx(listenSock, acceptSock: SocketHandle, lpOutputBuffer: pointer, - dwReceiveDataLength, dwLocalAddressLength, - dwRemoteAddressLength: Dword, lpdwBytesReceived: PDword, - lpOverlapped: POVERLAPPED): bool = - if acceptExPtr.isNil: raise newException(ValueError, "Need to initialise AcceptEx().") - let fun = - cast[proc (listenSock, acceptSock: SocketHandle, lpOutputBuffer: pointer, - dwReceiveDataLength, dwLocalAddressLength, - dwRemoteAddressLength: Dword, lpdwBytesReceived: PDword, - lpOverlapped: POVERLAPPED): bool {.stdcall,gcsafe.}](acceptExPtr) - result = fun(listenSock, acceptSock, lpOutputBuffer, dwReceiveDataLength, - dwLocalAddressLength, dwRemoteAddressLength, lpdwBytesReceived, - lpOverlapped) - - proc getAcceptExSockaddrs(lpOutputBuffer: pointer, - dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength: Dword, - LocalSockaddr: ptr ptr SockAddr, LocalSockaddrLength: LPInt, - RemoteSockaddr: ptr ptr SockAddr, RemoteSockaddrLength: LPInt) = - if getAcceptExSockAddrsPtr.isNil: - raise newException(ValueError, "Need to initialise getAcceptExSockAddrs().") - - let fun = - cast[proc (lpOutputBuffer: pointer, - dwReceiveDataLength, dwLocalAddressLength, - dwRemoteAddressLength: Dword, LocalSockaddr: ptr ptr SockAddr, - LocalSockaddrLength: LPInt, RemoteSockaddr: ptr ptr SockAddr, - RemoteSockaddrLength: LPInt) {.stdcall,gcsafe.}](getAcceptExSockAddrsPtr) - - fun(lpOutputBuffer, dwReceiveDataLength, dwLocalAddressLength, - dwRemoteAddressLength, LocalSockaddr, LocalSockaddrLength, - RemoteSockaddr, RemoteSockaddrLength) - - proc connect*(socket: AsyncFD, address: string, port: Port, - domain = nativesockets.AF_INET): Future[void] = - ## Connects ``socket`` to server at ``address:port``. - ## - ## Returns a ``Future`` which will complete when the connection succeeds - ## or an error occurs. - verifyPresence(socket) - var retFuture = newFuture[void]("connect") - # Apparently ``ConnectEx`` expects the socket to be initially bound: - var saddr: Sockaddr_in - saddr.sin_family = int16(toInt(domain)) - saddr.sin_port = 0 - saddr.sin_addr.s_addr = INADDR_ANY - if bindAddr(socket.SocketHandle, cast[ptr SockAddr](addr(saddr)), - sizeof(saddr).SockLen) < 0'i32: + acceptEx = cast[WSAPROC_ACCEPTEX](fun) + if not initPointer(dummySock, fun, WSAID_GETACCEPTEXSOCKADDRS): raiseOSError(osLastError()) + getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](fun) + close(dummySock) - var aiList = getAddrInfo(address, port, domain) - var success = false - var lastError: OSErrorCode - var it = aiList - while it != nil: - # "the OVERLAPPED structure must remain valid until the I/O completes" - # http://blogs.msdn.com/b/oldnewthing/archive/2011/02/02/10123392.aspx - var ol = PCustomOverlapped() - GC_ref(ol) - ol.data = CompletionData(fd: socket, cb: - proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = - if not retFuture.finished: - if errcode == OSErrorCode(-1): - retFuture.complete() - else: - retFuture.fail(newException(OSError, osErrorMsg(errcode))) - ) - - var ret = connectEx(socket.SocketHandle, it.ai_addr, - sizeof(Sockaddr_in).cint, nil, 0, nil, - cast[POVERLAPPED](ol)) - if ret: - # Request to connect completed immediately. - success = true - retFuture.complete() - # We don't deallocate ``ol`` here because even though this completed - # immediately poll will still be notified about its completion and it will - # free ``ol``. - break - else: - lastError = osLastError() - if lastError.int32 == ERROR_IO_PENDING: - # In this case ``ol`` will be deallocated in ``poll``. - success = true - break - else: - GC_unref(ol) - success = false - it = it.ai_next - - dealloc(aiList) - if not success: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) - return retFuture + proc newCustom*(): CustomRef = + result = CustomRef() # 0 + GC_ref(result) # 1 prevent destructor from doing a premature free. + # destructor of newCustom's caller --> 0. This means + # Windows holds a ref for us with RC == 0 (single owner). + # This is passed back to us in the IO completion port. proc recv*(socket: AsyncFD, size: int, - flags = {SocketFlag.SafeDisconn}): Future[string] = - ## Reads **up to** ``size`` bytes from ``socket``. Returned future will + flags = {SocketFlag.SafeDisconn}): owned(Future[string]) = + ## Reads **up to** `size` bytes from `socket`. Returned future will ## complete once all the data requested is read, a part of the data has been ## read, or the socket has disconnected in which case the future will - ## complete with a value of ``""``. + ## complete with a value of `""`. ## - ## **Warning**: The ``Peek`` socket flag is not supported on Windows. + ## .. warning:: The `Peek` socket flag is not supported on Windows. # Things to note: - # * When WSARecv completes immediately then ``bytesReceived`` is very + # * When WSARecv completes immediately then `bytesReceived` is very # unreliable. # * Still need to implement message-oriented socket disconnection, # '\0' in the message currently signifies a socket disconnect. Who @@ -613,12 +515,11 @@ when defined(windows) or defined(nimdoc): dataBuf.buf = cast[cstring](alloc0(size)) dataBuf.len = size.ULONG - var bytesReceived: Dword - var flagsio = flags.toOSFlags().Dword - var ol = PCustomOverlapped() - GC_ref(ol) + var bytesReceived: DWORD + var flagsio = flags.toOSFlags().DWORD + var ol = newCustom() ol.data = CompletionData(fd: socket, cb: - proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = if not retFuture.finished: if errcode == OSErrorCode(-1): if bytesCount == 0 and dataBuf.buf[0] == '\0': @@ -632,7 +533,7 @@ when defined(windows) or defined(nimdoc): if flags.isDisconnectionError(errcode): retFuture.complete("") else: - retFuture.fail(newException(OSError, osErrorMsg(errcode))) + retFuture.fail(newOSError(errcode)) if dataBuf.buf != nil: dealloc dataBuf.buf dataBuf.buf = nil @@ -650,50 +551,32 @@ when defined(windows) or defined(nimdoc): if flags.isDisconnectionError(err): retFuture.complete("") else: - retFuture.fail(newException(OSError, osErrorMsg(err))) - elif ret == 0 and bytesReceived == 0 and dataBuf.buf[0] == '\0': - # We have to ensure that the buffer is empty because WSARecv will tell - # us immediately when it was disconnected, even when there is still - # data in the buffer. - # We want to give the user as much data as we can. So we only return - # the empty string (which signals a disconnection) when there is - # nothing left to read. - retFuture.complete("") - # TODO: "For message-oriented sockets, where a zero byte message is often - # allowable, a failure with an error code of WSAEDISCON is used to - # indicate graceful closure." - # ~ http://msdn.microsoft.com/en-us/library/ms741688%28v=vs.85%29.aspx - else: - # Request to read completed immediately. - # From my tests bytesReceived isn't reliable. - let realSize = - if bytesReceived == 0: - size - else: - bytesReceived - var data = newString(realSize) - assert realSize <= size - copyMem(addr data[0], addr dataBuf.buf[0], realSize) - #dealloc dataBuf.buf - retFuture.complete($data) - # We don't deallocate ``ol`` here because even though this completed - # immediately poll will still be notified about its completion and it will - # free ``ol``. + retFuture.fail(newOSError(err)) + elif ret == 0: + # Request completed immediately. + if bytesReceived != 0: + var data = newString(bytesReceived) + assert bytesReceived <= size + copyMem(addr data[0], addr dataBuf.buf[0], bytesReceived) + retFuture.complete($data) + else: + if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)): + retFuture.complete("") return retFuture - proc recvInto*(socket: AsyncFD, buf: cstring, size: int, - flags = {SocketFlag.SafeDisconn}): Future[int] = - ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``, which must + proc recvInto*(socket: AsyncFD, buf: pointer, size: int, + flags = {SocketFlag.SafeDisconn}): owned(Future[int]) = + ## Reads **up to** `size` bytes from `socket` into `buf`, which must ## at least be of that size. Returned future will complete once all the ## data requested is read, a part of the data has been read, or the socket ## has disconnected in which case the future will complete with a value of - ## ``0``. + ## `0`. ## - ## **Warning**: The ``Peek`` socket flag is not supported on Windows. + ## .. warning:: The `Peek` socket flag is not supported on Windows. # Things to note: - # * When WSARecv completes immediately then ``bytesReceived`` is very + # * When WSARecv completes immediately then `bytesReceived` is very # unreliable. # * Still need to implement message-oriented socket disconnection, # '\0' in the message currently signifies a socket disconnect. Who @@ -705,26 +588,22 @@ when defined(windows) or defined(nimdoc): #buf[] = '\0' var dataBuf: TWSABuf - dataBuf.buf = buf + dataBuf.buf = cast[cstring](buf) dataBuf.len = size.ULONG - var bytesReceived: Dword - var flagsio = flags.toOSFlags().Dword - var ol = PCustomOverlapped() - GC_ref(ol) + var bytesReceived: DWORD + var flagsio = flags.toOSFlags().DWORD + var ol = newCustom() ol.data = CompletionData(fd: socket, cb: - proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = if not retFuture.finished: if errcode == OSErrorCode(-1): - if bytesCount == 0 and dataBuf.buf[0] == '\0': - retFuture.complete(0) - else: - retFuture.complete(bytesCount) + retFuture.complete(bytesCount) else: if flags.isDisconnectionError(errcode): retFuture.complete(0) else: - retFuture.fail(newException(OSError, osErrorMsg(errcode))) + retFuture.fail(newOSError(errcode)) if dataBuf.buf != nil: dataBuf.buf = nil ) @@ -740,50 +619,35 @@ when defined(windows) or defined(nimdoc): if flags.isDisconnectionError(err): retFuture.complete(0) else: - retFuture.fail(newException(OSError, osErrorMsg(err))) - elif ret == 0 and bytesReceived == 0 and dataBuf.buf[0] == '\0': - # We have to ensure that the buffer is empty because WSARecv will tell - # us immediately when it was disconnected, even when there is still - # data in the buffer. - # We want to give the user as much data as we can. So we only return - # the empty string (which signals a disconnection) when there is - # nothing left to read. - retFuture.complete(0) - # TODO: "For message-oriented sockets, where a zero byte message is often - # allowable, a failure with an error code of WSAEDISCON is used to - # indicate graceful closure." - # ~ http://msdn.microsoft.com/en-us/library/ms741688%28v=vs.85%29.aspx - else: - # Request to read completed immediately. - # From my tests bytesReceived isn't reliable. - let realSize = - if bytesReceived == 0: - size - else: - bytesReceived - assert realSize <= size - retFuture.complete(realSize) - # We don't deallocate ``ol`` here because even though this completed - # immediately poll will still be notified about its completion and it will - # free ``ol``. + retFuture.fail(newOSError(err)) + elif ret == 0: + # Request completed immediately. + if bytesReceived != 0: + assert bytesReceived <= size + retFuture.complete(bytesReceived) + else: + if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)): + retFuture.complete(bytesReceived) return retFuture - proc send*(socket: AsyncFD, data: string, - flags = {SocketFlag.SafeDisconn}): Future[void] = - ## Sends ``data`` to ``socket``. The returned future will complete once all - ## data has been sent. + proc send*(socket: AsyncFD, buf: pointer, size: int, + flags = {SocketFlag.SafeDisconn}): owned(Future[void]) = + ## Sends `size` bytes from `buf` to `socket`. The returned future + ## will complete once all data has been sent. + ## + ## .. warning:: Use it with caution. If `buf` refers to GC'ed object, + ## you must use GC_ref/GC_unref calls to avoid early freeing of the buffer. verifyPresence(socket) var retFuture = newFuture[void]("send") var dataBuf: TWSABuf - dataBuf.buf = data # since this is not used in a callback, this is fine - dataBuf.len = data.len.ULONG + dataBuf.buf = cast[cstring](buf) + dataBuf.len = size.ULONG - var bytesReceived, lowFlags: Dword - var ol = PCustomOverlapped() - GC_ref(ol) + var bytesReceived, lowFlags: DWORD + var ol = newCustom() ol.data = CompletionData(fd: socket, cb: - proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = if not retFuture.finished: if errcode == OSErrorCode(-1): retFuture.complete() @@ -791,7 +655,7 @@ when defined(windows) or defined(nimdoc): if flags.isDisconnectionError(errcode): retFuture.complete() else: - retFuture.fail(newException(OSError, osErrorMsg(errcode))) + retFuture.fail(newOSError(errcode)) ) let ret = WSASend(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived, @@ -803,16 +667,110 @@ when defined(windows) or defined(nimdoc): if flags.isDisconnectionError(err): retFuture.complete() else: - retFuture.fail(newException(OSError, osErrorMsg(err))) + retFuture.fail(newOSError(err)) + else: + retFuture.complete() + # We don't deallocate `ol` here because even though this completed + # immediately poll will still be notified about its completion and it will + # free `ol`. + return retFuture + + proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr, + saddrLen: SockLen, + flags = {SocketFlag.SafeDisconn}): owned(Future[void]) = + ## Sends `data` to specified destination `saddr`, using + ## socket `socket`. The returned future will complete once all data + ## has been sent. + verifyPresence(socket) + var retFuture = newFuture[void]("sendTo") + var dataBuf: TWSABuf + dataBuf.buf = cast[cstring](data) + dataBuf.len = size.ULONG + var bytesSent = 0.DWORD + var lowFlags = 0.DWORD + + # we will preserve address in our stack + var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes + var stalen: cint = cint(saddrLen) + zeroMem(addr(staddr[0]), 128) + copyMem(addr(staddr[0]), saddr, saddrLen) + + var ol = newCustom() + ol.data = CompletionData(fd: socket, cb: + proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + retFuture.complete() + else: + retFuture.fail(newOSError(errcode)) + ) + + let ret = WSASendTo(socket.SocketHandle, addr dataBuf, 1, addr bytesSent, + lowFlags, cast[ptr SockAddr](addr(staddr[0])), + stalen, cast[POVERLAPPED](ol), nil) + if ret == -1: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + GC_unref(ol) + retFuture.fail(newOSError(err)) else: retFuture.complete() - # We don't deallocate ``ol`` here because even though this completed + # We don't deallocate `ol` here because even though this completed # immediately poll will still be notified about its completion and it will - # free ``ol``. + # free `ol`. + return retFuture + + proc recvFromInto*(socket: AsyncFD, data: pointer, size: int, + saddr: ptr SockAddr, saddrLen: ptr SockLen, + flags = {SocketFlag.SafeDisconn}): owned(Future[int]) = + ## Receives a datagram data from `socket` into `buf`, which must + ## be at least of size `size`, address of datagram's sender will be + ## stored into `saddr` and `saddrLen`. Returned future will complete + ## once one datagram has been received, and will return size of packet + ## received. + verifyPresence(socket) + var retFuture = newFuture[int]("recvFromInto") + + var dataBuf = TWSABuf(buf: cast[cstring](data), len: size.ULONG) + + var bytesReceived = 0.DWORD + var lowFlags = 0.DWORD + + var ol = newCustom() + ol.data = CompletionData(fd: socket, cb: + proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + assert bytesCount <= size + retFuture.complete(bytesCount) + else: + # datagram sockets don't have disconnection, + # so we can just raise an exception + retFuture.fail(newOSError(errcode)) + ) + + let res = WSARecvFrom(socket.SocketHandle, addr dataBuf, 1, + addr bytesReceived, addr lowFlags, + saddr, cast[ptr cint](saddrLen), + cast[POVERLAPPED](ol), nil) + if res == -1: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + GC_unref(ol) + retFuture.fail(newOSError(err)) + else: + # Request completed immediately. + if bytesReceived != 0: + assert bytesReceived <= size + retFuture.complete(bytesReceived) + else: + if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)): + retFuture.complete(bytesReceived) return retFuture - proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}): - Future[tuple[address: string, client: AsyncFD]] = + proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}, + inheritable = defined(nimInheritHandles)): + owned(Future[tuple[address: string, client: AsyncFD]]) {.gcsafe.} = ## Accepts a new connection. Returns a future containing the client socket ## corresponding to that connection and the remote address of the client. ## The future will complete when the connection is successfully accepted. @@ -820,44 +778,27 @@ when defined(windows) or defined(nimdoc): ## The resulting client socket is automatically registered to the ## dispatcher. ## - ## The ``accept`` call may result in an error if the connecting socket - ## disconnects during the duration of the ``accept``. If the ``SafeDisconn`` + ## If `inheritable` is false (the default), the resulting client socket will + ## not be inheritable by child processes. + ## + ## The `accept` call may result in an error if the connecting socket + ## disconnects during the duration of the `accept`. If the `SafeDisconn` ## flag is specified then this error will not be raised and instead ## accept will be called again. verifyPresence(socket) var retFuture = newFuture[tuple[address: string, client: AsyncFD]]("acceptAddr") - var clientSock = newNativeSocket() + var clientSock = createNativeSocket(inheritable = inheritable) if clientSock == osInvalidSocket: raiseOSError(osLastError()) const lpOutputLen = 1024 var lpOutputBuf = newString(lpOutputLen) - var dwBytesReceived: Dword - let dwReceiveDataLength = 0.Dword # We don't want any data to be read. - let dwLocalAddressLength = Dword(sizeof (Sockaddr_in) + 16) - let dwRemoteAddressLength = Dword(sizeof(Sockaddr_in) + 16) + var dwBytesReceived: DWORD + let dwReceiveDataLength = 0.DWORD # We don't want any data to be read. + let dwLocalAddressLength = DWORD(sizeof(Sockaddr_in6) + 16) + let dwRemoteAddressLength = DWORD(sizeof(Sockaddr_in6) + 16) - template completeAccept(): stmt {.immediate, dirty.} = - var listenSock = socket - let setoptRet = setsockopt(clientSock, SOL_SOCKET, - SO_UPDATE_ACCEPT_CONTEXT, addr listenSock, - sizeof(listenSock).SockLen) - if setoptRet != 0: raiseOSError(osLastError()) - - var localSockaddr, remoteSockaddr: ptr SockAddr - var localLen, remoteLen: int32 - getAcceptExSockaddrs(addr lpOutputBuf[0], dwReceiveDataLength, - dwLocalAddressLength, dwRemoteAddressLength, - addr localSockaddr, addr localLen, - addr remoteSockaddr, addr remoteLen) - register(clientSock.AsyncFD) - # TODO: IPv6. Check ``sa_family``. http://stackoverflow.com/a/9212542/492186 - retFuture.complete( - (address: $inet_ntoa(cast[ptr Sockaddr_in](remoteSockAddr).sin_addr), - client: clientSock.AsyncFD) - ) - - template failAccept(errcode): stmt = + template failAccept(errcode) = if flags.isDisconnectionError(errcode): var newAcceptFut = acceptAddr(socket, flags) newAcceptFut.callback = @@ -867,12 +808,36 @@ when defined(windows) or defined(nimdoc): else: retFuture.complete(newAcceptFut.read) else: - retFuture.fail(newException(OSError, osErrorMsg(errcode))) + retFuture.fail(newOSError(errcode)) - var ol = PCustomOverlapped() - GC_ref(ol) + template completeAccept() {.dirty.} = + var listenSock = socket + let setoptRet = setsockopt(clientSock, SOL_SOCKET, + SO_UPDATE_ACCEPT_CONTEXT, addr listenSock, + sizeof(listenSock).SockLen) + if setoptRet != 0: + let errcode = osLastError() + discard clientSock.closesocket() + failAccept(errcode) + else: + var localSockaddr, remoteSockaddr: ptr SockAddr + var localLen, remoteLen: int32 + getAcceptExSockAddrs(addr lpOutputBuf[0], dwReceiveDataLength, + dwLocalAddressLength, dwRemoteAddressLength, + addr localSockaddr, addr localLen, + addr remoteSockaddr, addr remoteLen) + try: + let address = getAddrString(remoteSockaddr) + register(clientSock.AsyncFD) + retFuture.complete((address: address, client: clientSock.AsyncFD)) + except: + # getAddrString may raise + clientSock.close() + retFuture.fail(getCurrentException()) + + var ol = newCustom() ol.data = CompletionData(fd: socket, cb: - proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) {.gcsafe.} = if not retFuture.finished: if errcode == OSErrorCode(-1): completeAccept() @@ -894,25 +859,13 @@ when defined(windows) or defined(nimdoc): GC_unref(ol) else: completeAccept() - # We don't deallocate ``ol`` here because even though this completed + # We don't deallocate `ol` here because even though this completed # immediately poll will still be notified about its completion and it will - # free ``ol``. + # free `ol`. return retFuture - proc newAsyncNativeSocket*(domain, sockType, protocol: cint): AsyncFD = - ## Creates a new socket and registers it with the dispatcher implicitly. - result = newNativeSocket(domain, sockType, protocol).AsyncFD - result.SocketHandle.setBlocking(false) - register(result) - - proc newAsyncNativeSocket*(domain: Domain = nativesockets.AF_INET, - sockType: SockType = SOCK_STREAM, - protocol: Protocol = IPPROTO_TCP): AsyncFD = - ## Creates a new socket and registers it with the dispatcher implicitly. - result = newNativeSocket(domain, sockType, protocol).AsyncFD - result.SocketHandle.setBlocking(false) - register(result) + implementSetInheritable() proc closeSocket*(socket: AsyncFD) = ## Closes a socket and ensures that it is unregistered. @@ -920,186 +873,586 @@ when defined(windows) or defined(nimdoc): getGlobalDispatcher().handles.excl(socket) proc unregister*(fd: AsyncFD) = - ## Unregisters ``fd``. + ## Unregisters `fd`. getGlobalDispatcher().handles.excl(fd) + proc contains*(disp: PDispatcher, fd: AsyncFD): bool = + return fd in disp.handles + + {.push stackTrace: off.} + proc waitableCallback(param: pointer, + timerOrWaitFired: WINBOOL) {.stdcall.} = + var p = cast[PostCallbackDataPtr](param) + discard postQueuedCompletionStatus(p.ioPort, timerOrWaitFired.DWORD, + ULONG_PTR(p.handleFd), + cast[pointer](p.ovl)) + {.pop.} + + proc registerWaitableEvent(fd: AsyncFD, cb: Callback; mask: DWORD) = + let p = getGlobalDispatcher() + var flags = (WT_EXECUTEINWAITTHREAD or WT_EXECUTEONLYONCE).DWORD + var hEvent = wsaCreateEvent() + if hEvent == 0: + raiseOSError(osLastError()) + var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData))) + pcd.ioPort = p.ioPort + pcd.handleFd = fd + var ol = newCustom() + + ol.data = CompletionData(fd: fd, cb: + proc(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) {.gcsafe.} = + # we excluding our `fd` because cb(fd) can register own handler + # for this `fd` + p.handles.excl(fd) + # unregisterWait() is called before callback, because appropriate + # winsockets function can re-enable event. + # https://msdn.microsoft.com/en-us/library/windows/desktop/ms741576(v=vs.85).aspx + if unregisterWait(pcd.waitFd) == 0: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + deallocShared(cast[pointer](pcd)) + discard wsaCloseEvent(hEvent) + raiseOSError(err) + if cb(fd): + # callback returned `true`, so we free all allocated resources + deallocShared(cast[pointer](pcd)) + if not wsaCloseEvent(hEvent): + raiseOSError(osLastError()) + # pcd.ovl will be unrefed in poll(). + else: + # callback returned `false` we need to continue + if p.handles.contains(fd): + # new callback was already registered with `fd`, so we free all + # allocated resources. This happens because in callback `cb` + # addRead/addWrite was called with same `fd`. + deallocShared(cast[pointer](pcd)) + if not wsaCloseEvent(hEvent): + raiseOSError(osLastError()) + else: + # we need to include `fd` again + p.handles.incl(fd) + # and register WaitForSingleObject again + if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent, + cast[WAITORTIMERCALLBACK](waitableCallback), + cast[pointer](pcd), INFINITE, flags): + # pcd.ovl will be unrefed in poll() + let err = osLastError() + deallocShared(cast[pointer](pcd)) + discard wsaCloseEvent(hEvent) + raiseOSError(err) + else: + # we incref `pcd.ovl` and `protect` callback one more time, + # because it will be unrefed and disposed in `poll()` after + # callback finishes. + GC_ref(pcd.ovl) + pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb)) + ) + # We need to protect our callback environment value, so GC will not free it + # accidentally. + ol.data.cell = system.protect(rawEnv(ol.data.cb)) + + # This is main part of `hacky way` is using WSAEventSelect, so `hEvent` + # will be signaled when appropriate `mask` events will be triggered. + if wsaEventSelect(fd.SocketHandle, hEvent, mask) != 0: + let err = osLastError() + GC_unref(ol) + deallocShared(cast[pointer](pcd)) + discard wsaCloseEvent(hEvent) + raiseOSError(err) + + pcd.ovl = ol + if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent, + cast[WAITORTIMERCALLBACK](waitableCallback), + cast[pointer](pcd), INFINITE, flags): + let err = osLastError() + GC_unref(ol) + deallocShared(cast[pointer](pcd)) + discard wsaCloseEvent(hEvent) + raiseOSError(err) + p.handles.incl(fd) + + proc addRead*(fd: AsyncFD, cb: Callback) = + ## Start watching the file descriptor for read availability and then call + ## the callback `cb`. + ## + ## This is not `pure` mechanism for Windows Completion Ports (IOCP), + ## so if you can avoid it, please do it. Use `addRead` only if really + ## need it (main usecase is adaptation of unix-like libraries to be + ## asynchronous on Windows). + ## + ## If you use this function, you don't need to use asyncdispatch.recv() + ## or asyncdispatch.accept(), because they are using IOCP, please use + ## nativesockets.recv() and nativesockets.accept() instead. + ## + ## Be sure your callback `cb` returns `true`, if you want to remove + ## watch of `read` notifications, and `false`, if you want to continue + ## receiving notifications. + registerWaitableEvent(fd, cb, FD_READ or FD_ACCEPT or FD_OOB or FD_CLOSE) + + proc addWrite*(fd: AsyncFD, cb: Callback) = + ## Start watching the file descriptor for write availability and then call + ## the callback `cb`. + ## + ## This is not `pure` mechanism for Windows Completion Ports (IOCP), + ## so if you can avoid it, please do it. Use `addWrite` only if really + ## need it (main usecase is adaptation of unix-like libraries to be + ## asynchronous on Windows). + ## + ## If you use this function, you don't need to use asyncdispatch.send() + ## or asyncdispatch.connect(), because they are using IOCP, please use + ## nativesockets.send() and nativesockets.connect() instead. + ## + ## Be sure your callback `cb` returns `true`, if you want to remove + ## watch of `write` notifications, and `false`, if you want to continue + ## receiving notifications. + registerWaitableEvent(fd, cb, FD_WRITE or FD_CONNECT or FD_CLOSE) + + template registerWaitableHandle(p, hEvent, flags, pcd, timeout, + handleCallback) = + let handleFD = AsyncFD(hEvent) + pcd.ioPort = p.ioPort + pcd.handleFd = handleFD + var ol = newCustom() + ol.data.fd = handleFD + ol.data.cb = handleCallback + # We need to protect our callback environment value, so GC will not free it + # accidentally. + ol.data.cell = system.protect(rawEnv(ol.data.cb)) + + pcd.ovl = ol + if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent, + cast[WAITORTIMERCALLBACK](waitableCallback), + cast[pointer](pcd), timeout.DWORD, flags): + let err = osLastError() + GC_unref(ol) + deallocShared(cast[pointer](pcd)) + discard closeHandle(hEvent) + raiseOSError(err) + p.handles.incl(handleFD) + + template closeWaitable(handle: untyped) = + let waitFd = pcd.waitFd + deallocShared(cast[pointer](pcd)) + p.handles.excl(fd) + if unregisterWait(waitFd) == 0: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + discard closeHandle(handle) + raiseOSError(err) + if closeHandle(handle) == 0: + raiseOSError(osLastError()) + + proc addTimer*(timeout: int, oneshot: bool, cb: Callback) = + ## Registers callback `cb` to be called when timer expired. + ## + ## Parameters: + ## + ## * `timeout` - timeout value in milliseconds. + ## * `oneshot` + ## * `true` - generate only one timeout event + ## * `false` - generate timeout events periodically + + doAssert(timeout > 0) + let p = getGlobalDispatcher() + + var hEvent = createEvent(nil, 1, 0, nil) + if hEvent == INVALID_HANDLE_VALUE: + raiseOSError(osLastError()) + + var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData))) + var flags = WT_EXECUTEINWAITTHREAD.DWORD + if oneshot: flags = flags or WT_EXECUTEONLYONCE + + proc timercb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = + let res = cb(fd) + if res or oneshot: + closeWaitable(hEvent) + else: + # if callback returned `false`, then it wants to be called again, so + # we need to ref and protect `pcd.ovl` again, because it will be + # unrefed and disposed in `poll()`. + GC_ref(pcd.ovl) + pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb)) + + registerWaitableHandle(p, hEvent, flags, pcd, timeout, timercb) + + proc addProcess*(pid: int, cb: Callback) = + ## Registers callback `cb` to be called when process with process ID + ## `pid` exited. + const NULL = Handle(0) + let p = getGlobalDispatcher() + let procFlags = SYNCHRONIZE + var hProcess = openProcess(procFlags, 0, pid.DWORD) + if hProcess == NULL: + raiseOSError(osLastError()) + + var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData))) + var flags = WT_EXECUTEINWAITTHREAD.DWORD or WT_EXECUTEONLYONCE.DWORD + + proc proccb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = + closeWaitable(hProcess) + discard cb(fd) + + registerWaitableHandle(p, hProcess, flags, pcd, INFINITE, proccb) + + proc newAsyncEvent*(): AsyncEvent = + ## Creates a new thread-safe `AsyncEvent` object. + ## + ## New `AsyncEvent` object is not automatically registered with + ## dispatcher like `AsyncSocket`. + var sa = SECURITY_ATTRIBUTES( + nLength: sizeof(SECURITY_ATTRIBUTES).cint, + bInheritHandle: 1 + ) + var event = createEvent(addr(sa), 0'i32, 0'i32, nil) + if event == INVALID_HANDLE_VALUE: + raiseOSError(osLastError()) + result = cast[AsyncEvent](allocShared0(sizeof(AsyncEventImpl))) + result.hEvent = event + + proc trigger*(ev: AsyncEvent) = + ## Set event `ev` to signaled state. + if setEvent(ev.hEvent) == 0: + raiseOSError(osLastError()) + + proc unregister*(ev: AsyncEvent) = + ## Unregisters event `ev`. + doAssert(ev.hWaiter != 0, "Event is not registered in the queue!") + let p = getGlobalDispatcher() + p.handles.excl(AsyncFD(ev.hEvent)) + if unregisterWait(ev.hWaiter) == 0: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + raiseOSError(err) + ev.hWaiter = 0 + + proc close*(ev: AsyncEvent) = + ## Closes event `ev`. + let res = closeHandle(ev.hEvent) + deallocShared(cast[pointer](ev)) + if res == 0: + raiseOSError(osLastError()) + + proc addEvent*(ev: AsyncEvent, cb: Callback) = + ## Registers callback `cb` to be called when `ev` will be signaled + doAssert(ev.hWaiter == 0, "Event is already registered in the queue!") + + let p = getGlobalDispatcher() + let hEvent = ev.hEvent + + var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData))) + var flags = WT_EXECUTEINWAITTHREAD.DWORD + + proc eventcb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = + if ev.hWaiter != 0: + if cb(fd): + # we need this check to avoid exception, if `unregister(event)` was + # called in callback. + deallocShared(cast[pointer](pcd)) + if ev.hWaiter != 0: + unregister(ev) + else: + # if callback returned `false`, then it wants to be called again, so + # we need to ref and protect `pcd.ovl` again, because it will be + # unrefed and disposed in `poll()`. + GC_ref(pcd.ovl) + pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb)) + else: + # if ev.hWaiter == 0, then event was unregistered before `poll()` call. + deallocShared(cast[pointer](pcd)) + + registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb) + ev.hWaiter = pcd.waitFd + initAll() else: - import selectors - when defined(windows): - import winlean - const - EINTR = WSAEINPROGRESS - EINPROGRESS = WSAEINPROGRESS - EWOULDBLOCK = WSAEWOULDBLOCK - EAGAIN = EINPROGRESS - MSG_NOSIGNAL = 0 - else: - from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK, - MSG_NOSIGNAL - + import std/selectors + from std/posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK, + MSG_NOSIGNAL + when declared(posix.accept4): + from std/posix import accept4, SOCK_CLOEXEC + when defined(genode): + import genode/env # get the implicit Genode env + import genode/signals + + const + InitCallbackListSize = 4 # initial size of callbacks sequence, + # associated with file/socket descriptor. + InitDelayedCallbackListSize = 64 # initial size of delayed callbacks + # queue. type AsyncFD* = distinct cint - Callback = proc (fd: AsyncFD): bool {.closure,gcsafe.} + Callback* = proc (fd: AsyncFD): bool {.closure, gcsafe.} - PData* = ref object of RootRef - fd: AsyncFD - readCBs: seq[Callback] - writeCBs: seq[Callback] + AsyncData = object + readList: seq[Callback] + writeList: seq[Callback] + + AsyncEvent* = distinct SelectEvent PDispatcher* = ref object of PDispatcherBase - selector: Selector - {.deprecated: [TAsyncFD: AsyncFD, TCallback: Callback].} + selector: Selector[AsyncData] + when defined(genode): + signalHandler: SignalHandler proc `==`*(x, y: AsyncFD): bool {.borrow.} + proc `==`*(x, y: AsyncEvent): bool {.borrow.} + + template newAsyncData(): AsyncData = + AsyncData( + readList: newSeqOfCap[Callback](InitCallbackListSize), + writeList: newSeqOfCap[Callback](InitCallbackListSize) + ) - proc newDispatcher*(): PDispatcher = + proc newDispatcher*(): owned(PDispatcher) = new result - result.selector = newSelector() - result.timers = @[] + result.selector = newSelector[AsyncData]() + result.timers.clear() + result.callbacks = initDeque[proc () {.closure, gcsafe.}](InitDelayedCallbackListSize) + when defined(genode): + let entrypoint = ep(cast[GenodeEnv](runtimeEnv)) + result.signalHandler = newSignalHandler(entrypoint): + discard runOnce(0) + + var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher + + when defined(nuttx): + import std/exitprocs + + proc cleanDispatcher() {.noconv.} = + gDisp = nil + + proc addFinalyzer() = + addExitProc(cleanDispatcher) + + proc setGlobalDispatcher*(disp: owned PDispatcher) = + if not gDisp.isNil: + assert gDisp.callbacks.len == 0 + gDisp = disp + initCallSoonProc() - var gDisp{.threadvar.}: PDispatcher ## Global dispatcher proc getGlobalDispatcher*(): PDispatcher = - if gDisp.isNil: gDisp = newDispatcher() + if gDisp.isNil: + setGlobalDispatcher(newDispatcher()) + when defined(nuttx): + addFinalyzer() result = gDisp - proc update(fd: AsyncFD, events: set[Event]) = - let p = getGlobalDispatcher() - assert fd.SocketHandle in p.selector - p.selector.update(fd.SocketHandle, events) + proc getIoHandler*(disp: PDispatcher): Selector[AsyncData] = + return disp.selector proc register*(fd: AsyncFD) = let p = getGlobalDispatcher() - var data = PData(fd: fd, readCBs: @[], writeCBs: @[]) - p.selector.register(fd.SocketHandle, {}, data.RootRef) - - proc newAsyncNativeSocket*(domain: cint, sockType: cint, - protocol: cint): AsyncFD = - result = newNativeSocket(domain, sockType, protocol).AsyncFD - result.SocketHandle.setBlocking(false) - when defined(macosx): - result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1) - register(result) - - proc newAsyncNativeSocket*(domain: Domain = AF_INET, - sockType: SockType = SOCK_STREAM, - protocol: Protocol = IPPROTO_TCP): AsyncFD = - result = newNativeSocket(domain, sockType, protocol).AsyncFD - result.SocketHandle.setBlocking(false) - when defined(macosx): - result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1) - register(result) - - proc closeSocket*(sock: AsyncFD) = - let disp = getGlobalDispatcher() - disp.selector.unregister(sock.SocketHandle) - sock.SocketHandle.close() + var data = newAsyncData() + p.selector.registerHandle(fd.SocketHandle, {}, data) proc unregister*(fd: AsyncFD) = getGlobalDispatcher().selector.unregister(fd.SocketHandle) + proc unregister*(ev: AsyncEvent) = + getGlobalDispatcher().selector.unregister(SelectEvent(ev)) + + proc contains*(disp: PDispatcher, fd: AsyncFD): bool = + return fd.SocketHandle in disp.selector + proc addRead*(fd: AsyncFD, cb: Callback) = let p = getGlobalDispatcher() - if fd.SocketHandle notin p.selector: + var newEvents = {Event.Read} + withData(p.selector, fd.SocketHandle, adata) do: + adata.readList.add(cb) + newEvents.incl(Event.Read) + if len(adata.writeList) != 0: newEvents.incl(Event.Write) + do: raise newException(ValueError, "File descriptor not registered.") - p.selector[fd.SocketHandle].data.PData.readCBs.add(cb) - update(fd, p.selector[fd.SocketHandle].events + {EvRead}) + p.selector.updateHandle(fd.SocketHandle, newEvents) proc addWrite*(fd: AsyncFD, cb: Callback) = let p = getGlobalDispatcher() - if fd.SocketHandle notin p.selector: + var newEvents = {Event.Write} + withData(p.selector, fd.SocketHandle, adata) do: + adata.writeList.add(cb) + newEvents.incl(Event.Write) + if len(adata.readList) != 0: newEvents.incl(Event.Read) + do: raise newException(ValueError, "File descriptor not registered.") - p.selector[fd.SocketHandle].data.PData.writeCBs.add(cb) - update(fd, p.selector[fd.SocketHandle].events + {EvWrite}) + p.selector.updateHandle(fd.SocketHandle, newEvents) - proc poll*(timeout = 500) = + proc hasPendingOperations*(): bool = let p = getGlobalDispatcher() - for info in p.selector.select(timeout): - let data = PData(info.key.data) - assert data.fd == info.key.fd.AsyncFD - #echo("In poll ", data.fd.cint) - # There may be EvError here, but we handle them in callbacks, - # so that exceptions can be raised from `send(...)` and - # `recv(...)` routines. - - if EvRead in info.events: - # Callback may add items to ``data.readCBs`` which causes issues if - # we are iterating over ``data.readCBs`` at the same time. We therefore - # make a copy to iterate over. - let currentCBs = data.readCBs - data.readCBs = @[] - for cb in currentCBs: - if not cb(data.fd): - # Callback wants to be called again. - data.readCBs.add(cb) - - if EvWrite in info.events: - let currentCBs = data.writeCBs - data.writeCBs = @[] - for cb in currentCBs: - if not cb(data.fd): - # Callback wants to be called again. - data.writeCBs.add(cb) - - if info.key in p.selector: - var newEvents: set[Event] - if data.readCBs.len != 0: newEvents = {EvRead} - if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite} - if newEvents != info.key.events: - update(data.fd, newEvents) + not p.selector.isEmpty() or p.timers.len != 0 or p.callbacks.len != 0 + + proc prependSeq(dest: var seq[Callback]; src: sink seq[Callback]) = + var old = move dest + dest = src + for i in 0..high(old): + dest.add(move old[i]) + + proc processBasicCallbacks( + fd: AsyncFD, event: Event + ): tuple[readCbListCount, writeCbListCount: int] = + # Process pending descriptor and AsyncEvent callbacks. + # + # Invoke every callback stored in `rwlist`, until one + # returns `false` (which means callback wants to stay + # alive). In such case all remaining callbacks will be added + # to `rwlist` again, in the order they have been inserted. + # + # `rwlist` associated with file descriptor MUST BE emptied before + # dispatching callback (See https://github.com/nim-lang/Nim/issues/5128), + # or it can be possible to fall into endless cycle. + var curList: seq[Callback] + + let selector = getGlobalDispatcher().selector + withData(selector, fd.int, fdData): + case event + of Event.Read: + #shallowCopy(curList, fdData.readList) + curList = move fdData.readList + fdData.readList = newSeqOfCap[Callback](InitCallbackListSize) + of Event.Write: + #shallowCopy(curList, fdData.writeList) + curList = move fdData.writeList + fdData.writeList = newSeqOfCap[Callback](InitCallbackListSize) else: - # FD no longer a part of the selector. Likely been closed - # (e.g. socket disconnected). - discard + assert false, "Cannot process callbacks for " & $event + + let newLength = max(len(curList), InitCallbackListSize) + var newList = newSeqOfCap[Callback](newLength) + + var eventsExtinguished = false + for cb in curList: + if eventsExtinguished: + newList.add(cb) + elif not cb(fd): + # Callback wants to be called again. + newList.add(cb) + # This callback has returned with EAGAIN, so we don't need to + # call any other callbacks as they are all waiting for the same event + # on the same fd. + # We do need to ensure they are called again though. + eventsExtinguished = true + + withData(selector, fd.int, fdData) do: + # Descriptor is still present in the queue. + case event + of Event.Read: prependSeq(fdData.readList, newList) + of Event.Write: prependSeq(fdData.writeList, newList) + else: + assert false, "Cannot process callbacks for " & $event + + result.readCbListCount = len(fdData.readList) + result.writeCbListCount = len(fdData.writeList) + do: + # Descriptor was unregistered in callback via `unregister()`. + result.readCbListCount = -1 + result.writeCbListCount = -1 + + proc processCustomCallbacks(p: PDispatcher; fd: AsyncFD) = + # Process pending custom event callbacks. Custom events are + # {Event.Timer, Event.Signal, Event.Process, Event.Vnode}. + # There can be only one callback registered with one descriptor, + # so there is no need to iterate over list. + var curList: seq[Callback] + + withData(p.selector, fd.int, adata) do: + curList = move adata.readList + adata.readList = newSeqOfCap[Callback](InitCallbackListSize) + + let newLength = len(curList) + var newList = newSeqOfCap[Callback](newLength) + + var cb = curList[0] + if not cb(fd): + newList.add(cb) + + withData(p.selector, fd.int, adata) do: + # descriptor still present in queue. + adata.readList = newList & adata.readList + if len(adata.readList) == 0: + # if no callbacks registered with descriptor, unregister it. + p.selector.unregister(fd.int) + do: + # descriptor was unregistered in callback via `unregister()`. + discard + + implementSetInheritable() - processTimers(p) + proc closeSocket*(sock: AsyncFD) = + let selector = getGlobalDispatcher().selector + if sock.SocketHandle notin selector: + raise newException(ValueError, "File descriptor not registered.") - proc connect*(socket: AsyncFD, address: string, port: Port, - domain = AF_INET): Future[void] = - var retFuture = newFuture[void]("connect") + let data = selector.getData(sock.SocketHandle) + sock.unregister() + sock.SocketHandle.close() + # We need to unblock the read and write callbacks which could still be + # waiting for the socket to become readable and/or writeable. + for cb in data.readList & data.writeList: + if not cb(sock): + raise newException( + ValueError, "Expecting async operations to stop when fd has closed." + ) + + proc runOnce(timeout: int): bool = + let p = getGlobalDispatcher() + if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0: + when defined(genode): + if timeout == 0: return + raise newException(ValueError, + "No handles or timers registered in dispatcher.") - proc cb(fd: AsyncFD): bool = - var ret = SocketHandle(fd).getSockOptInt(cint(SOL_SOCKET), cint(SO_ERROR)) - if ret == 0: - # We have connected. - retFuture.complete() - return true - elif ret == EINTR: - # interrupted, keep waiting - return false - else: - retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret)))) - return true + result = false + var keys: array[64, ReadyKey] + let nextTimer = processTimers(p, result) + var count = + p.selector.selectInto(adjustTimeout(p, timeout, nextTimer), keys) + for i in 0..<count: + let fd = keys[i].fd.AsyncFD + let events = keys[i].events + var (readCbListCount, writeCbListCount) = (0, 0) + + if Event.Read in events or events == {Event.Error}: + (readCbListCount, writeCbListCount) = + processBasicCallbacks(fd, Event.Read) + result = true + + if Event.Write in events or events == {Event.Error}: + (readCbListCount, writeCbListCount) = + processBasicCallbacks(fd, Event.Write) + result = true + + var isCustomEvent = false + if Event.User in events: + (readCbListCount, writeCbListCount) = + processBasicCallbacks(fd, Event.Read) + isCustomEvent = true + if readCbListCount == 0: + p.selector.unregister(fd.int) + result = true + + when ioselSupportedPlatform: + const customSet = {Event.Timer, Event.Signal, Event.Process, + Event.Vnode} + if (customSet * events) != {}: + isCustomEvent = true + processCustomCallbacks(p, fd) + result = true - assert getSockDomain(socket.SocketHandle) == domain - var aiList = getAddrInfo(address, port, domain) - var success = false - var lastError: OSErrorCode - var it = aiList - while it != nil: - var ret = connect(socket.SocketHandle, it.ai_addr, it.ai_addrlen.Socklen) - if ret == 0: - # Request to connect completed immediately. - success = true - retFuture.complete() - break - else: - lastError = osLastError() - if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS: - success = true - addWrite(socket, cb) - break - else: - success = false - it = it.ai_next + # because state `data` can be modified in callback we need to update + # descriptor events with currently registered callbacks. + if not isCustomEvent and (readCbListCount != -1 and writeCbListCount != -1): + var newEvents: set[Event] = {} + if readCbListCount > 0: incl(newEvents, Event.Read) + if writeCbListCount > 0: incl(newEvents, Event.Write) + p.selector.updateHandle(SocketHandle(fd), newEvents) - dealloc(aiList) - if not success: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) - return retFuture + # Timer processing. + discard processTimers(p, result) + # Callback queue processing + processPendingCallbacks(p, result) proc recv*(socket: AsyncFD, size: int, - flags = {SocketFlag.SafeDisconn}): Future[string] = + flags = {SocketFlag.SafeDisconn}): owned(Future[string]) = var retFuture = newFuture[string]("recv") var readBuffer = newString(size) @@ -1110,11 +1463,12 @@ else: flags.toOSFlags()) if res < 0: let lastError = osLastError() - if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: + if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and + lastError.int32 != EAGAIN: if flags.isDisconnectionError(lastError): retFuture.complete("") else: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) + retFuture.fail(newOSError(lastError)) else: result = false # We still want this callback to be called. elif res == 0: @@ -1128,8 +1482,8 @@ else: addRead(socket, cb) return retFuture - proc recvInto*(socket: AsyncFD, buf: cstring, size: int, - flags = {SocketFlag.SafeDisconn}): Future[int] = + proc recvInto*(socket: AsyncFD, buf: pointer, size: int, + flags = {SocketFlag.SafeDisconn}): owned(Future[int]) = var retFuture = newFuture[int]("recvInto") proc cb(sock: AsyncFD): bool = @@ -1138,11 +1492,12 @@ else: flags.toOSFlags()) if res < 0: let lastError = osLastError() - if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: + if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and + lastError.int32 != EAGAIN: if flags.isDisconnectionError(lastError): retFuture.complete(0) else: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) + retFuture.fail(newOSError(lastError)) else: result = false # We still want this callback to be called. else: @@ -1152,25 +1507,27 @@ else: addRead(socket, cb) return retFuture - proc send*(socket: AsyncFD, data: string, - flags = {SocketFlag.SafeDisconn}): Future[void] = + proc send*(socket: AsyncFD, buf: pointer, size: int, + flags = {SocketFlag.SafeDisconn}): owned(Future[void]) = var retFuture = newFuture[void]("send") var written = 0 proc cb(sock: AsyncFD): bool = result = true - let netSize = data.len-written - var d = data.cstring + let netSize = size-written + var d = cast[cstring](buf) let res = send(sock.SocketHandle, addr d[written], netSize.cint, MSG_NOSIGNAL) if res < 0: let lastError = osLastError() - if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: + if lastError.int32 != EINTR and + lastError.int32 != EWOULDBLOCK and + lastError.int32 != EAGAIN: if flags.isDisconnectionError(lastError): retFuture.complete() else: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) + retFuture.fail(newOSError(lastError)) else: result = false # We still want this callback to be called. else: @@ -1184,47 +1541,427 @@ else: addWrite(socket, cb) return retFuture - proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}): - Future[tuple[address: string, client: AsyncFD]] = + proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr, + saddrLen: SockLen, + flags = {SocketFlag.SafeDisconn}): owned(Future[void]) = + ## Sends `data` of size `size` in bytes to specified destination + ## (`saddr` of size `saddrLen` in bytes, using socket `socket`. + ## The returned future will complete once all data has been sent. + var retFuture = newFuture[void]("sendTo") + + # we will preserve address in our stack + var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes + var stalen = saddrLen + zeroMem(addr(staddr[0]), 128) + copyMem(addr(staddr[0]), saddr, saddrLen) + + proc cb(sock: AsyncFD): bool = + result = true + let res = sendto(sock.SocketHandle, data, size, MSG_NOSIGNAL, + cast[ptr SockAddr](addr(staddr[0])), stalen) + if res < 0: + let lastError = osLastError() + if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and + lastError.int32 != EAGAIN: + retFuture.fail(newOSError(lastError)) + else: + result = false # We still want this callback to be called. + else: + retFuture.complete() + + addWrite(socket, cb) + return retFuture + + proc recvFromInto*(socket: AsyncFD, data: pointer, size: int, + saddr: ptr SockAddr, saddrLen: ptr SockLen, + flags = {SocketFlag.SafeDisconn}): owned(Future[int]) = + ## Receives a datagram data from `socket` into `data`, which must + ## be at least of size `size` in bytes, address of datagram's sender + ## will be stored into `saddr` and `saddrLen`. Returned future will + ## complete once one datagram has been received, and will return size + ## of packet received. + var retFuture = newFuture[int]("recvFromInto") + proc cb(sock: AsyncFD): bool = + result = true + let res = recvfrom(sock.SocketHandle, data, size.cint, flags.toOSFlags(), + saddr, saddrLen) + if res < 0: + let lastError = osLastError() + if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and + lastError.int32 != EAGAIN: + retFuture.fail(newOSError(lastError)) + else: + result = false + else: + retFuture.complete(res) + addRead(socket, cb) + return retFuture + + proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}, + inheritable = defined(nimInheritHandles)): + owned(Future[tuple[address: string, client: AsyncFD]]) = var retFuture = newFuture[tuple[address: string, client: AsyncFD]]("acceptAddr") - proc cb(sock: AsyncFD): bool = + proc cb(sock: AsyncFD): bool {.gcsafe.} = result = true var sockAddress: Sockaddr_storage - var addrLen = sizeof(sockAddress).Socklen - var client = accept(sock.SocketHandle, - cast[ptr SockAddr](addr(sockAddress)), addr(addrLen)) + var addrLen = sizeof(sockAddress).SockLen + var client = + when declared(accept4): + accept4(sock.SocketHandle, cast[ptr SockAddr](addr(sockAddress)), + addr(addrLen), if inheritable: 0 else: SOCK_CLOEXEC) + else: + accept(sock.SocketHandle, cast[ptr SockAddr](addr(sockAddress)), + addr(addrLen)) + when declared(setInheritable) and not declared(accept4): + if client != osInvalidSocket and not setInheritable(client, inheritable): + # Set failure first because close() itself can fail, + # altering osLastError(). + retFuture.fail(newOSError(osLastError())) + close client + return false + if client == osInvalidSocket: let lastError = osLastError() - assert lastError.int32 notin {EWOULDBLOCK, EAGAIN} + assert lastError.int32 != EWOULDBLOCK and lastError.int32 != EAGAIN if lastError.int32 == EINTR: return false else: if flags.isDisconnectionError(lastError): return false else: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) + retFuture.fail(newOSError(lastError)) else: - register(client.AsyncFD) - retFuture.complete((getAddrString(cast[ptr SockAddr](addr sockAddress)), client.AsyncFD)) + try: + let address = getAddrString(cast[ptr SockAddr](addr sockAddress)) + register(client.AsyncFD) + retFuture.complete((address, client.AsyncFD)) + except: + # getAddrString may raise + client.close() + retFuture.fail(getCurrentException()) addRead(socket, cb) return retFuture -proc sleepAsync*(ms: int): Future[void] = + when ioselSupportedPlatform: + + proc addTimer*(timeout: int, oneshot: bool, cb: Callback) = + ## Start watching for timeout expiration, and then call the + ## callback `cb`. + ## `timeout` - time in milliseconds, + ## `oneshot` - if `true` only one event will be dispatched, + ## if `false` continuous events every `timeout` milliseconds. + let p = getGlobalDispatcher() + var data = newAsyncData() + data.readList.add(cb) + p.selector.registerTimer(timeout, oneshot, data) + + proc addSignal*(signal: int, cb: Callback) = + ## Start watching signal `signal`, and when signal appears, call the + ## callback `cb`. + let p = getGlobalDispatcher() + var data = newAsyncData() + data.readList.add(cb) + p.selector.registerSignal(signal, data) + + proc addProcess*(pid: int, cb: Callback) = + ## Start watching for process exit with pid `pid`, and then call + ## the callback `cb`. + let p = getGlobalDispatcher() + var data = newAsyncData() + data.readList.add(cb) + p.selector.registerProcess(pid, data) + + proc newAsyncEvent*(): AsyncEvent = + ## Creates new `AsyncEvent`. + result = AsyncEvent(newSelectEvent()) + + proc trigger*(ev: AsyncEvent) = + ## Sets new `AsyncEvent` to signaled state. + trigger(SelectEvent(ev)) + + proc close*(ev: AsyncEvent) = + ## Closes `AsyncEvent` + close(SelectEvent(ev)) + + proc addEvent*(ev: AsyncEvent, cb: Callback) = + ## Start watching for event `ev`, and call callback `cb`, when + ## ev will be set to signaled state. + let p = getGlobalDispatcher() + var data = newAsyncData() + data.readList.add(cb) + p.selector.registerEvent(SelectEvent(ev), data) + +proc drain*(timeout = 500) = + ## Waits for completion of **all** events and processes them. Raises `ValueError` + ## if there are no pending operations. In contrast to `poll` this + ## processes as many events as are available until the timeout has elapsed. + var curTimeout = timeout + let start = now() + while hasPendingOperations(): + discard runOnce(curTimeout) + curTimeout -= (now() - start).inMilliseconds.int + if curTimeout < 0: + break + +proc poll*(timeout = 500) = + ## Waits for completion events and processes them. Raises `ValueError` + ## if there are no pending operations. This runs the underlying OS + ## `epoll`:idx: or `kqueue`:idx: primitive only once. + discard runOnce(timeout) + +template createAsyncNativeSocketImpl(domain, sockType, protocol: untyped, + inheritable = defined(nimInheritHandles)) = + let handle = createNativeSocket(domain, sockType, protocol, inheritable) + if handle == osInvalidSocket: + return osInvalidSocket.AsyncFD + handle.setBlocking(false) + when defined(macosx) and not defined(nimdoc): + handle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1) + result = handle.AsyncFD + register(result) + +proc createAsyncNativeSocket*(domain: cint, sockType: cint, + protocol: cint, + inheritable = defined(nimInheritHandles)): AsyncFD = + createAsyncNativeSocketImpl(domain, sockType, protocol, inheritable) + +proc createAsyncNativeSocket*(domain: Domain = Domain.AF_INET, + sockType: SockType = SOCK_STREAM, + protocol: Protocol = IPPROTO_TCP, + inheritable = defined(nimInheritHandles)): AsyncFD = + createAsyncNativeSocketImpl(domain, sockType, protocol, inheritable) + +when defined(windows) or defined(nimdoc): + proc bindToDomain(handle: SocketHandle, domain: Domain) = + # Extracted into a separate proc, because connect() on Windows requires + # the socket to be initially bound. + template doBind(saddr) = + if bindAddr(handle, cast[ptr SockAddr](addr(saddr)), + sizeof(saddr).SockLen) < 0'i32: + raiseOSError(osLastError()) + + if domain == Domain.AF_INET6: + var saddr: Sockaddr_in6 + saddr.sin6_family = uint16(toInt(domain)) + doBind(saddr) + else: + var saddr: Sockaddr_in + saddr.sin_family = uint16(toInt(domain)) + doBind(saddr) + + proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): owned(Future[void]) = + let retFuture = newFuture[void]("doConnect") + result = retFuture + + var ol = newCustom() + ol.data = CompletionData(fd: socket, cb: + proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + const SO_UPDATE_CONNECT_CONTEXT = 0x7010 + socket.SocketHandle.setSockOptInt(SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, 1) # 15022 + retFuture.complete() + else: + retFuture.fail(newOSError(errcode)) + ) + + let ret = connectEx(socket.SocketHandle, addrInfo.ai_addr, + cint(addrInfo.ai_addrlen), nil, 0, nil, + cast[POVERLAPPED](ol)) + if ret: + # Request to connect completed immediately. + retFuture.complete() + # We don't deallocate `ol` here because even though this completed + # immediately poll will still be notified about its completion and it + # will free `ol`. + else: + let lastError = osLastError() + if lastError.int32 != ERROR_IO_PENDING: + # With ERROR_IO_PENDING `ol` will be deallocated in `poll`, + # and the future will be completed/failed there, too. + GC_unref(ol) + retFuture.fail(newOSError(lastError)) +else: + proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): owned(Future[void]) = + let retFuture = newFuture[void]("doConnect") + result = retFuture + + proc cb(fd: AsyncFD): bool = + let ret = SocketHandle(fd).getSockOptInt( + cint(SOL_SOCKET), cint(SO_ERROR)) + if ret == 0: + # We have connected. + retFuture.complete() + return true + elif ret == EINTR: + # interrupted, keep waiting + return false + else: + retFuture.fail(newOSError(OSErrorCode(ret))) + return true + + let ret = connect(socket.SocketHandle, + addrInfo.ai_addr, + addrInfo.ai_addrlen.SockLen) + if ret == 0: + # Request to connect completed immediately. + retFuture.complete() + else: + let lastError = osLastError() + if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS: + addWrite(socket, cb) + else: + retFuture.fail(newOSError(lastError)) + +template asyncAddrInfoLoop(addrInfo: ptr AddrInfo, fd: untyped, + protocol: Protocol = IPPROTO_RAW) = + ## Iterates through the AddrInfo linked list asynchronously + ## until the connection can be established. + const shouldCreateFd = not declared(fd) + + when shouldCreateFd: + let sockType = protocol.toSockType() + + var fdPerDomain: array[low(Domain).ord..high(Domain).ord, AsyncFD] + for i in low(fdPerDomain)..high(fdPerDomain): + fdPerDomain[i] = osInvalidSocket.AsyncFD + template closeUnusedFds(domainToKeep = -1) {.dirty.} = + for i, fd in fdPerDomain: + if fd != osInvalidSocket.AsyncFD and i != domainToKeep: + fd.closeSocket() + + var lastException: ref Exception + var curAddrInfo = addrInfo + var domain: Domain + when shouldCreateFd: + var curFd: AsyncFD + else: + var curFd = fd + proc tryNextAddrInfo(fut: Future[void]) {.gcsafe.} = + if fut == nil or fut.failed: + if fut != nil: + lastException = fut.readError() + + while curAddrInfo != nil: + let domainOpt = curAddrInfo.ai_family.toKnownDomain() + if domainOpt.isSome: + domain = domainOpt.unsafeGet() + break + curAddrInfo = curAddrInfo.ai_next + + if curAddrInfo == nil: + freeAddrInfo(addrInfo) + when shouldCreateFd: + closeUnusedFds() + if lastException != nil: + retFuture.fail(lastException) + else: + retFuture.fail(newException( + IOError, "Couldn't resolve address: " & address)) + return + + when shouldCreateFd: + curFd = fdPerDomain[ord(domain)] + if curFd == osInvalidSocket.AsyncFD: + try: + curFd = createAsyncNativeSocket(domain, sockType, protocol) + except: + freeAddrInfo(addrInfo) + closeUnusedFds() + raise getCurrentException() + when defined(windows): + curFd.SocketHandle.bindToDomain(domain) + fdPerDomain[ord(domain)] = curFd + + doConnect(curFd, curAddrInfo).callback = tryNextAddrInfo + curAddrInfo = curAddrInfo.ai_next + else: + freeAddrInfo(addrInfo) + when shouldCreateFd: + closeUnusedFds(ord(domain)) + retFuture.complete(curFd) + else: + retFuture.complete() + + tryNextAddrInfo(nil) + +proc dial*(address: string, port: Port, + protocol: Protocol = IPPROTO_TCP): owned(Future[AsyncFD]) = + ## Establishes connection to the specified `address`:`port` pair via the + ## specified protocol. The procedure iterates through possible + ## resolutions of the `address` until it succeeds, meaning that it + ## seamlessly works with both IPv4 and IPv6. + ## Returns the async file descriptor, registered in the dispatcher of + ## the current thread, ready to send or receive data. + let retFuture = newFuture[AsyncFD]("dial") + result = retFuture + let sockType = protocol.toSockType() + + let aiList = getAddrInfo(address, port, Domain.AF_UNSPEC, sockType, protocol) + asyncAddrInfoLoop(aiList, noFD, protocol) + +proc connect*(socket: AsyncFD, address: string, port: Port, + domain = Domain.AF_INET): owned(Future[void]) = + let retFuture = newFuture[void]("connect") + result = retFuture + + when defined(windows): + verifyPresence(socket) + else: + assert getSockDomain(socket.SocketHandle) == domain + + let aiList = getAddrInfo(address, port, domain) + when defined(windows): + socket.SocketHandle.bindToDomain(domain) + asyncAddrInfoLoop(aiList, socket) + +proc sleepAsync*(ms: int | float): owned(Future[void]) = ## Suspends the execution of the current async procedure for the next - ## ``ms`` milliseconds. + ## `ms` milliseconds. var retFuture = newFuture[void]("sleepAsync") let p = getGlobalDispatcher() - p.timers.add((epochTime() + (ms / 1000), retFuture)) + when ms is int: + p.timers.push((getMonoTime() + initDuration(milliseconds = ms), retFuture)) + elif ms is float: + let ns = (ms * 1_000_000).int64 + p.timers.push((getMonoTime() + initDuration(nanoseconds = ns), retFuture)) + return retFuture + +proc withTimeout*[T](fut: Future[T], timeout: int): owned(Future[bool]) = + ## Returns a future which will complete once `fut` completes or after + ## `timeout` milliseconds has elapsed. + ## + ## If `fut` completes first the returned future will hold true, + ## otherwise, if `timeout` milliseconds has elapsed first, the returned + ## future will hold false. + + var retFuture = newFuture[bool]("asyncdispatch.`withTimeout`") + var timeoutFuture = sleepAsync(timeout) + fut.callback = + proc () = + if not retFuture.finished: + if fut.failed: + retFuture.fail(fut.error) + else: + retFuture.complete(true) + timeoutFuture.callback = + proc () = + if not retFuture.finished: retFuture.complete(false) return retFuture proc accept*(socket: AsyncFD, - flags = {SocketFlag.SafeDisconn}): Future[AsyncFD] = + flags = {SocketFlag.SafeDisconn}, + inheritable = defined(nimInheritHandles)): owned(Future[AsyncFD]) = ## Accepts a new connection. Returns a future containing the client socket ## corresponding to that connection. + ## + ## If `inheritable` is false (the default), the resulting client socket + ## will not be inheritable by child processes. + ## ## The future will complete when the connection is successfully accepted. var retFut = newFuture[AsyncFD]("accept") - var fut = acceptAddr(socket, flags) + var fut = acceptAddr(socket, flags, inheritable) fut.callback = proc (future: Future[tuple[address: string, client: AsyncFD]]) = assert future.finished @@ -1234,382 +1971,45 @@ proc accept*(socket: AsyncFD, retFut.complete(future.read.client) return retFut -# -- Await Macro - -proc skipUntilStmtList(node: NimNode): NimNode {.compileTime.} = - # Skips a nest of StmtList's. - result = node - if node[0].kind == nnkStmtList: - result = skipUntilStmtList(node[0]) - -proc skipStmtList(node: NimNode): NimNode {.compileTime.} = - result = node - if node[0].kind == nnkStmtList: - result = node[0] - -template createCb(retFutureSym, iteratorNameSym, - name: expr): stmt {.immediate.} = - var nameIterVar = iteratorNameSym - #{.push stackTrace: off.} - proc cb {.closure,gcsafe.} = - try: - if not nameIterVar.finished: - var next = nameIterVar() - if next == nil: - assert retFutureSym.finished, "Async procedure's (" & - name & ") return Future was not finished." - else: - next.callback = cb - except: - if retFutureSym.finished: - # Take a look at tasyncexceptions for the bug which this fixes. - # That test explains it better than I can here. - raise - else: - retFutureSym.fail(getCurrentException()) - cb() - #{.pop.} -proc generateExceptionCheck(futSym, - tryStmt, rootReceiver, fromNode: NimNode): NimNode {.compileTime.} = - if tryStmt.kind == nnkNilLit: - result = rootReceiver - else: - var exceptionChecks: seq[tuple[cond, body: NimNode]] = @[] - let errorNode = newDotExpr(futSym, newIdentNode("error")) - for i in 1 .. <tryStmt.len: - let exceptBranch = tryStmt[i] - if exceptBranch[0].kind == nnkStmtList: - exceptionChecks.add((newIdentNode("true"), exceptBranch[0])) - else: - var exceptIdentCount = 0 - var ifCond: NimNode - for i in 0 .. <exceptBranch.len: - let child = exceptBranch[i] - if child.kind == nnkIdent: - let cond = infix(errorNode, "of", child) - if exceptIdentCount == 0: - ifCond = cond - else: - ifCond = infix(ifCond, "or", cond) - else: - break - exceptIdentCount.inc - - expectKind(exceptBranch[exceptIdentCount], nnkStmtList) - exceptionChecks.add((ifCond, exceptBranch[exceptIdentCount])) - # -> -> else: raise futSym.error - exceptionChecks.add((newIdentNode("true"), - newNimNode(nnkRaiseStmt).add(errorNode))) - # Read the future if there is no error. - # -> else: futSym.read - let elseNode = newNimNode(nnkElse, fromNode) - elseNode.add newNimNode(nnkStmtList, fromNode) - elseNode[0].add rootReceiver - - let ifBody = newStmtList() - ifBody.add newCall(newIdentNode("setCurrentException"), errorNode) - ifBody.add newIfStmt(exceptionChecks) - ifBody.add newCall(newIdentNode("setCurrentException"), newNilLit()) - - result = newIfStmt( - (newDotExpr(futSym, newIdentNode("failed")), ifBody) - ) - result.add elseNode - -template createVar(result: var NimNode, futSymName: string, - asyncProc: NimNode, - valueReceiver, rootReceiver: expr, - fromNode: NimNode) = - result = newNimNode(nnkStmtList, fromNode) - var futSym = genSym(nskVar, "future") - result.add newVarStmt(futSym, asyncProc) # -> var future<x> = y - result.add newNimNode(nnkYieldStmt, fromNode).add(futSym) # -> yield future<x> - valueReceiver = newDotExpr(futSym, newIdentNode("read")) # -> future<x>.read - result.add generateExceptionCheck(futSym, tryStmt, rootReceiver, fromNode) - -proc processBody(node, retFutureSym: NimNode, - subTypeIsVoid: bool, - tryStmt: NimNode): NimNode {.compileTime.} = - #echo(node.treeRepr) - result = node - case node.kind - of nnkReturnStmt: - result = newNimNode(nnkStmtList, node) - if node[0].kind == nnkEmpty: - if not subTypeIsVoid: - result.add newCall(newIdentNode("complete"), retFutureSym, - newIdentNode("result")) - else: - result.add newCall(newIdentNode("complete"), retFutureSym) - else: - result.add newCall(newIdentNode("complete"), retFutureSym, - node[0].processBody(retFutureSym, subTypeIsVoid, tryStmt)) - - result.add newNimNode(nnkReturnStmt, node).add(newNilLit()) - return # Don't process the children of this return stmt - of nnkCommand, nnkCall: - if node[0].kind == nnkIdent and node[0].ident == !"await": - case node[1].kind - of nnkIdent, nnkInfix: - # await x - result = newNimNode(nnkYieldStmt, node).add(node[1]) # -> yield x - of nnkCall, nnkCommand: - # await foo(p, x) - var futureValue: NimNode - result.createVar("future" & $node[1][0].toStrLit, node[1], futureValue, - futureValue, node) - else: - error("Invalid node kind in 'await', got: " & $node[1].kind) - elif node.len > 1 and node[1].kind == nnkCommand and - node[1][0].kind == nnkIdent and node[1][0].ident == !"await": - # foo await x - var newCommand = node - result.createVar("future" & $node[0].toStrLit, node[1][1], newCommand[1], - newCommand, node) - - of nnkVarSection, nnkLetSection: - case node[0][2].kind - of nnkCommand: - if node[0][2][0].kind == nnkIdent and node[0][2][0].ident == !"await": - # var x = await y - var newVarSection = node # TODO: Should this use copyNimNode? - result.createVar("future" & $node[0][0].ident, node[0][2][1], - newVarSection[0][2], newVarSection, node) - else: discard - of nnkAsgn: - case node[1].kind - of nnkCommand: - if node[1][0].ident == !"await": - # x = await y - var newAsgn = node - result.createVar("future" & $node[0].toStrLit, node[1][1], newAsgn[1], newAsgn, node) - else: discard - of nnkDiscardStmt: - # discard await x - if node[0].kind == nnkCommand and node[0][0].kind == nnkIdent and - node[0][0].ident == !"await": - var newDiscard = node - result.createVar("futureDiscard_" & $toStrLit(node[0][1]), node[0][1], - newDiscard[0], newDiscard, node) - of nnkTryStmt: - # try: await x; except: ... - result = newNimNode(nnkStmtList, node) - template wrapInTry(n, tryBody: expr) = - var temp = n - n[0] = tryBody - tryBody = temp - - # Transform ``except`` body. - # TODO: Could we perform some ``await`` transformation here to get it - # working in ``except``? - tryBody[1] = processBody(n[1], retFutureSym, subTypeIsVoid, nil) - - proc processForTry(n: NimNode, i: var int, - res: NimNode): bool {.compileTime.} = - ## Transforms the body of the tryStmt. Does not transform the - ## body in ``except``. - ## Returns true if the tryStmt node was transformed into an ifStmt. - result = false - var skipped = n.skipStmtList() - while i < skipped.len: - var processed = processBody(skipped[i], retFutureSym, - subTypeIsVoid, n) - - # Check if we transformed the node into an exception check. - # This suggests skipped[i] contains ``await``. - if processed.kind != skipped[i].kind or processed.len != skipped[i].len: - processed = processed.skipUntilStmtList() - expectKind(processed, nnkStmtList) - expectKind(processed[2][1], nnkElse) - i.inc - - if not processForTry(n, i, processed[2][1][0]): - # We need to wrap the nnkElse nodes back into a tryStmt. - # As they are executed if an exception does not happen - # inside the awaited future. - # The following code will wrap the nodes inside the - # original tryStmt. - wrapInTry(n, processed[2][1][0]) - - res.add processed - result = true +proc keepAlive(x: string) = + discard "mark 'x' as escaping so that it is put into a closure for us to keep the data alive" + +proc send*(socket: AsyncFD, data: string, + flags = {SocketFlag.SafeDisconn}): owned(Future[void]) = + ## Sends `data` to `socket`. The returned future will complete once all + ## data has been sent. + var retFuture = newFuture[void]("send") + if data.len > 0: + let sendFut = socket.send(unsafeAddr data[0], data.len, flags) + sendFut.callback = + proc () = + keepAlive(data) + if sendFut.failed: + retFuture.fail(sendFut.error) else: - res.add skipped[i] - i.inc - var i = 0 - if not processForTry(node, i, result): - # If the tryStmt hasn't been transformed we can just put the body - # back into it. - wrapInTry(node, result) - return - else: discard - - for i in 0 .. <result.len: - result[i] = processBody(result[i], retFutureSym, subTypeIsVoid, nil) - -proc getName(node: NimNode): string {.compileTime.} = - case node.kind - of nnkPostfix: - return $node[1].ident - of nnkIdent: - return $node.ident - of nnkEmpty: - return "anonymous" - else: - error("Unknown name.") - -proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} = - ## This macro transforms a single procedure into a closure iterator. - ## The ``async`` macro supports a stmtList holding multiple async procedures. - if prc.kind notin {nnkProcDef, nnkLambda}: - error("Cannot transform this node kind into an async proc." & - " Proc definition or lambda node expected.") - - hint("Processing " & prc[0].getName & " as an async proc.") - - let returnType = prc[3][0] - var baseType: NimNode - # Verify that the return type is a Future[T] - if returnType.kind == nnkBracketExpr: - let fut = repr(returnType[0]) - if fut != "Future": - error("Expected return type of 'Future' got '" & fut & "'") - baseType = returnType[1] - elif returnType.kind in nnkCallKinds and $returnType[0] == "[]": - let fut = repr(returnType[1]) - if fut != "Future": - error("Expected return type of 'Future' got '" & fut & "'") - baseType = returnType[2] - elif returnType.kind == nnkEmpty: - baseType = returnType - else: - error("Expected return type of 'Future' got '" & repr(returnType) & "'") - - let subtypeIsVoid = returnType.kind == nnkEmpty or - (baseType.kind == nnkIdent and returnType[1].ident == !"void") - - var outerProcBody = newNimNode(nnkStmtList, prc[6]) - - # -> var retFuture = newFuture[T]() - var retFutureSym = genSym(nskVar, "retFuture") - var subRetType = - if returnType.kind == nnkEmpty: newIdentNode("void") - else: baseType - outerProcBody.add( - newVarStmt(retFutureSym, - newCall( - newNimNode(nnkBracketExpr, prc[6]).add( - newIdentNode(!"newFuture"), # TODO: Strange bug here? Remove the `!`. - subRetType), - newLit(prc[0].getName)))) # Get type from return type of this proc - - # -> iterator nameIter(): FutureBase {.closure.} = - # -> {.push warning[resultshadowed]: off.} - # -> var result: T - # -> {.pop.} - # -> <proc_body> - # -> complete(retFuture, result) - var iteratorNameSym = genSym(nskIterator, $prc[0].getName & "Iter") - var procBody = prc[6].processBody(retFutureSym, subtypeIsVoid, nil) - if not subtypeIsVoid: - procBody.insert(0, newNimNode(nnkPragma).add(newIdentNode("push"), - newNimNode(nnkExprColonExpr).add(newNimNode(nnkBracketExpr).add( - newIdentNode("warning"), newIdentNode("resultshadowed")), - newIdentNode("off")))) # -> {.push warning[resultshadowed]: off.} - - procBody.insert(1, newNimNode(nnkVarSection, prc[6]).add( - newIdentDefs(newIdentNode("result"), baseType))) # -> var result: T - - procBody.insert(2, newNimNode(nnkPragma).add( - newIdentNode("pop"))) # -> {.pop.}) - - procBody.add( - newCall(newIdentNode("complete"), - retFutureSym, newIdentNode("result"))) # -> complete(retFuture, result) - else: - # -> complete(retFuture) - procBody.add(newCall(newIdentNode("complete"), retFutureSym)) - - var closureIterator = newProc(iteratorNameSym, [newIdentNode("FutureBase")], - procBody, nnkIteratorDef) - closureIterator[4] = newNimNode(nnkPragma, prc[6]).add(newIdentNode("closure")) - outerProcBody.add(closureIterator) - - # -> createCb(retFuture) - #var cbName = newIdentNode("cb") - var procCb = newCall(bindSym"createCb", retFutureSym, iteratorNameSym, - newStrLitNode(prc[0].getName)) - outerProcBody.add procCb - - # -> return retFuture - outerProcBody.add newNimNode(nnkReturnStmt, prc[6][prc[6].len-1]).add(retFutureSym) - - result = prc - - # Remove the 'async' pragma. - for i in 0 .. <result[4].len: - if result[4][i].kind == nnkIdent and result[4][i].ident == !"async": - result[4].del(i) - if subtypeIsVoid: - # Add discardable pragma. - if returnType.kind == nnkEmpty: - # Add Future[void] - result[3][0] = parseExpr("Future[void]") - - result[6] = outerProcBody - - #echo(treeRepr(result)) - #if prc[0].getName == "hubConnectionLoop": - # echo(toStrLit(result)) - -macro async*(prc: stmt): stmt {.immediate.} = - ## Macro which processes async procedures into the appropriate - ## iterators and yield statements. - if prc.kind == nnkStmtList: - for oneProc in prc: - result = newStmtList() - result.add asyncSingleProc(oneProc) + retFuture.complete() else: - result = asyncSingleProc(prc) + retFuture.complete() -proc recvLine*(socket: AsyncFD): Future[string] {.async.} = - ## Reads a line of data from ``socket``. Returned future will complete once - ## a full line is read or an error occurs. - ## - ## If a full line is read ``\r\L`` is not - ## added to ``line``, however if solely ``\r\L`` is read then ``line`` - ## will be set to it. - ## - ## If the socket is disconnected, ``line`` will be set to ``""``. - ## - ## If the socket is disconnected in the middle of a line (before ``\r\L`` - ## is read) then line will be set to ``""``. - ## The partial line **will be lost**. - ## - ## **Warning**: This assumes that lines are delimited by ``\r\L``. - ## - ## **Note**: This procedure is mostly used for testing. You likely want to - ## use ``asyncnet.recvLine`` instead. + return retFuture - template addNLIfEmpty(): stmt = - if result.len == 0: - result.add("\c\L") +# -- Await Macro +import std/asyncmacro +export asyncmacro +proc readAll*(future: FutureStream[string]): owned(Future[string]) {.async.} = + ## Returns a future that will complete when all the string data from the + ## specified future stream is retrieved. result = "" - var c = "" while true: - c = await recv(socket, 1) - if c.len == 0: - return "" - if c == "\r": - c = await recv(socket, 1) - assert c == "\l" - addNLIfEmpty() - return - elif c == "\L": - addNLIfEmpty() - return - add(result, c) + let (hasValue, value) = await future.read() + if hasValue: + result.add(value) + else: + break + +proc callSoon(cbproc: proc () {.gcsafe.}) = + getGlobalDispatcher().callbacks.addLast(cbproc) proc runForever*() = ## Begins a never ending global dispatcher poll loop. @@ -1622,3 +2022,44 @@ proc waitFor*[T](fut: Future[T]): T = poll() fut.read + +proc activeDescriptors*(): int {.inline.} = + ## Returns the current number of active file descriptors for the current + ## event loop. This is a cheap operation that does not involve a system call. + when defined(windows): + result = getGlobalDispatcher().handles.len + elif not defined(nimdoc): + result = getGlobalDispatcher().selector.count + +when defined(posix): + import std/posix + +when defined(linux) or defined(windows) or defined(macosx) or defined(bsd) or + defined(solaris) or defined(zephyr) or defined(freertos) or defined(nuttx) or defined(haiku): + proc maxDescriptors*(): int {.raises: OSError.} = + ## Returns the maximum number of active file descriptors for the current + ## process. This involves a system call. For now `maxDescriptors` is + ## supported on the following OSes: Windows, Linux, OSX, BSD, Solaris. + when defined(windows): + result = 16_700_000 + elif defined(zephyr) or defined(freertos): + result = FD_MAX + else: + var fdLim: RLimit + if getrlimit(RLIMIT_NOFILE, fdLim) < 0: + raiseOSError(osLastError()) + result = int(fdLim.rlim_cur) - 1 + +when defined(genode): + proc scheduleCallbacks*(): bool {.discardable.} = + ## *Genode only.* + ## Schedule callback processing and return immediately. + ## Returns `false` if there is nothing to schedule. + ## RPC servers should call this to dispatch `callSoon` + ## bodies after retiring an RPC to its client. + ## This is effectively a non-blocking `poll(…)` and is + ## equivalent to scheduling a momentary no-op timeout + ## but faster and with less overhead. + let dis = getGlobalDispatcher() + result = dis.callbacks.len > 0 + if result: submit(dis.signalHandler.cap) |