diff options
author | Dominik Picheta <dominikpicheta@googlemail.com> | 2018-10-09 18:39:12 +0100 |
---|---|---|
committer | Andreas Rumpf <rumpf_a@web.de> | 2018-10-09 19:39:12 +0200 |
commit | 21ecf64d243a93fab29aed6d3e439918d72c6e16 (patch) | |
tree | 2c4afca01f2d42348fa0c07502fe2867112f85e8 | |
parent | 5076fda2e25a7f14dac130b591de6cc1eebfcc06 (diff) | |
download | Nim-21ecf64d243a93fab29aed6d3e439918d72c6e16.tar.gz |
Fixes #8994. FutureStream read procedure data loss no longer occurs. (#9183)
* Fixes #8994. FutureStream read procedure data loss no longer occurs. * Optimises the fix for #8994.
-rw-r--r-- | lib/pure/asyncstreams.nim | 6 | ||||
-rw-r--r-- | tests/async/tfuturestream.nim | 20 |
2 files changed, 24 insertions, 2 deletions
diff --git a/lib/pure/asyncstreams.nim b/lib/pure/asyncstreams.nim index d3ea143f3..57800ce10 100644 --- a/lib/pure/asyncstreams.nim +++ b/lib/pure/asyncstreams.nim @@ -81,6 +81,9 @@ proc read*[T](future: FutureStream[T]): Future[(bool, T)] = let savedCb = future.cb future.callback = proc (fs: FutureStream[T]) = + # Exit early if `resFut` is already complete. (See #8994). + if resFut.finished: return + # We don't want this callback called again. future.cb = nil @@ -93,8 +96,7 @@ proc read*[T](future: FutureStream[T]): Future[(bool, T)] = res[0] = true res[1] = fs.queue.popFirst() - if not resFut.finished: - resFut.complete(res) + resFut.complete(res) # If the saved callback isn't nil then let's call it. if not savedCb.isNil: savedCb() diff --git a/tests/async/tfuturestream.nim b/tests/async/tfuturestream.nim index d76752b7e..32624b100 100644 --- a/tests/async/tfuturestream.nim +++ b/tests/async/tfuturestream.nim @@ -35,6 +35,26 @@ proc beta() {.async.} = asyncCheck alpha() waitFor beta() +template ensureCallbacksAreScheduled = + # callbacks are called directly if the dispatcher is not running + discard getGlobalDispatcher() + +proc testCompletion() {.async.} = + ensureCallbacksAreScheduled + + var stream = newFutureStream[string]() + + for i in 1..5: + await stream.write($i) + + var readFuture = stream.readAll() + stream.complete() + yield readFuture + let data = readFuture.read() + doAssert(data.len == 5, "actual data len = " & $data.len) + +waitFor testCompletion() + # TODO: Something like this should work eventually. # proc delta(): FutureStream[string] {.async.} = # for i in 0 .. 5: |