summary refs log tree commit diff stats
path: root/lib/pure/asyncdispatch.nim
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r--lib/pure/asyncdispatch.nim193
1 files changed, 100 insertions, 93 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim
index a52c667fc..820f34703 100644
--- a/lib/pure/asyncdispatch.nim
+++ b/lib/pure/asyncdispatch.nim
@@ -10,6 +10,7 @@
 include "system/inclrtl"
 
 import os, tables, strutils, times, heapqueue, lists, options, asyncstreams
+import options, math
 import asyncfutures except callSoon
 
 import nativesockets, net, deques
@@ -157,9 +158,6 @@ export asyncfutures, asyncstreams
 ## ----------------
 ##
 ## * The effect system (``raises: []``) does not work with async procedures.
-## * Can't await in a ``except`` body
-## * Forward declarations for async procs are broken,
-##   link includes workaround: https://github.com/nim-lang/Nim/issues/3182.
 
 # TODO: Check if yielded future is nil and throw a more meaningful exception
 
@@ -168,8 +166,10 @@ type
     timers*: HeapQueue[tuple[finishAt: float, fut: Future[void]]]
     callbacks*: Deque[proc ()]
 
-proc processTimers(p: PDispatcherBase; didSomeWork: var bool) {.inline.} =
-  #Process just part if timers at a step
+proc processTimers(
+  p: PDispatcherBase, didSomeWork: var bool
+): Option[int] {.inline.} =
+  # Pop the timers in the order in which they will expire (smaller `finishAt`).
   var count = p.timers.len
   let t = epochTime()
   while count > 0 and t >= p.timers[0].finishAt:
@@ -177,22 +177,25 @@ proc processTimers(p: PDispatcherBase; didSomeWork: var bool) {.inline.} =
     dec count
     didSomeWork = true
 
+  # Return the number of miliseconds in which the next timer will expire.
+  if p.timers.len == 0: return
+
+  let milisecs = (p.timers[0].finishAt - epochTime()) * 1000
+  return some(ceil(milisecs).int)
+
 proc processPendingCallbacks(p: PDispatcherBase; didSomeWork: var bool) =
   while p.callbacks.len > 0:
     var cb = p.callbacks.popFirst()
     cb()
     didSomeWork = true
 
-proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} =
-  # If dispatcher has active timers this proc returns the timeout
-  # of the nearest timer. Returns `timeout` otherwise.
-  result = timeout
-  if p.timers.len > 0:
-    let timerTimeout = p.timers[0].finishAt
-    let curTime = epochTime()
-    if timeout == -1 or (curTime + (timeout / 1000)) > timerTimeout:
-      result = int((timerTimeout - curTime) * 1000)
-      if result < 0: result = 0
+proc adjustTimeout(pollTimeout: int, nextTimer: Option[int]): int {.inline.} =
+  if nextTimer.isNone():
+    return pollTimeout
+
+  result = nextTimer.get()
+  if pollTimeout == -1: return
+  result = min(pollTimeout, result)
 
 proc callSoon(cbproc: proc ()) {.gcsafe.}
 
@@ -299,53 +302,53 @@ when defined(windows) or defined(nimdoc):
         "No handles or timers registered in dispatcher.")
 
     result = false
-    if p.handles.len != 0:
-      let at = p.adjustedTimeout(timeout)
-      var llTimeout =
-        if at == -1: winlean.INFINITE
-        else: at.int32
-
-      var lpNumberOfBytesTransferred: Dword
-      var lpCompletionKey: ULONG_PTR
-      var customOverlapped: PCustomOverlapped
-      let res = getQueuedCompletionStatus(p.ioPort,
-          addr lpNumberOfBytesTransferred, addr lpCompletionKey,
-          cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool
-      result = true
-
-      # http://stackoverflow.com/a/12277264/492186
-      # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html
-      if res:
-        # This is useful for ensuring the reliability of the overlapped struct.
+    let nextTimer = processTimers(p, result)
+    let at = adjustTimeout(timeout, nextTimer)
+    var llTimeout =
+      if at == -1: winlean.INFINITE
+      else: at.int32
+
+    var lpNumberOfBytesTransferred: Dword
+    var lpCompletionKey: ULONG_PTR
+    var customOverlapped: PCustomOverlapped
+    let res = getQueuedCompletionStatus(p.ioPort,
+        addr lpNumberOfBytesTransferred, addr lpCompletionKey,
+        cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool
+    result = true
+
+    # http://stackoverflow.com/a/12277264/492186
+    # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html
+    if res:
+      # This is useful for ensuring the reliability of the overlapped struct.
+      assert customOverlapped.data.fd == lpCompletionKey.AsyncFD
+
+      customOverlapped.data.cb(customOverlapped.data.fd,
+          lpNumberOfBytesTransferred, OSErrorCode(-1))
+
+      # If cell.data != nil, then system.protect(rawEnv(cb)) was called,
+      # so we need to dispose our `cb` environment, because it is not needed
+      # anymore.
+      if customOverlapped.data.cell.data != nil:
+        system.dispose(customOverlapped.data.cell)
+
+      GC_unref(customOverlapped)
+    else:
+      let errCode = osLastError()
+      if customOverlapped != nil:
         assert customOverlapped.data.fd == lpCompletionKey.AsyncFD
-
         customOverlapped.data.cb(customOverlapped.data.fd,
-            lpNumberOfBytesTransferred, OSErrorCode(-1))
-
-        # If cell.data != nil, then system.protect(rawEnv(cb)) was called,
-        # so we need to dispose our `cb` environment, because it is not needed
-        # anymore.
+            lpNumberOfBytesTransferred, errCode)
         if customOverlapped.data.cell.data != nil:
           system.dispose(customOverlapped.data.cell)
-
         GC_unref(customOverlapped)
       else:
-        let errCode = osLastError()
-        if customOverlapped != nil:
-          assert customOverlapped.data.fd == lpCompletionKey.AsyncFD
-          customOverlapped.data.cb(customOverlapped.data.fd,
-              lpNumberOfBytesTransferred, errCode)
-          if customOverlapped.data.cell.data != nil:
-            system.dispose(customOverlapped.data.cell)
-          GC_unref(customOverlapped)
-        else:
-          if errCode.int32 == WAIT_TIMEOUT:
-            # Timed out
-            result = false
-          else: raiseOSError(errCode)
+        if errCode.int32 == WAIT_TIMEOUT:
+          # Timed out
+          result = false
+        else: raiseOSError(errCode)
 
     # Timer processing.
-    processTimers(p, result)
+    discard processTimers(p, result)
     # Callback queue processing
     processPendingCallbacks(p, result)
 
@@ -1231,48 +1234,48 @@ else:
         "No handles or timers registered in dispatcher.")
 
     result = false
-    if not p.selector.isEmpty():
-      var keys: array[64, ReadyKey]
-      var count = p.selector.selectInto(p.adjustedTimeout(timeout), keys)
-      for i in 0..<count:
-        var custom = false
-        let fd = keys[i].fd
-        let events = keys[i].events
-        var rLength = 0 # len(data.readList) after callback
-        var wLength = 0 # len(data.writeList) after callback
-
-        if Event.Read in events or events == {Event.Error}:
-          processBasicCallbacks(fd, readList)
-          result = true
-
-        if Event.Write in events or events == {Event.Error}:
-          processBasicCallbacks(fd, writeList)
-          result = true
-
-        if Event.User in events:
-          processBasicCallbacks(fd, readList)
+    var keys: array[64, ReadyKey]
+    let nextTimer = processTimers(p, result)
+    var count = p.selector.selectInto(adjustTimeout(timeout, nextTimer), keys)
+    for i in 0..<count:
+      var custom = false
+      let fd = keys[i].fd
+      let events = keys[i].events
+      var rLength = 0 # len(data.readList) after callback
+      var wLength = 0 # len(data.writeList) after callback
+
+      if Event.Read in events or events == {Event.Error}:
+        processBasicCallbacks(fd, readList)
+        result = true
+
+      if Event.Write in events or events == {Event.Error}:
+        processBasicCallbacks(fd, writeList)
+        result = true
+
+      if Event.User in events:
+        processBasicCallbacks(fd, readList)
+        custom = true
+        if rLength == 0:
+          p.selector.unregister(fd)
+        result = true
+
+      when ioselSupportedPlatform:
+        if (customSet * events) != {}:
           custom = true
-          if rLength == 0:
-            p.selector.unregister(fd)
+          processCustomCallbacks(fd)
           result = true
 
-        when ioselSupportedPlatform:
-          if (customSet * events) != {}:
-            custom = true
-            processCustomCallbacks(fd)
-            result = true
-
-        # because state `data` can be modified in callback we need to update
-        # descriptor events with currently registered callbacks.
-        if not custom:
-          var newEvents: set[Event] = {}
-          if rLength != -1 and wLength != -1:
-            if rLength > 0: incl(newEvents, Event.Read)
-            if wLength > 0: incl(newEvents, Event.Write)
-            p.selector.updateHandle(SocketHandle(fd), newEvents)
+      # because state `data` can be modified in callback we need to update
+      # descriptor events with currently registered callbacks.
+      if not custom:
+        var newEvents: set[Event] = {}
+        if rLength != -1 and wLength != -1:
+          if rLength > 0: incl(newEvents, Event.Read)
+          if wLength > 0: incl(newEvents, Event.Write)
+          p.selector.updateHandle(SocketHandle(fd), newEvents)
 
     # Timer processing.
-    processTimers(p, result)
+    discard processTimers(p, result)
     # Callback queue processing
     processPendingCallbacks(p, result)
 
@@ -1513,7 +1516,7 @@ proc poll*(timeout = 500) =
 # Common procedures between current and upcoming asyncdispatch
 include includes.asynccommon
 
-proc sleepAsync*(ms: int): Future[void] =
+proc sleepAsync*(ms: int | float): Future[void] =
   ## Suspends the execution of the current async procedure for the next
   ## ``ms`` milliseconds.
   var retFuture = newFuture[void]("sleepAsync")
@@ -1533,7 +1536,11 @@ proc withTimeout*[T](fut: Future[T], timeout: int): Future[bool] =
   var timeoutFuture = sleepAsync(timeout)
   fut.callback =
     proc () =
-      if not retFuture.finished: retFuture.complete(true)
+      if not retFuture.finished:
+        if fut.failed:
+          retFuture.fail(fut.error)
+        else:
+          retFuture.complete(true)
   timeoutFuture.callback =
     proc () =
       if not retFuture.finished: retFuture.complete(false)