summary refs log tree commit diff stats
path: root/lib/pure/asyncdispatch.nim
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r--lib/pure/asyncdispatch.nim57
1 files changed, 32 insertions, 25 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim
index 8336da1fb..01088c2e7 100644
--- a/lib/pure/asyncdispatch.nim
+++ b/lib/pure/asyncdispatch.nim
@@ -11,7 +11,7 @@ include "system/inclrtl"
 
 import os, oids, tables, strutils, times, heapqueue
 
-import nativesockets, net, queues
+import nativesockets, net, deques
 
 export Port, SocketFlag
 
@@ -132,7 +132,7 @@ export Port, SocketFlag
 ##        # Handle exception
 ##
 ## Unfortunately the semantics of the try statement may not always be correct,
-## and occassionally the compilation may fail altogether.
+## and occasionally the compilation may fail altogether.
 ## As such it is better to use the former style when possible.
 ##
 ## Discarding futures
@@ -164,7 +164,7 @@ include includes/asyncfutures
 type
   PDispatcherBase = ref object of RootRef
     timers: HeapQueue[tuple[finishAt: float, fut: Future[void]]]
-    callbacks: Queue[proc ()]
+    callbacks: Deque[proc ()]
 
 proc processTimers(p: PDispatcherBase) {.inline.} =
   while p.timers.len > 0 and epochTime() >= p.timers[0].finishAt:
@@ -172,7 +172,7 @@ proc processTimers(p: PDispatcherBase) {.inline.} =
 
 proc processPendingCallbacks(p: PDispatcherBase) =
   while p.callbacks.len > 0:
-    var cb = p.callbacks.dequeue()
+    var cb = p.callbacks.popFirst()
     cb()
 
 proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} =
@@ -230,7 +230,7 @@ when defined(windows) or defined(nimdoc):
     result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
     result.handles = initSet[AsyncFD]()
     result.timers.newHeapQueue()
-    result.callbacks = initQueue[proc ()](64)
+    result.callbacks = initDeque[proc ()](64)
 
   var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
   proc getGlobalDispatcher*(): PDispatcher =
@@ -987,7 +987,7 @@ else:
     new result
     result.selector = newSelector()
     result.timers.newHeapQueue()
-    result.callbacks = initQueue[proc ()](64)
+    result.callbacks = initDeque[proc ()](64)
 
   var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
   proc getGlobalDispatcher*(): PDispatcher =
@@ -1043,6 +1043,26 @@ else:
     p.selector[fd.SocketHandle].data.PData.writeCBs.add(cb)
     update(fd, p.selector[fd.SocketHandle].events + {EvWrite})
 
+  template processCallbacks(callbacks: expr) =
+    # Callback may add items to ``callbacks`` which causes issues if
+    # we are iterating over it at the same time. We therefore
+    # make a copy to iterate over.
+    let currentCBs = callbacks
+    callbacks = @[]
+    # Using another sequence because callbacks themselves can add
+    # other callbacks.
+    var newCBs: seq[Callback] = @[]
+    for cb in currentCBs:
+      if newCBs.len > 0:
+        # A callback has already returned with EAGAIN, don't call any
+        # others until next `poll`.
+        newCBs.add(cb)
+      else:
+        if not cb(data.fd):
+          # Callback wants to be called again.
+          newCBs.add(cb)
+    callbacks = newCBs & callbacks
+
   proc poll*(timeout = 500) =
     let p = getGlobalDispatcher()
 
@@ -1055,24 +1075,11 @@ else:
         # so that exceptions can be raised from `send(...)` and
         # `recv(...)` routines.
 
-        if EvRead in info.events:
-          # Callback may add items to ``data.readCBs`` which causes issues if
-          # we are iterating over ``data.readCBs`` at the same time. We therefore
-          # make a copy to iterate over.
-          let currentCBs = data.readCBs
-          data.readCBs = @[]
-          for cb in currentCBs:
-            if not cb(data.fd):
-              # Callback wants to be called again.
-              data.readCBs.add(cb)
-
-        if EvWrite in info.events:
-          let currentCBs = data.writeCBs
-          data.writeCBs = @[]
-          for cb in currentCBs:
-            if not cb(data.fd):
-              # Callback wants to be called again.
-              data.writeCBs.add(cb)
+        if EvRead in info.events or info.events == {EvError}:
+          processCallbacks(data.readCBs)
+
+        if EvWrite in info.events or info.events == {EvError}:
+          processCallbacks(data.writeCBs)
 
         if info.key in p.selector:
           var newEvents: set[Event]
@@ -1410,7 +1417,7 @@ proc recvLine*(socket: AsyncFD): Future[string] {.async, deprecated.} =
 proc callSoon*(cbproc: proc ()) =
   ## Schedule `cbproc` to be called as soon as possible.
   ## The callback is called when control returns to the event loop.
-  getGlobalDispatcher().callbacks.enqueue(cbproc)
+  getGlobalDispatcher().callbacks.addLast(cbproc)
 
 proc runForever*() =
   ## Begins a never ending global dispatcher poll loop.