summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--lib/pure/asyncstreams.nim6
-rw-r--r--tests/async/tfuturestream.nim20
2 files changed, 24 insertions, 2 deletions
diff --git a/lib/pure/asyncstreams.nim b/lib/pure/asyncstreams.nim
index d3ea143f3..57800ce10 100644
--- a/lib/pure/asyncstreams.nim
+++ b/lib/pure/asyncstreams.nim
@@ -81,6 +81,9 @@ proc read*[T](future: FutureStream[T]): Future[(bool, T)] =
   let savedCb = future.cb
   future.callback =
     proc (fs: FutureStream[T]) =
+      # Exit early if `resFut` is already complete. (See #8994).
+      if resFut.finished: return
+
       # We don't want this callback called again.
       future.cb = nil
 
@@ -93,8 +96,7 @@ proc read*[T](future: FutureStream[T]): Future[(bool, T)] =
         res[0] = true
         res[1] = fs.queue.popFirst()
 
-      if not resFut.finished:
-        resFut.complete(res)
+      resFut.complete(res)
 
       # If the saved callback isn't nil then let's call it.
       if not savedCb.isNil: savedCb()
diff --git a/tests/async/tfuturestream.nim b/tests/async/tfuturestream.nim
index d76752b7e..32624b100 100644
--- a/tests/async/tfuturestream.nim
+++ b/tests/async/tfuturestream.nim
@@ -35,6 +35,26 @@ proc beta() {.async.} =
 asyncCheck alpha()
 waitFor beta()
 
+template ensureCallbacksAreScheduled =
+  # callbacks are called directly if the dispatcher is not running
+  discard getGlobalDispatcher()
+
+proc testCompletion() {.async.} =
+  ensureCallbacksAreScheduled
+
+  var stream = newFutureStream[string]()
+
+  for i in 1..5:
+    await stream.write($i)
+
+  var readFuture = stream.readAll()
+  stream.complete()
+  yield readFuture
+  let data = readFuture.read()
+  doAssert(data.len == 5, "actual data len = " & $data.len)
+
+waitFor testCompletion()
+
 # TODO: Something like this should work eventually.
 # proc delta(): FutureStream[string] {.async.} =
 #   for i in 0 .. 5: