summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--lib/pure/asyncdispatch.nim43
-rw-r--r--tests/async/tioselectors.nim6
2 files changed, 35 insertions, 14 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim
index a71d30ab9..675e8fc5e 100644
--- a/lib/pure/asyncdispatch.nim
+++ b/lib/pure/asyncdispatch.nim
@@ -168,18 +168,20 @@ type
     timers*: HeapQueue[tuple[finishAt: float, fut: Future[void]]]
     callbacks*: Deque[proc ()]
 
-proc processTimers(p: PDispatcherBase) {.inline.} =
+proc processTimers(p: PDispatcherBase; didSomeWork: var bool) {.inline.} =
   #Process just part if timers at a step
   var count = p.timers.len
   let t = epochTime()
   while count > 0 and t >= p.timers[0].finishAt:
     p.timers.pop().fut.complete()
     dec count
+    didSomeWork = true
 
-proc processPendingCallbacks(p: PDispatcherBase) =
+proc processPendingCallbacks(p: PDispatcherBase; didSomeWork: var bool) =
   while p.callbacks.len > 0:
     var cb = p.callbacks.popFirst()
     cb()
+    didSomeWork = true
 
 proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} =
   # If dispatcher has active timers this proc returns the timeout
@@ -284,14 +286,13 @@ when defined(windows) or defined(nimdoc):
     let p = getGlobalDispatcher()
     p.handles.len != 0 or p.timers.len != 0 or p.callbacks.len != 0
 
-  proc poll*(timeout = 500) =
-    ## Waits for completion events and processes them. Raises ``ValueError``
-    ## if there are no pending operations.
+  proc runOnce(timeout = 500): bool =
     let p = getGlobalDispatcher()
     if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0:
       raise newException(ValueError,
         "No handles or timers registered in dispatcher.")
 
+    result = false
     if p.handles.len != 0:
       let at = p.adjustedTimeout(timeout)
       var llTimeout =
@@ -304,6 +305,7 @@ when defined(windows) or defined(nimdoc):
       let res = getQueuedCompletionStatus(p.ioPort,
           addr lpNumberOfBytesTransferred, addr lpCompletionKey,
           cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool
+      result = true
 
       # http://stackoverflow.com/a/12277264/492186
       # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html
@@ -333,13 +335,14 @@ when defined(windows) or defined(nimdoc):
         else:
           if errCode.int32 == WAIT_TIMEOUT:
             # Timed out
-            discard
+            result = false
           else: raiseOSError(errCode)
 
     # Timer processing.
-    processTimers(p)
+    processTimers(p, result)
     # Callback queue processing
-    processPendingCallbacks(p)
+    processPendingCallbacks(p, result)
+
 
   var acceptEx: WSAPROC_ACCEPTEX
   var connectEx: WSAPROC_CONNECTEX
@@ -1202,7 +1205,7 @@ else:
       # descriptor was unregistered in callback via `unregister()`.
       discard
 
-  proc poll*(timeout = 500) =
+  proc runOnce(timeout = 500): bool =
     let p = getGlobalDispatcher()
     when ioselSupportedPlatform:
       let customSet = {Event.Timer, Event.Signal, Event.Process,
@@ -1212,6 +1215,7 @@ else:
       raise newException(ValueError,
         "No handles or timers registered in dispatcher.")
 
+    result = false
     if not p.selector.isEmpty():
       var keys: array[64, ReadyKey]
       var count = p.selector.selectInto(p.adjustedTimeout(timeout), keys)
@@ -1224,20 +1228,24 @@ else:
 
         if Event.Read in events or events == {Event.Error}:
           processBasicCallbacks(fd, readList)
+          result = true
 
         if Event.Write in events or events == {Event.Error}:
           processBasicCallbacks(fd, writeList)
+          result = true
 
         if Event.User in events or events == {Event.Error}:
           processBasicCallbacks(fd, readList)
           custom = true
           if rLength == 0:
             p.selector.unregister(fd)
+          result = true
 
         when ioselSupportedPlatform:
           if (customSet * events) != {}:
             custom = true
             processCustomCallbacks(fd)
+            result = true
 
         # because state `data` can be modified in callback we need to update
         # descriptor events with currently registered callbacks.
@@ -1249,9 +1257,9 @@ else:
             p.selector.updateHandle(SocketHandle(fd), newEvents)
 
     # Timer processing.
-    processTimers(p)
+    processTimers(p, result)
     # Callback queue processing
-    processPendingCallbacks(p)
+    processPendingCallbacks(p, result)
 
   proc recv*(socket: AsyncFD, size: int,
              flags = {SocketFlag.SafeDisconn}): Future[string] =
@@ -1474,6 +1482,19 @@ else:
     data.readList.add(cb)
     p.selector.registerEvent(SelectEvent(ev), data)
 
+proc drain*(timeout = 500) =
+  ## Waits for completion events and processes them. Raises ``ValueError``
+  ## if there are no pending operations. In contrast to ``poll`` this
+  ## processes as many events as are available.
+  if runOnce(timeout):
+    while runOnce(0): discard
+
+proc poll*(timeout = 500) =
+  ## Waits for completion events and processes them. Raises ``ValueError``
+  ## if there are no pending operations. This runs the underlying OS
+  ## `epoll`:idx: or `kqueue`:idx: primitive only once.
+  discard runOnce()
+
 # Common procedures between current and upcoming asyncdispatch
 include includes.asynccommon
 
diff --git a/tests/async/tioselectors.nim b/tests/async/tioselectors.nim
index 48043b4b5..d2e4cfec1 100644
--- a/tests/async/tioselectors.nim
+++ b/tests/async/tioselectors.nim
@@ -579,9 +579,9 @@ else:
     var event = newSelectEvent()
     selector.registerEvent(event, 1)
     discard selector.select(0)
-    event.setEvent()
+    event.trigger()
     var rc1 = selector.select(0)
-    event.setEvent()
+    event.trigger()
     var rc2 = selector.select(0)
     var rc3 = selector.select(0)
     assert(len(rc1) == 1 and len(rc2) == 1 and len(rc3) == 0)
@@ -611,7 +611,7 @@ else:
       var event = newSelectEvent()
       for i in 0..high(thr):
         createThread(thr[i], event_wait_thread, event)
-      event.setEvent()
+      event.trigger()
       joinThreads(thr)
       assert(counter == 1)
       result = true