diff options
author | Dominik Picheta <dominikpicheta@googlemail.com> | 2014-08-09 17:13:22 +0100 |
---|---|---|
committer | Dominik Picheta <dominikpicheta@googlemail.com> | 2014-08-09 17:13:22 +0100 |
commit | 4f5f98f0b1693a221bd3a2087bddf7e6ac350387 (patch) | |
tree | 93ff56f70b8995e249b0f38e9467f524de73fe77 /lib/pure/asyncdispatch.nim | |
parent | fd086abb43d606188920c841bcd4abbed770e7d6 (diff) | |
download | Nim-4f5f98f0b1693a221bd3a2087bddf7e6ac350387.tar.gz |
Fixes incorrect async exception handling. Adds sleepAsync.
The tasyncexceptions test has been added which tests for this incorrect exception handling behaviour. The problem was that the exception was raised inside a callback which was called from a previously finished async procedure. This caused a "Future already finished" error. The fix was to simply reraise the exception if the retFutureSym is already finished. sleepAsync was added to help with the reproduction of this test. It should also be useful for users however. Finally some debug information was added to futures to help with future bugs.
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r-- | lib/pure/asyncdispatch.nim | 147 |
1 files changed, 99 insertions, 48 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index d410f8ce1..6339232f8 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -9,7 +9,7 @@ include "system/inclrtl" -import os, oids, tables, strutils, macros +import os, oids, tables, strutils, macros, times import rawsockets, net @@ -41,27 +41,40 @@ type cb: proc () {.closure,gcsafe.} finished: bool error*: ref EBase - stackTrace: string ## For debugging purposes only. + when defined(debug): + stackTrace: string ## For debugging purposes only. + id: int + fromProc: string PFuture*[T] = ref object of PFutureBase value: T -proc newFuture*[T](): PFuture[T] = +var currentID* = 0 +proc newFuture*[T](fromProc: string = "unspecified"): PFuture[T] = ## Creates a new future. + ## + ## Specifying ``fromProc``, which is a string specifying the name of the proc + ## that this future belongs to, is a good habit as it helps with debugging. new(result) result.finished = false - result.stackTrace = getStackTrace() + when defined(debug): + result.stackTrace = getStackTrace() + result.id = currentID + result.fromProc = fromProc + currentID.inc() proc checkFinished[T](future: PFuture[T]) = - if future.finished: - echo("<----->") - echo(future.stackTrace) - echo("-----") - when T is string: - echo("Contents: ", future.value.repr) - echo("<----->") - echo("Future already finished, cannot finish twice.") - assert false + when defined(debug): + if future.finished: + echo("<-----> ", future.id, " ", future.fromProc) + echo(future.stackTrace) + echo("-----") + when T is string: + echo("Contents: ", future.value.repr) + echo("<----->") + echo("Future already finished, cannot finish twice.") + echo getStackTrace() + assert false proc complete*[T](future: PFuture[T], val: T) = ## Completes ``future`` with value ``val``. @@ -121,7 +134,8 @@ proc read*[T](future: PFuture[T]): T = ## ## If the result of the future is an error then that error will be raised. if future.finished: - if future.error != nil: raise future.error + if future.error != nil: + raise future.error when T isnot void: return future.value else: @@ -150,7 +164,21 @@ proc asyncCheck*[T](future: PFuture[T]) = ## This should be used instead of ``discard`` to discard void futures. future.callback = proc () = - if future.failed: raise future.error + if future.failed: + raise future.error + +type + PDispatcherBase = ref object of PObject + timers: seq[tuple[finishAt: float, fut: PFuture[void]]] + +proc processTimers(p: PDispatcherBase) = + var oldTimers = p.timers + p.timers = @[] + for t in oldTimers: + if epochTime() >= t.finishAt: + t.fut.complete() + else: + p.timers.add(t) when defined(windows) or defined(nimdoc): import winlean, sets, hashes @@ -162,7 +190,7 @@ when defined(windows) or defined(nimdoc): cb: proc (sock: TAsyncFD, bytesTransferred: DWORD, errcode: TOSErrorCode) {.closure,gcsafe.} - PDispatcher* = ref object + PDispatcher* = ref object of PDispatcherBase ioPort: THandle handles: TSet[TAsyncFD] @@ -181,6 +209,7 @@ when defined(windows) or defined(nimdoc): new result result.ioPort = CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) result.handles = initSet[TAsyncFD]() + result.timers = @[] var gDisp{.threadvar.}: PDispatcher ## Global dispatcher proc getGlobalDispatcher*(): PDispatcher = @@ -207,8 +236,9 @@ when defined(windows) or defined(nimdoc): proc poll*(timeout = 500) = ## Waits for completion events and processes them. let p = getGlobalDispatcher() - if p.handles.len == 0: - raise newException(EInvalidValue, "No handles registered in dispatcher.") + if p.handles.len == 0 and p.timers.len == 0: + raise newException(EInvalidValue, + "No handles or timers registered in dispatcher.") let llTimeout = if timeout == -1: winlean.INFINITE @@ -242,6 +272,9 @@ when defined(windows) or defined(nimdoc): discard else: osError(errCode) + # Timer processing. + processTimers(p) + var connectExPtr: pointer = nil var acceptExPtr: pointer = nil var getAcceptExSockAddrsPtr: pointer = nil @@ -314,7 +347,7 @@ when defined(windows) or defined(nimdoc): ## Returns a ``PFuture`` which will complete when the connection succeeds ## or an error occurs. verifyPresence(socket) - var retFuture = newFuture[void]() + var retFuture = newFuture[void]("connect") # Apparently ``ConnectEx`` expects the socket to be initially bound: var saddr: Tsockaddr_in saddr.sin_family = int16(toInt(af)) @@ -384,7 +417,7 @@ when defined(windows) or defined(nimdoc): # '\0' in the message currently signifies a socket disconnect. Who # knows what will happen when someone sends that to our socket. verifyPresence(socket) - var retFuture = newFuture[string]() + var retFuture = newFuture[string]("recv") var dataBuf: TWSABuf dataBuf.buf = cast[cstring](alloc0(size)) dataBuf.len = size @@ -459,7 +492,7 @@ when defined(windows) or defined(nimdoc): ## Sends ``data`` to ``socket``. The returned future will complete once all ## data has been sent. verifyPresence(socket) - var retFuture = newFuture[void]() + var retFuture = newFuture[void]("send") var dataBuf: TWSABuf dataBuf.buf = data # since this is not used in a callback, this is fine @@ -502,7 +535,7 @@ when defined(windows) or defined(nimdoc): ## ## The resulting client socket is automatically registered to dispatcher. verifyPresence(socket) - var retFuture = newFuture[tuple[address: string, client: TAsyncFD]]() + var retFuture = newFuture[tuple[address: string, client: TAsyncFD]]("acceptAddr") var clientSock = newRawSocket() if clientSock == osInvalidSocket: osError(osLastError()) @@ -614,6 +647,7 @@ else: proc newDispatcher*(): PDispatcher = new result result.selector = newSelector() + result.timers = @[] var gDisp{.threadvar.}: PDispatcher ## Global dispatcher proc getGlobalDispatcher*(): PDispatcher = @@ -693,6 +727,8 @@ else: else: # FD no longer a part of the selector. Likely been closed # (e.g. socket disconnected). + + processTimers(p) proc connect*(socket: TAsyncFD, address: string, port: TPort, af = AF_INET): PFuture[void] = @@ -814,11 +850,19 @@ else: addRead(socket, cb) return retFuture +proc sleepAsync*(ms: int): PFuture[void] = + ## Suspends the execution of the current async procedure for the next + ## ``ms`` miliseconds. + var retFuture = newFuture[void]("sleepAsync") + let p = getGlobalDispatcher() + p.timers.add((epochTime() + (ms / 1000), retFuture)) + return retFuture + proc accept*(socket: TAsyncFD): PFuture[TAsyncFD] = ## Accepts a new connection. Returns a future containing the client socket ## corresponding to that connection. ## The future will complete when the connection is successfully accepted. - var retFut = newFuture[TAsyncFD]() + var retFut = newFuture[TAsyncFD]("accept") var fut = acceptAddr(socket) fut.callback = proc (future: PFuture[tuple[address: string, client: TAsyncFD]]) = @@ -845,11 +889,16 @@ template createCb*(retFutureSym, iteratorNameSym, else: next.callback = cb except: - retFutureSym.fail(getCurrentException()) + if retFutureSym.finished: + # Take a look at tasyncexceptions for the bug which this fixes. + # That test explains it better than I can here. + raise + else: + retFutureSym.fail(getCurrentException()) cb() #{.pop.} proc generateExceptionCheck(futSym, - exceptBranch, rootReceiver: PNimrodNode): PNimrodNode {.compileTime.} = + exceptBranch, rootReceiver, fromNode: PNimrodNode): PNimrodNode {.compileTime.} = if exceptBranch == nil: result = rootReceiver else: @@ -869,20 +918,21 @@ proc generateExceptionCheck(futSym, ) ) ) - let elseNode = newNimNode(nnkElse) - elseNode.add newNimNode(nnkStmtList) + let elseNode = newNimNode(nnkElse, fromNode) + elseNode.add newNimNode(nnkStmtList, fromNode) elseNode[0].add rootReceiver result.add elseNode template createVar(result: var PNimrodNode, futSymName: string, asyncProc: PNimrodNode, - valueReceiver, rootReceiver: expr) = - result = newNimNode(nnkStmtList) + valueReceiver, rootReceiver: expr, + fromNode: PNimrodNode) = + result = newNimNode(nnkStmtList, fromNode) var futSym = genSym(nskVar, "future") result.add newVarStmt(futSym, asyncProc) # -> var future<x> = y - result.add newNimNode(nnkYieldStmt).add(futSym) # -> yield future<x> + result.add newNimNode(nnkYieldStmt, fromNode).add(futSym) # -> yield future<x> valueReceiver = newDotExpr(futSym, newIdentNode("read")) # -> future<x>.read - result.add generateExceptionCheck(futSym, exceptBranch, rootReceiver) + result.add generateExceptionCheck(futSym, exceptBranch, rootReceiver, fromNode) proc processBody(node, retFutureSym: PNimrodNode, subTypeIsVoid: bool, @@ -891,7 +941,7 @@ proc processBody(node, retFutureSym: PNimrodNode, result = node case node.kind of nnkReturnStmt: - result = newNimNode(nnkStmtList) + result = newNimNode(nnkStmtList, node) if node[0].kind == nnkEmpty: if not subtypeIsVoid: result.add newCall(newIdentNode("complete"), retFutureSym, @@ -902,19 +952,19 @@ proc processBody(node, retFutureSym: PNimrodNode, result.add newCall(newIdentNode("complete"), retFutureSym, node[0].processBody(retFutureSym, subtypeIsVoid, exceptBranch)) - result.add newNimNode(nnkReturnStmt).add(newNilLit()) + result.add newNimNode(nnkReturnStmt, node).add(newNilLit()) return # Don't process the children of this return stmt of nnkCommand: if node[0].kind == nnkIdent and node[0].ident == !"await": case node[1].kind of nnkIdent: # await x - result = newNimNode(nnkYieldStmt).add(node[1]) # -> yield x + result = newNimNode(nnkYieldStmt, node).add(node[1]) # -> yield x of nnkCall: # await foo(p, x) var futureValue: PNimrodNode result.createVar("future" & $node[1][0].toStrLit, node[1], futureValue, - futureValue) + futureValue, node) else: error("Invalid node kind in 'await', got: " & $node[1].kind) elif node[1].kind == nnkCommand and node[1][0].kind == nnkIdent and @@ -922,7 +972,7 @@ proc processBody(node, retFutureSym: PNimrodNode, # foo await x var newCommand = node result.createVar("future" & $node[0].toStrLit, node[1][1], newCommand[1], - newCommand) + newCommand, node) of nnkVarSection, nnkLetSection: case node[0][2].kind @@ -931,7 +981,7 @@ proc processBody(node, retFutureSym: PNimrodNode, # var x = await y var newVarSection = node # TODO: Should this use copyNimNode? result.createVar("future" & $node[0][0].ident, node[0][2][1], - newVarSection[0][2], newVarSection) + newVarSection[0][2], newVarSection, node) else: discard of nnkAsgn: case node[1].kind @@ -939,7 +989,7 @@ proc processBody(node, retFutureSym: PNimrodNode, if node[1][0].ident == !"await": # x = await y var newAsgn = node - result.createVar("future" & $node[0].toStrLit, node[1][1], newAsgn[1], newAsgn) + result.createVar("future" & $node[0].toStrLit, node[1][1], newAsgn[1], newAsgn, node) else: discard of nnkDiscardStmt: # discard await x @@ -947,10 +997,10 @@ proc processBody(node, retFutureSym: PNimrodNode, node[0][0].ident == !"await": var newDiscard = node result.createVar("futureDiscard_" & $toStrLit(node[0][1]), node[0][1], - newDiscard[0], newDiscard) + newDiscard[0], newDiscard, node) of nnkTryStmt: # try: await x; except: ... - result = newNimNode(nnkStmtList) + result = newNimNode(nnkStmtList, node) proc processForTry(n: PNimrodNode, i: var int, res: PNimrodNode): bool {.compileTime.} = result = false @@ -1009,7 +1059,7 @@ macro async*(prc: stmt): stmt {.immediate.} = (returnType.kind == nnkBracketExpr and returnType[1].kind == nnkIdent and returnType[1].ident == !"void") - var outerProcBody = newNimNode(nnkStmtList) + var outerProcBody = newNimNode(nnkStmtList, prc[6]) # -> var retFuture = newFuture[T]() var retFutureSym = genSym(nskVar, "retFuture") @@ -1019,9 +1069,10 @@ macro async*(prc: stmt): stmt {.immediate.} = outerProcBody.add( newVarStmt(retFutureSym, newCall( - newNimNode(nnkBracketExpr).add( + newNimNode(nnkBracketExpr, prc[6]).add( newIdentNode(!"newFuture"), # TODO: Strange bug here? Remove the `!`. - subRetType)))) # Get type from return type of this proc + subRetType), + newLit(prc[0].getName)))) # Get type from return type of this proc # -> iterator nameIter(): PFutureBase {.closure.} = # -> var result: T @@ -1030,7 +1081,7 @@ macro async*(prc: stmt): stmt {.immediate.} = var iteratorNameSym = genSym(nskIterator, $prc[0].getName & "Iter") var procBody = prc[6].processBody(retFutureSym, subtypeIsVoid, nil) if not subtypeIsVoid: - procBody.insert(0, newNimNode(nnkVarSection).add( + procBody.insert(0, newNimNode(nnkVarSection, prc[6]).add( newIdentDefs(newIdentNode("result"), returnType[1]))) # -> var result: T procBody.add( newCall(newIdentNode("complete"), @@ -1041,7 +1092,7 @@ macro async*(prc: stmt): stmt {.immediate.} = var closureIterator = newProc(iteratorNameSym, [newIdentNode("PFutureBase")], procBody, nnkIteratorDef) - closureIterator[4] = newNimNode(nnkPragma).add(newIdentNode("closure")) + closureIterator[4] = newNimNode(nnkPragma, prc[6]).add(newIdentNode("closure")) outerProcBody.add(closureIterator) # -> createCb(retFuture) @@ -1051,7 +1102,7 @@ macro async*(prc: stmt): stmt {.immediate.} = outerProcBody.add procCb # -> return retFuture - outerProcBody.add newNimNode(nnkReturnStmt).add(retFutureSym) + outerProcBody.add newNimNode(nnkReturnStmt, prc[6][prc[6].len-1]).add(retFutureSym) result = prc @@ -1068,8 +1119,8 @@ macro async*(prc: stmt): stmt {.immediate.} = result[6] = outerProcBody #echo(treeRepr(result)) - #if prc[0].getName == "routeReq": - #echo(toStrLit(result)) + #if prc[0].getName == "processClient": + # echo(toStrLit(result)) proc recvLine*(socket: TAsyncFD): PFuture[string] {.async.} = ## Reads a line of data from ``socket``. Returned future will complete once |