diff options
Diffstat (limited to 'lib/pure/asyncfutures.nim')
-rw-r--r-- | lib/pure/asyncfutures.nim | 234 |
1 files changed, 121 insertions, 113 deletions
diff --git a/lib/pure/asyncfutures.nim b/lib/pure/asyncfutures.nim index c1e2770bb..29ebf8f89 100644 --- a/lib/pure/asyncfutures.nim +++ b/lib/pure/asyncfutures.nim @@ -7,7 +7,13 @@ # distribution, for details about the copyright. # -import os, tables, strutils, times, heapqueue, options, deques, cstrutils +import std/[os, tables, strutils, times, heapqueue, options, deques, cstrutils, typetraits] + +import system/stacktraces + +when defined(nimPreviewSlimSystem): + import std/objectdollar # for StackTraceEntry + import std/assertions # TODO: This shouldn't need to be included, but should ideally be exported. type @@ -23,7 +29,7 @@ type finished: bool error*: ref Exception ## Stored exception errorStackTrace*: string - when not defined(release): + when not defined(release) or defined(futureLogging): stackTrace: seq[StackTraceEntry] ## For debugging purposes only. id: int fromProc: string @@ -45,7 +51,7 @@ const NimAsyncContinueSuffix* = "NimAsyncContinue" ## For internal usage. Do not use. when isFutureLoggingEnabled: - import hashes + import std/hashes type FutureInfo* = object stackTrace*: seq[StackTraceEntry] @@ -84,19 +90,19 @@ when isFutureLoggingEnabled: var callSoonProc {.threadvar.}: proc (cbproc: proc ()) {.gcsafe.} proc getCallSoonProc*(): (proc(cbproc: proc ()) {.gcsafe.}) = - ## Get current implementation of ``callSoon``. + ## Get current implementation of `callSoon`. return callSoonProc proc setCallSoonProc*(p: (proc(cbproc: proc ()) {.gcsafe.})) = - ## Change current implementation of ``callSoon``. This is normally called when dispatcher from ``asyncdispatcher`` is initialized. + ## Change current implementation of `callSoon`. This is normally called when dispatcher from `asyncdispatcher` is initialized. callSoonProc = p -proc callSoon*(cbproc: proc ()) = - ## Call ``cbproc`` "soon". +proc callSoon*(cbproc: proc () {.gcsafe.}) = + ## Call `cbproc` "soon". ## - ## If async dispatcher is running, ``cbproc`` will be executed during next dispatcher tick. + ## If async dispatcher is running, `cbproc` will be executed during next dispatcher tick. ## - ## If async dispatcher is not running, ``cbproc`` will be executed immediately. + ## If async dispatcher is not running, `cbproc` will be executed immediately. if callSoonProc.isNil: # Loop not initialized yet. Call the function directly to allow setup code to use futures. cbproc() @@ -115,29 +121,29 @@ template setupFutureBase(fromProc: string) = proc newFuture*[T](fromProc: string = "unspecified"): owned(Future[T]) = ## Creates a new future. ## - ## Specifying ``fromProc``, which is a string specifying the name of the proc + ## Specifying `fromProc`, which is a string specifying the name of the proc ## that this future belongs to, is a good habit as it helps with debugging. setupFutureBase(fromProc) when isFutureLoggingEnabled: logFutureStart(result) proc newFutureVar*[T](fromProc = "unspecified"): owned(FutureVar[T]) = - ## Create a new ``FutureVar``. This Future type is ideally suited for + ## Create a new `FutureVar`. This Future type is ideally suited for ## situations where you want to avoid unnecessary allocations of Futures. ## - ## Specifying ``fromProc``, which is a string specifying the name of the proc + ## Specifying `fromProc`, which is a string specifying the name of the proc ## that this future belongs to, is a good habit as it helps with debugging. let fo = newFuture[T](fromProc) result = typeof(result)(fo) when isFutureLoggingEnabled: logFutureStart(Future[T](result)) proc clean*[T](future: FutureVar[T]) = - ## Resets the ``finished`` status of ``future``. + ## Resets the `finished` status of `future`. Future[T](future).finished = false Future[T](future).error = nil proc checkFinished[T](future: Future[T]) = ## Checks whether `future` is finished. If it is then raises a - ## ``FutureError``. + ## `FutureError`. when not defined(release): if future.finished: var msg = "" @@ -157,33 +163,15 @@ proc checkFinished[T](future: Future[T]) = raise err proc call(callbacks: var CallbackList) = - when not defined(nimV2): - # strictly speaking a little code duplication here, but we strive - # to minimize regressions and I'm not sure I got the 'nimV2' logic - # right: - var current = callbacks - while true: - if not current.function.isNil: - callSoon(current.function) - - if current.next.isNil: - break - else: - current = current.next[] - else: - var currentFunc = unown callbacks.function - var currentNext = unown callbacks.next - - while true: - if not currentFunc.isNil: - callSoon(currentFunc) - - if currentNext.isNil: - break - else: - currentFunc = currentNext.function - currentNext = unown currentNext.next + var current = callbacks + while true: + if not current.function.isNil: + callSoon(current.function) + if current.next.isNil: + break + else: + current = current.next[] # callback will be called only once, let GC collect them now callbacks.next = nil callbacks.function = nil @@ -205,27 +193,25 @@ proc add(callbacks: var CallbackList, function: CallbackFunc) = last = last.next last.next = newCallback -proc complete*[T](future: Future[T], val: T) = - ## Completes ``future`` with value ``val``. +proc completeImpl[T, U](future: Future[T], val: sink U, isVoid: static bool) = #assert(not future.finished, "Future already finished, cannot finish twice.") checkFinished(future) assert(future.error == nil) - future.value = val + when not isVoid: + future.value = val future.finished = true future.callbacks.call() when isFutureLoggingEnabled: logFutureFinish(future) -proc complete*(future: Future[void]) = - ## Completes a void ``future``. - #assert(not future.finished, "Future already finished, cannot finish twice.") - checkFinished(future) - assert(future.error == nil) - future.finished = true - future.callbacks.call() - when isFutureLoggingEnabled: logFutureFinish(future) +proc complete*[T](future: Future[T], val: sink T) = + ## Completes `future` with value `val`. + completeImpl(future, val, false) + +proc complete*(future: Future[void], val = Future[void].default) = + completeImpl(future, (), true) proc complete*[T](future: FutureVar[T]) = - ## Completes a ``FutureVar``. + ## Completes a `FutureVar`. template fut: untyped = Future[T](future) checkFinished(fut) assert(fut.error == nil) @@ -233,8 +219,8 @@ proc complete*[T](future: FutureVar[T]) = fut.callbacks.call() when isFutureLoggingEnabled: logFutureFinish(Future[T](future)) -proc complete*[T](future: FutureVar[T], val: T) = - ## Completes a ``FutureVar`` with value ``val``. +proc complete*[T](future: FutureVar[T], val: sink T) = + ## Completes a `FutureVar` with value `val`. ## ## Any previously stored value will be overwritten. template fut: untyped = Future[T](future) @@ -243,10 +229,10 @@ proc complete*[T](future: FutureVar[T], val: T) = fut.finished = true fut.value = val fut.callbacks.call() - when isFutureLoggingEnabled: logFutureFinish(future) + when isFutureLoggingEnabled: logFutureFinish(fut) proc fail*[T](future: Future[T], error: ref Exception) = - ## Completes ``future`` with ``error``. + ## Completes `future` with `error`. #assert(not future.finished, "Future already finished, cannot finish twice.") checkFinished(future) future.finished = true @@ -263,7 +249,7 @@ proc clearCallbacks*(future: FutureBase) = proc addCallback*(future: FutureBase, cb: proc() {.closure, gcsafe.}) = ## Adds the callbacks proc to be called when the future completes. ## - ## If future has already completed then ``cb`` will be called immediately. + ## If future has already completed then `cb` will be called immediately. assert cb != nil if future.finished: callSoon(cb) @@ -274,7 +260,7 @@ proc addCallback*[T](future: Future[T], cb: proc (future: Future[T]) {.closure, gcsafe.}) = ## Adds the callbacks proc to be called when the future completes. ## - ## If future has already completed then ``cb`` will be called immediately. + ## If future has already completed then `cb` will be called immediately. future.addCallback( proc() = cb(future) @@ -283,9 +269,9 @@ proc addCallback*[T](future: Future[T], proc `callback=`*(future: FutureBase, cb: proc () {.closure, gcsafe.}) = ## Clears the list of callbacks and sets the callback proc to be called when the future completes. ## - ## If future has already completed then ``cb`` will be called immediately. + ## If future has already completed then `cb` will be called immediately. ## - ## It's recommended to use ``addCallback`` or ``then`` instead. + ## It's recommended to use `addCallback` or `then` instead. future.clearCallbacks future.addCallback cb @@ -293,56 +279,71 @@ proc `callback=`*[T](future: Future[T], cb: proc (future: Future[T]) {.closure, gcsafe.}) = ## Sets the callback proc to be called when the future completes. ## - ## If future has already completed then ``cb`` will be called immediately. + ## If future has already completed then `cb` will be called immediately. future.callback = proc () = cb(future) +template getFilenameProcname(entry: StackTraceEntry): (string, string) = + when compiles(entry.filenameStr) and compiles(entry.procnameStr): + # We can't rely on "entry.filename" and "entry.procname" still being valid + # cstring pointers, because the "string.data" buffers they pointed to might + # be already garbage collected (this entry being a non-shallow copy, + # "entry.filename" no longer points to "entry.filenameStr.data", but to the + # buffer of the original object). + (entry.filenameStr, entry.procnameStr) + else: + ($entry.filename, $entry.procname) + proc getHint(entry: StackTraceEntry): string = ## We try to provide some hints about stack trace entries that the user ## may not be familiar with, in particular calls inside the stdlib. + + let (filename, procname) = getFilenameProcname(entry) + result = "" - if entry.procname == cstring"processPendingCallbacks": - if cmpIgnoreStyle(entry.filename, "asyncdispatch.nim") == 0: + if procname == "processPendingCallbacks": + if cmpIgnoreStyle(filename, "asyncdispatch.nim") == 0: return "Executes pending callbacks" - elif entry.procname == cstring"poll": - if cmpIgnoreStyle(entry.filename, "asyncdispatch.nim") == 0: + elif procname == "poll": + if cmpIgnoreStyle(filename, "asyncdispatch.nim") == 0: return "Processes asynchronous completion events" - if entry.procname.endsWith(NimAsyncContinueSuffix): - if cmpIgnoreStyle(entry.filename, "asyncmacro.nim") == 0: + if procname.endsWith(NimAsyncContinueSuffix): + if cmpIgnoreStyle(filename, "asyncmacro.nim") == 0: return "Resumes an async procedure" -proc `$`*(entries: seq[StackTraceEntry]): string = +proc `$`*(stackTraceEntries: seq[StackTraceEntry]): string = + when defined(nimStackTraceOverride): + let entries = addDebuggingInfo(stackTraceEntries) + else: + let entries = stackTraceEntries + result = "" # Find longest filename & line number combo for alignment purposes. var longestLeft = 0 for entry in entries: - if entry.procname.isNil: continue + let (filename, procname) = getFilenameProcname(entry) + + if procname == "": continue - let left = $entry.filename & $entry.line - if left.len > longestLeft: - longestLeft = left.len + let leftLen = filename.len + len($entry.line) + if leftLen > longestLeft: + longestLeft = leftLen - var indent = 2 # Format the entries. for entry in entries: - if entry.procname.isNil: - if entry.line == -10: - result.add(spaces(indent) & "#[\n") - indent.inc(2) - else: - indent.dec(2) - result.add(spaces(indent) & "]#\n") - continue - - let left = "$#($#)" % [$entry.filename, $entry.line] - result.add((spaces(indent) & "$#$# $#\n") % [ + let (filename, procname) = getFilenameProcname(entry) + + if procname == "" and entry.line == reraisedFromBegin: + break + + let left = "$#($#)" % [filename, $entry.line] + result.add((spaces(2) & "$# $#\n") % [ left, - spaces(longestLeft - left.len + 2), - $entry.procname + procname ]) let hint = getHint(entry) if hint.len > 0: - result.add(spaces(indent+2) & "## " & hint & "\n") + result.add(spaces(4) & "## " & hint & "\n") proc injectStacktrace[T](future: Future[T]) = when not defined(release): @@ -362,66 +363,73 @@ proc injectStacktrace[T](future: Future[T]) = newMsg.add($entries) newMsg.add("Exception message: " & exceptionMsg & "\n") - newMsg.add("Exception type:") # # For debugging purposes + # newMsg.add("Exception type:") # for entry in getStackTraceEntries(future.error): # newMsg.add "\n" & $entry future.error.msg = newMsg -proc read*[T](future: Future[T] | FutureVar[T]): T = - ## Retrieves the value of ``future``. Future must be finished otherwise - ## this function will fail with a ``ValueError`` exception. - ## - ## If the result of the future is an error then that error will be raised. - {.push hint[ConvFromXtoItselfNotNeeded]: off.} - let fut = Future[T](future) - {.pop.} +template readImpl(future, T) = + when future is Future[T]: + let fut {.cursor.} = future + else: + let fut {.cursor.} = Future[T](future) if fut.finished: if fut.error != nil: injectStacktrace(fut) raise fut.error when T isnot void: - result = fut.value + result = distinctBase(future).value else: # TODO: Make a custom exception type for this? raise newException(ValueError, "Future still in progress.") +proc read*[T](future: Future[T] | FutureVar[T]): lent T = + ## Retrieves the value of `future`. Future must be finished otherwise + ## this function will fail with a `ValueError` exception. + ## + ## If the result of the future is an error then that error will be raised. + readImpl(future, T) + +proc read*(future: Future[void] | FutureVar[void]) = + readImpl(future, void) + proc readError*[T](future: Future[T]): ref Exception = - ## Retrieves the exception stored in ``future``. + ## Retrieves the exception stored in `future`. ## - ## An ``ValueError`` exception will be thrown if no exception exists + ## An `ValueError` exception will be thrown if no exception exists ## in the specified Future. if future.error != nil: return future.error else: raise newException(ValueError, "No error in future.") proc mget*[T](future: FutureVar[T]): var T = - ## Returns a mutable value stored in ``future``. + ## Returns a mutable value stored in `future`. ## - ## Unlike ``read``, this function will not raise an exception if the + ## Unlike `read`, this function will not raise an exception if the ## Future has not been finished. result = Future[T](future).value proc finished*(future: FutureBase | FutureVar): bool = - ## Determines whether ``future`` has completed. + ## Determines whether `future` has completed. ## - ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish. + ## `True` may indicate an error or a value. Use `failed` to distinguish. when future is FutureVar: result = (FutureBase(future)).finished else: result = future.finished proc failed*(future: FutureBase): bool = - ## Determines whether ``future`` completed with an error. + ## Determines whether `future` completed with an error. return future.error != nil proc asyncCheck*[T](future: Future[T]) = - ## Sets a callback on ``future`` which raises an exception if the future + ## Sets a callback on `future` which raises an exception if the future ## finished with an error. ## - ## This should be used instead of ``discard`` to discard void futures, - ## or use ``waitFor`` if you need to wait for the future's completion. + ## This should be used instead of `discard` to discard void futures, + ## or use `waitFor` if you need to wait for the future's completion. assert(not future.isNil, "Future is nil") # TODO: We can likely look at the stack trace here and inject the location # where the `asyncCheck` was called to give a better error stack message. @@ -432,7 +440,7 @@ proc asyncCheck*[T](future: Future[T]) = future.callback = asyncCheckCallback proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = - ## Returns a future which will complete once both ``fut1`` and ``fut2`` + ## Returns a future which will complete once both `fut1` and `fut2` ## complete. var retFuture = newFuture[void]("asyncdispatch.`and`") fut1.callback = @@ -448,7 +456,7 @@ proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = return retFuture proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = - ## Returns a future which will complete once either ``fut1`` or ``fut2`` + ## Returns a future which will complete once either `fut1` or `fut2` ## complete. var retFuture = newFuture[void]("asyncdispatch.`or`") proc cb[X](fut: Future[X]) = @@ -461,14 +469,14 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = proc all*[T](futs: varargs[Future[T]]): auto = ## Returns a future which will complete once - ## all futures in ``futs`` complete. + ## all futures in `futs` complete. ## If the argument is empty, the returned future completes immediately. ## - ## If the awaited futures are not ``Future[void]``, the returned future + ## If the awaited futures are not `Future[void]`, the returned future ## will hold the values of all awaited futures in a sequence. ## - ## If the awaited futures *are* ``Future[void]``, - ## this proc returns ``Future[void]``. + ## If the awaited futures *are* `Future[void]`, + ## this proc returns `Future[void]`. when T is void: var |