summary refs log tree commit diff stats
path: root/lib/pure/asyncdispatch.nim
diff options
context:
space:
mode:
authorDominik Picheta <dominikpicheta@googlemail.com>2014-08-09 17:13:22 +0100
committerDominik Picheta <dominikpicheta@googlemail.com>2014-08-09 17:13:22 +0100
commit4f5f98f0b1693a221bd3a2087bddf7e6ac350387 (patch)
tree93ff56f70b8995e249b0f38e9467f524de73fe77 /lib/pure/asyncdispatch.nim
parentfd086abb43d606188920c841bcd4abbed770e7d6 (diff)
downloadNim-4f5f98f0b1693a221bd3a2087bddf7e6ac350387.tar.gz
Fixes incorrect async exception handling. Adds sleepAsync.
The tasyncexceptions test has been added which tests for this incorrect
exception handling behaviour. The problem was that the exception was
raised inside a callback which was called from a previously finished async
procedure. This caused a "Future already finished" error. The fix was to
simply reraise the exception if the retFutureSym is already finished.

sleepAsync was added to help with the reproduction of this test. It should
also be useful for users however.

Finally some debug information was added to futures to help with future
bugs.
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r--lib/pure/asyncdispatch.nim147
1 files changed, 99 insertions, 48 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim
index d410f8ce1..6339232f8 100644
--- a/lib/pure/asyncdispatch.nim
+++ b/lib/pure/asyncdispatch.nim
@@ -9,7 +9,7 @@
 
 include "system/inclrtl"
 
-import os, oids, tables, strutils, macros
+import os, oids, tables, strutils, macros, times
 
 import rawsockets, net
 
@@ -41,27 +41,40 @@ type
     cb: proc () {.closure,gcsafe.}
     finished: bool
     error*: ref EBase
-    stackTrace: string ## For debugging purposes only.
+    when defined(debug):
+      stackTrace: string ## For debugging purposes only.
+      id: int
+      fromProc: string
 
   PFuture*[T] = ref object of PFutureBase
     value: T
 
-proc newFuture*[T](): PFuture[T] =
+var currentID* = 0
+proc newFuture*[T](fromProc: string = "unspecified"): PFuture[T] =
   ## Creates a new future.
+  ##
+  ## Specifying ``fromProc``, which is a string specifying the name of the proc
+  ## that this future belongs to, is a good habit as it helps with debugging.
   new(result)
   result.finished = false
-  result.stackTrace = getStackTrace()
+  when defined(debug):
+    result.stackTrace = getStackTrace()
+    result.id = currentID
+    result.fromProc = fromProc
+    currentID.inc()
 
 proc checkFinished[T](future: PFuture[T]) =
-  if future.finished:
-    echo("<----->")
-    echo(future.stackTrace)
-    echo("-----")
-    when T is string:
-      echo("Contents: ", future.value.repr)
-    echo("<----->")
-    echo("Future already finished, cannot finish twice.")
-    assert false
+  when defined(debug):
+    if future.finished:
+      echo("<-----> ", future.id, " ", future.fromProc)
+      echo(future.stackTrace)
+      echo("-----")
+      when T is string:
+        echo("Contents: ", future.value.repr)
+      echo("<----->")
+      echo("Future already finished, cannot finish twice.")
+      echo getStackTrace()
+      assert false
 
 proc complete*[T](future: PFuture[T], val: T) =
   ## Completes ``future`` with value ``val``.
@@ -121,7 +134,8 @@ proc read*[T](future: PFuture[T]): T =
   ##
   ## If the result of the future is an error then that error will be raised.
   if future.finished:
-    if future.error != nil: raise future.error
+    if future.error != nil:
+      raise future.error
     when T isnot void:
       return future.value
   else:
@@ -150,7 +164,21 @@ proc asyncCheck*[T](future: PFuture[T]) =
   ## This should be used instead of ``discard`` to discard void futures.
   future.callback =
     proc () =
-      if future.failed: raise future.error
+      if future.failed:
+        raise future.error
+
+type
+  PDispatcherBase = ref object of PObject
+    timers: seq[tuple[finishAt: float, fut: PFuture[void]]]
+
+proc processTimers(p: PDispatcherBase) =
+  var oldTimers = p.timers
+  p.timers = @[]
+  for t in oldTimers:
+    if epochTime() >= t.finishAt:
+      t.fut.complete()
+    else:
+      p.timers.add(t)
 
 when defined(windows) or defined(nimdoc):
   import winlean, sets, hashes
@@ -162,7 +190,7 @@ when defined(windows) or defined(nimdoc):
       cb: proc (sock: TAsyncFD, bytesTransferred: DWORD,
                 errcode: TOSErrorCode) {.closure,gcsafe.}
 
-    PDispatcher* = ref object
+    PDispatcher* = ref object of PDispatcherBase
       ioPort: THandle
       handles: TSet[TAsyncFD]
 
@@ -181,6 +209,7 @@ when defined(windows) or defined(nimdoc):
     new result
     result.ioPort = CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
     result.handles = initSet[TAsyncFD]()
+    result.timers = @[]
 
   var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
   proc getGlobalDispatcher*(): PDispatcher =
@@ -207,8 +236,9 @@ when defined(windows) or defined(nimdoc):
   proc poll*(timeout = 500) =
     ## Waits for completion events and processes them.
     let p = getGlobalDispatcher()
-    if p.handles.len == 0:
-      raise newException(EInvalidValue, "No handles registered in dispatcher.")
+    if p.handles.len == 0 and p.timers.len == 0:
+      raise newException(EInvalidValue,
+        "No handles or timers registered in dispatcher.")
     
     let llTimeout =
       if timeout ==  -1: winlean.INFINITE
@@ -242,6 +272,9 @@ when defined(windows) or defined(nimdoc):
           discard
         else: osError(errCode)
 
+    # Timer processing.
+    processTimers(p)
+
   var connectExPtr: pointer = nil
   var acceptExPtr: pointer = nil
   var getAcceptExSockAddrsPtr: pointer = nil
@@ -314,7 +347,7 @@ when defined(windows) or defined(nimdoc):
     ## Returns a ``PFuture`` which will complete when the connection succeeds
     ## or an error occurs.
     verifyPresence(socket)
-    var retFuture = newFuture[void]()
+    var retFuture = newFuture[void]("connect")
     # Apparently ``ConnectEx`` expects the socket to be initially bound:
     var saddr: Tsockaddr_in
     saddr.sin_family = int16(toInt(af))
@@ -384,7 +417,7 @@ when defined(windows) or defined(nimdoc):
     #     '\0' in the message currently signifies a socket disconnect. Who
     #     knows what will happen when someone sends that to our socket.
     verifyPresence(socket)
-    var retFuture = newFuture[string]()    
+    var retFuture = newFuture[string]("recv")
     var dataBuf: TWSABuf
     dataBuf.buf = cast[cstring](alloc0(size))
     dataBuf.len = size
@@ -459,7 +492,7 @@ when defined(windows) or defined(nimdoc):
     ## Sends ``data`` to ``socket``. The returned future will complete once all
     ## data has been sent.
     verifyPresence(socket)
-    var retFuture = newFuture[void]()
+    var retFuture = newFuture[void]("send")
 
     var dataBuf: TWSABuf
     dataBuf.buf = data # since this is not used in a callback, this is fine
@@ -502,7 +535,7 @@ when defined(windows) or defined(nimdoc):
     ##
     ## The resulting client socket is automatically registered to dispatcher.
     verifyPresence(socket)
-    var retFuture = newFuture[tuple[address: string, client: TAsyncFD]]()
+    var retFuture = newFuture[tuple[address: string, client: TAsyncFD]]("acceptAddr")
 
     var clientSock = newRawSocket()
     if clientSock == osInvalidSocket: osError(osLastError())
@@ -614,6 +647,7 @@ else:
   proc newDispatcher*(): PDispatcher =
     new result
     result.selector = newSelector()
+    result.timers = @[]
 
   var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
   proc getGlobalDispatcher*(): PDispatcher =
@@ -693,6 +727,8 @@ else:
       else:
         # FD no longer a part of the selector. Likely been closed
         # (e.g. socket disconnected).
+
+    processTimers(p)
   
   proc connect*(socket: TAsyncFD, address: string, port: TPort,
     af = AF_INET): PFuture[void] =
@@ -814,11 +850,19 @@ else:
     addRead(socket, cb)
     return retFuture
 
+proc sleepAsync*(ms: int): PFuture[void] =
+  ## Suspends the execution of the current async procedure for the next
+  ## ``ms`` miliseconds.
+  var retFuture = newFuture[void]("sleepAsync")
+  let p = getGlobalDispatcher()
+  p.timers.add((epochTime() + (ms / 1000), retFuture))
+  return retFuture
+
 proc accept*(socket: TAsyncFD): PFuture[TAsyncFD] =
   ## Accepts a new connection. Returns a future containing the client socket
   ## corresponding to that connection.
   ## The future will complete when the connection is successfully accepted.
-  var retFut = newFuture[TAsyncFD]()
+  var retFut = newFuture[TAsyncFD]("accept")
   var fut = acceptAddr(socket)
   fut.callback =
     proc (future: PFuture[tuple[address: string, client: TAsyncFD]]) =
@@ -845,11 +889,16 @@ template createCb*(retFutureSym, iteratorNameSym,
         else:
           next.callback = cb
     except:
-      retFutureSym.fail(getCurrentException())
+      if retFutureSym.finished:
+        # Take a look at tasyncexceptions for the bug which this fixes.
+        # That test explains it better than I can here.
+        raise
+      else:
+        retFutureSym.fail(getCurrentException())
   cb()
   #{.pop.}
 proc generateExceptionCheck(futSym,
-    exceptBranch, rootReceiver: PNimrodNode): PNimrodNode {.compileTime.} =
+    exceptBranch, rootReceiver, fromNode: PNimrodNode): PNimrodNode {.compileTime.} =
   if exceptBranch == nil:
     result = rootReceiver
   else:
@@ -869,20 +918,21 @@ proc generateExceptionCheck(futSym,
            )
          )
       )
-    let elseNode = newNimNode(nnkElse)
-    elseNode.add newNimNode(nnkStmtList)
+    let elseNode = newNimNode(nnkElse, fromNode)
+    elseNode.add newNimNode(nnkStmtList, fromNode)
     elseNode[0].add rootReceiver
     result.add elseNode
 
 template createVar(result: var PNimrodNode, futSymName: string,
                    asyncProc: PNimrodNode,
-                   valueReceiver, rootReceiver: expr) =
-  result = newNimNode(nnkStmtList)
+                   valueReceiver, rootReceiver: expr,
+                   fromNode: PNimrodNode) =
+  result = newNimNode(nnkStmtList, fromNode)
   var futSym = genSym(nskVar, "future")
   result.add newVarStmt(futSym, asyncProc) # -> var future<x> = y
-  result.add newNimNode(nnkYieldStmt).add(futSym) # -> yield future<x>
+  result.add newNimNode(nnkYieldStmt, fromNode).add(futSym) # -> yield future<x>
   valueReceiver = newDotExpr(futSym, newIdentNode("read")) # -> future<x>.read
-  result.add generateExceptionCheck(futSym, exceptBranch, rootReceiver)
+  result.add generateExceptionCheck(futSym, exceptBranch, rootReceiver, fromNode)
 
 proc processBody(node, retFutureSym: PNimrodNode,
                  subTypeIsVoid: bool,
@@ -891,7 +941,7 @@ proc processBody(node, retFutureSym: PNimrodNode,
   result = node
   case node.kind
   of nnkReturnStmt:
-    result = newNimNode(nnkStmtList)
+    result = newNimNode(nnkStmtList, node)
     if node[0].kind == nnkEmpty:
       if not subtypeIsVoid:
         result.add newCall(newIdentNode("complete"), retFutureSym,
@@ -902,19 +952,19 @@ proc processBody(node, retFutureSym: PNimrodNode,
       result.add newCall(newIdentNode("complete"), retFutureSym,
         node[0].processBody(retFutureSym, subtypeIsVoid, exceptBranch))
 
-    result.add newNimNode(nnkReturnStmt).add(newNilLit())
+    result.add newNimNode(nnkReturnStmt, node).add(newNilLit())
     return # Don't process the children of this return stmt
   of nnkCommand:
     if node[0].kind == nnkIdent and node[0].ident == !"await":
       case node[1].kind
       of nnkIdent:
         # await x
-        result = newNimNode(nnkYieldStmt).add(node[1]) # -> yield x
+        result = newNimNode(nnkYieldStmt, node).add(node[1]) # -> yield x
       of nnkCall:
         # await foo(p, x)
         var futureValue: PNimrodNode
         result.createVar("future" & $node[1][0].toStrLit, node[1], futureValue,
-                  futureValue)
+                  futureValue, node)
       else:
         error("Invalid node kind in 'await', got: " & $node[1].kind)
     elif node[1].kind == nnkCommand and node[1][0].kind == nnkIdent and
@@ -922,7 +972,7 @@ proc processBody(node, retFutureSym: PNimrodNode,
       # foo await x
       var newCommand = node
       result.createVar("future" & $node[0].toStrLit, node[1][1], newCommand[1],
-                newCommand)
+                newCommand, node)
 
   of nnkVarSection, nnkLetSection:
     case node[0][2].kind
@@ -931,7 +981,7 @@ proc processBody(node, retFutureSym: PNimrodNode,
         # var x = await y
         var newVarSection = node # TODO: Should this use copyNimNode?
         result.createVar("future" & $node[0][0].ident, node[0][2][1],
-          newVarSection[0][2], newVarSection)
+          newVarSection[0][2], newVarSection, node)
     else: discard
   of nnkAsgn:
     case node[1].kind
@@ -939,7 +989,7 @@ proc processBody(node, retFutureSym: PNimrodNode,
       if node[1][0].ident == !"await":
         # x = await y
         var newAsgn = node
-        result.createVar("future" & $node[0].toStrLit, node[1][1], newAsgn[1], newAsgn)
+        result.createVar("future" & $node[0].toStrLit, node[1][1], newAsgn[1], newAsgn, node)
     else: discard
   of nnkDiscardStmt:
     # discard await x
@@ -947,10 +997,10 @@ proc processBody(node, retFutureSym: PNimrodNode,
           node[0][0].ident == !"await":
       var newDiscard = node
       result.createVar("futureDiscard_" & $toStrLit(node[0][1]), node[0][1],
-                newDiscard[0], newDiscard)
+                newDiscard[0], newDiscard, node)
   of nnkTryStmt:
     # try: await x; except: ...
-    result = newNimNode(nnkStmtList)
+    result = newNimNode(nnkStmtList, node)
     proc processForTry(n: PNimrodNode, i: var int,
                        res: PNimrodNode): bool {.compileTime.} =
       result = false
@@ -1009,7 +1059,7 @@ macro async*(prc: stmt): stmt {.immediate.} =
         (returnType.kind == nnkBracketExpr and
          returnType[1].kind == nnkIdent and returnType[1].ident == !"void")
 
-  var outerProcBody = newNimNode(nnkStmtList)
+  var outerProcBody = newNimNode(nnkStmtList, prc[6])
 
   # -> var retFuture = newFuture[T]()
   var retFutureSym = genSym(nskVar, "retFuture")
@@ -1019,9 +1069,10 @@ macro async*(prc: stmt): stmt {.immediate.} =
   outerProcBody.add(
     newVarStmt(retFutureSym, 
       newCall(
-        newNimNode(nnkBracketExpr).add(
+        newNimNode(nnkBracketExpr, prc[6]).add(
           newIdentNode(!"newFuture"), # TODO: Strange bug here? Remove the `!`.
-          subRetType)))) # Get type from return type of this proc
+          subRetType),
+      newLit(prc[0].getName)))) # Get type from return type of this proc
   
   # -> iterator nameIter(): PFutureBase {.closure.} = 
   # ->   var result: T
@@ -1030,7 +1081,7 @@ macro async*(prc: stmt): stmt {.immediate.} =
   var iteratorNameSym = genSym(nskIterator, $prc[0].getName & "Iter")
   var procBody = prc[6].processBody(retFutureSym, subtypeIsVoid, nil)
   if not subtypeIsVoid:
-    procBody.insert(0, newNimNode(nnkVarSection).add(
+    procBody.insert(0, newNimNode(nnkVarSection, prc[6]).add(
       newIdentDefs(newIdentNode("result"), returnType[1]))) # -> var result: T
     procBody.add(
       newCall(newIdentNode("complete"),
@@ -1041,7 +1092,7 @@ macro async*(prc: stmt): stmt {.immediate.} =
   
   var closureIterator = newProc(iteratorNameSym, [newIdentNode("PFutureBase")],
                                 procBody, nnkIteratorDef)
-  closureIterator[4] = newNimNode(nnkPragma).add(newIdentNode("closure"))
+  closureIterator[4] = newNimNode(nnkPragma, prc[6]).add(newIdentNode("closure"))
   outerProcBody.add(closureIterator)
 
   # -> createCb(retFuture)
@@ -1051,7 +1102,7 @@ macro async*(prc: stmt): stmt {.immediate.} =
   outerProcBody.add procCb
 
   # -> return retFuture
-  outerProcBody.add newNimNode(nnkReturnStmt).add(retFutureSym)
+  outerProcBody.add newNimNode(nnkReturnStmt, prc[6][prc[6].len-1]).add(retFutureSym)
   
   result = prc
 
@@ -1068,8 +1119,8 @@ macro async*(prc: stmt): stmt {.immediate.} =
   result[6] = outerProcBody
 
   #echo(treeRepr(result))
-  #if prc[0].getName == "routeReq":
-  #echo(toStrLit(result))
+  #if prc[0].getName == "processClient":
+  #  echo(toStrLit(result))
 
 proc recvLine*(socket: TAsyncFD): PFuture[string] {.async.} =
   ## Reads a line of data from ``socket``. Returned future will complete once