summary refs log tree commit diff stats
path: root/lib/pure/asyncfutures.nim
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/asyncfutures.nim')
-rw-r--r--lib/pure/asyncfutures.nim527
1 files changed, 527 insertions, 0 deletions
diff --git a/lib/pure/asyncfutures.nim b/lib/pure/asyncfutures.nim
new file mode 100644
index 000000000..29ebf8f89
--- /dev/null
+++ b/lib/pure/asyncfutures.nim
@@ -0,0 +1,527 @@
+#
+#
+#            Nim's Runtime Library
+#        (c) Copyright 2015 Dominik Picheta
+#
+#    See the file "copying.txt", included in this
+#    distribution, for details about the copyright.
+#
+
+import std/[os, tables, strutils, times, heapqueue, options, deques, cstrutils, typetraits]
+
+import system/stacktraces
+
+when defined(nimPreviewSlimSystem):
+  import std/objectdollar # for StackTraceEntry
+  import std/assertions
+
+# TODO: This shouldn't need to be included, but should ideally be exported.
+type
+  CallbackFunc = proc () {.closure, gcsafe.}
+
+  CallbackList = object
+    function: CallbackFunc
+    next: owned(ref CallbackList)
+
+  FutureBase* = ref object of RootObj  ## Untyped future.
+    callbacks: CallbackList
+
+    finished: bool
+    error*: ref Exception              ## Stored exception
+    errorStackTrace*: string
+    when not defined(release) or defined(futureLogging):
+      stackTrace: seq[StackTraceEntry] ## For debugging purposes only.
+      id: int
+      fromProc: string
+
+  Future*[T] = ref object of FutureBase ## Typed future.
+    value: T                            ## Stored value
+
+  FutureVar*[T] = distinct Future[T]
+
+  FutureError* = object of Defect
+    cause*: FutureBase
+
+when not defined(release):
+  var currentID = 0
+
+const isFutureLoggingEnabled* = defined(futureLogging)
+
+const
+  NimAsyncContinueSuffix* = "NimAsyncContinue" ## For internal usage. Do not use.
+
+when isFutureLoggingEnabled:
+  import std/hashes
+  type
+    FutureInfo* = object
+      stackTrace*: seq[StackTraceEntry]
+      fromProc*: string
+
+  var futuresInProgress {.threadvar.}: Table[FutureInfo, int]
+
+  proc getFuturesInProgress*(): var Table[FutureInfo, int] =
+    return futuresInProgress
+
+  proc hash(s: StackTraceEntry): Hash =
+    result = hash(s.procname) !& hash(s.line) !&
+      hash(s.filename)
+    result = !$result
+
+  proc hash(fi: FutureInfo): Hash =
+    result = hash(fi.stackTrace) !& hash(fi.fromProc)
+    result = !$result
+
+  proc getFutureInfo(fut: FutureBase): FutureInfo =
+    let info = FutureInfo(
+      stackTrace: fut.stackTrace,
+      fromProc: fut.fromProc
+    )
+    return info
+
+  proc logFutureStart(fut: FutureBase) =
+    let info = getFutureInfo(fut)
+    if info notin getFuturesInProgress():
+      getFuturesInProgress()[info] = 0
+    getFuturesInProgress()[info].inc()
+
+  proc logFutureFinish(fut: FutureBase) =
+    getFuturesInProgress()[getFutureInfo(fut)].dec()
+
+var callSoonProc {.threadvar.}: proc (cbproc: proc ()) {.gcsafe.}
+
+proc getCallSoonProc*(): (proc(cbproc: proc ()) {.gcsafe.}) =
+  ## Get current implementation of `callSoon`.
+  return callSoonProc
+
+proc setCallSoonProc*(p: (proc(cbproc: proc ()) {.gcsafe.})) =
+  ## Change current implementation of `callSoon`. This is normally called when dispatcher from `asyncdispatcher` is initialized.
+  callSoonProc = p
+
+proc callSoon*(cbproc: proc () {.gcsafe.}) =
+  ## Call `cbproc` "soon".
+  ##
+  ## If async dispatcher is running, `cbproc` will be executed during next dispatcher tick.
+  ##
+  ## If async dispatcher is not running, `cbproc` will be executed immediately.
+  if callSoonProc.isNil:
+    # Loop not initialized yet. Call the function directly to allow setup code to use futures.
+    cbproc()
+  else:
+    callSoonProc(cbproc)
+
+template setupFutureBase(fromProc: string) =
+  new(result)
+  result.finished = false
+  when not defined(release):
+    result.stackTrace = getStackTraceEntries()
+    result.id = currentID
+    result.fromProc = fromProc
+    currentID.inc()
+
+proc newFuture*[T](fromProc: string = "unspecified"): owned(Future[T]) =
+  ## Creates a new future.
+  ##
+  ## Specifying `fromProc`, which is a string specifying the name of the proc
+  ## that this future belongs to, is a good habit as it helps with debugging.
+  setupFutureBase(fromProc)
+  when isFutureLoggingEnabled: logFutureStart(result)
+
+proc newFutureVar*[T](fromProc = "unspecified"): owned(FutureVar[T]) =
+  ## Create a new `FutureVar`. This Future type is ideally suited for
+  ## situations where you want to avoid unnecessary allocations of Futures.
+  ##
+  ## Specifying `fromProc`, which is a string specifying the name of the proc
+  ## that this future belongs to, is a good habit as it helps with debugging.
+  let fo = newFuture[T](fromProc)
+  result = typeof(result)(fo)
+  when isFutureLoggingEnabled: logFutureStart(Future[T](result))
+
+proc clean*[T](future: FutureVar[T]) =
+  ## Resets the `finished` status of `future`.
+  Future[T](future).finished = false
+  Future[T](future).error = nil
+
+proc checkFinished[T](future: Future[T]) =
+  ## Checks whether `future` is finished. If it is then raises a
+  ## `FutureError`.
+  when not defined(release):
+    if future.finished:
+      var msg = ""
+      msg.add("An attempt was made to complete a Future more than once. ")
+      msg.add("Details:")
+      msg.add("\n  Future ID: " & $future.id)
+      msg.add("\n  Created in proc: " & future.fromProc)
+      msg.add("\n  Stack trace to moment of creation:")
+      msg.add("\n" & indent(($future.stackTrace).strip(), 4))
+      when T is string:
+        msg.add("\n  Contents (string): ")
+        msg.add("\n" & indent($future.value, 4))
+      msg.add("\n  Stack trace to moment of secondary completion:")
+      msg.add("\n" & indent(getStackTrace().strip(), 4))
+      var err = newException(FutureError, msg)
+      err.cause = future
+      raise err
+
+proc call(callbacks: var CallbackList) =
+  var current = callbacks
+  while true:
+    if not current.function.isNil:
+      callSoon(current.function)
+
+    if current.next.isNil:
+      break
+    else:
+      current = current.next[]
+  # callback will be called only once, let GC collect them now
+  callbacks.next = nil
+  callbacks.function = nil
+
+proc add(callbacks: var CallbackList, function: CallbackFunc) =
+  if callbacks.function.isNil:
+    callbacks.function = function
+    assert callbacks.next == nil
+  else:
+    let newCallback = new(ref CallbackList)
+    newCallback.function = function
+    newCallback.next = nil
+
+    if callbacks.next == nil:
+      callbacks.next = newCallback
+    else:
+      var last = callbacks.next
+      while last.next != nil:
+        last = last.next
+      last.next = newCallback
+
+proc completeImpl[T, U](future: Future[T], val: sink U, isVoid: static bool) =
+  #assert(not future.finished, "Future already finished, cannot finish twice.")
+  checkFinished(future)
+  assert(future.error == nil)
+  when not isVoid:
+    future.value = val
+  future.finished = true
+  future.callbacks.call()
+  when isFutureLoggingEnabled: logFutureFinish(future)
+
+proc complete*[T](future: Future[T], val: sink T) =
+  ## Completes `future` with value `val`.
+  completeImpl(future, val, false)
+
+proc complete*(future: Future[void], val = Future[void].default) =
+  completeImpl(future, (), true)
+
+proc complete*[T](future: FutureVar[T]) =
+  ## Completes a `FutureVar`.
+  template fut: untyped = Future[T](future)
+  checkFinished(fut)
+  assert(fut.error == nil)
+  fut.finished = true
+  fut.callbacks.call()
+  when isFutureLoggingEnabled: logFutureFinish(Future[T](future))
+
+proc complete*[T](future: FutureVar[T], val: sink T) =
+  ## Completes a `FutureVar` with value `val`.
+  ##
+  ## Any previously stored value will be overwritten.
+  template fut: untyped = Future[T](future)
+  checkFinished(fut)
+  assert(fut.error.isNil())
+  fut.finished = true
+  fut.value = val
+  fut.callbacks.call()
+  when isFutureLoggingEnabled: logFutureFinish(fut)
+
+proc fail*[T](future: Future[T], error: ref Exception) =
+  ## Completes `future` with `error`.
+  #assert(not future.finished, "Future already finished, cannot finish twice.")
+  checkFinished(future)
+  future.finished = true
+  future.error = error
+  future.errorStackTrace =
+    if getStackTrace(error) == "": getStackTrace() else: getStackTrace(error)
+  future.callbacks.call()
+  when isFutureLoggingEnabled: logFutureFinish(future)
+
+proc clearCallbacks*(future: FutureBase) =
+  future.callbacks.function = nil
+  future.callbacks.next = nil
+
+proc addCallback*(future: FutureBase, cb: proc() {.closure, gcsafe.}) =
+  ## Adds the callbacks proc to be called when the future completes.
+  ##
+  ## If future has already completed then `cb` will be called immediately.
+  assert cb != nil
+  if future.finished:
+    callSoon(cb)
+  else:
+    future.callbacks.add cb
+
+proc addCallback*[T](future: Future[T],
+                     cb: proc (future: Future[T]) {.closure, gcsafe.}) =
+  ## Adds the callbacks proc to be called when the future completes.
+  ##
+  ## If future has already completed then `cb` will be called immediately.
+  future.addCallback(
+    proc() =
+    cb(future)
+  )
+
+proc `callback=`*(future: FutureBase, cb: proc () {.closure, gcsafe.}) =
+  ## Clears the list of callbacks and sets the callback proc to be called when the future completes.
+  ##
+  ## If future has already completed then `cb` will be called immediately.
+  ##
+  ## It's recommended to use `addCallback` or `then` instead.
+  future.clearCallbacks
+  future.addCallback cb
+
+proc `callback=`*[T](future: Future[T],
+    cb: proc (future: Future[T]) {.closure, gcsafe.}) =
+  ## Sets the callback proc to be called when the future completes.
+  ##
+  ## If future has already completed then `cb` will be called immediately.
+  future.callback = proc () = cb(future)
+
+template getFilenameProcname(entry: StackTraceEntry): (string, string) =
+  when compiles(entry.filenameStr) and compiles(entry.procnameStr):
+    # We can't rely on "entry.filename" and "entry.procname" still being valid
+    # cstring pointers, because the "string.data" buffers they pointed to might
+    # be already garbage collected (this entry being a non-shallow copy,
+    # "entry.filename" no longer points to "entry.filenameStr.data", but to the
+    # buffer of the original object).
+    (entry.filenameStr, entry.procnameStr)
+  else:
+    ($entry.filename, $entry.procname)
+
+proc getHint(entry: StackTraceEntry): string =
+  ## We try to provide some hints about stack trace entries that the user
+  ## may not be familiar with, in particular calls inside the stdlib.
+
+  let (filename, procname) = getFilenameProcname(entry)
+
+  result = ""
+  if procname == "processPendingCallbacks":
+    if cmpIgnoreStyle(filename, "asyncdispatch.nim") == 0:
+      return "Executes pending callbacks"
+  elif procname == "poll":
+    if cmpIgnoreStyle(filename, "asyncdispatch.nim") == 0:
+      return "Processes asynchronous completion events"
+
+  if procname.endsWith(NimAsyncContinueSuffix):
+    if cmpIgnoreStyle(filename, "asyncmacro.nim") == 0:
+      return "Resumes an async procedure"
+
+proc `$`*(stackTraceEntries: seq[StackTraceEntry]): string =
+  when defined(nimStackTraceOverride):
+    let entries = addDebuggingInfo(stackTraceEntries)
+  else:
+    let entries = stackTraceEntries
+
+  result = ""
+  # Find longest filename & line number combo for alignment purposes.
+  var longestLeft = 0
+  for entry in entries:
+    let (filename, procname) = getFilenameProcname(entry)
+
+    if procname == "": continue
+
+    let leftLen = filename.len + len($entry.line)
+    if leftLen > longestLeft:
+      longestLeft = leftLen
+
+  # Format the entries.
+  for entry in entries:
+    let (filename, procname) = getFilenameProcname(entry)
+
+    if procname == "" and entry.line == reraisedFromBegin:
+      break
+
+    let left = "$#($#)" % [filename, $entry.line]
+    result.add((spaces(2) & "$# $#\n") % [
+      left,
+      procname
+    ])
+    let hint = getHint(entry)
+    if hint.len > 0:
+      result.add(spaces(4) & "## " & hint & "\n")
+
+proc injectStacktrace[T](future: Future[T]) =
+  when not defined(release):
+    const header = "\nAsync traceback:\n"
+
+    var exceptionMsg = future.error.msg
+    if header in exceptionMsg:
+      # This is messy: extract the original exception message from the msg
+      # containing the async traceback.
+      let start = exceptionMsg.find(header)
+      exceptionMsg = exceptionMsg[0..<start]
+
+
+    var newMsg = exceptionMsg & header
+
+    let entries = getStackTraceEntries(future.error)
+    newMsg.add($entries)
+
+    newMsg.add("Exception message: " & exceptionMsg & "\n")
+
+    # # For debugging purposes
+    # newMsg.add("Exception type:")
+    # for entry in getStackTraceEntries(future.error):
+    #   newMsg.add "\n" & $entry
+    future.error.msg = newMsg
+
+template readImpl(future, T) =
+  when future is Future[T]:
+    let fut {.cursor.} = future
+  else:
+    let fut {.cursor.} = Future[T](future)
+  if fut.finished:
+    if fut.error != nil:
+      injectStacktrace(fut)
+      raise fut.error
+    when T isnot void:
+      result = distinctBase(future).value
+  else:
+    # TODO: Make a custom exception type for this?
+    raise newException(ValueError, "Future still in progress.")
+
+proc read*[T](future: Future[T] | FutureVar[T]): lent T =
+  ## Retrieves the value of `future`. Future must be finished otherwise
+  ## this function will fail with a `ValueError` exception.
+  ##
+  ## If the result of the future is an error then that error will be raised.
+  readImpl(future, T)
+
+proc read*(future: Future[void] | FutureVar[void]) =
+  readImpl(future, void)
+
+proc readError*[T](future: Future[T]): ref Exception =
+  ## Retrieves the exception stored in `future`.
+  ##
+  ## An `ValueError` exception will be thrown if no exception exists
+  ## in the specified Future.
+  if future.error != nil: return future.error
+  else:
+    raise newException(ValueError, "No error in future.")
+
+proc mget*[T](future: FutureVar[T]): var T =
+  ## Returns a mutable value stored in `future`.
+  ##
+  ## Unlike `read`, this function will not raise an exception if the
+  ## Future has not been finished.
+  result = Future[T](future).value
+
+proc finished*(future: FutureBase | FutureVar): bool =
+  ## Determines whether `future` has completed.
+  ##
+  ## `True` may indicate an error or a value. Use `failed` to distinguish.
+  when future is FutureVar:
+    result = (FutureBase(future)).finished
+  else:
+    result = future.finished
+
+proc failed*(future: FutureBase): bool =
+  ## Determines whether `future` completed with an error.
+  return future.error != nil
+
+proc asyncCheck*[T](future: Future[T]) =
+  ## Sets a callback on `future` which raises an exception if the future
+  ## finished with an error.
+  ##
+  ## This should be used instead of `discard` to discard void futures,
+  ## or use `waitFor` if you need to wait for the future's completion.
+  assert(not future.isNil, "Future is nil")
+  # TODO: We can likely look at the stack trace here and inject the location
+  # where the `asyncCheck` was called to give a better error stack message.
+  proc asyncCheckCallback() =
+    if future.failed:
+      injectStacktrace(future)
+      raise future.error
+  future.callback = asyncCheckCallback
+
+proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
+  ## Returns a future which will complete once both `fut1` and `fut2`
+  ## complete.
+  var retFuture = newFuture[void]("asyncdispatch.`and`")
+  fut1.callback =
+    proc () =
+      if not retFuture.finished:
+        if fut1.failed: retFuture.fail(fut1.error)
+        elif fut2.finished: retFuture.complete()
+  fut2.callback =
+    proc () =
+      if not retFuture.finished:
+        if fut2.failed: retFuture.fail(fut2.error)
+        elif fut1.finished: retFuture.complete()
+  return retFuture
+
+proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
+  ## Returns a future which will complete once either `fut1` or `fut2`
+  ## complete.
+  var retFuture = newFuture[void]("asyncdispatch.`or`")
+  proc cb[X](fut: Future[X]) =
+    if not retFuture.finished:
+      if fut.failed: retFuture.fail(fut.error)
+      else: retFuture.complete()
+  fut1.callback = cb[T]
+  fut2.callback = cb[Y]
+  return retFuture
+
+proc all*[T](futs: varargs[Future[T]]): auto =
+  ## Returns a future which will complete once
+  ## all futures in `futs` complete.
+  ## If the argument is empty, the returned future completes immediately.
+  ##
+  ## If the awaited futures are not `Future[void]`, the returned future
+  ## will hold the values of all awaited futures in a sequence.
+  ##
+  ## If the awaited futures *are* `Future[void]`,
+  ## this proc returns `Future[void]`.
+
+  when T is void:
+    var
+      retFuture = newFuture[void]("asyncdispatch.all")
+      completedFutures = 0
+
+    let totalFutures = len(futs)
+
+    for fut in futs:
+      fut.addCallback proc (f: Future[T]) =
+        inc(completedFutures)
+        if not retFuture.finished:
+          if f.failed:
+            retFuture.fail(f.error)
+          else:
+            if completedFutures == totalFutures:
+              retFuture.complete()
+
+    if totalFutures == 0:
+      retFuture.complete()
+
+    return retFuture
+
+  else:
+    var
+      retFuture = newFuture[seq[T]]("asyncdispatch.all")
+      retValues = newSeq[T](len(futs))
+      completedFutures = 0
+
+    for i, fut in futs:
+      proc setCallback(i: int) =
+        fut.addCallback proc (f: Future[T]) =
+          inc(completedFutures)
+          if not retFuture.finished:
+            if f.failed:
+              retFuture.fail(f.error)
+            else:
+              retValues[i] = f.read()
+
+              if completedFutures == len(retValues):
+                retFuture.complete(retValues)
+
+      setCallback(i)
+
+    if retValues.len == 0:
+      retFuture.complete(retValues)
+
+    return retFuture