diff options
author | Dominik Picheta <dominikpicheta@gmail.com> | 2017-02-11 12:42:30 +0100 |
---|---|---|
committer | Dominik Picheta <dominikpicheta@gmail.com> | 2017-02-11 12:42:30 +0100 |
commit | 77071eb767dabc78ea23c0ea623331acac640694 (patch) | |
tree | 7d01afc662141b31c385f1824299cd80381c5b34 /lib/pure/includes | |
parent | 4a7ea8f8650d7168c7bfaed725125d4a9a3920a0 (diff) | |
download | Nim-77071eb767dabc78ea23c0ea623331acac640694.tar.gz |
FutureStream's cb call behaviour fixed + other fixes.
Diffstat (limited to 'lib/pure/includes')
-rw-r--r-- | lib/pure/includes/asyncfutures.nim | 7 |
1 files changed, 6 insertions, 1 deletions
diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim index eb0c6bbf2..6f6693605 100644 --- a/lib/pure/includes/asyncfutures.nim +++ b/lib/pure/includes/asyncfutures.nim @@ -182,7 +182,7 @@ proc `callback=`*[T](future: FutureStream[T], ## If the future stream already has data then ``cb`` will be called ## immediately. future.cb = proc () = cb(future) - if future.queue.len > 0: + if future.queue.len > 0 or future.finished: callSoon(future.cb) proc injectStacktrace[T](future: Future[T]) = @@ -257,6 +257,7 @@ proc put*[T](future: FutureStream[T], value: T): Future[void] = if future.finished: let msg = "FutureStream is finished and so no longer accepts new data." result.fail(newException(ValueError, msg)) + return # TODO: Buffering. future.queue.enqueue(value) if not future.cb.isNil: future.cb() @@ -294,6 +295,10 @@ proc take*[T](future: FutureStream[T]): Future[(bool, T)] = if not savedCb.isNil: savedCb() return resFut +proc len*[T](future: FutureStream[T]): int = + ## Returns the amount of data pieces inside the stream. + future.queue.len + proc asyncCheck*[T](future: Future[T]) = ## Sets a callback on ``future`` which raises an exception if the future ## finished with an error. |