summary refs log tree commit diff stats
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/pure/includes/asyncfutures.nim26
1 files changed, 8 insertions, 18 deletions
diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim
index 13d927e92..eb0c6bbf2 100644
--- a/lib/pure/includes/asyncfutures.nim
+++ b/lib/pure/includes/asyncfutures.nim
@@ -249,30 +249,20 @@ proc failed*(future: FutureBase): bool =
   ## Determines whether ``future`` completed with an error.
   return future.error != nil
 
-proc take*[T](future: FutureStream[T]): T {.raises: [IndexError].} =
-  ## Retrieves the oldest value stored inside the stream. If the stream
-  ## contains no data then this function will fail with a ``IndexError``
-  ## exception.
-  ##
-  ## This function will remove the data that was returned from the underlying
-  ## ``FutureStream``.
-  return future.queue.dequeue()
-
-proc put*[T](future: FutureStream[T], value: T) =
+proc put*[T](future: FutureStream[T], value: T): Future[void] =
   ## Writes the specified value inside the specified future stream.
   ##
   ## This will raise ``ValueError`` if ``future`` is finished.
+  result = newFuture[void]("FutureStream.put")
   if future.finished:
     let msg = "FutureStream is finished and so no longer accepts new data."
-    raise newException(ValueError, msg)
+    result.fail(newException(ValueError, msg))
+  # TODO: Buffering.
   future.queue.enqueue(value)
   if not future.cb.isNil: future.cb()
+  result.complete()
 
-proc peek*[T](future: FutureStream[T]): T =
-  ## Returns the oldest value stored inside the specified future stream.
-  return future.queue.front()
-
-proc takeAsync*[T](future: FutureStream[T]): Future[(bool, T)] =
+proc take*[T](future: FutureStream[T]): Future[(bool, T)] =
   ## Returns a future that will complete when the ``FutureStream`` has data
   ## placed into it. The future will be completed with the oldest
   ## value stored inside the stream. The return value will also determine
@@ -281,7 +271,7 @@ proc takeAsync*[T](future: FutureStream[T]): Future[(bool, T)] =
   ##
   ## This function will remove the data that was returned from the underlying
   ## ``FutureStream``.
-  var resFut = newFuture[(bool, T)]("FutureStream.takeAsync")
+  var resFut = newFuture[(bool, T)]("FutureStream.take")
   let savedCb = future.cb
   future.callback =
     proc (fs: FutureStream[T]) =
@@ -295,7 +285,7 @@ proc takeAsync*[T](future: FutureStream[T]): Future[(bool, T)] =
         res[0] = false
       else:
         res[0] = true
-        res[1] = fs.take()
+        res[1] = fs.queue.dequeue()
 
       if not resFut.finished:
         resFut.complete(res)