summary refs log tree commit diff stats
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/pure/includes/asyncfutures.nim7
1 files changed, 6 insertions, 1 deletions
diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim
index eb0c6bbf2..6f6693605 100644
--- a/lib/pure/includes/asyncfutures.nim
+++ b/lib/pure/includes/asyncfutures.nim
@@ -182,7 +182,7 @@ proc `callback=`*[T](future: FutureStream[T],
   ## If the future stream already has data then ``cb`` will be called
   ## immediately.
   future.cb = proc () = cb(future)
-  if future.queue.len > 0:
+  if future.queue.len > 0 or future.finished:
     callSoon(future.cb)
 
 proc injectStacktrace[T](future: Future[T]) =
@@ -257,6 +257,7 @@ proc put*[T](future: FutureStream[T], value: T): Future[void] =
   if future.finished:
     let msg = "FutureStream is finished and so no longer accepts new data."
     result.fail(newException(ValueError, msg))
+    return
   # TODO: Buffering.
   future.queue.enqueue(value)
   if not future.cb.isNil: future.cb()
@@ -294,6 +295,10 @@ proc take*[T](future: FutureStream[T]): Future[(bool, T)] =
       if not savedCb.isNil: savedCb()
   return resFut
 
+proc len*[T](future: FutureStream[T]): int =
+  ## Returns the amount of data pieces inside the stream.
+  future.queue.len
+
 proc asyncCheck*[T](future: Future[T]) =
   ## Sets a callback on ``future`` which raises an exception if the future
   ## finished with an error.