diff options
author | Araq <rumpf_a@web.de> | 2014-03-26 20:31:29 +0100 |
---|---|---|
committer | Araq <rumpf_a@web.de> | 2014-03-26 20:31:29 +0100 |
commit | a1e97ad4a4fef8004d833853828a4ea7da3958c3 (patch) | |
tree | 4bdc59f04a1fc64d59296dc444e66589b475f4f1 | |
parent | 78cc4de9a1d8db66d997e5ba02c6ce208637d70c (diff) | |
parent | 783087fd57e265c06e5081073391058c0e28864f (diff) | |
download | Nim-a1e97ad4a4fef8004d833853828a4ea7da3958c3.tar.gz |
Merge branch 'devel' of https://github.com/Araq/Nimrod into devel
-rw-r--r-- | compiler/semdata.nim | 2 | ||||
-rw-r--r-- | lib/pure/asyncdispatch.nim | 85 | ||||
-rw-r--r-- | lib/pure/asyncnet.nim | 108 | ||||
-rw-r--r-- | lib/pure/httpclient.nim | 239 | ||||
-rw-r--r-- | lib/pure/net.nim | 32 | ||||
-rw-r--r-- | tests/async/tasyncawait.nim | 2 |
6 files changed, 310 insertions, 158 deletions
diff --git a/compiler/semdata.nim b/compiler/semdata.nim index 5ec66b51c..b46d83a92 100644 --- a/compiler/semdata.nim +++ b/compiler/semdata.nim @@ -258,7 +258,7 @@ proc nMinusOne*(n: PNode): PNode = # Remember to fix the procs below this one when you make changes! proc makeRangeWithStaticExpr*(c: PContext, n: PNode): PType = - let intType = getSysType tyInt + let intType = getSysType(tyInt) result = newTypeS(tyRange, c) result.sons = @[intType] result.n = newNode(nkRange, n.info, @[ diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index 4a4ef8bdd..72d2fc4e4 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -766,7 +766,7 @@ proc processBody(node, retFutureSym: PNimrodNode): PNimrodNode {.compileTime.} = if node[0].kind == nnkEmpty: newIdentNode("result") else: node[0]) result.add newNimNode(nnkYieldStmt).add(newNilLit()) of nnkCommand: - if node[0].ident == !"await": + if node[0].kind == nnkIdent and node[0].ident == !"await": case node[1].kind of nnkIdent: # await x @@ -782,7 +782,7 @@ proc processBody(node, retFutureSym: PNimrodNode): PNimrodNode {.compileTime.} = node[1][0].ident == !"await": # foo await x var newCommand = node - createVar("future" & $node[0].ident, node[1][0], newCommand[1]) + createVar("future" & $node[0].toStrLit, node[1][1], newCommand[1]) result.add newCommand of nnkVarSection, nnkLetSection: @@ -801,7 +801,7 @@ proc processBody(node, retFutureSym: PNimrodNode): PNimrodNode {.compileTime.} = if node[1][0].ident == !"await": # x = await y var newAsgn = node - createVar("future" & $node[0].ident, node[1][1], newAsgn[1]) + createVar("future" & $node[0].toStrLit, node[1][1], newAsgn[1]) result.add newAsgn else: discard of nnkDiscardStmt: @@ -857,7 +857,7 @@ macro async*(prc: stmt): stmt {.immediate.} = newNimNode(nnkBracketExpr).add( newIdentNode(!"newFuture"), # TODO: Strange bug here? Remove the `!`. newIdentNode(subtypeName))))) # Get type from return type of this proc - echo(treeRepr(outerProcBody)) + # -> iterator nameIter(): PFutureBase {.closure.} = # -> var result: T # -> <proc_body> @@ -957,80 +957,3 @@ proc runForever*() = ## Begins a never ending global dispatcher poll loop. while true: poll() - -when isMainModule: - - var p = newDispatcher() - var sock = p.socket() - sock.setBlocking false - - - when false: - # Await tests - proc main(p: PDispatcher): PFuture[int] {.async.} = - discard await p.connect(sock, "irc.freenode.net", TPort(6667)) - while true: - echo("recvLine") - var line = await p.recvLine(sock) - echo("Line is: ", line.repr) - if line == "": - echo "Disconnected" - break - - proc peekTest(p: PDispatcher): PFuture[int] {.async.} = - discard await p.connect(sock, "localhost", TPort(6667)) - while true: - var line = await p.recv(sock, 1, MSG_PEEK) - var line2 = await p.recv(sock, 1) - echo(line.repr) - echo(line2.repr) - echo("---") - if line2 == "": break - sleep(500) - - var f = main(p) - - - else: - when false: - - var f = p.connect(sock, "irc.poop.nl", TPort(6667)) - f.callback = - proc (future: PFuture[int]) = - echo("Connected in future!") - echo(future.read) - for i in 0 .. 50: - var recvF = p.recv(sock, 10) - recvF.callback = - proc (future: PFuture[string]) = - echo("Read ", future.read.len, ": ", future.read.repr) - - else: - - sock.bindAddr(TPort(6667)) - sock.listen() - proc onAccept(future: PFuture[TSocketHandle]) = - let client = future.read - echo "Accepted ", client.cint - var t = p.send(client, "test\c\L") - t.callback = - proc (future: PFuture[int]) = - echo("Send: ", future.read) - client.close() - - var f = p.accept(sock) - f.callback = onAccept - - var f = p.accept(sock) - f.callback = onAccept - - while true: - p.poll() - - - - - - - - diff --git a/lib/pure/asyncnet.nim b/lib/pure/asyncnet.nim index 451ef25ac..24651b08c 100644 --- a/lib/pure/asyncnet.nim +++ b/lib/pure/asyncnet.nim @@ -14,32 +14,17 @@ when defined(ssl): import openssl type - TAsyncSocket = object ## socket type - fd: TAsyncFD - case isBuffered: bool # determines whether this socket is buffered. - of true: - buffer: array[0..BufferSize, char] - currPos: int # current index in buffer - bufLen: int # current length of buffer - of false: nil - when defined(ssl): - case isSsl: bool - of true: - sslHandle: PSSL - sslContext: PSSLContext - sslNoHandshake: bool # True if needs handshake. - sslHasPeekChar: bool - sslPeekChar: char - of false: nil - + # TODO: I would prefer to just do: + # PAsyncSocket* {.borrow: `.`.} = distinct PSocket. But that doesn't work. + TAsyncSocket {.borrow: `.`.} = distinct TSocketImpl PAsyncSocket* = ref TAsyncSocket # TODO: Save AF, domain etc info and reuse it in procs which need it like connect. proc newSocket(fd: TAsyncFD, isBuff: bool): PAsyncSocket = assert fd != osInvalidSocket.TAsyncFD - new(result) - result.fd = fd + new(result.PSocket) + result.fd = fd.TSocketHandle result.isBuffered = isBuff if isBuff: result.currPos = 0 @@ -55,7 +40,7 @@ proc connect*(socket: PAsyncSocket, address: string, port: TPort, ## ## Returns a ``PFuture`` which will complete when the connection succeeds ## or an error occurs. - result = connect(socket.fd, address, port, af) + result = connect(socket.fd.TAsyncFD, address, port, af) proc recv*(socket: PAsyncSocket, size: int, flags: int = 0): PFuture[string] = @@ -64,12 +49,12 @@ proc recv*(socket: PAsyncSocket, size: int, ## recv operation then the future may complete with only a part of the ## requested data read. If socket is disconnected and no data is available ## to be read then the future will complete with a value of ``""``. - result = recv(socket.fd, size, flags) + result = recv(socket.fd.TAsyncFD, size, flags) proc send*(socket: PAsyncSocket, data: string): PFuture[void] = ## Sends ``data`` to ``socket``. The returned future will complete once all ## data has been sent. - result = send(socket.fd, data) + result = send(socket.fd.TAsyncFD, data) proc acceptAddr*(socket: PAsyncSocket): PFuture[tuple[address: string, client: PAsyncSocket]] = @@ -77,7 +62,7 @@ proc acceptAddr*(socket: PAsyncSocket): ## corresponding to that connection and the remote address of the client. ## The future will complete when the connection is successfully accepted. var retFuture = newFuture[tuple[address: string, client: PAsyncSocket]]() - var fut = acceptAddr(socket.fd) + var fut = acceptAddr(socket.fd.TAsyncFD) fut.callback = proc (future: PFuture[tuple[address: string, client: TAsyncFD]]) = assert future.finished @@ -139,17 +124,72 @@ proc recvLine*(socket: PAsyncSocket): PFuture[string] {.async.} = return add(result.string, c) +proc bindAddr*(socket: PAsyncSocket, port = TPort(0), address = "") = + ## Binds ``address``:``port`` to the socket. + ## + ## If ``address`` is "" then ADDR_ANY will be bound. + socket.PSocket.bindAddr(port, address) + +proc listen*(socket: PAsyncSocket, backlog = SOMAXCONN) = + ## Marks ``socket`` as accepting connections. + ## ``Backlog`` specifies the maximum length of the + ## queue of pending connections. + ## + ## Raises an EOS error upon failure. + socket.PSocket.listen(backlog) + +proc close*(socket: PAsyncSocket) = + ## Closes the socket. + socket.fd.TAsyncFD.close() + # TODO SSL + when isMainModule: - proc main() {.async.} = + type + TestCases = enum + HighClient, LowClient, LowServer + + const test = LowServer + + when test == HighClient: + proc main() {.async.} = + var sock = newAsyncSocket() + await sock.connect("irc.freenode.net", TPort(6667)) + while true: + let line = await sock.recvLine() + if line == "": + echo("Disconnected") + break + else: + echo("Got line: ", line) + main() + elif test == LowClient: var sock = newAsyncSocket() - await sock.connect("irc.freenode.net", TPort(6667)) - while true: - let line = await sock.recvLine() - if line == "": - echo("Disconnected") - break - else: - echo("Got line: ", line) - main() + var f = connect(sock, "irc.freenode.net", TPort(6667)) + f.callback = + proc (future: PFuture[void]) = + echo("Connected in future!") + for i in 0 .. 50: + var recvF = recv(sock, 10) + recvF.callback = + proc (future: PFuture[string]) = + echo("Read ", future.read.len, ": ", future.read.repr) + elif test == LowServer: + var sock = newAsyncSocket() + sock.bindAddr(TPort(6667)) + sock.listen() + proc onAccept(future: PFuture[PAsyncSocket]) = + let client = future.read + echo "Accepted ", client.fd.cint + var t = send(client, "test\c\L") + t.callback = + proc (future: PFuture[void]) = + echo("Send") + client.close() + + var f = accept(sock) + f.callback = onAccept + + var f = accept(sock) + f.callback = onAccept runForever() diff --git a/lib/pure/httpclient.nim b/lib/pure/httpclient.nim index bb9835fe7..62d9bea7c 100644 --- a/lib/pure/httpclient.nim +++ b/lib/pure/httpclient.nim @@ -76,6 +76,8 @@ ## currently only basic authentication is supported. import sockets, strutils, parseurl, parseutils, strtabs, base64 +import asyncnet, asyncdispatch +import rawsockets type TResponse* = tuple[ @@ -286,16 +288,16 @@ proc request*(url: string, httpMethod = httpGET, extraHeaders = "", add(headers, "\c\L") var s = socket() - var port = TPort(80) + var port = sockets.TPort(80) if r.scheme == "https": when defined(ssl): sslContext.wrapSocket(s) - port = TPort(443) + port = sockets.TPort(443) else: raise newException(EHttpRequestErr, "SSL support is not available. Cannot connect over SSL.") if r.port != "": - port = TPort(r.port.parseInt) + port = sockets.TPort(r.port.parseInt) if timeout == -1: s.connect(r.hostname, port) @@ -413,28 +415,217 @@ proc downloadFile*(url: string, outputFilename: string, else: fileError("Unable to open file") +proc generateHeaders(r: TURL, httpMethod: THttpMethod, + headers: PStringTable): string = + result = substr($httpMethod, len("http")) + # TODO: Proxies + result.add(" /" & r.path & r.query) + result.add(" HTTP/1.1\c\L") -when isMainModule: - #downloadFile("http://force7.de/nimrod/index.html", "nimrodindex.html") - #downloadFile("http://www.httpwatch.com/", "ChunkTest.html") - #downloadFile("http://validator.w3.org/check?uri=http%3A%2F%2Fgoogle.com", - # "validator.html") + add(result, "Host: " & r.hostname & "\c\L") + add(result, "Connection: Keep-Alive\c\L") + for key, val in headers: + add(result, key & ": " & val & "\c\L") + + add(result, "\c\L") + +type + PAsyncHttpClient = ref object + socket: PAsyncSocket + connected: bool + currentURL: TURL ## Where we are currently connected. + headers: PStringTable + userAgent: string + +proc newAsyncHttpClient*(): PAsyncHttpClient = + new result + result.socket = newAsyncSocket() + result.headers = newStringTable(modeCaseInsensitive) + result.userAgent = defUserAgent + +proc parseChunks(client: PAsyncHttpClient): PFuture[string] {.async.} = + result = "" + var ri = 0 + while true: + var chunkSize = 0 + var chunkSizeStr = await client.socket.recvLine() + var i = 0 + if chunkSizeStr == "": + httpError("Server terminated connection prematurely") + while true: + case chunkSizeStr[i] + of '0'..'9': + chunkSize = chunkSize shl 4 or (ord(chunkSizeStr[i]) - ord('0')) + of 'a'..'f': + chunkSize = chunkSize shl 4 or (ord(chunkSizeStr[i]) - ord('a') + 10) + of 'A'..'F': + chunkSize = chunkSize shl 4 or (ord(chunkSizeStr[i]) - ord('A') + 10) + of '\0': + break + of ';': + # http://tools.ietf.org/html/rfc2616#section-3.6.1 + # We don't care about chunk-extensions. + break + else: + httpError("Invalid chunk size: " & chunkSizeStr) + inc(i) + if chunkSize <= 0: break + result.add await recv(client.socket, chunkSize) + discard await recv(client.socket, 2) # Skip \c\L + # Trailer headers will only be sent if the request specifies that we want + # them: http://tools.ietf.org/html/rfc2616#section-3.6.1 + +proc parseBody(client: PAsyncHttpClient, + headers: PStringTable): PFuture[string] {.async.} = + result = "" + if headers["Transfer-Encoding"] == "chunked": + result = await parseChunks(client) + else: + # -REGION- Content-Length + # (http://tools.ietf.org/html/rfc2616#section-4.4) NR.3 + var contentLengthHeader = headers["Content-Length"] + if contentLengthHeader != "": + var length = contentLengthHeader.parseint() + result = await client.socket.recv(length) + if result == "": + httpError("Got disconnected while trying to recv body.") + else: + # (http://tools.ietf.org/html/rfc2616#section-4.4) NR.4 TODO + + # -REGION- Connection: Close + # (http://tools.ietf.org/html/rfc2616#section-4.4) NR.5 + if headers["Connection"] == "close": + var buf = "" + while True: + buf = await client.socket.recv(4000) + if buf == "": break + result.add(buf) + +proc parseResponse(client: PAsyncHttpClient, + getBody: bool): PFuture[TResponse] {.async.} = + var parsedStatus = false + var linei = 0 + var fullyRead = false + var line = "" + result.headers = newStringTable(modeCaseInsensitive) + while True: + linei = 0 + line = await client.socket.recvLine() + if line == "": break # We've been disconnected. + if line == "\c\L": + fullyRead = true + break + if not parsedStatus: + # Parse HTTP version info and status code. + var le = skipIgnoreCase(line, "HTTP/", linei) + if le <= 0: httpError("invalid http version") + inc(linei, le) + le = skipIgnoreCase(line, "1.1", linei) + if le > 0: result.version = "1.1" + else: + le = skipIgnoreCase(line, "1.0", linei) + if le <= 0: httpError("unsupported http version") + result.version = "1.0" + inc(linei, le) + # Status code + linei.inc skipWhitespace(line, linei) + result.status = line[linei .. -1] + parsedStatus = true + else: + # Parse headers + var name = "" + var le = parseUntil(line, name, ':', linei) + if le <= 0: httpError("invalid headers") + inc(linei, le) + if line[linei] != ':': httpError("invalid headers") + inc(linei) # Skip : + + result.headers[name] = line[linei.. -1].strip() + if not fullyRead: + httpError("Connection was closed before full request has been made") + if getBody: + result.body = await parseBody(client, result.headers) + else: + result.body = "" + +proc close*(client: PAsyncHttpClient) = + ## Closes any connections held by the HttpClient. + if client.connected: + client.socket.close() + client.connected = false + #client.socket = newAsyncSocket() + +proc newConnection(client: PAsyncHttpClient, url: TURL) {.async.} = + if not client.connected or client.currentURL.hostname != url.hostname or + client.currentURL.scheme != url.scheme: + if client.connected: client.close() + if url.scheme == "https": + assert false, "TODO SSL" - #var r = get("http://validator.w3.org/check?uri=http%3A%2F%2Fgoogle.com& - # charset=%28detect+automatically%29&doctype=Inline&group=0") + # TODO: I should be able to write 'net.TPort' here... + let port = + if url.port == "": rawsockets.TPort(80) + else: rawsockets.TPort(url.port.parseInt) + + await client.socket.connect(url.hostname, port) + client.currentURL = url + +proc request*(client: PAsyncHttpClient, url: string, httpMethod = httpGET, + body = ""): PFuture[TResponse] {.async.} = + let r = parseUrl(url) + await newConnection(client, r) + + if not client.headers.hasKey("user-agent") and client.userAgent != "": + client.headers["User-Agent"] = client.userAgent - var headers: string = "Content-Type: multipart/form-data; boundary=xyz\c\L" - var body: string = "--xyz\c\L" - # soap 1.2 output - body.add("Content-Disposition: form-data; name=\"output\"\c\L") - body.add("\c\Lsoap12\c\L") + var headers = generateHeaders(r, httpMethod, client.headers) + + await client.socket.send(headers) + if body != "": + await client.socket.send(body) - # html - body.add("--xyz\c\L") - body.add("Content-Disposition: form-data; name=\"uploaded_file\";" & - " filename=\"test.html\"\c\L") - body.add("Content-Type: text/html\c\L") - body.add("\c\L<html><head></head><body><p>test</p></body></html>\c\L") - body.add("--xyz--") - - echo(postContent("http://validator.w3.org/check", headers, body)) + result = await parseResponse(client, httpMethod != httpHEAD) + +when isMainModule: + when true: + # Async + proc main() {.async.} = + var client = newAsyncHttpClient() + var resp = await client.request("http://picheta.me") + + echo("Got response: ", resp.status) + echo("Body:\n") + echo(resp.body) + + #var resp1 = await client.request("http://freenode.net") + #echo("Got response: ", resp1.status) + + var resp2 = await client.request("http://picheta.me/aasfasgf.html") + echo("Got response: ", resp2.status) + main() + runForever() + + else: + #downloadFile("http://force7.de/nimrod/index.html", "nimrodindex.html") + #downloadFile("http://www.httpwatch.com/", "ChunkTest.html") + #downloadFile("http://validator.w3.org/check?uri=http%3A%2F%2Fgoogle.com", + # "validator.html") + + #var r = get("http://validator.w3.org/check?uri=http%3A%2F%2Fgoogle.com& + # charset=%28detect+automatically%29&doctype=Inline&group=0") + + var headers: string = "Content-Type: multipart/form-data; boundary=xyz\c\L" + var body: string = "--xyz\c\L" + # soap 1.2 output + body.add("Content-Disposition: form-data; name=\"output\"\c\L") + body.add("\c\Lsoap12\c\L") + + # html + body.add("--xyz\c\L") + body.add("Content-Disposition: form-data; name=\"uploaded_file\";" & + " filename=\"test.html\"\c\L") + body.add("Content-Type: text/html\c\L") + body.add("\c\L<html><head></head><body><p>test</p></body></html>\c\L") + body.add("--xyz--") + + echo(postContent("http://validator.w3.org/check", headers, body)) diff --git a/lib/pure/net.nim b/lib/pure/net.nim index 2461ece1b..4afb5c6ab 100644 --- a/lib/pure/net.nim +++ b/lib/pure/net.nim @@ -11,7 +11,7 @@ {.deadCodeElim: on.} import rawsockets, os, strutils, unsigned, parseutils, times - +export TPort type IpAddressFamily* {.pure.} = enum ## Describes the type of an IP address IPv6, ## IPv6 address @@ -318,22 +318,22 @@ const BufferSize*: int = 4000 ## size of a buffered socket's buffer type - TSocketImpl = object ## socket type - fd: TSocketHandle - case isBuffered: bool # determines whether this socket is buffered. + TSocketImpl* = object ## socket type + fd*: TSocketHandle + case isBuffered*: bool # determines whether this socket is buffered. of true: - buffer: array[0..BufferSize, char] - currPos: int # current index in buffer - bufLen: int # current length of buffer + buffer*: array[0..BufferSize, char] + currPos*: int # current index in buffer + bufLen*: int # current length of buffer of false: nil when defined(ssl): - case isSsl: bool + case isSsl*: bool of true: - sslHandle: PSSL - sslContext: PSSLContext - sslNoHandshake: bool # True if needs handshake. - sslHasPeekChar: bool - sslPeekChar: char + sslHandle*: PSSL + sslContext*: PSSLContext + sslNoHandshake*: bool # True if needs handshake. + sslHasPeekChar*: bool + sslPeekChar*: char of false: nil PSocket* = ref TSocketImpl @@ -519,9 +519,9 @@ proc listen*(socket: PSocket, backlog = SOMAXCONN) {.tags: [FReadIO].} = proc bindAddr*(socket: PSocket, port = TPort(0), address = "") {. tags: [FReadIO].} = - ## Binds an address/port number to a socket. - ## Use address string in dotted decimal form like "a.b.c.d" - ## or leave "" for any address. + ## Binds ``address``:``port`` to the socket. + ## + ## If ``address`` is "" then ADDR_ANY will be bound. if address == "": var name: TSockaddr_in diff --git a/tests/async/tasyncawait.nim b/tests/async/tasyncawait.nim index 7e6270247..bd722842f 100644 --- a/tests/async/tasyncawait.nim +++ b/tests/async/tasyncawait.nim @@ -21,7 +21,6 @@ proc launchSwarm(port: TPort) {.async.} = for i in 0 .. <swarmSize: var sock = newAsyncRawSocket() - #disp.register(sock) await connect(sock, "localhost", port) when true: await sendMessages(sock) @@ -48,7 +47,6 @@ proc readMessages(client: TAsyncFD) {.async.} = proc createServer(port: TPort) {.async.} = var server = newAsyncRawSocket() - #disp.register(server) block: var name: TSockaddr_in when defined(windows): |