diff options
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r-- | lib/pure/asyncdispatch.nim | 44 |
1 files changed, 26 insertions, 18 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index a4800284b..b662c3095 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -9,7 +9,7 @@ include "system/inclrtl" -import os, oids, tables, strutils, macros, times +import os, oids, tables, strutils, macros, times, heapqueue import nativesockets, net @@ -354,16 +354,22 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = type PDispatcherBase = ref object of RootRef - timers: seq[tuple[finishAt: float, fut: Future[void]]] - -proc processTimers(p: PDispatcherBase) = - var oldTimers = p.timers - p.timers = @[] - for t in oldTimers: - if epochTime() >= t.finishAt: - t.fut.complete() - else: - p.timers.add(t) + timers: HeapQueue[tuple[finishAt: float, fut: Future[void]]] + +proc processTimers(p: PDispatcherBase) {.inline.} = + while p.timers.len > 0 and epochTime() >= p.timers[0].finishAt: + p.timers.pop().fut.complete() + +proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} = + # If dispatcher has active timers this proc returns the timeout + # of the nearest timer. Returns `timeout` otherwise. + result = timeout + if p.timers.len > 0: + let timerTimeout = p.timers[0].finishAt + let curTime = epochTime() + if timeout == -1 or (curTime + (timeout / 1000)) > timerTimeout: + result = int((timerTimeout - curTime) * 1000) + if result < 0: result = 0 when defined(windows) or defined(nimdoc): import winlean, sets, hashes @@ -396,7 +402,7 @@ when defined(windows) or defined(nimdoc): new result result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) result.handles = initSet[AsyncFD]() - result.timers = @[] + result.timers.newHeapQueue() var gDisp{.threadvar.}: PDispatcher ## Global dispatcher proc getGlobalDispatcher*(): PDispatcher = @@ -427,9 +433,11 @@ when defined(windows) or defined(nimdoc): raise newException(ValueError, "No handles or timers registered in dispatcher.") - let llTimeout = - if timeout == -1: winlean.INFINITE - else: timeout.int32 + let at = p.adjustedTimeout(timeout) + var llTimeout = + if at == -1: winlean.INFINITE + else: at.int32 + var lpNumberOfBytesTransferred: Dword var lpCompletionKey: ULONG_PTR var customOverlapped: PCustomOverlapped @@ -956,7 +964,7 @@ else: proc newDispatcher*(): PDispatcher = new result result.selector = newSelector() - result.timers = @[] + result.timers.newHeapQueue() var gDisp{.threadvar.}: PDispatcher ## Global dispatcher proc getGlobalDispatcher*(): PDispatcher = @@ -1014,7 +1022,7 @@ else: proc poll*(timeout = 500) = let p = getGlobalDispatcher() - for info in p.selector.select(timeout): + for info in p.selector.select(p.adjustedTimeout(timeout)): let data = PData(info.key.data) assert data.fd == info.key.fd.AsyncFD #echo("In poll ", data.fd.cint) @@ -1215,7 +1223,7 @@ proc sleepAsync*(ms: int): Future[void] = ## ``ms`` milliseconds. var retFuture = newFuture[void]("sleepAsync") let p = getGlobalDispatcher() - p.timers.add((epochTime() + (ms / 1000), retFuture)) + p.timers.push((epochTime() + (ms / 1000), retFuture)) return retFuture proc accept*(socket: AsyncFD, |