diff options
author | Dominik Picheta <dominikpicheta@gmail.com> | 2017-02-10 20:18:59 +0100 |
---|---|---|
committer | Dominik Picheta <dominikpicheta@gmail.com> | 2017-02-10 20:18:59 +0100 |
commit | ddd3d3f44a7bd83d97e17b46bc7fd6b92043520f (patch) | |
tree | 2be75d84f608cd7ac2b087c8447247f263b28671 /lib | |
parent | d87fb236d101b9c1bc14f18fe17798cc214a9620 (diff) | |
download | Nim-ddd3d3f44a7bd83d97e17b46bc7fd6b92043520f.tar.gz |
Improve implementation of takeAsync for FutureStreams.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/pure/includes/asyncfutures.nim | 40 |
1 files changed, 26 insertions, 14 deletions
diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim index 8d7ace7a1..13d927e92 100644 --- a/lib/pure/includes/asyncfutures.nim +++ b/lib/pure/includes/asyncfutures.nim @@ -129,11 +129,9 @@ proc complete*[T](future: FutureVar[T], val: T) = if not fut.cb.isNil(): fut.cb() -proc complete*[T](future: FutureStream[T], value: T) = - ## Completes a ``FutureStream`` with the last value, signifying the end of - ## data. +proc complete*[T](future: FutureStream[T]) = + ## Completes a ``FutureStream`` signifying the end of data. future.finished = true - future.queue.enqueue(value) if not future.cb.isNil(): future.cb() @@ -274,22 +272,36 @@ proc peek*[T](future: FutureStream[T]): T = ## Returns the oldest value stored inside the specified future stream. return future.queue.front() -proc takeAsync*[T](future: FutureStream[T]): Future[T] = +proc takeAsync*[T](future: FutureStream[T]): Future[(bool, T)] = ## Returns a future that will complete when the ``FutureStream`` has data - ## placed into it. The future will be completed with the oldest value stored - ## inside the stream. + ## placed into it. The future will be completed with the oldest + ## value stored inside the stream. The return value will also determine + ## whether data was retrieved, ``false`` means that the future stream was + ## completed and no data was retrieved. ## ## This function will remove the data that was returned from the underlying ## ``FutureStream``. - var resFut = newFuture[T]("FutureStream.takeAsync") - let cb = future.cb + var resFut = newFuture[(bool, T)]("FutureStream.takeAsync") + let savedCb = future.cb future.callback = proc (fs: FutureStream[T]) = - # TODO: When finished(fs) should we "cancel" resFut? This assumes that we - # TODO: can `complete` with no value. - if not resFut.finished and (not finished(fs)): - resFut.complete(fs.take()) - if not cb.isNil: cb() + # We don't want this callback called again. + future.cb = nil + + # The return value depends on whether the FutureStream has finished. + var res: (bool, T) + if finished(fs): + # Remember, this callback is called when the FutureStream is completed. + res[0] = false + else: + res[0] = true + res[1] = fs.take() + + if not resFut.finished: + resFut.complete(res) + + # If the saved callback isn't nil then let's call it. + if not savedCb.isNil: savedCb() return resFut proc asyncCheck*[T](future: Future[T]) = |