diff options
author | Dominik Picheta <dominikpicheta@gmail.com> | 2017-02-10 20:40:32 +0100 |
---|---|---|
committer | Dominik Picheta <dominikpicheta@gmail.com> | 2017-02-10 20:40:32 +0100 |
commit | 2f502e2a9ede75be3f56a0206e1314c758e1ad90 (patch) | |
tree | 587378ca25c56289a4a8deadf40b18311b3d7763 /lib/pure/includes | |
parent | ddd3d3f44a7bd83d97e17b46bc7fd6b92043520f (diff) | |
download | Nim-2f502e2a9ede75be3f56a0206e1314c758e1ad90.tar.gz |
Remove immediate FutureStream procs and make 'put' awaitable.
Diffstat (limited to 'lib/pure/includes')
-rw-r--r-- | lib/pure/includes/asyncfutures.nim | 26 |
1 files changed, 8 insertions, 18 deletions
diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim index 13d927e92..eb0c6bbf2 100644 --- a/lib/pure/includes/asyncfutures.nim +++ b/lib/pure/includes/asyncfutures.nim @@ -249,30 +249,20 @@ proc failed*(future: FutureBase): bool = ## Determines whether ``future`` completed with an error. return future.error != nil -proc take*[T](future: FutureStream[T]): T {.raises: [IndexError].} = - ## Retrieves the oldest value stored inside the stream. If the stream - ## contains no data then this function will fail with a ``IndexError`` - ## exception. - ## - ## This function will remove the data that was returned from the underlying - ## ``FutureStream``. - return future.queue.dequeue() - -proc put*[T](future: FutureStream[T], value: T) = +proc put*[T](future: FutureStream[T], value: T): Future[void] = ## Writes the specified value inside the specified future stream. ## ## This will raise ``ValueError`` if ``future`` is finished. + result = newFuture[void]("FutureStream.put") if future.finished: let msg = "FutureStream is finished and so no longer accepts new data." - raise newException(ValueError, msg) + result.fail(newException(ValueError, msg)) + # TODO: Buffering. future.queue.enqueue(value) if not future.cb.isNil: future.cb() + result.complete() -proc peek*[T](future: FutureStream[T]): T = - ## Returns the oldest value stored inside the specified future stream. - return future.queue.front() - -proc takeAsync*[T](future: FutureStream[T]): Future[(bool, T)] = +proc take*[T](future: FutureStream[T]): Future[(bool, T)] = ## Returns a future that will complete when the ``FutureStream`` has data ## placed into it. The future will be completed with the oldest ## value stored inside the stream. The return value will also determine @@ -281,7 +271,7 @@ proc takeAsync*[T](future: FutureStream[T]): Future[(bool, T)] = ## ## This function will remove the data that was returned from the underlying ## ``FutureStream``. - var resFut = newFuture[(bool, T)]("FutureStream.takeAsync") + var resFut = newFuture[(bool, T)]("FutureStream.take") let savedCb = future.cb future.callback = proc (fs: FutureStream[T]) = @@ -295,7 +285,7 @@ proc takeAsync*[T](future: FutureStream[T]): Future[(bool, T)] = res[0] = false else: res[0] = true - res[1] = fs.take() + res[1] = fs.queue.dequeue() if not resFut.finished: resFut.complete(res) |