diff options
-rw-r--r-- | lib/pure/includes/asyncfutures.nim | 92 | ||||
-rw-r--r-- | tests/async/tfuturestream.nim | 19 |
2 files changed, 103 insertions, 8 deletions
diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim index c83228014..a41f03a00 100644 --- a/lib/pure/includes/asyncfutures.nim +++ b/lib/pure/includes/asyncfutures.nim @@ -1,3 +1,4 @@ +import queues # TODO: This shouldn't need to be included, but should ideally be exported. type @@ -16,6 +17,10 @@ type FutureVar*[T] = distinct Future[T] + FutureStream*[T] = ref object of FutureBase ## Special future that acts as + ## a queue. + queue: Queue[T] + FutureError* = object of Exception cause*: FutureBase @@ -26,11 +31,7 @@ when not defined(release): proc callSoon*(cbproc: proc ()) {.gcsafe.} -proc newFuture*[T](fromProc: string = "unspecified"): Future[T] = - ## Creates a new future. - ## - ## Specifying ``fromProc``, which is a string specifying the name of the proc - ## that this future belongs to, is a good habit as it helps with debugging. +template setupFutureBase(fromProc: string): stmt = new(result) result.finished = false when not defined(release): @@ -39,6 +40,13 @@ proc newFuture*[T](fromProc: string = "unspecified"): Future[T] = result.fromProc = fromProc currentID.inc() +proc newFuture*[T](fromProc: string = "unspecified"): Future[T] = + ## Creates a new future. + ## + ## Specifying ``fromProc``, which is a string specifying the name of the proc + ## that this future belongs to, is a good habit as it helps with debugging. + setupFutureBase(fromProc) + proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] = ## Create a new ``FutureVar``. This Future type is ideally suited for ## situations where you want to avoid unnecessary allocations of Futures. @@ -47,6 +55,15 @@ proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] = ## that this future belongs to, is a good habit as it helps with debugging. result = FutureVar[T](newFuture[T](fromProc)) +proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] = + ## Create a new ``FutureStream``. This Future type's callback can be activated + ## multiple times when new data is written to it. + ## + ## Specifying ``fromProc``, which is a string specifying the name of the proc + ## that this future belongs to, is a good habit as it helps with debugging. + setupFutureBase(fromProc) + result.queue = initQueue[T]() + proc clean*[T](future: FutureVar[T]) = ## Resets the ``finished`` status of ``future``. Future[T](future).finished = false @@ -107,12 +124,18 @@ proc complete*[T](future: FutureVar[T], val: T) = ## Any previously stored value will be overwritten. template fut: untyped = Future[T](future) checkFinished(fut) - assert(fut.error == nil) + assert(fut.error.isNil()) fut.finished = true fut.value = val - if fut.cb != nil: + if not fut.cb.isNil(): fut.cb() +proc complete*[T](future: FutureStream[T]) = + ## Completes a ``FutureStream`` to signify the end of data. + future.finished = true + if not future.cb.isNil(): + future.cb() + proc fail*[T](future: Future[T], error: ref Exception) = ## Completes ``future`` with ``error``. #assert(not future.finished, "Future already finished, cannot finish twice.") @@ -149,6 +172,20 @@ proc `callback=`*[T](future: Future[T], ## If future has already completed then ``cb`` will be called immediately. future.callback = proc () = cb(future) +proc `callback=`*[T](future: FutureStream[T], + cb: proc (future: FutureStream[T]) {.closure,gcsafe.}) = + ## Sets the callback proc to be called when data was placed inside the + ## future stream. + ## + ## The callback is also called when the future is completed. So you should + ## use ``finished`` to check whether data is available. + ## + ## If the future stream already has data then ``cb`` will be called + ## immediately. + future.cb = proc () = cb(future) + if future.queue.len > 0: + callSoon(future.cb) + proc injectStacktrace[T](future: Future[T]) = # TODO: Come up with something better. when not defined(release): @@ -179,6 +216,42 @@ proc read*[T](future: Future[T] | FutureVar[T]): T = # TODO: Make a custom exception type for this? raise newException(ValueError, "Future still in progress.") +proc take*[T](future: FutureStream[T]): T {.raises: [ValueError].} = + ## Retrieves the oldest value stored inside the stream. If the stream + ## contains no data then this function will fail with a ``ValueError`` + ## exception. + ## + ## This function will remove the data that was returned from the underlying + ## ``FutureStream``. + return future.queue.dequeue() + +proc put*[T](future: FutureStream[T], value: T): T = + ## Writes the specified value inside the specified future stream. + ## + ## This will raise ``ValueError`` if ``future`` is finished. + if future.finished: + let msg = "FutureStream is finished and so no longer accepts new data." + raise newException(ValueError, msg) + future.queue.enqueue(value) + +proc peek*[T](future: FutureStream[T]): T = + ## Returns the oldest value stored inside the specified future stream. + return future.queue.front() + +proc takeAsync*[T](future: FutureStream[T]): Future[T] = + ## Returns a future that will complete when the ``FutureStream`` has data + ## placed into it. The future will be completed with the oldest value stored + ## inside the stream. + ## + ## This function will remove the data that was returned from the underlying + ## ``FutureStream``. + var resFut = newFuture[T]("FutureStream.wait") + let cb = future.cb + future.callback = + proc (fs: FutureStream[T]) = + resFut.complete(fs.take()) + if not cb.isNil: cb() + proc readError*[T](future: Future[T]): ref Exception = ## Retrieves the exception stored in ``future``. ## @@ -195,10 +268,13 @@ proc mget*[T](future: FutureVar[T]): var T = ## Future has not been finished. result = Future[T](future).value -proc finished*[T](future: Future[T] | FutureVar[T]): bool = +proc finished*[T](future: Future[T] | FutureVar[T] | FutureStream[T]): bool = ## Determines whether ``future`` has completed. ## ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish. + ## + ## For a ``FutureStream`` this signifies that no more data will be placed + ## inside it. when future is FutureVar[T]: result = (Future[T](future)).finished else: diff --git a/tests/async/tfuturestream.nim b/tests/async/tfuturestream.nim new file mode 100644 index 000000000..ed5ac5785 --- /dev/null +++ b/tests/async/tfuturestream.nim @@ -0,0 +1,19 @@ +import asyncdispatch + +var fs = newFutureStream[string]() + +proc alpha() {.async.} = + for i in 0 .. 5: + fs.put($i) + await sleepAsync(1000) + + fs.complete() + +proc beta() {.async.} = + while not fs.finished(): + echo(await fs.takeAsync()) + + echo("Finished") + +asyncCheck alpha() +asyncCheck beta() \ No newline at end of file |