diff options
author | Dominik Picheta <dominikpicheta@googlemail.com> | 2014-07-12 22:51:06 +0100 |
---|---|---|
committer | Dominik Picheta <dominikpicheta@googlemail.com> | 2014-07-12 22:51:06 +0100 |
commit | cf5c8a204e76ff9ed5edb6ec0ebe3b97ee90f553 (patch) | |
tree | 90d9ce26be982b12b55e2dfaf021eda13fb8e995 /lib | |
parent | c260b22fbca0e8a3be903c1c6db2027cfcf1c7f2 (diff) | |
download | Nim-cf5c8a204e76ff9ed5edb6ec0ebe3b97ee90f553.tar.gz |
Many async optimisations.
* Selectors implementation will now attempt to immediately execute an IO operation instead of waiting for a ready notification. * Removed recursion in asynchttpserver. * Improved buffered implementation of recvLine in asyncnet. * Optimised ``respond`` in asynchttpserver removing a possible "Delayed ACK" situation.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/pure/asyncdispatch.nim | 30 | ||||
-rw-r--r-- | lib/pure/asynchttpserver.nim | 180 | ||||
-rw-r--r-- | lib/pure/asyncnet.nim | 74 | ||||
-rw-r--r-- | lib/pure/selectors.nim | 2 |
4 files changed, 168 insertions, 118 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index 12329951c..14667a008 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -552,7 +552,18 @@ when defined(windows) or defined(nimdoc): initAll() else: import selectors - from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK + when defined(windows): + import winlean + const + EINTR = WSAEINPROGRESS + EINPROGRESS = WSAEINPROGRESS + EWOULDBLOCK = WSAEWOULDBLOCK + EAGAIN = EINPROGRESS + MSG_NOSIGNAL = 0 + else: + from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK, + MSG_NOSIGNAL + type TAsyncFD* = distinct cint TCallback = proc (sock: TAsyncFD): bool {.closure,gcsafe.} @@ -693,12 +704,12 @@ else: proc cb(sock: TAsyncFD): bool = result = true - let res = recv(sock.TSocketHandle, addr readBuffer[0], size, + let res = recv(sock.TSocketHandle, addr readBuffer[0], size.cint, flags.cint) #echo("recv cb res: ", res) if res < 0: let lastError = osLastError() - if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: + if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: retFuture.fail(newException(EOS, osErrorMsg(lastError))) else: result = false # We still want this callback to be called. @@ -708,8 +719,8 @@ else: else: readBuffer.setLen(res) retFuture.complete(readBuffer) - - addRead(socket, cb) + if not cb(socket): + addRead(socket, cb) return retFuture proc send*(socket: TAsyncFD, data: string): PFuture[void] = @@ -721,7 +732,8 @@ else: result = true let netSize = data.len-written var d = data.cstring - let res = send(sock.TSocketHandle, addr d[written], netSize, 0.cint) + let res = send(sock.TSocketHandle, addr d[written], netSize.cint, + MSG_NOSIGNAL) if res < 0: let lastError = osLastError() if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: @@ -734,7 +746,8 @@ else: result = false # We still have data to send. else: retFuture.complete() - addWrite(socket, cb) + if not cb(socket): + addWrite(socket, cb) return retFuture proc acceptAddr*(socket: TAsyncFD): @@ -756,7 +769,8 @@ else: else: register(client.TAsyncFD) retFuture.complete(($inet_ntoa(sockAddress.sin_addr), client.TAsyncFD)) - addRead(socket, cb) + if not cb(socket): + addRead(socket, cb) return retFuture proc accept*(socket: TAsyncFD): PFuture[TAsyncFD] = diff --git a/lib/pure/asynchttpserver.nim b/lib/pure/asynchttpserver.nim index 1b47cf5f1..6273d479c 100644 --- a/lib/pure/asynchttpserver.nim +++ b/lib/pure/asynchttpserver.nim @@ -51,10 +51,15 @@ proc `==`*(protocol: tuple[orig: string, major, minor: int], proc newAsyncHttpServer*(): PAsyncHttpServer = new result -proc sendHeaders*(req: TRequest, headers: PStringTable) {.async.} = - ## Sends the specified headers to the requesting client. +proc addHeaders(msg: var string, headers: PStringTable) = for k, v in headers: - await req.client.send(k & ": " & v & "\c\L") + msg.add(k & ": " & v & "\c\L") + +proc sendHeaders*(req: TRequest, headers: PStringTable): PFuture[void] = + ## Sends the specified headers to the requesting client. + var msg = "" + addHeaders(msg, headers) + return req.client.send(msg) proc respond*(req: TRequest, code: THttpCode, content: string, headers: PStringTable = newStringTable()) {.async.} = @@ -64,9 +69,9 @@ proc respond*(req: TRequest, code: THttpCode, ## This procedure will **not** close the client socket. var customHeaders = headers customHeaders["Content-Length"] = $content.len - await req.client.send("HTTP/1.1 " & $code & "\c\L") - await sendHeaders(req, headers) - await req.client.send("\c\L" & content) + var msg = "HTTP/1.1 " & $code & "\c\L" + msg.addHeaders(customHeaders) + await req.client.send(msg & "\c\L" & content) proc newRequest(): TRequest = result.headers = newStringTable(modeCaseInsensitive) @@ -93,90 +98,91 @@ proc sendStatus(client: PAsyncSocket, status: string): PFuture[void] = proc processClient(client: PAsyncSocket, address: string, callback: proc (request: TRequest): PFuture[void]) {.async.} = - # GET /path HTTP/1.1 - # Header: val - # \n - var request = newRequest() - request.hostname = address - assert client != nil - request.client = client - var runCallback = true - - # First line - GET /path HTTP/1.1 - let line = await client.recvLine() # TODO: Timeouts. - if line == "": - client.close() - return - let lineParts = line.split(' ') - if lineParts.len != 3: - request.respond(Http400, "Invalid request. Got: " & line) - runCallback = false - - let reqMethod = lineParts[0] - let path = lineParts[1] - let protocol = lineParts[2] - - # Headers - var i = 0 while true: - i = 0 - let headerLine = await client.recvLine() - if headerLine == "": - client.close(); return - if headerLine == "\c\L": break - # TODO: Compiler crash - #let (key, value) = parseHeader(headerLine) - let kv = parseHeader(headerLine) - request.headers[kv.key] = kv.value - - request.reqMethod = reqMethod - request.url = parseUrl(path) - try: - request.protocol = protocol.parseProtocol() - except EInvalidValue: - request.respond(Http400, "Invalid request protocol. Got: " & protocol) - runCallback = false - - if reqMethod.normalize == "post": - # Check for Expect header - if request.headers.hasKey("Expect"): - if request.headers["Expect"].toLower == "100-continue": - await client.sendStatus("100 Continue") - else: - await client.sendStatus("417 Expectation Failed") - - # Read the body - # - Check for Content-length header - if request.headers.hasKey("Content-Length"): - var contentLength = 0 - if parseInt(request.headers["Content-Length"], contentLength) == 0: - await request.respond(Http400, "Bad Request. Invalid Content-Length.") - else: - request.body = await client.recv(contentLength) - assert request.body.len == contentLength - else: - await request.respond(Http400, "Bad Request. No Content-Length.") + # GET /path HTTP/1.1 + # Header: val + # \n + var request = newRequest() + request.hostname = address + assert client != nil + request.client = client + var runCallback = true + + # First line - GET /path HTTP/1.1 + let line = await client.recvLine() # TODO: Timeouts. + if line == "": + client.close() + return + let lineParts = line.split(' ') + if lineParts.len != 3: + request.respond(Http400, "Invalid request. Got: " & line) + runCallback = false + + let reqMethod = lineParts[0] + let path = lineParts[1] + let protocol = lineParts[2] + + # Headers + var i = 0 + while true: + i = 0 + let headerLine = await client.recvLine() + if headerLine == "": + client.close(); return + if headerLine == "\c\L": break + # TODO: Compiler crash + #let (key, value) = parseHeader(headerLine) + let kv = parseHeader(headerLine) + request.headers[kv.key] = kv.value + + request.reqMethod = reqMethod + request.url = parseUrl(path) + try: + request.protocol = protocol.parseProtocol() + except EInvalidValue: + request.respond(Http400, "Invalid request protocol. Got: " & protocol) runCallback = false - case reqMethod.normalize - of "get", "post", "head", "put", "delete", "trace", "options", "connect", "patch": - if runCallback: - await callback(request) - else: - await request.respond(Http400, "Invalid request method. Got: " & reqMethod) - - # Persistent connections - if (request.protocol == HttpVer11 and - request.headers["connection"].normalize != "close") or - (request.protocol == HttpVer10 and - request.headers["connection"].normalize == "keep-alive"): - # In HTTP 1.1 we assume that connection is persistent. Unless connection - # header states otherwise. - # In HTTP 1.0 we assume that the connection should not be persistent. - # Unless the connection header states otherwise. - await processClient(client, address, callback) - else: - request.client.close() + if reqMethod.normalize == "post": + # Check for Expect header + if request.headers.hasKey("Expect"): + if request.headers["Expect"].toLower == "100-continue": + await client.sendStatus("100 Continue") + else: + await client.sendStatus("417 Expectation Failed") + + # Read the body + # - Check for Content-length header + if request.headers.hasKey("Content-Length"): + var contentLength = 0 + if parseInt(request.headers["Content-Length"], contentLength) == 0: + await request.respond(Http400, "Bad Request. Invalid Content-Length.") + else: + request.body = await client.recv(contentLength) + assert request.body.len == contentLength + else: + await request.respond(Http400, "Bad Request. No Content-Length.") + runCallback = false + + case reqMethod.normalize + of "get", "post", "head", "put", "delete", "trace", "options", "connect", "patch": + if runCallback: + await callback(request) + else: + await request.respond(Http400, "Invalid request method. Got: " & reqMethod) + + # Persistent connections + if (request.protocol == HttpVer11 and + request.headers["connection"].normalize != "close") or + (request.protocol == HttpVer10 and + request.headers["connection"].normalize == "keep-alive"): + # In HTTP 1.1 we assume that connection is persistent. Unless connection + # header states otherwise. + # In HTTP 1.0 we assume that the connection should not be persistent. + # Unless the connection header states otherwise. + else: + request.client.close() + break proc serve*(server: PAsyncHttpServer, port: TPort, callback: proc (request: TRequest): PFuture[void], diff --git a/lib/pure/asyncnet.nim b/lib/pure/asyncnet.nim index d16c85c58..6eb43b594 100644 --- a/lib/pure/asyncnet.nim +++ b/lib/pure/asyncnet.nim @@ -110,12 +110,10 @@ proc recv*(socket: PAsyncSocket, size: int, if socket.currPos >= socket.bufLen: if (flags and MSG_PEEK) == MSG_PEEK: # We don't want to get another buffer if we're peeking. - result.setLen(read) - return + break let res = await socket.readIntoBuf(flags and (not MSG_PEEK)) if res == 0: - result.setLen(read) - return + break let chunk = min(socket.bufLen-socket.currPos, size-read) copyMem(addr(result[read]), addr(socket.buffer[socket.currPos]), chunk) @@ -181,28 +179,60 @@ proc recvLine*(socket: PAsyncSocket): PFuture[string] {.async.} = ## If the socket is disconnected in the middle of a line (before ``\r\L`` ## is read) then line will be set to ``""``. ## The partial line **will be lost**. - template addNLIfEmpty(): stmt = if result.len == 0: result.add("\c\L") - result = "" - var c = "" - while true: - c = await recv(socket, 1) - if c.len == 0: - return "" - if c == "\r": - c = await recv(socket, 1, MSG_PEEK) - if c.len > 0 and c == "\L": - let dummy = await recv(socket, 1) - assert dummy == "\L" - addNLIfEmpty() - return - elif c == "\L": - addNLIfEmpty() - return - add(result.string, c) + if socket.isBuffered: + result = "" + if socket.bufLen == 0: + let res = await socket.readIntoBuf(0) + if res == 0: + return + + var lastR = false + while true: + if socket.currPos >= socket.bufLen: + let res = await socket.readIntoBuf(0) + if res == 0: + result = "" + break + + case socket.buffer[socket.currPos] + of '\r': + lastR = true + addNLIfEmpty() + of '\L': + addNLIfEmpty() + socket.currPos.inc() + return + else: + if lastR: + socket.currPos.inc() + return + else: + result.add socket.buffer[socket.currPos] + socket.currPos.inc() + else: + + + result = "" + var c = "" + while true: + c = await recv(socket, 1) + if c.len == 0: + return "" + if c == "\r": + c = await recv(socket, 1, MSG_PEEK) + if c.len > 0 and c == "\L": + let dummy = await recv(socket, 1) + assert dummy == "\L" + addNLIfEmpty() + return + elif c == "\L": + addNLIfEmpty() + return + add(result.string, c) proc bindAddr*(socket: PAsyncSocket, port = TPort(0), address = "") = ## Binds ``address``:``port`` to the socket. diff --git a/lib/pure/selectors.nim b/lib/pure/selectors.nim index 3af5f699c..bd53c2dbf 100644 --- a/lib/pure/selectors.nim +++ b/lib/pure/selectors.nim @@ -163,7 +163,7 @@ elif defined(linux): proc newSelector*(): PSelector = new result result.epollFD = epoll_create(64) - result.events = cast[array[64, epoll_event]](alloc0(sizeof(epoll_event)*64)) + #result.events = cast[array[64, epoll_event]](alloc0(sizeof(epoll_event)*64)) result.fds = initTable[TSocketHandle, PSelectorKey]() if result.epollFD < 0: OSError(OSLastError()) |