summary refs log tree commit diff stats
path: root/lib/pure/asyncfutures.nim
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/asyncfutures.nim')
-rw-r--r--lib/pure/asyncfutures.nim321
1 files changed, 207 insertions, 114 deletions
diff --git a/lib/pure/asyncfutures.nim b/lib/pure/asyncfutures.nim
index 863a6843b..29ebf8f89 100644
--- a/lib/pure/asyncfutures.nim
+++ b/lib/pure/asyncfutures.nim
@@ -1,4 +1,19 @@
-import os, tables, strutils, times, heapqueue, options, deques, cstrutils
+#
+#
+#            Nim's Runtime Library
+#        (c) Copyright 2015 Dominik Picheta
+#
+#    See the file "copying.txt", included in this
+#    distribution, for details about the copyright.
+#
+
+import std/[os, tables, strutils, times, heapqueue, options, deques, cstrutils, typetraits]
+
+import system/stacktraces
+
+when defined(nimPreviewSlimSystem):
+  import std/objectdollar # for StackTraceEntry
+  import std/assertions
 
 # TODO: This shouldn't need to be included, but should ideally be exported.
 type
@@ -6,46 +21,88 @@ type
 
   CallbackList = object
     function: CallbackFunc
-    next: ref CallbackList
+    next: owned(ref CallbackList)
 
-  FutureBase* = ref object of RootObj ## Untyped future.
+  FutureBase* = ref object of RootObj  ## Untyped future.
     callbacks: CallbackList
 
     finished: bool
-    error*: ref Exception ## Stored exception
+    error*: ref Exception              ## Stored exception
     errorStackTrace*: string
-    when not defined(release):
-      stackTrace: string ## For debugging purposes only.
+    when not defined(release) or defined(futureLogging):
+      stackTrace: seq[StackTraceEntry] ## For debugging purposes only.
       id: int
       fromProc: string
 
   Future*[T] = ref object of FutureBase ## Typed future.
-    value: T ## Stored value
+    value: T                            ## Stored value
 
   FutureVar*[T] = distinct Future[T]
 
-  FutureError* = object of Exception
+  FutureError* = object of Defect
     cause*: FutureBase
 
 when not defined(release):
   var currentID = 0
 
+const isFutureLoggingEnabled* = defined(futureLogging)
+
+const
+  NimAsyncContinueSuffix* = "NimAsyncContinue" ## For internal usage. Do not use.
+
+when isFutureLoggingEnabled:
+  import std/hashes
+  type
+    FutureInfo* = object
+      stackTrace*: seq[StackTraceEntry]
+      fromProc*: string
+
+  var futuresInProgress {.threadvar.}: Table[FutureInfo, int]
+
+  proc getFuturesInProgress*(): var Table[FutureInfo, int] =
+    return futuresInProgress
+
+  proc hash(s: StackTraceEntry): Hash =
+    result = hash(s.procname) !& hash(s.line) !&
+      hash(s.filename)
+    result = !$result
+
+  proc hash(fi: FutureInfo): Hash =
+    result = hash(fi.stackTrace) !& hash(fi.fromProc)
+    result = !$result
+
+  proc getFutureInfo(fut: FutureBase): FutureInfo =
+    let info = FutureInfo(
+      stackTrace: fut.stackTrace,
+      fromProc: fut.fromProc
+    )
+    return info
+
+  proc logFutureStart(fut: FutureBase) =
+    let info = getFutureInfo(fut)
+    if info notin getFuturesInProgress():
+      getFuturesInProgress()[info] = 0
+    getFuturesInProgress()[info].inc()
+
+  proc logFutureFinish(fut: FutureBase) =
+    getFuturesInProgress()[getFutureInfo(fut)].dec()
+
 var callSoonProc {.threadvar.}: proc (cbproc: proc ()) {.gcsafe.}
 
 proc getCallSoonProc*(): (proc(cbproc: proc ()) {.gcsafe.}) =
-  ## Get current implementation of ``callSoon``.
+  ## Get current implementation of `callSoon`.
   return callSoonProc
 
 proc setCallSoonProc*(p: (proc(cbproc: proc ()) {.gcsafe.})) =
-  ## Change current implementation of ``callSoon``. This is normally called when dispatcher from ``asyncdispatcher`` is initialized.
+  ## Change current implementation of `callSoon`. This is normally called when dispatcher from `asyncdispatcher` is initialized.
   callSoonProc = p
 
-proc callSoon*(cbproc: proc ()) =
-  ## Call ``cbproc`` "soon".
+proc callSoon*(cbproc: proc () {.gcsafe.}) =
+  ## Call `cbproc` "soon".
   ##
-  ## If async dispatcher is running, ``cbproc`` will be executed during next dispatcher tick.
+  ## If async dispatcher is running, `cbproc` will be executed during next dispatcher tick.
   ##
-  ## If async dispatcher is not running, ``cbproc`` will be executed immediately.
+  ## If async dispatcher is not running, `cbproc` will be executed immediately.
   if callSoonProc.isNil:
     # Loop not initialized yet. Call the function directly to allow setup code to use futures.
     cbproc()
@@ -56,34 +113,37 @@ template setupFutureBase(fromProc: string) =
   new(result)
   result.finished = false
   when not defined(release):
-    result.stackTrace = getStackTrace()
+    result.stackTrace = getStackTraceEntries()
     result.id = currentID
     result.fromProc = fromProc
     currentID.inc()
 
-proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
+proc newFuture*[T](fromProc: string = "unspecified"): owned(Future[T]) =
   ## Creates a new future.
   ##
-  ## Specifying ``fromProc``, which is a string specifying the name of the proc
+  ## Specifying `fromProc`, which is a string specifying the name of the proc
   ## that this future belongs to, is a good habit as it helps with debugging.
   setupFutureBase(fromProc)
+  when isFutureLoggingEnabled: logFutureStart(result)
 
-proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] =
-  ## Create a new ``FutureVar``. This Future type is ideally suited for
+proc newFutureVar*[T](fromProc = "unspecified"): owned(FutureVar[T]) =
+  ## Create a new `FutureVar`. This Future type is ideally suited for
   ## situations where you want to avoid unnecessary allocations of Futures.
   ##
-  ## Specifying ``fromProc``, which is a string specifying the name of the proc
+  ## Specifying `fromProc`, which is a string specifying the name of the proc
   ## that this future belongs to, is a good habit as it helps with debugging.
-  result = FutureVar[T](newFuture[T](fromProc))
+  let fo = newFuture[T](fromProc)
+  result = typeof(result)(fo)
+  when isFutureLoggingEnabled: logFutureStart(Future[T](result))
 
 proc clean*[T](future: FutureVar[T]) =
-  ## Resets the ``finished`` status of ``future``.
+  ## Resets the `finished` status of `future`.
   Future[T](future).finished = false
   Future[T](future).error = nil
 
 proc checkFinished[T](future: Future[T]) =
   ## Checks whether `future` is finished. If it is then raises a
-  ## ``FutureError``.
+  ## `FutureError`.
   when not defined(release):
     if future.finished:
       var msg = ""
@@ -92,10 +152,10 @@ proc checkFinished[T](future: Future[T]) =
       msg.add("\n  Future ID: " & $future.id)
       msg.add("\n  Created in proc: " & future.fromProc)
       msg.add("\n  Stack trace to moment of creation:")
-      msg.add("\n" & indent(future.stackTrace.strip(), 4))
+      msg.add("\n" & indent(($future.stackTrace).strip(), 4))
       when T is string:
         msg.add("\n  Contents (string): ")
-        msg.add("\n" & indent(future.value.repr, 4))
+        msg.add("\n" & indent($future.value, 4))
       msg.add("\n  Stack trace to moment of secondary completion:")
       msg.add("\n" & indent(getStackTrace().strip(), 4))
       var err = newException(FutureError, msg)
@@ -104,7 +164,6 @@ proc checkFinished[T](future: Future[T]) =
 
 proc call(callbacks: var CallbackList) =
   var current = callbacks
-
   while true:
     if not current.function.isNil:
       callSoon(current.function)
@@ -113,7 +172,6 @@ proc call(callbacks: var CallbackList) =
       break
     else:
       current = current.next[]
-
   # callback will be called only once, let GC collect them now
   callbacks.next = nil
   callbacks.function = nil
@@ -123,39 +181,46 @@ proc add(callbacks: var CallbackList, function: CallbackFunc) =
     callbacks.function = function
     assert callbacks.next == nil
   else:
-    let newNext = new(ref CallbackList)
-    newNext.function = callbacks.function
-    newNext.next = callbacks.next
-    callbacks.next = newNext
-    callbacks.function = function
+    let newCallback = new(ref CallbackList)
+    newCallback.function = function
+    newCallback.next = nil
 
-proc complete*[T](future: Future[T], val: T) =
-  ## Completes ``future`` with value ``val``.
-  #assert(not future.finished, "Future already finished, cannot finish twice.")
-  checkFinished(future)
-  assert(future.error == nil)
-  future.value = val
-  future.finished = true
-  future.callbacks.call()
+    if callbacks.next == nil:
+      callbacks.next = newCallback
+    else:
+      var last = callbacks.next
+      while last.next != nil:
+        last = last.next
+      last.next = newCallback
 
-proc complete*(future: Future[void]) =
-  ## Completes a void ``future``.
+proc completeImpl[T, U](future: Future[T], val: sink U, isVoid: static bool) =
   #assert(not future.finished, "Future already finished, cannot finish twice.")
   checkFinished(future)
   assert(future.error == nil)
+  when not isVoid:
+    future.value = val
   future.finished = true
   future.callbacks.call()
+  when isFutureLoggingEnabled: logFutureFinish(future)
+
+proc complete*[T](future: Future[T], val: sink T) =
+  ## Completes `future` with value `val`.
+  completeImpl(future, val, false)
+
+proc complete*(future: Future[void], val = Future[void].default) =
+  completeImpl(future, (), true)
 
 proc complete*[T](future: FutureVar[T]) =
-  ## Completes a ``FutureVar``.
+  ## Completes a `FutureVar`.
   template fut: untyped = Future[T](future)
   checkFinished(fut)
   assert(fut.error == nil)
   fut.finished = true
   fut.callbacks.call()
+  when isFutureLoggingEnabled: logFutureFinish(Future[T](future))
 
-proc complete*[T](future: FutureVar[T], val: T) =
-  ## Completes a ``FutureVar`` with value ``val``.
+proc complete*[T](future: FutureVar[T], val: sink T) =
+  ## Completes a `FutureVar` with value `val`.
   ##
   ## Any previously stored value will be overwritten.
   template fut: untyped = Future[T](future)
@@ -164,9 +229,10 @@ proc complete*[T](future: FutureVar[T], val: T) =
   fut.finished = true
   fut.value = val
   fut.callbacks.call()
+  when isFutureLoggingEnabled: logFutureFinish(fut)
 
 proc fail*[T](future: Future[T], error: ref Exception) =
-  ## Completes ``future`` with ``error``.
+  ## Completes `future` with `error`.
   #assert(not future.finished, "Future already finished, cannot finish twice.")
   checkFinished(future)
   future.finished = true
@@ -174,15 +240,16 @@ proc fail*[T](future: Future[T], error: ref Exception) =
   future.errorStackTrace =
     if getStackTrace(error) == "": getStackTrace() else: getStackTrace(error)
   future.callbacks.call()
+  when isFutureLoggingEnabled: logFutureFinish(future)
 
 proc clearCallbacks*(future: FutureBase) =
   future.callbacks.function = nil
   future.callbacks.next = nil
 
-proc addCallback*(future: FutureBase, cb: proc() {.closure,gcsafe.}) =
+proc addCallback*(future: FutureBase, cb: proc() {.closure, gcsafe.}) =
   ## Adds the callbacks proc to be called when the future completes.
   ##
-  ## If future has already completed then ``cb`` will be called immediately.
+  ## If future has already completed then `cb` will be called immediately.
   assert cb != nil
   if future.finished:
     callSoon(cb)
@@ -190,78 +257,93 @@ proc addCallback*(future: FutureBase, cb: proc() {.closure,gcsafe.}) =
     future.callbacks.add cb
 
 proc addCallback*[T](future: Future[T],
-                     cb: proc (future: Future[T]) {.closure,gcsafe.}) =
+                     cb: proc (future: Future[T]) {.closure, gcsafe.}) =
   ## Adds the callbacks proc to be called when the future completes.
   ##
-  ## If future has already completed then ``cb`` will be called immediately.
+  ## If future has already completed then `cb` will be called immediately.
   future.addCallback(
     proc() =
-      cb(future)
+    cb(future)
   )
 
-proc `callback=`*(future: FutureBase, cb: proc () {.closure,gcsafe.}) =
+proc `callback=`*(future: FutureBase, cb: proc () {.closure, gcsafe.}) =
   ## Clears the list of callbacks and sets the callback proc to be called when the future completes.
   ##
-  ## If future has already completed then ``cb`` will be called immediately.
+  ## If future has already completed then `cb` will be called immediately.
   ##
-  ## It's recommended to use ``addCallback`` or ``then`` instead.
+  ## It's recommended to use `addCallback` or `then` instead.
   future.clearCallbacks
   future.addCallback cb
 
 proc `callback=`*[T](future: Future[T],
-    cb: proc (future: Future[T]) {.closure,gcsafe.}) =
+    cb: proc (future: Future[T]) {.closure, gcsafe.}) =
   ## Sets the callback proc to be called when the future completes.
   ##
-  ## If future has already completed then ``cb`` will be called immediately.
+  ## If future has already completed then `cb` will be called immediately.
   future.callback = proc () = cb(future)
 
+template getFilenameProcname(entry: StackTraceEntry): (string, string) =
+  when compiles(entry.filenameStr) and compiles(entry.procnameStr):
+    # We can't rely on "entry.filename" and "entry.procname" still being valid
+    # cstring pointers, because the "string.data" buffers they pointed to might
+    # be already garbage collected (this entry being a non-shallow copy,
+    # "entry.filename" no longer points to "entry.filenameStr.data", but to the
+    # buffer of the original object).
+    (entry.filenameStr, entry.procnameStr)
+  else:
+    ($entry.filename, $entry.procname)
+
 proc getHint(entry: StackTraceEntry): string =
   ## We try to provide some hints about stack trace entries that the user
   ## may not be familiar with, in particular calls inside the stdlib.
+
+  let (filename, procname) = getFilenameProcname(entry)
+
   result = ""
-  if entry.procname == "processPendingCallbacks":
-    if cmpIgnoreStyle(entry.filename, "asyncdispatch.nim") == 0:
+  if procname == "processPendingCallbacks":
+    if cmpIgnoreStyle(filename, "asyncdispatch.nim") == 0:
       return "Executes pending callbacks"
-  elif entry.procname == "poll":
-    if cmpIgnoreStyle(entry.filename, "asyncdispatch.nim") == 0:
+  elif procname == "poll":
+    if cmpIgnoreStyle(filename, "asyncdispatch.nim") == 0:
       return "Processes asynchronous completion events"
 
-  if entry.procname.endsWith("_continue"):
-    if cmpIgnoreStyle(entry.filename, "asyncmacro.nim") == 0:
+  if procname.endsWith(NimAsyncContinueSuffix):
+    if cmpIgnoreStyle(filename, "asyncmacro.nim") == 0:
       return "Resumes an async procedure"
 
-proc `$`*(entries: seq[StackTraceEntry]): string =
+proc `$`*(stackTraceEntries: seq[StackTraceEntry]): string =
+  when defined(nimStackTraceOverride):
+    let entries = addDebuggingInfo(stackTraceEntries)
+  else:
+    let entries = stackTraceEntries
+
   result = ""
   # Find longest filename & line number combo for alignment purposes.
   var longestLeft = 0
   for entry in entries:
-    if entry.procName.isNil: continue
+    let (filename, procname) = getFilenameProcname(entry)
 
-    let left = $entry.filename & $entry.line
-    if left.len > longestLeft:
-      longestLeft = left.len
+    if procname == "": continue
+
+    let leftLen = filename.len + len($entry.line)
+    if leftLen > longestLeft:
+      longestLeft = leftLen
 
-  var indent = 2
   # Format the entries.
   for entry in entries:
-    if entry.procName.isNil:
-      if entry.line == -10:
-        result.add(spaces(indent) & "#[\n")
-        indent.inc(2)
-      else:
-        indent.dec(2)
-        result.add(spaces(indent)& "]#\n")
-      continue
-
-    let left = "$#($#)" % [$entry.filename, $entry.line]
-    result.add((spaces(indent) & "$#$# $#\n") % [
+    let (filename, procname) = getFilenameProcname(entry)
+
+    if procname == "" and entry.line == reraisedFromBegin:
+      break
+
+    let left = "$#($#)" % [filename, $entry.line]
+    result.add((spaces(2) & "$# $#\n") % [
       left,
-      spaces(longestLeft - left.len + 2),
-      $entry.procName
+      procname
     ])
     let hint = getHint(entry)
     if hint.len > 0:
-      result.add(spaces(indent+2) & "## " & hint & "\n")
+      result.add(spaces(4) & "## " & hint & "\n")
 
 proc injectStacktrace[T](future: Future[T]) =
   when not defined(release):
@@ -281,74 +363,84 @@ proc injectStacktrace[T](future: Future[T]) =
     newMsg.add($entries)
 
     newMsg.add("Exception message: " & exceptionMsg & "\n")
-    newMsg.add("Exception type:")
 
     # # For debugging purposes
+    # newMsg.add("Exception type:")
     # for entry in getStackTraceEntries(future.error):
     #   newMsg.add "\n" & $entry
     future.error.msg = newMsg
 
-proc read*[T](future: Future[T] | FutureVar[T]): T =
-  ## Retrieves the value of ``future``. Future must be finished otherwise
-  ## this function will fail with a ``ValueError`` exception.
-  ##
-  ## If the result of the future is an error then that error will be raised.
-  {.push hint[ConvFromXtoItselfNotNeeded]: off.}
-  let fut = Future[T](future)
-  {.pop.}
+template readImpl(future, T) =
+  when future is Future[T]:
+    let fut {.cursor.} = future
+  else:
+    let fut {.cursor.} = Future[T](future)
   if fut.finished:
     if fut.error != nil:
       injectStacktrace(fut)
       raise fut.error
     when T isnot void:
-      return fut.value
+      result = distinctBase(future).value
   else:
     # TODO: Make a custom exception type for this?
     raise newException(ValueError, "Future still in progress.")
 
+proc read*[T](future: Future[T] | FutureVar[T]): lent T =
+  ## Retrieves the value of `future`. Future must be finished otherwise
+  ## this function will fail with a `ValueError` exception.
+  ##
+  ## If the result of the future is an error then that error will be raised.
+  readImpl(future, T)
+
+proc read*(future: Future[void] | FutureVar[void]) =
+  readImpl(future, void)
+
 proc readError*[T](future: Future[T]): ref Exception =
-  ## Retrieves the exception stored in ``future``.
+  ## Retrieves the exception stored in `future`.
   ##
-  ## An ``ValueError`` exception will be thrown if no exception exists
+  ## An `ValueError` exception will be thrown if no exception exists
   ## in the specified Future.
   if future.error != nil: return future.error
   else:
     raise newException(ValueError, "No error in future.")
 
 proc mget*[T](future: FutureVar[T]): var T =
-  ## Returns a mutable value stored in ``future``.
+  ## Returns a mutable value stored in `future`.
   ##
-  ## Unlike ``read``, this function will not raise an exception if the
+  ## Unlike `read`, this function will not raise an exception if the
   ## Future has not been finished.
   result = Future[T](future).value
 
 proc finished*(future: FutureBase | FutureVar): bool =
-  ## Determines whether ``future`` has completed.
+  ## Determines whether `future` has completed.
   ##
-  ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish.
+  ## `True` may indicate an error or a value. Use `failed` to distinguish.
   when future is FutureVar:
     result = (FutureBase(future)).finished
   else:
     result = future.finished
 
 proc failed*(future: FutureBase): bool =
-  ## Determines whether ``future`` completed with an error.
+  ## Determines whether `future` completed with an error.
   return future.error != nil
 
 proc asyncCheck*[T](future: Future[T]) =
-  ## Sets a callback on ``future`` which raises an exception if the future
+  ## Sets a callback on `future` which raises an exception if the future
   ## finished with an error.
   ##
-  ## This should be used instead of ``discard`` to discard void futures.
+  ## This should be used instead of `discard` to discard void futures,
+  ## or use `waitFor` if you need to wait for the future's completion.
   assert(not future.isNil, "Future is nil")
-  future.callback =
-    proc () =
-      if future.failed:
-        injectStacktrace(future)
-        raise future.error
+  # TODO: We can likely look at the stack trace here and inject the location
+  # where the `asyncCheck` was called to give a better error stack message.
+  proc asyncCheckCallback() =
+    if future.failed:
+      injectStacktrace(future)
+      raise future.error
+  future.callback = asyncCheckCallback
 
 proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
-  ## Returns a future which will complete once both ``fut1`` and ``fut2``
+  ## Returns a future which will complete once both `fut1` and `fut2`
   ## complete.
   var retFuture = newFuture[void]("asyncdispatch.`and`")
   fut1.callback =
@@ -364,26 +456,27 @@ proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
   return retFuture
 
 proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
-  ## Returns a future which will complete once either ``fut1`` or ``fut2``
+  ## Returns a future which will complete once either `fut1` or `fut2`
   ## complete.
   var retFuture = newFuture[void]("asyncdispatch.`or`")
   proc cb[X](fut: Future[X]) =
-    if fut.failed: retFuture.fail(fut.error)
-    if not retFuture.finished: retFuture.complete()
+    if not retFuture.finished:
+      if fut.failed: retFuture.fail(fut.error)
+      else: retFuture.complete()
   fut1.callback = cb[T]
   fut2.callback = cb[Y]
   return retFuture
 
 proc all*[T](futs: varargs[Future[T]]): auto =
   ## Returns a future which will complete once
-  ## all futures in ``futs`` complete.
+  ## all futures in `futs` complete.
   ## If the argument is empty, the returned future completes immediately.
   ##
-  ## If the awaited futures are not ``Future[void]``, the returned future
+  ## If the awaited futures are not `Future[void]`, the returned future
   ## will hold the values of all awaited futures in a sequence.
   ##
-  ## If the awaited futures *are* ``Future[void]``,
-  ## this proc returns ``Future[void]``.
+  ## If the awaited futures *are* `Future[void]`,
+  ## this proc returns `Future[void]`.
 
   when T is void:
     var