diff options
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r-- | lib/pure/asyncdispatch.nim | 193 |
1 files changed, 100 insertions, 93 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index a52c667fc..820f34703 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -10,6 +10,7 @@ include "system/inclrtl" import os, tables, strutils, times, heapqueue, lists, options, asyncstreams +import options, math import asyncfutures except callSoon import nativesockets, net, deques @@ -157,9 +158,6 @@ export asyncfutures, asyncstreams ## ---------------- ## ## * The effect system (``raises: []``) does not work with async procedures. -## * Can't await in a ``except`` body -## * Forward declarations for async procs are broken, -## link includes workaround: https://github.com/nim-lang/Nim/issues/3182. # TODO: Check if yielded future is nil and throw a more meaningful exception @@ -168,8 +166,10 @@ type timers*: HeapQueue[tuple[finishAt: float, fut: Future[void]]] callbacks*: Deque[proc ()] -proc processTimers(p: PDispatcherBase; didSomeWork: var bool) {.inline.} = - #Process just part if timers at a step +proc processTimers( + p: PDispatcherBase, didSomeWork: var bool +): Option[int] {.inline.} = + # Pop the timers in the order in which they will expire (smaller `finishAt`). var count = p.timers.len let t = epochTime() while count > 0 and t >= p.timers[0].finishAt: @@ -177,22 +177,25 @@ proc processTimers(p: PDispatcherBase; didSomeWork: var bool) {.inline.} = dec count didSomeWork = true + # Return the number of miliseconds in which the next timer will expire. + if p.timers.len == 0: return + + let milisecs = (p.timers[0].finishAt - epochTime()) * 1000 + return some(ceil(milisecs).int) + proc processPendingCallbacks(p: PDispatcherBase; didSomeWork: var bool) = while p.callbacks.len > 0: var cb = p.callbacks.popFirst() cb() didSomeWork = true -proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} = - # If dispatcher has active timers this proc returns the timeout - # of the nearest timer. Returns `timeout` otherwise. - result = timeout - if p.timers.len > 0: - let timerTimeout = p.timers[0].finishAt - let curTime = epochTime() - if timeout == -1 or (curTime + (timeout / 1000)) > timerTimeout: - result = int((timerTimeout - curTime) * 1000) - if result < 0: result = 0 +proc adjustTimeout(pollTimeout: int, nextTimer: Option[int]): int {.inline.} = + if nextTimer.isNone(): + return pollTimeout + + result = nextTimer.get() + if pollTimeout == -1: return + result = min(pollTimeout, result) proc callSoon(cbproc: proc ()) {.gcsafe.} @@ -299,53 +302,53 @@ when defined(windows) or defined(nimdoc): "No handles or timers registered in dispatcher.") result = false - if p.handles.len != 0: - let at = p.adjustedTimeout(timeout) - var llTimeout = - if at == -1: winlean.INFINITE - else: at.int32 - - var lpNumberOfBytesTransferred: Dword - var lpCompletionKey: ULONG_PTR - var customOverlapped: PCustomOverlapped - let res = getQueuedCompletionStatus(p.ioPort, - addr lpNumberOfBytesTransferred, addr lpCompletionKey, - cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool - result = true - - # http://stackoverflow.com/a/12277264/492186 - # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html - if res: - # This is useful for ensuring the reliability of the overlapped struct. + let nextTimer = processTimers(p, result) + let at = adjustTimeout(timeout, nextTimer) + var llTimeout = + if at == -1: winlean.INFINITE + else: at.int32 + + var lpNumberOfBytesTransferred: Dword + var lpCompletionKey: ULONG_PTR + var customOverlapped: PCustomOverlapped + let res = getQueuedCompletionStatus(p.ioPort, + addr lpNumberOfBytesTransferred, addr lpCompletionKey, + cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool + result = true + + # http://stackoverflow.com/a/12277264/492186 + # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html + if res: + # This is useful for ensuring the reliability of the overlapped struct. + assert customOverlapped.data.fd == lpCompletionKey.AsyncFD + + customOverlapped.data.cb(customOverlapped.data.fd, + lpNumberOfBytesTransferred, OSErrorCode(-1)) + + # If cell.data != nil, then system.protect(rawEnv(cb)) was called, + # so we need to dispose our `cb` environment, because it is not needed + # anymore. + if customOverlapped.data.cell.data != nil: + system.dispose(customOverlapped.data.cell) + + GC_unref(customOverlapped) + else: + let errCode = osLastError() + if customOverlapped != nil: assert customOverlapped.data.fd == lpCompletionKey.AsyncFD - customOverlapped.data.cb(customOverlapped.data.fd, - lpNumberOfBytesTransferred, OSErrorCode(-1)) - - # If cell.data != nil, then system.protect(rawEnv(cb)) was called, - # so we need to dispose our `cb` environment, because it is not needed - # anymore. + lpNumberOfBytesTransferred, errCode) if customOverlapped.data.cell.data != nil: system.dispose(customOverlapped.data.cell) - GC_unref(customOverlapped) else: - let errCode = osLastError() - if customOverlapped != nil: - assert customOverlapped.data.fd == lpCompletionKey.AsyncFD - customOverlapped.data.cb(customOverlapped.data.fd, - lpNumberOfBytesTransferred, errCode) - if customOverlapped.data.cell.data != nil: - system.dispose(customOverlapped.data.cell) - GC_unref(customOverlapped) - else: - if errCode.int32 == WAIT_TIMEOUT: - # Timed out - result = false - else: raiseOSError(errCode) + if errCode.int32 == WAIT_TIMEOUT: + # Timed out + result = false + else: raiseOSError(errCode) # Timer processing. - processTimers(p, result) + discard processTimers(p, result) # Callback queue processing processPendingCallbacks(p, result) @@ -1231,48 +1234,48 @@ else: "No handles or timers registered in dispatcher.") result = false - if not p.selector.isEmpty(): - var keys: array[64, ReadyKey] - var count = p.selector.selectInto(p.adjustedTimeout(timeout), keys) - for i in 0..<count: - var custom = false - let fd = keys[i].fd - let events = keys[i].events - var rLength = 0 # len(data.readList) after callback - var wLength = 0 # len(data.writeList) after callback - - if Event.Read in events or events == {Event.Error}: - processBasicCallbacks(fd, readList) - result = true - - if Event.Write in events or events == {Event.Error}: - processBasicCallbacks(fd, writeList) - result = true - - if Event.User in events: - processBasicCallbacks(fd, readList) + var keys: array[64, ReadyKey] + let nextTimer = processTimers(p, result) + var count = p.selector.selectInto(adjustTimeout(timeout, nextTimer), keys) + for i in 0..<count: + var custom = false + let fd = keys[i].fd + let events = keys[i].events + var rLength = 0 # len(data.readList) after callback + var wLength = 0 # len(data.writeList) after callback + + if Event.Read in events or events == {Event.Error}: + processBasicCallbacks(fd, readList) + result = true + + if Event.Write in events or events == {Event.Error}: + processBasicCallbacks(fd, writeList) + result = true + + if Event.User in events: + processBasicCallbacks(fd, readList) + custom = true + if rLength == 0: + p.selector.unregister(fd) + result = true + + when ioselSupportedPlatform: + if (customSet * events) != {}: custom = true - if rLength == 0: - p.selector.unregister(fd) + processCustomCallbacks(fd) result = true - when ioselSupportedPlatform: - if (customSet * events) != {}: - custom = true - processCustomCallbacks(fd) - result = true - - # because state `data` can be modified in callback we need to update - # descriptor events with currently registered callbacks. - if not custom: - var newEvents: set[Event] = {} - if rLength != -1 and wLength != -1: - if rLength > 0: incl(newEvents, Event.Read) - if wLength > 0: incl(newEvents, Event.Write) - p.selector.updateHandle(SocketHandle(fd), newEvents) + # because state `data` can be modified in callback we need to update + # descriptor events with currently registered callbacks. + if not custom: + var newEvents: set[Event] = {} + if rLength != -1 and wLength != -1: + if rLength > 0: incl(newEvents, Event.Read) + if wLength > 0: incl(newEvents, Event.Write) + p.selector.updateHandle(SocketHandle(fd), newEvents) # Timer processing. - processTimers(p, result) + discard processTimers(p, result) # Callback queue processing processPendingCallbacks(p, result) @@ -1513,7 +1516,7 @@ proc poll*(timeout = 500) = # Common procedures between current and upcoming asyncdispatch include includes.asynccommon -proc sleepAsync*(ms: int): Future[void] = +proc sleepAsync*(ms: int | float): Future[void] = ## Suspends the execution of the current async procedure for the next ## ``ms`` milliseconds. var retFuture = newFuture[void]("sleepAsync") @@ -1533,7 +1536,11 @@ proc withTimeout*[T](fut: Future[T], timeout: int): Future[bool] = var timeoutFuture = sleepAsync(timeout) fut.callback = proc () = - if not retFuture.finished: retFuture.complete(true) + if not retFuture.finished: + if fut.failed: + retFuture.fail(fut.error) + else: + retFuture.complete(true) timeoutFuture.callback = proc () = if not retFuture.finished: retFuture.complete(false) |