diff options
author | Araq <rumpf_a@web.de> | 2014-08-10 03:19:00 +0200 |
---|---|---|
committer | Araq <rumpf_a@web.de> | 2014-08-10 03:19:00 +0200 |
commit | 037d7e4e5d3f142fb8486895820c70cf2e1cf641 (patch) | |
tree | 6e47c4b4029b9c57d00e3b2d5ff9f6ea5f3e0eda /lib/pure/asyncdispatch.nim | |
parent | 86b654c58c610df1157538da3e3a86cbbdb4fb99 (diff) | |
parent | 0f15ebf8cad19d4f0c1953c789ff46bcb7306085 (diff) | |
download | Nim-037d7e4e5d3f142fb8486895820c70cf2e1cf641.tar.gz |
Merge branch 'devel' of https://github.com/Araq/Nimrod into devel
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r-- | lib/pure/asyncdispatch.nim | 168 |
1 files changed, 118 insertions, 50 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index d410f8ce1..dea17d146 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -9,7 +9,7 @@ include "system/inclrtl" -import os, oids, tables, strutils, macros +import os, oids, tables, strutils, macros, times import rawsockets, net @@ -41,27 +41,41 @@ type cb: proc () {.closure,gcsafe.} finished: bool error*: ref EBase - stackTrace: string ## For debugging purposes only. + errorStackTrace*: string + when not defined(release): + stackTrace: string ## For debugging purposes only. + id: int + fromProc: string PFuture*[T] = ref object of PFutureBase value: T -proc newFuture*[T](): PFuture[T] = +var currentID* = 0 +proc newFuture*[T](fromProc: string = "unspecified"): PFuture[T] = ## Creates a new future. + ## + ## Specifying ``fromProc``, which is a string specifying the name of the proc + ## that this future belongs to, is a good habit as it helps with debugging. new(result) result.finished = false - result.stackTrace = getStackTrace() + when not defined(release): + result.stackTrace = getStackTrace() + result.id = currentID + result.fromProc = fromProc + currentID.inc() proc checkFinished[T](future: PFuture[T]) = - if future.finished: - echo("<----->") - echo(future.stackTrace) - echo("-----") - when T is string: - echo("Contents: ", future.value.repr) - echo("<----->") - echo("Future already finished, cannot finish twice.") - assert false + when not defined(release): + if future.finished: + echo("<-----> ", future.id, " ", future.fromProc) + echo(future.stackTrace) + echo("-----") + when T is string: + echo("Contents: ", future.value.repr) + echo("<----->") + echo("Future already finished, cannot finish twice.") + echo getStackTrace() + assert false proc complete*[T](future: PFuture[T], val: T) = ## Completes ``future`` with value ``val``. @@ -88,6 +102,8 @@ proc fail*[T](future: PFuture[T], error: ref EBase) = checkFinished(future) future.finished = true future.error = error + future.errorStackTrace = + if getStackTrace(error) == "": getStackTrace() else: getStackTrace(error) if future.cb != nil: future.cb() else: @@ -115,13 +131,24 @@ proc `callback=`*[T](future: PFuture[T], ## If future has already completed then ``cb`` will be called immediately. future.callback = proc () = cb(future) +proc echoOriginalStackTrace[T](future: PFuture[T]) = + # TODO: Come up with something better. + when not defined(release): + echo("Original stack trace in ", future.fromProc, ":") + if not future.errorStackTrace.isNil() and future.errorStackTrace != "": + echo(future.errorStackTrace) + else: + echo("Empty or nil stack trace.") + proc read*[T](future: PFuture[T]): T = ## Retrieves the value of ``future``. Future must be finished otherwise ## this function will fail with a ``EInvalidValue`` exception. ## ## If the result of the future is an error then that error will be raised. if future.finished: - if future.error != nil: raise future.error + if future.error != nil: + echoOriginalStackTrace(future) + raise future.error when T isnot void: return future.value else: @@ -150,7 +177,22 @@ proc asyncCheck*[T](future: PFuture[T]) = ## This should be used instead of ``discard`` to discard void futures. future.callback = proc () = - if future.failed: raise future.error + if future.failed: + echoOriginalStackTrace(future) + raise future.error + +type + PDispatcherBase = ref object of PObject + timers: seq[tuple[finishAt: float, fut: PFuture[void]]] + +proc processTimers(p: PDispatcherBase) = + var oldTimers = p.timers + p.timers = @[] + for t in oldTimers: + if epochTime() >= t.finishAt: + t.fut.complete() + else: + p.timers.add(t) when defined(windows) or defined(nimdoc): import winlean, sets, hashes @@ -162,7 +204,7 @@ when defined(windows) or defined(nimdoc): cb: proc (sock: TAsyncFD, bytesTransferred: DWORD, errcode: TOSErrorCode) {.closure,gcsafe.} - PDispatcher* = ref object + PDispatcher* = ref object of PDispatcherBase ioPort: THandle handles: TSet[TAsyncFD] @@ -181,6 +223,7 @@ when defined(windows) or defined(nimdoc): new result result.ioPort = CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) result.handles = initSet[TAsyncFD]() + result.timers = @[] var gDisp{.threadvar.}: PDispatcher ## Global dispatcher proc getGlobalDispatcher*(): PDispatcher = @@ -207,8 +250,9 @@ when defined(windows) or defined(nimdoc): proc poll*(timeout = 500) = ## Waits for completion events and processes them. let p = getGlobalDispatcher() - if p.handles.len == 0: - raise newException(EInvalidValue, "No handles registered in dispatcher.") + if p.handles.len == 0 and p.timers.len == 0: + raise newException(EInvalidValue, + "No handles or timers registered in dispatcher.") let llTimeout = if timeout == -1: winlean.INFINITE @@ -242,6 +286,9 @@ when defined(windows) or defined(nimdoc): discard else: osError(errCode) + # Timer processing. + processTimers(p) + var connectExPtr: pointer = nil var acceptExPtr: pointer = nil var getAcceptExSockAddrsPtr: pointer = nil @@ -314,7 +361,7 @@ when defined(windows) or defined(nimdoc): ## Returns a ``PFuture`` which will complete when the connection succeeds ## or an error occurs. verifyPresence(socket) - var retFuture = newFuture[void]() + var retFuture = newFuture[void]("connect") # Apparently ``ConnectEx`` expects the socket to be initially bound: var saddr: Tsockaddr_in saddr.sin_family = int16(toInt(af)) @@ -384,7 +431,7 @@ when defined(windows) or defined(nimdoc): # '\0' in the message currently signifies a socket disconnect. Who # knows what will happen when someone sends that to our socket. verifyPresence(socket) - var retFuture = newFuture[string]() + var retFuture = newFuture[string]("recv") var dataBuf: TWSABuf dataBuf.buf = cast[cstring](alloc0(size)) dataBuf.len = size @@ -405,7 +452,10 @@ when defined(windows) or defined(nimdoc): copyMem(addr data[0], addr dataBuf.buf[0], bytesCount) retFuture.complete($data) else: - retFuture.fail(newException(EOS, osErrorMsg(errcode))) + if flags.isDisconnectionError(errcode): + retFuture.complete("") + else: + retFuture.fail(newException(EOS, osErrorMsg(errcode))) if dataBuf.buf != nil: dealloc dataBuf.buf dataBuf.buf = nil @@ -459,7 +509,7 @@ when defined(windows) or defined(nimdoc): ## Sends ``data`` to ``socket``. The returned future will complete once all ## data has been sent. verifyPresence(socket) - var retFuture = newFuture[void]() + var retFuture = newFuture[void]("send") var dataBuf: TWSABuf dataBuf.buf = data # since this is not used in a callback, this is fine @@ -502,7 +552,7 @@ when defined(windows) or defined(nimdoc): ## ## The resulting client socket is automatically registered to dispatcher. verifyPresence(socket) - var retFuture = newFuture[tuple[address: string, client: TAsyncFD]]() + var retFuture = newFuture[tuple[address: string, client: TAsyncFD]]("acceptAddr") var clientSock = newRawSocket() if clientSock == osInvalidSocket: osError(osLastError()) @@ -606,7 +656,7 @@ else: readCBs: seq[TCallback] writeCBs: seq[TCallback] - PDispatcher* = ref object + PDispatcher* = ref object of PDispatcherBase selector: PSelector proc `==`*(x, y: TAsyncFD): bool {.borrow.} @@ -614,6 +664,7 @@ else: proc newDispatcher*(): PDispatcher = new result result.selector = newSelector() + result.timers = @[] var gDisp{.threadvar.}: PDispatcher ## Global dispatcher proc getGlobalDispatcher*(): PDispatcher = @@ -693,6 +744,8 @@ else: else: # FD no longer a part of the selector. Likely been closed # (e.g. socket disconnected). + + processTimers(p) proc connect*(socket: TAsyncFD, address: string, port: TPort, af = AF_INET): PFuture[void] = @@ -814,11 +867,19 @@ else: addRead(socket, cb) return retFuture +proc sleepAsync*(ms: int): PFuture[void] = + ## Suspends the execution of the current async procedure for the next + ## ``ms`` miliseconds. + var retFuture = newFuture[void]("sleepAsync") + let p = getGlobalDispatcher() + p.timers.add((epochTime() + (ms / 1000), retFuture)) + return retFuture + proc accept*(socket: TAsyncFD): PFuture[TAsyncFD] = ## Accepts a new connection. Returns a future containing the client socket ## corresponding to that connection. ## The future will complete when the connection is successfully accepted. - var retFut = newFuture[TAsyncFD]() + var retFut = newFuture[TAsyncFD]("accept") var fut = acceptAddr(socket) fut.callback = proc (future: PFuture[tuple[address: string, client: TAsyncFD]]) = @@ -845,11 +906,16 @@ template createCb*(retFutureSym, iteratorNameSym, else: next.callback = cb except: - retFutureSym.fail(getCurrentException()) + if retFutureSym.finished: + # Take a look at tasyncexceptions for the bug which this fixes. + # That test explains it better than I can here. + raise + else: + retFutureSym.fail(getCurrentException()) cb() #{.pop.} proc generateExceptionCheck(futSym, - exceptBranch, rootReceiver: PNimrodNode): PNimrodNode {.compileTime.} = + exceptBranch, rootReceiver, fromNode: PNimrodNode): PNimrodNode {.compileTime.} = if exceptBranch == nil: result = rootReceiver else: @@ -869,20 +935,21 @@ proc generateExceptionCheck(futSym, ) ) ) - let elseNode = newNimNode(nnkElse) - elseNode.add newNimNode(nnkStmtList) + let elseNode = newNimNode(nnkElse, fromNode) + elseNode.add newNimNode(nnkStmtList, fromNode) elseNode[0].add rootReceiver result.add elseNode template createVar(result: var PNimrodNode, futSymName: string, asyncProc: PNimrodNode, - valueReceiver, rootReceiver: expr) = - result = newNimNode(nnkStmtList) + valueReceiver, rootReceiver: expr, + fromNode: PNimrodNode) = + result = newNimNode(nnkStmtList, fromNode) var futSym = genSym(nskVar, "future") result.add newVarStmt(futSym, asyncProc) # -> var future<x> = y - result.add newNimNode(nnkYieldStmt).add(futSym) # -> yield future<x> + result.add newNimNode(nnkYieldStmt, fromNode).add(futSym) # -> yield future<x> valueReceiver = newDotExpr(futSym, newIdentNode("read")) # -> future<x>.read - result.add generateExceptionCheck(futSym, exceptBranch, rootReceiver) + result.add generateExceptionCheck(futSym, exceptBranch, rootReceiver, fromNode) proc processBody(node, retFutureSym: PNimrodNode, subTypeIsVoid: bool, @@ -891,7 +958,7 @@ proc processBody(node, retFutureSym: PNimrodNode, result = node case node.kind of nnkReturnStmt: - result = newNimNode(nnkStmtList) + result = newNimNode(nnkStmtList, node) if node[0].kind == nnkEmpty: if not subtypeIsVoid: result.add newCall(newIdentNode("complete"), retFutureSym, @@ -902,19 +969,19 @@ proc processBody(node, retFutureSym: PNimrodNode, result.add newCall(newIdentNode("complete"), retFutureSym, node[0].processBody(retFutureSym, subtypeIsVoid, exceptBranch)) - result.add newNimNode(nnkReturnStmt).add(newNilLit()) + result.add newNimNode(nnkReturnStmt, node).add(newNilLit()) return # Don't process the children of this return stmt of nnkCommand: if node[0].kind == nnkIdent and node[0].ident == !"await": case node[1].kind of nnkIdent: # await x - result = newNimNode(nnkYieldStmt).add(node[1]) # -> yield x + result = newNimNode(nnkYieldStmt, node).add(node[1]) # -> yield x of nnkCall: # await foo(p, x) var futureValue: PNimrodNode result.createVar("future" & $node[1][0].toStrLit, node[1], futureValue, - futureValue) + futureValue, node) else: error("Invalid node kind in 'await', got: " & $node[1].kind) elif node[1].kind == nnkCommand and node[1][0].kind == nnkIdent and @@ -922,7 +989,7 @@ proc processBody(node, retFutureSym: PNimrodNode, # foo await x var newCommand = node result.createVar("future" & $node[0].toStrLit, node[1][1], newCommand[1], - newCommand) + newCommand, node) of nnkVarSection, nnkLetSection: case node[0][2].kind @@ -931,7 +998,7 @@ proc processBody(node, retFutureSym: PNimrodNode, # var x = await y var newVarSection = node # TODO: Should this use copyNimNode? result.createVar("future" & $node[0][0].ident, node[0][2][1], - newVarSection[0][2], newVarSection) + newVarSection[0][2], newVarSection, node) else: discard of nnkAsgn: case node[1].kind @@ -939,7 +1006,7 @@ proc processBody(node, retFutureSym: PNimrodNode, if node[1][0].ident == !"await": # x = await y var newAsgn = node - result.createVar("future" & $node[0].toStrLit, node[1][1], newAsgn[1], newAsgn) + result.createVar("future" & $node[0].toStrLit, node[1][1], newAsgn[1], newAsgn, node) else: discard of nnkDiscardStmt: # discard await x @@ -947,10 +1014,10 @@ proc processBody(node, retFutureSym: PNimrodNode, node[0][0].ident == !"await": var newDiscard = node result.createVar("futureDiscard_" & $toStrLit(node[0][1]), node[0][1], - newDiscard[0], newDiscard) + newDiscard[0], newDiscard, node) of nnkTryStmt: # try: await x; except: ... - result = newNimNode(nnkStmtList) + result = newNimNode(nnkStmtList, node) proc processForTry(n: PNimrodNode, i: var int, res: PNimrodNode): bool {.compileTime.} = result = false @@ -1009,7 +1076,7 @@ macro async*(prc: stmt): stmt {.immediate.} = (returnType.kind == nnkBracketExpr and returnType[1].kind == nnkIdent and returnType[1].ident == !"void") - var outerProcBody = newNimNode(nnkStmtList) + var outerProcBody = newNimNode(nnkStmtList, prc[6]) # -> var retFuture = newFuture[T]() var retFutureSym = genSym(nskVar, "retFuture") @@ -1019,9 +1086,10 @@ macro async*(prc: stmt): stmt {.immediate.} = outerProcBody.add( newVarStmt(retFutureSym, newCall( - newNimNode(nnkBracketExpr).add( + newNimNode(nnkBracketExpr, prc[6]).add( newIdentNode(!"newFuture"), # TODO: Strange bug here? Remove the `!`. - subRetType)))) # Get type from return type of this proc + subRetType), + newLit(prc[0].getName)))) # Get type from return type of this proc # -> iterator nameIter(): PFutureBase {.closure.} = # -> var result: T @@ -1030,7 +1098,7 @@ macro async*(prc: stmt): stmt {.immediate.} = var iteratorNameSym = genSym(nskIterator, $prc[0].getName & "Iter") var procBody = prc[6].processBody(retFutureSym, subtypeIsVoid, nil) if not subtypeIsVoid: - procBody.insert(0, newNimNode(nnkVarSection).add( + procBody.insert(0, newNimNode(nnkVarSection, prc[6]).add( newIdentDefs(newIdentNode("result"), returnType[1]))) # -> var result: T procBody.add( newCall(newIdentNode("complete"), @@ -1041,7 +1109,7 @@ macro async*(prc: stmt): stmt {.immediate.} = var closureIterator = newProc(iteratorNameSym, [newIdentNode("PFutureBase")], procBody, nnkIteratorDef) - closureIterator[4] = newNimNode(nnkPragma).add(newIdentNode("closure")) + closureIterator[4] = newNimNode(nnkPragma, prc[6]).add(newIdentNode("closure")) outerProcBody.add(closureIterator) # -> createCb(retFuture) @@ -1051,7 +1119,7 @@ macro async*(prc: stmt): stmt {.immediate.} = outerProcBody.add procCb # -> return retFuture - outerProcBody.add newNimNode(nnkReturnStmt).add(retFutureSym) + outerProcBody.add newNimNode(nnkReturnStmt, prc[6][prc[6].len-1]).add(retFutureSym) result = prc @@ -1068,8 +1136,8 @@ macro async*(prc: stmt): stmt {.immediate.} = result[6] = outerProcBody #echo(treeRepr(result)) - #if prc[0].getName == "routeReq": - #echo(toStrLit(result)) + #if prc[0].getName == "processClient": + # echo(toStrLit(result)) proc recvLine*(socket: TAsyncFD): PFuture[string] {.async.} = ## Reads a line of data from ``socket``. Returned future will complete once |