diff options
author | Yuriy Glukhov <yglukhov@users.noreply.github.com> | 2021-01-14 09:53:21 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-14 08:53:21 +0100 |
commit | 4ae520711d72fa903cccf3e59754fd64f632d992 (patch) | |
tree | c3b607e6d13419b0ef8cb9a4ec36ec581fe23322 /lib/pure/asyncstreams.nim | |
parent | 7f67c593c11ea4d2c5228eb983c8f1e2cafee0e4 (diff) | |
download | Nim-4ae520711d72fa903cccf3e59754fd64f632d992.tar.gz |
Fixes #16436 (#16695)
* Fixes #16436 * Comments addressed
Diffstat (limited to 'lib/pure/asyncstreams.nim')
-rw-r--r-- | lib/pure/asyncstreams.nim | 19 |
1 files changed, 18 insertions, 1 deletions
diff --git a/lib/pure/asyncstreams.nim b/lib/pure/asyncstreams.nim index 7ffde9c10..6bc103f05 100644 --- a/lib/pure/asyncstreams.nim +++ b/lib/pure/asyncstreams.nim @@ -21,6 +21,7 @@ type queue: Deque[T] finished: bool cb: proc () {.closure, gcsafe.} + error*: ref Exception proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] = ## Create a new ``FutureStream``. This future's callback is activated when @@ -40,10 +41,19 @@ proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] = proc complete*[T](future: FutureStream[T]) = ## Completes a ``FutureStream`` signalling the end of data. + assert(future.error == nil, "Trying to complete failed stream") future.finished = true if not future.cb.isNil: future.cb() +proc fail*[T](future: FutureStream[T], error: ref Exception) = + ## Completes ``future`` with ``error``. + assert(not future.finished) + future.finished = true + future.error = error + if not future.cb.isNil: + future.cb() + proc `callback=`*[T](future: FutureStream[T], cb: proc (future: FutureStream[T]) {.closure, gcsafe.}) = ## Sets the callback proc to be called when data was placed inside the @@ -65,6 +75,10 @@ proc finished*[T](future: FutureStream[T]): bool = ## no data waiting to be retrieved. result = future.finished and future.queue.len == 0 +proc failed*[T](future: FutureStream[T]): bool = + ## Determines whether ``future`` completed with an error. + return future.error != nil + proc write*[T](future: FutureStream[T], value: T): Future[void] = ## Writes the specified value inside the specified future stream. ## @@ -107,7 +121,10 @@ proc read*[T](future: FutureStream[T]): owned(Future[(bool, T)]) = res[0] = true res[1] = fs.queue.popFirst() - resFut.complete(res) + if fs.failed: + resFut.fail(fs.error) + else: + resFut.complete(res) # If the saved callback isn't nil then let's call it. if not savedCb.isNil: savedCb() |