summary refs log tree commit diff stats
path: root/lib/pure/includes
diff options
context:
space:
mode:
authorDominik Picheta <dominikpicheta@gmail.com>2017-02-26 12:52:19 +0100
committerDominik Picheta <dominikpicheta@gmail.com>2017-02-26 12:52:19 +0100
commitf9cce320974c33554302d2a801c89e413c70a80b (patch)
treecf951f97934e6d27ee517940197b08db1d6df44a /lib/pure/includes
parent912d95a6ea2583bd783f85527dc3526b77710568 (diff)
downloadNim-f9cce320974c33554302d2a801c89e413c70a80b.tar.gz
Various fixes to FutureStreams based on PR feedback.
Diffstat (limited to 'lib/pure/includes')
-rw-r--r--lib/pure/includes/asyncfutures.nim43
1 files changed, 27 insertions, 16 deletions
diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim
index 6f6693605..a597de5cf 100644
--- a/lib/pure/includes/asyncfutures.nim
+++ b/lib/pure/includes/asyncfutures.nim
@@ -17,8 +17,10 @@ type
   FutureVar*[T] = distinct Future[T]
 
   FutureStream*[T] = ref object of FutureBase   ## Special future that acts as
-                                                ## a queue.
-    queue: Queue[T]
+                                                ## a queue. Its API is still
+                                                ## experimental and so is
+                                                ## subject to change.
+    queue: Deque[T]
 
   FutureError* = object of Exception
     cause*: FutureBase
@@ -30,7 +32,7 @@ when not defined(release):
 
 proc callSoon*(cbproc: proc ()) {.gcsafe.}
 
-template setupFutureBase(fromProc: string): stmt =
+template setupFutureBase(fromProc: string) =
   new(result)
   result.finished = false
   when not defined(release):
@@ -55,13 +57,20 @@ proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] =
   result = FutureVar[T](newFuture[T](fromProc))
 
 proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] =
-  ## Create a new ``FutureStream``. This Future type's callback can be activated
-  ## multiple times when new data is written to it.
+  ## Create a new ``FutureStream``. This future's callback is activated when
+  ## two events occur:
+  ##
+  ## * New data is written into the future stream.
+  ## * The future stream is completed (this means that no more data will be
+  ##   written).
   ##
   ## Specifying ``fromProc``, which is a string specifying the name of the proc
   ## that this future belongs to, is a good habit as it helps with debugging.
+  ##
+  ## **Note:** The API of FutureStream is still new and so has a higher
+  ## likelihood of changing in the future.
   setupFutureBase(fromProc)
-  result.queue = initQueue[T]()
+  result.queue = initDeque[T]()
 
 proc clean*[T](future: FutureVar[T]) =
   ## Resets the ``finished`` status of ``future``.
@@ -130,7 +139,7 @@ proc complete*[T](future: FutureVar[T], val: T) =
     fut.cb()
 
 proc complete*[T](future: FutureStream[T]) =
-  ## Completes a ``FutureStream`` signifying the end of data.
+  ## Completes a ``FutureStream`` signalling the end of data.
   future.finished = true
   if not future.cb.isNil():
     future.cb()
@@ -179,8 +188,8 @@ proc `callback=`*[T](future: FutureStream[T],
   ## The callback is also called when the future is completed. So you should
   ## use ``finished`` to check whether data is available.
   ##
-  ## If the future stream already has data then ``cb`` will be called
-  ## immediately.
+  ## If the future stream already has data or is finished then ``cb`` will be
+  ## called immediately.
   future.cb = proc () = cb(future)
   if future.queue.len > 0 or future.finished:
     callSoon(future.cb)
@@ -236,8 +245,9 @@ proc finished*[T](future: Future[T] | FutureVar[T] | FutureStream[T]): bool =
   ##
   ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish.
   ##
-  ## For a ``FutureStream`` this signifies that no more data will be placed
-  ## inside it and that there is no data waiting to be retrieved.
+  ## For a ``FutureStream`` a ``true`` value means that no more data will be
+  ## placed inside the stream _and_ that there is no data waiting to be
+  ## retrieved.
   when future is FutureVar[T]:
     result = (Future[T](future)).finished
   elif future is FutureStream[T]:
@@ -249,7 +259,7 @@ proc failed*(future: FutureBase): bool =
   ## Determines whether ``future`` completed with an error.
   return future.error != nil
 
-proc put*[T](future: FutureStream[T], value: T): Future[void] =
+proc write*[T](future: FutureStream[T], value: T): Future[void] =
   ## Writes the specified value inside the specified future stream.
   ##
   ## This will raise ``ValueError`` if ``future`` is finished.
@@ -258,12 +268,13 @@ proc put*[T](future: FutureStream[T], value: T): Future[void] =
     let msg = "FutureStream is finished and so no longer accepts new data."
     result.fail(newException(ValueError, msg))
     return
-  # TODO: Buffering.
-  future.queue.enqueue(value)
+  # TODO: Implement limiting of the streams storage to prevent it growing
+  # infinitely when no reads are occuring.
+  future.queue.addLast(value)
   if not future.cb.isNil: future.cb()
   result.complete()
 
-proc take*[T](future: FutureStream[T]): Future[(bool, T)] =
+proc read*[T](future: FutureStream[T]): Future[(bool, T)] =
   ## Returns a future that will complete when the ``FutureStream`` has data
   ## placed into it. The future will be completed with the oldest
   ## value stored inside the stream. The return value will also determine
@@ -286,7 +297,7 @@ proc take*[T](future: FutureStream[T]): Future[(bool, T)] =
         res[0] = false
       else:
         res[0] = true
-        res[1] = fs.queue.dequeue()
+        res[1] = fs.queue.popLast()
 
       if not resFut.finished:
         resFut.complete(res)