diff options
author | Dominik Picheta <dominikpicheta@googlemail.com> | 2014-08-09 17:13:22 +0100 |
---|---|---|
committer | Dominik Picheta <dominikpicheta@googlemail.com> | 2014-08-09 17:13:22 +0100 |
commit | 4f5f98f0b1693a221bd3a2087bddf7e6ac350387 (patch) | |
tree | 93ff56f70b8995e249b0f38e9467f524de73fe77 | |
parent | fd086abb43d606188920c841bcd4abbed770e7d6 (diff) | |
download | Nim-4f5f98f0b1693a221bd3a2087bddf7e6ac350387.tar.gz |
Fixes incorrect async exception handling. Adds sleepAsync.
The tasyncexceptions test has been added which tests for this incorrect exception handling behaviour. The problem was that the exception was raised inside a callback which was called from a previously finished async procedure. This caused a "Future already finished" error. The fix was to simply reraise the exception if the retFutureSym is already finished. sleepAsync was added to help with the reproduction of this test. It should also be useful for users however. Finally some debug information was added to futures to help with future bugs.
-rw-r--r-- | lib/pure/asyncdispatch.nim | 147 | ||||
-rw-r--r-- | lib/pure/asynchttpserver.nim | 2 | ||||
-rw-r--r-- | lib/pure/asyncnet.nim | 4 | ||||
-rw-r--r-- | tests/async/tasyncexceptions.nim | 38 |
4 files changed, 141 insertions, 50 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index d410f8ce1..6339232f8 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -9,7 +9,7 @@ include "system/inclrtl" -import os, oids, tables, strutils, macros +import os, oids, tables, strutils, macros, times import rawsockets, net @@ -41,27 +41,40 @@ type cb: proc () {.closure,gcsafe.} finished: bool error*: ref EBase - stackTrace: string ## For debugging purposes only. + when defined(debug): + stackTrace: string ## For debugging purposes only. + id: int + fromProc: string PFuture*[T] = ref object of PFutureBase value: T -proc newFuture*[T](): PFuture[T] = +var currentID* = 0 +proc newFuture*[T](fromProc: string = "unspecified"): PFuture[T] = ## Creates a new future. + ## + ## Specifying ``fromProc``, which is a string specifying the name of the proc + ## that this future belongs to, is a good habit as it helps with debugging. new(result) result.finished = false - result.stackTrace = getStackTrace() + when defined(debug): + result.stackTrace = getStackTrace() + result.id = currentID + result.fromProc = fromProc + currentID.inc() proc checkFinished[T](future: PFuture[T]) = - if future.finished: - echo("<----->") - echo(future.stackTrace) - echo("-----") - when T is string: - echo("Contents: ", future.value.repr) - echo("<----->") - echo("Future already finished, cannot finish twice.") - assert false + when defined(debug): + if future.finished: + echo("<-----> ", future.id, " ", future.fromProc) + echo(future.stackTrace) + echo("-----") + when T is string: + echo("Contents: ", future.value.repr) + echo("<----->") + echo("Future already finished, cannot finish twice.") + echo getStackTrace() + assert false proc complete*[T](future: PFuture[T], val: T) = ## Completes ``future`` with value ``val``. @@ -121,7 +134,8 @@ proc read*[T](future: PFuture[T]): T = ## ## If the result of the future is an error then that error will be raised. if future.finished: - if future.error != nil: raise future.error + if future.error != nil: + raise future.error when T isnot void: return future.value else: @@ -150,7 +164,21 @@ proc asyncCheck*[T](future: PFuture[T]) = ## This should be used instead of ``discard`` to discard void futures. future.callback = proc () = - if future.failed: raise future.error + if future.failed: + raise future.error + +type + PDispatcherBase = ref object of PObject + timers: seq[tuple[finishAt: float, fut: PFuture[void]]] + +proc processTimers(p: PDispatcherBase) = + var oldTimers = p.timers + p.timers = @[] + for t in oldTimers: + if epochTime() >= t.finishAt: + t.fut.complete() + else: + p.timers.add(t) when defined(windows) or defined(nimdoc): import winlean, sets, hashes @@ -162,7 +190,7 @@ when defined(windows) or defined(nimdoc): cb: proc (sock: TAsyncFD, bytesTransferred: DWORD, errcode: TOSErrorCode) {.closure,gcsafe.} - PDispatcher* = ref object + PDispatcher* = ref object of PDispatcherBase ioPort: THandle handles: TSet[TAsyncFD] @@ -181,6 +209,7 @@ when defined(windows) or defined(nimdoc): new result result.ioPort = CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) result.handles = initSet[TAsyncFD]() + result.timers = @[] var gDisp{.threadvar.}: PDispatcher ## Global dispatcher proc getGlobalDispatcher*(): PDispatcher = @@ -207,8 +236,9 @@ when defined(windows) or defined(nimdoc): proc poll*(timeout = 500) = ## Waits for completion events and processes them. let p = getGlobalDispatcher() - if p.handles.len == 0: - raise newException(EInvalidValue, "No handles registered in dispatcher.") + if p.handles.len == 0 and p.timers.len == 0: + raise newException(EInvalidValue, + "No handles or timers registered in dispatcher.") let llTimeout = if timeout == -1: winlean.INFINITE @@ -242,6 +272,9 @@ when defined(windows) or defined(nimdoc): discard else: osError(errCode) + # Timer processing. + processTimers(p) + var connectExPtr: pointer = nil var acceptExPtr: pointer = nil var getAcceptExSockAddrsPtr: pointer = nil @@ -314,7 +347,7 @@ when defined(windows) or defined(nimdoc): ## Returns a ``PFuture`` which will complete when the connection succeeds ## or an error occurs. verifyPresence(socket) - var retFuture = newFuture[void]() + var retFuture = newFuture[void]("connect") # Apparently ``ConnectEx`` expects the socket to be initially bound: var saddr: Tsockaddr_in saddr.sin_family = int16(toInt(af)) @@ -384,7 +417,7 @@ when defined(windows) or defined(nimdoc): # '\0' in the message currently signifies a socket disconnect. Who # knows what will happen when someone sends that to our socket. verifyPresence(socket) - var retFuture = newFuture[string]() + var retFuture = newFuture[string]("recv") var dataBuf: TWSABuf dataBuf.buf = cast[cstring](alloc0(size)) dataBuf.len = size @@ -459,7 +492,7 @@ when defined(windows) or defined(nimdoc): ## Sends ``data`` to ``socket``. The returned future will complete once all ## data has been sent. verifyPresence(socket) - var retFuture = newFuture[void]() + var retFuture = newFuture[void]("send") var dataBuf: TWSABuf dataBuf.buf = data # since this is not used in a callback, this is fine @@ -502,7 +535,7 @@ when defined(windows) or defined(nimdoc): ## ## The resulting client socket is automatically registered to dispatcher. verifyPresence(socket) - var retFuture = newFuture[tuple[address: string, client: TAsyncFD]]() + var retFuture = newFuture[tuple[address: string, client: TAsyncFD]]("acceptAddr") var clientSock = newRawSocket() if clientSock == osInvalidSocket: osError(osLastError()) @@ -614,6 +647,7 @@ else: proc newDispatcher*(): PDispatcher = new result result.selector = newSelector() + result.timers = @[] var gDisp{.threadvar.}: PDispatcher ## Global dispatcher proc getGlobalDispatcher*(): PDispatcher = @@ -693,6 +727,8 @@ else: else: # FD no longer a part of the selector. Likely been closed # (e.g. socket disconnected). + + processTimers(p) proc connect*(socket: TAsyncFD, address: string, port: TPort, af = AF_INET): PFuture[void] = @@ -814,11 +850,19 @@ else: addRead(socket, cb) return retFuture +proc sleepAsync*(ms: int): PFuture[void] = + ## Suspends the execution of the current async procedure for the next + ## ``ms`` miliseconds. + var retFuture = newFuture[void]("sleepAsync") + let p = getGlobalDispatcher() + p.timers.add((epochTime() + (ms / 1000), retFuture)) + return retFuture + proc accept*(socket: TAsyncFD): PFuture[TAsyncFD] = ## Accepts a new connection. Returns a future containing the client socket ## corresponding to that connection. ## The future will complete when the connection is successfully accepted. - var retFut = newFuture[TAsyncFD]() + var retFut = newFuture[TAsyncFD]("accept") var fut = acceptAddr(socket) fut.callback = proc (future: PFuture[tuple[address: string, client: TAsyncFD]]) = @@ -845,11 +889,16 @@ template createCb*(retFutureSym, iteratorNameSym, else: next.callback = cb except: - retFutureSym.fail(getCurrentException()) + if retFutureSym.finished: + # Take a look at tasyncexceptions for the bug which this fixes. + # That test explains it better than I can here. + raise + else: + retFutureSym.fail(getCurrentException()) cb() #{.pop.} proc generateExceptionCheck(futSym, - exceptBranch, rootReceiver: PNimrodNode): PNimrodNode {.compileTime.} = + exceptBranch, rootReceiver, fromNode: PNimrodNode): PNimrodNode {.compileTime.} = if exceptBranch == nil: result = rootReceiver else: @@ -869,20 +918,21 @@ proc generateExceptionCheck(futSym, ) ) ) - let elseNode = newNimNode(nnkElse) - elseNode.add newNimNode(nnkStmtList) + let elseNode = newNimNode(nnkElse, fromNode) + elseNode.add newNimNode(nnkStmtList, fromNode) elseNode[0].add rootReceiver result.add elseNode template createVar(result: var PNimrodNode, futSymName: string, asyncProc: PNimrodNode, - valueReceiver, rootReceiver: expr) = - result = newNimNode(nnkStmtList) + valueReceiver, rootReceiver: expr, + fromNode: PNimrodNode) = + result = newNimNode(nnkStmtList, fromNode) var futSym = genSym(nskVar, "future") result.add newVarStmt(futSym, asyncProc) # -> var future<x> = y - result.add newNimNode(nnkYieldStmt).add(futSym) # -> yield future<x> + result.add newNimNode(nnkYieldStmt, fromNode).add(futSym) # -> yield future<x> valueReceiver = newDotExpr(futSym, newIdentNode("read")) # -> future<x>.read - result.add generateExceptionCheck(futSym, exceptBranch, rootReceiver) + result.add generateExceptionCheck(futSym, exceptBranch, rootReceiver, fromNode) proc processBody(node, retFutureSym: PNimrodNode, subTypeIsVoid: bool, @@ -891,7 +941,7 @@ proc processBody(node, retFutureSym: PNimrodNode, result = node case node.kind of nnkReturnStmt: - result = newNimNode(nnkStmtList) + result = newNimNode(nnkStmtList, node) if node[0].kind == nnkEmpty: if not subtypeIsVoid: result.add newCall(newIdentNode("complete"), retFutureSym, @@ -902,19 +952,19 @@ proc processBody(node, retFutureSym: PNimrodNode, result.add newCall(newIdentNode("complete"), retFutureSym, node[0].processBody(retFutureSym, subtypeIsVoid, exceptBranch)) - result.add newNimNode(nnkReturnStmt).add(newNilLit()) + result.add newNimNode(nnkReturnStmt, node).add(newNilLit()) return # Don't process the children of this return stmt of nnkCommand: if node[0].kind == nnkIdent and node[0].ident == !"await": case node[1].kind of nnkIdent: # await x - result = newNimNode(nnkYieldStmt).add(node[1]) # -> yield x + result = newNimNode(nnkYieldStmt, node).add(node[1]) # -> yield x of nnkCall: # await foo(p, x) var futureValue: PNimrodNode result.createVar("future" & $node[1][0].toStrLit, node[1], futureValue, - futureValue) + futureValue, node) else: error("Invalid node kind in 'await', got: " & $node[1].kind) elif node[1].kind == nnkCommand and node[1][0].kind == nnkIdent and @@ -922,7 +972,7 @@ proc processBody(node, retFutureSym: PNimrodNode, # foo await x var newCommand = node result.createVar("future" & $node[0].toStrLit, node[1][1], newCommand[1], - newCommand) + newCommand, node) of nnkVarSection, nnkLetSection: case node[0][2].kind @@ -931,7 +981,7 @@ proc processBody(node, retFutureSym: PNimrodNode, # var x = await y var newVarSection = node # TODO: Should this use copyNimNode? result.createVar("future" & $node[0][0].ident, node[0][2][1], - newVarSection[0][2], newVarSection) + newVarSection[0][2], newVarSection, node) else: discard of nnkAsgn: case node[1].kind @@ -939,7 +989,7 @@ proc processBody(node, retFutureSym: PNimrodNode, if node[1][0].ident == !"await": # x = await y var newAsgn = node - result.createVar("future" & $node[0].toStrLit, node[1][1], newAsgn[1], newAsgn) + result.createVar("future" & $node[0].toStrLit, node[1][1], newAsgn[1], newAsgn, node) else: discard of nnkDiscardStmt: # discard await x @@ -947,10 +997,10 @@ proc processBody(node, retFutureSym: PNimrodNode, node[0][0].ident == !"await": var newDiscard = node result.createVar("futureDiscard_" & $toStrLit(node[0][1]), node[0][1], - newDiscard[0], newDiscard) + newDiscard[0], newDiscard, node) of nnkTryStmt: # try: await x; except: ... - result = newNimNode(nnkStmtList) + result = newNimNode(nnkStmtList, node) proc processForTry(n: PNimrodNode, i: var int, res: PNimrodNode): bool {.compileTime.} = result = false @@ -1009,7 +1059,7 @@ macro async*(prc: stmt): stmt {.immediate.} = (returnType.kind == nnkBracketExpr and returnType[1].kind == nnkIdent and returnType[1].ident == !"void") - var outerProcBody = newNimNode(nnkStmtList) + var outerProcBody = newNimNode(nnkStmtList, prc[6]) # -> var retFuture = newFuture[T]() var retFutureSym = genSym(nskVar, "retFuture") @@ -1019,9 +1069,10 @@ macro async*(prc: stmt): stmt {.immediate.} = outerProcBody.add( newVarStmt(retFutureSym, newCall( - newNimNode(nnkBracketExpr).add( + newNimNode(nnkBracketExpr, prc[6]).add( newIdentNode(!"newFuture"), # TODO: Strange bug here? Remove the `!`. - subRetType)))) # Get type from return type of this proc + subRetType), + newLit(prc[0].getName)))) # Get type from return type of this proc # -> iterator nameIter(): PFutureBase {.closure.} = # -> var result: T @@ -1030,7 +1081,7 @@ macro async*(prc: stmt): stmt {.immediate.} = var iteratorNameSym = genSym(nskIterator, $prc[0].getName & "Iter") var procBody = prc[6].processBody(retFutureSym, subtypeIsVoid, nil) if not subtypeIsVoid: - procBody.insert(0, newNimNode(nnkVarSection).add( + procBody.insert(0, newNimNode(nnkVarSection, prc[6]).add( newIdentDefs(newIdentNode("result"), returnType[1]))) # -> var result: T procBody.add( newCall(newIdentNode("complete"), @@ -1041,7 +1092,7 @@ macro async*(prc: stmt): stmt {.immediate.} = var closureIterator = newProc(iteratorNameSym, [newIdentNode("PFutureBase")], procBody, nnkIteratorDef) - closureIterator[4] = newNimNode(nnkPragma).add(newIdentNode("closure")) + closureIterator[4] = newNimNode(nnkPragma, prc[6]).add(newIdentNode("closure")) outerProcBody.add(closureIterator) # -> createCb(retFuture) @@ -1051,7 +1102,7 @@ macro async*(prc: stmt): stmt {.immediate.} = outerProcBody.add procCb # -> return retFuture - outerProcBody.add newNimNode(nnkReturnStmt).add(retFutureSym) + outerProcBody.add newNimNode(nnkReturnStmt, prc[6][prc[6].len-1]).add(retFutureSym) result = prc @@ -1068,8 +1119,8 @@ macro async*(prc: stmt): stmt {.immediate.} = result[6] = outerProcBody #echo(treeRepr(result)) - #if prc[0].getName == "routeReq": - #echo(toStrLit(result)) + #if prc[0].getName == "processClient": + # echo(toStrLit(result)) proc recvLine*(socket: TAsyncFD): PFuture[string] {.async.} = ## Reads a line of data from ``socket``. Returned future will complete once diff --git a/lib/pure/asynchttpserver.nim b/lib/pure/asynchttpserver.nim index ee6658fd1..e7abfb97c 100644 --- a/lib/pure/asynchttpserver.nim +++ b/lib/pure/asynchttpserver.nim @@ -199,6 +199,8 @@ proc serve*(server: PAsyncHttpServer, port: TPort, #var (address, client) = await server.socket.acceptAddr() var fut = await server.socket.acceptAddr() asyncCheck processClient(fut.client, fut.address, callback) + #echo(f.isNil) + #echo(f.repr) proc close*(server: PAsyncHttpServer) = ## Terminates the async http server instance. diff --git a/lib/pure/asyncnet.nim b/lib/pure/asyncnet.nim index 374ac77e3..db6f80b06 100644 --- a/lib/pure/asyncnet.nim +++ b/lib/pure/asyncnet.nim @@ -140,7 +140,7 @@ proc acceptAddr*(socket: PAsyncSocket): ## Accepts a new connection. Returns a future containing the client socket ## corresponding to that connection and the remote address of the client. ## The future will complete when the connection is successfully accepted. - var retFuture = newFuture[tuple[address: string, client: PAsyncSocket]]() + var retFuture = newFuture[tuple[address: string, client: PAsyncSocket]]("asyncnet.acceptAddr") var fut = acceptAddr(socket.fd.TAsyncFD) fut.callback = proc (future: PFuture[tuple[address: string, client: TAsyncFD]]) = @@ -157,7 +157,7 @@ proc accept*(socket: PAsyncSocket): PFuture[PAsyncSocket] = ## Accepts a new connection. Returns a future containing the client socket ## corresponding to that connection. ## The future will complete when the connection is successfully accepted. - var retFut = newFuture[PAsyncSocket]() + var retFut = newFuture[PAsyncSocket]("asyncnet.accept") var fut = acceptAddr(socket) fut.callback = proc (future: PFuture[tuple[address: string, client: PAsyncSocket]]) = diff --git a/tests/async/tasyncexceptions.nim b/tests/async/tasyncexceptions.nim new file mode 100644 index 000000000..9474fbae8 --- /dev/null +++ b/tests/async/tasyncexceptions.nim @@ -0,0 +1,38 @@ +discard """ + file: "tasyncexceptions.nim" + exitcode: 1 + outputsub: "Error: unhandled exception: foobar [E_Base]" +""" +import asyncdispatch + +proc accept(): PFuture[int] {.async.} = + await sleepAsync(100) + result = 4 + +proc recvLine(fd: int): PFuture[string] {.async.} = + await sleepAsync(100) + return "get" + +proc processClient(fd: int) {.async.} = + # these finish synchronously, we need some async delay to emulate this bug. + var line = await recvLine(fd) + var foo = line[0] + if foo == 'g': + raise newException(EBase, "foobar") + + +proc serve() {.async.} = + + while true: + var fut = await accept() + await processClient(fut) + +when isMainModule: + var fut = serve() + fut.callback = + proc () = + if fut.failed: + # This test ensures that this exception crashes the application + # as it is not handled. + raise fut.error + runForever() |