summary refs log blame commit diff stats
path: root/lib/pure/asyncfutures.nim
blob: 5037c8a24019bdd4dcd94f1e14054ef30fb275ca (plain) (tree)
1
2
3
4
5
6
7
8
9
10
                                                                         


                                                                           





                                            
                                                        

                           















                                                        


                          
                                                                 







                                                                                                                                    
 
                                 




                                                                                             
                        



                                                                                              
 
                                            







                                       






                                                                               















                                                                               
                            
















                                                                          



                                        
                                  

                                
                          








                                                               
                              


                                 










                                           
 






                                                                               
                         






                                                                               
                         


                                         
                                           


                          
                      




                                                     
                                           
                    
                           

                     
                      








                                                                               
                         
 
                                          









                                                                            
       

                           

                                                                       


                                                                            



                     

                                                                       
                                                                                                    


                                                                            


                                                                 







                                                                            



                                                                         
                                                        
                                                                
                                         
                                       
                                                                

                                                       

                                                             






                                                                     

                                     



                                            
                

                       





                                           
                                           

              
                                                        
                                                 





                                         
                                                        
 
                                             
                            
                                         
 





                                                                          
 
 
                                      
 
                                                    
                        
 

                                                           
 

                                                      

                                





                                                                            
                                                
                             
         

























                                                                      
                                                      


                                                                              

                                          

                            








                                                                          

                                                                        
                                           


























                                                                          


                                              






                                              
                                                                         














                                                                         
                                           






                                                
 


                          









                                                        
                                             








                                                    


                    


                                   
                    
import os, tables, strutils, times, heapqueue, options, deques, cstrutils

# TODO: This shouldn't need to be included, but should ideally be exported.
type
  CallbackFunc = proc () {.closure, gcsafe.}

  CallbackList = object
    function: CallbackFunc
    next: ref CallbackList

  FutureBase* = ref object of RootObj ## Untyped future.
    callbacks: CallbackList

    finished: bool
    error*: ref Exception ## Stored exception
    errorStackTrace*: string
    when not defined(release):
      stackTrace: string ## For debugging purposes only.
      id: int
      fromProc: string

  Future*[T] = ref object of FutureBase ## Typed future.
    value: T ## Stored value

  FutureVar*[T] = distinct Future[T]

  FutureError* = object of Exception
    cause*: FutureBase

when not defined(release):
  var currentID = 0

var callSoonProc {.threadvar.}: proc (cbproc: proc ()) {.gcsafe.}

proc getCallSoonProc*(): (proc(cbproc: proc ()) {.gcsafe.}) =
  ## Get current implementation of ``callSoon``.
  return callSoonProc

proc setCallSoonProc*(p: (proc(cbproc: proc ()) {.gcsafe.})) =
  ## Change current implementation of ``callSoon``. This is normally called when dispatcher from ``asyncdispatcher`` is initialized.
  callSoonProc = p

proc callSoon*(cbproc: proc ()) =
  ## Call ``cbproc`` "soon".
  ##
  ## If async dispatcher is running, ``cbproc`` will be executed during next dispatcher tick.
  ##
  ## If async dispatcher is not running, ``cbproc`` will be executed immediately.
  if callSoonProc.isNil:
    # Loop not initialized yet. Call the function directly to allow setup code to use futures.
    cbproc()
  else:
    callSoonProc(cbproc)

template setupFutureBase(fromProc: string) =
  new(result)
  result.finished = false
  when not defined(release):
    result.stackTrace = getStackTrace()
    result.id = currentID
    result.fromProc = fromProc
    currentID.inc()

proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
  ## Creates a new future.
  ##
  ## Specifying ``fromProc``, which is a string specifying the name of the proc
  ## that this future belongs to, is a good habit as it helps with debugging.
  setupFutureBase(fromProc)

proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] =
  ## Create a new ``FutureVar``. This Future type is ideally suited for
  ## situations where you want to avoid unnecessary allocations of Futures.
  ##
  ## Specifying ``fromProc``, which is a string specifying the name of the proc
  ## that this future belongs to, is a good habit as it helps with debugging.
  result = FutureVar[T](newFuture[T](fromProc))

proc clean*[T](future: FutureVar[T]) =
  ## Resets the ``finished`` status of ``future``.
  Future[T](future).finished = false
  Future[T](future).error = nil

proc checkFinished[T](future: Future[T]) =
  ## Checks whether `future` is finished. If it is then raises a
  ## ``FutureError``.
  when not defined(release):
    if future.finished:
      var msg = ""
      msg.add("An attempt was made to complete a Future more than once. ")
      msg.add("Details:")
      msg.add("\n  Future ID: " & $future.id)
      msg.add("\n  Created in proc: " & future.fromProc)
      msg.add("\n  Stack trace to moment of creation:")
      msg.add("\n" & indent(future.stackTrace.strip(), 4))
      when T is string:
        msg.add("\n  Contents (string): ")
        msg.add("\n" & indent(future.value.repr, 4))
      msg.add("\n  Stack trace to moment of secondary completion:")
      msg.add("\n" & indent(getStackTrace().strip(), 4))
      var err = newException(FutureError, msg)
      err.cause = future
      raise err

proc call(callbacks: var CallbackList) =
  var current = callbacks

  while true:
    if not current.function.isNil:
      callSoon(current.function)

    if current.next.isNil:
      break
    else:
      current = current.next[]

  # callback will be called only once, let GC collect them now
  callbacks.next = nil
  callbacks.function = nil

proc add(callbacks: var CallbackList, function: CallbackFunc) =
  if callbacks.function.isNil:
    callbacks.function = function
    assert callbacks.next == nil
  else:
    let newCallback = new(ref CallbackList)
    newCallback.function = function
    newCallback.next = nil

    if callbacks.next == nil:
      callbacks.next = newCallback
    else:
      var last = callbacks.next
      while last.next != nil:
        last = last.next
      last.next = newCallback

proc complete*[T](future: Future[T], val: T) =
  ## Completes ``future`` with value ``val``.
  #assert(not future.finished, "Future already finished, cannot finish twice.")
  checkFinished(future)
  assert(future.error == nil)
  future.value = val
  future.finished = true
  future.callbacks.call()

proc complete*(future: Future[void]) =
  ## Completes a void ``future``.
  #assert(not future.finished, "Future already finished, cannot finish twice.")
  checkFinished(future)
  assert(future.error == nil)
  future.finished = true
  future.callbacks.call()

proc complete*[T](future: FutureVar[T]) =
  ## Completes a ``FutureVar``.
  template fut: untyped = Future[T](future)
  checkFinished(fut)
  assert(fut.error == nil)
  fut.finished = true
  fut.callbacks.call()

proc complete*[T](future: FutureVar[T], val: T) =
  ## Completes a ``FutureVar`` with value ``val``.
  ##
  ## Any previously stored value will be overwritten.
  template fut: untyped = Future[T](future)
  checkFinished(fut)
  assert(fut.error.isNil())
  fut.finished = true
  fut.value = val
  fut.callbacks.call()

proc fail*[T](future: Future[T], error: ref Exception) =
  ## Completes ``future`` with ``error``.
  #assert(not future.finished, "Future already finished, cannot finish twice.")
  checkFinished(future)
  future.finished = true
  future.error = error
  future.errorStackTrace =
    if getStackTrace(error) == "": getStackTrace() else: getStackTrace(error)
  future.callbacks.call()

proc clearCallbacks*(future: FutureBase) =
  future.callbacks.function = nil
  future.callbacks.next = nil

proc addCallback*(future: FutureBase, cb: proc() {.closure,gcsafe.}) =
  ## Adds the callbacks proc to be called when the future completes.
  ##
  ## If future has already completed then ``cb`` will be called immediately.
  assert cb != nil
  if future.finished:
    callSoon(cb)
  else:
    future.callbacks.add cb

proc addCallback*[T](future: Future[T],
                     cb: proc (future: Future[T]) {.closure,gcsafe.}) =
  ## Adds the callbacks proc to be called when the future completes.
  ##
  ## If future has already completed then ``cb`` will be called immediately.
  future.addCallback(
    proc() =
      cb(future)
  )

proc `callback=`*(future: FutureBase, cb: proc () {.closure,gcsafe.}) =
  ## Clears the list of callbacks and sets the callback proc to be called when the future completes.
  ##
  ## If future has already completed then ``cb`` will be called immediately.
  ##
  ## It's recommended to use ``addCallback`` or ``then`` instead.
  future.clearCallbacks
  future.addCallback cb

proc `callback=`*[T](future: Future[T],
    cb: proc (future: Future[T]) {.closure,gcsafe.}) =
  ## Sets the callback proc to be called when the future completes.
  ##
  ## If future has already completed then ``cb`` will be called immediately.
  future.callback = proc () = cb(future)

proc getHint(entry: StackTraceEntry): string =
  ## We try to provide some hints about stack trace entries that the user
  ## may not be familiar with, in particular calls inside the stdlib.
  result = ""
  if entry.procname == cstring"processPendingCallbacks":
    if cmpIgnoreStyle(entry.filename, "asyncdispatch.nim") == 0:
      return "Executes pending callbacks"
  elif entry.procname == cstring"poll":
    if cmpIgnoreStyle(entry.filename, "asyncdispatch.nim") == 0:
      return "Processes asynchronous completion events"

  if entry.procname.endsWith("_continue"):
    if cmpIgnoreStyle(entry.filename, "asyncmacro.nim") == 0:
      return "Resumes an async procedure"

proc `$`*(entries: seq[StackTraceEntry]): string =
  result = ""
  # Find longest filename & line number combo for alignment purposes.
  var longestLeft = 0
  for entry in entries:
    if entry.procName.isNil: continue

    let left = $entry.filename & $entry.line
    if left.len > longestLeft:
      longestLeft = left.len

  var indent = 2
  # Format the entries.
  for entry in entries:
    if entry.procName.isNil:
      if entry.line == -10:
        result.add(spaces(indent) & "#[\n")
        indent.inc(2)
      else:
        indent.dec(2)
        result.add(spaces(indent) & "]#\n")
      continue

    let left = "$#($#)" % [$entry.filename, $entry.line]
    result.add((spaces(indent) & "$#$# $#\n") % [
      left,
      spaces(longestLeft - left.len + 2),
      $entry.procName
    ])
    let hint = getHint(entry)
    if hint.len > 0:
      result.add(spaces(indent+2) & "## " & hint & "\n")

proc injectStacktrace[T](future: Future[T]) =
  when not defined(release):
    const header = "\nAsync traceback:\n"

    var exceptionMsg = future.error.msg
    if header in exceptionMsg:
      # This is messy: extract the original exception message from the msg
      # containing the async traceback.
      let start = exceptionMsg.find(header)
      exceptionMsg = exceptionMsg[0..<start]


    var newMsg = exceptionMsg & header

    let entries = getStackTraceEntries(future.error)
    newMsg.add($entries)

    newMsg.add("Exception message: " & exceptionMsg & "\n")
    newMsg.add("Exception type:")

    # # For debugging purposes
    # for entry in getStackTraceEntries(future.error):
    #   newMsg.add "\n" & $entry
    future.error.msg = newMsg

proc read*[T](future: Future[T] | FutureVar[T]): T =
  ## Retrieves the value of ``future``. Future must be finished otherwise
  ## this function will fail with a ``ValueError`` exception.
  ##
  ## If the result of the future is an error then that error will be raised.
  {.push hint[ConvFromXtoItselfNotNeeded]: off.}
  let fut = Future[T](future)
  {.pop.}
  if fut.finished:
    if fut.error != nil:
      injectStacktrace(fut)
      raise fut.error
    when T isnot void:
      return fut.value
  else:
    # TODO: Make a custom exception type for this?
    raise newException(ValueError, "Future still in progress.")

proc readError*[T](future: Future[T]): ref Exception =
  ## Retrieves the exception stored in ``future``.
  ##
  ## An ``ValueError`` exception will be thrown if no exception exists
  ## in the specified Future.
  if future.error != nil: return future.error
  else:
    raise newException(ValueError, "No error in future.")

proc mget*[T](future: FutureVar[T]): var T =
  ## Returns a mutable value stored in ``future``.
  ##
  ## Unlike ``read``, this function will not raise an exception if the
  ## Future has not been finished.
  result = Future[T](future).value

proc finished*(future: FutureBase | FutureVar): bool =
  ## Determines whether ``future`` has completed.
  ##
  ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish.
  when future is FutureVar:
    result = (FutureBase(future)).finished
  else:
    result = future.finished

proc failed*(future: FutureBase): bool =
  ## Determines whether ``future`` completed with an error.
  return future.error != nil

proc asyncCheck*[T](future: Future[T]) =
  ## Sets a callback on ``future`` which raises an exception if the future
  ## finished with an error.
  ##
  ## This should be used instead of ``discard`` to discard void futures,
  ## or use ``waitFor`` if you need to wait for the future's completion.
  assert(not future.isNil, "Future is nil")
  future.callback =
    proc () =
      if future.failed:
        injectStacktrace(future)
        raise future.error

proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
  ## Returns a future which will complete once both ``fut1`` and ``fut2``
  ## complete.
  var retFuture = newFuture[void]("asyncdispatch.`and`")
  fut1.callback =
    proc () =
      if not retFuture.finished:
        if fut1.failed: retFuture.fail(fut1.error)
        elif fut2.finished: retFuture.complete()
  fut2.callback =
    proc () =
      if not retFuture.finished:
        if fut2.failed: retFuture.fail(fut2.error)
        elif fut1.finished: retFuture.complete()
  return retFuture

proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
  ## Returns a future which will complete once either ``fut1`` or ``fut2``
  ## complete.
  var retFuture = newFuture[void]("asyncdispatch.`or`")
  proc cb[X](fut: Future[X]) =
    if not retFuture.finished:
      if fut.failed: retFuture.fail(fut.error)
      else: retFuture.complete()
  fut1.callback = cb[T]
  fut2.callback = cb[Y]
  return retFuture

proc all*[T](futs: varargs[Future[T]]): auto =
  ## Returns a future which will complete once
  ## all futures in ``futs`` complete.
  ## If the argument is empty, the returned future completes immediately.
  ##
  ## If the awaited futures are not ``Future[void]``, the returned future
  ## will hold the values of all awaited futures in a sequence.
  ##
  ## If the awaited futures *are* ``Future[void]``,
  ## this proc returns ``Future[void]``.

  when T is void:
    var
      retFuture = newFuture[void]("asyncdispatch.all")
      completedFutures = 0

    let totalFutures = len(futs)

    for fut in futs:
      fut.addCallback proc (f: Future[T]) =
        inc(completedFutures)
        if not retFuture.finished:
          if f.failed:
            retFuture.fail(f.error)
          else:
            if completedFutures == totalFutures:
              retFuture.complete()

    if totalFutures == 0:
      retFuture.complete()

    return retFuture

  else:
    var
      retFuture = newFuture[seq[T]]("asyncdispatch.all")
      retValues = newSeq[T](len(futs))
      completedFutures = 0

    for i, fut in futs:
      proc setCallback(i: int) =
        fut.addCallback proc (f: Future[T]) =
          inc(completedFutures)
          if not retFuture.finished:
            if f.failed:
              retFuture.fail(f.error)
            else:
              retValues[i] = f.read()

              if completedFutures == len(retValues):
                retFuture.complete(retValues)

      setCallback(i)

    if retValues.len == 0:
      retFuture.complete(retValues)

    return retFuture