diff options
author | Dominik Picheta <dominikpicheta@gmail.com> | 2017-02-10 20:18:59 +0100 |
---|---|---|
committer | Dominik Picheta <dominikpicheta@gmail.com> | 2017-02-10 20:18:59 +0100 |
commit | ddd3d3f44a7bd83d97e17b46bc7fd6b92043520f (patch) | |
tree | 2be75d84f608cd7ac2b087c8447247f263b28671 | |
parent | d87fb236d101b9c1bc14f18fe17798cc214a9620 (diff) | |
download | Nim-ddd3d3f44a7bd83d97e17b46bc7fd6b92043520f.tar.gz |
Improve implementation of takeAsync for FutureStreams.
-rw-r--r-- | lib/pure/includes/asyncfutures.nim | 40 | ||||
-rw-r--r-- | tests/async/tfuturestream.nim | 26 |
2 files changed, 48 insertions, 18 deletions
diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim index 8d7ace7a1..13d927e92 100644 --- a/lib/pure/includes/asyncfutures.nim +++ b/lib/pure/includes/asyncfutures.nim @@ -129,11 +129,9 @@ proc complete*[T](future: FutureVar[T], val: T) = if not fut.cb.isNil(): fut.cb() -proc complete*[T](future: FutureStream[T], value: T) = - ## Completes a ``FutureStream`` with the last value, signifying the end of - ## data. +proc complete*[T](future: FutureStream[T]) = + ## Completes a ``FutureStream`` signifying the end of data. future.finished = true - future.queue.enqueue(value) if not future.cb.isNil(): future.cb() @@ -274,22 +272,36 @@ proc peek*[T](future: FutureStream[T]): T = ## Returns the oldest value stored inside the specified future stream. return future.queue.front() -proc takeAsync*[T](future: FutureStream[T]): Future[T] = +proc takeAsync*[T](future: FutureStream[T]): Future[(bool, T)] = ## Returns a future that will complete when the ``FutureStream`` has data - ## placed into it. The future will be completed with the oldest value stored - ## inside the stream. + ## placed into it. The future will be completed with the oldest + ## value stored inside the stream. The return value will also determine + ## whether data was retrieved, ``false`` means that the future stream was + ## completed and no data was retrieved. ## ## This function will remove the data that was returned from the underlying ## ``FutureStream``. - var resFut = newFuture[T]("FutureStream.takeAsync") - let cb = future.cb + var resFut = newFuture[(bool, T)]("FutureStream.takeAsync") + let savedCb = future.cb future.callback = proc (fs: FutureStream[T]) = - # TODO: When finished(fs) should we "cancel" resFut? This assumes that we - # TODO: can `complete` with no value. - if not resFut.finished and (not finished(fs)): - resFut.complete(fs.take()) - if not cb.isNil: cb() + # We don't want this callback called again. + future.cb = nil + + # The return value depends on whether the FutureStream has finished. + var res: (bool, T) + if finished(fs): + # Remember, this callback is called when the FutureStream is completed. + res[0] = false + else: + res[0] = true + res[1] = fs.take() + + if not resFut.finished: + resFut.complete(res) + + # If the saved callback isn't nil then let's call it. + if not savedCb.isNil: savedCb() return resFut proc asyncCheck*[T](future: Future[T]) = diff --git a/tests/async/tfuturestream.nim b/tests/async/tfuturestream.nim index e3480126f..61b3863ac 100644 --- a/tests/async/tfuturestream.nim +++ b/tests/async/tfuturestream.nim @@ -14,21 +14,39 @@ Finished """ import asyncdispatch -var fs = newFutureStream[string]() +var fs = newFutureStream[int]() proc alpha() {.async.} = for i in 0 .. 5: await sleepAsync(1000) - fs.put($i) + fs.put(i) - fs.complete("Done") + fs.complete() proc beta() {.async.} = while not fs.finished: - echo(await fs.takeAsync()) + let (hasValue, value) = await fs.takeAsync() + if hasValue: + echo(value) echo("Finished") asyncCheck alpha() waitFor beta() +# TODO: Something like this should work eventually. +# proc delta(): FutureStream[string] {.async.} = +# for i in 0 .. 5: +# await sleepAsync(1000) +# result.put($i) + +# return "" + +# proc omega() {.async.} = +# let fut = delta() +# while not fut.finished(): +# echo(await fs.takeAsync()) + +# echo("Finished") + +# waitFor omega() \ No newline at end of file |