diff options
Diffstat (limited to 'lib/pure/asyncfutures.nim')
-rw-r--r-- | lib/pure/asyncfutures.nim | 321 |
1 files changed, 207 insertions, 114 deletions
diff --git a/lib/pure/asyncfutures.nim b/lib/pure/asyncfutures.nim index 863a6843b..29ebf8f89 100644 --- a/lib/pure/asyncfutures.nim +++ b/lib/pure/asyncfutures.nim @@ -1,4 +1,19 @@ -import os, tables, strutils, times, heapqueue, options, deques, cstrutils +# +# +# Nim's Runtime Library +# (c) Copyright 2015 Dominik Picheta +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +import std/[os, tables, strutils, times, heapqueue, options, deques, cstrutils, typetraits] + +import system/stacktraces + +when defined(nimPreviewSlimSystem): + import std/objectdollar # for StackTraceEntry + import std/assertions # TODO: This shouldn't need to be included, but should ideally be exported. type @@ -6,46 +21,88 @@ type CallbackList = object function: CallbackFunc - next: ref CallbackList + next: owned(ref CallbackList) - FutureBase* = ref object of RootObj ## Untyped future. + FutureBase* = ref object of RootObj ## Untyped future. callbacks: CallbackList finished: bool - error*: ref Exception ## Stored exception + error*: ref Exception ## Stored exception errorStackTrace*: string - when not defined(release): - stackTrace: string ## For debugging purposes only. + when not defined(release) or defined(futureLogging): + stackTrace: seq[StackTraceEntry] ## For debugging purposes only. id: int fromProc: string Future*[T] = ref object of FutureBase ## Typed future. - value: T ## Stored value + value: T ## Stored value FutureVar*[T] = distinct Future[T] - FutureError* = object of Exception + FutureError* = object of Defect cause*: FutureBase when not defined(release): var currentID = 0 +const isFutureLoggingEnabled* = defined(futureLogging) + +const + NimAsyncContinueSuffix* = "NimAsyncContinue" ## For internal usage. Do not use. + +when isFutureLoggingEnabled: + import std/hashes + type + FutureInfo* = object + stackTrace*: seq[StackTraceEntry] + fromProc*: string + + var futuresInProgress {.threadvar.}: Table[FutureInfo, int] + + proc getFuturesInProgress*(): var Table[FutureInfo, int] = + return futuresInProgress + + proc hash(s: StackTraceEntry): Hash = + result = hash(s.procname) !& hash(s.line) !& + hash(s.filename) + result = !$result + + proc hash(fi: FutureInfo): Hash = + result = hash(fi.stackTrace) !& hash(fi.fromProc) + result = !$result + + proc getFutureInfo(fut: FutureBase): FutureInfo = + let info = FutureInfo( + stackTrace: fut.stackTrace, + fromProc: fut.fromProc + ) + return info + + proc logFutureStart(fut: FutureBase) = + let info = getFutureInfo(fut) + if info notin getFuturesInProgress(): + getFuturesInProgress()[info] = 0 + getFuturesInProgress()[info].inc() + + proc logFutureFinish(fut: FutureBase) = + getFuturesInProgress()[getFutureInfo(fut)].dec() + var callSoonProc {.threadvar.}: proc (cbproc: proc ()) {.gcsafe.} proc getCallSoonProc*(): (proc(cbproc: proc ()) {.gcsafe.}) = - ## Get current implementation of ``callSoon``. + ## Get current implementation of `callSoon`. return callSoonProc proc setCallSoonProc*(p: (proc(cbproc: proc ()) {.gcsafe.})) = - ## Change current implementation of ``callSoon``. This is normally called when dispatcher from ``asyncdispatcher`` is initialized. + ## Change current implementation of `callSoon`. This is normally called when dispatcher from `asyncdispatcher` is initialized. callSoonProc = p -proc callSoon*(cbproc: proc ()) = - ## Call ``cbproc`` "soon". +proc callSoon*(cbproc: proc () {.gcsafe.}) = + ## Call `cbproc` "soon". ## - ## If async dispatcher is running, ``cbproc`` will be executed during next dispatcher tick. + ## If async dispatcher is running, `cbproc` will be executed during next dispatcher tick. ## - ## If async dispatcher is not running, ``cbproc`` will be executed immediately. + ## If async dispatcher is not running, `cbproc` will be executed immediately. if callSoonProc.isNil: # Loop not initialized yet. Call the function directly to allow setup code to use futures. cbproc() @@ -56,34 +113,37 @@ template setupFutureBase(fromProc: string) = new(result) result.finished = false when not defined(release): - result.stackTrace = getStackTrace() + result.stackTrace = getStackTraceEntries() result.id = currentID result.fromProc = fromProc currentID.inc() -proc newFuture*[T](fromProc: string = "unspecified"): Future[T] = +proc newFuture*[T](fromProc: string = "unspecified"): owned(Future[T]) = ## Creates a new future. ## - ## Specifying ``fromProc``, which is a string specifying the name of the proc + ## Specifying `fromProc`, which is a string specifying the name of the proc ## that this future belongs to, is a good habit as it helps with debugging. setupFutureBase(fromProc) + when isFutureLoggingEnabled: logFutureStart(result) -proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] = - ## Create a new ``FutureVar``. This Future type is ideally suited for +proc newFutureVar*[T](fromProc = "unspecified"): owned(FutureVar[T]) = + ## Create a new `FutureVar`. This Future type is ideally suited for ## situations where you want to avoid unnecessary allocations of Futures. ## - ## Specifying ``fromProc``, which is a string specifying the name of the proc + ## Specifying `fromProc`, which is a string specifying the name of the proc ## that this future belongs to, is a good habit as it helps with debugging. - result = FutureVar[T](newFuture[T](fromProc)) + let fo = newFuture[T](fromProc) + result = typeof(result)(fo) + when isFutureLoggingEnabled: logFutureStart(Future[T](result)) proc clean*[T](future: FutureVar[T]) = - ## Resets the ``finished`` status of ``future``. + ## Resets the `finished` status of `future`. Future[T](future).finished = false Future[T](future).error = nil proc checkFinished[T](future: Future[T]) = ## Checks whether `future` is finished. If it is then raises a - ## ``FutureError``. + ## `FutureError`. when not defined(release): if future.finished: var msg = "" @@ -92,10 +152,10 @@ proc checkFinished[T](future: Future[T]) = msg.add("\n Future ID: " & $future.id) msg.add("\n Created in proc: " & future.fromProc) msg.add("\n Stack trace to moment of creation:") - msg.add("\n" & indent(future.stackTrace.strip(), 4)) + msg.add("\n" & indent(($future.stackTrace).strip(), 4)) when T is string: msg.add("\n Contents (string): ") - msg.add("\n" & indent(future.value.repr, 4)) + msg.add("\n" & indent($future.value, 4)) msg.add("\n Stack trace to moment of secondary completion:") msg.add("\n" & indent(getStackTrace().strip(), 4)) var err = newException(FutureError, msg) @@ -104,7 +164,6 @@ proc checkFinished[T](future: Future[T]) = proc call(callbacks: var CallbackList) = var current = callbacks - while true: if not current.function.isNil: callSoon(current.function) @@ -113,7 +172,6 @@ proc call(callbacks: var CallbackList) = break else: current = current.next[] - # callback will be called only once, let GC collect them now callbacks.next = nil callbacks.function = nil @@ -123,39 +181,46 @@ proc add(callbacks: var CallbackList, function: CallbackFunc) = callbacks.function = function assert callbacks.next == nil else: - let newNext = new(ref CallbackList) - newNext.function = callbacks.function - newNext.next = callbacks.next - callbacks.next = newNext - callbacks.function = function + let newCallback = new(ref CallbackList) + newCallback.function = function + newCallback.next = nil -proc complete*[T](future: Future[T], val: T) = - ## Completes ``future`` with value ``val``. - #assert(not future.finished, "Future already finished, cannot finish twice.") - checkFinished(future) - assert(future.error == nil) - future.value = val - future.finished = true - future.callbacks.call() + if callbacks.next == nil: + callbacks.next = newCallback + else: + var last = callbacks.next + while last.next != nil: + last = last.next + last.next = newCallback -proc complete*(future: Future[void]) = - ## Completes a void ``future``. +proc completeImpl[T, U](future: Future[T], val: sink U, isVoid: static bool) = #assert(not future.finished, "Future already finished, cannot finish twice.") checkFinished(future) assert(future.error == nil) + when not isVoid: + future.value = val future.finished = true future.callbacks.call() + when isFutureLoggingEnabled: logFutureFinish(future) + +proc complete*[T](future: Future[T], val: sink T) = + ## Completes `future` with value `val`. + completeImpl(future, val, false) + +proc complete*(future: Future[void], val = Future[void].default) = + completeImpl(future, (), true) proc complete*[T](future: FutureVar[T]) = - ## Completes a ``FutureVar``. + ## Completes a `FutureVar`. template fut: untyped = Future[T](future) checkFinished(fut) assert(fut.error == nil) fut.finished = true fut.callbacks.call() + when isFutureLoggingEnabled: logFutureFinish(Future[T](future)) -proc complete*[T](future: FutureVar[T], val: T) = - ## Completes a ``FutureVar`` with value ``val``. +proc complete*[T](future: FutureVar[T], val: sink T) = + ## Completes a `FutureVar` with value `val`. ## ## Any previously stored value will be overwritten. template fut: untyped = Future[T](future) @@ -164,9 +229,10 @@ proc complete*[T](future: FutureVar[T], val: T) = fut.finished = true fut.value = val fut.callbacks.call() + when isFutureLoggingEnabled: logFutureFinish(fut) proc fail*[T](future: Future[T], error: ref Exception) = - ## Completes ``future`` with ``error``. + ## Completes `future` with `error`. #assert(not future.finished, "Future already finished, cannot finish twice.") checkFinished(future) future.finished = true @@ -174,15 +240,16 @@ proc fail*[T](future: Future[T], error: ref Exception) = future.errorStackTrace = if getStackTrace(error) == "": getStackTrace() else: getStackTrace(error) future.callbacks.call() + when isFutureLoggingEnabled: logFutureFinish(future) proc clearCallbacks*(future: FutureBase) = future.callbacks.function = nil future.callbacks.next = nil -proc addCallback*(future: FutureBase, cb: proc() {.closure,gcsafe.}) = +proc addCallback*(future: FutureBase, cb: proc() {.closure, gcsafe.}) = ## Adds the callbacks proc to be called when the future completes. ## - ## If future has already completed then ``cb`` will be called immediately. + ## If future has already completed then `cb` will be called immediately. assert cb != nil if future.finished: callSoon(cb) @@ -190,78 +257,93 @@ proc addCallback*(future: FutureBase, cb: proc() {.closure,gcsafe.}) = future.callbacks.add cb proc addCallback*[T](future: Future[T], - cb: proc (future: Future[T]) {.closure,gcsafe.}) = + cb: proc (future: Future[T]) {.closure, gcsafe.}) = ## Adds the callbacks proc to be called when the future completes. ## - ## If future has already completed then ``cb`` will be called immediately. + ## If future has already completed then `cb` will be called immediately. future.addCallback( proc() = - cb(future) + cb(future) ) -proc `callback=`*(future: FutureBase, cb: proc () {.closure,gcsafe.}) = +proc `callback=`*(future: FutureBase, cb: proc () {.closure, gcsafe.}) = ## Clears the list of callbacks and sets the callback proc to be called when the future completes. ## - ## If future has already completed then ``cb`` will be called immediately. + ## If future has already completed then `cb` will be called immediately. ## - ## It's recommended to use ``addCallback`` or ``then`` instead. + ## It's recommended to use `addCallback` or `then` instead. future.clearCallbacks future.addCallback cb proc `callback=`*[T](future: Future[T], - cb: proc (future: Future[T]) {.closure,gcsafe.}) = + cb: proc (future: Future[T]) {.closure, gcsafe.}) = ## Sets the callback proc to be called when the future completes. ## - ## If future has already completed then ``cb`` will be called immediately. + ## If future has already completed then `cb` will be called immediately. future.callback = proc () = cb(future) +template getFilenameProcname(entry: StackTraceEntry): (string, string) = + when compiles(entry.filenameStr) and compiles(entry.procnameStr): + # We can't rely on "entry.filename" and "entry.procname" still being valid + # cstring pointers, because the "string.data" buffers they pointed to might + # be already garbage collected (this entry being a non-shallow copy, + # "entry.filename" no longer points to "entry.filenameStr.data", but to the + # buffer of the original object). + (entry.filenameStr, entry.procnameStr) + else: + ($entry.filename, $entry.procname) + proc getHint(entry: StackTraceEntry): string = ## We try to provide some hints about stack trace entries that the user ## may not be familiar with, in particular calls inside the stdlib. + + let (filename, procname) = getFilenameProcname(entry) + result = "" - if entry.procname == "processPendingCallbacks": - if cmpIgnoreStyle(entry.filename, "asyncdispatch.nim") == 0: + if procname == "processPendingCallbacks": + if cmpIgnoreStyle(filename, "asyncdispatch.nim") == 0: return "Executes pending callbacks" - elif entry.procname == "poll": - if cmpIgnoreStyle(entry.filename, "asyncdispatch.nim") == 0: + elif procname == "poll": + if cmpIgnoreStyle(filename, "asyncdispatch.nim") == 0: return "Processes asynchronous completion events" - if entry.procname.endsWith("_continue"): - if cmpIgnoreStyle(entry.filename, "asyncmacro.nim") == 0: + if procname.endsWith(NimAsyncContinueSuffix): + if cmpIgnoreStyle(filename, "asyncmacro.nim") == 0: return "Resumes an async procedure" -proc `$`*(entries: seq[StackTraceEntry]): string = +proc `$`*(stackTraceEntries: seq[StackTraceEntry]): string = + when defined(nimStackTraceOverride): + let entries = addDebuggingInfo(stackTraceEntries) + else: + let entries = stackTraceEntries + result = "" # Find longest filename & line number combo for alignment purposes. var longestLeft = 0 for entry in entries: - if entry.procName.isNil: continue + let (filename, procname) = getFilenameProcname(entry) - let left = $entry.filename & $entry.line - if left.len > longestLeft: - longestLeft = left.len + if procname == "": continue + + let leftLen = filename.len + len($entry.line) + if leftLen > longestLeft: + longestLeft = leftLen - var indent = 2 # Format the entries. for entry in entries: - if entry.procName.isNil: - if entry.line == -10: - result.add(spaces(indent) & "#[\n") - indent.inc(2) - else: - indent.dec(2) - result.add(spaces(indent)& "]#\n") - continue - - let left = "$#($#)" % [$entry.filename, $entry.line] - result.add((spaces(indent) & "$#$# $#\n") % [ + let (filename, procname) = getFilenameProcname(entry) + + if procname == "" and entry.line == reraisedFromBegin: + break + + let left = "$#($#)" % [filename, $entry.line] + result.add((spaces(2) & "$# $#\n") % [ left, - spaces(longestLeft - left.len + 2), - $entry.procName + procname ]) let hint = getHint(entry) if hint.len > 0: - result.add(spaces(indent+2) & "## " & hint & "\n") + result.add(spaces(4) & "## " & hint & "\n") proc injectStacktrace[T](future: Future[T]) = when not defined(release): @@ -281,74 +363,84 @@ proc injectStacktrace[T](future: Future[T]) = newMsg.add($entries) newMsg.add("Exception message: " & exceptionMsg & "\n") - newMsg.add("Exception type:") # # For debugging purposes + # newMsg.add("Exception type:") # for entry in getStackTraceEntries(future.error): # newMsg.add "\n" & $entry future.error.msg = newMsg -proc read*[T](future: Future[T] | FutureVar[T]): T = - ## Retrieves the value of ``future``. Future must be finished otherwise - ## this function will fail with a ``ValueError`` exception. - ## - ## If the result of the future is an error then that error will be raised. - {.push hint[ConvFromXtoItselfNotNeeded]: off.} - let fut = Future[T](future) - {.pop.} +template readImpl(future, T) = + when future is Future[T]: + let fut {.cursor.} = future + else: + let fut {.cursor.} = Future[T](future) if fut.finished: if fut.error != nil: injectStacktrace(fut) raise fut.error when T isnot void: - return fut.value + result = distinctBase(future).value else: # TODO: Make a custom exception type for this? raise newException(ValueError, "Future still in progress.") +proc read*[T](future: Future[T] | FutureVar[T]): lent T = + ## Retrieves the value of `future`. Future must be finished otherwise + ## this function will fail with a `ValueError` exception. + ## + ## If the result of the future is an error then that error will be raised. + readImpl(future, T) + +proc read*(future: Future[void] | FutureVar[void]) = + readImpl(future, void) + proc readError*[T](future: Future[T]): ref Exception = - ## Retrieves the exception stored in ``future``. + ## Retrieves the exception stored in `future`. ## - ## An ``ValueError`` exception will be thrown if no exception exists + ## An `ValueError` exception will be thrown if no exception exists ## in the specified Future. if future.error != nil: return future.error else: raise newException(ValueError, "No error in future.") proc mget*[T](future: FutureVar[T]): var T = - ## Returns a mutable value stored in ``future``. + ## Returns a mutable value stored in `future`. ## - ## Unlike ``read``, this function will not raise an exception if the + ## Unlike `read`, this function will not raise an exception if the ## Future has not been finished. result = Future[T](future).value proc finished*(future: FutureBase | FutureVar): bool = - ## Determines whether ``future`` has completed. + ## Determines whether `future` has completed. ## - ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish. + ## `True` may indicate an error or a value. Use `failed` to distinguish. when future is FutureVar: result = (FutureBase(future)).finished else: result = future.finished proc failed*(future: FutureBase): bool = - ## Determines whether ``future`` completed with an error. + ## Determines whether `future` completed with an error. return future.error != nil proc asyncCheck*[T](future: Future[T]) = - ## Sets a callback on ``future`` which raises an exception if the future + ## Sets a callback on `future` which raises an exception if the future ## finished with an error. ## - ## This should be used instead of ``discard`` to discard void futures. + ## This should be used instead of `discard` to discard void futures, + ## or use `waitFor` if you need to wait for the future's completion. assert(not future.isNil, "Future is nil") - future.callback = - proc () = - if future.failed: - injectStacktrace(future) - raise future.error + # TODO: We can likely look at the stack trace here and inject the location + # where the `asyncCheck` was called to give a better error stack message. + proc asyncCheckCallback() = + if future.failed: + injectStacktrace(future) + raise future.error + future.callback = asyncCheckCallback proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = - ## Returns a future which will complete once both ``fut1`` and ``fut2`` + ## Returns a future which will complete once both `fut1` and `fut2` ## complete. var retFuture = newFuture[void]("asyncdispatch.`and`") fut1.callback = @@ -364,26 +456,27 @@ proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = return retFuture proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = - ## Returns a future which will complete once either ``fut1`` or ``fut2`` + ## Returns a future which will complete once either `fut1` or `fut2` ## complete. var retFuture = newFuture[void]("asyncdispatch.`or`") proc cb[X](fut: Future[X]) = - if fut.failed: retFuture.fail(fut.error) - if not retFuture.finished: retFuture.complete() + if not retFuture.finished: + if fut.failed: retFuture.fail(fut.error) + else: retFuture.complete() fut1.callback = cb[T] fut2.callback = cb[Y] return retFuture proc all*[T](futs: varargs[Future[T]]): auto = ## Returns a future which will complete once - ## all futures in ``futs`` complete. + ## all futures in `futs` complete. ## If the argument is empty, the returned future completes immediately. ## - ## If the awaited futures are not ``Future[void]``, the returned future + ## If the awaited futures are not `Future[void]`, the returned future ## will hold the values of all awaited futures in a sequence. ## - ## If the awaited futures *are* ``Future[void]``, - ## this proc returns ``Future[void]``. + ## If the awaited futures *are* `Future[void]`, + ## this proc returns `Future[void]`. when T is void: var |