summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--lib/pure/asyncio2.nim320
1 files changed, 270 insertions, 50 deletions
diff --git a/lib/pure/asyncio2.nim b/lib/pure/asyncio2.nim
index 606e4b349..1cf2292de 100644
--- a/lib/pure/asyncio2.nim
+++ b/lib/pure/asyncio2.nim
@@ -7,7 +7,7 @@
 #    distribution, for details about the copyright.
 #
 
-import os, oids, tables, strutils
+import os, oids, tables, strutils, macros
 
 import winlean
 
@@ -38,7 +38,7 @@ proc newFuture*[T](): PFuture[T] =
 
 proc complete*[T](future: PFuture[T], val: T) =
   ## Completes ``future`` with value ``val``.
-  assert(not future.finished)
+  assert(not future.finished, "Future already finished, cannot finish twice.")
   assert(future.error == nil)
   future.value = val
   future.finished = true
@@ -47,7 +47,7 @@ proc complete*[T](future: PFuture[T], val: T) =
 
 proc fail*[T](future: PFuture[T], error: ref EBase) =
   ## Completes ``future`` with ``error``.
-  assert(not future.finished)
+  assert(not future.finished, "Future already finished, cannot finish twice.")
   future.finished = true
   future.error = error
   if future.cb != nil:
@@ -140,6 +140,7 @@ when defined(windows):
     # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html
     var customOverlapped = cast[PCustomOverlapped](lpOverlapped)
     if res:
+      # This is useful for ensuring the reliability of the overlapped struct.
       assert customOverlapped.data.sock == lpCompletionKey.TSocketHandle
 
       customOverlapped.data.cb(customOverlapped.data.sock, TOSErrorCode(-1))
@@ -148,8 +149,8 @@ when defined(windows):
       let errCode = OSLastError()
       if lpOverlapped != nil:
         assert customOverlapped.data.sock == lpCompletionKey.TSocketHandle
-        dealloc(customOverlapped)
         customOverlapped.data.cb(customOverlapped.data.sock, errCode)
+        dealloc(customOverlapped)
       else:
         if errCode.int32 == WAIT_TIMEOUT:
           # Timed out
@@ -248,10 +249,11 @@ when defined(windows):
       var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
       ol.data = TCompletionData(sock: socket, cb:
         proc (sock: TSocketHandle, errcode: TOSErrorCode) =
-          if errcode == TOSErrorCode(-1):
-            retFuture.complete(0)
-          else:
-            retFuture.fail(newException(EOS, osErrorMsg(errcode)))
+          if not retFuture.finished:
+            if errcode == TOSErrorCode(-1):
+              retFuture.complete(0)
+            else:
+              retFuture.fail(newException(EOS, osErrorMsg(errcode)))
       )
       
       var ret = connectEx(socket, it.ai_addr, sizeof(TSockAddrIn).cint,
@@ -260,7 +262,9 @@ when defined(windows):
         # Request to connect completed immediately.
         success = true
         retFuture.complete(0)
-        dealloc(ol)
+        # We don't deallocate ``ol`` here because even though this completed
+        # immediately poll will still be notified about its completion and it will
+        # free ``ol``.
         break
       else:
         lastError = OSLastError()
@@ -278,7 +282,8 @@ when defined(windows):
       retFuture.fail(newException(EOS, osErrorMsg(lastError)))
     return retFuture
 
-  proc recv*(p: PDispatcher, socket: TSocketHandle, size: int): PFuture[string] =
+  proc recv*(p: PDispatcher, socket: TSocketHandle, size: int,
+             flags: int = 0): PFuture[string] =
     ## Reads ``size`` bytes from ``socket``. Returned future will complete once
     ## all of the requested data is read.
 
@@ -288,31 +293,42 @@ when defined(windows):
     dataBuf.buf = newString(size)
     dataBuf.len = size
     
-    var bytesReceived, flags: DWord
+    var bytesReceived: DWord
+    var flagsio = flags.dword
     var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
     ol.data = TCompletionData(sock: socket, cb:
       proc (sock: TSocketHandle, errcode: TOSErrorCode) =
-        if errcode == TOSErrorCode(-1):
-          var data = newString(size)
-          copyMem(addr data[0], addr dataBuf.buf[0], size)
-          retFuture.complete($data)
-        else:
-          retFuture.fail(newException(EOS, osErrorMsg(errcode)))
+        if not retFuture.finished:
+          if errcode == TOSErrorCode(-1):
+            var data = newString(size)
+            copyMem(addr data[0], addr dataBuf.buf[0], size)
+            retFuture.complete($data)
+          else:
+            retFuture.fail(newException(EOS, osErrorMsg(errcode)))
     )
     
     let ret = WSARecv(socket, addr dataBuf, 1, addr bytesReceived,
-                      addr flags, cast[POverlapped](ol), nil)
+                      addr flagsio, cast[POverlapped](ol), nil)
     if ret == -1:
       let err = OSLastError()
       if err.int32 != ERROR_IO_PENDING:
         retFuture.fail(newException(EOS, osErrorMsg(err)))
         dealloc(ol)
+    elif ret == 0 and bytesReceived == 0:
+      # Disconnected
+      retFuture.complete("")
+      # TODO: "For message-oriented sockets, where a zero byte message is often 
+      # allowable, a failure with an error code of WSAEDISCON is used to 
+      # indicate graceful closure." 
+      # ~ http://msdn.microsoft.com/en-us/library/ms741688%28v=vs.85%29.aspx
     else:
       # Request to read completed immediately.
       var data = newString(size)
       copyMem(addr data[0], addr dataBuf.buf[0], size)
       retFuture.complete($data)
-      dealloc(ol)
+      # We don't deallocate ``ol`` here because even though this completed
+      # immediately poll will still be notified about its completion and it will
+      # free ``ol``.
     return retFuture
 
   proc send*(p: PDispatcher, socket: TSocketHandle, data: string): PFuture[int] =
@@ -328,10 +344,11 @@ when defined(windows):
     var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
     ol.data = TCompletionData(sock: socket, cb:
       proc (sock: TSocketHandle, errcode: TOSErrorCode) =
-        if errcode == TOSErrorCode(-1):
-          retFuture.complete(0)
-        else:
-          retFuture.fail(newException(EOS, osErrorMsg(errcode)))
+        if not retFuture.finished:
+          if errcode == TOSErrorCode(-1):
+            retFuture.complete(0)
+          else:
+            retFuture.fail(newException(EOS, osErrorMsg(errcode)))
     )
 
     let ret = WSASend(socket, addr dataBuf, 1, addr bytesReceived,
@@ -343,7 +360,9 @@ when defined(windows):
         dealloc(ol)
     else:
       retFuture.complete(0)
-      dealloc(ol)
+      # We don't deallocate ``ol`` here because even though this completed
+      # immediately poll will still be notified about its completion and it will
+      # free ``ol``.
     return retFuture
 
   proc acceptAddr*(p: PDispatcher, socket: TSocketHandle): 
@@ -386,10 +405,11 @@ when defined(windows):
     var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
     ol.data = TCompletionData(sock: socket, cb:
       proc (sock: TSocketHandle, errcode: TOSErrorCode) =
-        if errcode == TOSErrorCode(-1):
-          completeAccept()
-        else:
-          retFuture.fail(newException(EOS, osErrorMsg(errcode)))
+        if not retFuture.finished:
+          if errcode == TOSErrorCode(-1):
+            completeAccept()
+          else:
+            retFuture.fail(newException(EOS, osErrorMsg(errcode)))
     )
 
     # http://msdn.microsoft.com/en-us/library/windows/desktop/ms737524%28v=vs.85%29.aspx
@@ -406,7 +426,9 @@ when defined(windows):
         dealloc(ol)
     else:
       completeAccept()
-      dealloc(ol)
+      # We don't deallocate ``ol`` here because even though this completed
+      # immediately poll will still be notified about its completion and it will
+      # free ``ol``.
 
     return retFuture
 
@@ -429,6 +451,188 @@ when defined(windows):
 else:
   # TODO: Selectors.
 
+# -- Await Macro
+
+template createCb*(cbName, varNameIterSym, retFutureSym: expr): stmt {.immediate, dirty.} =
+  proc cbName {.closure.} =
+    if not varNameIterSym.finished:
+      var next = varNameIterSym()
+      if next == nil:
+        assert retFutureSym.finished, "Async procedure's return Future was not finished."
+      else:
+        next.callback = cbName
+
+template createVar(futSymName: string, asyncProc: PNimrodNode,
+                   valueReceiver: expr) {.immediate, dirty.} =
+  # TODO: Used template here due to bug #926
+  result = newNimNode(nnkStmtList)
+  var futSym = newIdentNode(futSymName) #genSym(nskVar, "future")
+  result.add newVarStmt(futSym, asyncProc) # -> var future<x> = y
+  result.add newNimNode(nnkYieldStmt).add(futSym) # -> yield future<x>
+  valueReceiver = newDotExpr(futSym, newIdentNode("read")) # -> future<x>.read
+
+proc processBody(node, retFutureSym: PNimrodNode): PNimrodNode {.compileTime.} =
+  case node.kind
+  of nnkReturnStmt:
+    result = newNimNode(nnkStmtList)
+    result.add newCall(newIdentNode("complete"), retFutureSym,
+      if node[0].kind == nnkEmpty: newIdentNode("result") else: node[0])
+    result.add newNimNode(nnkYieldStmt).add(newNilLit())
+  of nnkCommand:
+    result = node
+    echo(treeRepr(node))
+    if node[0].ident == !"await":
+      case node[1].kind
+      of nnkIdent, nnkCall:
+        # await x
+        # await foo(p, x)
+        result = newNimNode(nnkYieldStmt).add(node[1]) # -> yield x
+      else:
+        error("Invalid node kind in 'await', got: " & $node[1].kind)
+    elif node[1].kind == nnkIdent and node[1][0].ident == !"await":
+      # foo await x
+      var newCommand = node
+      createVar("future" & $node[0].ident, node[1][0], newCommand[1])
+      result.add newCommand
+  of nnkVarSection, nnkLetSection:
+    result = node
+    case node[0][2].kind
+    of nnkCommand:
+      if node[0][2][0].ident == !"await":
+        # var x = await y
+        var newVarSection = node # TODO: Should this use copyNimNode?
+        createVar("future" & $node[0][0].ident, node[0][2][1],
+          newVarSection[0][2])
+        result.add newVarSection
+    else: discard
+  of nnkAsgn:
+    result = node
+    case node[1].kind
+    of nnkCommand:
+      if node[1][0].ident == !"await":
+        # x = await y
+        var newAsgn = node
+        createVar("future" & $node[0].ident, node[1][1], newAsgn[1])
+        result.add newAsgn
+    else: discard
+  of nnkDiscardStmt:
+    # discard await x
+    if node[0][0].ident == !"await":
+      var dummy = newNimNode(nnkStmtList)
+      createVar("futureDiscard_" & $toStrLit(node[0][1]), node[0][1], dummy)
+  else:
+    result = node
+    for i in 0 .. <node.len:
+      result[i] = processBody(node[i], retFutureSym)
+
+macro async*(prc: stmt): stmt {.immediate.} =
+  expectKind(prc, nnkProcDef)
+
+  # Verify that the return type is a PFuture[T]
+  if prc[3][0].kind == nnkIdent:
+    error("Expected return type of 'PFuture' got '" & $prc[3][0] & "'")
+  elif prc[3][0].kind == nnkBracketExpr:
+    if $prc[3][0][0] != "PFuture":
+      error("Expected return type of 'PFuture' got '" & $prc[3][0][0] & "'")
+  
+  # TODO: Why can't I use genSym? I get illegal capture errors for Syms.
+  # TODO: It seems genSym is broken. Change all usages back to genSym when fixed
+
+  var outerProcBody = newNimNode(nnkStmtList)
+
+  # -> var retFuture = newFuture[T]()
+  var retFutureSym = newIdentNode("retFuture") #genSym(nskVar, "retFuture")
+  outerProcBody.add(
+    newVarStmt(retFutureSym, 
+      newCall(
+        newNimNode(nnkBracketExpr).add(
+          newIdentNode("newFuture"),
+          prc[3][0][1])))) # Get type from return type of this proc.
+
+  # -> iterator nameIter(): PFutureBase {.closure.} = 
+  # ->   var result: T
+  # ->   <proc_body>
+  # ->   complete(retFuture, result)
+  var iteratorNameSym = newIdentNode($prc[0].ident & "Iter") #genSym(nskIterator, $prc[0].ident & "Iter")
+  var procBody = prc[6].processBody(retFutureSym)
+  procBody.insert(0, newNimNode(nnkVarSection).add(
+    newIdentDefs(newIdentNode("result"), prc[3][0][1]))) # -> var result: T
+  procBody.add(
+    newCall(newIdentNode("complete"),
+      retFutureSym, newIdentNode("result"))) # -> complete(retFuture, result)
+  
+  var closureIterator = newProc(iteratorNameSym, [newIdentNode("PFutureBase")],
+                                procBody, nnkIteratorDef)
+  closureIterator[4] = newNimNode(nnkPragma).add(newIdentNode("closure"))
+  outerProcBody.add(closureIterator)
+
+  # -> var nameIterVar = nameIter
+  # -> var first = nameIterVar()
+  var varNameIterSym = newIdentNode($prc[0].ident & "IterVar") #genSym(nskVar, $prc[0].ident & "IterVar")
+  var varNameIter = newVarStmt(varNameIterSym, iteratorNameSym)
+  outerProcBody.add varNameIter
+  var varFirstSym = genSym(nskVar, "first")
+  var varFirst = newVarStmt(varFirstSym, newCall(varNameIterSym))
+  outerProcBody.add varFirst
+
+  # -> createCb(cb, nameIter, retFuture)
+  var cbName = newIdentNode("cb")
+  var procCb = newCall("createCb", cbName, varNameIterSym, retFutureSym)
+  outerProcBody.add procCb
+
+  # -> first.callback = cb
+  outerProcBody.add newAssignment(
+    newDotExpr(varFirstSym, newIdentNode("callback")),
+    cbName)
+
+  # -> return retFuture
+  outerProcBody.add newNimNode(nnkReturnStmt).add(retFutureSym)
+  
+  result = prc
+
+  # Remove the 'async' pragma.
+  for i in 0 .. <result[4].len:
+    if result[4][i].ident == !"async":
+      result[4].del(i)
+
+  result[6] = outerProcBody
+
+  echo(toStrLit(result))
+
+proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.} =
+  ## Reads a line of data from ``socket``.
+  ##
+  ## If a full line is read ``\r\L`` is not
+  ## added to ``line``, however if solely ``\r\L`` is read then ``line``
+  ## will be set to it.
+  ## 
+  ## If the socket is disconnected, ``line`` will be set to ``""``.
+  ##
+  ## An EOS exception will be raised in the case of a socket error.
+  ##
+  ## A timeout can be specified in miliseconds, if data is not received within
+  ## the specified time an ETimeout exception will be raised.
+  
+  template addNLIfEmpty(): stmt =
+    if result.len == 0:
+      result.add("\c\L")
+
+  result = ""
+  var c = ""
+  while true:
+    c = await p.recv(socket, 1)
+    if c.len == 0:
+      return
+    if c == "\r":
+      c = await p.recv(socket, 1, MSG_PEEK)
+      if c.len > 0 and c == "\L":
+        discard await p.recv(socket, 1)
+      addNLIfEmpty()
+      return
+    elif c == "\L": 
+      addNLIfEmpty()
+      return
+    add(result.string, c)
 
 when isMainModule:
   
@@ -437,39 +641,55 @@ when isMainModule:
   #sock.setBlocking false
   p.register(sock)
 
+
   when true:
+    # Await tests
+    proc main(p: PDispatcher): PFuture[int] {.async.} =
+      discard await p.connect(sock, "localhost", TPort(6667))
+      while true:
+        var line = await p.recvLine(sock)
+        echo("Line is: ", line.repr)
+        if line == "":
+          echo "Disconnected"
+          break
+        
 
-    var f = p.connect(sock, "irc.freenode.org", TPort(6667))
-    f.callback =
-      proc (future: PFuture[int]) =
-        echo("Connected in future!")
-        echo(future.read)
-        for i in 0 .. 50:
-          var recvF = p.recv(sock, 10)
-          recvF.callback =
-            proc (future: PFuture[string]) =
-              echo("Read: ", future.read)
+    var f = main(p)
+    
 
   else:
+    when true:
 
-    sock.bindAddr(TPort(6667))
-    sock.listen()
-    proc onAccept(future: PFuture[TSocketHandle]) =
-      echo "Accepted"
-      var t = p.send(future.read, "test\c\L")
-      t.callback =
+      var f = p.connect(sock, "irc.freenode.org", TPort(6667))
+      f.callback =
         proc (future: PFuture[int]) =
+          echo("Connected in future!")
           echo(future.read)
-      
+          for i in 0 .. 50:
+            var recvF = p.recv(sock, 10)
+            recvF.callback =
+              proc (future: PFuture[string]) =
+                echo("Read: ", future.read)
+
+    else:
+
+      sock.bindAddr(TPort(6667))
+      sock.listen()
+      proc onAccept(future: PFuture[TSocketHandle]) =
+        echo "Accepted"
+        var t = p.send(future.read, "test\c\L")
+        t.callback =
+          proc (future: PFuture[int]) =
+            echo(future.read)
+        
+        var f = p.accept(sock)
+        f.callback = onAccept
+        
       var f = p.accept(sock)
       f.callback = onAccept
-      
-    var f = p.accept(sock)
-    f.callback = onAccept
   
   while true:
     p.poll()
-    echo "polled"