summary refs log tree commit diff stats
path: root/lib/pure/asyncdispatch.nim
diff options
context:
space:
mode:
authorDaniil Yarancev <21169548+Yardanico@users.noreply.github.com>2018-01-07 21:02:00 +0300
committerGitHub <noreply@github.com>2018-01-07 21:02:00 +0300
commitfb44c522e6173528efa8035ecc459c84887d0167 (patch)
treea2f5e98606be265981a5f72748896967033e23d7 /lib/pure/asyncdispatch.nim
parentccf99fa5ce4fe992fb80dc89271faa51456c3fa5 (diff)
parente23ea64c41e101d4e1d933f0b015f51cc6c2f7de (diff)
downloadNim-fb44c522e6173528efa8035ecc459c84887d0167.tar.gz
Merge pull request #1 from nim-lang/devel
upstream
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r--lib/pure/asyncdispatch.nim587
1 files changed, 434 insertions, 153 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim
index 281d5b848..42ffa236c 100644
--- a/lib/pure/asyncdispatch.nim
+++ b/lib/pure/asyncdispatch.nim
@@ -9,8 +9,9 @@
 
 include "system/inclrtl"
 
-import os, tables, strutils, times, heapqueue, options, asyncstreams
+import os, tables, strutils, times, heapqueue, lists, options, asyncstreams
 import asyncfutures except callSoon
+
 import nativesockets, net, deques
 
 export Port, SocketFlag
@@ -58,9 +59,10 @@ export asyncfutures, asyncstreams
 ##
 ##   .. code-block::nim
 ##      var future = socket.recv(100)
-##      future.callback =
+##      future.addCallback(
 ##        proc () =
 ##          echo(future.read)
+##      )
 ##
 ## All asynchronous functions returning a ``Future`` will not block. They
 ## will not however return immediately. An asynchronous function will have
@@ -136,6 +138,7 @@ export asyncfutures, asyncstreams
 ## and occasionally the compilation may fail altogether.
 ## As such it is better to use the former style when possible.
 ##
+##
 ## Discarding futures
 ## ------------------
 ##
@@ -165,18 +168,20 @@ type
     timers*: HeapQueue[tuple[finishAt: float, fut: Future[void]]]
     callbacks*: Deque[proc ()]
 
-proc processTimers(p: PDispatcherBase) {.inline.} =
+proc processTimers(p: PDispatcherBase; didSomeWork: var bool) {.inline.} =
   #Process just part if timers at a step
   var count = p.timers.len
   let t = epochTime()
   while count > 0 and t >= p.timers[0].finishAt:
     p.timers.pop().fut.complete()
     dec count
+    didSomeWork = true
 
-proc processPendingCallbacks(p: PDispatcherBase) =
+proc processPendingCallbacks(p: PDispatcherBase; didSomeWork: var bool) =
   while p.callbacks.len > 0:
     var cb = p.callbacks.popFirst()
     cb()
+    didSomeWork = true
 
 proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} =
   # If dispatcher has active timers this proc returns the timeout
@@ -226,6 +231,12 @@ when defined(windows) or defined(nimdoc):
       ovl: PCustomOverlapped
     PostCallbackDataPtr = ptr PostCallbackData
 
+    AsyncEventImpl = object
+      hEvent: Handle
+      hWaiter: Handle
+      pcd: PostCallbackDataPtr
+    AsyncEvent* = ptr AsyncEventImpl
+
     Callback = proc (fd: AsyncFD): bool {.closure,gcsafe.}
   {.deprecated: [TCompletionKey: CompletionKey, TAsyncFD: AsyncFD,
                 TCustomOverlapped: CustomOverlapped, TCompletionData: CompletionData].}
@@ -275,14 +286,13 @@ when defined(windows) or defined(nimdoc):
     let p = getGlobalDispatcher()
     p.handles.len != 0 or p.timers.len != 0 or p.callbacks.len != 0
 
-  proc poll*(timeout = 500) =
-    ## Waits for completion events and processes them. Raises ``ValueError``
-    ## if there are no pending operations.
+  proc runOnce(timeout = 500): bool =
     let p = getGlobalDispatcher()
     if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0:
       raise newException(ValueError,
         "No handles or timers registered in dispatcher.")
 
+    result = false
     if p.handles.len != 0:
       let at = p.adjustedTimeout(timeout)
       var llTimeout =
@@ -295,6 +305,7 @@ when defined(windows) or defined(nimdoc):
       let res = getQueuedCompletionStatus(p.ioPort,
           addr lpNumberOfBytesTransferred, addr lpCompletionKey,
           cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool
+      result = true
 
       # http://stackoverflow.com/a/12277264/492186
       # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html
@@ -324,17 +335,18 @@ when defined(windows) or defined(nimdoc):
         else:
           if errCode.int32 == WAIT_TIMEOUT:
             # Timed out
-            discard
+            result = false
           else: raiseOSError(errCode)
 
     # Timer processing.
-    processTimers(p)
+    processTimers(p, result)
     # Callback queue processing
-    processPendingCallbacks(p)
+    processPendingCallbacks(p, result)
+
 
-  var connectExPtr: pointer = nil
-  var acceptExPtr: pointer = nil
-  var getAcceptExSockAddrsPtr: pointer = nil
+  var acceptEx: WSAPROC_ACCEPTEX
+  var connectEx: WSAPROC_CONNECTEX
+  var getAcceptExSockAddrs: WSAPROC_GETACCEPTEXSOCKADDRS
 
   proc initPointer(s: SocketHandle, fun: var pointer, guid: var GUID): bool =
     # Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c
@@ -346,56 +358,19 @@ when defined(windows) or defined(nimdoc):
 
   proc initAll() =
     let dummySock = newNativeSocket()
-    if not initPointer(dummySock, connectExPtr, WSAID_CONNECTEX):
+    if dummySock == INVALID_SOCKET:
       raiseOSError(osLastError())
-    if not initPointer(dummySock, acceptExPtr, WSAID_ACCEPTEX):
+    var fun: pointer = nil
+    if not initPointer(dummySock, fun, WSAID_CONNECTEX):
       raiseOSError(osLastError())
-    if not initPointer(dummySock, getAcceptExSockAddrsPtr, WSAID_GETACCEPTEXSOCKADDRS):
+    connectEx = cast[WSAPROC_CONNECTEX](fun)
+    if not initPointer(dummySock, fun, WSAID_ACCEPTEX):
       raiseOSError(osLastError())
-
-  proc connectEx(s: SocketHandle, name: ptr SockAddr, namelen: cint,
-                  lpSendBuffer: pointer, dwSendDataLength: Dword,
-                  lpdwBytesSent: PDword, lpOverlapped: POVERLAPPED): bool =
-    if connectExPtr.isNil: raise newException(ValueError, "Need to initialise ConnectEx().")
-    let fun =
-      cast[proc (s: SocketHandle, name: ptr SockAddr, namelen: cint,
-         lpSendBuffer: pointer, dwSendDataLength: Dword,
-         lpdwBytesSent: PDword, lpOverlapped: POVERLAPPED): bool {.stdcall,gcsafe.}](connectExPtr)
-
-    result = fun(s, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent,
-         lpOverlapped)
-
-  proc acceptEx(listenSock, acceptSock: SocketHandle, lpOutputBuffer: pointer,
-                 dwReceiveDataLength, dwLocalAddressLength,
-                 dwRemoteAddressLength: Dword, lpdwBytesReceived: PDword,
-                 lpOverlapped: POVERLAPPED): bool =
-    if acceptExPtr.isNil: raise newException(ValueError, "Need to initialise AcceptEx().")
-    let fun =
-      cast[proc (listenSock, acceptSock: SocketHandle, lpOutputBuffer: pointer,
-                 dwReceiveDataLength, dwLocalAddressLength,
-                 dwRemoteAddressLength: Dword, lpdwBytesReceived: PDword,
-                 lpOverlapped: POVERLAPPED): bool {.stdcall,gcsafe.}](acceptExPtr)
-    result = fun(listenSock, acceptSock, lpOutputBuffer, dwReceiveDataLength,
-        dwLocalAddressLength, dwRemoteAddressLength, lpdwBytesReceived,
-        lpOverlapped)
-
-  proc getAcceptExSockaddrs(lpOutputBuffer: pointer,
-      dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength: Dword,
-      LocalSockaddr: ptr ptr SockAddr, LocalSockaddrLength: LPInt,
-      RemoteSockaddr: ptr ptr SockAddr, RemoteSockaddrLength: LPInt) =
-    if getAcceptExSockAddrsPtr.isNil:
-      raise newException(ValueError, "Need to initialise getAcceptExSockAddrs().")
-
-    let fun =
-      cast[proc (lpOutputBuffer: pointer,
-                 dwReceiveDataLength, dwLocalAddressLength,
-                 dwRemoteAddressLength: Dword, LocalSockaddr: ptr ptr SockAddr,
-                 LocalSockaddrLength: LPInt, RemoteSockaddr: ptr ptr SockAddr,
-                RemoteSockaddrLength: LPInt) {.stdcall,gcsafe.}](getAcceptExSockAddrsPtr)
-
-    fun(lpOutputBuffer, dwReceiveDataLength, dwLocalAddressLength,
-                  dwRemoteAddressLength, LocalSockaddr, LocalSockaddrLength,
-                  RemoteSockaddr, RemoteSockaddrLength)
+    acceptEx = cast[WSAPROC_ACCEPTEX](fun)
+    if not initPointer(dummySock, fun, WSAID_GETACCEPTEXSOCKADDRS):
+      raiseOSError(osLastError())
+    getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](fun)
+    close(dummySock)
 
   proc recv*(socket: AsyncFD, size: int,
              flags = {SocketFlag.SafeDisconn}): Future[string] =
@@ -506,10 +481,7 @@ when defined(windows) or defined(nimdoc):
       proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
         if not retFuture.finished:
           if errcode == OSErrorCode(-1):
-            if bytesCount == 0 and dataBuf.buf[0] == '\0':
-              retFuture.complete(0)
-            else:
-              retFuture.complete(bytesCount)
+            retFuture.complete(bytesCount)
           else:
             if flags.isDisconnectionError(errcode):
               retFuture.complete(0)
@@ -543,10 +515,11 @@ when defined(windows) or defined(nimdoc):
 
   proc send*(socket: AsyncFD, buf: pointer, size: int,
              flags = {SocketFlag.SafeDisconn}): Future[void] =
-    ## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future will complete once all
-    ## data has been sent.
-    ## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object, you must use GC_ref/GC_unref calls
-    ## to avoid early freeing of the buffer
+    ## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future
+    ## will complete once all data has been sent.
+    ##
+    ## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object,
+    ## you must use GC_ref/GC_unref calls to avoid early freeing of the buffer.
     verifyPresence(socket)
     var retFuture = newFuture[void]("send")
 
@@ -793,7 +766,7 @@ when defined(windows) or defined(nimdoc):
                                        cast[pointer](p.ovl))
   {.pop.}
 
-  template registerWaitableEvent(mask) =
+  proc registerWaitableEvent(fd: AsyncFD, cb: Callback; mask: Dword) =
     let p = getGlobalDispatcher()
     var flags = (WT_EXECUTEINWAITTHREAD or WT_EXECUTEONLYONCE).Dword
     var hEvent = wsaCreateEvent()
@@ -843,8 +816,8 @@ when defined(windows) or defined(nimdoc):
                                        cast[pointer](pcd), INFINITE, flags):
               # pcd.ovl will be unrefed in poll()
               let err = osLastError()
-              discard wsaCloseEvent(hEvent)
               deallocShared(cast[pointer](pcd))
+              discard wsaCloseEvent(hEvent)
               raiseOSError(err)
             else:
               # we incref `pcd.ovl` and `protect` callback one more time,
@@ -883,16 +856,17 @@ when defined(windows) or defined(nimdoc):
     ##
     ## This is not ``pure`` mechanism for Windows Completion Ports (IOCP),
     ## so if you can avoid it, please do it. Use `addRead` only if really
-    ## need it (main usecase is adaptation of `unix like` libraries to be
+    ## need it (main usecase is adaptation of unix-like libraries to be
     ## asynchronous on Windows).
-    ## If you use this function, you dont need to use asyncdispatch.recv()
+    ##
+    ## If you use this function, you don't need to use asyncdispatch.recv()
     ## or asyncdispatch.accept(), because they are using IOCP, please use
     ## nativesockets.recv() and nativesockets.accept() instead.
     ##
     ## Be sure your callback ``cb`` returns ``true``, if you want to remove
     ## watch of `read` notifications, and ``false``, if you want to continue
-    ## receiving notifies.
-    registerWaitableEvent(FD_READ or FD_ACCEPT or FD_OOB or FD_CLOSE)
+    ## receiving notifications.
+    registerWaitableEvent(fd, cb, FD_READ or FD_ACCEPT or FD_OOB or FD_CLOSE)
 
   proc addWrite*(fd: AsyncFD, cb: Callback) =
     ## Start watching the file descriptor for write availability and then call
@@ -900,43 +874,213 @@ when defined(windows) or defined(nimdoc):
     ##
     ## This is not ``pure`` mechanism for Windows Completion Ports (IOCP),
     ## so if you can avoid it, please do it. Use `addWrite` only if really
-    ## need it (main usecase is adaptation of `unix like` libraries to be
+    ## need it (main usecase is adaptation of unix-like libraries to be
     ## asynchronous on Windows).
-    ## If you use this function, you dont need to use asyncdispatch.send()
+    ##
+    ## If you use this function, you don't need to use asyncdispatch.send()
     ## or asyncdispatch.connect(), because they are using IOCP, please use
     ## nativesockets.send() and nativesockets.connect() instead.
     ##
     ## Be sure your callback ``cb`` returns ``true``, if you want to remove
     ## watch of `write` notifications, and ``false``, if you want to continue
-    ## receiving notifies.
-    registerWaitableEvent(FD_WRITE or FD_CONNECT or FD_CLOSE)
+    ## receiving notifications.
+    registerWaitableEvent(fd, cb, FD_WRITE or FD_CONNECT or FD_CLOSE)
+
+  template registerWaitableHandle(p, hEvent, flags, pcd, timeout,
+                                  handleCallback) =
+    let handleFD = AsyncFD(hEvent)
+    pcd.ioPort = p.ioPort
+    pcd.handleFd = handleFD
+    var ol = PCustomOverlapped()
+    GC_ref(ol)
+    ol.data.fd = handleFD
+    ol.data.cb = handleCallback
+    # We need to protect our callback environment value, so GC will not free it
+    # accidentally.
+    ol.data.cell = system.protect(rawEnv(ol.data.cb))
+
+    pcd.ovl = ol
+    if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
+                                    cast[WAITORTIMERCALLBACK](waitableCallback),
+                                    cast[pointer](pcd), timeout.Dword, flags):
+      let err = osLastError()
+      GC_unref(ol)
+      deallocShared(cast[pointer](pcd))
+      discard closeHandle(hEvent)
+      raiseOSError(err)
+    p.handles.incl(handleFD)
+
+  template closeWaitable(handle: untyped) =
+    let waitFd = pcd.waitFd
+    deallocShared(cast[pointer](pcd))
+    p.handles.excl(fd)
+    if unregisterWait(waitFd) == 0:
+        let err = osLastError()
+        if err.int32 != ERROR_IO_PENDING:
+          discard closeHandle(handle)
+          raiseOSError(err)
+    if closeHandle(handle) == 0:
+      raiseOSError(osLastError())
+
+  proc addTimer*(timeout: int, oneshot: bool, cb: Callback) =
+    ## Registers callback ``cb`` to be called when timer expired.
+    ##
+    ## Parameters:
+    ##
+    ## * ``timeout`` - timeout value in milliseconds.
+    ## * ``oneshot``
+    ##   * `true` - generate only one timeout event
+    ##   * `false` - generate timeout events periodically
+
+    doAssert(timeout > 0)
+    let p = getGlobalDispatcher()
+
+    var hEvent = createEvent(nil, 1, 0, nil)
+    if hEvent == INVALID_HANDLE_VALUE:
+      raiseOSError(osLastError())
+
+    var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
+    var flags = WT_EXECUTEINWAITTHREAD.Dword
+    if oneshot: flags = flags or WT_EXECUTEONLYONCE
+
+    proc timercb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
+      let res = cb(fd)
+      if res or oneshot:
+        closeWaitable(hEvent)
+      else:
+        # if callback returned `false`, then it wants to be called again, so
+        # we need to ref and protect `pcd.ovl` again, because it will be
+        # unrefed and disposed in `poll()`.
+        GC_ref(pcd.ovl)
+        pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
+
+    registerWaitableHandle(p, hEvent, flags, pcd, timeout, timercb)
+
+  proc addProcess*(pid: int, cb: Callback) =
+    ## Registers callback ``cb`` to be called when process with process ID
+    ## ``pid`` exited.
+    let p = getGlobalDispatcher()
+    let procFlags = SYNCHRONIZE
+    var hProcess = openProcess(procFlags, 0, pid.Dword)
+    if hProcess == INVALID_HANDLE_VALUE:
+      raiseOSError(osLastError())
+
+    var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
+    var flags = WT_EXECUTEINWAITTHREAD.Dword
+
+    proc proccb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
+      closeWaitable(hProcess)
+      discard cb(fd)
+
+    registerWaitableHandle(p, hProcess, flags, pcd, INFINITE, proccb)
+
+  proc newAsyncEvent*(): AsyncEvent =
+    ## Creates a new thread-safe ``AsyncEvent`` object.
+    ##
+    ## New ``AsyncEvent`` object is not automatically registered with             # TODO: Why? -- DP
+    ## dispatcher like ``AsyncSocket``.
+    var sa = SECURITY_ATTRIBUTES(
+      nLength: sizeof(SECURITY_ATTRIBUTES).cint,
+      bInheritHandle: 1
+    )
+    var event = createEvent(addr(sa), 0'i32, 0'i32, nil)
+    if event == INVALID_HANDLE_VALUE:
+      raiseOSError(osLastError())
+    result = cast[AsyncEvent](allocShared0(sizeof(AsyncEventImpl)))
+    result.hEvent = event
+
+  proc trigger*(ev: AsyncEvent) =
+    ## Set event ``ev`` to signaled state.
+    if setEvent(ev.hEvent) == 0:
+      raiseOSError(osLastError())
+
+  proc unregister*(ev: AsyncEvent) =
+    ## Unregisters event ``ev``.
+    doAssert(ev.hWaiter != 0, "Event is not registered in the queue!")
+    let p = getGlobalDispatcher()
+    p.handles.excl(AsyncFD(ev.hEvent))
+    if unregisterWait(ev.hWaiter) == 0:
+      let err = osLastError()
+      if err.int32 != ERROR_IO_PENDING:
+        raiseOSError(err)
+    ev.hWaiter = 0
+
+  proc close*(ev: AsyncEvent) =
+    ## Closes event ``ev``.
+    let res = closeHandle(ev.hEvent)
+    deallocShared(cast[pointer](ev))
+    if res == 0:
+      raiseOSError(osLastError())
+
+  proc addEvent*(ev: AsyncEvent, cb: Callback) =
+    ## Registers callback ``cb`` to be called when ``ev`` will be signaled
+    doAssert(ev.hWaiter == 0, "Event is already registered in the queue!")
+
+    let p = getGlobalDispatcher()
+    let hEvent = ev.hEvent
+
+    var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
+    var flags = WT_EXECUTEINWAITTHREAD.Dword
+
+    proc eventcb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
+      if ev.hWaiter != 0:
+        if cb(fd):
+          # we need this check to avoid exception, if `unregister(event)` was
+          # called in callback.
+          deallocShared(cast[pointer](pcd))
+          if ev.hWaiter != 0:
+            unregister(ev)
+        else:
+          # if callback returned `false`, then it wants to be called again, so
+          # we need to ref and protect `pcd.ovl` again, because it will be
+          # unrefed and disposed in `poll()`.
+          GC_ref(pcd.ovl)
+          pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
+      else:
+        # if ev.hWaiter == 0, then event was unregistered before `poll()` call.
+        deallocShared(cast[pointer](pcd))
+
+    registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb)
+    ev.hWaiter = pcd.waitFd
 
   initAll()
 else:
   import selectors
   from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK,
                     MSG_NOSIGNAL
-
+  const
+    InitCallbackListSize = 4         # initial size of callbacks sequence,
+                                     # associated with file/socket descriptor.
+    InitDelayedCallbackListSize = 64 # initial size of delayed callbacks
+                                     # queue.
   type
     AsyncFD* = distinct cint
     Callback = proc (fd: AsyncFD): bool {.closure,gcsafe.}
 
-    PData* = ref object of RootRef
-      fd: AsyncFD
-      readCBs: seq[Callback]
-      writeCBs: seq[Callback]
+    AsyncData = object
+      readList: seq[Callback]
+      writeList: seq[Callback]
+
+    AsyncEvent* = distinct SelectEvent
 
     PDispatcher* = ref object of PDispatcherBase
-      selector: Selector
+      selector: Selector[AsyncData]
   {.deprecated: [TAsyncFD: AsyncFD, TCallback: Callback].}
 
   proc `==`*(x, y: AsyncFD): bool {.borrow.}
+  proc `==`*(x, y: AsyncEvent): bool {.borrow.}
+
+  template newAsyncData(): AsyncData =
+    AsyncData(
+      readList: newSeqOfCap[Callback](InitCallbackListSize),
+      writeList: newSeqOfCap[Callback](InitCallbackListSize)
+    )
 
   proc newDispatcher*(): PDispatcher =
     new result
-    result.selector = newSelector()
+    result.selector = newSelector[AsyncData]()
     result.timers.newHeapQueue()
-    result.callbacks = initDeque[proc ()](64)
+    result.callbacks = initDeque[proc ()](InitDelayedCallbackListSize)
 
   var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
 
@@ -951,15 +1095,10 @@ else:
       setGlobalDispatcher(newDispatcher())
     result = gDisp
 
-  proc update(fd: AsyncFD, events: set[Event]) =
-    let p = getGlobalDispatcher()
-    assert fd.SocketHandle in p.selector
-    p.selector.update(fd.SocketHandle, events)
-
   proc register*(fd: AsyncFD) =
     let p = getGlobalDispatcher()
-    var data = PData(fd: fd, readCBs: @[], writeCBs: @[])
-    p.selector.register(fd.SocketHandle, {}, data.RootRef)
+    var data = newAsyncData()
+    p.selector.registerHandle(fd.SocketHandle, {}, data)
 
   proc closeSocket*(sock: AsyncFD) =
     let disp = getGlobalDispatcher()
@@ -969,80 +1108,158 @@ else:
   proc unregister*(fd: AsyncFD) =
     getGlobalDispatcher().selector.unregister(fd.SocketHandle)
 
+  proc unregister*(ev: AsyncEvent) =
+    getGlobalDispatcher().selector.unregister(SelectEvent(ev))
+
   proc addRead*(fd: AsyncFD, cb: Callback) =
     let p = getGlobalDispatcher()
-    if fd.SocketHandle notin p.selector:
+    var newEvents = {Event.Read}
+    withData(p.selector, fd.SocketHandle, adata) do:
+      adata.readList.add(cb)
+      newEvents.incl(Event.Read)
+      if len(adata.writeList) != 0: newEvents.incl(Event.Write)
+    do:
       raise newException(ValueError, "File descriptor not registered.")
-    p.selector[fd.SocketHandle].data.PData.readCBs.add(cb)
-    update(fd, p.selector[fd.SocketHandle].events + {EvRead})
+    p.selector.updateHandle(fd.SocketHandle, newEvents)
 
   proc addWrite*(fd: AsyncFD, cb: Callback) =
     let p = getGlobalDispatcher()
-    if fd.SocketHandle notin p.selector:
+    var newEvents = {Event.Write}
+    withData(p.selector, fd.SocketHandle, adata) do:
+      adata.writeList.add(cb)
+      newEvents.incl(Event.Write)
+      if len(adata.readList) != 0: newEvents.incl(Event.Read)
+    do:
       raise newException(ValueError, "File descriptor not registered.")
-    p.selector[fd.SocketHandle].data.PData.writeCBs.add(cb)
-    update(fd, p.selector[fd.SocketHandle].events + {EvWrite})
-
-  template processCallbacks(callbacks: untyped) =
-    # Callback may add items to ``callbacks`` which causes issues if
-    # we are iterating over it at the same time. We therefore
-    # make a copy to iterate over.
-    let currentCBs = callbacks
-    callbacks = @[]
-    # Using another sequence because callbacks themselves can add
-    # other callbacks.
-    var newCBs: seq[Callback] = @[]
-    for cb in currentCBs:
-      if newCBs.len > 0:
-        # A callback has already returned with EAGAIN, don't call any
-        # others until next `poll`.
-        newCBs.add(cb)
-      else:
-        if not cb(data.fd):
-          # Callback wants to be called again.
-          newCBs.add(cb)
-    callbacks = newCBs & callbacks
+    p.selector.updateHandle(fd.SocketHandle, newEvents)
 
   proc hasPendingOperations*(): bool =
     let p = getGlobalDispatcher()
-    p.selector.len != 0 or p.timers.len != 0 or p.callbacks.len != 0
-
-  proc poll*(timeout = 500) =
+    not p.selector.isEmpty() or p.timers.len != 0 or p.callbacks.len != 0
+
+  template processBasicCallbacks(ident, rwlist: untyped) =
+    # Process pending descriptor and AsyncEvent callbacks.
+    #
+    # Invoke every callback stored in `rwlist`, until one
+    # returns `false` (which means callback wants to stay
+    # alive). In such case all remaining callbacks will be added
+    # to `rwlist` again, in the order they have been inserted.
+    #
+    # `rwlist` associated with file descriptor MUST BE emptied before
+    # dispatching callback (See https://github.com/nim-lang/Nim/issues/5128),
+    # or it can be possible to fall into endless cycle.
+    var curList: seq[Callback]
+
+    withData(p.selector, ident, adata) do:
+      shallowCopy(curList, adata.rwlist)
+      adata.rwlist = newSeqOfCap[Callback](InitCallbackListSize)
+
+    let newLength = max(len(curList), InitCallbackListSize)
+    var newList = newSeqOfCap[Callback](newLength)
+
+    for cb in curList:
+      if len(newList) > 0:
+        # A callback has already returned with EAGAIN, don't call any others
+        # until next `poll`.
+        newList.add(cb)
+      else:
+        if not cb(fd.AsyncFD):
+          # Callback wants to be called again.
+          newList.add(cb)
+
+    withData(p.selector, ident, adata) do:
+      # descriptor still present in queue.
+      adata.rwlist = newList & adata.rwlist
+      rLength = len(adata.readList)
+      wLength = len(adata.writeList)
+    do:
+      # descriptor was unregistered in callback via `unregister()`.
+      rLength = -1
+      wLength = -1
+
+  template processCustomCallbacks(ident: untyped) =
+    # Process pending custom event callbacks. Custom events are
+    # {Event.Timer, Event.Signal, Event.Process, Event.Vnode}.
+    # There can be only one callback registered with one descriptor,
+    # so there is no need to iterate over list.
+    var curList: seq[Callback]
+
+    withData(p.selector, ident, adata) do:
+      shallowCopy(curList, adata.readList)
+      adata.readList = newSeqOfCap[Callback](InitCallbackListSize)
+
+    let newLength = len(curList)
+    var newList = newSeqOfCap[Callback](newLength)
+
+    var cb = curList[0]
+    if not cb(fd.AsyncFD):
+      newList.add(cb)
+
+    withData(p.selector, ident, adata) do:
+      # descriptor still present in queue.
+      adata.readList = newList & adata.readList
+      if len(adata.readList) == 0:
+        # if no callbacks registered with descriptor, unregister it.
+        p.selector.unregister(fd)
+    do:
+      # descriptor was unregistered in callback via `unregister()`.
+      discard
+
+  proc runOnce(timeout = 500): bool =
     let p = getGlobalDispatcher()
-    if p.selector.len == 0 and p.timers.len == 0 and p.callbacks.len == 0:
+    when ioselSupportedPlatform:
+      let customSet = {Event.Timer, Event.Signal, Event.Process,
+                       Event.Vnode}
+
+    if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0:
       raise newException(ValueError,
         "No handles or timers registered in dispatcher.")
 
-    if p.selector.len > 0:
-      for info in p.selector.select(p.adjustedTimeout(timeout)):
-        let data = PData(info.key.data)
-        assert data.fd == info.key.fd.AsyncFD
-        #echo("In poll ", data.fd.cint)
-        # There may be EvError here, but we handle them in callbacks,
-        # so that exceptions can be raised from `send(...)` and
-        # `recv(...)` routines.
-
-        if EvRead in info.events or info.events == {EvError}:
-          processCallbacks(data.readCBs)
-
-        if EvWrite in info.events or info.events == {EvError}:
-          processCallbacks(data.writeCBs)
-
-        if info.key in p.selector:
-          var newEvents: set[Event]
-          if data.readCBs.len != 0: newEvents = {EvRead}
-          if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite}
-          if newEvents != info.key.events:
-            update(data.fd, newEvents)
-        else:
-          # FD no longer a part of the selector. Likely been closed
-          # (e.g. socket disconnected).
-          discard
+    result = false
+    if not p.selector.isEmpty():
+      var keys: array[64, ReadyKey]
+      var count = p.selector.selectInto(p.adjustedTimeout(timeout), keys)
+      for i in 0..<count:
+        var custom = false
+        let fd = keys[i].fd
+        let events = keys[i].events
+        var rLength = 0 # len(data.readList) after callback
+        var wLength = 0 # len(data.writeList) after callback
+
+        if Event.Read in events or events == {Event.Error}:
+          processBasicCallbacks(fd, readList)
+          result = true
+
+        if Event.Write in events or events == {Event.Error}:
+          processBasicCallbacks(fd, writeList)
+          result = true
+
+        if Event.User in events:
+          processBasicCallbacks(fd, readList)
+          custom = true
+          if rLength == 0:
+            p.selector.unregister(fd)
+          result = true
+
+        when ioselSupportedPlatform:
+          if (customSet * events) != {}:
+            custom = true
+            processCustomCallbacks(fd)
+            result = true
+
+        # because state `data` can be modified in callback we need to update
+        # descriptor events with currently registered callbacks.
+        if not custom:
+          var newEvents: set[Event] = {}
+          if rLength != -1 and wLength != -1:
+            if rLength > 0: incl(newEvents, Event.Read)
+            if wLength > 0: incl(newEvents, Event.Write)
+            p.selector.updateHandle(SocketHandle(fd), newEvents)
 
     # Timer processing.
-    processTimers(p)
+    processTimers(p, result)
     # Callback queue processing
-    processPendingCallbacks(p)
+    processPendingCallbacks(p, result)
 
   proc recv*(socket: AsyncFD, size: int,
              flags = {SocketFlag.SafeDisconn}): Future[string] =
@@ -1075,7 +1292,7 @@ else:
     return retFuture
 
   proc recvInto*(socket: AsyncFD, buf: pointer, size: int,
-                  flags = {SocketFlag.SafeDisconn}): Future[int] =
+                 flags = {SocketFlag.SafeDisconn}): Future[int] =
     var retFuture = newFuture[int]("recvInto")
 
     proc cb(sock: AsyncFD): bool =
@@ -1216,6 +1433,68 @@ else:
     addRead(socket, cb)
     return retFuture
 
+  when ioselSupportedPlatform:
+
+    proc addTimer*(timeout: int, oneshot: bool, cb: Callback) =
+      ## Start watching for timeout expiration, and then call the
+      ## callback ``cb``.
+      ## ``timeout`` - time in milliseconds,
+      ## ``oneshot`` - if ``true`` only one event will be dispatched,
+      ## if ``false`` continuous events every ``timeout`` milliseconds.
+      let p = getGlobalDispatcher()
+      var data = newAsyncData()
+      data.readList.add(cb)
+      p.selector.registerTimer(timeout, oneshot, data)
+
+    proc addSignal*(signal: int, cb: Callback) =
+      ## Start watching signal ``signal``, and when signal appears, call the
+      ## callback ``cb``.
+      let p = getGlobalDispatcher()
+      var data = newAsyncData()
+      data.readList.add(cb)
+      p.selector.registerSignal(signal, data)
+
+    proc addProcess*(pid: int, cb: Callback) =
+      ## Start watching for process exit with pid ``pid``, and then call
+      ## the callback ``cb``.
+      let p = getGlobalDispatcher()
+      var data = newAsyncData()
+      data.readList.add(cb)
+      p.selector.registerProcess(pid, data)
+
+  proc newAsyncEvent*(): AsyncEvent =
+    ## Creates new ``AsyncEvent``.
+    result = AsyncEvent(newSelectEvent())
+
+  proc trigger*(ev: AsyncEvent) =
+    ## Sets new ``AsyncEvent`` to signaled state.
+    trigger(SelectEvent(ev))
+
+  proc close*(ev: AsyncEvent) =
+    ## Closes ``AsyncEvent``
+    close(SelectEvent(ev))
+
+  proc addEvent*(ev: AsyncEvent, cb: Callback) =
+    ## Start watching for event ``ev``, and call callback ``cb``, when
+    ## ev will be set to signaled state.
+    let p = getGlobalDispatcher()
+    var data = newAsyncData()
+    data.readList.add(cb)
+    p.selector.registerEvent(SelectEvent(ev), data)
+
+proc drain*(timeout = 500) =
+  ## Waits for completion events and processes them. Raises ``ValueError``
+  ## if there are no pending operations. In contrast to ``poll`` this
+  ## processes as many events as are available.
+  if runOnce(timeout):
+    while hasPendingOperations() and runOnce(0): discard
+
+proc poll*(timeout = 500) =
+  ## Waits for completion events and processes them. Raises ``ValueError``
+  ## if there are no pending operations. This runs the underlying OS
+  ## `epoll`:idx: or `kqueue`:idx: primitive only once.
+  discard runOnce(timeout)
+
 # Common procedures between current and upcoming asyncdispatch
 include includes.asynccommon
 
@@ -1269,7 +1548,7 @@ proc send*(socket: AsyncFD, data: string,
 
   var copiedData = data
   GC_ref(copiedData) # we need to protect data until send operation is completed
-               # or failed.
+                     # or failed.
 
   let sendFut = socket.send(addr copiedData[0], data.len, flags)
   sendFut.callback =
@@ -1317,7 +1596,7 @@ proc recvLine*(socket: AsyncFD): Future[string] {.async, deprecated.} =
   ##
   ## **Deprecated since version 0.15.0**: Use ``asyncnet.recvLine()`` instead.
 
-  template addNLIfEmpty(): untyped =
+  template addNLIfEmpty(): typed =
     if result.len == 0:
       result.add("\c\L")
 
@@ -1353,3 +1632,5 @@ proc waitFor*[T](fut: Future[T]): T =
     poll()
 
   fut.read
+
+{.deprecated: [setEvent: trigger].}