summary refs log tree commit diff stats
path: root/lib/pure/asyncstreams.nim
diff options
context:
space:
mode:
authorYuriy Glukhov <yglukhov@users.noreply.github.com>2021-01-14 09:53:21 +0200
committerGitHub <noreply@github.com>2021-01-14 08:53:21 +0100
commit4ae520711d72fa903cccf3e59754fd64f632d992 (patch)
treec3b607e6d13419b0ef8cb9a4ec36ec581fe23322 /lib/pure/asyncstreams.nim
parent7f67c593c11ea4d2c5228eb983c8f1e2cafee0e4 (diff)
downloadNim-4ae520711d72fa903cccf3e59754fd64f632d992.tar.gz
Fixes #16436 (#16695)
* Fixes #16436

* Comments addressed
Diffstat (limited to 'lib/pure/asyncstreams.nim')
-rw-r--r--lib/pure/asyncstreams.nim19
1 files changed, 18 insertions, 1 deletions
diff --git a/lib/pure/asyncstreams.nim b/lib/pure/asyncstreams.nim
index 7ffde9c10..6bc103f05 100644
--- a/lib/pure/asyncstreams.nim
+++ b/lib/pure/asyncstreams.nim
@@ -21,6 +21,7 @@ type
     queue: Deque[T]
     finished: bool
     cb: proc () {.closure, gcsafe.}
+    error*: ref Exception
 
 proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] =
   ## Create a new ``FutureStream``. This future's callback is activated when
@@ -40,10 +41,19 @@ proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] =
 
 proc complete*[T](future: FutureStream[T]) =
   ## Completes a ``FutureStream`` signalling the end of data.
+  assert(future.error == nil, "Trying to complete failed stream")
   future.finished = true
   if not future.cb.isNil:
     future.cb()
 
+proc fail*[T](future: FutureStream[T], error: ref Exception) =
+  ## Completes ``future`` with ``error``.
+  assert(not future.finished)
+  future.finished = true
+  future.error = error
+  if not future.cb.isNil:
+    future.cb()
+
 proc `callback=`*[T](future: FutureStream[T],
     cb: proc (future: FutureStream[T]) {.closure, gcsafe.}) =
   ## Sets the callback proc to be called when data was placed inside the
@@ -65,6 +75,10 @@ proc finished*[T](future: FutureStream[T]): bool =
   ## no data waiting to be retrieved.
   result = future.finished and future.queue.len == 0
 
+proc failed*[T](future: FutureStream[T]): bool =
+  ## Determines whether ``future`` completed with an error.
+  return future.error != nil
+
 proc write*[T](future: FutureStream[T], value: T): Future[void] =
   ## Writes the specified value inside the specified future stream.
   ##
@@ -107,7 +121,10 @@ proc read*[T](future: FutureStream[T]): owned(Future[(bool, T)]) =
       res[0] = true
       res[1] = fs.queue.popFirst()
 
-    resFut.complete(res)
+    if fs.failed:
+      resFut.fail(fs.error)
+    else:
+      resFut.complete(res)
 
     # If the saved callback isn't nil then let's call it.
     if not savedCb.isNil: savedCb()