diff options
Diffstat (limited to 'lib/pure/asyncstreams.nim')
-rw-r--r-- | lib/pure/asyncstreams.nim | 88 |
1 files changed, 56 insertions, 32 deletions
diff --git a/lib/pure/asyncstreams.nim b/lib/pure/asyncstreams.nim index 44e73003e..c97b98d55 100644 --- a/lib/pure/asyncstreams.nim +++ b/lib/pure/asyncstreams.nim @@ -9,9 +9,12 @@ ## Unstable API. -import asyncfutures +import std/asyncfutures -import deques +when defined(nimPreviewSlimSystem): + import std/assertions + +import std/deques type FutureStream*[T] = ref object ## Special future that acts as @@ -21,16 +24,17 @@ type queue: Deque[T] finished: bool cb: proc () {.closure, gcsafe.} + error*: ref Exception proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] = - ## Create a new ``FutureStream``. This future's callback is activated when + ## Create a new `FutureStream`. This future's callback is activated when ## two events occur: ## ## * New data is written into the future stream. ## * The future stream is completed (this means that no more data will be ## written). ## - ## Specifying ``fromProc``, which is a string specifying the name of the proc + ## Specifying `fromProc`, which is a string specifying the name of the proc ## that this future belongs to, is a good habit as it helps with debugging. ## ## **Note:** The API of FutureStream is still new and so has a higher @@ -39,8 +43,17 @@ proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] = result.queue = initDeque[T]() proc complete*[T](future: FutureStream[T]) = - ## Completes a ``FutureStream`` signalling the end of data. + ## Completes a `FutureStream` signalling the end of data. + assert(future.error == nil, "Trying to complete failed stream") + future.finished = true + if not future.cb.isNil: + future.cb() + +proc fail*[T](future: FutureStream[T], error: ref Exception) = + ## Completes `future` with `error`. + assert(not future.finished) future.finished = true + future.error = error if not future.cb.isNil: future.cb() @@ -50,24 +63,29 @@ proc `callback=`*[T](future: FutureStream[T], ## future stream. ## ## The callback is also called when the future is completed. So you should - ## use ``finished`` to check whether data is available. + ## use `finished` to check whether data is available. ## - ## If the future stream already has data or is finished then ``cb`` will be + ## If the future stream already has data or is finished then `cb` will be ## called immediately. - future.cb = proc () = cb(future) + proc named() = cb(future) + future.cb = named if future.queue.len > 0 or future.finished: callSoon(future.cb) proc finished*[T](future: FutureStream[T]): bool = - ## Check if a ``FutureStream`` is finished. ``true`` value means that + ## Check if a `FutureStream` is finished. `true` value means that ## no more data will be placed inside the stream *and* that there is ## no data waiting to be retrieved. result = future.finished and future.queue.len == 0 +proc failed*[T](future: FutureStream[T]): bool = + ## Determines whether `future` completed with an error. + return future.error != nil + proc write*[T](future: FutureStream[T], value: T): Future[void] = ## Writes the specified value inside the specified future stream. ## - ## This will raise ``ValueError`` if ``future`` is finished. + ## This will raise `ValueError` if `future` is finished. result = newFuture[void]("FutureStream.put") if future.finished: let msg = "FutureStream is finished and so no longer accepts new data." @@ -80,37 +98,43 @@ proc write*[T](future: FutureStream[T], value: T): Future[void] = result.complete() proc read*[T](future: FutureStream[T]): owned(Future[(bool, T)]) = - ## Returns a future that will complete when the ``FutureStream`` has data + ## Returns a future that will complete when the `FutureStream` has data ## placed into it. The future will be completed with the oldest ## value stored inside the stream. The return value will also determine - ## whether data was retrieved, ``false`` means that the future stream was + ## whether data was retrieved, `false` means that the future stream was ## completed and no data was retrieved. ## ## This function will remove the data that was returned from the underlying - ## ``FutureStream``. + ## `FutureStream`. var resFut = newFuture[(bool, T)]("FutureStream.take") let savedCb = future.cb - var newCb = - proc (fs: FutureStream[T]) = - # Exit early if `resFut` is already complete. (See #8994). - if resFut.finished: return - - # We don't want this callback called again. - future.cb = nil - - # The return value depends on whether the FutureStream has finished. - var res: (bool, T) - if finished(fs): - # Remember, this callback is called when the FutureStream is completed. - res[0] = false - else: - res[0] = true - res[1] = fs.queue.popFirst() - + proc newCb(fs: FutureStream[T]) = + # Exit early if `resFut` is already complete. (See #8994). + if resFut.finished: return + + # We don't want this callback called again. + #future.cb = nil + + # The return value depends on whether the FutureStream has finished. + var res: (bool, T) + if finished(fs): + # Remember, this callback is called when the FutureStream is completed. + res[0] = false + else: + res[0] = true + res[1] = fs.queue.popFirst() + + if fs.failed: + resFut.fail(fs.error) + else: resFut.complete(res) - # If the saved callback isn't nil then let's call it. - if not savedCb.isNil: savedCb() + # If the saved callback isn't nil then let's call it. + if not savedCb.isNil: + if fs.queue.len > 0: + savedCb() + else: + future.cb = savedCb if future.queue.len > 0 or future.finished: newCb(future) |