# # # Nim's Runtime Library # (c) Copyright 2015 Dominik Picheta # # See the file "copying.txt", included in this # distribution, for details about the copyright. # import os, tables, strutils, times, heapqueue, options, deques, cstrutils import system/stacktraces # TODO: This shouldn't need to be included, but should ideally be exported. type CallbackFunc = proc () {.closure, gcsafe.} CallbackList = object function: CallbackFunc next: owned(ref CallbackList) FutureBase* = ref object of RootObj ## Untyped future. callbacks: CallbackList finished: bool error*: ref Exception ## Stored exception errorStackTrace*: string when not defined(release): stackTrace: seq[StackTraceEntry] ## For debugging purposes only. id: int fromProc: string Future*[T] = ref object of FutureBase ## Typed future. value: T ## Stored value FutureVar*[T] = distinct Future[T] FutureError* = object of Defect cause*: FutureBase when not defined(release): var currentID = 0 const isFutureLoggingEnabled* = defined(futureLogging) const NimAsyncContinueSuffix* = "NimAsyncContinue" ## For internal usage. Do not use. when isFutureLoggingEnabled: import hashes type FutureInfo* = object stackTrace*: seq[StackTraceEntry] fromProc*: string var futuresInProgress {.threadvar.}: Table[FutureInfo, int] proc getFuturesInProgress*(): var Table[FutureInfo, int] = return futuresInProgress proc hash(s: StackTraceEntry): Hash = result = hash(s.procname) !& hash(s.line) !& hash(s.filename) result = !$result proc hash(fi: FutureInfo): Hash = result = hash(fi.stackTrace) !& hash(fi.fromProc) result = !$result proc getFutureInfo(fut: FutureBase): FutureInfo = let info = FutureInfo( stackTrace: fut.stackTrace, fromProc: fut.fromProc ) return info proc logFutureStart(fut: FutureBase) = let info = getFutureInfo(fut) if info notin getFuturesInProgress(): getFuturesInProgress()[info] = 0 getFuturesInProgress()[info].inc() proc logFutureFinish(fut: FutureBase) = getFuturesInProgress()[getFutureInfo(fut)].dec() var callSoonProc {.threadvar.}: proc (cbproc: proc ()) {.gcsafe.} proc getCallSoonProc*(): (proc(cbproc: proc ()) {.gcsafe.}) = ## Get current implementation of `callSoon`. return callSoonProc proc setCallSoonProc*(p: (proc(cbproc: proc ()) {.gcsafe.})) = ## Change current implementation of `callSoon`. This is normally called when dispatcher from `asyncdispatcher` is initialized. callSoonProc = p proc callSoon*(cbproc: proc ()) = ## Call `cbproc` "soon". ## ## If async dispatcher is running, `cbproc` will be executed during next dispatcher tick. ## ## If async dispatcher is not running, `cbproc` will be executed immediately. if callSoonProc.isNil: # Loop not initialized yet. Call the function directly to allow setup code to use futures. cbproc() else: callSoonProc(cbproc) template setupFutureBase(fromProc: string) = new(result) result.finished = false when not defined(release): result.stackTrace = getStackTraceEntries() result.id = currentID result.fromProc = fromProc currentID.inc() proc newFuture*[T](fromProc: string = "unspecified"): owned(Future[T]) = ## Creates a new future. ## ## Specifying `fromProc`, which is a string specifying the name of the proc ## that this future belongs to, is a good habit as it helps with debugging. setupFutureBase(fromProc) when isFutureLoggingEnabled: logFutureStart(result) proc newFutureVar*[T](fromProc = "unspecified"): owned(FutureVar[T]) = ## Create a new `FutureVar`. This Future type is ideally suited for ## situations where you want to avoid unnecessary allocations of Futures. ## ## Specifying `fromProc`, which is a string specifying the name of the proc ## that this future belongs to, is a good habit as it helps with debugging. let fo = newFuture[T](fromProc) result = typeof(result)(fo) when isFutureLoggingEnabled: logFutureStart(Future[T](result)) proc clean*[T](future: FutureVar[T]) = ## Resets the `finished` status of `future`. Future[T](future).finished = false Future[T](future).error = nil proc checkFinished[T](future: Future[T]) = ## Checks whether `future` is finished. If it is then raises a ## `FutureError`. when not defined(release): if future.finished: var msg = "" msg.add("An attempt was made to complete a Future more than once. ") msg.add("Details:") msg.add("\n Future ID: " & $future.id) msg.add("\n Created in proc: " & future.fromProc) msg.add("\n Stack trace to moment of creation:") msg.add("\n" & indent(($future.stackTrace).strip(), 4)) when T is string: msg.add("\n Contents (string): ") msg.add("\n" & indent($future.value, 4)) msg.add("\n Stack trace to moment of secondary completion:") msg.add("\n" & indent(getStackTrace().strip(), 4)) var err = newException(FutureError, msg) err.cause = future raise err proc call(callbacks: var CallbackList) = var current = callbacks while true: if not current.function.isNil: callSoon(current.function) if current.next.isNil: break else: current = current.next[] # callback will be called only once, let GC collect them now callbacks.next = nil callbacks.function = nil proc add(callbacks: var CallbackList, function: CallbackFunc) = if callbacks.function.isNil: callbacks.function = function assert callbacks.next == nil else: let newCallback = new(ref CallbackList) newCallback.function = function newCallback.next = nil if callbacks.next == nil: callbacks.next = newCallback else: var last = callbacks.next while last.next != nil: last = last.next last.next = newCallback proc complete*[T](future: Future[T], val: T) = ## Completes `future` with value `val`. #assert(not future.finished, "Future already finished, cannot finish twice.") checkFinished(future) assert(future.error == nil) future.value = val future.finished = true future.callbacks.call() when isFutureLoggingEnabled: logFutureFinish(future) proc complete*(future: Future[void]) = ## Completes a void `future`. #assert(not future.finished, "Future already finished, cannot finish twice.") checkFinished(future) assert(future.error == nil) future.finished = true future.callbacks.call() when isFutureLoggingEnabled: logFutureFinish(future) proc complete*[T](future: FutureVar[T]) = ## Completes a `FutureVar`. template fut: untyped = Future[T](future) checkFinished(fut) assert(fut.error == nil) fut.finished = true fut.callbacks.call() when isFutureLoggingEnabled: logFutureFinish(Future[T](future)) proc complete*[T](future: FutureVar[T], val: T) = ## Completes a `FutureVar` with value `val`. ## ## Any previously stored value will be overwritten. template fut: untyped = Future[T](future) checkFinished(fut) assert(fut.error.isNil()) fut.finished = true fut.value = val fut.callbacks.call() when isFutureLoggingEnabled: logFutureFinish(future) proc fail*[T](future: Future[T], error: ref Exception) = ## Completes `future` with `error`. #assert(not future.finished, "Future already finished, cannot finish twice.") checkFinished(future) future.finished = true future.error = error future.errorStackTrace = if getStackTrace(error) == "": getStackTrace() else: getStackTrace(error) future.callbacks.call() when isFutureLoggingEnabled: logFutureFinish(future) proc clearCallbacks*(future: FutureBase) = future.callbacks.function = nil future.callbacks.next = nil proc addCallback*(future: FutureBase, cb: proc() {.closure, gcsafe.}) = ## Adds the callbacks proc to be called when the future completes. ## ## If future has already completed then `cb` will be called immediately. assert cb != nil if future.finished: callSoon(cb) else: future.callbacks.add cb proc addCallback*[T](future: Future[T], cb: proc (future: Future[T]) {.closure, gcsafe.}) = ## Adds the callbacks proc to be called when the future completes. ## ## If future has already completed then `cb` will be called immediately. future.addCallback( proc() = cb(future) ) proc `callback=`*(future: FutureBase, cb: proc () {.closure, gcsafe.}) = ## Clears the list of callbacks and sets the callback proc to be called when the future completes. ## ## If future has already completed then `cb` will be called immediately. ## ## It's recommended to use `addCallback` or `then` instead. future.clearCallbacks future.addCallback cb proc `callback=`*[T](future: Future[T], cb: proc (future: Future[T]) {.closure, gcsafe.}) = ## Sets the callback proc to be called when the future completes. ## ## If future has already completed then `cb` will be called immediately. future.callback = proc () = cb(future) proc getHint(entry: StackTraceEntry): string = ## We try to provide some hints about stack trace entries that the user ## may not be familiar with, in particular calls inside the stdlib. result = "" if entry.procname == cstring"processPendingCallbacks": if cmpIgnoreStyle(entry.filename, "asyncdispatch.nim") == 0: return "Executes pending callbacks" elif entry.procname == cstring"poll": if cmpIgnoreStyle(entry.filename, "asyncdispatch.nim") == 0: return "Processes asynchronous completion events" if entry.procname.endsWith(NimAsyncContinueSuffix): if cmpIgnoreStyle(entry.filename, "asyncmacro.nim") == 0: return "Resumes an async procedure" proc `$`*(stackTraceEntries: seq[StackTraceEntry]): string = when defined(nimStackTraceOverride): let entries = addDebuggingInfo(stackTraceEntries) else: let entries = stackTraceEntries result = "" # Find longest filename & line number combo for alignment purposes. var longestLeft = 0 for entry in entries: if entry.procname.isNil: continue let left = $entry.filename & $entry.line if left.len > longestLeft: longestLeft = left.len var indent = 2 # Format the entries. for entry in entries: if entry.procname.isNil: if entry.line == reraisedFromBegin: result.add(spaces(indent) & "#[\n") indent.inc(2) elif entry.line == reraisedFromEnd: indent.dec(2) result.add(spaces(indent) & "]#\n") continue let left = "$#($#)" % [$entry.filename, $entry.line] result.add((spaces(indent) & "$#$# $#\n") % [ left, spaces(longestLeft - left.len + 2), $entry.procname ]) let hint = getHint(entry) if hint.len > 0: result.add(spaces(indent+2) & "## " & hint & "\n") proc injectStacktrace[T](future: Future[T]) = when not defined(release): const header = "\nAsync traceback:\n" var exceptionMsg = future.error.msg if header in exceptionMsg: # This is messy: extract the original exception message from the msg # containing the async traceback. let start = exceptionMsg.find(header) exceptionMsg = exceptionMsg[0..