summary refs log tree commit diff stats
path: root/lib/pure/includes
diff options
context:
space:
mode:
authorDominik Picheta <dominikpicheta@gmail.com>2017-02-10 20:40:32 +0100
committerDominik Picheta <dominikpicheta@gmail.com>2017-02-10 20:40:32 +0100
commit2f502e2a9ede75be3f56a0206e1314c758e1ad90 (patch)
tree587378ca25c56289a4a8deadf40b18311b3d7763 /lib/pure/includes
parentddd3d3f44a7bd83d97e17b46bc7fd6b92043520f (diff)
downloadNim-2f502e2a9ede75be3f56a0206e1314c758e1ad90.tar.gz
Remove immediate FutureStream procs and make 'put' awaitable.
Diffstat (limited to 'lib/pure/includes')
-rw-r--r--lib/pure/includes/asyncfutures.nim26
1 files changed, 8 insertions, 18 deletions
diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim
index 13d927e92..eb0c6bbf2 100644
--- a/lib/pure/includes/asyncfutures.nim
+++ b/lib/pure/includes/asyncfutures.nim
@@ -249,30 +249,20 @@ proc failed*(future: FutureBase): bool =
   ## Determines whether ``future`` completed with an error.
   return future.error != nil
 
-proc take*[T](future: FutureStream[T]): T {.raises: [IndexError].} =
-  ## Retrieves the oldest value stored inside the stream. If the stream
-  ## contains no data then this function will fail with a ``IndexError``
-  ## exception.
-  ##
-  ## This function will remove the data that was returned from the underlying
-  ## ``FutureStream``.
-  return future.queue.dequeue()
-
-proc put*[T](future: FutureStream[T], value: T) =
+proc put*[T](future: FutureStream[T], value: T): Future[void] =
   ## Writes the specified value inside the specified future stream.
   ##
   ## This will raise ``ValueError`` if ``future`` is finished.
+  result = newFuture[void]("FutureStream.put")
   if future.finished:
     let msg = "FutureStream is finished and so no longer accepts new data."
-    raise newException(ValueError, msg)
+    result.fail(newException(ValueError, msg))
+  # TODO: Buffering.
   future.queue.enqueue(value)
   if not future.cb.isNil: future.cb()
+  result.complete()
 
-proc peek*[T](future: FutureStream[T]): T =
-  ## Returns the oldest value stored inside the specified future stream.
-  return future.queue.front()
-
-proc takeAsync*[T](future: FutureStream[T]): Future[(bool, T)] =
+proc take*[T](future: FutureStream[T]): Future[(bool, T)] =
   ## Returns a future that will complete when the ``FutureStream`` has data
   ## placed into it. The future will be completed with the oldest
   ## value stored inside the stream. The return value will also determine
@@ -281,7 +271,7 @@ proc takeAsync*[T](future: FutureStream[T]): Future[(bool, T)] =
   ##
   ## This function will remove the data that was returned from the underlying
   ## ``FutureStream``.
-  var resFut = newFuture[(bool, T)]("FutureStream.takeAsync")
+  var resFut = newFuture[(bool, T)]("FutureStream.take")
   let savedCb = future.cb
   future.callback =
     proc (fs: FutureStream[T]) =
@@ -295,7 +285,7 @@ proc takeAsync*[T](future: FutureStream[T]): Future[(bool, T)] =
         res[0] = false
       else:
         res[0] = true
-        res[1] = fs.take()
+        res[1] = fs.queue.dequeue()
 
       if not resFut.finished:
         resFut.complete(res)