summary refs log tree commit diff stats
path: root/lib/pure/includes
diff options
context:
space:
mode:
authorMichał Zieliński <michal@zielinscy.org.pl>2017-06-05 00:12:44 +0200
committerMichał Zieliński <michal@zielinscy.org.pl>2017-07-05 12:54:09 +0200
commite86863e8f55d862b7690fbe6a9cca87ebf425913 (patch)
tree1ed662c7ee46738c4736d839837424fa075ec6cd /lib/pure/includes
parent9e12db445959ce7c791ec7480ea08e9e02f96bba (diff)
downloadNim-e86863e8f55d862b7690fbe6a9cca87ebf425913.tar.gz
asyncdispatch: split asyncfutures into its own module
This slightly changes behaviour of callSoon - before loop is initialized, callSoon will call the function immediately.
Diffstat (limited to 'lib/pure/includes')
-rw-r--r--lib/pure/includes/asyncfutures.nim408
1 files changed, 0 insertions, 408 deletions
diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim
deleted file mode 100644
index 6af5bf3cf..000000000
--- a/lib/pure/includes/asyncfutures.nim
+++ /dev/null
@@ -1,408 +0,0 @@
-
-# TODO: This shouldn't need to be included, but should ideally be exported.
-type
-  FutureBase* = ref object of RootObj ## Untyped future.
-    cb: proc () {.closure,gcsafe.}
-    finished: bool
-    error*: ref Exception ## Stored exception
-    errorStackTrace*: string
-    when not defined(release):
-      stackTrace: string ## For debugging purposes only.
-      id: int
-      fromProc: string
-
-  Future*[T] = ref object of FutureBase ## Typed future.
-    value: T ## Stored value
-
-  FutureVar*[T] = distinct Future[T]
-
-  FutureStream*[T] = ref object of FutureBase   ## Special future that acts as
-                                                ## a queue. Its API is still
-                                                ## experimental and so is
-                                                ## subject to change.
-    queue: Deque[T]
-
-  FutureError* = object of Exception
-    cause*: FutureBase
-
-{.deprecated: [PFutureBase: FutureBase, PFuture: Future].}
-
-when not defined(release):
-  var currentID = 0
-
-proc callSoon*(cbproc: proc ()) {.gcsafe.}
-
-template setupFutureBase(fromProc: string) =
-  new(result)
-  result.finished = false
-  when not defined(release):
-    result.stackTrace = getStackTrace()
-    result.id = currentID
-    result.fromProc = fromProc
-    currentID.inc()
-
-proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
-  ## Creates a new future.
-  ##
-  ## Specifying ``fromProc``, which is a string specifying the name of the proc
-  ## that this future belongs to, is a good habit as it helps with debugging.
-  setupFutureBase(fromProc)
-
-proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] =
-  ## Create a new ``FutureVar``. This Future type is ideally suited for
-  ## situations where you want to avoid unnecessary allocations of Futures.
-  ##
-  ## Specifying ``fromProc``, which is a string specifying the name of the proc
-  ## that this future belongs to, is a good habit as it helps with debugging.
-  result = FutureVar[T](newFuture[T](fromProc))
-
-proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] =
-  ## Create a new ``FutureStream``. This future's callback is activated when
-  ## two events occur:
-  ##
-  ## * New data is written into the future stream.
-  ## * The future stream is completed (this means that no more data will be
-  ##   written).
-  ##
-  ## Specifying ``fromProc``, which is a string specifying the name of the proc
-  ## that this future belongs to, is a good habit as it helps with debugging.
-  ##
-  ## **Note:** The API of FutureStream is still new and so has a higher
-  ## likelihood of changing in the future.
-  setupFutureBase(fromProc)
-  result.queue = initDeque[T]()
-
-proc clean*[T](future: FutureVar[T]) =
-  ## Resets the ``finished`` status of ``future``.
-  Future[T](future).finished = false
-  Future[T](future).error = nil
-
-proc checkFinished[T](future: Future[T]) =
-  ## Checks whether `future` is finished. If it is then raises a
-  ## ``FutureError``.
-  when not defined(release):
-    if future.finished:
-      var msg = ""
-      msg.add("An attempt was made to complete a Future more than once. ")
-      msg.add("Details:")
-      msg.add("\n  Future ID: " & $future.id)
-      msg.add("\n  Created in proc: " & future.fromProc)
-      msg.add("\n  Stack trace to moment of creation:")
-      msg.add("\n" & indent(future.stackTrace.strip(), 4))
-      when T is string:
-        msg.add("\n  Contents (string): ")
-        msg.add("\n" & indent(future.value.repr, 4))
-      msg.add("\n  Stack trace to moment of secondary completion:")
-      msg.add("\n" & indent(getStackTrace().strip(), 4))
-      var err = newException(FutureError, msg)
-      err.cause = future
-      raise err
-
-proc complete*[T](future: Future[T], val: T) =
-  ## Completes ``future`` with value ``val``.
-  #assert(not future.finished, "Future already finished, cannot finish twice.")
-  checkFinished(future)
-  assert(future.error == nil)
-  future.value = val
-  future.finished = true
-  if future.cb != nil:
-    future.cb()
-
-proc complete*(future: Future[void]) =
-  ## Completes a void ``future``.
-  #assert(not future.finished, "Future already finished, cannot finish twice.")
-  checkFinished(future)
-  assert(future.error == nil)
-  future.finished = true
-  if future.cb != nil:
-    future.cb()
-
-proc complete*[T](future: FutureVar[T]) =
-  ## Completes a ``FutureVar``.
-  template fut: untyped = Future[T](future)
-  checkFinished(fut)
-  assert(fut.error == nil)
-  fut.finished = true
-  if fut.cb != nil:
-    fut.cb()
-
-proc complete*[T](future: FutureVar[T], val: T) =
-  ## Completes a ``FutureVar`` with value ``val``.
-  ##
-  ## Any previously stored value will be overwritten.
-  template fut: untyped = Future[T](future)
-  checkFinished(fut)
-  assert(fut.error.isNil())
-  fut.finished = true
-  fut.value = val
-  if not fut.cb.isNil():
-    fut.cb()
-
-proc complete*[T](future: FutureStream[T]) =
-  ## Completes a ``FutureStream`` signalling the end of data.
-  future.finished = true
-  if not future.cb.isNil():
-    future.cb()
-
-proc fail*[T](future: Future[T], error: ref Exception) =
-  ## Completes ``future`` with ``error``.
-  #assert(not future.finished, "Future already finished, cannot finish twice.")
-  checkFinished(future)
-  future.finished = true
-  future.error = error
-  future.errorStackTrace =
-    if getStackTrace(error) == "": getStackTrace() else: getStackTrace(error)
-  if future.cb != nil:
-    future.cb()
-  else:
-    # This is to prevent exceptions from being silently ignored when a future
-    # is discarded.
-    # TODO: This may turn out to be a bad idea.
-    # Turns out this is a bad idea.
-    #raise error
-    discard
-
-proc `callback=`*(future: FutureBase, cb: proc () {.closure,gcsafe.}) =
-  ## Sets the callback proc to be called when the future completes.
-  ##
-  ## If future has already completed then ``cb`` will be called immediately.
-  ##
-  ## **Note**: You most likely want the other ``callback`` setter which
-  ## passes ``future`` as a param to the callback.
-  future.cb = cb
-  if future.finished:
-    callSoon(future.cb)
-
-proc `callback=`*[T](future: Future[T],
-    cb: proc (future: Future[T]) {.closure,gcsafe.}) =
-  ## Sets the callback proc to be called when the future completes.
-  ##
-  ## If future has already completed then ``cb`` will be called immediately.
-  future.callback = proc () = cb(future)
-
-proc `callback=`*[T](future: FutureStream[T],
-    cb: proc (future: FutureStream[T]) {.closure,gcsafe.}) =
-  ## Sets the callback proc to be called when data was placed inside the
-  ## future stream.
-  ##
-  ## The callback is also called when the future is completed. So you should
-  ## use ``finished`` to check whether data is available.
-  ##
-  ## If the future stream already has data or is finished then ``cb`` will be
-  ## called immediately.
-  future.cb = proc () = cb(future)
-  if future.queue.len > 0 or future.finished:
-    callSoon(future.cb)
-
-proc injectStacktrace[T](future: Future[T]) =
-  # TODO: Come up with something better.
-  when not defined(release):
-    var msg = ""
-    msg.add("\n  " & future.fromProc & "'s lead up to read of failed Future:")
-
-    if not future.errorStackTrace.isNil and future.errorStackTrace != "":
-      msg.add("\n" & indent(future.errorStackTrace.strip(), 4))
-    else:
-      msg.add("\n    Empty or nil stack trace.")
-    future.error.msg.add(msg)
-
-proc read*[T](future: Future[T] | FutureVar[T]): T =
-  ## Retrieves the value of ``future``. Future must be finished otherwise
-  ## this function will fail with a ``ValueError`` exception.
-  ##
-  ## If the result of the future is an error then that error will be raised.
-  {.push hint[ConvFromXtoItselfNotNeeded]: off.}
-  let fut = Future[T](future)
-  {.pop.}
-  if fut.finished:
-    if fut.error != nil:
-      injectStacktrace(fut)
-      raise fut.error
-    when T isnot void:
-      return fut.value
-  else:
-    # TODO: Make a custom exception type for this?
-    raise newException(ValueError, "Future still in progress.")
-
-proc readError*[T](future: Future[T]): ref Exception =
-  ## Retrieves the exception stored in ``future``.
-  ##
-  ## An ``ValueError`` exception will be thrown if no exception exists
-  ## in the specified Future.
-  if future.error != nil: return future.error
-  else:
-    raise newException(ValueError, "No error in future.")
-
-proc mget*[T](future: FutureVar[T]): var T =
-  ## Returns a mutable value stored in ``future``.
-  ##
-  ## Unlike ``read``, this function will not raise an exception if the
-  ## Future has not been finished.
-  result = Future[T](future).value
-
-proc finished*[T](future: Future[T] | FutureVar[T] | FutureStream[T]): bool =
-  ## Determines whether ``future`` has completed.
-  ##
-  ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish.
-  ##
-  ## For a ``FutureStream`` a ``true`` value means that no more data will be
-  ## placed inside the stream _and_ that there is no data waiting to be
-  ## retrieved.
-  when future is FutureVar[T]:
-    result = (Future[T](future)).finished
-  elif future is FutureStream[T]:
-    result = future.finished and future.queue.len == 0
-  else:
-    result = future.finished
-
-proc failed*(future: FutureBase): bool =
-  ## Determines whether ``future`` completed with an error.
-  return future.error != nil
-
-proc write*[T](future: FutureStream[T], value: T): Future[void] =
-  ## Writes the specified value inside the specified future stream.
-  ##
-  ## This will raise ``ValueError`` if ``future`` is finished.
-  result = newFuture[void]("FutureStream.put")
-  if future.finished:
-    let msg = "FutureStream is finished and so no longer accepts new data."
-    result.fail(newException(ValueError, msg))
-    return
-  # TODO: Implement limiting of the streams storage to prevent it growing
-  # infinitely when no reads are occuring.
-  future.queue.addLast(value)
-  if not future.cb.isNil: future.cb()
-  result.complete()
-
-proc read*[T](future: FutureStream[T]): Future[(bool, T)] =
-  ## Returns a future that will complete when the ``FutureStream`` has data
-  ## placed into it. The future will be completed with the oldest
-  ## value stored inside the stream. The return value will also determine
-  ## whether data was retrieved, ``false`` means that the future stream was
-  ## completed and no data was retrieved.
-  ##
-  ## This function will remove the data that was returned from the underlying
-  ## ``FutureStream``.
-  var resFut = newFuture[(bool, T)]("FutureStream.take")
-  let savedCb = future.cb
-  future.callback =
-    proc (fs: FutureStream[T]) =
-      # We don't want this callback called again.
-      future.cb = nil
-
-      # The return value depends on whether the FutureStream has finished.
-      var res: (bool, T)
-      if finished(fs):
-        # Remember, this callback is called when the FutureStream is completed.
-        res[0] = false
-      else:
-        res[0] = true
-        res[1] = fs.queue.popFirst()
-
-      if not resFut.finished:
-        resFut.complete(res)
-
-      # If the saved callback isn't nil then let's call it.
-      if not savedCb.isNil: savedCb()
-  return resFut
-
-proc len*[T](future: FutureStream[T]): int =
-  ## Returns the amount of data pieces inside the stream.
-  future.queue.len
-
-proc asyncCheck*[T](future: Future[T]) =
-  ## Sets a callback on ``future`` which raises an exception if the future
-  ## finished with an error.
-  ##
-  ## This should be used instead of ``discard`` to discard void futures.
-  future.callback =
-    proc () =
-      if future.failed:
-        injectStacktrace(future)
-        raise future.error
-
-proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
-  ## Returns a future which will complete once both ``fut1`` and ``fut2``
-  ## complete.
-  var retFuture = newFuture[void]("asyncdispatch.`and`")
-  fut1.callback =
-    proc () =
-      if not retFuture.finished:
-        if fut1.failed: retFuture.fail(fut1.error)
-        elif fut2.finished: retFuture.complete()
-  fut2.callback =
-    proc () =
-      if not retFuture.finished:
-        if fut2.failed: retFuture.fail(fut2.error)
-        elif fut1.finished: retFuture.complete()
-  return retFuture
-
-proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
-  ## Returns a future which will complete once either ``fut1`` or ``fut2``
-  ## complete.
-  var retFuture = newFuture[void]("asyncdispatch.`or`")
-  proc cb[X](fut: Future[X]) =
-    if fut.failed: retFuture.fail(fut.error)
-    if not retFuture.finished: retFuture.complete()
-  fut1.callback = cb[T]
-  fut2.callback = cb[Y]
-  return retFuture
-
-proc all*[T](futs: varargs[Future[T]]): auto =
-  ## Returns a future which will complete once
-  ## all futures in ``futs`` complete.
-  ## If the argument is empty, the returned future completes immediately.
-  ##
-  ## If the awaited futures are not ``Future[void]``, the returned future
-  ## will hold the values of all awaited futures in a sequence.
-  ##
-  ## If the awaited futures *are* ``Future[void]``,
-  ## this proc returns ``Future[void]``.
-
-  when T is void:
-    var
-      retFuture = newFuture[void]("asyncdispatch.all")
-      completedFutures = 0
-
-    let totalFutures = len(futs)
-
-    for fut in futs:
-      fut.callback = proc(f: Future[T]) =
-        inc(completedFutures)
-        if not retFuture.finished:
-          if f.failed:
-            retFuture.fail(f.error)
-          else:
-            if completedFutures == totalFutures:
-              retFuture.complete()
-
-    if totalFutures == 0:
-      retFuture.complete()
-
-    return retFuture
-
-  else:
-    var
-      retFuture = newFuture[seq[T]]("asyncdispatch.all")
-      retValues = newSeq[T](len(futs))
-      completedFutures = 0
-
-    for i, fut in futs:
-      proc setCallback(i: int) =
-        fut.callback = proc(f: Future[T]) =
-          inc(completedFutures)
-          if not retFuture.finished:
-            if f.failed:
-              retFuture.fail(f.error)
-            else:
-              retValues[i] = f.read()
-
-              if completedFutures == len(retValues):
-                retFuture.complete(retValues)
-
-      setCallback(i)
-
-    if retValues.len == 0:
-      retFuture.complete(retValues)
-
-    return retFuture