diff options
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r-- | lib/pure/asyncdispatch.nim | 57 |
1 files changed, 32 insertions, 25 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index 8336da1fb..01088c2e7 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -11,7 +11,7 @@ include "system/inclrtl" import os, oids, tables, strutils, times, heapqueue -import nativesockets, net, queues +import nativesockets, net, deques export Port, SocketFlag @@ -132,7 +132,7 @@ export Port, SocketFlag ## # Handle exception ## ## Unfortunately the semantics of the try statement may not always be correct, -## and occassionally the compilation may fail altogether. +## and occasionally the compilation may fail altogether. ## As such it is better to use the former style when possible. ## ## Discarding futures @@ -164,7 +164,7 @@ include includes/asyncfutures type PDispatcherBase = ref object of RootRef timers: HeapQueue[tuple[finishAt: float, fut: Future[void]]] - callbacks: Queue[proc ()] + callbacks: Deque[proc ()] proc processTimers(p: PDispatcherBase) {.inline.} = while p.timers.len > 0 and epochTime() >= p.timers[0].finishAt: @@ -172,7 +172,7 @@ proc processTimers(p: PDispatcherBase) {.inline.} = proc processPendingCallbacks(p: PDispatcherBase) = while p.callbacks.len > 0: - var cb = p.callbacks.dequeue() + var cb = p.callbacks.popFirst() cb() proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} = @@ -230,7 +230,7 @@ when defined(windows) or defined(nimdoc): result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) result.handles = initSet[AsyncFD]() result.timers.newHeapQueue() - result.callbacks = initQueue[proc ()](64) + result.callbacks = initDeque[proc ()](64) var gDisp{.threadvar.}: PDispatcher ## Global dispatcher proc getGlobalDispatcher*(): PDispatcher = @@ -987,7 +987,7 @@ else: new result result.selector = newSelector() result.timers.newHeapQueue() - result.callbacks = initQueue[proc ()](64) + result.callbacks = initDeque[proc ()](64) var gDisp{.threadvar.}: PDispatcher ## Global dispatcher proc getGlobalDispatcher*(): PDispatcher = @@ -1043,6 +1043,26 @@ else: p.selector[fd.SocketHandle].data.PData.writeCBs.add(cb) update(fd, p.selector[fd.SocketHandle].events + {EvWrite}) + template processCallbacks(callbacks: expr) = + # Callback may add items to ``callbacks`` which causes issues if + # we are iterating over it at the same time. We therefore + # make a copy to iterate over. + let currentCBs = callbacks + callbacks = @[] + # Using another sequence because callbacks themselves can add + # other callbacks. + var newCBs: seq[Callback] = @[] + for cb in currentCBs: + if newCBs.len > 0: + # A callback has already returned with EAGAIN, don't call any + # others until next `poll`. + newCBs.add(cb) + else: + if not cb(data.fd): + # Callback wants to be called again. + newCBs.add(cb) + callbacks = newCBs & callbacks + proc poll*(timeout = 500) = let p = getGlobalDispatcher() @@ -1055,24 +1075,11 @@ else: # so that exceptions can be raised from `send(...)` and # `recv(...)` routines. - if EvRead in info.events: - # Callback may add items to ``data.readCBs`` which causes issues if - # we are iterating over ``data.readCBs`` at the same time. We therefore - # make a copy to iterate over. - let currentCBs = data.readCBs - data.readCBs = @[] - for cb in currentCBs: - if not cb(data.fd): - # Callback wants to be called again. - data.readCBs.add(cb) - - if EvWrite in info.events: - let currentCBs = data.writeCBs - data.writeCBs = @[] - for cb in currentCBs: - if not cb(data.fd): - # Callback wants to be called again. - data.writeCBs.add(cb) + if EvRead in info.events or info.events == {EvError}: + processCallbacks(data.readCBs) + + if EvWrite in info.events or info.events == {EvError}: + processCallbacks(data.writeCBs) if info.key in p.selector: var newEvents: set[Event] @@ -1410,7 +1417,7 @@ proc recvLine*(socket: AsyncFD): Future[string] {.async, deprecated.} = proc callSoon*(cbproc: proc ()) = ## Schedule `cbproc` to be called as soon as possible. ## The callback is called when control returns to the event loop. - getGlobalDispatcher().callbacks.enqueue(cbproc) + getGlobalDispatcher().callbacks.addLast(cbproc) proc runForever*() = ## Begins a never ending global dispatcher poll loop. |