summary refs log tree commit diff stats
path: root/lib/upcoming/asyncdispatch.nim
diff options
context:
space:
mode:
Diffstat (limited to 'lib/upcoming/asyncdispatch.nim')
-rw-r--r--lib/upcoming/asyncdispatch.nim177
1 files changed, 100 insertions, 77 deletions
diff --git a/lib/upcoming/asyncdispatch.nim b/lib/upcoming/asyncdispatch.nim
index 78c7afffc..80a4f0e4f 100644
--- a/lib/upcoming/asyncdispatch.nim
+++ b/lib/upcoming/asyncdispatch.nim
@@ -526,10 +526,10 @@ when defined(windows) or defined(nimdoc):
 
   proc send*(socket: AsyncFD, buf: pointer, size: int,
              flags = {SocketFlag.SafeDisconn}): Future[void] =
-    ## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future will complete once all
-    ## data has been sent.
-    ## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object, you must use GC_ref/GC_unref calls
-    ## to avoid early freeing of the buffer
+    ## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future
+    ## will complete once all data has been sent.
+    ## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object,
+    ## you must use GC_ref/GC_unref calls to avoid early freeing of the buffer.
     verifyPresence(socket)
     var retFuture = newFuture[void]("send")
 
@@ -946,7 +946,8 @@ when defined(windows) or defined(nimdoc):
     ## receiving notifies.
     registerWaitableEvent(fd, cb, FD_WRITE or FD_CONNECT or FD_CLOSE)
 
-  template registerWaitableHandle(p, hEvent, flags, pcd, timeout, handleCallback) =
+  template registerWaitableHandle(p, hEvent, flags, pcd, timeout,
+                                  handleCallback) =
     let handleFD = AsyncFD(hEvent)
     pcd.ioPort = p.ioPort
     pcd.handleFd = handleFD
@@ -961,7 +962,7 @@ when defined(windows) or defined(nimdoc):
     pcd.ovl = ol
     if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
                                     cast[WAITORTIMERCALLBACK](waitableCallback),
-                                       cast[pointer](pcd), timeout.Dword, flags):
+                                    cast[pointer](pcd), timeout.Dword, flags):
       GC_unref(ol)
       deallocShared(cast[pointer](pcd))
       discard closeHandle(hEvent)
@@ -1098,15 +1099,18 @@ else:
   import ioselectors
   from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK,
                     MSG_NOSIGNAL
+  const
+    InitCallbackListSize = 4         # initial size of callbacks sequence,
+                                     # associated with file/socket descriptor.
+    InitDelayedCallbackListSize = 64 # initial size of delayed callbacks
+                                     # queue.
   type
     AsyncFD* = distinct cint
     Callback = proc (fd: AsyncFD): bool {.closure,gcsafe.}
 
-    DoublyLinkedListRef = ref DoublyLinkedList[Callback]
-
     AsyncData = object
-      readCBs: DoublyLinkedListRef
-      writeCBs: DoublyLinkedListRef
+      readList: seq[Callback]
+      writeList: seq[Callback]
 
     AsyncEvent* = distinct SelectEvent
 
@@ -1117,11 +1121,17 @@ else:
   proc `==`*(x, y: AsyncFD): bool {.borrow.}
   proc `==`*(x, y: AsyncEvent): bool {.borrow.}
 
+  template newAsyncData(): AsyncData =
+    AsyncData(
+      readList: newSeqOfCap[Callback](InitCallbackListSize),
+      writeList: newSeqOfCap[Callback](InitCallbackListSize)
+    )
+
   proc newDispatcher*(): PDispatcher =
     new result
     result.selector = newSelector[AsyncData]()
     result.timers.newHeapQueue()
-    result.callbacks = initDeque[proc ()](64)
+    result.callbacks = initDeque[proc ()](InitDelayedCallbackListSize)
 
   var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
   proc getGlobalDispatcher*(): PDispatcher =
@@ -1130,10 +1140,7 @@ else:
 
   proc register*(fd: AsyncFD) =
     let p = getGlobalDispatcher()
-    var data = AsyncData(
-      readCBs: DoublyLinkedListRef(),
-      writeCBs: DoublyLinkedListRef()
-    )
+    var data = newAsyncData()
     p.selector.registerHandle(fd.SocketHandle, {}, data)
 
   proc newAsyncNativeSocket*(domain: cint, sockType: cint,
@@ -1168,10 +1175,9 @@ else:
     let p = getGlobalDispatcher()
     var newEvents = {Event.Read}
     withData(p.selector, fd.SocketHandle, adata) do:
-      adata.readCBs[].append(cb)
+      adata.readList.add(cb)
       newEvents.incl(Event.Read)
-      if not isNil(adata.writeCBs.head):
-        newEvents.incl(Event.Write)
+      if len(adata.writeList) != 0: newEvents.incl(Event.Write)
     do:
       raise newException(ValueError, "File descriptor not registered.")
     p.selector.updateHandle(fd.SocketHandle, newEvents)
@@ -1180,10 +1186,9 @@ else:
     let p = getGlobalDispatcher()
     var newEvents = {Event.Write}
     withData(p.selector, fd.SocketHandle, adata) do:
-      adata.writeCBs[].append(cb)
+      adata.writeList.add(cb)
       newEvents.incl(Event.Write)
-      if not isNil(adata.readCBs.head):
-        newEvents.incl(Event.Read)
+      if len(adata.readList) != 0: newEvents.incl(Event.Read)
     do:
       raise newException(ValueError, "File descriptor not registered.")
     p.selector.updateHandle(fd.SocketHandle, newEvents)
@@ -1192,13 +1197,65 @@ else:
     let p = getGlobalDispatcher()
     not p.selector.isEmpty() or p.timers.len != 0 or p.callbacks.len != 0
 
+  template processBasicCallbacks(ident, rwlist: untyped) =
+    # Process pending descriptor's callbacks.
+    # Invoke every callback stored in `rwlist`, until first one
+    # returned `false`, which means callback wants to stay
+    # alive. In such case all remaining callbacks will be added
+    # to `rwlist` again, in the order they have been inserted.
+    #
+    # `rwlist` associated with file descriptor MUST BE emptied before
+    # dispatching callback (See https://github.com/nim-lang/Nim/issues/5128),
+    # or it can be possible to fall into endless cycle.
+    var curList: seq[Callback]
+
+    withData(p.selector, ident, adata) do:
+      shallowCopy(curList, adata.rwlist)
+      adata.rwlist = newSeqOfCap[Callback](InitCallbackListSize)
+
+    let newLength = max(len(curList), InitCallbackListSize)
+    var newList = newSeqOfCap[Callback](newLength)
+
+    for cb in curList:
+      if len(newList) > 0:
+        newList.add(cb)
+      else:
+        if not cb(fd.AsyncFD):
+          newList.add(cb)
+
+    withData(p.selector, ident, adata) do:
+      adata.rwlist = newList & adata.rwlist
+
+  template processCustomCallbacks(ident: untyped) =
+    # Process pending custom event callbacks. Custom events are
+    # {Event.Timer, Event.Signal, Event.Process, Event.Vnode}.
+    # There can be only one callback registered with one descriptor,
+    # so there no need to iterate over list.
+    var curList: seq[Callback]
+
+    withData(p.selector, ident, adata) do:
+      shallowCopy(curList, adata.readList)
+      adata.readList = newSeqOfCap[Callback](InitCallbackListSize)
+
+    let newLength = len(curList)
+    var newList = newSeqOfCap[Callback](newLength)
+
+    var cb = curList[0]
+    if not cb(fd.AsyncFD):
+      newList.add(cb)
+    else:
+      p.selector.unregister(fd)
+
+    withData(p.selector, ident, adata) do:
+      adata.readList = newList & adata.readList
+
   proc poll*(timeout = 500) =
-    var keys: array[64, ReadyKey[AsyncData]]
+    var keys: array[64, ReadyKey]
 
     let p = getGlobalDispatcher()
     when ioselSupportedPlatform:
       let customSet = {Event.Timer, Event.Signal, Event.Process,
-                       Event.Vnode, Event.User}
+                       Event.Vnode}
 
     if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0:
       raise newException(ValueError,
@@ -1209,45 +1266,23 @@ else:
       var i = 0
       while i < count:
         var custom = false
-        var fd = keys[i].fd.SocketHandle
+        let fd = keys[i].fd
         let events = keys[i].events
 
         if Event.Read in events or events == {Event.Error}:
-          for node in keys[i].data.readCBs[].nodes():
-            let cb = node.value
-            if cb != nil:
-              if cb(fd.AsyncFD):
-                keys[i].data.readCBs[].remove(node)
-              else:
-                break
+          processBasicCallbacks(fd, readList)
 
         if Event.Write in events or events == {Event.Error}:
-          for node in keys[i].data.writeCBs[].nodes():
-            let cb = node.value
-            if cb != nil:
-              if cb(fd.AsyncFD):
-                keys[i].data.writeCBs[].remove(node)
-              else:
-                break
+          processBasicCallbacks(fd, writeList)
+
+        if Event.User in events or events == {Event.Error}:
+          custom = true
+          processBasicCallbacks(fd, readList)
 
         when ioselSupportedPlatform:
           if (customSet * events) != {}:
-            for node in keys[i].data.readCBs[].nodes():
-              let cb = node.value
-              doAssert(cb != nil)
-              custom = true
-              if cb(fd.AsyncFD):
-                keys[i].data.readCBs[].remove(node)
-                p.selector.unregister(fd)
-        else:
-          if Event.User in events or events == {Event.Error}:
-            for node in keys[i].data.readCBs[].nodes():
-              let cb = node.value
-              custom = true
-              if cb != nil:
-                if cb(fd.AsyncFD):
-                  keys[i].data.readCBs[].remove(node)
-                  p.selector.unregister(fd)
+            custom = true
+            processCustomCallbacks(fd)
 
         # because state `data` can be modified in callback we need to update
         # descriptor events with currently registered callbacks.
@@ -1255,11 +1290,11 @@ else:
           var update = false
           var newEvents: set[Event] = {}
           p.selector.withData(fd, adata) do:
-            if not isNil(adata.readCBs.head): incl(newEvents, Event.Read)
-            if not isNil(adata.writeCBs.head): incl(newEvents, Event.Write)
+            if len(adata.readList) > 0: incl(newEvents, Event.Read)
+            if len(adata.writeList) > 0: incl(newEvents, Event.Write)
             update = true
           if update:
-            p.selector.updateHandle(fd, newEvents)
+            p.selector.updateHandle(SocketHandle(fd), newEvents)
         inc(i)
 
     # Timer processing.
@@ -1519,33 +1554,24 @@ else:
       ## ``oneshot`` - if ``true`` only one event will be dispatched,
       ## if ``false`` continuous events every ``timeout`` milliseconds.
       let p = getGlobalDispatcher()
-      var data = AsyncData(
-        readCBs: DoublyLinkedListRef(),
-        writeCBs: DoublyLinkedListRef()
-      )
-      data.readCBs[].append(cb)
+      var data = newAsyncData()
+      data.readList.add(cb)
       p.selector.registerTimer(timeout, oneshot, data)
 
     proc addSignal*(signal: int, cb: Callback) =
       ## Start watching signal ``signal``, and when signal appears, call the
       ## callback ``cb``.
       let p = getGlobalDispatcher()
-      var data = AsyncData(
-        readCBs: DoublyLinkedListRef(),
-        writeCBs: DoublyLinkedListRef()
-      )
-      data.readCBs[].append(cb)
+      var data = newAsyncData()
+      data.readList.add(cb)
       p.selector.registerSignal(signal, data)
 
     proc addProcess*(pid: int, cb: Callback) =
       ## Start watching for process exit with pid ``pid``, and then call
       ## the callback ``cb``.
       let p = getGlobalDispatcher()
-      var data = AsyncData(
-        readCBs: DoublyLinkedListRef(),
-        writeCBs: DoublyLinkedListRef()
-      )
-      data.readCBs[].append(cb)
+      var data = newAsyncData()
+      data.readList.add(cb)
       p.selector.registerProcess(pid, data)
 
   proc newAsyncEvent*(): AsyncEvent =
@@ -1564,11 +1590,8 @@ else:
     ## Start watching for event ``ev``, and call callback ``cb``, when
     ## ev will be set to signaled state.
     let p = getGlobalDispatcher()
-    var data = AsyncData(
-      readCBs: DoublyLinkedListRef(),
-      writeCBs: DoublyLinkedListRef()
-    )
-    data.readCBs[].append(cb)
+    var data = newAsyncData()
+    data.readList.add(cb)
     p.selector.registerEvent(SelectEvent(ev), data)
 
 proc sleepAsync*(ms: int): Future[void] =