summary refs log tree commit diff stats
path: root/lib
diff options
context:
space:
mode:
authorDominik Picheta <dominikpicheta@gmail.com>2017-02-10 20:18:59 +0100
committerDominik Picheta <dominikpicheta@gmail.com>2017-02-10 20:18:59 +0100
commitddd3d3f44a7bd83d97e17b46bc7fd6b92043520f (patch)
tree2be75d84f608cd7ac2b087c8447247f263b28671 /lib
parentd87fb236d101b9c1bc14f18fe17798cc214a9620 (diff)
downloadNim-ddd3d3f44a7bd83d97e17b46bc7fd6b92043520f.tar.gz
Improve implementation of takeAsync for FutureStreams.
Diffstat (limited to 'lib')
-rw-r--r--lib/pure/includes/asyncfutures.nim40
1 files changed, 26 insertions, 14 deletions
diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim
index 8d7ace7a1..13d927e92 100644
--- a/lib/pure/includes/asyncfutures.nim
+++ b/lib/pure/includes/asyncfutures.nim
@@ -129,11 +129,9 @@ proc complete*[T](future: FutureVar[T], val: T) =
   if not fut.cb.isNil():
     fut.cb()
 
-proc complete*[T](future: FutureStream[T], value: T) =
-  ## Completes a ``FutureStream`` with the last value, signifying the end of
-  ## data.
+proc complete*[T](future: FutureStream[T]) =
+  ## Completes a ``FutureStream`` signifying the end of data.
   future.finished = true
-  future.queue.enqueue(value)
   if not future.cb.isNil():
     future.cb()
 
@@ -274,22 +272,36 @@ proc peek*[T](future: FutureStream[T]): T =
   ## Returns the oldest value stored inside the specified future stream.
   return future.queue.front()
 
-proc takeAsync*[T](future: FutureStream[T]): Future[T] =
+proc takeAsync*[T](future: FutureStream[T]): Future[(bool, T)] =
   ## Returns a future that will complete when the ``FutureStream`` has data
-  ## placed into it. The future will be completed with the oldest value stored
-  ## inside the stream.
+  ## placed into it. The future will be completed with the oldest
+  ## value stored inside the stream. The return value will also determine
+  ## whether data was retrieved, ``false`` means that the future stream was
+  ## completed and no data was retrieved.
   ##
   ## This function will remove the data that was returned from the underlying
   ## ``FutureStream``.
-  var resFut = newFuture[T]("FutureStream.takeAsync")
-  let cb = future.cb
+  var resFut = newFuture[(bool, T)]("FutureStream.takeAsync")
+  let savedCb = future.cb
   future.callback =
     proc (fs: FutureStream[T]) =
-      # TODO: When finished(fs) should we "cancel" resFut? This assumes that we
-      # TODO: can `complete` with no value.
-      if not resFut.finished and (not finished(fs)):
-        resFut.complete(fs.take())
-      if not cb.isNil: cb()
+      # We don't want this callback called again.
+      future.cb = nil
+
+      # The return value depends on whether the FutureStream has finished.
+      var res: (bool, T)
+      if finished(fs):
+        # Remember, this callback is called when the FutureStream is completed.
+        res[0] = false
+      else:
+        res[0] = true
+        res[1] = fs.take()
+
+      if not resFut.finished:
+        resFut.complete(res)
+
+      # If the saved callback isn't nil then let's call it.
+      if not savedCb.isNil: savedCb()
   return resFut
 
 proc asyncCheck*[T](future: Future[T]) =