diff options
-rw-r--r-- | lib/pure/asyncstreams.nim | 6 | ||||
-rw-r--r-- | tests/async/tfuturestream.nim | 20 |
2 files changed, 24 insertions, 2 deletions
diff --git a/lib/pure/asyncstreams.nim b/lib/pure/asyncstreams.nim index d3ea143f3..57800ce10 100644 --- a/lib/pure/asyncstreams.nim +++ b/lib/pure/asyncstreams.nim @@ -81,6 +81,9 @@ proc read*[T](future: FutureStream[T]): Future[(bool, T)] = let savedCb = future.cb future.callback = proc (fs: FutureStream[T]) = + # Exit early if `resFut` is already complete. (See #8994). + if resFut.finished: return + # We don't want this callback called again. future.cb = nil @@ -93,8 +96,7 @@ proc read*[T](future: FutureStream[T]): Future[(bool, T)] = res[0] = true res[1] = fs.queue.popFirst() - if not resFut.finished: - resFut.complete(res) + resFut.complete(res) # If the saved callback isn't nil then let's call it. if not savedCb.isNil: savedCb() diff --git a/tests/async/tfuturestream.nim b/tests/async/tfuturestream.nim index d76752b7e..32624b100 100644 --- a/tests/async/tfuturestream.nim +++ b/tests/async/tfuturestream.nim @@ -35,6 +35,26 @@ proc beta() {.async.} = asyncCheck alpha() waitFor beta() +template ensureCallbacksAreScheduled = + # callbacks are called directly if the dispatcher is not running + discard getGlobalDispatcher() + +proc testCompletion() {.async.} = + ensureCallbacksAreScheduled + + var stream = newFutureStream[string]() + + for i in 1..5: + await stream.write($i) + + var readFuture = stream.readAll() + stream.complete() + yield readFuture + let data = readFuture.read() + doAssert(data.len == 5, "actual data len = " & $data.len) + +waitFor testCompletion() + # TODO: Something like this should work eventually. # proc delta(): FutureStream[string] {.async.} = # for i in 0 .. 5: |