diff options
-rw-r--r-- | lib/pure/asyncdispatch.nim | 43 | ||||
-rw-r--r-- | tests/async/tioselectors.nim | 6 |
2 files changed, 35 insertions, 14 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index a71d30ab9..675e8fc5e 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -168,18 +168,20 @@ type timers*: HeapQueue[tuple[finishAt: float, fut: Future[void]]] callbacks*: Deque[proc ()] -proc processTimers(p: PDispatcherBase) {.inline.} = +proc processTimers(p: PDispatcherBase; didSomeWork: var bool) {.inline.} = #Process just part if timers at a step var count = p.timers.len let t = epochTime() while count > 0 and t >= p.timers[0].finishAt: p.timers.pop().fut.complete() dec count + didSomeWork = true -proc processPendingCallbacks(p: PDispatcherBase) = +proc processPendingCallbacks(p: PDispatcherBase; didSomeWork: var bool) = while p.callbacks.len > 0: var cb = p.callbacks.popFirst() cb() + didSomeWork = true proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} = # If dispatcher has active timers this proc returns the timeout @@ -284,14 +286,13 @@ when defined(windows) or defined(nimdoc): let p = getGlobalDispatcher() p.handles.len != 0 or p.timers.len != 0 or p.callbacks.len != 0 - proc poll*(timeout = 500) = - ## Waits for completion events and processes them. Raises ``ValueError`` - ## if there are no pending operations. + proc runOnce(timeout = 500): bool = let p = getGlobalDispatcher() if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0: raise newException(ValueError, "No handles or timers registered in dispatcher.") + result = false if p.handles.len != 0: let at = p.adjustedTimeout(timeout) var llTimeout = @@ -304,6 +305,7 @@ when defined(windows) or defined(nimdoc): let res = getQueuedCompletionStatus(p.ioPort, addr lpNumberOfBytesTransferred, addr lpCompletionKey, cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool + result = true # http://stackoverflow.com/a/12277264/492186 # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html @@ -333,13 +335,14 @@ when defined(windows) or defined(nimdoc): else: if errCode.int32 == WAIT_TIMEOUT: # Timed out - discard + result = false else: raiseOSError(errCode) # Timer processing. - processTimers(p) + processTimers(p, result) # Callback queue processing - processPendingCallbacks(p) + processPendingCallbacks(p, result) + var acceptEx: WSAPROC_ACCEPTEX var connectEx: WSAPROC_CONNECTEX @@ -1202,7 +1205,7 @@ else: # descriptor was unregistered in callback via `unregister()`. discard - proc poll*(timeout = 500) = + proc runOnce(timeout = 500): bool = let p = getGlobalDispatcher() when ioselSupportedPlatform: let customSet = {Event.Timer, Event.Signal, Event.Process, @@ -1212,6 +1215,7 @@ else: raise newException(ValueError, "No handles or timers registered in dispatcher.") + result = false if not p.selector.isEmpty(): var keys: array[64, ReadyKey] var count = p.selector.selectInto(p.adjustedTimeout(timeout), keys) @@ -1224,20 +1228,24 @@ else: if Event.Read in events or events == {Event.Error}: processBasicCallbacks(fd, readList) + result = true if Event.Write in events or events == {Event.Error}: processBasicCallbacks(fd, writeList) + result = true if Event.User in events or events == {Event.Error}: processBasicCallbacks(fd, readList) custom = true if rLength == 0: p.selector.unregister(fd) + result = true when ioselSupportedPlatform: if (customSet * events) != {}: custom = true processCustomCallbacks(fd) + result = true # because state `data` can be modified in callback we need to update # descriptor events with currently registered callbacks. @@ -1249,9 +1257,9 @@ else: p.selector.updateHandle(SocketHandle(fd), newEvents) # Timer processing. - processTimers(p) + processTimers(p, result) # Callback queue processing - processPendingCallbacks(p) + processPendingCallbacks(p, result) proc recv*(socket: AsyncFD, size: int, flags = {SocketFlag.SafeDisconn}): Future[string] = @@ -1474,6 +1482,19 @@ else: data.readList.add(cb) p.selector.registerEvent(SelectEvent(ev), data) +proc drain*(timeout = 500) = + ## Waits for completion events and processes them. Raises ``ValueError`` + ## if there are no pending operations. In contrast to ``poll`` this + ## processes as many events as are available. + if runOnce(timeout): + while runOnce(0): discard + +proc poll*(timeout = 500) = + ## Waits for completion events and processes them. Raises ``ValueError`` + ## if there are no pending operations. This runs the underlying OS + ## `epoll`:idx: or `kqueue`:idx: primitive only once. + discard runOnce() + # Common procedures between current and upcoming asyncdispatch include includes.asynccommon diff --git a/tests/async/tioselectors.nim b/tests/async/tioselectors.nim index 48043b4b5..d2e4cfec1 100644 --- a/tests/async/tioselectors.nim +++ b/tests/async/tioselectors.nim @@ -579,9 +579,9 @@ else: var event = newSelectEvent() selector.registerEvent(event, 1) discard selector.select(0) - event.setEvent() + event.trigger() var rc1 = selector.select(0) - event.setEvent() + event.trigger() var rc2 = selector.select(0) var rc3 = selector.select(0) assert(len(rc1) == 1 and len(rc2) == 1 and len(rc3) == 0) @@ -611,7 +611,7 @@ else: var event = newSelectEvent() for i in 0..high(thr): createThread(thr[i], event_wait_thread, event) - event.setEvent() + event.trigger() joinThreads(thr) assert(counter == 1) result = true |