diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/pure/includes/asyncfutures.nim | 7 |
1 files changed, 6 insertions, 1 deletions
diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim index eb0c6bbf2..6f6693605 100644 --- a/lib/pure/includes/asyncfutures.nim +++ b/lib/pure/includes/asyncfutures.nim @@ -182,7 +182,7 @@ proc `callback=`*[T](future: FutureStream[T], ## If the future stream already has data then ``cb`` will be called ## immediately. future.cb = proc () = cb(future) - if future.queue.len > 0: + if future.queue.len > 0 or future.finished: callSoon(future.cb) proc injectStacktrace[T](future: Future[T]) = @@ -257,6 +257,7 @@ proc put*[T](future: FutureStream[T], value: T): Future[void] = if future.finished: let msg = "FutureStream is finished and so no longer accepts new data." result.fail(newException(ValueError, msg)) + return # TODO: Buffering. future.queue.enqueue(value) if not future.cb.isNil: future.cb() @@ -294,6 +295,10 @@ proc take*[T](future: FutureStream[T]): Future[(bool, T)] = if not savedCb.isNil: savedCb() return resFut +proc len*[T](future: FutureStream[T]): int = + ## Returns the amount of data pieces inside the stream. + future.queue.len + proc asyncCheck*[T](future: Future[T]) = ## Sets a callback on ``future`` which raises an exception if the future ## finished with an error. |