diff options
-rw-r--r-- | lib/pure/asyncdispatch.nim | 34 | ||||
-rw-r--r-- | lib/pure/asyncfutures.nim (renamed from lib/pure/includes/asyncfutures.nim) | 211 | ||||
-rw-r--r-- | lib/pure/asyncstreams.nim | 105 | ||||
-rw-r--r-- | lib/upcoming/asyncdispatch.nim | 33 | ||||
-rw-r--r-- | tests/async/tasyncrecursion.nim | 1 | ||||
-rw-r--r-- | tests/async/tcallbacks.nim | 20 |
6 files changed, 259 insertions, 145 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index 8c1cf6b18..28b20feaa 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -9,11 +9,12 @@ include "system/inclrtl" -import os, tables, strutils, times, heapqueue, options - +import os, tables, strutils, times, heapqueue, options, asyncstreams +import asyncfutures except callSoon import nativesockets, net, deques export Port, SocketFlag +export asyncfutures, asyncstreams #{.injectStmt: newGcInvariant().} @@ -159,8 +160,6 @@ export Port, SocketFlag # TODO: Check if yielded future is nil and throw a more meaningful exception -include includes/asyncfutures - type PDispatcherBase = ref object of RootRef timers*: HeapQueue[tuple[finishAt: float, fut: Future[void]]] @@ -190,6 +189,12 @@ proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} = result = int((timerTimeout - curTime) * 1000) if result < 0: result = 0 +proc callSoon(cbproc: proc ()) {.gcsafe.} + +proc initCallSoonProc = + if asyncfutures.getCallSoonProc().isNil: + asyncfutures.setCallSoonProc(callSoon) + when defined(windows) or defined(nimdoc): import winlean, sets, hashes type @@ -237,15 +242,17 @@ when defined(windows) or defined(nimdoc): result.callbacks = initDeque[proc ()](64) var gDisp{.threadvar.}: PDispatcher ## Global dispatcher - proc getGlobalDispatcher*(): PDispatcher = - ## Retrieves the global thread-local dispatcher. - if gDisp.isNil: gDisp = newDispatcher() - result = gDisp proc setGlobalDispatcher*(disp: PDispatcher) = if not gDisp.isNil: assert gDisp.callbacks.len == 0 gDisp = disp + initCallSoonProc() + + proc getGlobalDispatcher*(): PDispatcher = + if gDisp.isNil: + setGlobalDispatcher(newDispatcher()) + result = gDisp proc register*(fd: AsyncFD) = ## Registers ``fd`` with the dispatcher. @@ -932,14 +939,17 @@ else: result.callbacks = initDeque[proc ()](64) var gDisp{.threadvar.}: PDispatcher ## Global dispatcher - proc getGlobalDispatcher*(): PDispatcher = - if gDisp.isNil: gDisp = newDispatcher() - result = gDisp proc setGlobalDispatcher*(disp: PDispatcher) = if not gDisp.isNil: assert gDisp.callbacks.len == 0 gDisp = disp + initCallSoonProc() + + proc getGlobalDispatcher*(): PDispatcher = + if gDisp.isNil: + setGlobalDispatcher(newDispatcher()) + result = gDisp proc update(fd: AsyncFD, events: set[Event]) = let p = getGlobalDispatcher() @@ -1327,7 +1337,7 @@ proc recvLine*(socket: AsyncFD): Future[string] {.async, deprecated.} = return add(result, c) -proc callSoon*(cbproc: proc ()) = +proc callSoon(cbproc: proc ()) = ## Schedule `cbproc` to be called as soon as possible. ## The callback is called when control returns to the event loop. getGlobalDispatcher().callbacks.addLast(cbproc) diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/asyncfutures.nim index 6af5bf3cf..a9e97c14c 100644 --- a/lib/pure/includes/asyncfutures.nim +++ b/lib/pure/asyncfutures.nim @@ -1,8 +1,16 @@ +import os, tables, strutils, times, heapqueue, options, deques # TODO: This shouldn't need to be included, but should ideally be exported. type + CallbackFunc = proc () {.closure, gcsafe.} + + CallbackList = object + function: CallbackFunc + next: ref CallbackList + FutureBase* = ref object of RootObj ## Untyped future. - cb: proc () {.closure,gcsafe.} + callbacks: CallbackList + finished: bool error*: ref Exception ## Stored exception errorStackTrace*: string @@ -16,12 +24,6 @@ type FutureVar*[T] = distinct Future[T] - FutureStream*[T] = ref object of FutureBase ## Special future that acts as - ## a queue. Its API is still - ## experimental and so is - ## subject to change. - queue: Deque[T] - FutureError* = object of Exception cause*: FutureBase @@ -30,7 +32,27 @@ type when not defined(release): var currentID = 0 -proc callSoon*(cbproc: proc ()) {.gcsafe.} +var callSoonProc {.threadvar.}: proc (cbproc: proc ()) {.gcsafe.} + +proc getCallSoonProc*(): (proc(cbproc: proc ()) {.gcsafe.}) = + ## Get current implementation of ``callSoon``. + return callSoonProc + +proc setCallSoonProc*(p: (proc(cbproc: proc ()) {.gcsafe.})) = + ## Change current implementation of ``callSoon``. This is normally called when dispatcher from ``asyncdispatcher`` is initialized. + callSoonProc = p + +proc callSoon*(cbproc: proc ()) = + ## Call ``cbproc`` "soon". + ## + ## If async dispatcher is running, ``cbproc`` will be executed during next dispatcher tick. + ## + ## If async dispatcher is not running, ``cbproc`` will be executed immediately. + if callSoonProc.isNil: + # Loop not initialized yet. Call the function directly to allow setup code to use futures. + cbproc() + else: + callSoonProc(cbproc) template setupFutureBase(fromProc: string) = new(result) @@ -56,22 +78,6 @@ proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] = ## that this future belongs to, is a good habit as it helps with debugging. result = FutureVar[T](newFuture[T](fromProc)) -proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] = - ## Create a new ``FutureStream``. This future's callback is activated when - ## two events occur: - ## - ## * New data is written into the future stream. - ## * The future stream is completed (this means that no more data will be - ## written). - ## - ## Specifying ``fromProc``, which is a string specifying the name of the proc - ## that this future belongs to, is a good habit as it helps with debugging. - ## - ## **Note:** The API of FutureStream is still new and so has a higher - ## likelihood of changing in the future. - setupFutureBase(fromProc) - result.queue = initDeque[T]() - proc clean*[T](future: FutureVar[T]) = ## Resets the ``finished`` status of ``future``. Future[T](future).finished = false @@ -98,6 +104,33 @@ proc checkFinished[T](future: Future[T]) = err.cause = future raise err +proc call(callbacks: var CallbackList) = + var current = callbacks + + while true: + if not current.function.isNil: + callSoon(current.function) + + if current.next.isNil: + break + else: + current = current.next[] + + # callback will be called only once, let GC collect them now + callbacks.next = nil + callbacks.function = nil + +proc add(callbacks: var CallbackList, function: CallbackFunc) = + if callbacks.function.isNil: + callbacks.function = function + assert callbacks.next == nil + else: + let newNext = new(ref CallbackList) + newNext.function = callbacks.function + newNext.next = callbacks.next + callbacks.next = newNext + callbacks.function = function + proc complete*[T](future: Future[T], val: T) = ## Completes ``future`` with value ``val``. #assert(not future.finished, "Future already finished, cannot finish twice.") @@ -105,8 +138,7 @@ proc complete*[T](future: Future[T], val: T) = assert(future.error == nil) future.value = val future.finished = true - if future.cb != nil: - future.cb() + future.callbacks.call() proc complete*(future: Future[void]) = ## Completes a void ``future``. @@ -114,8 +146,7 @@ proc complete*(future: Future[void]) = checkFinished(future) assert(future.error == nil) future.finished = true - if future.cb != nil: - future.cb() + future.callbacks.call() proc complete*[T](future: FutureVar[T]) = ## Completes a ``FutureVar``. @@ -123,8 +154,7 @@ proc complete*[T](future: FutureVar[T]) = checkFinished(fut) assert(fut.error == nil) fut.finished = true - if fut.cb != nil: - fut.cb() + fut.callbacks.call() proc complete*[T](future: FutureVar[T], val: T) = ## Completes a ``FutureVar`` with value ``val``. @@ -138,12 +168,6 @@ proc complete*[T](future: FutureVar[T], val: T) = if not fut.cb.isNil(): fut.cb() -proc complete*[T](future: FutureStream[T]) = - ## Completes a ``FutureStream`` signalling the end of data. - future.finished = true - if not future.cb.isNil(): - future.cb() - proc fail*[T](future: Future[T], error: ref Exception) = ## Completes ``future`` with ``error``. #assert(not future.finished, "Future already finished, cannot finish twice.") @@ -152,26 +176,40 @@ proc fail*[T](future: Future[T], error: ref Exception) = future.error = error future.errorStackTrace = if getStackTrace(error) == "": getStackTrace() else: getStackTrace(error) - if future.cb != nil: - future.cb() + future.callbacks.call() + +proc clearCallbacks(future: FutureBase) = + future.callbacks.function = nil + future.callbacks.next = nil + +proc addCallback*(future: FutureBase, cb: proc() {.closure,gcsafe.}) = + ## Adds the callbacks proc to be called when the future completes. + ## + ## If future has already completed then ``cb`` will be called immediately. + assert cb != nil + if future.finished: + callSoon(cb) else: - # This is to prevent exceptions from being silently ignored when a future - # is discarded. - # TODO: This may turn out to be a bad idea. - # Turns out this is a bad idea. - #raise error - discard + future.callbacks.add cb + +proc addCallback*[T](future: Future[T], + cb: proc (future: Future[T]) {.closure,gcsafe.}) = + ## Adds the callbacks proc to be called when the future completes. + ## + ## If future has already completed then ``cb`` will be called immediately. + future.addCallback( + proc() = + cb(future) + ) proc `callback=`*(future: FutureBase, cb: proc () {.closure,gcsafe.}) = - ## Sets the callback proc to be called when the future completes. + ## Clears the list of callbacks and sets the callback proc to be called when the future completes. ## ## If future has already completed then ``cb`` will be called immediately. ## - ## **Note**: You most likely want the other ``callback`` setter which - ## passes ``future`` as a param to the callback. - future.cb = cb - if future.finished: - callSoon(future.cb) + ## It's recommended to use ``addCallback`` or ``then`` instead. + future.clearCallbacks + future.addCallback cb proc `callback=`*[T](future: Future[T], cb: proc (future: Future[T]) {.closure,gcsafe.}) = @@ -180,20 +218,6 @@ proc `callback=`*[T](future: Future[T], ## If future has already completed then ``cb`` will be called immediately. future.callback = proc () = cb(future) -proc `callback=`*[T](future: FutureStream[T], - cb: proc (future: FutureStream[T]) {.closure,gcsafe.}) = - ## Sets the callback proc to be called when data was placed inside the - ## future stream. - ## - ## The callback is also called when the future is completed. So you should - ## use ``finished`` to check whether data is available. - ## - ## If the future stream already has data or is finished then ``cb`` will be - ## called immediately. - future.cb = proc () = cb(future) - if future.queue.len > 0 or future.finished: - callSoon(future.cb) - proc injectStacktrace[T](future: Future[T]) = # TODO: Come up with something better. when not defined(release): @@ -240,18 +264,12 @@ proc mget*[T](future: FutureVar[T]): var T = ## Future has not been finished. result = Future[T](future).value -proc finished*[T](future: Future[T] | FutureVar[T] | FutureStream[T]): bool = +proc finished*[T](future: Future[T] | FutureVar[T]): bool = ## Determines whether ``future`` has completed. ## ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish. - ## - ## For a ``FutureStream`` a ``true`` value means that no more data will be - ## placed inside the stream _and_ that there is no data waiting to be - ## retrieved. when future is FutureVar[T]: result = (Future[T](future)).finished - elif future is FutureStream[T]: - result = future.finished and future.queue.len == 0 else: result = future.finished @@ -259,57 +277,6 @@ proc failed*(future: FutureBase): bool = ## Determines whether ``future`` completed with an error. return future.error != nil -proc write*[T](future: FutureStream[T], value: T): Future[void] = - ## Writes the specified value inside the specified future stream. - ## - ## This will raise ``ValueError`` if ``future`` is finished. - result = newFuture[void]("FutureStream.put") - if future.finished: - let msg = "FutureStream is finished and so no longer accepts new data." - result.fail(newException(ValueError, msg)) - return - # TODO: Implement limiting of the streams storage to prevent it growing - # infinitely when no reads are occuring. - future.queue.addLast(value) - if not future.cb.isNil: future.cb() - result.complete() - -proc read*[T](future: FutureStream[T]): Future[(bool, T)] = - ## Returns a future that will complete when the ``FutureStream`` has data - ## placed into it. The future will be completed with the oldest - ## value stored inside the stream. The return value will also determine - ## whether data was retrieved, ``false`` means that the future stream was - ## completed and no data was retrieved. - ## - ## This function will remove the data that was returned from the underlying - ## ``FutureStream``. - var resFut = newFuture[(bool, T)]("FutureStream.take") - let savedCb = future.cb - future.callback = - proc (fs: FutureStream[T]) = - # We don't want this callback called again. - future.cb = nil - - # The return value depends on whether the FutureStream has finished. - var res: (bool, T) - if finished(fs): - # Remember, this callback is called when the FutureStream is completed. - res[0] = false - else: - res[0] = true - res[1] = fs.queue.popFirst() - - if not resFut.finished: - resFut.complete(res) - - # If the saved callback isn't nil then let's call it. - if not savedCb.isNil: savedCb() - return resFut - -proc len*[T](future: FutureStream[T]): int = - ## Returns the amount of data pieces inside the stream. - future.queue.len - proc asyncCheck*[T](future: Future[T]) = ## Sets a callback on ``future`` which raises an exception if the future ## finished with an error. diff --git a/lib/pure/asyncstreams.nim b/lib/pure/asyncstreams.nim new file mode 100644 index 000000000..d3ea143f3 --- /dev/null +++ b/lib/pure/asyncstreams.nim @@ -0,0 +1,105 @@ +import asyncfutures + +import deques + +type + FutureStream*[T] = ref object ## Special future that acts as + ## a queue. Its API is still + ## experimental and so is + ## subject to change. + queue: Deque[T] + finished: bool + cb: proc () {.closure, gcsafe.} + +proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] = + ## Create a new ``FutureStream``. This future's callback is activated when + ## two events occur: + ## + ## * New data is written into the future stream. + ## * The future stream is completed (this means that no more data will be + ## written). + ## + ## Specifying ``fromProc``, which is a string specifying the name of the proc + ## that this future belongs to, is a good habit as it helps with debugging. + ## + ## **Note:** The API of FutureStream is still new and so has a higher + ## likelihood of changing in the future. + result = FutureStream[T](finished: false, cb: nil) + result.queue = initDeque[T]() + +proc complete*[T](future: FutureStream[T]) = + ## Completes a ``FutureStream`` signalling the end of data. + future.finished = true + if not future.cb.isNil: + future.cb() + +proc `callback=`*[T](future: FutureStream[T], + cb: proc (future: FutureStream[T]) {.closure,gcsafe.}) = + ## Sets the callback proc to be called when data was placed inside the + ## future stream. + ## + ## The callback is also called when the future is completed. So you should + ## use ``finished`` to check whether data is available. + ## + ## If the future stream already has data or is finished then ``cb`` will be + ## called immediately. + future.cb = proc () = cb(future) + if future.queue.len > 0 or future.finished: + callSoon(future.cb) + +proc finished*[T](future: FutureStream[T]): bool = + ## Check if a ``FutureStream`` is finished. ``true`` value means that + ## no more data will be placed inside the stream _and_ that there is + ## no data waiting to be retrieved. + result = future.finished and future.queue.len == 0 + +proc write*[T](future: FutureStream[T], value: T): Future[void] = + ## Writes the specified value inside the specified future stream. + ## + ## This will raise ``ValueError`` if ``future`` is finished. + result = newFuture[void]("FutureStream.put") + if future.finished: + let msg = "FutureStream is finished and so no longer accepts new data." + result.fail(newException(ValueError, msg)) + return + # TODO: Implement limiting of the streams storage to prevent it growing + # infinitely when no reads are occuring. + future.queue.addLast(value) + if not future.cb.isNil: future.cb() + result.complete() + +proc read*[T](future: FutureStream[T]): Future[(bool, T)] = + ## Returns a future that will complete when the ``FutureStream`` has data + ## placed into it. The future will be completed with the oldest + ## value stored inside the stream. The return value will also determine + ## whether data was retrieved, ``false`` means that the future stream was + ## completed and no data was retrieved. + ## + ## This function will remove the data that was returned from the underlying + ## ``FutureStream``. + var resFut = newFuture[(bool, T)]("FutureStream.take") + let savedCb = future.cb + future.callback = + proc (fs: FutureStream[T]) = + # We don't want this callback called again. + future.cb = nil + + # The return value depends on whether the FutureStream has finished. + var res: (bool, T) + if finished(fs): + # Remember, this callback is called when the FutureStream is completed. + res[0] = false + else: + res[0] = true + res[1] = fs.queue.popFirst() + + if not resFut.finished: + resFut.complete(res) + + # If the saved callback isn't nil then let's call it. + if not savedCb.isNil: savedCb() + return resFut + +proc len*[T](future: FutureStream[T]): int = + ## Returns the amount of data pieces inside the stream. + future.queue.len diff --git a/lib/upcoming/asyncdispatch.nim b/lib/upcoming/asyncdispatch.nim index 1623d8375..4e3b06173 100644 --- a/lib/upcoming/asyncdispatch.nim +++ b/lib/upcoming/asyncdispatch.nim @@ -9,11 +9,13 @@ include "system/inclrtl" -import os, tables, strutils, times, heapqueue, lists, options +import os, tables, strutils, times, heapqueue, lists, options, asyncstreams +import asyncfutures except callSoon import nativesockets, net, deques export Port, SocketFlag +export asyncfutures, asyncstreams #{.injectStmt: newGcInvariant().} @@ -130,8 +132,6 @@ export Port, SocketFlag # TODO: Check if yielded future is nil and throw a more meaningful exception -include "../includes/asyncfutures" - type PDispatcherBase = ref object of RootRef timers: HeapQueue[tuple[finishAt: float, fut: Future[void]]] @@ -161,6 +161,12 @@ proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} = result = int((timerTimeout - curTime) * 1000) if result < 0: result = 0 +proc callSoon(cbproc: proc ()) {.gcsafe.} + +proc initCallSoonProc = + if asyncfutures.getCallSoonProc().isNil: + asyncfutures.setCallSoonProc(callSoon) + when defined(windows) or defined(nimdoc): import winlean, sets, hashes type @@ -214,15 +220,17 @@ when defined(windows) or defined(nimdoc): result.callbacks = initDeque[proc ()](64) var gDisp{.threadvar.}: PDispatcher ## Global dispatcher - proc getGlobalDispatcher*(): PDispatcher = - ## Retrieves the global thread-local dispatcher. - if gDisp.isNil: gDisp = newDispatcher() - result = gDisp proc setGlobalDispatcher*(disp: PDispatcher) = if not gDisp.isNil: assert gDisp.callbacks.len == 0 gDisp = disp + initCallSoonProc() + + proc getGlobalDispatcher*(): PDispatcher = + if gDisp.isNil: + setGlobalDispatcher(newDispatcher()) + result = gDisp proc register*(fd: AsyncFD) = ## Registers ``fd`` with the dispatcher. @@ -1081,14 +1089,17 @@ else: result.callbacks = initDeque[proc ()](InitDelayedCallbackListSize) var gDisp{.threadvar.}: PDispatcher ## Global dispatcher - proc getGlobalDispatcher*(): PDispatcher = - if gDisp.isNil: gDisp = newDispatcher() - result = gDisp proc setGlobalDispatcher*(disp: PDispatcher) = if not gDisp.isNil: assert gDisp.callbacks.len == 0 gDisp = disp + initCallSoonProc() + + proc getGlobalDispatcher*(): PDispatcher = + if gDisp.isNil: + setGlobalDispatcher(newDispatcher()) + result = gDisp proc register*(fd: AsyncFD) = let p = getGlobalDispatcher() @@ -1601,7 +1612,7 @@ proc recvLine*(socket: AsyncFD): Future[string] {.async.} = return add(result, c) -proc callSoon*(cbproc: proc ()) = +proc callSoon(cbproc: proc ()) = ## Schedule `cbproc` to be called as soon as possible. ## The callback is called when control returns to the event loop. getGlobalDispatcher().callbacks.addLast(cbproc) diff --git a/tests/async/tasyncrecursion.nim b/tests/async/tasyncrecursion.nim index 54482edab..1aeebe9b4 100644 --- a/tests/async/tasyncrecursion.nim +++ b/tests/async/tasyncrecursion.nim @@ -17,5 +17,6 @@ proc asyncRecursionTest*(): Future[int] {.async.} = inc(i) when isMainModule: + setGlobalDispatcher(newDispatcher()) var i = waitFor asyncRecursionTest() echo i diff --git a/tests/async/tcallbacks.nim b/tests/async/tcallbacks.nim new file mode 100644 index 000000000..8c08032cd --- /dev/null +++ b/tests/async/tcallbacks.nim @@ -0,0 +1,20 @@ +discard """ + exitcode: 0 + output: '''3 +2 +1 +5 +''' +""" +import asyncfutures + +let f1: Future[int] = newFuture[int]() +f1.addCallback(proc() = echo 1) +f1.addCallback(proc() = echo 2) +f1.addCallback(proc() = echo 3) +f1.complete(10) + +let f2: Future[int] = newFuture[int]() +f2.addCallback(proc() = echo 4) +f2.callback = proc() = echo 5 +f2.complete(10) |