summary refs log tree commit diff stats
path: root/lib/pure/includes/asyncfutures.nim
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/includes/asyncfutures.nim')
-rw-r--r--lib/pure/includes/asyncfutures.nim88
1 files changed, 48 insertions, 40 deletions
diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim
index a41f03a00..8d7ace7a1 100644
--- a/lib/pure/includes/asyncfutures.nim
+++ b/lib/pure/includes/asyncfutures.nim
@@ -1,4 +1,3 @@
-import queues
 
 # TODO: This shouldn't need to be included, but should ideally be exported.
 type
@@ -130,9 +129,11 @@ proc complete*[T](future: FutureVar[T], val: T) =
   if not fut.cb.isNil():
     fut.cb()
 
-proc complete*[T](future: FutureStream[T]) =
-  ## Completes a ``FutureStream`` to signify the end of data.
+proc complete*[T](future: FutureStream[T], value: T) =
+  ## Completes a ``FutureStream`` with the last value, signifying the end of
+  ## data.
   future.finished = true
+  future.queue.enqueue(value)
   if not future.cb.isNil():
     future.cb()
 
@@ -216,42 +217,6 @@ proc read*[T](future: Future[T] | FutureVar[T]): T =
     # TODO: Make a custom exception type for this?
     raise newException(ValueError, "Future still in progress.")
 
-proc take*[T](future: FutureStream[T]): T {.raises: [ValueError].} =
-  ## Retrieves the oldest value stored inside the stream. If the stream
-  ## contains no data then this function will fail with a ``ValueError``
-  ## exception.
-  ##
-  ## This function will remove the data that was returned from the underlying
-  ## ``FutureStream``.
-  return future.queue.dequeue()
-
-proc put*[T](future: FutureStream[T], value: T): T =
-  ## Writes the specified value inside the specified future stream.
-  ##
-  ## This will raise ``ValueError`` if ``future`` is finished.
-  if future.finished:
-    let msg = "FutureStream is finished and so no longer accepts new data."
-    raise newException(ValueError, msg)
-  future.queue.enqueue(value)
-
-proc peek*[T](future: FutureStream[T]): T =
-  ## Returns the oldest value stored inside the specified future stream.
-  return future.queue.front()
-
-proc takeAsync*[T](future: FutureStream[T]): Future[T] =
-  ## Returns a future that will complete when the ``FutureStream`` has data
-  ## placed into it. The future will be completed with the oldest value stored
-  ## inside the stream.
-  ##
-  ## This function will remove the data that was returned from the underlying
-  ## ``FutureStream``.
-  var resFut = newFuture[T]("FutureStream.wait")
-  let cb = future.cb
-  future.callback =
-    proc (fs: FutureStream[T]) =
-      resFut.complete(fs.take())
-      if not cb.isNil: cb()
-
 proc readError*[T](future: Future[T]): ref Exception =
   ## Retrieves the exception stored in ``future``.
   ##
@@ -274,9 +239,11 @@ proc finished*[T](future: Future[T] | FutureVar[T] | FutureStream[T]): bool =
   ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish.
   ##
   ## For a ``FutureStream`` this signifies that no more data will be placed
-  ## inside it.
+  ## inside it and that there is no data waiting to be retrieved.
   when future is FutureVar[T]:
     result = (Future[T](future)).finished
+  elif future is FutureStream[T]:
+    result = future.finished and future.queue.len == 0
   else:
     result = future.finished
 
@@ -284,6 +251,47 @@ proc failed*(future: FutureBase): bool =
   ## Determines whether ``future`` completed with an error.
   return future.error != nil
 
+proc take*[T](future: FutureStream[T]): T {.raises: [IndexError].} =
+  ## Retrieves the oldest value stored inside the stream. If the stream
+  ## contains no data then this function will fail with a ``IndexError``
+  ## exception.
+  ##
+  ## This function will remove the data that was returned from the underlying
+  ## ``FutureStream``.
+  return future.queue.dequeue()
+
+proc put*[T](future: FutureStream[T], value: T) =
+  ## Writes the specified value inside the specified future stream.
+  ##
+  ## This will raise ``ValueError`` if ``future`` is finished.
+  if future.finished:
+    let msg = "FutureStream is finished and so no longer accepts new data."
+    raise newException(ValueError, msg)
+  future.queue.enqueue(value)
+  if not future.cb.isNil: future.cb()
+
+proc peek*[T](future: FutureStream[T]): T =
+  ## Returns the oldest value stored inside the specified future stream.
+  return future.queue.front()
+
+proc takeAsync*[T](future: FutureStream[T]): Future[T] =
+  ## Returns a future that will complete when the ``FutureStream`` has data
+  ## placed into it. The future will be completed with the oldest value stored
+  ## inside the stream.
+  ##
+  ## This function will remove the data that was returned from the underlying
+  ## ``FutureStream``.
+  var resFut = newFuture[T]("FutureStream.takeAsync")
+  let cb = future.cb
+  future.callback =
+    proc (fs: FutureStream[T]) =
+      # TODO: When finished(fs) should we "cancel" resFut? This assumes that we
+      # TODO: can `complete` with no value.
+      if not resFut.finished and (not finished(fs)):
+        resFut.complete(fs.take())
+      if not cb.isNil: cb()
+  return resFut
+
 proc asyncCheck*[T](future: Future[T]) =
   ## Sets a callback on ``future`` which raises an exception if the future
   ## finished with an error.