import os, tables, strutils, times, heapqueue, options, deques, cstrutils
# TODO: This shouldn't need to be included, but should ideally be exported.
type
CallbackFunc = proc () {.closure, gcsafe.}
CallbackList = object
function: CallbackFunc
next: ref CallbackList
FutureBase* = ref object of RootObj ## Untyped future.
callbacks: CallbackList
finished: bool
error*: ref Exception ## Stored exception
errorStackTrace*: string
when not defined(release):
stackTrace: string ## For debugging purposes only.
id: int
fromProc: string
Future*[T] = ref object of FutureBase ## Typed future.
value: T ## Stored value
FutureVar*[T] = distinct Future[T]
FutureError* = object of Exception
cause*: FutureBase
when not defined(release):
var currentID = 0
var callSoonProc {.threadvar.}: proc (cbproc: proc ()) {.gcsafe.}
proc getCallSoonProc*(): (proc(cbproc: proc ()) {.gcsafe.}) =
## Get current implementation of ``callSoon``.
return callSoonProc
proc setCallSoonProc*(p: (proc(cbproc: proc ()) {.gcsafe.})) =
## Change current implementation of ``callSoon``. This is normally called when dispatcher from ``asyncdispatcher`` is initialized.
callSoonProc = p
proc callSoon*(cbproc: proc ()) =
## Call ``cbproc`` "soon".
##
## If async dispatcher is running, ``cbproc`` will be executed during next dispatcher tick.
##
## If async dispatcher is not running, ``cbproc`` will be executed immediately.
if callSoonProc.isNil:
# Loop not initialized yet. Call the function directly to allow setup code to use futures.
cbproc()
else:
callSoonProc(cbproc)
template setupFutureBase(fromProc: string) =
new(result)
result.finished = false
when not defined(release):
result.stackTrace = getStackTrace()
result.id = currentID
result.fromProc = fromProc
currentID.inc()
proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
## Creates a new future.
##
## Specifying ``fromProc``, which is a string specifying the name of the proc
## that this future belongs to, is a good habit as it helps with debugging.
setupFutureBase(fromProc)
proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] =
## Create a new ``FutureVar``. This Future type is ideally suited for
## situations where you want to avoid unnecessary allocations of Futures.
##
## Specifying ``fromProc``, which is a string specifying the name of the proc
## that this future belongs to, is a good habit as it helps with debugging.
result = FutureVar[T](newFuture[T](fromProc))
proc clean*[T](future: FutureVar[T]) =
## Resets the ``finished`` status of ``future``.
Future[T](future).finished = false
Future[T](future).error = nil
proc checkFinished[T](future: Future[T]) =
## Checks whether `future` is finished. If it is then raises a
## ``FutureError``.
when not defined(release):
if future.finished:
var msg = ""
msg.add("An attempt was made to complete a Future more than once. ")
msg.add("Details:")
msg.add("\n Future ID: " & $future.id)
msg.add("\n Created in proc: " & future.fromProc)
msg.add("\n Stack trace to moment of creation:")
msg.add("\n" & indent(future.stackTrace.strip(), 4))
when T is string:
msg.add("\n Contents (string): ")
msg.add("\n" & indent(future.value.repr, 4))
msg.add("\n Stack trace to moment of secondary completion:")
msg.add("\n" & indent(getStackTrace().strip(), 4))
var err = newException(FutureError, msg)
err.cause = future
raise err
proc call(callbacks: var CallbackList) =
var current = callbacks
while true:
if not current.function.isNil:
callSoon(current.function)
if current.next.isNil:
break
else:
current = current.next[]
# callback will be called only once, let GC collect them now
callbacks.next = nil
callbacks.function = nil
proc add(callbacks: var CallbackList, function: CallbackFunc) =
if callbacks.function.isNil:
callbacks.function = function
assert callbacks.next == nil
else:
let newCallback = new(ref CallbackList)
newCallback.function = function
newCallback.next = nil
if callbacks.next == nil:
callbacks.next = newCallback
else:
var last = callbacks.next
while last.next != nil:
last = last.next
last.next = newCallback
proc complete*[T](future: Future[T], val: T) =
## Completes ``future`` with value ``val``.
#assert(not future.finished, "Future already finished, cannot finish twice.")
checkFinished(future)
assert(future.error == nil)
future.value = val
future.finished = true
future.callbacks.call()
proc complete*(future: Future[void]) =
## Completes a void ``future``.
#assert(not future.finished, "Future already finished, cannot finish twice.")
checkFinished(future)
assert(future.error == nil)
future.finished = true
future.callbacks.call()
proc complete*[T](future: FutureVar[T]) =
## Completes a ``FutureVar``.
template fut: untyped = Future[T](future)
checkFinished(fut)
assert(fut.error == nil)
fut.finished = true
fut.callbacks.call()
proc complete*[T](future: FutureVar[T], val: T) =
## Completes a ``FutureVar`` with value ``val``.
##
## Any previously stored value will be overwritten.
template fut: untyped = Future[T](future)
checkFinished(fut)
assert(fut.error.isNil())
fut.finished = true
fut.value = val
fut.callbacks.call()
proc fail*[T](future: Future[T], error: ref Exception) =
## Completes ``future`` with ``error``.
#assert(not future.finished, "Future already finished, cannot finish twice.")
checkFinished(future)
future.finished = true
future.error = error
future.errorStackTrace =
if getStackTrace(error) == "": getStackTrace() else: getStackTrace(error)
future.callbacks.call()
proc clearCallbacks*(future: FutureBase) =
future.callbacks.function = nil
future.callbacks.next = nil
proc addCallback*(future: FutureBase, cb: proc() {.closure,gcsafe.}) =
## Adds the callbacks proc to be called when the future completes.
##
## If future has already completed then ``cb`` will be called immediately.
assert cb != nil
if future.finished:
callSoon(cb)
else:
future.callbacks.add cb
proc addCallback*[T](future: Future[T],
cb: proc (future: Future[T]) {.closure,gcsafe.}) =
## Adds the callbacks proc to be called when the future completes.
##
## If future has already completed then ``cb`` will be called immediately.
future.addCallback(
proc() =
cb(future)
)
proc `callback=`*(future: FutureBase, cb: proc () {.closure,gcsafe.}) =
## Clears the list of callbacks and sets the callback proc to be called when the future completes.
##
## If future has already completed then ``cb`` will be called immediately.
##
## It's recommended to use ``addCallback`` or ``then`` instead.
future.clearCallbacks
future.addCallback cb
proc `callback=`*[T](future: Future[T],
cb: proc (future: Future[T]) {.closure,gcsafe.}) =
## Sets the callback proc to be called when the future completes.
##
## If future has already completed then ``cb`` will be called immediately.
future.callback = proc () = cb(future)
proc getHint(entry: StackTraceEntry): string =
## We try to provide some hints about stack trace entries that the user
## may not be familiar with, in particular calls inside the stdlib.
result = ""
if entry.procname == cstring"processPendingCallbacks":
if cmpIgnoreStyle(entry.filename, "asyncdispatch.nim") == 0:
return "Executes pending callbacks"
elif entry.procname == cstring"poll":
if cmpIgnoreStyle(entry.filename, "asyncdispatch.nim") == 0:
return "Processes asynchronous completion events"
if entry.procname.endsWith("_continue"):
if cmpIgnoreStyle(entry.filename, "asyncmacro.nim") == 0:
return "Resumes an async procedure"
proc `$`*(entries: seq[StackTraceEntry]): string =
result = ""
# Find longest filename & line number combo for alignment purposes.
var longestLeft = 0
for entry in entries:
if entry.procName.isNil: continue
let left = $entry.filename & $entry.line
if left.len > longestLeft:
longestLeft = left.len
var indent = 2
# Format the entries.
for entry in entries:
if entry.procName.isNil:
if entry.line == -10:
result.add(spaces(indent) & "#[\n")
indent.inc(2)
else:
indent.dec(2)
result.add(spaces(indent) & "]#\n")
continue
let left = "$#($#)" % [$entry.filename, $entry.line]
result.add((spaces(indent) & "$#$# $#\n") % [
left,
spaces(longestLeft - left.len + 2),
$entry.procName
])
let hint = getHint(entry)
if hint.len > 0:
result.add(spaces(indent+2) & "## " & hint & "\n")
proc injectStacktrace[T](future: Future[T]) =
when not defined(release):
const header = "\nAsync traceback:\n"
var exceptionMsg = future.error.msg
if header in exceptionMsg:
# This is messy: extract the original exception message from the msg
# containing the async traceback.
let start = exceptionMsg.find(header)
exceptionMsg = exceptionMsg[0..<start]
var newMsg = exceptionMsg & header
let entries = getStackTraceEntries(future.error)
newMsg.add($entries)
newMsg.add("Exception message: " & exceptionMsg & "\n")
newMsg.add("Exception type:")
# # For debugging purposes
# for entry in getStackTraceEntries(future.error):
# newMsg.add "\n" & $entry
future.error.msg = newMsg
proc read*[T](future: Future[T] | FutureVar[T]): T =
## Retrieves the value of ``future``. Future must be finished otherwise
## this function will fail with a ``ValueError`` exception.
##
## If the result of the future is an error then that error will be raised.
{.push hint[ConvFromXtoItselfNotNeeded]: off.}
let fut = Future[T](future)
{.pop.}
if fut.finished:
if fut.error != nil:
injectStacktrace(fut)
raise fut.error
when T isnot void:
return fut.value
else:
# TODO: Make a custom exception type for this?
raise newException(ValueError, "Future still in progress.")
proc readError*[T](future: Future[T]): ref Exception =
## Retrieves the exception stored in ``future``.
##
## An ``ValueError`` exception will be thrown if no exception exists
## in the specified Future.
if future.error != nil: return future.error
else:
raise newException(ValueError, "No error in future.")
proc mget*[T](future: FutureVar[T]): var T =
## Returns a mutable value stored in ``future``.
##
## Unlike ``read``, this function will not raise an exception if the
## Future has not been finished.
result = Future[T](future).value
proc finished*(future: FutureBase | FutureVar): bool =
## Determines whether ``future`` has completed.
##
## ``True`` may indicate an error or a value. Use ``failed`` to distinguish.
when future is FutureVar:
result = (FutureBase(future)).finished
else:
result = future.finished
proc failed*(future: FutureBase): bool =
## Determines whether ``future`` completed with an error.
return future.error != nil
proc asyncCheck*[T](future: Future[T]) =
## Sets a callback on ``future`` which raises an exception if the future
## finished with an error.
##
## This should be used instead of ``discard`` to discard void futures,
## or use ``waitFor`` if you need to wait for the future's completion.
assert(not future.isNil, "Future is nil")
future.callback =
proc () =
if future.failed:
injectStacktrace(future)
raise future.error
proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
## Returns a future which will complete once both ``fut1`` and ``fut2``
## complete.
var retFuture = newFuture[void]("asyncdispatch.`and`")
fut1.callback =
proc () =
if not retFuture.finished:
if fut1.failed: retFuture.fail(fut1.error)
elif fut2.finished: retFuture.complete()
fut2.callback =
proc () =
if not retFuture.finished:
if fut2.failed: retFuture.fail(fut2.error)
elif fut1.finished: retFuture.complete()
return retFuture
proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
## Returns a future which will complete once either ``fut1`` or ``fut2``
## complete.
var retFuture = newFuture[void]("asyncdispatch.`or`")
proc cb[X](fut: Future[X]) =
if not retFuture.finished:
if fut.failed: retFuture.fail(fut.error)
else: retFuture.complete()
fut1.callback = cb[T]
fut2.callback = cb[Y]
return retFuture
proc all*[T](futs: varargs[Future[T]]): auto =
## Returns a future which will complete once
## all futures in ``futs`` complete.
## If the argument is empty, the returned future completes immediately.
##
## If the awaited futures are not ``Future[void]``, the returned future
## will hold the values of all awaited futures in a sequence.
##
## If the awaited futures *are* ``Future[void]``,
## this proc returns ``Future[void]``.
when T is void:
var
retFuture = newFuture[void]("asyncdispatch.all")
completedFutures = 0
let totalFutures = len(futs)
for fut in futs:
fut.addCallback proc (f: Future[T]) =
inc(completedFutures)
if not retFuture.finished:
if f.failed:
retFuture.fail(f.error)
else:
if completedFutures == totalFutures:
retFuture.complete()
if totalFutures == 0:
retFuture.complete()
return retFuture
else:
var
retFuture = newFuture[seq[T]]("asyncdispatch.all")
retValues = newSeq[T](len(futs))
completedFutures = 0
for i, fut in futs:
proc setCallback(i: int) =
fut.addCallback proc (f: Future[T]) =
inc(completedFutures)
if not retFuture.finished:
if f.failed:
retFuture.fail(f.error)
else:
retValues[i] = f.read()
if completedFutures == len(retValues):
retFuture.complete(retValues)
setCallback(i)
if retValues.len == 0:
retFuture.complete(retValues)
return retFuture