diff options
Diffstat (limited to 'lib/pure/includes/asyncfutures.nim')
-rw-r--r-- | lib/pure/includes/asyncfutures.nim | 118 |
1 files changed, 110 insertions, 8 deletions
diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim index c83228014..6af5bf3cf 100644 --- a/lib/pure/includes/asyncfutures.nim +++ b/lib/pure/includes/asyncfutures.nim @@ -16,6 +16,12 @@ type FutureVar*[T] = distinct Future[T] + FutureStream*[T] = ref object of FutureBase ## Special future that acts as + ## a queue. Its API is still + ## experimental and so is + ## subject to change. + queue: Deque[T] + FutureError* = object of Exception cause*: FutureBase @@ -26,11 +32,7 @@ when not defined(release): proc callSoon*(cbproc: proc ()) {.gcsafe.} -proc newFuture*[T](fromProc: string = "unspecified"): Future[T] = - ## Creates a new future. - ## - ## Specifying ``fromProc``, which is a string specifying the name of the proc - ## that this future belongs to, is a good habit as it helps with debugging. +template setupFutureBase(fromProc: string) = new(result) result.finished = false when not defined(release): @@ -39,6 +41,13 @@ proc newFuture*[T](fromProc: string = "unspecified"): Future[T] = result.fromProc = fromProc currentID.inc() +proc newFuture*[T](fromProc: string = "unspecified"): Future[T] = + ## Creates a new future. + ## + ## Specifying ``fromProc``, which is a string specifying the name of the proc + ## that this future belongs to, is a good habit as it helps with debugging. + setupFutureBase(fromProc) + proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] = ## Create a new ``FutureVar``. This Future type is ideally suited for ## situations where you want to avoid unnecessary allocations of Futures. @@ -47,6 +56,22 @@ proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] = ## that this future belongs to, is a good habit as it helps with debugging. result = FutureVar[T](newFuture[T](fromProc)) +proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] = + ## Create a new ``FutureStream``. This future's callback is activated when + ## two events occur: + ## + ## * New data is written into the future stream. + ## * The future stream is completed (this means that no more data will be + ## written). + ## + ## Specifying ``fromProc``, which is a string specifying the name of the proc + ## that this future belongs to, is a good habit as it helps with debugging. + ## + ## **Note:** The API of FutureStream is still new and so has a higher + ## likelihood of changing in the future. + setupFutureBase(fromProc) + result.queue = initDeque[T]() + proc clean*[T](future: FutureVar[T]) = ## Resets the ``finished`` status of ``future``. Future[T](future).finished = false @@ -107,12 +132,18 @@ proc complete*[T](future: FutureVar[T], val: T) = ## Any previously stored value will be overwritten. template fut: untyped = Future[T](future) checkFinished(fut) - assert(fut.error == nil) + assert(fut.error.isNil()) fut.finished = true fut.value = val - if fut.cb != nil: + if not fut.cb.isNil(): fut.cb() +proc complete*[T](future: FutureStream[T]) = + ## Completes a ``FutureStream`` signalling the end of data. + future.finished = true + if not future.cb.isNil(): + future.cb() + proc fail*[T](future: Future[T], error: ref Exception) = ## Completes ``future`` with ``error``. #assert(not future.finished, "Future already finished, cannot finish twice.") @@ -149,6 +180,20 @@ proc `callback=`*[T](future: Future[T], ## If future has already completed then ``cb`` will be called immediately. future.callback = proc () = cb(future) +proc `callback=`*[T](future: FutureStream[T], + cb: proc (future: FutureStream[T]) {.closure,gcsafe.}) = + ## Sets the callback proc to be called when data was placed inside the + ## future stream. + ## + ## The callback is also called when the future is completed. So you should + ## use ``finished`` to check whether data is available. + ## + ## If the future stream already has data or is finished then ``cb`` will be + ## called immediately. + future.cb = proc () = cb(future) + if future.queue.len > 0 or future.finished: + callSoon(future.cb) + proc injectStacktrace[T](future: Future[T]) = # TODO: Come up with something better. when not defined(release): @@ -195,12 +240,18 @@ proc mget*[T](future: FutureVar[T]): var T = ## Future has not been finished. result = Future[T](future).value -proc finished*[T](future: Future[T] | FutureVar[T]): bool = +proc finished*[T](future: Future[T] | FutureVar[T] | FutureStream[T]): bool = ## Determines whether ``future`` has completed. ## ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish. + ## + ## For a ``FutureStream`` a ``true`` value means that no more data will be + ## placed inside the stream _and_ that there is no data waiting to be + ## retrieved. when future is FutureVar[T]: result = (Future[T](future)).finished + elif future is FutureStream[T]: + result = future.finished and future.queue.len == 0 else: result = future.finished @@ -208,6 +259,57 @@ proc failed*(future: FutureBase): bool = ## Determines whether ``future`` completed with an error. return future.error != nil +proc write*[T](future: FutureStream[T], value: T): Future[void] = + ## Writes the specified value inside the specified future stream. + ## + ## This will raise ``ValueError`` if ``future`` is finished. + result = newFuture[void]("FutureStream.put") + if future.finished: + let msg = "FutureStream is finished and so no longer accepts new data." + result.fail(newException(ValueError, msg)) + return + # TODO: Implement limiting of the streams storage to prevent it growing + # infinitely when no reads are occuring. + future.queue.addLast(value) + if not future.cb.isNil: future.cb() + result.complete() + +proc read*[T](future: FutureStream[T]): Future[(bool, T)] = + ## Returns a future that will complete when the ``FutureStream`` has data + ## placed into it. The future will be completed with the oldest + ## value stored inside the stream. The return value will also determine + ## whether data was retrieved, ``false`` means that the future stream was + ## completed and no data was retrieved. + ## + ## This function will remove the data that was returned from the underlying + ## ``FutureStream``. + var resFut = newFuture[(bool, T)]("FutureStream.take") + let savedCb = future.cb + future.callback = + proc (fs: FutureStream[T]) = + # We don't want this callback called again. + future.cb = nil + + # The return value depends on whether the FutureStream has finished. + var res: (bool, T) + if finished(fs): + # Remember, this callback is called when the FutureStream is completed. + res[0] = false + else: + res[0] = true + res[1] = fs.queue.popFirst() + + if not resFut.finished: + resFut.complete(res) + + # If the saved callback isn't nil then let's call it. + if not savedCb.isNil: savedCb() + return resFut + +proc len*[T](future: FutureStream[T]): int = + ## Returns the amount of data pieces inside the stream. + future.queue.len + proc asyncCheck*[T](future: Future[T]) = ## Sets a callback on ``future`` which raises an exception if the future ## finished with an error. |