diff options
Diffstat (limited to 'lib/pure')
-rw-r--r-- | lib/pure/asyncdispatch.nim | 29 | ||||
-rw-r--r-- | lib/pure/asyncfutures.nim | 34 | ||||
-rw-r--r-- | lib/pure/asyncstreams.nim | 44 | ||||
-rw-r--r-- | lib/pure/ioselects/ioselectors_epoll.nim | 4 | ||||
-rw-r--r-- | lib/pure/selectors.nim | 2 |
5 files changed, 47 insertions, 66 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index 41d67d172..6f38583d8 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -290,7 +290,7 @@ when defined(windows) or defined(nimdoc): var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher - proc setGlobalDispatcher*(disp: owned PDispatcher) = + proc setGlobalDispatcher*(disp: sink PDispatcher) = if not gDisp.isNil: assert gDisp.callbacks.len == 0 gDisp = disp @@ -1217,10 +1217,12 @@ else: withData(selector, fd.int, fdData): case event of Event.Read: - shallowCopy(curList, fdData.readList) + #shallowCopy(curList, fdData.readList) + curList = move fdData.readList fdData.readList = newSeqOfCap[Callback](InitCallbackListSize) of Event.Write: - shallowCopy(curList, fdData.writeList) + #shallowCopy(curList, fdData.writeList) + curList = move fdData.writeList fdData.writeList = newSeqOfCap[Callback](InitCallbackListSize) else: assert false, "Cannot process callbacks for " & $event @@ -1232,8 +1234,7 @@ else: for cb in curList: if eventsExtinguished: newList.add(cb) - continue - if not cb(fd): + elif not cb(fd): # Callback wants to be called again. newList.add(cb) # This callback has returned with EAGAIN, so we don't need to @@ -1259,15 +1260,15 @@ else: result.readCbListCount = -1 result.writeCbListCount = -1 - template processCustomCallbacks(ident: untyped) = + proc processCustomCallbacks(p: PDispatcher; fd: AsyncFD) = # Process pending custom event callbacks. Custom events are # {Event.Timer, Event.Signal, Event.Process, Event.Vnode}. # There can be only one callback registered with one descriptor, # so there is no need to iterate over list. var curList: seq[Callback] - withData(p.selector, ident.int, adata) do: - shallowCopy(curList, adata.readList) + withData(p.selector, fd.int, adata) do: + curList = move adata.readList adata.readList = newSeqOfCap[Callback](InitCallbackListSize) let newLength = len(curList) @@ -1277,7 +1278,7 @@ else: if not cb(fd.AsyncFD): newList.add(cb) - withData(p.selector, ident.int, adata) do: + withData(p.selector, fd.int, adata) do: # descriptor still present in queue. adata.readList = newList & adata.readList if len(adata.readList) == 0: @@ -1308,10 +1309,6 @@ else: proc runOnce(timeout = 500): bool = let p = getGlobalDispatcher() - when ioselSupportedPlatform: - let customSet = {Event.Timer, Event.Signal, Event.Process, - Event.Vnode} - if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0: raise newException(ValueError, "No handles or timers registered in dispatcher.") @@ -1346,9 +1343,11 @@ else: result = true when ioselSupportedPlatform: + const customSet = {Event.Timer, Event.Signal, Event.Process, + Event.Vnode} if (customSet * events) != {}: isCustomEvent = true - processCustomCallbacks(fd) + processCustomCallbacks(p, fd) result = true # because state `data` can be modified in callback we need to update @@ -1612,7 +1611,7 @@ proc drain*(timeout = 500) = var curTimeout = timeout let start = now() while hasPendingOperations(): - discard runOnce(curTimeout) + discard runOnce(curTimeout) curTimeout -= (now() - start).inMilliseconds.int if curTimeout < 0: break diff --git a/lib/pure/asyncfutures.nim b/lib/pure/asyncfutures.nim index c1e2770bb..97abf72c9 100644 --- a/lib/pure/asyncfutures.nim +++ b/lib/pure/asyncfutures.nim @@ -157,33 +157,15 @@ proc checkFinished[T](future: Future[T]) = raise err proc call(callbacks: var CallbackList) = - when not defined(nimV2): - # strictly speaking a little code duplication here, but we strive - # to minimize regressions and I'm not sure I got the 'nimV2' logic - # right: - var current = callbacks - while true: - if not current.function.isNil: - callSoon(current.function) - - if current.next.isNil: - break - else: - current = current.next[] - else: - var currentFunc = unown callbacks.function - var currentNext = unown callbacks.next - - while true: - if not currentFunc.isNil: - callSoon(currentFunc) - - if currentNext.isNil: - break - else: - currentFunc = currentNext.function - currentNext = unown currentNext.next + var current = callbacks + while true: + if not current.function.isNil: + callSoon(current.function) + if current.next.isNil: + break + else: + current = current.next[] # callback will be called only once, let GC collect them now callbacks.next = nil callbacks.function = nil diff --git a/lib/pure/asyncstreams.nim b/lib/pure/asyncstreams.nim index 393262c4f..7ffde9c10 100644 --- a/lib/pure/asyncstreams.nim +++ b/lib/pure/asyncstreams.nim @@ -54,7 +54,8 @@ proc `callback=`*[T](future: FutureStream[T], ## ## If the future stream already has data or is finished then ``cb`` will be ## called immediately. - future.cb = proc () = cb(future) + proc named() = cb(future) + future.cb = named if future.queue.len > 0 or future.finished: callSoon(future.cb) @@ -90,27 +91,26 @@ proc read*[T](future: FutureStream[T]): owned(Future[(bool, T)]) = ## ``FutureStream``. var resFut = newFuture[(bool, T)]("FutureStream.take") let savedCb = future.cb - var newCb = - proc (fs: FutureStream[T]) = - # Exit early if `resFut` is already complete. (See #8994). - if resFut.finished: return - - # We don't want this callback called again. - #future.cb = nil - - # The return value depends on whether the FutureStream has finished. - var res: (bool, T) - if finished(fs): - # Remember, this callback is called when the FutureStream is completed. - res[0] = false - else: - res[0] = true - res[1] = fs.queue.popFirst() - - resFut.complete(res) - - # If the saved callback isn't nil then let's call it. - if not savedCb.isNil: savedCb() + proc newCb(fs: FutureStream[T]) = + # Exit early if `resFut` is already complete. (See #8994). + if resFut.finished: return + + # We don't want this callback called again. + #future.cb = nil + + # The return value depends on whether the FutureStream has finished. + var res: (bool, T) + if finished(fs): + # Remember, this callback is called when the FutureStream is completed. + res[0] = false + else: + res[0] = true + res[1] = fs.queue.popFirst() + + resFut.complete(res) + + # If the saved callback isn't nil then let's call it. + if not savedCb.isNil: savedCb() if future.queue.len > 0 or future.finished: newCb(future) diff --git a/lib/pure/ioselects/ioselectors_epoll.nim b/lib/pure/ioselects/ioselectors_epoll.nim index 1b317f8dc..3dcf547bd 100644 --- a/lib/pure/ioselects/ioselectors_epoll.nim +++ b/lib/pure/ioselects/ioselectors_epoll.nim @@ -514,7 +514,7 @@ template withData*[T](s: Selector[T], fd: SocketHandle|int, value, let fdi = int(fd) s.checkFd(fdi) if fdi in s: - var value = addr(s.getData(fdi)) + var value = addr(s.fds[fdi].data) body template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1, @@ -523,7 +523,7 @@ template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1, let fdi = int(fd) s.checkFd(fdi) if fdi in s: - var value = addr(s.getData(fdi)) + var value = addr(s.fds[fdi].data) body1 else: body2 diff --git a/lib/pure/selectors.nim b/lib/pure/selectors.nim index f3726efaa..e89cc7447 100644 --- a/lib/pure/selectors.nim +++ b/lib/pure/selectors.nim @@ -255,7 +255,7 @@ else: IOSelectorsException* = object of CatchableError ReadyKey* = object - fd* : int + fd*: int events*: set[Event] errorCode*: OSErrorCode |