summary refs log tree commit diff stats
path: root/lib/pure/includes/asyncfutures.nim
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/includes/asyncfutures.nim')
-rw-r--r--lib/pure/includes/asyncfutures.nim118
1 files changed, 110 insertions, 8 deletions
diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim
index c83228014..6af5bf3cf 100644
--- a/lib/pure/includes/asyncfutures.nim
+++ b/lib/pure/includes/asyncfutures.nim
@@ -16,6 +16,12 @@ type
 
   FutureVar*[T] = distinct Future[T]
 
+  FutureStream*[T] = ref object of FutureBase   ## Special future that acts as
+                                                ## a queue. Its API is still
+                                                ## experimental and so is
+                                                ## subject to change.
+    queue: Deque[T]
+
   FutureError* = object of Exception
     cause*: FutureBase
 
@@ -26,11 +32,7 @@ when not defined(release):
 
 proc callSoon*(cbproc: proc ()) {.gcsafe.}
 
-proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
-  ## Creates a new future.
-  ##
-  ## Specifying ``fromProc``, which is a string specifying the name of the proc
-  ## that this future belongs to, is a good habit as it helps with debugging.
+template setupFutureBase(fromProc: string) =
   new(result)
   result.finished = false
   when not defined(release):
@@ -39,6 +41,13 @@ proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
     result.fromProc = fromProc
     currentID.inc()
 
+proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
+  ## Creates a new future.
+  ##
+  ## Specifying ``fromProc``, which is a string specifying the name of the proc
+  ## that this future belongs to, is a good habit as it helps with debugging.
+  setupFutureBase(fromProc)
+
 proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] =
   ## Create a new ``FutureVar``. This Future type is ideally suited for
   ## situations where you want to avoid unnecessary allocations of Futures.
@@ -47,6 +56,22 @@ proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] =
   ## that this future belongs to, is a good habit as it helps with debugging.
   result = FutureVar[T](newFuture[T](fromProc))
 
+proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] =
+  ## Create a new ``FutureStream``. This future's callback is activated when
+  ## two events occur:
+  ##
+  ## * New data is written into the future stream.
+  ## * The future stream is completed (this means that no more data will be
+  ##   written).
+  ##
+  ## Specifying ``fromProc``, which is a string specifying the name of the proc
+  ## that this future belongs to, is a good habit as it helps with debugging.
+  ##
+  ## **Note:** The API of FutureStream is still new and so has a higher
+  ## likelihood of changing in the future.
+  setupFutureBase(fromProc)
+  result.queue = initDeque[T]()
+
 proc clean*[T](future: FutureVar[T]) =
   ## Resets the ``finished`` status of ``future``.
   Future[T](future).finished = false
@@ -107,12 +132,18 @@ proc complete*[T](future: FutureVar[T], val: T) =
   ## Any previously stored value will be overwritten.
   template fut: untyped = Future[T](future)
   checkFinished(fut)
-  assert(fut.error == nil)
+  assert(fut.error.isNil())
   fut.finished = true
   fut.value = val
-  if fut.cb != nil:
+  if not fut.cb.isNil():
     fut.cb()
 
+proc complete*[T](future: FutureStream[T]) =
+  ## Completes a ``FutureStream`` signalling the end of data.
+  future.finished = true
+  if not future.cb.isNil():
+    future.cb()
+
 proc fail*[T](future: Future[T], error: ref Exception) =
   ## Completes ``future`` with ``error``.
   #assert(not future.finished, "Future already finished, cannot finish twice.")
@@ -149,6 +180,20 @@ proc `callback=`*[T](future: Future[T],
   ## If future has already completed then ``cb`` will be called immediately.
   future.callback = proc () = cb(future)
 
+proc `callback=`*[T](future: FutureStream[T],
+    cb: proc (future: FutureStream[T]) {.closure,gcsafe.}) =
+  ## Sets the callback proc to be called when data was placed inside the
+  ## future stream.
+  ##
+  ## The callback is also called when the future is completed. So you should
+  ## use ``finished`` to check whether data is available.
+  ##
+  ## If the future stream already has data or is finished then ``cb`` will be
+  ## called immediately.
+  future.cb = proc () = cb(future)
+  if future.queue.len > 0 or future.finished:
+    callSoon(future.cb)
+
 proc injectStacktrace[T](future: Future[T]) =
   # TODO: Come up with something better.
   when not defined(release):
@@ -195,12 +240,18 @@ proc mget*[T](future: FutureVar[T]): var T =
   ## Future has not been finished.
   result = Future[T](future).value
 
-proc finished*[T](future: Future[T] | FutureVar[T]): bool =
+proc finished*[T](future: Future[T] | FutureVar[T] | FutureStream[T]): bool =
   ## Determines whether ``future`` has completed.
   ##
   ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish.
+  ##
+  ## For a ``FutureStream`` a ``true`` value means that no more data will be
+  ## placed inside the stream _and_ that there is no data waiting to be
+  ## retrieved.
   when future is FutureVar[T]:
     result = (Future[T](future)).finished
+  elif future is FutureStream[T]:
+    result = future.finished and future.queue.len == 0
   else:
     result = future.finished
 
@@ -208,6 +259,57 @@ proc failed*(future: FutureBase): bool =
   ## Determines whether ``future`` completed with an error.
   return future.error != nil
 
+proc write*[T](future: FutureStream[T], value: T): Future[void] =
+  ## Writes the specified value inside the specified future stream.
+  ##
+  ## This will raise ``ValueError`` if ``future`` is finished.
+  result = newFuture[void]("FutureStream.put")
+  if future.finished:
+    let msg = "FutureStream is finished and so no longer accepts new data."
+    result.fail(newException(ValueError, msg))
+    return
+  # TODO: Implement limiting of the streams storage to prevent it growing
+  # infinitely when no reads are occuring.
+  future.queue.addLast(value)
+  if not future.cb.isNil: future.cb()
+  result.complete()
+
+proc read*[T](future: FutureStream[T]): Future[(bool, T)] =
+  ## Returns a future that will complete when the ``FutureStream`` has data
+  ## placed into it. The future will be completed with the oldest
+  ## value stored inside the stream. The return value will also determine
+  ## whether data was retrieved, ``false`` means that the future stream was
+  ## completed and no data was retrieved.
+  ##
+  ## This function will remove the data that was returned from the underlying
+  ## ``FutureStream``.
+  var resFut = newFuture[(bool, T)]("FutureStream.take")
+  let savedCb = future.cb
+  future.callback =
+    proc (fs: FutureStream[T]) =
+      # We don't want this callback called again.
+      future.cb = nil
+
+      # The return value depends on whether the FutureStream has finished.
+      var res: (bool, T)
+      if finished(fs):
+        # Remember, this callback is called when the FutureStream is completed.
+        res[0] = false
+      else:
+        res[0] = true
+        res[1] = fs.queue.popFirst()
+
+      if not resFut.finished:
+        resFut.complete(res)
+
+      # If the saved callback isn't nil then let's call it.
+      if not savedCb.isNil: savedCb()
+  return resFut
+
+proc len*[T](future: FutureStream[T]): int =
+  ## Returns the amount of data pieces inside the stream.
+  future.queue.len
+
 proc asyncCheck*[T](future: Future[T]) =
   ## Sets a callback on ``future`` which raises an exception if the future
   ## finished with an error.