summary refs log tree commit diff stats
path: root/lib/pure/asyncdispatch.nim
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r--lib/pure/asyncdispatch.nim2667
1 files changed, 1554 insertions, 1113 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim
index cc337452f..126db7a7f 100644
--- a/lib/pure/asyncdispatch.nim
+++ b/lib/pure/asyncdispatch.nim
@@ -7,39 +7,26 @@
 #    distribution, for details about the copyright.
 #
 
-include "system/inclrtl"
-
-import os, oids, tables, strutils, macros, times
-
-import nativesockets, net
-
-export Port, SocketFlag
-
-#{.injectStmt: newGcInvariant().}
-
-## AsyncDispatch
-## *************
-##
 ## This module implements asynchronous IO. This includes a dispatcher,
-## a ``Future`` type implementation, and an ``async`` macro which allows
-## asynchronous code to be written in a synchronous style with the ``await``
+## a `Future` type implementation, and an `async` macro which allows
+## asynchronous code to be written in a synchronous style with the `await`
 ## keyword.
 ##
-## The dispatcher acts as a kind of event loop. You must call ``poll`` on it
-## (or a function which does so for you such as ``waitFor`` or ``runForever``)
+## The dispatcher acts as a kind of event loop. You must call `poll` on it
+## (or a function which does so for you such as `waitFor` or `runForever`)
 ## in order to poll for any outstanding events. The underlying implementation
 ## is based on epoll on Linux, IO Completion Ports on Windows and select on
 ## other operating systems.
 ##
-## The ``poll`` function will not, on its own, return any events. Instead
-## an appropriate ``Future`` object will be completed. A ``Future`` is a
+## The `poll` function will not, on its own, return any events. Instead
+## an appropriate `Future` object will be completed. A `Future` is a
 ## type which holds a value which is not yet available, but which *may* be
 ## available in the future. You can check whether a future is finished
-## by using the ``finished`` function. When a future is finished it means that
+## by using the `finished` function. When a future is finished it means that
 ## either the value that it holds is now available or it holds an error instead.
 ## The latter situation occurs when the operation to complete a future fails
 ## with an exception. You can distinguish between the two situations with the
-## ``failed`` function.
+## `failed` function.
 ##
 ## Future objects can also store a callback procedure which will be called
 ## automatically once the future completes.
@@ -48,49 +35,50 @@ export Port, SocketFlag
 ## pattern. In this
 ## pattern you make a request for an action, and once that action is fulfilled
 ## a future is completed with the result of that action. Requests can be
-## made by calling the appropriate functions. For example: calling the ``recv``
+## made by calling the appropriate functions. For example: calling the `recv`
 ## function will create a request for some data to be read from a socket. The
-## future which the ``recv`` function returns will then complete once the
+## future which the `recv` function returns will then complete once the
 ## requested amount of data is read **or** an exception occurs.
 ##
 ## Code to read some data from a socket may look something like this:
+##   ```Nim
+##   var future = socket.recv(100)
+##   future.addCallback(
+##     proc () =
+##       echo(future.read)
+##   )
+##   ```
 ##
-##   .. code-block::nim
-##      var future = socket.recv(100)
-##      future.callback =
-##        proc () =
-##          echo(future.read)
-##
-## All asynchronous functions returning a ``Future`` will not block. They
+## All asynchronous functions returning a `Future` will not block. They
 ## will not however return immediately. An asynchronous function will have
 ## code which will be executed before an asynchronous request is made, in most
 ## cases this code sets up the request.
 ##
-## In the above example, the ``recv`` function will return a brand new
-## ``Future`` instance once the request for data to be read from the socket
-## is made. This ``Future`` instance will complete once the requested amount
+## In the above example, the `recv` function will return a brand new
+## `Future` instance once the request for data to be read from the socket
+## is made. This `Future` instance will complete once the requested amount
 ## of data is read, in this case it is 100 bytes. The second line sets a
 ## callback on this future which will be called once the future completes.
-## All the callback does is write the data stored in the future to ``stdout``.
-## The ``read`` function is used for this and it checks whether the future
-## completes with an error for you (if it did it will simply raise the
-## error), if there is no error however it returns the value of the future.
+## All the callback does is write the data stored in the future to `stdout`.
+## The `read` function is used for this and it checks whether the future
+## completes with an error for you (if it did, it will simply raise the
+## error), if there is no error, however, it returns the value of the future.
 ##
 ## Asynchronous procedures
-## -----------------------
+## =======================
 ##
 ## Asynchronous procedures remove the pain of working with callbacks. They do
 ## this by allowing you to write asynchronous code the same way as you would
 ## write synchronous code.
 ##
-## An asynchronous procedure is marked using the ``{.async.}`` pragma.
-## When marking a procedure with the ``{.async.}`` pragma it must have a
-## ``Future[T]`` return type or no return type at all. If you do not specify
-## a return type then ``Future[void]`` is assumed.
+## An asynchronous procedure is marked using the `{.async.}` pragma.
+## When marking a procedure with the `{.async.}` pragma it must have a
+## `Future[T]` return type or no return type at all. If you do not specify
+## a return type then `Future[void]` is assumed.
 ##
-## Inside asynchronous procedures ``await`` can be used to call any
+## Inside asynchronous procedures `await` can be used to call any
 ## procedures which return a
-## ``Future``; this includes asynchronous procedures. When a procedure is
+## `Future`; this includes asynchronous procedures. When a procedure is
 ## "awaited", the asynchronous procedure it is awaited in will
 ## suspend its execution
 ## until the awaited procedure's Future completes. At which point the
@@ -98,315 +86,293 @@ export Port, SocketFlag
 ## when an asynchronous procedure is suspended other asynchronous procedures
 ## will be run by the dispatcher.
 ##
-## The ``await`` call may be used in many contexts. It can be used on the right
-## hand side of a variable declaration: ``var data = await socket.recv(100)``,
+## The `await` call may be used in many contexts. It can be used on the right
+## hand side of a variable declaration: `var data = await socket.recv(100)`,
 ## in which case the variable will be set to the value of the future
-## automatically. It can be used to await a ``Future`` object, and it can
-## be used to await a procedure returning a ``Future[void]``:
-## ``await socket.send("foobar")``.
+## automatically. It can be used to await a `Future` object, and it can
+## be used to await a procedure returning a `Future[void]`:
+## `await socket.send("foobar")`.
+##
+## If an awaited future completes with an error, then `await` will re-raise
+## this error. To avoid this, you can use the `yield` keyword instead of
+## `await`. The following section shows different ways that you can handle
+## exceptions in async procs.
+##
+## .. caution::
+##     Procedures marked {.async.} do not support mutable parameters such
+##     as `var int`. References such as `ref int` should be used instead.
+##
+## Handling Exceptions
+## -------------------
+##
+## You can handle exceptions in the same way as in ordinary Nim code;
+## by using the try statement:
+##
+##   ```Nim
+##   try:
+##     let data = await sock.recv(100)
+##     echo("Received ", data)
+##   except:
+##     # Handle exception
+##   ```
+##
+## An alternative approach to handling exceptions is to use `yield` on a future
+## then check the future's `failed` property. For example:
+##
+##   ```Nim
+##   var future = sock.recv(100)
+##   yield future
+##   if future.failed:
+##     # Handle exception
+##   ```
+##
 ##
 ## Discarding futures
-## ------------------
+## ==================
+##
+## Futures should **never** be discarded directly because they may contain
+## errors. If you do not care for the result of a Future then you should use
+## the `asyncCheck` procedure instead of the `discard` keyword. Note that this
+## does not wait for completion, and you should use `waitFor` or `await` for that purpose.
+##
+## .. note:: `await` also checks if the future fails, so you can safely discard
+##   its result.
 ##
-## Futures should **never** be discarded. This is because they may contain
-## errors. If you do not care for the result of a Future then you should
-## use the ``asyncCheck`` procedure instead of the ``discard`` keyword.
+## Handling futures
+## ================
+##
+## There are many different operations that apply to a future.
+## The three primary high-level operations are `asyncCheck`,
+## `waitFor`, and `await`.
+##
+## * `asyncCheck`: Raises an exception if the future fails. It neither waits
+##   for the future to finish nor returns the result of the future.
+## * `waitFor`: Polls the event loop and blocks the current thread until the
+##   future finishes. This is often used to call an async procedure from a
+##   synchronous context and should never be used in an `async` proc.
+## * `await`: Pauses execution in the current async procedure until the future
+##   finishes. While the current procedure is paused, other async procedures will
+##   continue running. Should be used instead of `waitFor` in an async
+##   procedure.
+##
+## Here is a handy quick reference chart showing their high-level differences:
+## ==============  =====================   =======================
+## Procedure       Context                 Blocking
+## ==============  =====================   =======================
+## `asyncCheck`    non-async and async     non-blocking
+## `waitFor`       non-async               blocks current thread
+## `await`         async                   suspends current proc
+## ==============  =====================   =======================
 ##
 ## Examples
-## --------
+## ========
 ##
 ## For examples take a look at the documentation for the modules implementing
 ## asynchronous IO. A good place to start is the
 ## `asyncnet module <asyncnet.html>`_.
 ##
+## Investigating pending futures
+## =============================
+##
+## It's possible to get into a situation where an async proc, or more accurately
+## a `Future[T]` gets stuck and
+## never completes. This can happen for various reasons and can cause serious
+## memory leaks. When this occurs it's hard to identify the procedure that is
+## stuck.
+##
+## Thankfully there is a mechanism which tracks the count of each pending future.
+## All you need to do to enable it is compile with `-d:futureLogging` and
+## use the `getFuturesInProgress` procedure to get the list of pending futures
+## together with the stack traces to the moment of their creation.
+##
+## You may also find it useful to use this
+## `prometheus package <https://github.com/dom96/prometheus>`_ which will log
+## the pending futures into prometheus, allowing you to analyse them via a nice
+## graph.
+##
+##
+##
 ## Limitations/Bugs
-## ----------------
+## ================
 ##
-## * The effect system (``raises: []``) does not work with async procedures.
-## * Can't await in a ``except`` body
-## * Forward declarations for async procs are broken,
-##   link includes workaround: https://github.com/nim-lang/Nim/issues/3182.
-## * FutureVar[T] needs to be completed manually.
-
-# TODO: Check if yielded future is nil and throw a more meaningful exception
-
-# -- Futures
-
-type
-  FutureBase* = ref object of RootObj ## Untyped future.
-    cb: proc () {.closure,gcsafe.}
-    finished: bool
-    error*: ref Exception ## Stored exception
-    errorStackTrace*: string
-    when not defined(release):
-      stackTrace: string ## For debugging purposes only.
-      id: int
-      fromProc: string
-
-  Future*[T] = ref object of FutureBase ## Typed future.
-    value: T ## Stored value
-
-  FutureVar*[T] = distinct Future[T]
-
-  FutureError* = object of Exception
-    cause*: FutureBase
-
-{.deprecated: [PFutureBase: FutureBase, PFuture: Future].}
-
-when not defined(release):
-  var currentID = 0
-proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
-  ## Creates a new future.
-  ##
-  ## Specifying ``fromProc``, which is a string specifying the name of the proc
-  ## that this future belongs to, is a good habit as it helps with debugging.
-  new(result)
-  result.finished = false
-  when not defined(release):
-    result.stackTrace = getStackTrace()
-    result.id = currentID
-    result.fromProc = fromProc
-    currentID.inc()
-
-proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] =
-  ## Create a new ``FutureVar``. This Future type is ideally suited for
-  ## situations where you want to avoid unnecessary allocations of Futures.
-  ##
-  ## Specifying ``fromProc``, which is a string specifying the name of the proc
-  ## that this future belongs to, is a good habit as it helps with debugging.
-  result = FutureVar[T](newFuture[T](fromProc))
-
-proc clean*[T](future: FutureVar[T]) =
-  ## Resets the ``finished`` status of ``future``.
-  Future[T](future).finished = false
-  Future[T](future).error = nil
-
-proc checkFinished[T](future: Future[T]) =
-  ## Checks whether `future` is finished. If it is then raises a
-  ## ``FutureError``.
-  when not defined(release):
-    if future.finished:
-      var msg = ""
-      msg.add("An attempt was made to complete a Future more than once. ")
-      msg.add("Details:")
-      msg.add("\n  Future ID: " & $future.id)
-      msg.add("\n  Created in proc: " & future.fromProc)
-      msg.add("\n  Stack trace to moment of creation:")
-      msg.add("\n" & indent(future.stackTrace.strip(), 4))
-      when T is string:
-        msg.add("\n  Contents (string): ")
-        msg.add("\n" & indent(future.value.repr, 4))
-      msg.add("\n  Stack trace to moment of secondary completion:")
-      msg.add("\n" & indent(getStackTrace().strip(), 4))
-      var err = newException(FutureError, msg)
-      err.cause = future
-      raise err
-
-proc complete*[T](future: Future[T], val: T) =
-  ## Completes ``future`` with value ``val``.
-  #assert(not future.finished, "Future already finished, cannot finish twice.")
-  checkFinished(future)
-  assert(future.error == nil)
-  future.value = val
-  future.finished = true
-  if future.cb != nil:
-    future.cb()
-
-proc complete*(future: Future[void]) =
-  ## Completes a void ``future``.
-  #assert(not future.finished, "Future already finished, cannot finish twice.")
-  checkFinished(future)
-  assert(future.error == nil)
-  future.finished = true
-  if future.cb != nil:
-    future.cb()
-
-proc complete*[T](future: FutureVar[T]) =
-  ## Completes a ``FutureVar``.
-  template fut: expr = Future[T](future)
-  checkFinished(fut)
-  assert(fut.error == nil)
-  fut.finished = true
-  if fut.cb != nil:
-    fut.cb()
-
-proc fail*[T](future: Future[T], error: ref Exception) =
-  ## Completes ``future`` with ``error``.
-  #assert(not future.finished, "Future already finished, cannot finish twice.")
-  checkFinished(future)
-  future.finished = true
-  future.error = error
-  future.errorStackTrace =
-    if getStackTrace(error) == "": getStackTrace() else: getStackTrace(error)
-  if future.cb != nil:
-    future.cb()
-  else:
-    # This is to prevent exceptions from being silently ignored when a future
-    # is discarded.
-    # TODO: This may turn out to be a bad idea.
-    # Turns out this is a bad idea.
-    #raise error
-    discard
-
-proc `callback=`*(future: FutureBase, cb: proc () {.closure,gcsafe.}) =
-  ## Sets the callback proc to be called when the future completes.
-  ##
-  ## If future has already completed then ``cb`` will be called immediately.
-  ##
-  ## **Note**: You most likely want the other ``callback`` setter which
-  ## passes ``future`` as a param to the callback.
-  future.cb = cb
-  if future.finished:
-    future.cb()
-
-proc `callback=`*[T](future: Future[T],
-    cb: proc (future: Future[T]) {.closure,gcsafe.}) =
-  ## Sets the callback proc to be called when the future completes.
-  ##
-  ## If future has already completed then ``cb`` will be called immediately.
-  future.callback = proc () = cb(future)
-
-proc injectStacktrace[T](future: Future[T]) =
-  # TODO: Come up with something better.
-  when not defined(release):
-    var msg = ""
-    msg.add("\n  " & future.fromProc & "'s lead up to read of failed Future:")
-
-    if not future.errorStackTrace.isNil and future.errorStackTrace != "":
-      msg.add("\n" & indent(future.errorStackTrace.strip(), 4))
-    else:
-      msg.add("\n    Empty or nil stack trace.")
-    future.error.msg.add(msg)
-
-proc read*[T](future: Future[T]): T =
-  ## Retrieves the value of ``future``. Future must be finished otherwise
-  ## this function will fail with a ``ValueError`` exception.
-  ##
-  ## If the result of the future is an error then that error will be raised.
-  if future.finished:
-    if future.error != nil:
-      injectStacktrace(future)
-      raise future.error
-    when T isnot void:
-      return future.value
-  else:
-    # TODO: Make a custom exception type for this?
-    raise newException(ValueError, "Future still in progress.")
-
-proc readError*[T](future: Future[T]): ref Exception =
-  ## Retrieves the exception stored in ``future``.
-  ##
-  ## An ``ValueError`` exception will be thrown if no exception exists
-  ## in the specified Future.
-  if future.error != nil: return future.error
-  else:
-    raise newException(ValueError, "No error in future.")
+## * The effect system (`raises: []`) does not work with async procedures.
+## * Mutable parameters are not supported by async procedures.
+##
+##
+## Multiple async backend support
+## ==============================
+##
+## Thanks to its powerful macro support, Nim allows ``async``/``await`` to be
+## implemented in libraries with only minimal support from the language - as
+## such, multiple ``async`` libraries exist, including ``asyncdispatch`` and
+## ``chronos``, and more may come to be developed in the future.
+##
+## Libraries built on top of async/await may wish to support multiple async
+## backends - the best way to do so is to create separate modules for each backend
+## that may be imported side-by-side.
+##
+## An alternative way is to select backend using a global compile flag - this
+## method makes it difficult to compose applications that use both backends as may
+## happen with transitive dependencies, but may be appropriate in some cases -
+## libraries choosing this path should call the flag `asyncBackend`, allowing
+## applications to choose the backend with `-d:asyncBackend=<backend_name>`.
+##
+## Known `async` backends include:
+##
+## * `-d:asyncBackend=none`: disable `async` support completely
+## * `-d:asyncBackend=asyncdispatch`: https://nim-lang.org/docs/asyncdispatch.html
+## * `-d:asyncBackend=chronos`: https://github.com/status-im/nim-chronos/
+##
+## ``none`` can be used when a library supports both a synchronous and
+## asynchronous API, to disable the latter.
 
-proc mget*[T](future: FutureVar[T]): var T =
-  ## Returns a mutable value stored in ``future``.
-  ##
-  ## Unlike ``read``, this function will not raise an exception if the
-  ## Future has not been finished.
-  result = Future[T](future).value
+import std/[os, tables, strutils, times, heapqueue, options, asyncstreams]
+import std/[math, monotimes]
+import std/asyncfutures except callSoon
 
-proc finished*[T](future: Future[T]): bool =
-  ## Determines whether ``future`` has completed.
-  ##
-  ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish.
-  future.finished
+import std/[nativesockets, net, deques]
 
-proc failed*(future: FutureBase): bool =
-  ## Determines whether ``future`` completed with an error.
-  return future.error != nil
+when defined(nimPreviewSlimSystem):
+  import std/[assertions, syncio]
 
-proc asyncCheck*[T](future: Future[T]) =
-  ## Sets a callback on ``future`` which raises an exception if the future
-  ## finished with an error.
-  ##
-  ## This should be used instead of ``discard`` to discard void futures.
-  future.callback =
-    proc () =
-      if future.failed:
-        injectStacktrace(future)
-        raise future.error
-
-proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
-  ## Returns a future which will complete once both ``fut1`` and ``fut2``
-  ## complete.
-  var retFuture = newFuture[void]("asyncdispatch.`and`")
-  fut1.callback =
-    proc () =
-      if fut2.finished: retFuture.complete()
-  fut2.callback =
-    proc () =
-      if fut1.finished: retFuture.complete()
-  return retFuture
+export Port, SocketFlag
+export asyncfutures except callSoon
+export asyncstreams
 
-proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
-  ## Returns a future which will complete once either ``fut1`` or ``fut2``
-  ## complete.
-  var retFuture = newFuture[void]("asyncdispatch.`or`")
-  proc cb() =
-    if not retFuture.finished: retFuture.complete()
-  fut1.callback = cb
-  fut2.callback = cb
-  return retFuture
+# TODO: Check if yielded future is nil and throw a more meaningful exception
 
 type
   PDispatcherBase = ref object of RootRef
-    timers: seq[tuple[finishAt: float, fut: Future[void]]]
-
-proc processTimers(p: PDispatcherBase) =
-  var oldTimers = p.timers
-  p.timers = @[]
-  for t in oldTimers:
-    if epochTime() >= t.finishAt:
-      t.fut.complete()
-    else:
-      p.timers.add(t)
+    timers*: HeapQueue[tuple[finishAt: MonoTime, fut: Future[void]]]
+    callbacks*: Deque[proc () {.gcsafe.}]
+
+proc processTimers(
+  p: PDispatcherBase, didSomeWork: var bool
+): Option[int] {.inline.} =
+  # Pop the timers in the order in which they will expire (smaller `finishAt`).
+  var count = p.timers.len
+  let t = getMonoTime()
+  while count > 0 and t >= p.timers[0].finishAt:
+    p.timers.pop().fut.complete()
+    dec count
+    didSomeWork = true
+
+  # Return the number of milliseconds in which the next timer will expire.
+  if p.timers.len == 0: return
+
+  let millisecs = (p.timers[0].finishAt - getMonoTime()).inMilliseconds
+  return some(millisecs.int + 1)
+
+proc processPendingCallbacks(p: PDispatcherBase; didSomeWork: var bool) =
+  while p.callbacks.len > 0:
+    var cb = p.callbacks.popFirst()
+    cb()
+    didSomeWork = true
+
+proc adjustTimeout(
+  p: PDispatcherBase, pollTimeout: int, nextTimer: Option[int]
+): int {.inline.} =
+  if p.callbacks.len != 0:
+    return 0
+
+  if nextTimer.isNone() or pollTimeout == -1:
+    return pollTimeout
+
+  result = max(nextTimer.get(), 0)
+  result = min(pollTimeout, result)
+
+proc runOnce(timeout: int): bool {.gcsafe.}
+
+proc callSoon*(cbproc: proc () {.gcsafe.}) {.gcsafe.}
+  ## Schedule `cbproc` to be called as soon as possible.
+  ## The callback is called when control returns to the event loop.
+
+proc initCallSoonProc =
+  if asyncfutures.getCallSoonProc().isNil:
+    asyncfutures.setCallSoonProc(callSoon)
+
+template implementSetInheritable() {.dirty.} =
+  when declared(setInheritable):
+    proc setInheritable*(fd: AsyncFD, inheritable: bool): bool =
+      ## Control whether a file handle can be inherited by child processes.
+      ## Returns `true` on success.
+      ##
+      ## This procedure is not guaranteed to be available for all platforms.
+      ## Test for availability with `declared() <system.html#declared,untyped>`_.
+      fd.FileHandle.setInheritable(inheritable)
 
 when defined(windows) or defined(nimdoc):
-  import winlean, sets, hashes
+  import std/[winlean, sets, hashes]
   type
-    CompletionKey = Dword
+    CompletionKey = ULONG_PTR
 
     CompletionData* = object
-      fd*: AsyncFD # TODO: Rename this.
-      cb*: proc (fd: AsyncFD, bytesTransferred: Dword,
-                errcode: OSErrorCode) {.closure,gcsafe.}
+      fd*: AsyncFD       # TODO: Rename this.
+      cb*: owned(proc (fd: AsyncFD, bytesTransferred: DWORD,
+                errcode: OSErrorCode) {.closure, gcsafe.})
+      cell*: ForeignCell # we need this `cell` to protect our `cb` environment,
+                         # when using RegisterWaitForSingleObject, because
+                         # waiting is done in different thread.
 
     PDispatcher* = ref object of PDispatcherBase
       ioPort: Handle
-      handles: HashSet[AsyncFD]
+      handles*: HashSet[AsyncFD] # Export handles so that an external library can register them.
 
-    CustomOverlapped = object of OVERLAPPED
+    CustomObj = object of OVERLAPPED
       data*: CompletionData
 
-    PCustomOverlapped* = ref CustomOverlapped
+    CustomRef* = ref CustomObj
 
     AsyncFD* = distinct int
-  {.deprecated: [TCompletionKey: CompletionKey, TAsyncFD: AsyncFD,
-                TCustomOverlapped: CustomOverlapped, TCompletionData: CompletionData].}
+
+    PostCallbackData = object
+      ioPort: Handle
+      handleFd: AsyncFD
+      waitFd: Handle
+      ovl: owned CustomRef
+    PostCallbackDataPtr = ptr PostCallbackData
+
+    AsyncEventImpl = object
+      hEvent: Handle
+      hWaiter: Handle
+      pcd: PostCallbackDataPtr
+    AsyncEvent* = ptr AsyncEventImpl
+
+    Callback* = proc (fd: AsyncFD): bool {.closure, gcsafe.}
 
   proc hash(x: AsyncFD): Hash {.borrow.}
   proc `==`*(x: AsyncFD, y: AsyncFD): bool {.borrow.}
 
-  proc newDispatcher*(): PDispatcher =
+  proc newDispatcher*(): owned PDispatcher =
     ## Creates a new Dispatcher instance.
     new result
     result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
-    result.handles = initSet[AsyncFD]()
-    result.timers = @[]
+    result.handles = initHashSet[AsyncFD]()
+    result.timers.clear()
+    result.callbacks = initDeque[proc () {.closure, gcsafe.}](64)
+
+  var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher
+
+  proc setGlobalDispatcher*(disp: sink PDispatcher) =
+    if not gDisp.isNil:
+      assert gDisp.callbacks.len == 0
+    gDisp = disp
+    initCallSoonProc()
 
-  var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
   proc getGlobalDispatcher*(): PDispatcher =
-    ## Retrieves the global thread-local dispatcher.
-    if gDisp.isNil: gDisp = newDispatcher()
+    if gDisp.isNil:
+      setGlobalDispatcher(newDispatcher())
     result = gDisp
 
+  proc getIoHandler*(disp: PDispatcher): Handle =
+    ## Returns the underlying IO Completion Port handle (Windows) or selector
+    ## (Unix) for the specified dispatcher.
+    return disp.ioPort
+
   proc register*(fd: AsyncFD) =
-    ## Registers ``fd`` with the dispatcher.
+    ## Registers `fd` with the dispatcher.
     let p = getGlobalDispatcher()
+
     if createIoCompletionPort(fd.Handle, p.ioPort,
                               cast[CompletionKey](fd), 1) == 0:
       raiseOSError(osLastError())
@@ -414,28 +380,42 @@ when defined(windows) or defined(nimdoc):
 
   proc verifyPresence(fd: AsyncFD) =
     ## Ensures that file descriptor has been registered with the dispatcher.
+    ## Raises ValueError if `fd` has not been registered.
     let p = getGlobalDispatcher()
     if fd notin p.handles:
       raise newException(ValueError,
         "Operation performed on a socket which has not been registered with" &
         " the dispatcher yet.")
 
-  proc poll*(timeout = 500) =
-    ## Waits for completion events and processes them.
+  proc hasPendingOperations*(): bool =
+    ## Returns `true` if the global dispatcher has pending operations.
+    let p = getGlobalDispatcher()
+    p.handles.len != 0 or p.timers.len != 0 or p.callbacks.len != 0
+
+  proc runOnce(timeout: int): bool =
     let p = getGlobalDispatcher()
-    if p.handles.len == 0 and p.timers.len == 0:
+    if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0:
       raise newException(ValueError,
         "No handles or timers registered in dispatcher.")
 
-    let llTimeout =
-      if timeout ==  -1: winlean.INFINITE
-      else: timeout.int32
-    var lpNumberOfBytesTransferred: Dword
-    var lpCompletionKey: ULONG
-    var customOverlapped: PCustomOverlapped
+    result = false
+    let nextTimer = processTimers(p, result)
+    let at = adjustTimeout(p, timeout, nextTimer)
+    var llTimeout =
+      if at == -1: winlean.INFINITE
+      else: at.int32
+
+    var lpNumberOfBytesTransferred: DWORD
+    var lpCompletionKey: ULONG_PTR
+    var customOverlapped: CustomRef
     let res = getQueuedCompletionStatus(p.ioPort,
         addr lpNumberOfBytesTransferred, addr lpCompletionKey,
         cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool
+    result = true
+    # For 'gcDestructors' the destructor of 'customOverlapped' will
+    # be called at the end and we are the only owner here. This means
+    # We do not have to 'GC_unref(customOverlapped)' because the destructor
+    # does that for us.
 
     # http://stackoverflow.com/a/12277264/492186
     # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html
@@ -445,162 +425,84 @@ when defined(windows) or defined(nimdoc):
 
       customOverlapped.data.cb(customOverlapped.data.fd,
           lpNumberOfBytesTransferred, OSErrorCode(-1))
-      GC_unref(customOverlapped)
+
+      # If cell.data != nil, then system.protect(rawEnv(cb)) was called,
+      # so we need to dispose our `cb` environment, because it is not needed
+      # anymore.
+      if customOverlapped.data.cell.data != nil:
+        system.dispose(customOverlapped.data.cell)
+
+      when not defined(gcDestructors):
+        GC_unref(customOverlapped)
     else:
       let errCode = osLastError()
       if customOverlapped != nil:
         assert customOverlapped.data.fd == lpCompletionKey.AsyncFD
         customOverlapped.data.cb(customOverlapped.data.fd,
             lpNumberOfBytesTransferred, errCode)
-        GC_unref(customOverlapped)
+        if customOverlapped.data.cell.data != nil:
+          system.dispose(customOverlapped.data.cell)
+        when not defined(gcDestructors):
+          GC_unref(customOverlapped)
       else:
         if errCode.int32 == WAIT_TIMEOUT:
           # Timed out
-          discard
+          result = false
         else: raiseOSError(errCode)
 
     # Timer processing.
-    processTimers(p)
+    discard processTimers(p, result)
+    # Callback queue processing
+    processPendingCallbacks(p, result)
 
-  var connectExPtr: pointer = nil
-  var acceptExPtr: pointer = nil
-  var getAcceptExSockAddrsPtr: pointer = nil
+
+  var acceptEx: WSAPROC_ACCEPTEX
+  var connectEx: WSAPROC_CONNECTEX
+  var getAcceptExSockAddrs: WSAPROC_GETACCEPTEXSOCKADDRS
 
   proc initPointer(s: SocketHandle, fun: var pointer, guid: var GUID): bool =
     # Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c
-    var bytesRet: Dword
+    var bytesRet: DWORD
     fun = nil
     result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid,
-                      sizeof(GUID).Dword, addr fun, sizeof(pointer).Dword,
+                      sizeof(GUID).DWORD, addr fun, sizeof(pointer).DWORD,
                       addr bytesRet, nil, nil) == 0
 
   proc initAll() =
-    let dummySock = newNativeSocket()
-    if not initPointer(dummySock, connectExPtr, WSAID_CONNECTEX):
+    let dummySock = createNativeSocket()
+    if dummySock == INVALID_SOCKET:
       raiseOSError(osLastError())
-    if not initPointer(dummySock, acceptExPtr, WSAID_ACCEPTEX):
+    var fun: pointer = nil
+    if not initPointer(dummySock, fun, WSAID_CONNECTEX):
       raiseOSError(osLastError())
-    if not initPointer(dummySock, getAcceptExSockAddrsPtr, WSAID_GETACCEPTEXSOCKADDRS):
+    connectEx = cast[WSAPROC_CONNECTEX](fun)
+    if not initPointer(dummySock, fun, WSAID_ACCEPTEX):
       raiseOSError(osLastError())
-
-  proc connectEx(s: SocketHandle, name: ptr SockAddr, namelen: cint,
-                  lpSendBuffer: pointer, dwSendDataLength: Dword,
-                  lpdwBytesSent: PDword, lpOverlapped: POVERLAPPED): bool =
-    if connectExPtr.isNil: raise newException(ValueError, "Need to initialise ConnectEx().")
-    let fun =
-      cast[proc (s: SocketHandle, name: ptr SockAddr, namelen: cint,
-         lpSendBuffer: pointer, dwSendDataLength: Dword,
-         lpdwBytesSent: PDword, lpOverlapped: POVERLAPPED): bool {.stdcall,gcsafe.}](connectExPtr)
-
-    result = fun(s, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent,
-         lpOverlapped)
-
-  proc acceptEx(listenSock, acceptSock: SocketHandle, lpOutputBuffer: pointer,
-                 dwReceiveDataLength, dwLocalAddressLength,
-                 dwRemoteAddressLength: Dword, lpdwBytesReceived: PDword,
-                 lpOverlapped: POVERLAPPED): bool =
-    if acceptExPtr.isNil: raise newException(ValueError, "Need to initialise AcceptEx().")
-    let fun =
-      cast[proc (listenSock, acceptSock: SocketHandle, lpOutputBuffer: pointer,
-                 dwReceiveDataLength, dwLocalAddressLength,
-                 dwRemoteAddressLength: Dword, lpdwBytesReceived: PDword,
-                 lpOverlapped: POVERLAPPED): bool {.stdcall,gcsafe.}](acceptExPtr)
-    result = fun(listenSock, acceptSock, lpOutputBuffer, dwReceiveDataLength,
-        dwLocalAddressLength, dwRemoteAddressLength, lpdwBytesReceived,
-        lpOverlapped)
-
-  proc getAcceptExSockaddrs(lpOutputBuffer: pointer,
-      dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength: Dword,
-      LocalSockaddr: ptr ptr SockAddr, LocalSockaddrLength: LPInt,
-      RemoteSockaddr: ptr ptr SockAddr, RemoteSockaddrLength: LPInt) =
-    if getAcceptExSockAddrsPtr.isNil:
-      raise newException(ValueError, "Need to initialise getAcceptExSockAddrs().")
-
-    let fun =
-      cast[proc (lpOutputBuffer: pointer,
-                 dwReceiveDataLength, dwLocalAddressLength,
-                 dwRemoteAddressLength: Dword, LocalSockaddr: ptr ptr SockAddr,
-                 LocalSockaddrLength: LPInt, RemoteSockaddr: ptr ptr SockAddr,
-                RemoteSockaddrLength: LPInt) {.stdcall,gcsafe.}](getAcceptExSockAddrsPtr)
-
-    fun(lpOutputBuffer, dwReceiveDataLength, dwLocalAddressLength,
-                  dwRemoteAddressLength, LocalSockaddr, LocalSockaddrLength,
-                  RemoteSockaddr, RemoteSockaddrLength)
-
-  proc connect*(socket: AsyncFD, address: string, port: Port,
-    domain = nativesockets.AF_INET): Future[void] =
-    ## Connects ``socket`` to server at ``address:port``.
-    ##
-    ## Returns a ``Future`` which will complete when the connection succeeds
-    ## or an error occurs.
-    verifyPresence(socket)
-    var retFuture = newFuture[void]("connect")
-    # Apparently ``ConnectEx`` expects the socket to be initially bound:
-    var saddr: Sockaddr_in
-    saddr.sin_family = int16(toInt(domain))
-    saddr.sin_port = 0
-    saddr.sin_addr.s_addr = INADDR_ANY
-    if bindAddr(socket.SocketHandle, cast[ptr SockAddr](addr(saddr)),
-                  sizeof(saddr).SockLen) < 0'i32:
+    acceptEx = cast[WSAPROC_ACCEPTEX](fun)
+    if not initPointer(dummySock, fun, WSAID_GETACCEPTEXSOCKADDRS):
       raiseOSError(osLastError())
+    getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](fun)
+    close(dummySock)
 
-    var aiList = getAddrInfo(address, port, domain)
-    var success = false
-    var lastError: OSErrorCode
-    var it = aiList
-    while it != nil:
-      # "the OVERLAPPED structure must remain valid until the I/O completes"
-      # http://blogs.msdn.com/b/oldnewthing/archive/2011/02/02/10123392.aspx
-      var ol = PCustomOverlapped()
-      GC_ref(ol)
-      ol.data = CompletionData(fd: socket, cb:
-        proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
-          if not retFuture.finished:
-            if errcode == OSErrorCode(-1):
-              retFuture.complete()
-            else:
-              retFuture.fail(newException(OSError, osErrorMsg(errcode)))
-      )
-
-      var ret = connectEx(socket.SocketHandle, it.ai_addr,
-                          sizeof(Sockaddr_in).cint, nil, 0, nil,
-                          cast[POVERLAPPED](ol))
-      if ret:
-        # Request to connect completed immediately.
-        success = true
-        retFuture.complete()
-        # We don't deallocate ``ol`` here because even though this completed
-        # immediately poll will still be notified about its completion and it will
-        # free ``ol``.
-        break
-      else:
-        lastError = osLastError()
-        if lastError.int32 == ERROR_IO_PENDING:
-          # In this case ``ol`` will be deallocated in ``poll``.
-          success = true
-          break
-        else:
-          GC_unref(ol)
-          success = false
-      it = it.ai_next
-
-    dealloc(aiList)
-    if not success:
-      retFuture.fail(newException(OSError, osErrorMsg(lastError)))
-    return retFuture
+  proc newCustom*(): CustomRef =
+    result = CustomRef() # 0
+    GC_ref(result) # 1  prevent destructor from doing a premature free.
+    # destructor of newCustom's caller --> 0. This means
+    # Windows holds a ref for us with RC == 0 (single owner).
+    # This is passed back to us in the IO completion port.
 
   proc recv*(socket: AsyncFD, size: int,
-             flags = {SocketFlag.SafeDisconn}): Future[string] =
-    ## Reads **up to** ``size`` bytes from ``socket``. Returned future will
+             flags = {SocketFlag.SafeDisconn}): owned(Future[string]) =
+    ## Reads **up to** `size` bytes from `socket`. Returned future will
     ## complete once all the data requested is read, a part of the data has been
     ## read, or the socket has disconnected in which case the future will
-    ## complete with a value of ``""``.
+    ## complete with a value of `""`.
     ##
-    ## **Warning**: The ``Peek`` socket flag is not supported on Windows.
+    ## .. warning:: The `Peek` socket flag is not supported on Windows.
 
 
     # Things to note:
-    #   * When WSARecv completes immediately then ``bytesReceived`` is very
+    #   * When WSARecv completes immediately then `bytesReceived` is very
     #     unreliable.
     #   * Still need to implement message-oriented socket disconnection,
     #     '\0' in the message currently signifies a socket disconnect. Who
@@ -613,12 +515,11 @@ when defined(windows) or defined(nimdoc):
     dataBuf.buf = cast[cstring](alloc0(size))
     dataBuf.len = size.ULONG
 
-    var bytesReceived: Dword
-    var flagsio = flags.toOSFlags().Dword
-    var ol = PCustomOverlapped()
-    GC_ref(ol)
+    var bytesReceived: DWORD
+    var flagsio = flags.toOSFlags().DWORD
+    var ol = newCustom()
     ol.data = CompletionData(fd: socket, cb:
-      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
+      proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
         if not retFuture.finished:
           if errcode == OSErrorCode(-1):
             if bytesCount == 0 and dataBuf.buf[0] == '\0':
@@ -632,7 +533,7 @@ when defined(windows) or defined(nimdoc):
             if flags.isDisconnectionError(errcode):
               retFuture.complete("")
             else:
-              retFuture.fail(newException(OSError, osErrorMsg(errcode)))
+              retFuture.fail(newOSError(errcode))
         if dataBuf.buf != nil:
           dealloc dataBuf.buf
           dataBuf.buf = nil
@@ -650,50 +551,32 @@ when defined(windows) or defined(nimdoc):
         if flags.isDisconnectionError(err):
           retFuture.complete("")
         else:
-          retFuture.fail(newException(OSError, osErrorMsg(err)))
-    elif ret == 0 and bytesReceived == 0 and dataBuf.buf[0] == '\0':
-      # We have to ensure that the buffer is empty because WSARecv will tell
-      # us immediately when it was disconnected, even when there is still
-      # data in the buffer.
-      # We want to give the user as much data as we can. So we only return
-      # the empty string (which signals a disconnection) when there is
-      # nothing left to read.
-      retFuture.complete("")
-      # TODO: "For message-oriented sockets, where a zero byte message is often
-      # allowable, a failure with an error code of WSAEDISCON is used to
-      # indicate graceful closure."
-      # ~ http://msdn.microsoft.com/en-us/library/ms741688%28v=vs.85%29.aspx
-    else:
-      # Request to read completed immediately.
-      # From my tests bytesReceived isn't reliable.
-      let realSize =
-        if bytesReceived == 0:
-          size
-        else:
-          bytesReceived
-      var data = newString(realSize)
-      assert realSize <= size
-      copyMem(addr data[0], addr dataBuf.buf[0], realSize)
-      #dealloc dataBuf.buf
-      retFuture.complete($data)
-      # We don't deallocate ``ol`` here because even though this completed
-      # immediately poll will still be notified about its completion and it will
-      # free ``ol``.
+          retFuture.fail(newOSError(err))
+    elif ret == 0:
+      # Request completed immediately.
+      if bytesReceived != 0:
+        var data = newString(bytesReceived)
+        assert bytesReceived <= size
+        copyMem(addr data[0], addr dataBuf.buf[0], bytesReceived)
+        retFuture.complete($data)
+      else:
+        if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
+          retFuture.complete("")
     return retFuture
 
-  proc recvInto*(socket: AsyncFD, buf: cstring, size: int,
-                flags = {SocketFlag.SafeDisconn}): Future[int] =
-    ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``, which must
+  proc recvInto*(socket: AsyncFD, buf: pointer, size: int,
+                 flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
+    ## Reads **up to** `size` bytes from `socket` into `buf`, which must
     ## at least be of that size. Returned future will complete once all the
     ## data requested is read, a part of the data has been read, or the socket
     ## has disconnected in which case the future will complete with a value of
-    ## ``0``.
+    ## `0`.
     ##
-    ## **Warning**: The ``Peek`` socket flag is not supported on Windows.
+    ## .. warning:: The `Peek` socket flag is not supported on Windows.
 
 
     # Things to note:
-    #   * When WSARecv completes immediately then ``bytesReceived`` is very
+    #   * When WSARecv completes immediately then `bytesReceived` is very
     #     unreliable.
     #   * Still need to implement message-oriented socket disconnection,
     #     '\0' in the message currently signifies a socket disconnect. Who
@@ -705,26 +588,22 @@ when defined(windows) or defined(nimdoc):
 
     #buf[] = '\0'
     var dataBuf: TWSABuf
-    dataBuf.buf = buf
+    dataBuf.buf = cast[cstring](buf)
     dataBuf.len = size.ULONG
 
-    var bytesReceived: Dword
-    var flagsio = flags.toOSFlags().Dword
-    var ol = PCustomOverlapped()
-    GC_ref(ol)
+    var bytesReceived: DWORD
+    var flagsio = flags.toOSFlags().DWORD
+    var ol = newCustom()
     ol.data = CompletionData(fd: socket, cb:
-      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
+      proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
         if not retFuture.finished:
           if errcode == OSErrorCode(-1):
-            if bytesCount == 0 and dataBuf.buf[0] == '\0':
-              retFuture.complete(0)
-            else:
-              retFuture.complete(bytesCount)
+            retFuture.complete(bytesCount)
           else:
             if flags.isDisconnectionError(errcode):
               retFuture.complete(0)
             else:
-              retFuture.fail(newException(OSError, osErrorMsg(errcode)))
+              retFuture.fail(newOSError(errcode))
         if dataBuf.buf != nil:
           dataBuf.buf = nil
     )
@@ -740,50 +619,35 @@ when defined(windows) or defined(nimdoc):
         if flags.isDisconnectionError(err):
           retFuture.complete(0)
         else:
-          retFuture.fail(newException(OSError, osErrorMsg(err)))
-    elif ret == 0 and bytesReceived == 0 and dataBuf.buf[0] == '\0':
-      # We have to ensure that the buffer is empty because WSARecv will tell
-      # us immediately when it was disconnected, even when there is still
-      # data in the buffer.
-      # We want to give the user as much data as we can. So we only return
-      # the empty string (which signals a disconnection) when there is
-      # nothing left to read.
-      retFuture.complete(0)
-      # TODO: "For message-oriented sockets, where a zero byte message is often
-      # allowable, a failure with an error code of WSAEDISCON is used to
-      # indicate graceful closure."
-      # ~ http://msdn.microsoft.com/en-us/library/ms741688%28v=vs.85%29.aspx
-    else:
-      # Request to read completed immediately.
-      # From my tests bytesReceived isn't reliable.
-      let realSize =
-        if bytesReceived == 0:
-          size
-        else:
-          bytesReceived
-      assert realSize <= size
-      retFuture.complete(realSize)
-      # We don't deallocate ``ol`` here because even though this completed
-      # immediately poll will still be notified about its completion and it will
-      # free ``ol``.
+          retFuture.fail(newOSError(err))
+    elif ret == 0:
+      # Request completed immediately.
+      if bytesReceived != 0:
+        assert bytesReceived <= size
+        retFuture.complete(bytesReceived)
+      else:
+        if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
+          retFuture.complete(bytesReceived)
     return retFuture
 
-  proc send*(socket: AsyncFD, data: string,
-             flags = {SocketFlag.SafeDisconn}): Future[void] =
-    ## Sends ``data`` to ``socket``. The returned future will complete once all
-    ## data has been sent.
+  proc send*(socket: AsyncFD, buf: pointer, size: int,
+             flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
+    ## Sends `size` bytes from `buf` to `socket`. The returned future
+    ## will complete once all data has been sent.
+    ##
+    ## .. warning:: Use it with caution. If `buf` refers to GC'ed object,
+    ##   you must use GC_ref/GC_unref calls to avoid early freeing of the buffer.
     verifyPresence(socket)
     var retFuture = newFuture[void]("send")
 
     var dataBuf: TWSABuf
-    dataBuf.buf = data # since this is not used in a callback, this is fine
-    dataBuf.len = data.len.ULONG
+    dataBuf.buf = cast[cstring](buf)
+    dataBuf.len = size.ULONG
 
-    var bytesReceived, lowFlags: Dword
-    var ol = PCustomOverlapped()
-    GC_ref(ol)
+    var bytesReceived, lowFlags: DWORD
+    var ol = newCustom()
     ol.data = CompletionData(fd: socket, cb:
-      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
+      proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
         if not retFuture.finished:
           if errcode == OSErrorCode(-1):
             retFuture.complete()
@@ -791,7 +655,7 @@ when defined(windows) or defined(nimdoc):
             if flags.isDisconnectionError(errcode):
               retFuture.complete()
             else:
-              retFuture.fail(newException(OSError, osErrorMsg(errcode)))
+              retFuture.fail(newOSError(errcode))
     )
 
     let ret = WSASend(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
@@ -803,16 +667,110 @@ when defined(windows) or defined(nimdoc):
         if flags.isDisconnectionError(err):
           retFuture.complete()
         else:
-          retFuture.fail(newException(OSError, osErrorMsg(err)))
+          retFuture.fail(newOSError(err))
+    else:
+      retFuture.complete()
+      # We don't deallocate `ol` here because even though this completed
+      # immediately poll will still be notified about its completion and it will
+      # free `ol`.
+    return retFuture
+
+  proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr,
+               saddrLen: SockLen,
+               flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
+    ## Sends `data` to specified destination `saddr`, using
+    ## socket `socket`. The returned future will complete once all data
+    ## has been sent.
+    verifyPresence(socket)
+    var retFuture = newFuture[void]("sendTo")
+    var dataBuf: TWSABuf
+    dataBuf.buf = cast[cstring](data)
+    dataBuf.len = size.ULONG
+    var bytesSent = 0.DWORD
+    var lowFlags = 0.DWORD
+
+    # we will preserve address in our stack
+    var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes
+    var stalen: cint = cint(saddrLen)
+    zeroMem(addr(staddr[0]), 128)
+    copyMem(addr(staddr[0]), saddr, saddrLen)
+
+    var ol = newCustom()
+    ol.data = CompletionData(fd: socket, cb:
+      proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
+        if not retFuture.finished:
+          if errcode == OSErrorCode(-1):
+            retFuture.complete()
+          else:
+            retFuture.fail(newOSError(errcode))
+    )
+
+    let ret = WSASendTo(socket.SocketHandle, addr dataBuf, 1, addr bytesSent,
+                        lowFlags, cast[ptr SockAddr](addr(staddr[0])),
+                        stalen, cast[POVERLAPPED](ol), nil)
+    if ret == -1:
+      let err = osLastError()
+      if err.int32 != ERROR_IO_PENDING:
+        GC_unref(ol)
+        retFuture.fail(newOSError(err))
     else:
       retFuture.complete()
-      # We don't deallocate ``ol`` here because even though this completed
+      # We don't deallocate `ol` here because even though this completed
       # immediately poll will still be notified about its completion and it will
-      # free ``ol``.
+      # free `ol`.
+    return retFuture
+
+  proc recvFromInto*(socket: AsyncFD, data: pointer, size: int,
+                     saddr: ptr SockAddr, saddrLen: ptr SockLen,
+                     flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
+    ## Receives a datagram data from `socket` into `buf`, which must
+    ## be at least of size `size`, address of datagram's sender will be
+    ## stored into `saddr` and `saddrLen`. Returned future will complete
+    ## once one datagram has been received, and will return size of packet
+    ## received.
+    verifyPresence(socket)
+    var retFuture = newFuture[int]("recvFromInto")
+
+    var dataBuf = TWSABuf(buf: cast[cstring](data), len: size.ULONG)
+
+    var bytesReceived = 0.DWORD
+    var lowFlags = 0.DWORD
+
+    var ol = newCustom()
+    ol.data = CompletionData(fd: socket, cb:
+      proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
+        if not retFuture.finished:
+          if errcode == OSErrorCode(-1):
+            assert bytesCount <= size
+            retFuture.complete(bytesCount)
+          else:
+            # datagram sockets don't have disconnection,
+            # so we can just raise an exception
+            retFuture.fail(newOSError(errcode))
+    )
+
+    let res = WSARecvFrom(socket.SocketHandle, addr dataBuf, 1,
+                          addr bytesReceived, addr lowFlags,
+                          saddr, cast[ptr cint](saddrLen),
+                          cast[POVERLAPPED](ol), nil)
+    if res == -1:
+      let err = osLastError()
+      if err.int32 != ERROR_IO_PENDING:
+        GC_unref(ol)
+        retFuture.fail(newOSError(err))
+    else:
+      # Request completed immediately.
+      if bytesReceived != 0:
+        assert bytesReceived <= size
+        retFuture.complete(bytesReceived)
+      else:
+        if hasOverlappedIoCompleted(cast[POVERLAPPED](ol)):
+          retFuture.complete(bytesReceived)
     return retFuture
 
-  proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}):
-      Future[tuple[address: string, client: AsyncFD]] =
+  proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn},
+                   inheritable = defined(nimInheritHandles)):
+      owned(Future[tuple[address: string, client: AsyncFD]]) {.gcsafe.} =
     ## Accepts a new connection. Returns a future containing the client socket
     ## corresponding to that connection and the remote address of the client.
     ## The future will complete when the connection is successfully accepted.
@@ -820,44 +778,27 @@ when defined(windows) or defined(nimdoc):
     ## The resulting client socket is automatically registered to the
     ## dispatcher.
     ##
-    ## The ``accept`` call may result in an error if the connecting socket
-    ## disconnects during the duration of the ``accept``. If the ``SafeDisconn``
+    ## If `inheritable` is false (the default), the resulting client socket will
+    ## not be inheritable by child processes.
+    ##
+    ## The `accept` call may result in an error if the connecting socket
+    ## disconnects during the duration of the `accept`. If the `SafeDisconn`
     ## flag is specified then this error will not be raised and instead
     ## accept will be called again.
     verifyPresence(socket)
     var retFuture = newFuture[tuple[address: string, client: AsyncFD]]("acceptAddr")
 
-    var clientSock = newNativeSocket()
+    var clientSock = createNativeSocket(inheritable = inheritable)
     if clientSock == osInvalidSocket: raiseOSError(osLastError())
 
     const lpOutputLen = 1024
     var lpOutputBuf = newString(lpOutputLen)
-    var dwBytesReceived: Dword
-    let dwReceiveDataLength = 0.Dword # We don't want any data to be read.
-    let dwLocalAddressLength = Dword(sizeof (Sockaddr_in) + 16)
-    let dwRemoteAddressLength = Dword(sizeof(Sockaddr_in) + 16)
+    var dwBytesReceived: DWORD
+    let dwReceiveDataLength = 0.DWORD # We don't want any data to be read.
+    let dwLocalAddressLength = DWORD(sizeof(Sockaddr_in6) + 16)
+    let dwRemoteAddressLength = DWORD(sizeof(Sockaddr_in6) + 16)
 
-    template completeAccept(): stmt {.immediate, dirty.} =
-      var listenSock = socket
-      let setoptRet = setsockopt(clientSock, SOL_SOCKET,
-          SO_UPDATE_ACCEPT_CONTEXT, addr listenSock,
-          sizeof(listenSock).SockLen)
-      if setoptRet != 0: raiseOSError(osLastError())
-
-      var localSockaddr, remoteSockaddr: ptr SockAddr
-      var localLen, remoteLen: int32
-      getAcceptExSockaddrs(addr lpOutputBuf[0], dwReceiveDataLength,
-                           dwLocalAddressLength, dwRemoteAddressLength,
-                           addr localSockaddr, addr localLen,
-                           addr remoteSockaddr, addr remoteLen)
-      register(clientSock.AsyncFD)
-      # TODO: IPv6. Check ``sa_family``. http://stackoverflow.com/a/9212542/492186
-      retFuture.complete(
-        (address: $inet_ntoa(cast[ptr Sockaddr_in](remoteSockAddr).sin_addr),
-         client: clientSock.AsyncFD)
-      )
-
-    template failAccept(errcode): stmt =
+    template failAccept(errcode) =
       if flags.isDisconnectionError(errcode):
         var newAcceptFut = acceptAddr(socket, flags)
         newAcceptFut.callback =
@@ -867,12 +808,36 @@ when defined(windows) or defined(nimdoc):
             else:
               retFuture.complete(newAcceptFut.read)
       else:
-        retFuture.fail(newException(OSError, osErrorMsg(errcode)))
+        retFuture.fail(newOSError(errcode))
 
-    var ol = PCustomOverlapped()
-    GC_ref(ol)
+    template completeAccept() {.dirty.} =
+      var listenSock = socket
+      let setoptRet = setsockopt(clientSock, SOL_SOCKET,
+          SO_UPDATE_ACCEPT_CONTEXT, addr listenSock,
+          sizeof(listenSock).SockLen)
+      if setoptRet != 0:
+        let errcode = osLastError()
+        discard clientSock.closesocket()
+        failAccept(errcode)
+      else:
+        var localSockaddr, remoteSockaddr: ptr SockAddr
+        var localLen, remoteLen: int32
+        getAcceptExSockAddrs(addr lpOutputBuf[0], dwReceiveDataLength,
+                             dwLocalAddressLength, dwRemoteAddressLength,
+                             addr localSockaddr, addr localLen,
+                             addr remoteSockaddr, addr remoteLen)
+        try:
+          let address = getAddrString(remoteSockaddr)
+          register(clientSock.AsyncFD)
+          retFuture.complete((address: address, client: clientSock.AsyncFD))
+        except:
+          # getAddrString may raise
+          clientSock.close()
+          retFuture.fail(getCurrentException())
+
+    var ol = newCustom()
     ol.data = CompletionData(fd: socket, cb:
-      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
+      proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) {.gcsafe.} =
         if not retFuture.finished:
           if errcode == OSErrorCode(-1):
             completeAccept()
@@ -894,25 +859,13 @@ when defined(windows) or defined(nimdoc):
         GC_unref(ol)
     else:
       completeAccept()
-      # We don't deallocate ``ol`` here because even though this completed
+      # We don't deallocate `ol` here because even though this completed
       # immediately poll will still be notified about its completion and it will
-      # free ``ol``.
+      # free `ol`.
 
     return retFuture
 
-  proc newAsyncNativeSocket*(domain, sockType, protocol: cint): AsyncFD =
-    ## Creates a new socket and registers it with the dispatcher implicitly.
-    result = newNativeSocket(domain, sockType, protocol).AsyncFD
-    result.SocketHandle.setBlocking(false)
-    register(result)
-
-  proc newAsyncNativeSocket*(domain: Domain = nativesockets.AF_INET,
-                             sockType: SockType = SOCK_STREAM,
-                             protocol: Protocol = IPPROTO_TCP): AsyncFD =
-    ## Creates a new socket and registers it with the dispatcher implicitly.
-    result = newNativeSocket(domain, sockType, protocol).AsyncFD
-    result.SocketHandle.setBlocking(false)
-    register(result)
+  implementSetInheritable()
 
   proc closeSocket*(socket: AsyncFD) =
     ## Closes a socket and ensures that it is unregistered.
@@ -920,186 +873,586 @@ when defined(windows) or defined(nimdoc):
     getGlobalDispatcher().handles.excl(socket)
 
   proc unregister*(fd: AsyncFD) =
-    ## Unregisters ``fd``.
+    ## Unregisters `fd`.
     getGlobalDispatcher().handles.excl(fd)
 
+  proc contains*(disp: PDispatcher, fd: AsyncFD): bool =
+    return fd in disp.handles
+
+  {.push stackTrace: off.}
+  proc waitableCallback(param: pointer,
+                        timerOrWaitFired: WINBOOL) {.stdcall.} =
+    var p = cast[PostCallbackDataPtr](param)
+    discard postQueuedCompletionStatus(p.ioPort, timerOrWaitFired.DWORD,
+                                       ULONG_PTR(p.handleFd),
+                                       cast[pointer](p.ovl))
+  {.pop.}
+
+  proc registerWaitableEvent(fd: AsyncFD, cb: Callback; mask: DWORD) =
+    let p = getGlobalDispatcher()
+    var flags = (WT_EXECUTEINWAITTHREAD or WT_EXECUTEONLYONCE).DWORD
+    var hEvent = wsaCreateEvent()
+    if hEvent == 0:
+      raiseOSError(osLastError())
+    var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
+    pcd.ioPort = p.ioPort
+    pcd.handleFd = fd
+    var ol = newCustom()
+
+    ol.data = CompletionData(fd: fd, cb:
+      proc(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) {.gcsafe.} =
+        # we excluding our `fd` because cb(fd) can register own handler
+        # for this `fd`
+        p.handles.excl(fd)
+        # unregisterWait() is called before callback, because appropriate
+        # winsockets function can re-enable event.
+        # https://msdn.microsoft.com/en-us/library/windows/desktop/ms741576(v=vs.85).aspx
+        if unregisterWait(pcd.waitFd) == 0:
+          let err = osLastError()
+          if err.int32 != ERROR_IO_PENDING:
+            deallocShared(cast[pointer](pcd))
+            discard wsaCloseEvent(hEvent)
+            raiseOSError(err)
+        if cb(fd):
+          # callback returned `true`, so we free all allocated resources
+          deallocShared(cast[pointer](pcd))
+          if not wsaCloseEvent(hEvent):
+            raiseOSError(osLastError())
+          # pcd.ovl will be unrefed in poll().
+        else:
+          # callback returned `false` we need to continue
+          if p.handles.contains(fd):
+            # new callback was already registered with `fd`, so we free all
+            # allocated resources. This happens because in callback `cb`
+            # addRead/addWrite was called with same `fd`.
+            deallocShared(cast[pointer](pcd))
+            if not wsaCloseEvent(hEvent):
+              raiseOSError(osLastError())
+          else:
+            # we need to include `fd` again
+            p.handles.incl(fd)
+            # and register WaitForSingleObject again
+            if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
+                                    cast[WAITORTIMERCALLBACK](waitableCallback),
+                                       cast[pointer](pcd), INFINITE, flags):
+              # pcd.ovl will be unrefed in poll()
+              let err = osLastError()
+              deallocShared(cast[pointer](pcd))
+              discard wsaCloseEvent(hEvent)
+              raiseOSError(err)
+            else:
+              # we incref `pcd.ovl` and `protect` callback one more time,
+              # because it will be unrefed and disposed in `poll()` after
+              # callback finishes.
+              GC_ref(pcd.ovl)
+              pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
+    )
+    # We need to protect our callback environment value, so GC will not free it
+    # accidentally.
+    ol.data.cell = system.protect(rawEnv(ol.data.cb))
+
+    # This is main part of `hacky way` is using WSAEventSelect, so `hEvent`
+    # will be signaled when appropriate `mask` events will be triggered.
+    if wsaEventSelect(fd.SocketHandle, hEvent, mask) != 0:
+      let err = osLastError()
+      GC_unref(ol)
+      deallocShared(cast[pointer](pcd))
+      discard wsaCloseEvent(hEvent)
+      raiseOSError(err)
+
+    pcd.ovl = ol
+    if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
+                                    cast[WAITORTIMERCALLBACK](waitableCallback),
+                                       cast[pointer](pcd), INFINITE, flags):
+      let err = osLastError()
+      GC_unref(ol)
+      deallocShared(cast[pointer](pcd))
+      discard wsaCloseEvent(hEvent)
+      raiseOSError(err)
+    p.handles.incl(fd)
+
+  proc addRead*(fd: AsyncFD, cb: Callback) =
+    ## Start watching the file descriptor for read availability and then call
+    ## the callback `cb`.
+    ##
+    ## This is not `pure` mechanism for Windows Completion Ports (IOCP),
+    ## so if you can avoid it, please do it. Use `addRead` only if really
+    ## need it (main usecase is adaptation of unix-like libraries to be
+    ## asynchronous on Windows).
+    ##
+    ## If you use this function, you don't need to use asyncdispatch.recv()
+    ## or asyncdispatch.accept(), because they are using IOCP, please use
+    ## nativesockets.recv() and nativesockets.accept() instead.
+    ##
+    ## Be sure your callback `cb` returns `true`, if you want to remove
+    ## watch of `read` notifications, and `false`, if you want to continue
+    ## receiving notifications.
+    registerWaitableEvent(fd, cb, FD_READ or FD_ACCEPT or FD_OOB or FD_CLOSE)
+
+  proc addWrite*(fd: AsyncFD, cb: Callback) =
+    ## Start watching the file descriptor for write availability and then call
+    ## the callback `cb`.
+    ##
+    ## This is not `pure` mechanism for Windows Completion Ports (IOCP),
+    ## so if you can avoid it, please do it. Use `addWrite` only if really
+    ## need it (main usecase is adaptation of unix-like libraries to be
+    ## asynchronous on Windows).
+    ##
+    ## If you use this function, you don't need to use asyncdispatch.send()
+    ## or asyncdispatch.connect(), because they are using IOCP, please use
+    ## nativesockets.send() and nativesockets.connect() instead.
+    ##
+    ## Be sure your callback `cb` returns `true`, if you want to remove
+    ## watch of `write` notifications, and `false`, if you want to continue
+    ## receiving notifications.
+    registerWaitableEvent(fd, cb, FD_WRITE or FD_CONNECT or FD_CLOSE)
+
+  template registerWaitableHandle(p, hEvent, flags, pcd, timeout,
+                                  handleCallback) =
+    let handleFD = AsyncFD(hEvent)
+    pcd.ioPort = p.ioPort
+    pcd.handleFd = handleFD
+    var ol = newCustom()
+    ol.data.fd = handleFD
+    ol.data.cb = handleCallback
+    # We need to protect our callback environment value, so GC will not free it
+    # accidentally.
+    ol.data.cell = system.protect(rawEnv(ol.data.cb))
+
+    pcd.ovl = ol
+    if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
+                                    cast[WAITORTIMERCALLBACK](waitableCallback),
+                                    cast[pointer](pcd), timeout.DWORD, flags):
+      let err = osLastError()
+      GC_unref(ol)
+      deallocShared(cast[pointer](pcd))
+      discard closeHandle(hEvent)
+      raiseOSError(err)
+    p.handles.incl(handleFD)
+
+  template closeWaitable(handle: untyped) =
+    let waitFd = pcd.waitFd
+    deallocShared(cast[pointer](pcd))
+    p.handles.excl(fd)
+    if unregisterWait(waitFd) == 0:
+      let err = osLastError()
+      if err.int32 != ERROR_IO_PENDING:
+        discard closeHandle(handle)
+        raiseOSError(err)
+    if closeHandle(handle) == 0:
+      raiseOSError(osLastError())
+
+  proc addTimer*(timeout: int, oneshot: bool, cb: Callback) =
+    ## Registers callback `cb` to be called when timer expired.
+    ##
+    ## Parameters:
+    ##
+    ## * `timeout` - timeout value in milliseconds.
+    ## * `oneshot`
+    ##   * `true` - generate only one timeout event
+    ##   * `false` - generate timeout events periodically
+
+    doAssert(timeout > 0)
+    let p = getGlobalDispatcher()
+
+    var hEvent = createEvent(nil, 1, 0, nil)
+    if hEvent == INVALID_HANDLE_VALUE:
+      raiseOSError(osLastError())
+
+    var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
+    var flags = WT_EXECUTEINWAITTHREAD.DWORD
+    if oneshot: flags = flags or WT_EXECUTEONLYONCE
+
+    proc timercb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
+      let res = cb(fd)
+      if res or oneshot:
+        closeWaitable(hEvent)
+      else:
+        # if callback returned `false`, then it wants to be called again, so
+        # we need to ref and protect `pcd.ovl` again, because it will be
+        # unrefed and disposed in `poll()`.
+        GC_ref(pcd.ovl)
+        pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
+
+    registerWaitableHandle(p, hEvent, flags, pcd, timeout, timercb)
+
+  proc addProcess*(pid: int, cb: Callback) =
+    ## Registers callback `cb` to be called when process with process ID
+    ## `pid` exited.
+    const NULL = Handle(0)
+    let p = getGlobalDispatcher()
+    let procFlags = SYNCHRONIZE
+    var hProcess = openProcess(procFlags, 0, pid.DWORD)
+    if hProcess == NULL:
+      raiseOSError(osLastError())
+
+    var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
+    var flags = WT_EXECUTEINWAITTHREAD.DWORD or WT_EXECUTEONLYONCE.DWORD
+
+    proc proccb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
+      closeWaitable(hProcess)
+      discard cb(fd)
+
+    registerWaitableHandle(p, hProcess, flags, pcd, INFINITE, proccb)
+
+  proc newAsyncEvent*(): AsyncEvent =
+    ## Creates a new thread-safe `AsyncEvent` object.
+    ##
+    ## New `AsyncEvent` object is not automatically registered with
+    ## dispatcher like `AsyncSocket`.
+    var sa = SECURITY_ATTRIBUTES(
+      nLength: sizeof(SECURITY_ATTRIBUTES).cint,
+      bInheritHandle: 1
+    )
+    var event = createEvent(addr(sa), 0'i32, 0'i32, nil)
+    if event == INVALID_HANDLE_VALUE:
+      raiseOSError(osLastError())
+    result = cast[AsyncEvent](allocShared0(sizeof(AsyncEventImpl)))
+    result.hEvent = event
+
+  proc trigger*(ev: AsyncEvent) =
+    ## Set event `ev` to signaled state.
+    if setEvent(ev.hEvent) == 0:
+      raiseOSError(osLastError())
+
+  proc unregister*(ev: AsyncEvent) =
+    ## Unregisters event `ev`.
+    doAssert(ev.hWaiter != 0, "Event is not registered in the queue!")
+    let p = getGlobalDispatcher()
+    p.handles.excl(AsyncFD(ev.hEvent))
+    if unregisterWait(ev.hWaiter) == 0:
+      let err = osLastError()
+      if err.int32 != ERROR_IO_PENDING:
+        raiseOSError(err)
+    ev.hWaiter = 0
+
+  proc close*(ev: AsyncEvent) =
+    ## Closes event `ev`.
+    let res = closeHandle(ev.hEvent)
+    deallocShared(cast[pointer](ev))
+    if res == 0:
+      raiseOSError(osLastError())
+
+  proc addEvent*(ev: AsyncEvent, cb: Callback) =
+    ## Registers callback `cb` to be called when `ev` will be signaled
+    doAssert(ev.hWaiter == 0, "Event is already registered in the queue!")
+
+    let p = getGlobalDispatcher()
+    let hEvent = ev.hEvent
+
+    var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
+    var flags = WT_EXECUTEINWAITTHREAD.DWORD
+
+    proc eventcb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
+      if ev.hWaiter != 0:
+        if cb(fd):
+          # we need this check to avoid exception, if `unregister(event)` was
+          # called in callback.
+          deallocShared(cast[pointer](pcd))
+          if ev.hWaiter != 0:
+            unregister(ev)
+        else:
+          # if callback returned `false`, then it wants to be called again, so
+          # we need to ref and protect `pcd.ovl` again, because it will be
+          # unrefed and disposed in `poll()`.
+          GC_ref(pcd.ovl)
+          pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
+      else:
+        # if ev.hWaiter == 0, then event was unregistered before `poll()` call.
+        deallocShared(cast[pointer](pcd))
+
+    registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb)
+    ev.hWaiter = pcd.waitFd
+
   initAll()
 else:
-  import selectors
-  when defined(windows):
-    import winlean
-    const
-      EINTR = WSAEINPROGRESS
-      EINPROGRESS = WSAEINPROGRESS
-      EWOULDBLOCK = WSAEWOULDBLOCK
-      EAGAIN = EINPROGRESS
-      MSG_NOSIGNAL = 0
-  else:
-    from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK,
-                      MSG_NOSIGNAL
-
+  import std/selectors
+  from std/posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK,
+                    MSG_NOSIGNAL
+  when declared(posix.accept4):
+    from std/posix import accept4, SOCK_CLOEXEC
+  when defined(genode):
+    import genode/env # get the implicit Genode env
+    import genode/signals
+
+  const
+    InitCallbackListSize = 4         # initial size of callbacks sequence,
+                                     # associated with file/socket descriptor.
+    InitDelayedCallbackListSize = 64 # initial size of delayed callbacks
+                                     # queue.
   type
     AsyncFD* = distinct cint
-    Callback = proc (fd: AsyncFD): bool {.closure,gcsafe.}
+    Callback* = proc (fd: AsyncFD): bool {.closure, gcsafe.}
 
-    PData* = ref object of RootRef
-      fd: AsyncFD
-      readCBs: seq[Callback]
-      writeCBs: seq[Callback]
+    AsyncData = object
+      readList: seq[Callback]
+      writeList: seq[Callback]
+
+    AsyncEvent* = distinct SelectEvent
 
     PDispatcher* = ref object of PDispatcherBase
-      selector: Selector
-  {.deprecated: [TAsyncFD: AsyncFD, TCallback: Callback].}
+      selector: Selector[AsyncData]
+      when defined(genode):
+        signalHandler: SignalHandler
 
   proc `==`*(x, y: AsyncFD): bool {.borrow.}
+  proc `==`*(x, y: AsyncEvent): bool {.borrow.}
+
+  template newAsyncData(): AsyncData =
+    AsyncData(
+      readList: newSeqOfCap[Callback](InitCallbackListSize),
+      writeList: newSeqOfCap[Callback](InitCallbackListSize)
+    )
 
-  proc newDispatcher*(): PDispatcher =
+  proc newDispatcher*(): owned(PDispatcher) =
     new result
-    result.selector = newSelector()
-    result.timers = @[]
+    result.selector = newSelector[AsyncData]()
+    result.timers.clear()
+    result.callbacks = initDeque[proc () {.closure, gcsafe.}](InitDelayedCallbackListSize)
+    when defined(genode):
+      let entrypoint = ep(cast[GenodeEnv](runtimeEnv))
+      result.signalHandler = newSignalHandler(entrypoint):
+        discard runOnce(0)
+
+  var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher
+
+  when defined(nuttx):
+    import std/exitprocs
+
+    proc cleanDispatcher() {.noconv.} =
+      gDisp = nil
+
+    proc addFinalyzer() =
+      addExitProc(cleanDispatcher)
+
+  proc setGlobalDispatcher*(disp: owned PDispatcher) =
+    if not gDisp.isNil:
+      assert gDisp.callbacks.len == 0
+    gDisp = disp
+    initCallSoonProc()
 
-  var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
   proc getGlobalDispatcher*(): PDispatcher =
-    if gDisp.isNil: gDisp = newDispatcher()
+    if gDisp.isNil:
+      setGlobalDispatcher(newDispatcher())
+      when defined(nuttx):
+        addFinalyzer()
     result = gDisp
 
-  proc update(fd: AsyncFD, events: set[Event]) =
-    let p = getGlobalDispatcher()
-    assert fd.SocketHandle in p.selector
-    p.selector.update(fd.SocketHandle, events)
+  proc getIoHandler*(disp: PDispatcher): Selector[AsyncData] =
+    return disp.selector
 
   proc register*(fd: AsyncFD) =
     let p = getGlobalDispatcher()
-    var data = PData(fd: fd, readCBs: @[], writeCBs: @[])
-    p.selector.register(fd.SocketHandle, {}, data.RootRef)
-
-  proc newAsyncNativeSocket*(domain: cint, sockType: cint,
-                             protocol: cint): AsyncFD =
-    result = newNativeSocket(domain, sockType, protocol).AsyncFD
-    result.SocketHandle.setBlocking(false)
-    when defined(macosx):
-      result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
-    register(result)
-
-  proc newAsyncNativeSocket*(domain: Domain = AF_INET,
-                             sockType: SockType = SOCK_STREAM,
-                             protocol: Protocol = IPPROTO_TCP): AsyncFD =
-    result = newNativeSocket(domain, sockType, protocol).AsyncFD
-    result.SocketHandle.setBlocking(false)
-    when defined(macosx):
-      result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
-    register(result)
-
-  proc closeSocket*(sock: AsyncFD) =
-    let disp = getGlobalDispatcher()
-    disp.selector.unregister(sock.SocketHandle)
-    sock.SocketHandle.close()
+    var data = newAsyncData()
+    p.selector.registerHandle(fd.SocketHandle, {}, data)
 
   proc unregister*(fd: AsyncFD) =
     getGlobalDispatcher().selector.unregister(fd.SocketHandle)
 
+  proc unregister*(ev: AsyncEvent) =
+    getGlobalDispatcher().selector.unregister(SelectEvent(ev))
+
+  proc contains*(disp: PDispatcher, fd: AsyncFD): bool =
+    return fd.SocketHandle in disp.selector
+
   proc addRead*(fd: AsyncFD, cb: Callback) =
     let p = getGlobalDispatcher()
-    if fd.SocketHandle notin p.selector:
+    var newEvents = {Event.Read}
+    withData(p.selector, fd.SocketHandle, adata) do:
+      adata.readList.add(cb)
+      newEvents.incl(Event.Read)
+      if len(adata.writeList) != 0: newEvents.incl(Event.Write)
+    do:
       raise newException(ValueError, "File descriptor not registered.")
-    p.selector[fd.SocketHandle].data.PData.readCBs.add(cb)
-    update(fd, p.selector[fd.SocketHandle].events + {EvRead})
+    p.selector.updateHandle(fd.SocketHandle, newEvents)
 
   proc addWrite*(fd: AsyncFD, cb: Callback) =
     let p = getGlobalDispatcher()
-    if fd.SocketHandle notin p.selector:
+    var newEvents = {Event.Write}
+    withData(p.selector, fd.SocketHandle, adata) do:
+      adata.writeList.add(cb)
+      newEvents.incl(Event.Write)
+      if len(adata.readList) != 0: newEvents.incl(Event.Read)
+    do:
       raise newException(ValueError, "File descriptor not registered.")
-    p.selector[fd.SocketHandle].data.PData.writeCBs.add(cb)
-    update(fd, p.selector[fd.SocketHandle].events + {EvWrite})
+    p.selector.updateHandle(fd.SocketHandle, newEvents)
 
-  proc poll*(timeout = 500) =
+  proc hasPendingOperations*(): bool =
     let p = getGlobalDispatcher()
-    for info in p.selector.select(timeout):
-      let data = PData(info.key.data)
-      assert data.fd == info.key.fd.AsyncFD
-      #echo("In poll ", data.fd.cint)
-      # There may be EvError here, but we handle them in callbacks,
-      # so that exceptions can be raised from `send(...)` and
-      # `recv(...)` routines.
-
-      if EvRead in info.events:
-        # Callback may add items to ``data.readCBs`` which causes issues if
-        # we are iterating over ``data.readCBs`` at the same time. We therefore
-        # make a copy to iterate over.
-        let currentCBs = data.readCBs
-        data.readCBs = @[]
-        for cb in currentCBs:
-          if not cb(data.fd):
-            # Callback wants to be called again.
-            data.readCBs.add(cb)
-
-      if EvWrite in info.events:
-        let currentCBs = data.writeCBs
-        data.writeCBs = @[]
-        for cb in currentCBs:
-          if not cb(data.fd):
-            # Callback wants to be called again.
-            data.writeCBs.add(cb)
-
-      if info.key in p.selector:
-        var newEvents: set[Event]
-        if data.readCBs.len != 0: newEvents = {EvRead}
-        if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite}
-        if newEvents != info.key.events:
-          update(data.fd, newEvents)
+    not p.selector.isEmpty() or p.timers.len != 0 or p.callbacks.len != 0
+
+  proc prependSeq(dest: var seq[Callback]; src: sink seq[Callback]) =
+    var old = move dest
+    dest = src
+    for i in 0..high(old):
+      dest.add(move old[i])
+
+  proc processBasicCallbacks(
+    fd: AsyncFD, event: Event
+  ): tuple[readCbListCount, writeCbListCount: int] =
+    # Process pending descriptor and AsyncEvent callbacks.
+    #
+    # Invoke every callback stored in `rwlist`, until one
+    # returns `false` (which means callback wants to stay
+    # alive). In such case all remaining callbacks will be added
+    # to `rwlist` again, in the order they have been inserted.
+    #
+    # `rwlist` associated with file descriptor MUST BE emptied before
+    # dispatching callback (See https://github.com/nim-lang/Nim/issues/5128),
+    # or it can be possible to fall into endless cycle.
+    var curList: seq[Callback]
+
+    let selector = getGlobalDispatcher().selector
+    withData(selector, fd.int, fdData):
+      case event
+      of Event.Read:
+        #shallowCopy(curList, fdData.readList)
+        curList = move fdData.readList
+        fdData.readList = newSeqOfCap[Callback](InitCallbackListSize)
+      of Event.Write:
+        #shallowCopy(curList, fdData.writeList)
+        curList = move fdData.writeList
+        fdData.writeList = newSeqOfCap[Callback](InitCallbackListSize)
       else:
-        # FD no longer a part of the selector. Likely been closed
-        # (e.g. socket disconnected).
-        discard
+        assert false, "Cannot process callbacks for " & $event
+
+    let newLength = max(len(curList), InitCallbackListSize)
+    var newList = newSeqOfCap[Callback](newLength)
+
+    var eventsExtinguished = false
+    for cb in curList:
+      if eventsExtinguished:
+        newList.add(cb)
+      elif not cb(fd):
+        # Callback wants to be called again.
+        newList.add(cb)
+        # This callback has returned with EAGAIN, so we don't need to
+        # call any other callbacks as they are all waiting for the same event
+        # on the same fd.
+        # We do need to ensure they are called again though.
+        eventsExtinguished = true
+
+    withData(selector, fd.int, fdData) do:
+      # Descriptor is still present in the queue.
+      case event
+      of Event.Read: prependSeq(fdData.readList, newList)
+      of Event.Write: prependSeq(fdData.writeList, newList)
+      else:
+        assert false, "Cannot process callbacks for " & $event
+
+      result.readCbListCount = len(fdData.readList)
+      result.writeCbListCount = len(fdData.writeList)
+    do:
+      # Descriptor was unregistered in callback via `unregister()`.
+      result.readCbListCount = -1
+      result.writeCbListCount = -1
+
+  proc processCustomCallbacks(p: PDispatcher; fd: AsyncFD) =
+    # Process pending custom event callbacks. Custom events are
+    # {Event.Timer, Event.Signal, Event.Process, Event.Vnode}.
+    # There can be only one callback registered with one descriptor,
+    # so there is no need to iterate over list.
+    var curList: seq[Callback]
+
+    withData(p.selector, fd.int, adata) do:
+      curList = move adata.readList
+      adata.readList = newSeqOfCap[Callback](InitCallbackListSize)
+
+    let newLength = len(curList)
+    var newList = newSeqOfCap[Callback](newLength)
+
+    var cb = curList[0]
+    if not cb(fd):
+      newList.add(cb)
+
+    withData(p.selector, fd.int, adata) do:
+      # descriptor still present in queue.
+      adata.readList = newList & adata.readList
+      if len(adata.readList) == 0:
+        # if no callbacks registered with descriptor, unregister it.
+        p.selector.unregister(fd.int)
+    do:
+      # descriptor was unregistered in callback via `unregister()`.
+      discard
+
+  implementSetInheritable()
 
-    processTimers(p)
+  proc closeSocket*(sock: AsyncFD) =
+    let selector = getGlobalDispatcher().selector
+    if sock.SocketHandle notin selector:
+      raise newException(ValueError, "File descriptor not registered.")
 
-  proc connect*(socket: AsyncFD, address: string, port: Port,
-    domain = AF_INET): Future[void] =
-    var retFuture = newFuture[void]("connect")
+    let data = selector.getData(sock.SocketHandle)
+    sock.unregister()
+    sock.SocketHandle.close()
+    # We need to unblock the read and write callbacks which could still be
+    # waiting for the socket to become readable and/or writeable.
+    for cb in data.readList & data.writeList:
+      if not cb(sock):
+        raise newException(
+          ValueError, "Expecting async operations to stop when fd has closed."
+        )
+
+  proc runOnce(timeout: int): bool =
+    let p = getGlobalDispatcher()
+    if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0:
+      when defined(genode):
+        if timeout == 0: return
+      raise newException(ValueError,
+        "No handles or timers registered in dispatcher.")
 
-    proc cb(fd: AsyncFD): bool =
-      var ret = SocketHandle(fd).getSockOptInt(cint(SOL_SOCKET), cint(SO_ERROR))
-      if ret == 0:
-          # We have connected.
-          retFuture.complete()
-          return true
-      elif ret == EINTR:
-          # interrupted, keep waiting
-          return false
-      else:
-          retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret))))
-          return true
+    result = false
+    var keys: array[64, ReadyKey]
+    let nextTimer = processTimers(p, result)
+    var count =
+      p.selector.selectInto(adjustTimeout(p, timeout, nextTimer), keys)
+    for i in 0..<count:
+      let fd = keys[i].fd.AsyncFD
+      let events = keys[i].events
+      var (readCbListCount, writeCbListCount) = (0, 0)
+
+      if Event.Read in events or events == {Event.Error}:
+        (readCbListCount, writeCbListCount) =
+          processBasicCallbacks(fd, Event.Read)
+        result = true
+
+      if Event.Write in events or events == {Event.Error}:
+        (readCbListCount, writeCbListCount) =
+          processBasicCallbacks(fd, Event.Write)
+        result = true
+
+      var isCustomEvent = false
+      if Event.User in events:
+        (readCbListCount, writeCbListCount) =
+          processBasicCallbacks(fd, Event.Read)
+        isCustomEvent = true
+        if readCbListCount == 0:
+          p.selector.unregister(fd.int)
+        result = true
+
+      when ioselSupportedPlatform:
+        const customSet = {Event.Timer, Event.Signal, Event.Process,
+                           Event.Vnode}
+        if (customSet * events) != {}:
+          isCustomEvent = true
+          processCustomCallbacks(p, fd)
+          result = true
 
-    assert getSockDomain(socket.SocketHandle) == domain
-    var aiList = getAddrInfo(address, port, domain)
-    var success = false
-    var lastError: OSErrorCode
-    var it = aiList
-    while it != nil:
-      var ret = connect(socket.SocketHandle, it.ai_addr, it.ai_addrlen.Socklen)
-      if ret == 0:
-        # Request to connect completed immediately.
-        success = true
-        retFuture.complete()
-        break
-      else:
-        lastError = osLastError()
-        if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS:
-          success = true
-          addWrite(socket, cb)
-          break
-        else:
-          success = false
-      it = it.ai_next
+      # because state `data` can be modified in callback we need to update
+      # descriptor events with currently registered callbacks.
+      if not isCustomEvent and (readCbListCount != -1 and writeCbListCount != -1):
+        var newEvents: set[Event] = {}
+        if readCbListCount > 0: incl(newEvents, Event.Read)
+        if writeCbListCount > 0: incl(newEvents, Event.Write)
+        p.selector.updateHandle(SocketHandle(fd), newEvents)
 
-    dealloc(aiList)
-    if not success:
-      retFuture.fail(newException(OSError, osErrorMsg(lastError)))
-    return retFuture
+    # Timer processing.
+    discard processTimers(p, result)
+    # Callback queue processing
+    processPendingCallbacks(p, result)
 
   proc recv*(socket: AsyncFD, size: int,
-             flags = {SocketFlag.SafeDisconn}): Future[string] =
+             flags = {SocketFlag.SafeDisconn}): owned(Future[string]) =
     var retFuture = newFuture[string]("recv")
 
     var readBuffer = newString(size)
@@ -1110,11 +1463,12 @@ else:
                      flags.toOSFlags())
       if res < 0:
         let lastError = osLastError()
-        if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
+        if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
+           lastError.int32 != EAGAIN:
           if flags.isDisconnectionError(lastError):
             retFuture.complete("")
           else:
-            retFuture.fail(newException(OSError, osErrorMsg(lastError)))
+            retFuture.fail(newOSError(lastError))
         else:
           result = false # We still want this callback to be called.
       elif res == 0:
@@ -1128,8 +1482,8 @@ else:
     addRead(socket, cb)
     return retFuture
 
-  proc recvInto*(socket: AsyncFD, buf: cstring, size: int,
-                  flags = {SocketFlag.SafeDisconn}): Future[int] =
+  proc recvInto*(socket: AsyncFD, buf: pointer, size: int,
+                 flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
     var retFuture = newFuture[int]("recvInto")
 
     proc cb(sock: AsyncFD): bool =
@@ -1138,11 +1492,12 @@ else:
                      flags.toOSFlags())
       if res < 0:
         let lastError = osLastError()
-        if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
+        if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
+           lastError.int32 != EAGAIN:
           if flags.isDisconnectionError(lastError):
             retFuture.complete(0)
           else:
-            retFuture.fail(newException(OSError, osErrorMsg(lastError)))
+            retFuture.fail(newOSError(lastError))
         else:
           result = false # We still want this callback to be called.
       else:
@@ -1152,25 +1507,27 @@ else:
     addRead(socket, cb)
     return retFuture
 
-  proc send*(socket: AsyncFD, data: string,
-             flags = {SocketFlag.SafeDisconn}): Future[void] =
+  proc send*(socket: AsyncFD, buf: pointer, size: int,
+             flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
     var retFuture = newFuture[void]("send")
 
     var written = 0
 
     proc cb(sock: AsyncFD): bool =
       result = true
-      let netSize = data.len-written
-      var d = data.cstring
+      let netSize = size-written
+      var d = cast[cstring](buf)
       let res = send(sock.SocketHandle, addr d[written], netSize.cint,
                      MSG_NOSIGNAL)
       if res < 0:
         let lastError = osLastError()
-        if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
+        if lastError.int32 != EINTR and
+           lastError.int32 != EWOULDBLOCK and
+           lastError.int32 != EAGAIN:
           if flags.isDisconnectionError(lastError):
             retFuture.complete()
           else:
-            retFuture.fail(newException(OSError, osErrorMsg(lastError)))
+            retFuture.fail(newOSError(lastError))
         else:
           result = false # We still want this callback to be called.
       else:
@@ -1184,47 +1541,427 @@ else:
     addWrite(socket, cb)
     return retFuture
 
-  proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}):
-      Future[tuple[address: string, client: AsyncFD]] =
+  proc sendTo*(socket: AsyncFD, data: pointer, size: int, saddr: ptr SockAddr,
+               saddrLen: SockLen,
+               flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
+    ## Sends `data` of size `size` in bytes to specified destination
+    ## (`saddr` of size `saddrLen` in bytes, using socket `socket`.
+    ## The returned future will complete once all data has been sent.
+    var retFuture = newFuture[void]("sendTo")
+
+    # we will preserve address in our stack
+    var staddr: array[128, char] # SOCKADDR_STORAGE size is 128 bytes
+    var stalen = saddrLen
+    zeroMem(addr(staddr[0]), 128)
+    copyMem(addr(staddr[0]), saddr, saddrLen)
+
+    proc cb(sock: AsyncFD): bool =
+      result = true
+      let res = sendto(sock.SocketHandle, data, size, MSG_NOSIGNAL,
+                       cast[ptr SockAddr](addr(staddr[0])), stalen)
+      if res < 0:
+        let lastError = osLastError()
+        if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
+           lastError.int32 != EAGAIN:
+          retFuture.fail(newOSError(lastError))
+        else:
+          result = false # We still want this callback to be called.
+      else:
+        retFuture.complete()
+
+    addWrite(socket, cb)
+    return retFuture
+
+  proc recvFromInto*(socket: AsyncFD, data: pointer, size: int,
+                     saddr: ptr SockAddr, saddrLen: ptr SockLen,
+                     flags = {SocketFlag.SafeDisconn}): owned(Future[int]) =
+    ## Receives a datagram data from `socket` into `data`, which must
+    ## be at least of size `size` in bytes, address of datagram's sender
+    ## will be stored into `saddr` and `saddrLen`. Returned future will
+    ## complete once one datagram has been received, and will return size
+    ## of packet received.
+    var retFuture = newFuture[int]("recvFromInto")
+    proc cb(sock: AsyncFD): bool =
+      result = true
+      let res = recvfrom(sock.SocketHandle, data, size.cint, flags.toOSFlags(),
+                         saddr, saddrLen)
+      if res < 0:
+        let lastError = osLastError()
+        if lastError.int32 != EINTR and lastError.int32 != EWOULDBLOCK and
+           lastError.int32 != EAGAIN:
+          retFuture.fail(newOSError(lastError))
+        else:
+          result = false
+      else:
+        retFuture.complete(res)
+    addRead(socket, cb)
+    return retFuture
+
+  proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn},
+                   inheritable = defined(nimInheritHandles)):
+      owned(Future[tuple[address: string, client: AsyncFD]]) =
     var retFuture = newFuture[tuple[address: string,
         client: AsyncFD]]("acceptAddr")
-    proc cb(sock: AsyncFD): bool =
+    proc cb(sock: AsyncFD): bool {.gcsafe.} =
       result = true
       var sockAddress: Sockaddr_storage
-      var addrLen = sizeof(sockAddress).Socklen
-      var client = accept(sock.SocketHandle,
-                          cast[ptr SockAddr](addr(sockAddress)), addr(addrLen))
+      var addrLen = sizeof(sockAddress).SockLen
+      var client =
+        when declared(accept4):
+          accept4(sock.SocketHandle, cast[ptr SockAddr](addr(sockAddress)),
+                  addr(addrLen), if inheritable: 0 else: SOCK_CLOEXEC)
+        else:
+          accept(sock.SocketHandle, cast[ptr SockAddr](addr(sockAddress)),
+                 addr(addrLen))
+      when declared(setInheritable) and not declared(accept4):
+        if client != osInvalidSocket and not setInheritable(client, inheritable):
+          # Set failure first because close() itself can fail,
+          # altering osLastError().
+          retFuture.fail(newOSError(osLastError()))
+          close client
+          return false
+
       if client == osInvalidSocket:
         let lastError = osLastError()
-        assert lastError.int32 notin {EWOULDBLOCK, EAGAIN}
+        assert lastError.int32 != EWOULDBLOCK and lastError.int32 != EAGAIN
         if lastError.int32 == EINTR:
           return false
         else:
           if flags.isDisconnectionError(lastError):
             return false
           else:
-            retFuture.fail(newException(OSError, osErrorMsg(lastError)))
+            retFuture.fail(newOSError(lastError))
       else:
-        register(client.AsyncFD)
-        retFuture.complete((getAddrString(cast[ptr SockAddr](addr sockAddress)), client.AsyncFD))
+        try:
+          let address = getAddrString(cast[ptr SockAddr](addr sockAddress))
+          register(client.AsyncFD)
+          retFuture.complete((address, client.AsyncFD))
+        except:
+          # getAddrString may raise
+          client.close()
+          retFuture.fail(getCurrentException())
     addRead(socket, cb)
     return retFuture
 
-proc sleepAsync*(ms: int): Future[void] =
+  when ioselSupportedPlatform:
+
+    proc addTimer*(timeout: int, oneshot: bool, cb: Callback) =
+      ## Start watching for timeout expiration, and then call the
+      ## callback `cb`.
+      ## `timeout` - time in milliseconds,
+      ## `oneshot` - if `true` only one event will be dispatched,
+      ## if `false` continuous events every `timeout` milliseconds.
+      let p = getGlobalDispatcher()
+      var data = newAsyncData()
+      data.readList.add(cb)
+      p.selector.registerTimer(timeout, oneshot, data)
+
+    proc addSignal*(signal: int, cb: Callback) =
+      ## Start watching signal `signal`, and when signal appears, call the
+      ## callback `cb`.
+      let p = getGlobalDispatcher()
+      var data = newAsyncData()
+      data.readList.add(cb)
+      p.selector.registerSignal(signal, data)
+
+    proc addProcess*(pid: int, cb: Callback) =
+      ## Start watching for process exit with pid `pid`, and then call
+      ## the callback `cb`.
+      let p = getGlobalDispatcher()
+      var data = newAsyncData()
+      data.readList.add(cb)
+      p.selector.registerProcess(pid, data)
+
+  proc newAsyncEvent*(): AsyncEvent =
+    ## Creates new `AsyncEvent`.
+    result = AsyncEvent(newSelectEvent())
+
+  proc trigger*(ev: AsyncEvent) =
+    ## Sets new `AsyncEvent` to signaled state.
+    trigger(SelectEvent(ev))
+
+  proc close*(ev: AsyncEvent) =
+    ## Closes `AsyncEvent`
+    close(SelectEvent(ev))
+
+  proc addEvent*(ev: AsyncEvent, cb: Callback) =
+    ## Start watching for event `ev`, and call callback `cb`, when
+    ## ev will be set to signaled state.
+    let p = getGlobalDispatcher()
+    var data = newAsyncData()
+    data.readList.add(cb)
+    p.selector.registerEvent(SelectEvent(ev), data)
+
+proc drain*(timeout = 500) =
+  ## Waits for completion of **all** events and processes them. Raises `ValueError`
+  ## if there are no pending operations. In contrast to `poll` this
+  ## processes as many events as are available until the timeout has elapsed.
+  var curTimeout = timeout
+  let start = now()
+  while hasPendingOperations():
+    discard runOnce(curTimeout)
+    curTimeout -= (now() - start).inMilliseconds.int
+    if curTimeout < 0:
+      break
+
+proc poll*(timeout = 500) =
+  ## Waits for completion events and processes them. Raises `ValueError`
+  ## if there are no pending operations. This runs the underlying OS
+  ## `epoll`:idx: or `kqueue`:idx: primitive only once.
+  discard runOnce(timeout)
+
+template createAsyncNativeSocketImpl(domain, sockType, protocol: untyped,
+                                     inheritable = defined(nimInheritHandles)) =
+  let handle = createNativeSocket(domain, sockType, protocol, inheritable)
+  if handle == osInvalidSocket:
+    return osInvalidSocket.AsyncFD
+  handle.setBlocking(false)
+  when defined(macosx) and not defined(nimdoc):
+    handle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
+  result = handle.AsyncFD
+  register(result)
+
+proc createAsyncNativeSocket*(domain: cint, sockType: cint,
+                              protocol: cint,
+                              inheritable = defined(nimInheritHandles)): AsyncFD =
+  createAsyncNativeSocketImpl(domain, sockType, protocol, inheritable)
+
+proc createAsyncNativeSocket*(domain: Domain = Domain.AF_INET,
+                              sockType: SockType = SOCK_STREAM,
+                              protocol: Protocol = IPPROTO_TCP,
+                              inheritable = defined(nimInheritHandles)): AsyncFD =
+  createAsyncNativeSocketImpl(domain, sockType, protocol, inheritable)
+
+when defined(windows) or defined(nimdoc):
+  proc bindToDomain(handle: SocketHandle, domain: Domain) =
+    # Extracted into a separate proc, because connect() on Windows requires
+    # the socket to be initially bound.
+    template doBind(saddr) =
+      if bindAddr(handle, cast[ptr SockAddr](addr(saddr)),
+                  sizeof(saddr).SockLen) < 0'i32:
+        raiseOSError(osLastError())
+
+    if domain == Domain.AF_INET6:
+      var saddr: Sockaddr_in6
+      saddr.sin6_family = uint16(toInt(domain))
+      doBind(saddr)
+    else:
+      var saddr: Sockaddr_in
+      saddr.sin_family = uint16(toInt(domain))
+      doBind(saddr)
+
+  proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): owned(Future[void]) =
+    let retFuture = newFuture[void]("doConnect")
+    result = retFuture
+
+    var ol = newCustom()
+    ol.data = CompletionData(fd: socket, cb:
+      proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
+        if not retFuture.finished:
+          if errcode == OSErrorCode(-1):
+            const SO_UPDATE_CONNECT_CONTEXT = 0x7010
+            socket.SocketHandle.setSockOptInt(SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, 1) # 15022
+            retFuture.complete()
+          else:
+            retFuture.fail(newOSError(errcode))
+    )
+
+    let ret = connectEx(socket.SocketHandle, addrInfo.ai_addr,
+                        cint(addrInfo.ai_addrlen), nil, 0, nil,
+                        cast[POVERLAPPED](ol))
+    if ret:
+      # Request to connect completed immediately.
+      retFuture.complete()
+      # We don't deallocate `ol` here because even though this completed
+      # immediately poll will still be notified about its completion and it
+      # will free `ol`.
+    else:
+      let lastError = osLastError()
+      if lastError.int32 != ERROR_IO_PENDING:
+        # With ERROR_IO_PENDING `ol` will be deallocated in `poll`,
+        # and the future will be completed/failed there, too.
+        GC_unref(ol)
+        retFuture.fail(newOSError(lastError))
+else:
+  proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): owned(Future[void]) =
+    let retFuture = newFuture[void]("doConnect")
+    result = retFuture
+
+    proc cb(fd: AsyncFD): bool =
+      let ret = SocketHandle(fd).getSockOptInt(
+        cint(SOL_SOCKET), cint(SO_ERROR))
+      if ret == 0:
+        # We have connected.
+        retFuture.complete()
+        return true
+      elif ret == EINTR:
+        # interrupted, keep waiting
+        return false
+      else:
+        retFuture.fail(newOSError(OSErrorCode(ret)))
+        return true
+
+    let ret = connect(socket.SocketHandle,
+                      addrInfo.ai_addr,
+                      addrInfo.ai_addrlen.SockLen)
+    if ret == 0:
+      # Request to connect completed immediately.
+      retFuture.complete()
+    else:
+      let lastError = osLastError()
+      if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS:
+        addWrite(socket, cb)
+      else:
+        retFuture.fail(newOSError(lastError))
+
+template asyncAddrInfoLoop(addrInfo: ptr AddrInfo, fd: untyped,
+                           protocol: Protocol = IPPROTO_RAW) =
+  ## Iterates through the AddrInfo linked list asynchronously
+  ## until the connection can be established.
+  const shouldCreateFd = not declared(fd)
+
+  when shouldCreateFd:
+    let sockType = protocol.toSockType()
+
+    var fdPerDomain: array[low(Domain).ord..high(Domain).ord, AsyncFD]
+    for i in low(fdPerDomain)..high(fdPerDomain):
+      fdPerDomain[i] = osInvalidSocket.AsyncFD
+    template closeUnusedFds(domainToKeep = -1) {.dirty.} =
+      for i, fd in fdPerDomain:
+        if fd != osInvalidSocket.AsyncFD and i != domainToKeep:
+          fd.closeSocket()
+
+  var lastException: ref Exception
+  var curAddrInfo = addrInfo
+  var domain: Domain
+  when shouldCreateFd:
+    var curFd: AsyncFD
+  else:
+    var curFd = fd
+  proc tryNextAddrInfo(fut: Future[void]) {.gcsafe.} =
+    if fut == nil or fut.failed:
+      if fut != nil:
+        lastException = fut.readError()
+
+      while curAddrInfo != nil:
+        let domainOpt = curAddrInfo.ai_family.toKnownDomain()
+        if domainOpt.isSome:
+          domain = domainOpt.unsafeGet()
+          break
+        curAddrInfo = curAddrInfo.ai_next
+
+      if curAddrInfo == nil:
+        freeAddrInfo(addrInfo)
+        when shouldCreateFd:
+          closeUnusedFds()
+        if lastException != nil:
+          retFuture.fail(lastException)
+        else:
+          retFuture.fail(newException(
+            IOError, "Couldn't resolve address: " & address))
+        return
+
+      when shouldCreateFd:
+        curFd = fdPerDomain[ord(domain)]
+        if curFd == osInvalidSocket.AsyncFD:
+          try:
+            curFd = createAsyncNativeSocket(domain, sockType, protocol)
+          except:
+            freeAddrInfo(addrInfo)
+            closeUnusedFds()
+            raise getCurrentException()
+          when defined(windows):
+            curFd.SocketHandle.bindToDomain(domain)
+          fdPerDomain[ord(domain)] = curFd
+
+      doConnect(curFd, curAddrInfo).callback = tryNextAddrInfo
+      curAddrInfo = curAddrInfo.ai_next
+    else:
+      freeAddrInfo(addrInfo)
+      when shouldCreateFd:
+        closeUnusedFds(ord(domain))
+        retFuture.complete(curFd)
+      else:
+        retFuture.complete()
+
+  tryNextAddrInfo(nil)
+
+proc dial*(address: string, port: Port,
+           protocol: Protocol = IPPROTO_TCP): owned(Future[AsyncFD]) =
+  ## Establishes connection to the specified `address`:`port` pair via the
+  ## specified protocol. The procedure iterates through possible
+  ## resolutions of the `address` until it succeeds, meaning that it
+  ## seamlessly works with both IPv4 and IPv6.
+  ## Returns the async file descriptor, registered in the dispatcher of
+  ## the current thread, ready to send or receive data.
+  let retFuture = newFuture[AsyncFD]("dial")
+  result = retFuture
+  let sockType = protocol.toSockType()
+
+  let aiList = getAddrInfo(address, port, Domain.AF_UNSPEC, sockType, protocol)
+  asyncAddrInfoLoop(aiList, noFD, protocol)
+
+proc connect*(socket: AsyncFD, address: string, port: Port,
+              domain = Domain.AF_INET): owned(Future[void]) =
+  let retFuture = newFuture[void]("connect")
+  result = retFuture
+
+  when defined(windows):
+    verifyPresence(socket)
+  else:
+    assert getSockDomain(socket.SocketHandle) == domain
+
+  let aiList = getAddrInfo(address, port, domain)
+  when defined(windows):
+    socket.SocketHandle.bindToDomain(domain)
+  asyncAddrInfoLoop(aiList, socket)
+
+proc sleepAsync*(ms: int | float): owned(Future[void]) =
   ## Suspends the execution of the current async procedure for the next
-  ## ``ms`` milliseconds.
+  ## `ms` milliseconds.
   var retFuture = newFuture[void]("sleepAsync")
   let p = getGlobalDispatcher()
-  p.timers.add((epochTime() + (ms / 1000), retFuture))
+  when ms is int:
+    p.timers.push((getMonoTime() + initDuration(milliseconds = ms), retFuture))
+  elif ms is float:
+    let ns = (ms * 1_000_000).int64
+    p.timers.push((getMonoTime() + initDuration(nanoseconds = ns), retFuture))
+  return retFuture
+
+proc withTimeout*[T](fut: Future[T], timeout: int): owned(Future[bool]) =
+  ## Returns a future which will complete once `fut` completes or after
+  ## `timeout` milliseconds has elapsed.
+  ##
+  ## If `fut` completes first the returned future will hold true,
+  ## otherwise, if `timeout` milliseconds has elapsed first, the returned
+  ## future will hold false.
+
+  var retFuture = newFuture[bool]("asyncdispatch.`withTimeout`")
+  var timeoutFuture = sleepAsync(timeout)
+  fut.callback =
+    proc () =
+      if not retFuture.finished:
+        if fut.failed:
+          retFuture.fail(fut.error)
+        else:
+          retFuture.complete(true)
+  timeoutFuture.callback =
+    proc () =
+      if not retFuture.finished: retFuture.complete(false)
   return retFuture
 
 proc accept*(socket: AsyncFD,
-    flags = {SocketFlag.SafeDisconn}): Future[AsyncFD] =
+             flags = {SocketFlag.SafeDisconn},
+             inheritable = defined(nimInheritHandles)): owned(Future[AsyncFD]) =
   ## Accepts a new connection. Returns a future containing the client socket
   ## corresponding to that connection.
+  ##
+  ## If `inheritable` is false (the default), the resulting client socket
+  ## will not be inheritable by child processes.
+  ##
   ## The future will complete when the connection is successfully accepted.
   var retFut = newFuture[AsyncFD]("accept")
-  var fut = acceptAddr(socket, flags)
+  var fut = acceptAddr(socket, flags, inheritable)
   fut.callback =
     proc (future: Future[tuple[address: string, client: AsyncFD]]) =
       assert future.finished
@@ -1234,382 +1971,45 @@ proc accept*(socket: AsyncFD,
         retFut.complete(future.read.client)
   return retFut
 
-# -- Await Macro
-
-proc skipUntilStmtList(node: NimNode): NimNode {.compileTime.} =
-  # Skips a nest of StmtList's.
-  result = node
-  if node[0].kind == nnkStmtList:
-    result = skipUntilStmtList(node[0])
-
-proc skipStmtList(node: NimNode): NimNode {.compileTime.} =
-  result = node
-  if node[0].kind == nnkStmtList:
-    result = node[0]
-
-template createCb(retFutureSym, iteratorNameSym,
-                   name: expr): stmt {.immediate.} =
-  var nameIterVar = iteratorNameSym
-  #{.push stackTrace: off.}
-  proc cb {.closure,gcsafe.} =
-    try:
-      if not nameIterVar.finished:
-        var next = nameIterVar()
-        if next == nil:
-          assert retFutureSym.finished, "Async procedure's (" &
-                 name & ") return Future was not finished."
-        else:
-          next.callback = cb
-    except:
-      if retFutureSym.finished:
-        # Take a look at tasyncexceptions for the bug which this fixes.
-        # That test explains it better than I can here.
-        raise
-      else:
-        retFutureSym.fail(getCurrentException())
-  cb()
-  #{.pop.}
-proc generateExceptionCheck(futSym,
-    tryStmt, rootReceiver, fromNode: NimNode): NimNode {.compileTime.} =
-  if tryStmt.kind == nnkNilLit:
-    result = rootReceiver
-  else:
-    var exceptionChecks: seq[tuple[cond, body: NimNode]] = @[]
-    let errorNode = newDotExpr(futSym, newIdentNode("error"))
-    for i in 1 .. <tryStmt.len:
-      let exceptBranch = tryStmt[i]
-      if exceptBranch[0].kind == nnkStmtList:
-        exceptionChecks.add((newIdentNode("true"), exceptBranch[0]))
-      else:
-        var exceptIdentCount = 0
-        var ifCond: NimNode
-        for i in 0 .. <exceptBranch.len:
-          let child = exceptBranch[i]
-          if child.kind == nnkIdent:
-            let cond = infix(errorNode, "of", child)
-            if exceptIdentCount == 0:
-              ifCond = cond
-            else:
-              ifCond = infix(ifCond, "or", cond)
-          else:
-            break
-          exceptIdentCount.inc
-
-        expectKind(exceptBranch[exceptIdentCount], nnkStmtList)
-        exceptionChecks.add((ifCond, exceptBranch[exceptIdentCount]))
-    # -> -> else: raise futSym.error
-    exceptionChecks.add((newIdentNode("true"),
-        newNimNode(nnkRaiseStmt).add(errorNode)))
-    # Read the future if there is no error.
-    # -> else: futSym.read
-    let elseNode = newNimNode(nnkElse, fromNode)
-    elseNode.add newNimNode(nnkStmtList, fromNode)
-    elseNode[0].add rootReceiver
-
-    let ifBody = newStmtList()
-    ifBody.add newCall(newIdentNode("setCurrentException"), errorNode)
-    ifBody.add newIfStmt(exceptionChecks)
-    ifBody.add newCall(newIdentNode("setCurrentException"), newNilLit())
-
-    result = newIfStmt(
-      (newDotExpr(futSym, newIdentNode("failed")), ifBody)
-    )
-    result.add elseNode
-
-template createVar(result: var NimNode, futSymName: string,
-                   asyncProc: NimNode,
-                   valueReceiver, rootReceiver: expr,
-                   fromNode: NimNode) =
-  result = newNimNode(nnkStmtList, fromNode)
-  var futSym = genSym(nskVar, "future")
-  result.add newVarStmt(futSym, asyncProc) # -> var future<x> = y
-  result.add newNimNode(nnkYieldStmt, fromNode).add(futSym) # -> yield future<x>
-  valueReceiver = newDotExpr(futSym, newIdentNode("read")) # -> future<x>.read
-  result.add generateExceptionCheck(futSym, tryStmt, rootReceiver, fromNode)
-
-proc processBody(node, retFutureSym: NimNode,
-                 subTypeIsVoid: bool,
-                 tryStmt: NimNode): NimNode {.compileTime.} =
-  #echo(node.treeRepr)
-  result = node
-  case node.kind
-  of nnkReturnStmt:
-    result = newNimNode(nnkStmtList, node)
-    if node[0].kind == nnkEmpty:
-      if not subTypeIsVoid:
-        result.add newCall(newIdentNode("complete"), retFutureSym,
-            newIdentNode("result"))
-      else:
-        result.add newCall(newIdentNode("complete"), retFutureSym)
-    else:
-      result.add newCall(newIdentNode("complete"), retFutureSym,
-        node[0].processBody(retFutureSym, subTypeIsVoid, tryStmt))
-
-    result.add newNimNode(nnkReturnStmt, node).add(newNilLit())
-    return # Don't process the children of this return stmt
-  of nnkCommand, nnkCall:
-    if node[0].kind == nnkIdent and node[0].ident == !"await":
-      case node[1].kind
-      of nnkIdent, nnkInfix:
-        # await x
-        result = newNimNode(nnkYieldStmt, node).add(node[1]) # -> yield x
-      of nnkCall, nnkCommand:
-        # await foo(p, x)
-        var futureValue: NimNode
-        result.createVar("future" & $node[1][0].toStrLit, node[1], futureValue,
-                  futureValue, node)
-      else:
-        error("Invalid node kind in 'await', got: " & $node[1].kind)
-    elif node.len > 1 and node[1].kind == nnkCommand and
-         node[1][0].kind == nnkIdent and node[1][0].ident == !"await":
-      # foo await x
-      var newCommand = node
-      result.createVar("future" & $node[0].toStrLit, node[1][1], newCommand[1],
-                newCommand, node)
-
-  of nnkVarSection, nnkLetSection:
-    case node[0][2].kind
-    of nnkCommand:
-      if node[0][2][0].kind == nnkIdent and node[0][2][0].ident == !"await":
-        # var x = await y
-        var newVarSection = node # TODO: Should this use copyNimNode?
-        result.createVar("future" & $node[0][0].ident, node[0][2][1],
-          newVarSection[0][2], newVarSection, node)
-    else: discard
-  of nnkAsgn:
-    case node[1].kind
-    of nnkCommand:
-      if node[1][0].ident == !"await":
-        # x = await y
-        var newAsgn = node
-        result.createVar("future" & $node[0].toStrLit, node[1][1], newAsgn[1], newAsgn, node)
-    else: discard
-  of nnkDiscardStmt:
-    # discard await x
-    if node[0].kind == nnkCommand and node[0][0].kind == nnkIdent and
-          node[0][0].ident == !"await":
-      var newDiscard = node
-      result.createVar("futureDiscard_" & $toStrLit(node[0][1]), node[0][1],
-                newDiscard[0], newDiscard, node)
-  of nnkTryStmt:
-    # try: await x; except: ...
-    result = newNimNode(nnkStmtList, node)
-    template wrapInTry(n, tryBody: expr) =
-      var temp = n
-      n[0] = tryBody
-      tryBody = temp
-
-      # Transform ``except`` body.
-      # TODO: Could we perform some ``await`` transformation here to get it
-      # working in ``except``?
-      tryBody[1] = processBody(n[1], retFutureSym, subTypeIsVoid, nil)
-
-    proc processForTry(n: NimNode, i: var int,
-                       res: NimNode): bool {.compileTime.} =
-      ## Transforms the body of the tryStmt. Does not transform the
-      ## body in ``except``.
-      ## Returns true if the tryStmt node was transformed into an ifStmt.
-      result = false
-      var skipped = n.skipStmtList()
-      while i < skipped.len:
-        var processed = processBody(skipped[i], retFutureSym,
-                                    subTypeIsVoid, n)
-
-        # Check if we transformed the node into an exception check.
-        # This suggests skipped[i] contains ``await``.
-        if processed.kind != skipped[i].kind or processed.len != skipped[i].len:
-          processed = processed.skipUntilStmtList()
-          expectKind(processed, nnkStmtList)
-          expectKind(processed[2][1], nnkElse)
-          i.inc
-
-          if not processForTry(n, i, processed[2][1][0]):
-            # We need to wrap the nnkElse nodes back into a tryStmt.
-            # As they are executed if an exception does not happen
-            # inside the awaited future.
-            # The following code will wrap the nodes inside the
-            # original tryStmt.
-            wrapInTry(n, processed[2][1][0])
-
-          res.add processed
-          result = true
+proc keepAlive(x: string) =
+  discard "mark 'x' as escaping so that it is put into a closure for us to keep the data alive"
+
+proc send*(socket: AsyncFD, data: string,
+           flags = {SocketFlag.SafeDisconn}): owned(Future[void]) =
+  ## Sends `data` to `socket`. The returned future will complete once all
+  ## data has been sent.
+  var retFuture = newFuture[void]("send")
+  if data.len > 0:
+    let sendFut = socket.send(unsafeAddr data[0], data.len, flags)
+    sendFut.callback =
+      proc () =
+        keepAlive(data)
+        if sendFut.failed:
+          retFuture.fail(sendFut.error)
         else:
-          res.add skipped[i]
-          i.inc
-    var i = 0
-    if not processForTry(node, i, result):
-      # If the tryStmt hasn't been transformed we can just put the body
-      # back into it.
-      wrapInTry(node, result)
-    return
-  else: discard
-
-  for i in 0 .. <result.len:
-    result[i] = processBody(result[i], retFutureSym, subTypeIsVoid, nil)
-
-proc getName(node: NimNode): string {.compileTime.} =
-  case node.kind
-  of nnkPostfix:
-    return $node[1].ident
-  of nnkIdent:
-    return $node.ident
-  of nnkEmpty:
-    return "anonymous"
-  else:
-    error("Unknown name.")
-
-proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} =
-  ## This macro transforms a single procedure into a closure iterator.
-  ## The ``async`` macro supports a stmtList holding multiple async procedures.
-  if prc.kind notin {nnkProcDef, nnkLambda}:
-      error("Cannot transform this node kind into an async proc." &
-            " Proc definition or lambda node expected.")
-
-  hint("Processing " & prc[0].getName & " as an async proc.")
-
-  let returnType = prc[3][0]
-  var baseType: NimNode
-  # Verify that the return type is a Future[T]
-  if returnType.kind == nnkBracketExpr:
-    let fut = repr(returnType[0])
-    if fut != "Future":
-      error("Expected return type of 'Future' got '" & fut & "'")
-    baseType = returnType[1]
-  elif returnType.kind in nnkCallKinds and $returnType[0] == "[]":
-    let fut = repr(returnType[1])
-    if fut != "Future":
-      error("Expected return type of 'Future' got '" & fut & "'")
-    baseType = returnType[2]
-  elif returnType.kind == nnkEmpty:
-    baseType = returnType
-  else:
-    error("Expected return type of 'Future' got '" & repr(returnType) & "'")
-
-  let subtypeIsVoid = returnType.kind == nnkEmpty or
-        (baseType.kind == nnkIdent and returnType[1].ident == !"void")
-
-  var outerProcBody = newNimNode(nnkStmtList, prc[6])
-
-  # -> var retFuture = newFuture[T]()
-  var retFutureSym = genSym(nskVar, "retFuture")
-  var subRetType =
-    if returnType.kind == nnkEmpty: newIdentNode("void")
-    else: baseType
-  outerProcBody.add(
-    newVarStmt(retFutureSym,
-      newCall(
-        newNimNode(nnkBracketExpr, prc[6]).add(
-          newIdentNode(!"newFuture"), # TODO: Strange bug here? Remove the `!`.
-          subRetType),
-      newLit(prc[0].getName)))) # Get type from return type of this proc
-
-  # -> iterator nameIter(): FutureBase {.closure.} =
-  # ->   {.push warning[resultshadowed]: off.}
-  # ->   var result: T
-  # ->   {.pop.}
-  # ->   <proc_body>
-  # ->   complete(retFuture, result)
-  var iteratorNameSym = genSym(nskIterator, $prc[0].getName & "Iter")
-  var procBody = prc[6].processBody(retFutureSym, subtypeIsVoid, nil)
-  if not subtypeIsVoid:
-    procBody.insert(0, newNimNode(nnkPragma).add(newIdentNode("push"),
-      newNimNode(nnkExprColonExpr).add(newNimNode(nnkBracketExpr).add(
-        newIdentNode("warning"), newIdentNode("resultshadowed")),
-      newIdentNode("off")))) # -> {.push warning[resultshadowed]: off.}
-
-    procBody.insert(1, newNimNode(nnkVarSection, prc[6]).add(
-      newIdentDefs(newIdentNode("result"), baseType))) # -> var result: T
-
-    procBody.insert(2, newNimNode(nnkPragma).add(
-      newIdentNode("pop"))) # -> {.pop.})
-
-    procBody.add(
-      newCall(newIdentNode("complete"),
-        retFutureSym, newIdentNode("result"))) # -> complete(retFuture, result)
-  else:
-    # -> complete(retFuture)
-    procBody.add(newCall(newIdentNode("complete"), retFutureSym))
-
-  var closureIterator = newProc(iteratorNameSym, [newIdentNode("FutureBase")],
-                                procBody, nnkIteratorDef)
-  closureIterator[4] = newNimNode(nnkPragma, prc[6]).add(newIdentNode("closure"))
-  outerProcBody.add(closureIterator)
-
-  # -> createCb(retFuture)
-  #var cbName = newIdentNode("cb")
-  var procCb = newCall(bindSym"createCb", retFutureSym, iteratorNameSym,
-                       newStrLitNode(prc[0].getName))
-  outerProcBody.add procCb
-
-  # -> return retFuture
-  outerProcBody.add newNimNode(nnkReturnStmt, prc[6][prc[6].len-1]).add(retFutureSym)
-
-  result = prc
-
-  # Remove the 'async' pragma.
-  for i in 0 .. <result[4].len:
-    if result[4][i].kind == nnkIdent and result[4][i].ident == !"async":
-      result[4].del(i)
-  if subtypeIsVoid:
-    # Add discardable pragma.
-    if returnType.kind == nnkEmpty:
-      # Add Future[void]
-      result[3][0] = parseExpr("Future[void]")
-
-  result[6] = outerProcBody
-
-  #echo(treeRepr(result))
-  #if prc[0].getName == "hubConnectionLoop":
-  #  echo(toStrLit(result))
-
-macro async*(prc: stmt): stmt {.immediate.} =
-  ## Macro which processes async procedures into the appropriate
-  ## iterators and yield statements.
-  if prc.kind == nnkStmtList:
-    for oneProc in prc:
-      result = newStmtList()
-      result.add asyncSingleProc(oneProc)
+          retFuture.complete()
   else:
-    result = asyncSingleProc(prc)
+    retFuture.complete()
 
-proc recvLine*(socket: AsyncFD): Future[string] {.async.} =
-  ## Reads a line of data from ``socket``. Returned future will complete once
-  ## a full line is read or an error occurs.
-  ##
-  ## If a full line is read ``\r\L`` is not
-  ## added to ``line``, however if solely ``\r\L`` is read then ``line``
-  ## will be set to it.
-  ##
-  ## If the socket is disconnected, ``line`` will be set to ``""``.
-  ##
-  ## If the socket is disconnected in the middle of a line (before ``\r\L``
-  ## is read) then line will be set to ``""``.
-  ## The partial line **will be lost**.
-  ##
-  ## **Warning**: This assumes that lines are delimited by ``\r\L``.
-  ##
-  ## **Note**: This procedure is mostly used for testing. You likely want to
-  ## use ``asyncnet.recvLine`` instead.
+  return retFuture
 
-  template addNLIfEmpty(): stmt =
-    if result.len == 0:
-      result.add("\c\L")
+# -- Await Macro
+import std/asyncmacro
+export asyncmacro
 
+proc readAll*(future: FutureStream[string]): owned(Future[string]) {.async.} =
+  ## Returns a future that will complete when all the string data from the
+  ## specified future stream is retrieved.
   result = ""
-  var c = ""
   while true:
-    c = await recv(socket, 1)
-    if c.len == 0:
-      return ""
-    if c == "\r":
-      c = await recv(socket, 1)
-      assert c == "\l"
-      addNLIfEmpty()
-      return
-    elif c == "\L":
-      addNLIfEmpty()
-      return
-    add(result, c)
+    let (hasValue, value) = await future.read()
+    if hasValue:
+      result.add(value)
+    else:
+      break
+
+proc callSoon(cbproc: proc () {.gcsafe.}) =
+  getGlobalDispatcher().callbacks.addLast(cbproc)
 
 proc runForever*() =
   ## Begins a never ending global dispatcher poll loop.
@@ -1622,3 +2022,44 @@ proc waitFor*[T](fut: Future[T]): T =
     poll()
 
   fut.read
+
+proc activeDescriptors*(): int {.inline.} =
+  ## Returns the current number of active file descriptors for the current
+  ## event loop. This is a cheap operation that does not involve a system call.
+  when defined(windows):
+    result = getGlobalDispatcher().handles.len
+  elif not defined(nimdoc):
+    result = getGlobalDispatcher().selector.count
+
+when defined(posix):
+  import std/posix
+
+when defined(linux) or defined(windows) or defined(macosx) or defined(bsd) or
+       defined(solaris) or defined(zephyr) or defined(freertos) or defined(nuttx) or defined(haiku):
+  proc maxDescriptors*(): int {.raises: OSError.} =
+    ## Returns the maximum number of active file descriptors for the current
+    ## process. This involves a system call. For now `maxDescriptors` is
+    ## supported on the following OSes: Windows, Linux, OSX, BSD, Solaris.
+    when defined(windows):
+      result = 16_700_000
+    elif defined(zephyr) or defined(freertos):
+      result = FD_MAX
+    else:
+      var fdLim: RLimit
+      if getrlimit(RLIMIT_NOFILE, fdLim) < 0:
+        raiseOSError(osLastError())
+      result = int(fdLim.rlim_cur) - 1
+
+when defined(genode):
+  proc scheduleCallbacks*(): bool {.discardable.} =
+    ## *Genode only.*
+    ## Schedule callback processing and return immediately.
+    ## Returns `false` if there is nothing to schedule.
+    ## RPC servers should call this to dispatch `callSoon`
+    ## bodies after retiring an RPC to its client.
+    ## This is effectively a non-blocking `poll(…)` and is
+    ## equivalent to scheduling a momentary no-op timeout
+    ## but faster and with less overhead.
+    let dis = getGlobalDispatcher()
+    result = dis.callbacks.len > 0
+    if result: submit(dis.signalHandler.cap)