summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorDominik Picheta <dominikpicheta@gmail.com>2017-02-10 20:18:59 +0100
committerDominik Picheta <dominikpicheta@gmail.com>2017-02-10 20:18:59 +0100
commitddd3d3f44a7bd83d97e17b46bc7fd6b92043520f (patch)
tree2be75d84f608cd7ac2b087c8447247f263b28671
parentd87fb236d101b9c1bc14f18fe17798cc214a9620 (diff)
downloadNim-ddd3d3f44a7bd83d97e17b46bc7fd6b92043520f.tar.gz
Improve implementation of takeAsync for FutureStreams.
-rw-r--r--lib/pure/includes/asyncfutures.nim40
-rw-r--r--tests/async/tfuturestream.nim26
2 files changed, 48 insertions, 18 deletions
diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim
index 8d7ace7a1..13d927e92 100644
--- a/lib/pure/includes/asyncfutures.nim
+++ b/lib/pure/includes/asyncfutures.nim
@@ -129,11 +129,9 @@ proc complete*[T](future: FutureVar[T], val: T) =
   if not fut.cb.isNil():
     fut.cb()
 
-proc complete*[T](future: FutureStream[T], value: T) =
-  ## Completes a ``FutureStream`` with the last value, signifying the end of
-  ## data.
+proc complete*[T](future: FutureStream[T]) =
+  ## Completes a ``FutureStream`` signifying the end of data.
   future.finished = true
-  future.queue.enqueue(value)
   if not future.cb.isNil():
     future.cb()
 
@@ -274,22 +272,36 @@ proc peek*[T](future: FutureStream[T]): T =
   ## Returns the oldest value stored inside the specified future stream.
   return future.queue.front()
 
-proc takeAsync*[T](future: FutureStream[T]): Future[T] =
+proc takeAsync*[T](future: FutureStream[T]): Future[(bool, T)] =
   ## Returns a future that will complete when the ``FutureStream`` has data
-  ## placed into it. The future will be completed with the oldest value stored
-  ## inside the stream.
+  ## placed into it. The future will be completed with the oldest
+  ## value stored inside the stream. The return value will also determine
+  ## whether data was retrieved, ``false`` means that the future stream was
+  ## completed and no data was retrieved.
   ##
   ## This function will remove the data that was returned from the underlying
   ## ``FutureStream``.
-  var resFut = newFuture[T]("FutureStream.takeAsync")
-  let cb = future.cb
+  var resFut = newFuture[(bool, T)]("FutureStream.takeAsync")
+  let savedCb = future.cb
   future.callback =
     proc (fs: FutureStream[T]) =
-      # TODO: When finished(fs) should we "cancel" resFut? This assumes that we
-      # TODO: can `complete` with no value.
-      if not resFut.finished and (not finished(fs)):
-        resFut.complete(fs.take())
-      if not cb.isNil: cb()
+      # We don't want this callback called again.
+      future.cb = nil
+
+      # The return value depends on whether the FutureStream has finished.
+      var res: (bool, T)
+      if finished(fs):
+        # Remember, this callback is called when the FutureStream is completed.
+        res[0] = false
+      else:
+        res[0] = true
+        res[1] = fs.take()
+
+      if not resFut.finished:
+        resFut.complete(res)
+
+      # If the saved callback isn't nil then let's call it.
+      if not savedCb.isNil: savedCb()
   return resFut
 
 proc asyncCheck*[T](future: Future[T]) =
diff --git a/tests/async/tfuturestream.nim b/tests/async/tfuturestream.nim
index e3480126f..61b3863ac 100644
--- a/tests/async/tfuturestream.nim
+++ b/tests/async/tfuturestream.nim
@@ -14,21 +14,39 @@ Finished
 """
 import asyncdispatch
 
-var fs = newFutureStream[string]()
+var fs = newFutureStream[int]()
 
 proc alpha() {.async.} =
   for i in 0 .. 5:
     await sleepAsync(1000)
-    fs.put($i)
+    fs.put(i)
 
-  fs.complete("Done")
+  fs.complete()
 
 proc beta() {.async.} =
   while not fs.finished:
-    echo(await fs.takeAsync())
+    let (hasValue, value) = await fs.takeAsync()
+    if hasValue:
+      echo(value)
 
   echo("Finished")
 
 asyncCheck alpha()
 waitFor beta()
 
+# TODO: Something like this should work eventually.
+# proc delta(): FutureStream[string] {.async.} =
+#   for i in 0 .. 5:
+#     await sleepAsync(1000)
+#     result.put($i)
+
+#   return ""
+
+# proc omega() {.async.} =
+#   let fut = delta()
+#   while not fut.finished():
+#     echo(await fs.takeAsync())
+
+#   echo("Finished")
+
+# waitFor omega()
\ No newline at end of file