summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorDominik Picheta <dominikpicheta@gmail.com>2017-02-10 20:40:32 +0100
committerDominik Picheta <dominikpicheta@gmail.com>2017-02-10 20:40:32 +0100
commit2f502e2a9ede75be3f56a0206e1314c758e1ad90 (patch)
tree587378ca25c56289a4a8deadf40b18311b3d7763
parentddd3d3f44a7bd83d97e17b46bc7fd6b92043520f (diff)
downloadNim-2f502e2a9ede75be3f56a0206e1314c758e1ad90.tar.gz
Remove immediate FutureStream procs and make 'put' awaitable.
-rw-r--r--lib/pure/includes/asyncfutures.nim26
-rw-r--r--tests/async/tfuturestream.nim4
2 files changed, 10 insertions, 20 deletions
diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim
index 13d927e92..eb0c6bbf2 100644
--- a/lib/pure/includes/asyncfutures.nim
+++ b/lib/pure/includes/asyncfutures.nim
@@ -249,30 +249,20 @@ proc failed*(future: FutureBase): bool =
   ## Determines whether ``future`` completed with an error.
   return future.error != nil
 
-proc take*[T](future: FutureStream[T]): T {.raises: [IndexError].} =
-  ## Retrieves the oldest value stored inside the stream. If the stream
-  ## contains no data then this function will fail with a ``IndexError``
-  ## exception.
-  ##
-  ## This function will remove the data that was returned from the underlying
-  ## ``FutureStream``.
-  return future.queue.dequeue()
-
-proc put*[T](future: FutureStream[T], value: T) =
+proc put*[T](future: FutureStream[T], value: T): Future[void] =
   ## Writes the specified value inside the specified future stream.
   ##
   ## This will raise ``ValueError`` if ``future`` is finished.
+  result = newFuture[void]("FutureStream.put")
   if future.finished:
     let msg = "FutureStream is finished and so no longer accepts new data."
-    raise newException(ValueError, msg)
+    result.fail(newException(ValueError, msg))
+  # TODO: Buffering.
   future.queue.enqueue(value)
   if not future.cb.isNil: future.cb()
+  result.complete()
 
-proc peek*[T](future: FutureStream[T]): T =
-  ## Returns the oldest value stored inside the specified future stream.
-  return future.queue.front()
-
-proc takeAsync*[T](future: FutureStream[T]): Future[(bool, T)] =
+proc take*[T](future: FutureStream[T]): Future[(bool, T)] =
   ## Returns a future that will complete when the ``FutureStream`` has data
   ## placed into it. The future will be completed with the oldest
   ## value stored inside the stream. The return value will also determine
@@ -281,7 +271,7 @@ proc takeAsync*[T](future: FutureStream[T]): Future[(bool, T)] =
   ##
   ## This function will remove the data that was returned from the underlying
   ## ``FutureStream``.
-  var resFut = newFuture[(bool, T)]("FutureStream.takeAsync")
+  var resFut = newFuture[(bool, T)]("FutureStream.take")
   let savedCb = future.cb
   future.callback =
     proc (fs: FutureStream[T]) =
@@ -295,7 +285,7 @@ proc takeAsync*[T](future: FutureStream[T]): Future[(bool, T)] =
         res[0] = false
       else:
         res[0] = true
-        res[1] = fs.take()
+        res[1] = fs.queue.dequeue()
 
       if not resFut.finished:
         resFut.complete(res)
diff --git a/tests/async/tfuturestream.nim b/tests/async/tfuturestream.nim
index 61b3863ac..bf8c9b4c4 100644
--- a/tests/async/tfuturestream.nim
+++ b/tests/async/tfuturestream.nim
@@ -19,13 +19,13 @@ var fs = newFutureStream[int]()
 proc alpha() {.async.} =
   for i in 0 .. 5:
     await sleepAsync(1000)
-    fs.put(i)
+    await fs.put(i)
 
   fs.complete()
 
 proc beta() {.async.} =
   while not fs.finished:
-    let (hasValue, value) = await fs.takeAsync()
+    let (hasValue, value) = await fs.take()
     if hasValue:
       echo(value)