diff options
-rw-r--r-- | lib/pure/asyncio2.nim | 320 |
1 files changed, 270 insertions, 50 deletions
diff --git a/lib/pure/asyncio2.nim b/lib/pure/asyncio2.nim index 606e4b349..1cf2292de 100644 --- a/lib/pure/asyncio2.nim +++ b/lib/pure/asyncio2.nim @@ -7,7 +7,7 @@ # distribution, for details about the copyright. # -import os, oids, tables, strutils +import os, oids, tables, strutils, macros import winlean @@ -38,7 +38,7 @@ proc newFuture*[T](): PFuture[T] = proc complete*[T](future: PFuture[T], val: T) = ## Completes ``future`` with value ``val``. - assert(not future.finished) + assert(not future.finished, "Future already finished, cannot finish twice.") assert(future.error == nil) future.value = val future.finished = true @@ -47,7 +47,7 @@ proc complete*[T](future: PFuture[T], val: T) = proc fail*[T](future: PFuture[T], error: ref EBase) = ## Completes ``future`` with ``error``. - assert(not future.finished) + assert(not future.finished, "Future already finished, cannot finish twice.") future.finished = true future.error = error if future.cb != nil: @@ -140,6 +140,7 @@ when defined(windows): # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html var customOverlapped = cast[PCustomOverlapped](lpOverlapped) if res: + # This is useful for ensuring the reliability of the overlapped struct. assert customOverlapped.data.sock == lpCompletionKey.TSocketHandle customOverlapped.data.cb(customOverlapped.data.sock, TOSErrorCode(-1)) @@ -148,8 +149,8 @@ when defined(windows): let errCode = OSLastError() if lpOverlapped != nil: assert customOverlapped.data.sock == lpCompletionKey.TSocketHandle - dealloc(customOverlapped) customOverlapped.data.cb(customOverlapped.data.sock, errCode) + dealloc(customOverlapped) else: if errCode.int32 == WAIT_TIMEOUT: # Timed out @@ -248,10 +249,11 @@ when defined(windows): var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped))) ol.data = TCompletionData(sock: socket, cb: proc (sock: TSocketHandle, errcode: TOSErrorCode) = - if errcode == TOSErrorCode(-1): - retFuture.complete(0) - else: - retFuture.fail(newException(EOS, osErrorMsg(errcode))) + if not retFuture.finished: + if errcode == TOSErrorCode(-1): + retFuture.complete(0) + else: + retFuture.fail(newException(EOS, osErrorMsg(errcode))) ) var ret = connectEx(socket, it.ai_addr, sizeof(TSockAddrIn).cint, @@ -260,7 +262,9 @@ when defined(windows): # Request to connect completed immediately. success = true retFuture.complete(0) - dealloc(ol) + # We don't deallocate ``ol`` here because even though this completed + # immediately poll will still be notified about its completion and it will + # free ``ol``. break else: lastError = OSLastError() @@ -278,7 +282,8 @@ when defined(windows): retFuture.fail(newException(EOS, osErrorMsg(lastError))) return retFuture - proc recv*(p: PDispatcher, socket: TSocketHandle, size: int): PFuture[string] = + proc recv*(p: PDispatcher, socket: TSocketHandle, size: int, + flags: int = 0): PFuture[string] = ## Reads ``size`` bytes from ``socket``. Returned future will complete once ## all of the requested data is read. @@ -288,31 +293,42 @@ when defined(windows): dataBuf.buf = newString(size) dataBuf.len = size - var bytesReceived, flags: DWord + var bytesReceived: DWord + var flagsio = flags.dword var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped))) ol.data = TCompletionData(sock: socket, cb: proc (sock: TSocketHandle, errcode: TOSErrorCode) = - if errcode == TOSErrorCode(-1): - var data = newString(size) - copyMem(addr data[0], addr dataBuf.buf[0], size) - retFuture.complete($data) - else: - retFuture.fail(newException(EOS, osErrorMsg(errcode))) + if not retFuture.finished: + if errcode == TOSErrorCode(-1): + var data = newString(size) + copyMem(addr data[0], addr dataBuf.buf[0], size) + retFuture.complete($data) + else: + retFuture.fail(newException(EOS, osErrorMsg(errcode))) ) let ret = WSARecv(socket, addr dataBuf, 1, addr bytesReceived, - addr flags, cast[POverlapped](ol), nil) + addr flagsio, cast[POverlapped](ol), nil) if ret == -1: let err = OSLastError() if err.int32 != ERROR_IO_PENDING: retFuture.fail(newException(EOS, osErrorMsg(err))) dealloc(ol) + elif ret == 0 and bytesReceived == 0: + # Disconnected + retFuture.complete("") + # TODO: "For message-oriented sockets, where a zero byte message is often + # allowable, a failure with an error code of WSAEDISCON is used to + # indicate graceful closure." + # ~ http://msdn.microsoft.com/en-us/library/ms741688%28v=vs.85%29.aspx else: # Request to read completed immediately. var data = newString(size) copyMem(addr data[0], addr dataBuf.buf[0], size) retFuture.complete($data) - dealloc(ol) + # We don't deallocate ``ol`` here because even though this completed + # immediately poll will still be notified about its completion and it will + # free ``ol``. return retFuture proc send*(p: PDispatcher, socket: TSocketHandle, data: string): PFuture[int] = @@ -328,10 +344,11 @@ when defined(windows): var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped))) ol.data = TCompletionData(sock: socket, cb: proc (sock: TSocketHandle, errcode: TOSErrorCode) = - if errcode == TOSErrorCode(-1): - retFuture.complete(0) - else: - retFuture.fail(newException(EOS, osErrorMsg(errcode))) + if not retFuture.finished: + if errcode == TOSErrorCode(-1): + retFuture.complete(0) + else: + retFuture.fail(newException(EOS, osErrorMsg(errcode))) ) let ret = WSASend(socket, addr dataBuf, 1, addr bytesReceived, @@ -343,7 +360,9 @@ when defined(windows): dealloc(ol) else: retFuture.complete(0) - dealloc(ol) + # We don't deallocate ``ol`` here because even though this completed + # immediately poll will still be notified about its completion and it will + # free ``ol``. return retFuture proc acceptAddr*(p: PDispatcher, socket: TSocketHandle): @@ -386,10 +405,11 @@ when defined(windows): var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped))) ol.data = TCompletionData(sock: socket, cb: proc (sock: TSocketHandle, errcode: TOSErrorCode) = - if errcode == TOSErrorCode(-1): - completeAccept() - else: - retFuture.fail(newException(EOS, osErrorMsg(errcode))) + if not retFuture.finished: + if errcode == TOSErrorCode(-1): + completeAccept() + else: + retFuture.fail(newException(EOS, osErrorMsg(errcode))) ) # http://msdn.microsoft.com/en-us/library/windows/desktop/ms737524%28v=vs.85%29.aspx @@ -406,7 +426,9 @@ when defined(windows): dealloc(ol) else: completeAccept() - dealloc(ol) + # We don't deallocate ``ol`` here because even though this completed + # immediately poll will still be notified about its completion and it will + # free ``ol``. return retFuture @@ -429,6 +451,188 @@ when defined(windows): else: # TODO: Selectors. +# -- Await Macro + +template createCb*(cbName, varNameIterSym, retFutureSym: expr): stmt {.immediate, dirty.} = + proc cbName {.closure.} = + if not varNameIterSym.finished: + var next = varNameIterSym() + if next == nil: + assert retFutureSym.finished, "Async procedure's return Future was not finished." + else: + next.callback = cbName + +template createVar(futSymName: string, asyncProc: PNimrodNode, + valueReceiver: expr) {.immediate, dirty.} = + # TODO: Used template here due to bug #926 + result = newNimNode(nnkStmtList) + var futSym = newIdentNode(futSymName) #genSym(nskVar, "future") + result.add newVarStmt(futSym, asyncProc) # -> var future<x> = y + result.add newNimNode(nnkYieldStmt).add(futSym) # -> yield future<x> + valueReceiver = newDotExpr(futSym, newIdentNode("read")) # -> future<x>.read + +proc processBody(node, retFutureSym: PNimrodNode): PNimrodNode {.compileTime.} = + case node.kind + of nnkReturnStmt: + result = newNimNode(nnkStmtList) + result.add newCall(newIdentNode("complete"), retFutureSym, + if node[0].kind == nnkEmpty: newIdentNode("result") else: node[0]) + result.add newNimNode(nnkYieldStmt).add(newNilLit()) + of nnkCommand: + result = node + echo(treeRepr(node)) + if node[0].ident == !"await": + case node[1].kind + of nnkIdent, nnkCall: + # await x + # await foo(p, x) + result = newNimNode(nnkYieldStmt).add(node[1]) # -> yield x + else: + error("Invalid node kind in 'await', got: " & $node[1].kind) + elif node[1].kind == nnkIdent and node[1][0].ident == !"await": + # foo await x + var newCommand = node + createVar("future" & $node[0].ident, node[1][0], newCommand[1]) + result.add newCommand + of nnkVarSection, nnkLetSection: + result = node + case node[0][2].kind + of nnkCommand: + if node[0][2][0].ident == !"await": + # var x = await y + var newVarSection = node # TODO: Should this use copyNimNode? + createVar("future" & $node[0][0].ident, node[0][2][1], + newVarSection[0][2]) + result.add newVarSection + else: discard + of nnkAsgn: + result = node + case node[1].kind + of nnkCommand: + if node[1][0].ident == !"await": + # x = await y + var newAsgn = node + createVar("future" & $node[0].ident, node[1][1], newAsgn[1]) + result.add newAsgn + else: discard + of nnkDiscardStmt: + # discard await x + if node[0][0].ident == !"await": + var dummy = newNimNode(nnkStmtList) + createVar("futureDiscard_" & $toStrLit(node[0][1]), node[0][1], dummy) + else: + result = node + for i in 0 .. <node.len: + result[i] = processBody(node[i], retFutureSym) + +macro async*(prc: stmt): stmt {.immediate.} = + expectKind(prc, nnkProcDef) + + # Verify that the return type is a PFuture[T] + if prc[3][0].kind == nnkIdent: + error("Expected return type of 'PFuture' got '" & $prc[3][0] & "'") + elif prc[3][0].kind == nnkBracketExpr: + if $prc[3][0][0] != "PFuture": + error("Expected return type of 'PFuture' got '" & $prc[3][0][0] & "'") + + # TODO: Why can't I use genSym? I get illegal capture errors for Syms. + # TODO: It seems genSym is broken. Change all usages back to genSym when fixed + + var outerProcBody = newNimNode(nnkStmtList) + + # -> var retFuture = newFuture[T]() + var retFutureSym = newIdentNode("retFuture") #genSym(nskVar, "retFuture") + outerProcBody.add( + newVarStmt(retFutureSym, + newCall( + newNimNode(nnkBracketExpr).add( + newIdentNode("newFuture"), + prc[3][0][1])))) # Get type from return type of this proc. + + # -> iterator nameIter(): PFutureBase {.closure.} = + # -> var result: T + # -> <proc_body> + # -> complete(retFuture, result) + var iteratorNameSym = newIdentNode($prc[0].ident & "Iter") #genSym(nskIterator, $prc[0].ident & "Iter") + var procBody = prc[6].processBody(retFutureSym) + procBody.insert(0, newNimNode(nnkVarSection).add( + newIdentDefs(newIdentNode("result"), prc[3][0][1]))) # -> var result: T + procBody.add( + newCall(newIdentNode("complete"), + retFutureSym, newIdentNode("result"))) # -> complete(retFuture, result) + + var closureIterator = newProc(iteratorNameSym, [newIdentNode("PFutureBase")], + procBody, nnkIteratorDef) + closureIterator[4] = newNimNode(nnkPragma).add(newIdentNode("closure")) + outerProcBody.add(closureIterator) + + # -> var nameIterVar = nameIter + # -> var first = nameIterVar() + var varNameIterSym = newIdentNode($prc[0].ident & "IterVar") #genSym(nskVar, $prc[0].ident & "IterVar") + var varNameIter = newVarStmt(varNameIterSym, iteratorNameSym) + outerProcBody.add varNameIter + var varFirstSym = genSym(nskVar, "first") + var varFirst = newVarStmt(varFirstSym, newCall(varNameIterSym)) + outerProcBody.add varFirst + + # -> createCb(cb, nameIter, retFuture) + var cbName = newIdentNode("cb") + var procCb = newCall("createCb", cbName, varNameIterSym, retFutureSym) + outerProcBody.add procCb + + # -> first.callback = cb + outerProcBody.add newAssignment( + newDotExpr(varFirstSym, newIdentNode("callback")), + cbName) + + # -> return retFuture + outerProcBody.add newNimNode(nnkReturnStmt).add(retFutureSym) + + result = prc + + # Remove the 'async' pragma. + for i in 0 .. <result[4].len: + if result[4][i].ident == !"async": + result[4].del(i) + + result[6] = outerProcBody + + echo(toStrLit(result)) + +proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.} = + ## Reads a line of data from ``socket``. + ## + ## If a full line is read ``\r\L`` is not + ## added to ``line``, however if solely ``\r\L`` is read then ``line`` + ## will be set to it. + ## + ## If the socket is disconnected, ``line`` will be set to ``""``. + ## + ## An EOS exception will be raised in the case of a socket error. + ## + ## A timeout can be specified in miliseconds, if data is not received within + ## the specified time an ETimeout exception will be raised. + + template addNLIfEmpty(): stmt = + if result.len == 0: + result.add("\c\L") + + result = "" + var c = "" + while true: + c = await p.recv(socket, 1) + if c.len == 0: + return + if c == "\r": + c = await p.recv(socket, 1, MSG_PEEK) + if c.len > 0 and c == "\L": + discard await p.recv(socket, 1) + addNLIfEmpty() + return + elif c == "\L": + addNLIfEmpty() + return + add(result.string, c) when isMainModule: @@ -437,39 +641,55 @@ when isMainModule: #sock.setBlocking false p.register(sock) + when true: + # Await tests + proc main(p: PDispatcher): PFuture[int] {.async.} = + discard await p.connect(sock, "localhost", TPort(6667)) + while true: + var line = await p.recvLine(sock) + echo("Line is: ", line.repr) + if line == "": + echo "Disconnected" + break + - var f = p.connect(sock, "irc.freenode.org", TPort(6667)) - f.callback = - proc (future: PFuture[int]) = - echo("Connected in future!") - echo(future.read) - for i in 0 .. 50: - var recvF = p.recv(sock, 10) - recvF.callback = - proc (future: PFuture[string]) = - echo("Read: ", future.read) + var f = main(p) + else: + when true: - sock.bindAddr(TPort(6667)) - sock.listen() - proc onAccept(future: PFuture[TSocketHandle]) = - echo "Accepted" - var t = p.send(future.read, "test\c\L") - t.callback = + var f = p.connect(sock, "irc.freenode.org", TPort(6667)) + f.callback = proc (future: PFuture[int]) = + echo("Connected in future!") echo(future.read) - + for i in 0 .. 50: + var recvF = p.recv(sock, 10) + recvF.callback = + proc (future: PFuture[string]) = + echo("Read: ", future.read) + + else: + + sock.bindAddr(TPort(6667)) + sock.listen() + proc onAccept(future: PFuture[TSocketHandle]) = + echo "Accepted" + var t = p.send(future.read, "test\c\L") + t.callback = + proc (future: PFuture[int]) = + echo(future.read) + + var f = p.accept(sock) + f.callback = onAccept + var f = p.accept(sock) f.callback = onAccept - - var f = p.accept(sock) - f.callback = onAccept while true: p.poll() - echo "polled" |