summary refs log tree commit diff stats
path: root/lib/pure/asyncdispatch.nim
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r--lib/pure/asyncdispatch.nim44
1 files changed, 26 insertions, 18 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim
index a4800284b..b662c3095 100644
--- a/lib/pure/asyncdispatch.nim
+++ b/lib/pure/asyncdispatch.nim
@@ -9,7 +9,7 @@
 
 include "system/inclrtl"
 
-import os, oids, tables, strutils, macros, times
+import os, oids, tables, strutils, macros, times, heapqueue
 
 import nativesockets, net
 
@@ -354,16 +354,22 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
 
 type
   PDispatcherBase = ref object of RootRef
-    timers: seq[tuple[finishAt: float, fut: Future[void]]]
-
-proc processTimers(p: PDispatcherBase) =
-  var oldTimers = p.timers
-  p.timers = @[]
-  for t in oldTimers:
-    if epochTime() >= t.finishAt:
-      t.fut.complete()
-    else:
-      p.timers.add(t)
+    timers: HeapQueue[tuple[finishAt: float, fut: Future[void]]]
+
+proc processTimers(p: PDispatcherBase) {.inline.} =
+  while p.timers.len > 0 and epochTime() >= p.timers[0].finishAt:
+    p.timers.pop().fut.complete()
+
+proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} =
+  # If dispatcher has active timers this proc returns the timeout
+  # of the nearest timer. Returns `timeout` otherwise.
+  result = timeout
+  if p.timers.len > 0:
+    let timerTimeout = p.timers[0].finishAt
+    let curTime = epochTime()
+    if timeout == -1 or (curTime + (timeout / 1000)) > timerTimeout:
+      result = int((timerTimeout - curTime) * 1000)
+      if result < 0: result = 0
 
 when defined(windows) or defined(nimdoc):
   import winlean, sets, hashes
@@ -396,7 +402,7 @@ when defined(windows) or defined(nimdoc):
     new result
     result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
     result.handles = initSet[AsyncFD]()
-    result.timers = @[]
+    result.timers.newHeapQueue()
 
   var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
   proc getGlobalDispatcher*(): PDispatcher =
@@ -427,9 +433,11 @@ when defined(windows) or defined(nimdoc):
       raise newException(ValueError,
         "No handles or timers registered in dispatcher.")
 
-    let llTimeout =
-      if timeout ==  -1: winlean.INFINITE
-      else: timeout.int32
+    let at = p.adjustedTimeout(timeout)
+    var llTimeout =
+      if at == -1: winlean.INFINITE
+      else: at.int32
+
     var lpNumberOfBytesTransferred: Dword
     var lpCompletionKey: ULONG_PTR
     var customOverlapped: PCustomOverlapped
@@ -956,7 +964,7 @@ else:
   proc newDispatcher*(): PDispatcher =
     new result
     result.selector = newSelector()
-    result.timers = @[]
+    result.timers.newHeapQueue()
 
   var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
   proc getGlobalDispatcher*(): PDispatcher =
@@ -1014,7 +1022,7 @@ else:
 
   proc poll*(timeout = 500) =
     let p = getGlobalDispatcher()
-    for info in p.selector.select(timeout):
+    for info in p.selector.select(p.adjustedTimeout(timeout)):
       let data = PData(info.key.data)
       assert data.fd == info.key.fd.AsyncFD
       #echo("In poll ", data.fd.cint)
@@ -1215,7 +1223,7 @@ proc sleepAsync*(ms: int): Future[void] =
   ## ``ms`` milliseconds.
   var retFuture = newFuture[void]("sleepAsync")
   let p = getGlobalDispatcher()
-  p.timers.add((epochTime() + (ms / 1000), retFuture))
+  p.timers.push((epochTime() + (ms / 1000), retFuture))
   return retFuture
 
 proc accept*(socket: AsyncFD,