summary refs log tree commit diff stats
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/pure/asyncdispatch.nim147
-rw-r--r--lib/pure/asynchttpserver.nim2
-rw-r--r--lib/pure/asyncnet.nim4
3 files changed, 103 insertions, 50 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim
index d410f8ce1..6339232f8 100644
--- a/lib/pure/asyncdispatch.nim
+++ b/lib/pure/asyncdispatch.nim
@@ -9,7 +9,7 @@
 
 include "system/inclrtl"
 
-import os, oids, tables, strutils, macros
+import os, oids, tables, strutils, macros, times
 
 import rawsockets, net
 
@@ -41,27 +41,40 @@ type
     cb: proc () {.closure,gcsafe.}
     finished: bool
     error*: ref EBase
-    stackTrace: string ## For debugging purposes only.
+    when defined(debug):
+      stackTrace: string ## For debugging purposes only.
+      id: int
+      fromProc: string
 
   PFuture*[T] = ref object of PFutureBase
     value: T
 
-proc newFuture*[T](): PFuture[T] =
+var currentID* = 0
+proc newFuture*[T](fromProc: string = "unspecified"): PFuture[T] =
   ## Creates a new future.
+  ##
+  ## Specifying ``fromProc``, which is a string specifying the name of the proc
+  ## that this future belongs to, is a good habit as it helps with debugging.
   new(result)
   result.finished = false
-  result.stackTrace = getStackTrace()
+  when defined(debug):
+    result.stackTrace = getStackTrace()
+    result.id = currentID
+    result.fromProc = fromProc
+    currentID.inc()
 
 proc checkFinished[T](future: PFuture[T]) =
-  if future.finished:
-    echo("<----->")
-    echo(future.stackTrace)
-    echo("-----")
-    when T is string:
-      echo("Contents: ", future.value.repr)
-    echo("<----->")
-    echo("Future already finished, cannot finish twice.")
-    assert false
+  when defined(debug):
+    if future.finished:
+      echo("<-----> ", future.id, " ", future.fromProc)
+      echo(future.stackTrace)
+      echo("-----")
+      when T is string:
+        echo("Contents: ", future.value.repr)
+      echo("<----->")
+      echo("Future already finished, cannot finish twice.")
+      echo getStackTrace()
+      assert false
 
 proc complete*[T](future: PFuture[T], val: T) =
   ## Completes ``future`` with value ``val``.
@@ -121,7 +134,8 @@ proc read*[T](future: PFuture[T]): T =
   ##
   ## If the result of the future is an error then that error will be raised.
   if future.finished:
-    if future.error != nil: raise future.error
+    if future.error != nil:
+      raise future.error
     when T isnot void:
       return future.value
   else:
@@ -150,7 +164,21 @@ proc asyncCheck*[T](future: PFuture[T]) =
   ## This should be used instead of ``discard`` to discard void futures.
   future.callback =
     proc () =
-      if future.failed: raise future.error
+      if future.failed:
+        raise future.error
+
+type
+  PDispatcherBase = ref object of PObject
+    timers: seq[tuple[finishAt: float, fut: PFuture[void]]]
+
+proc processTimers(p: PDispatcherBase) =
+  var oldTimers = p.timers
+  p.timers = @[]
+  for t in oldTimers:
+    if epochTime() >= t.finishAt:
+      t.fut.complete()
+    else:
+      p.timers.add(t)
 
 when defined(windows) or defined(nimdoc):
   import winlean, sets, hashes
@@ -162,7 +190,7 @@ when defined(windows) or defined(nimdoc):
       cb: proc (sock: TAsyncFD, bytesTransferred: DWORD,
                 errcode: TOSErrorCode) {.closure,gcsafe.}
 
-    PDispatcher* = ref object
+    PDispatcher* = ref object of PDispatcherBase
       ioPort: THandle
       handles: TSet[TAsyncFD]
 
@@ -181,6 +209,7 @@ when defined(windows) or defined(nimdoc):
     new result
     result.ioPort = CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
     result.handles = initSet[TAsyncFD]()
+    result.timers = @[]
 
   var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
   proc getGlobalDispatcher*(): PDispatcher =
@@ -207,8 +236,9 @@ when defined(windows) or defined(nimdoc):
   proc poll*(timeout = 500) =
     ## Waits for completion events and processes them.
     let p = getGlobalDispatcher()
-    if p.handles.len == 0:
-      raise newException(EInvalidValue, "No handles registered in dispatcher.")
+    if p.handles.len == 0 and p.timers.len == 0:
+      raise newException(EInvalidValue,
+        "No handles or timers registered in dispatcher.")
     
     let llTimeout =
       if timeout ==  -1: winlean.INFINITE
@@ -242,6 +272,9 @@ when defined(windows) or defined(nimdoc):
           discard
         else: osError(errCode)
 
+    # Timer processing.
+    processTimers(p)
+
   var connectExPtr: pointer = nil
   var acceptExPtr: pointer = nil
   var getAcceptExSockAddrsPtr: pointer = nil
@@ -314,7 +347,7 @@ when defined(windows) or defined(nimdoc):
     ## Returns a ``PFuture`` which will complete when the connection succeeds
     ## or an error occurs.
     verifyPresence(socket)
-    var retFuture = newFuture[void]()
+    var retFuture = newFuture[void]("connect")
     # Apparently ``ConnectEx`` expects the socket to be initially bound:
     var saddr: Tsockaddr_in
     saddr.sin_family = int16(toInt(af))
@@ -384,7 +417,7 @@ when defined(windows) or defined(nimdoc):
     #     '\0' in the message currently signifies a socket disconnect. Who
     #     knows what will happen when someone sends that to our socket.
     verifyPresence(socket)
-    var retFuture = newFuture[string]()    
+    var retFuture = newFuture[string]("recv")
     var dataBuf: TWSABuf
     dataBuf.buf = cast[cstring](alloc0(size))
     dataBuf.len = size
@@ -459,7 +492,7 @@ when defined(windows) or defined(nimdoc):
     ## Sends ``data`` to ``socket``. The returned future will complete once all
     ## data has been sent.
     verifyPresence(socket)
-    var retFuture = newFuture[void]()
+    var retFuture = newFuture[void]("send")
 
     var dataBuf: TWSABuf
     dataBuf.buf = data # since this is not used in a callback, this is fine
@@ -502,7 +535,7 @@ when defined(windows) or defined(nimdoc):
     ##
     ## The resulting client socket is automatically registered to dispatcher.
     verifyPresence(socket)
-    var retFuture = newFuture[tuple[address: string, client: TAsyncFD]]()
+    var retFuture = newFuture[tuple[address: string, client: TAsyncFD]]("acceptAddr")
 
     var clientSock = newRawSocket()
     if clientSock == osInvalidSocket: osError(osLastError())
@@ -614,6 +647,7 @@ else:
   proc newDispatcher*(): PDispatcher =
     new result
     result.selector = newSelector()
+    result.timers = @[]
 
   var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
   proc getGlobalDispatcher*(): PDispatcher =
@@ -693,6 +727,8 @@ else:
       else:
         # FD no longer a part of the selector. Likely been closed
         # (e.g. socket disconnected).
+
+    processTimers(p)
   
   proc connect*(socket: TAsyncFD, address: string, port: TPort,
     af = AF_INET): PFuture[void] =
@@ -814,11 +850,19 @@ else:
     addRead(socket, cb)
     return retFuture
 
+proc sleepAsync*(ms: int): PFuture[void] =
+  ## Suspends the execution of the current async procedure for the next
+  ## ``ms`` miliseconds.
+  var retFuture = newFuture[void]("sleepAsync")
+  let p = getGlobalDispatcher()
+  p.timers.add((epochTime() + (ms / 1000), retFuture))
+  return retFuture
+
 proc accept*(socket: TAsyncFD): PFuture[TAsyncFD] =
   ## Accepts a new connection. Returns a future containing the client socket
   ## corresponding to that connection.
   ## The future will complete when the connection is successfully accepted.
-  var retFut = newFuture[TAsyncFD]()
+  var retFut = newFuture[TAsyncFD]("accept")
   var fut = acceptAddr(socket)
   fut.callback =
     proc (future: PFuture[tuple[address: string, client: TAsyncFD]]) =
@@ -845,11 +889,16 @@ template createCb*(retFutureSym, iteratorNameSym,
         else:
           next.callback = cb
     except:
-      retFutureSym.fail(getCurrentException())
+      if retFutureSym.finished:
+        # Take a look at tasyncexceptions for the bug which this fixes.
+        # That test explains it better than I can here.
+        raise
+      else:
+        retFutureSym.fail(getCurrentException())
   cb()
   #{.pop.}
 proc generateExceptionCheck(futSym,
-    exceptBranch, rootReceiver: PNimrodNode): PNimrodNode {.compileTime.} =
+    exceptBranch, rootReceiver, fromNode: PNimrodNode): PNimrodNode {.compileTime.} =
   if exceptBranch == nil:
     result = rootReceiver
   else:
@@ -869,20 +918,21 @@ proc generateExceptionCheck(futSym,
            )
          )
       )
-    let elseNode = newNimNode(nnkElse)
-    elseNode.add newNimNode(nnkStmtList)
+    let elseNode = newNimNode(nnkElse, fromNode)
+    elseNode.add newNimNode(nnkStmtList, fromNode)
     elseNode[0].add rootReceiver
     result.add elseNode
 
 template createVar(result: var PNimrodNode, futSymName: string,
                    asyncProc: PNimrodNode,
-                   valueReceiver, rootReceiver: expr) =
-  result = newNimNode(nnkStmtList)
+                   valueReceiver, rootReceiver: expr,
+                   fromNode: PNimrodNode) =
+  result = newNimNode(nnkStmtList, fromNode)
   var futSym = genSym(nskVar, "future")
   result.add newVarStmt(futSym, asyncProc) # -> var future<x> = y
-  result.add newNimNode(nnkYieldStmt).add(futSym) # -> yield future<x>
+  result.add newNimNode(nnkYieldStmt, fromNode).add(futSym) # -> yield future<x>
   valueReceiver = newDotExpr(futSym, newIdentNode("read")) # -> future<x>.read
-  result.add generateExceptionCheck(futSym, exceptBranch, rootReceiver)
+  result.add generateExceptionCheck(futSym, exceptBranch, rootReceiver, fromNode)
 
 proc processBody(node, retFutureSym: PNimrodNode,
                  subTypeIsVoid: bool,
@@ -891,7 +941,7 @@ proc processBody(node, retFutureSym: PNimrodNode,
   result = node
   case node.kind
   of nnkReturnStmt:
-    result = newNimNode(nnkStmtList)
+    result = newNimNode(nnkStmtList, node)
     if node[0].kind == nnkEmpty:
       if not subtypeIsVoid:
         result.add newCall(newIdentNode("complete"), retFutureSym,
@@ -902,19 +952,19 @@ proc processBody(node, retFutureSym: PNimrodNode,
       result.add newCall(newIdentNode("complete"), retFutureSym,
         node[0].processBody(retFutureSym, subtypeIsVoid, exceptBranch))
 
-    result.add newNimNode(nnkReturnStmt).add(newNilLit())
+    result.add newNimNode(nnkReturnStmt, node).add(newNilLit())
     return # Don't process the children of this return stmt
   of nnkCommand:
     if node[0].kind == nnkIdent and node[0].ident == !"await":
       case node[1].kind
       of nnkIdent:
         # await x
-        result = newNimNode(nnkYieldStmt).add(node[1]) # -> yield x
+        result = newNimNode(nnkYieldStmt, node).add(node[1]) # -> yield x
       of nnkCall:
         # await foo(p, x)
         var futureValue: PNimrodNode
         result.createVar("future" & $node[1][0].toStrLit, node[1], futureValue,
-                  futureValue)
+                  futureValue, node)
       else:
         error("Invalid node kind in 'await', got: " & $node[1].kind)
     elif node[1].kind == nnkCommand and node[1][0].kind == nnkIdent and
@@ -922,7 +972,7 @@ proc processBody(node, retFutureSym: PNimrodNode,
       # foo await x
       var newCommand = node
       result.createVar("future" & $node[0].toStrLit, node[1][1], newCommand[1],
-                newCommand)
+                newCommand, node)
 
   of nnkVarSection, nnkLetSection:
     case node[0][2].kind
@@ -931,7 +981,7 @@ proc processBody(node, retFutureSym: PNimrodNode,
         # var x = await y
         var newVarSection = node # TODO: Should this use copyNimNode?
         result.createVar("future" & $node[0][0].ident, node[0][2][1],
-          newVarSection[0][2], newVarSection)
+          newVarSection[0][2], newVarSection, node)
     else: discard
   of nnkAsgn:
     case node[1].kind
@@ -939,7 +989,7 @@ proc processBody(node, retFutureSym: PNimrodNode,
       if node[1][0].ident == !"await":
         # x = await y
         var newAsgn = node
-        result.createVar("future" & $node[0].toStrLit, node[1][1], newAsgn[1], newAsgn)
+        result.createVar("future" & $node[0].toStrLit, node[1][1], newAsgn[1], newAsgn, node)
     else: discard
   of nnkDiscardStmt:
     # discard await x
@@ -947,10 +997,10 @@ proc processBody(node, retFutureSym: PNimrodNode,
           node[0][0].ident == !"await":
       var newDiscard = node
       result.createVar("futureDiscard_" & $toStrLit(node[0][1]), node[0][1],
-                newDiscard[0], newDiscard)
+                newDiscard[0], newDiscard, node)
   of nnkTryStmt:
     # try: await x; except: ...
-    result = newNimNode(nnkStmtList)
+    result = newNimNode(nnkStmtList, node)
     proc processForTry(n: PNimrodNode, i: var int,
                        res: PNimrodNode): bool {.compileTime.} =
       result = false
@@ -1009,7 +1059,7 @@ macro async*(prc: stmt): stmt {.immediate.} =
         (returnType.kind == nnkBracketExpr and
          returnType[1].kind == nnkIdent and returnType[1].ident == !"void")
 
-  var outerProcBody = newNimNode(nnkStmtList)
+  var outerProcBody = newNimNode(nnkStmtList, prc[6])
 
   # -> var retFuture = newFuture[T]()
   var retFutureSym = genSym(nskVar, "retFuture")
@@ -1019,9 +1069,10 @@ macro async*(prc: stmt): stmt {.immediate.} =
   outerProcBody.add(
     newVarStmt(retFutureSym, 
       newCall(
-        newNimNode(nnkBracketExpr).add(
+        newNimNode(nnkBracketExpr, prc[6]).add(
           newIdentNode(!"newFuture"), # TODO: Strange bug here? Remove the `!`.
-          subRetType)))) # Get type from return type of this proc
+          subRetType),
+      newLit(prc[0].getName)))) # Get type from return type of this proc
   
   # -> iterator nameIter(): PFutureBase {.closure.} = 
   # ->   var result: T
@@ -1030,7 +1081,7 @@ macro async*(prc: stmt): stmt {.immediate.} =
   var iteratorNameSym = genSym(nskIterator, $prc[0].getName & "Iter")
   var procBody = prc[6].processBody(retFutureSym, subtypeIsVoid, nil)
   if not subtypeIsVoid:
-    procBody.insert(0, newNimNode(nnkVarSection).add(
+    procBody.insert(0, newNimNode(nnkVarSection, prc[6]).add(
       newIdentDefs(newIdentNode("result"), returnType[1]))) # -> var result: T
     procBody.add(
       newCall(newIdentNode("complete"),
@@ -1041,7 +1092,7 @@ macro async*(prc: stmt): stmt {.immediate.} =
   
   var closureIterator = newProc(iteratorNameSym, [newIdentNode("PFutureBase")],
                                 procBody, nnkIteratorDef)
-  closureIterator[4] = newNimNode(nnkPragma).add(newIdentNode("closure"))
+  closureIterator[4] = newNimNode(nnkPragma, prc[6]).add(newIdentNode("closure"))
   outerProcBody.add(closureIterator)
 
   # -> createCb(retFuture)
@@ -1051,7 +1102,7 @@ macro async*(prc: stmt): stmt {.immediate.} =
   outerProcBody.add procCb
 
   # -> return retFuture
-  outerProcBody.add newNimNode(nnkReturnStmt).add(retFutureSym)
+  outerProcBody.add newNimNode(nnkReturnStmt, prc[6][prc[6].len-1]).add(retFutureSym)
   
   result = prc
 
@@ -1068,8 +1119,8 @@ macro async*(prc: stmt): stmt {.immediate.} =
   result[6] = outerProcBody
 
   #echo(treeRepr(result))
-  #if prc[0].getName == "routeReq":
-  #echo(toStrLit(result))
+  #if prc[0].getName == "processClient":
+  #  echo(toStrLit(result))
 
 proc recvLine*(socket: TAsyncFD): PFuture[string] {.async.} =
   ## Reads a line of data from ``socket``. Returned future will complete once
diff --git a/lib/pure/asynchttpserver.nim b/lib/pure/asynchttpserver.nim
index ee6658fd1..e7abfb97c 100644
--- a/lib/pure/asynchttpserver.nim
+++ b/lib/pure/asynchttpserver.nim
@@ -199,6 +199,8 @@ proc serve*(server: PAsyncHttpServer, port: TPort,
     #var (address, client) = await server.socket.acceptAddr()
     var fut = await server.socket.acceptAddr()
     asyncCheck processClient(fut.client, fut.address, callback)
+    #echo(f.isNil)
+    #echo(f.repr)
 
 proc close*(server: PAsyncHttpServer) =
   ## Terminates the async http server instance.
diff --git a/lib/pure/asyncnet.nim b/lib/pure/asyncnet.nim
index 374ac77e3..db6f80b06 100644
--- a/lib/pure/asyncnet.nim
+++ b/lib/pure/asyncnet.nim
@@ -140,7 +140,7 @@ proc acceptAddr*(socket: PAsyncSocket):
   ## Accepts a new connection. Returns a future containing the client socket
   ## corresponding to that connection and the remote address of the client.
   ## The future will complete when the connection is successfully accepted.
-  var retFuture = newFuture[tuple[address: string, client: PAsyncSocket]]()
+  var retFuture = newFuture[tuple[address: string, client: PAsyncSocket]]("asyncnet.acceptAddr")
   var fut = acceptAddr(socket.fd.TAsyncFD)
   fut.callback =
     proc (future: PFuture[tuple[address: string, client: TAsyncFD]]) =
@@ -157,7 +157,7 @@ proc accept*(socket: PAsyncSocket): PFuture[PAsyncSocket] =
   ## Accepts a new connection. Returns a future containing the client socket
   ## corresponding to that connection.
   ## The future will complete when the connection is successfully accepted.
-  var retFut = newFuture[PAsyncSocket]()
+  var retFut = newFuture[PAsyncSocket]("asyncnet.accept")
   var fut = acceptAddr(socket)
   fut.callback =
     proc (future: PFuture[tuple[address: string, client: PAsyncSocket]]) =