diff options
author | Dominik Picheta <dominikpicheta@gmail.com> | 2017-02-10 00:06:18 +0100 |
---|---|---|
committer | Dominik Picheta <dominikpicheta@gmail.com> | 2017-02-10 00:06:18 +0100 |
commit | 7766fdfec1993129da4a84a93c1c09aadfc9a6d6 (patch) | |
tree | 34d78361add75a0cca0a9e1ffe4e776cd042d3b3 /lib/pure | |
parent | c4d5cc652f9207d3c30f0746babaa5186e845ebb (diff) | |
download | Nim-7766fdfec1993129da4a84a93c1c09aadfc9a6d6.tar.gz |
Implemented a first working version of FutureStreams.
Diffstat (limited to 'lib/pure')
-rw-r--r-- | lib/pure/asyncdispatch.nim | 2 | ||||
-rw-r--r-- | lib/pure/includes/asyncfutures.nim | 88 |
2 files changed, 49 insertions, 41 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index d97214d15..58113ae69 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -9,7 +9,7 @@ include "system/inclrtl" -import os, oids, tables, strutils, times, heapqueue +import os, oids, tables, strutils, times, heapqueue, queues import nativesockets, net, deques diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim index a41f03a00..8d7ace7a1 100644 --- a/lib/pure/includes/asyncfutures.nim +++ b/lib/pure/includes/asyncfutures.nim @@ -1,4 +1,3 @@ -import queues # TODO: This shouldn't need to be included, but should ideally be exported. type @@ -130,9 +129,11 @@ proc complete*[T](future: FutureVar[T], val: T) = if not fut.cb.isNil(): fut.cb() -proc complete*[T](future: FutureStream[T]) = - ## Completes a ``FutureStream`` to signify the end of data. +proc complete*[T](future: FutureStream[T], value: T) = + ## Completes a ``FutureStream`` with the last value, signifying the end of + ## data. future.finished = true + future.queue.enqueue(value) if not future.cb.isNil(): future.cb() @@ -216,42 +217,6 @@ proc read*[T](future: Future[T] | FutureVar[T]): T = # TODO: Make a custom exception type for this? raise newException(ValueError, "Future still in progress.") -proc take*[T](future: FutureStream[T]): T {.raises: [ValueError].} = - ## Retrieves the oldest value stored inside the stream. If the stream - ## contains no data then this function will fail with a ``ValueError`` - ## exception. - ## - ## This function will remove the data that was returned from the underlying - ## ``FutureStream``. - return future.queue.dequeue() - -proc put*[T](future: FutureStream[T], value: T): T = - ## Writes the specified value inside the specified future stream. - ## - ## This will raise ``ValueError`` if ``future`` is finished. - if future.finished: - let msg = "FutureStream is finished and so no longer accepts new data." - raise newException(ValueError, msg) - future.queue.enqueue(value) - -proc peek*[T](future: FutureStream[T]): T = - ## Returns the oldest value stored inside the specified future stream. - return future.queue.front() - -proc takeAsync*[T](future: FutureStream[T]): Future[T] = - ## Returns a future that will complete when the ``FutureStream`` has data - ## placed into it. The future will be completed with the oldest value stored - ## inside the stream. - ## - ## This function will remove the data that was returned from the underlying - ## ``FutureStream``. - var resFut = newFuture[T]("FutureStream.wait") - let cb = future.cb - future.callback = - proc (fs: FutureStream[T]) = - resFut.complete(fs.take()) - if not cb.isNil: cb() - proc readError*[T](future: Future[T]): ref Exception = ## Retrieves the exception stored in ``future``. ## @@ -274,9 +239,11 @@ proc finished*[T](future: Future[T] | FutureVar[T] | FutureStream[T]): bool = ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish. ## ## For a ``FutureStream`` this signifies that no more data will be placed - ## inside it. + ## inside it and that there is no data waiting to be retrieved. when future is FutureVar[T]: result = (Future[T](future)).finished + elif future is FutureStream[T]: + result = future.finished and future.queue.len == 0 else: result = future.finished @@ -284,6 +251,47 @@ proc failed*(future: FutureBase): bool = ## Determines whether ``future`` completed with an error. return future.error != nil +proc take*[T](future: FutureStream[T]): T {.raises: [IndexError].} = + ## Retrieves the oldest value stored inside the stream. If the stream + ## contains no data then this function will fail with a ``IndexError`` + ## exception. + ## + ## This function will remove the data that was returned from the underlying + ## ``FutureStream``. + return future.queue.dequeue() + +proc put*[T](future: FutureStream[T], value: T) = + ## Writes the specified value inside the specified future stream. + ## + ## This will raise ``ValueError`` if ``future`` is finished. + if future.finished: + let msg = "FutureStream is finished and so no longer accepts new data." + raise newException(ValueError, msg) + future.queue.enqueue(value) + if not future.cb.isNil: future.cb() + +proc peek*[T](future: FutureStream[T]): T = + ## Returns the oldest value stored inside the specified future stream. + return future.queue.front() + +proc takeAsync*[T](future: FutureStream[T]): Future[T] = + ## Returns a future that will complete when the ``FutureStream`` has data + ## placed into it. The future will be completed with the oldest value stored + ## inside the stream. + ## + ## This function will remove the data that was returned from the underlying + ## ``FutureStream``. + var resFut = newFuture[T]("FutureStream.takeAsync") + let cb = future.cb + future.callback = + proc (fs: FutureStream[T]) = + # TODO: When finished(fs) should we "cancel" resFut? This assumes that we + # TODO: can `complete` with no value. + if not resFut.finished and (not finished(fs)): + resFut.complete(fs.take()) + if not cb.isNil: cb() + return resFut + proc asyncCheck*[T](future: Future[T]) = ## Sets a callback on ``future`` which raises an exception if the future ## finished with an error. |