From e54ab22bf9a67353e5a70f56e7801624d68ca4f5 Mon Sep 17 00:00:00 2001 From: Dominik Picheta Date: Wed, 14 May 2014 23:35:46 +0100 Subject: Fixes #1197. --- lib/pure/selectors.nim | 36 ++++++++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 8 deletions(-) (limited to 'lib/pure/selectors.nim') diff --git a/lib/pure/selectors.nim b/lib/pure/selectors.nim index f630ba235..498f41e83 100644 --- a/lib/pure/selectors.nim +++ b/lib/pure/selectors.nim @@ -49,9 +49,10 @@ when defined(linux) or defined(nimdoc): ## Registers file descriptor ``fd`` to selector ``s`` with a set of TEvent ## ``events``. var event = createEventStruct(events, fd) - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fd, addr(event)) != 0: - OSError(OSLastError()) - + if events != {}: + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fd, addr(event)) != 0: + OSError(OSLastError()) + var key = PSelectorKey(fd: fd, events: events, data: data) s.fds[fd] = key @@ -61,11 +62,27 @@ when defined(linux) or defined(nimdoc): events: set[TEvent]): PSelectorKey {.discardable.} = ## Updates the events which ``fd`` wants notifications for. if s.fds[fd].events != events: - var event = createEventStruct(events, fd) + if events == {}: + # This fd is idle -- it should not be registered to epoll. + # But it should remain a part of this selector instance. + # This is to prevent epoll_wait from returning immediately + # because its got fds which are waiting for no events and + # are therefore constantly ready. (leading to 100% CPU usage). + if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fd, nil) != 0: + OSError(OSLastError()) + s.fds[fd].events = events + else: + var event = createEventStruct(events, fd) + if s.fds[fd].events == {}: + # This fd is idle. It's not a member of this epoll instance and must + # be re-registered. + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fd, addr(event)) != 0: + OSError(OSLastError()) + else: + if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0: + OSError(OSLastError()) + s.fds[fd].events = events - s.fds[fd].events = events - if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0: - OSError(OSLastError()) result = s.fds[fd] proc unregister*(s: PSelector, fd: TSocketHandle): PSelectorKey {.discardable.} = @@ -123,7 +140,10 @@ when defined(linux) or defined(nimdoc): ## Determines whether selector contains a file descriptor. if s.fds.hasKey(fd): # Ensure the underlying epoll instance still contains this fd. - result = epollHasFd(s, fd) + if s.fds[fd].events != {}: + result = epollHasFd(s, fd) + else: + result = true else: return false -- cgit 1.4.1-2-gfad0 From 0dc770332eded939d953397a8725bbc9274dd35e Mon Sep 17 00:00:00 2001 From: Clay Sweetser Date: Sat, 24 May 2014 09:12:07 -0400 Subject: Fix issue #1134 Adds the necessary imports for selectors under MacOSX --- lib/pure/selectors.nim | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'lib/pure/selectors.nim') diff --git a/lib/pure/selectors.nim b/lib/pure/selectors.nim index 498f41e83..856462631 100644 --- a/lib/pure/selectors.nim +++ b/lib/pure/selectors.nim @@ -11,9 +11,12 @@ import tables, os, unsigned, hashes -when defined(linux): import posix, epoll -elif defined(windows): import winlean -else: import posix +when defined(linux) or defined(macosx): + import posix, epoll +elif defined(windows): + import winlean +else: + import posix proc hash*(x: TSocketHandle): THash {.borrow.} proc `$`*(x: TSocketHandle): string {.borrow.} -- cgit 1.4.1-2-gfad0 From f10f9c4b7ed2dff82b62f15d4f77f049edd0f2fd Mon Sep 17 00:00:00 2001 From: Varriount Date: Sun, 25 May 2014 11:35:10 -0400 Subject: Update selectors.nim Fixed selectors.nim on macosx --- lib/pure/selectors.nim | 37 +++++++++++++++++++++++++++++++++---- 1 file changed, 33 insertions(+), 4 deletions(-) (limited to 'lib/pure/selectors.nim') diff --git a/lib/pure/selectors.nim b/lib/pure/selectors.nim index 856462631..bea1a3dd4 100644 --- a/lib/pure/selectors.nim +++ b/lib/pure/selectors.nim @@ -11,7 +11,7 @@ import tables, os, unsigned, hashes -when defined(linux) or defined(macosx): +when defined(linux): import posix, epoll elif defined(windows): import winlean @@ -32,7 +32,36 @@ type TReadyInfo* = tuple[key: PSelectorKey, events: set[TEvent]] -when defined(linux) or defined(nimdoc): +when defined(nimdoc): + type + PSelector* = ref object + ## An object which holds file descripters to be checked for read/write + ## status. + fds: TTable[TSocketHandle, PSelectorKey] + + proc register*(s: PSelector, fd: TSocketHandle, events: set[TEvent], + data: PObject): PSelectorKey {.discardable.} = + ## Registers file descriptor ``fd`` to selector ``s`` with a set of TEvent + ## ``events``. + + proc update*(s: PSelector, fd: TSocketHandle, + events: set[TEvent]): PSelectorKey {.discardable.} = + ## Updates the events which ``fd`` wants notifications for. + + proc select*(s: PSelector, timeout: int): seq[TReadyInfo] = + ## The ``events`` field of the returned ``key`` contains the original events + ## for which the ``fd`` was bound. This is contrary to the ``events`` field + ## of the ``TReadyInfo`` tuple which determines which events are ready + ## on the ``fd``. + + proc contains*(s: PSelector, fd: TSocketHandle): bool = + ## Determines whether selector contains a file descriptor. + + proc `[]`*(s: PSelector, fd: TSocketHandle): PSelectorKey = + ## Retrieves the selector key for ``fd``. + + +elif defined(linux): type PSelector* = ref object epollFD: cint @@ -154,7 +183,7 @@ when defined(linux) or defined(nimdoc): ## Retrieves the selector key for ``fd``. return s.fds[fd] -else: +elif defined(openbsd) or defined(macosx): # TODO: kqueue for bsd/mac os x. type PSelector* = ref object @@ -253,7 +282,7 @@ proc contains*(s: PSelector, key: PSelectorKey): bool = ## the new one may have the same value. return key.fd in s and s.fds[key.fd] == key -when isMainModule: +when isMainModule and not defined(nimdoc): # Select() import sockets type -- cgit 1.4.1-2-gfad0 From 1d6c05edc399e31919114f2b07519ae79ae1b804 Mon Sep 17 00:00:00 2001 From: Varriount Date: Sun, 25 May 2014 12:20:24 -0400 Subject: Update selectors.nim --- lib/pure/selectors.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'lib/pure/selectors.nim') diff --git a/lib/pure/selectors.nim b/lib/pure/selectors.nim index bea1a3dd4..3af5f699c 100644 --- a/lib/pure/selectors.nim +++ b/lib/pure/selectors.nim @@ -183,7 +183,7 @@ elif defined(linux): ## Retrieves the selector key for ``fd``. return s.fds[fd] -elif defined(openbsd) or defined(macosx): +elif not defined(nimdoc): # TODO: kqueue for bsd/mac os x. type PSelector* = ref object -- cgit 1.4.1-2-gfad0 From cf5c8a204e76ff9ed5edb6ec0ebe3b97ee90f553 Mon Sep 17 00:00:00 2001 From: Dominik Picheta Date: Sat, 12 Jul 2014 22:51:06 +0100 Subject: Many async optimisations. * Selectors implementation will now attempt to immediately execute an IO operation instead of waiting for a ready notification. * Removed recursion in asynchttpserver. * Improved buffered implementation of recvLine in asyncnet. * Optimised ``respond`` in asynchttpserver removing a possible "Delayed ACK" situation. --- lib/pure/asyncdispatch.nim | 30 ++++++-- lib/pure/asynchttpserver.nim | 180 ++++++++++++++++++++++--------------------- lib/pure/asyncnet.nim | 74 ++++++++++++------ lib/pure/selectors.nim | 2 +- 4 files changed, 168 insertions(+), 118 deletions(-) (limited to 'lib/pure/selectors.nim') diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index 12329951c..14667a008 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -552,7 +552,18 @@ when defined(windows) or defined(nimdoc): initAll() else: import selectors - from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK + when defined(windows): + import winlean + const + EINTR = WSAEINPROGRESS + EINPROGRESS = WSAEINPROGRESS + EWOULDBLOCK = WSAEWOULDBLOCK + EAGAIN = EINPROGRESS + MSG_NOSIGNAL = 0 + else: + from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK, + MSG_NOSIGNAL + type TAsyncFD* = distinct cint TCallback = proc (sock: TAsyncFD): bool {.closure,gcsafe.} @@ -693,12 +704,12 @@ else: proc cb(sock: TAsyncFD): bool = result = true - let res = recv(sock.TSocketHandle, addr readBuffer[0], size, + let res = recv(sock.TSocketHandle, addr readBuffer[0], size.cint, flags.cint) #echo("recv cb res: ", res) if res < 0: let lastError = osLastError() - if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: + if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: retFuture.fail(newException(EOS, osErrorMsg(lastError))) else: result = false # We still want this callback to be called. @@ -708,8 +719,8 @@ else: else: readBuffer.setLen(res) retFuture.complete(readBuffer) - - addRead(socket, cb) + if not cb(socket): + addRead(socket, cb) return retFuture proc send*(socket: TAsyncFD, data: string): PFuture[void] = @@ -721,7 +732,8 @@ else: result = true let netSize = data.len-written var d = data.cstring - let res = send(sock.TSocketHandle, addr d[written], netSize, 0.cint) + let res = send(sock.TSocketHandle, addr d[written], netSize.cint, + MSG_NOSIGNAL) if res < 0: let lastError = osLastError() if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: @@ -734,7 +746,8 @@ else: result = false # We still have data to send. else: retFuture.complete() - addWrite(socket, cb) + if not cb(socket): + addWrite(socket, cb) return retFuture proc acceptAddr*(socket: TAsyncFD): @@ -756,7 +769,8 @@ else: else: register(client.TAsyncFD) retFuture.complete(($inet_ntoa(sockAddress.sin_addr), client.TAsyncFD)) - addRead(socket, cb) + if not cb(socket): + addRead(socket, cb) return retFuture proc accept*(socket: TAsyncFD): PFuture[TAsyncFD] = diff --git a/lib/pure/asynchttpserver.nim b/lib/pure/asynchttpserver.nim index 1b47cf5f1..6273d479c 100644 --- a/lib/pure/asynchttpserver.nim +++ b/lib/pure/asynchttpserver.nim @@ -51,10 +51,15 @@ proc `==`*(protocol: tuple[orig: string, major, minor: int], proc newAsyncHttpServer*(): PAsyncHttpServer = new result -proc sendHeaders*(req: TRequest, headers: PStringTable) {.async.} = - ## Sends the specified headers to the requesting client. +proc addHeaders(msg: var string, headers: PStringTable) = for k, v in headers: - await req.client.send(k & ": " & v & "\c\L") + msg.add(k & ": " & v & "\c\L") + +proc sendHeaders*(req: TRequest, headers: PStringTable): PFuture[void] = + ## Sends the specified headers to the requesting client. + var msg = "" + addHeaders(msg, headers) + return req.client.send(msg) proc respond*(req: TRequest, code: THttpCode, content: string, headers: PStringTable = newStringTable()) {.async.} = @@ -64,9 +69,9 @@ proc respond*(req: TRequest, code: THttpCode, ## This procedure will **not** close the client socket. var customHeaders = headers customHeaders["Content-Length"] = $content.len - await req.client.send("HTTP/1.1 " & $code & "\c\L") - await sendHeaders(req, headers) - await req.client.send("\c\L" & content) + var msg = "HTTP/1.1 " & $code & "\c\L" + msg.addHeaders(customHeaders) + await req.client.send(msg & "\c\L" & content) proc newRequest(): TRequest = result.headers = newStringTable(modeCaseInsensitive) @@ -93,90 +98,91 @@ proc sendStatus(client: PAsyncSocket, status: string): PFuture[void] = proc processClient(client: PAsyncSocket, address: string, callback: proc (request: TRequest): PFuture[void]) {.async.} = - # GET /path HTTP/1.1 - # Header: val - # \n - var request = newRequest() - request.hostname = address - assert client != nil - request.client = client - var runCallback = true - - # First line - GET /path HTTP/1.1 - let line = await client.recvLine() # TODO: Timeouts. - if line == "": - client.close() - return - let lineParts = line.split(' ') - if lineParts.len != 3: - request.respond(Http400, "Invalid request. Got: " & line) - runCallback = false - - let reqMethod = lineParts[0] - let path = lineParts[1] - let protocol = lineParts[2] - - # Headers - var i = 0 while true: - i = 0 - let headerLine = await client.recvLine() - if headerLine == "": - client.close(); return - if headerLine == "\c\L": break - # TODO: Compiler crash - #let (key, value) = parseHeader(headerLine) - let kv = parseHeader(headerLine) - request.headers[kv.key] = kv.value - - request.reqMethod = reqMethod - request.url = parseUrl(path) - try: - request.protocol = protocol.parseProtocol() - except EInvalidValue: - request.respond(Http400, "Invalid request protocol. Got: " & protocol) - runCallback = false - - if reqMethod.normalize == "post": - # Check for Expect header - if request.headers.hasKey("Expect"): - if request.headers["Expect"].toLower == "100-continue": - await client.sendStatus("100 Continue") - else: - await client.sendStatus("417 Expectation Failed") - - # Read the body - # - Check for Content-length header - if request.headers.hasKey("Content-Length"): - var contentLength = 0 - if parseInt(request.headers["Content-Length"], contentLength) == 0: - await request.respond(Http400, "Bad Request. Invalid Content-Length.") - else: - request.body = await client.recv(contentLength) - assert request.body.len == contentLength - else: - await request.respond(Http400, "Bad Request. No Content-Length.") + # GET /path HTTP/1.1 + # Header: val + # \n + var request = newRequest() + request.hostname = address + assert client != nil + request.client = client + var runCallback = true + + # First line - GET /path HTTP/1.1 + let line = await client.recvLine() # TODO: Timeouts. + if line == "": + client.close() + return + let lineParts = line.split(' ') + if lineParts.len != 3: + request.respond(Http400, "Invalid request. Got: " & line) + runCallback = false + + let reqMethod = lineParts[0] + let path = lineParts[1] + let protocol = lineParts[2] + + # Headers + var i = 0 + while true: + i = 0 + let headerLine = await client.recvLine() + if headerLine == "": + client.close(); return + if headerLine == "\c\L": break + # TODO: Compiler crash + #let (key, value) = parseHeader(headerLine) + let kv = parseHeader(headerLine) + request.headers[kv.key] = kv.value + + request.reqMethod = reqMethod + request.url = parseUrl(path) + try: + request.protocol = protocol.parseProtocol() + except EInvalidValue: + request.respond(Http400, "Invalid request protocol. Got: " & protocol) runCallback = false - case reqMethod.normalize - of "get", "post", "head", "put", "delete", "trace", "options", "connect", "patch": - if runCallback: - await callback(request) - else: - await request.respond(Http400, "Invalid request method. Got: " & reqMethod) - - # Persistent connections - if (request.protocol == HttpVer11 and - request.headers["connection"].normalize != "close") or - (request.protocol == HttpVer10 and - request.headers["connection"].normalize == "keep-alive"): - # In HTTP 1.1 we assume that connection is persistent. Unless connection - # header states otherwise. - # In HTTP 1.0 we assume that the connection should not be persistent. - # Unless the connection header states otherwise. - await processClient(client, address, callback) - else: - request.client.close() + if reqMethod.normalize == "post": + # Check for Expect header + if request.headers.hasKey("Expect"): + if request.headers["Expect"].toLower == "100-continue": + await client.sendStatus("100 Continue") + else: + await client.sendStatus("417 Expectation Failed") + + # Read the body + # - Check for Content-length header + if request.headers.hasKey("Content-Length"): + var contentLength = 0 + if parseInt(request.headers["Content-Length"], contentLength) == 0: + await request.respond(Http400, "Bad Request. Invalid Content-Length.") + else: + request.body = await client.recv(contentLength) + assert request.body.len == contentLength + else: + await request.respond(Http400, "Bad Request. No Content-Length.") + runCallback = false + + case reqMethod.normalize + of "get", "post", "head", "put", "delete", "trace", "options", "connect", "patch": + if runCallback: + await callback(request) + else: + await request.respond(Http400, "Invalid request method. Got: " & reqMethod) + + # Persistent connections + if (request.protocol == HttpVer11 and + request.headers["connection"].normalize != "close") or + (request.protocol == HttpVer10 and + request.headers["connection"].normalize == "keep-alive"): + # In HTTP 1.1 we assume that connection is persistent. Unless connection + # header states otherwise. + # In HTTP 1.0 we assume that the connection should not be persistent. + # Unless the connection header states otherwise. + else: + request.client.close() + break proc serve*(server: PAsyncHttpServer, port: TPort, callback: proc (request: TRequest): PFuture[void], diff --git a/lib/pure/asyncnet.nim b/lib/pure/asyncnet.nim index d16c85c58..6eb43b594 100644 --- a/lib/pure/asyncnet.nim +++ b/lib/pure/asyncnet.nim @@ -110,12 +110,10 @@ proc recv*(socket: PAsyncSocket, size: int, if socket.currPos >= socket.bufLen: if (flags and MSG_PEEK) == MSG_PEEK: # We don't want to get another buffer if we're peeking. - result.setLen(read) - return + break let res = await socket.readIntoBuf(flags and (not MSG_PEEK)) if res == 0: - result.setLen(read) - return + break let chunk = min(socket.bufLen-socket.currPos, size-read) copyMem(addr(result[read]), addr(socket.buffer[socket.currPos]), chunk) @@ -181,28 +179,60 @@ proc recvLine*(socket: PAsyncSocket): PFuture[string] {.async.} = ## If the socket is disconnected in the middle of a line (before ``\r\L`` ## is read) then line will be set to ``""``. ## The partial line **will be lost**. - template addNLIfEmpty(): stmt = if result.len == 0: result.add("\c\L") - result = "" - var c = "" - while true: - c = await recv(socket, 1) - if c.len == 0: - return "" - if c == "\r": - c = await recv(socket, 1, MSG_PEEK) - if c.len > 0 and c == "\L": - let dummy = await recv(socket, 1) - assert dummy == "\L" - addNLIfEmpty() - return - elif c == "\L": - addNLIfEmpty() - return - add(result.string, c) + if socket.isBuffered: + result = "" + if socket.bufLen == 0: + let res = await socket.readIntoBuf(0) + if res == 0: + return + + var lastR = false + while true: + if socket.currPos >= socket.bufLen: + let res = await socket.readIntoBuf(0) + if res == 0: + result = "" + break + + case socket.buffer[socket.currPos] + of '\r': + lastR = true + addNLIfEmpty() + of '\L': + addNLIfEmpty() + socket.currPos.inc() + return + else: + if lastR: + socket.currPos.inc() + return + else: + result.add socket.buffer[socket.currPos] + socket.currPos.inc() + else: + + + result = "" + var c = "" + while true: + c = await recv(socket, 1) + if c.len == 0: + return "" + if c == "\r": + c = await recv(socket, 1, MSG_PEEK) + if c.len > 0 and c == "\L": + let dummy = await recv(socket, 1) + assert dummy == "\L" + addNLIfEmpty() + return + elif c == "\L": + addNLIfEmpty() + return + add(result.string, c) proc bindAddr*(socket: PAsyncSocket, port = TPort(0), address = "") = ## Binds ``address``:``port`` to the socket. diff --git a/lib/pure/selectors.nim b/lib/pure/selectors.nim index 3af5f699c..bd53c2dbf 100644 --- a/lib/pure/selectors.nim +++ b/lib/pure/selectors.nim @@ -163,7 +163,7 @@ elif defined(linux): proc newSelector*(): PSelector = new result result.epollFD = epoll_create(64) - result.events = cast[array[64, epoll_event]](alloc0(sizeof(epoll_event)*64)) + #result.events = cast[array[64, epoll_event]](alloc0(sizeof(epoll_event)*64)) result.fds = initTable[TSocketHandle, PSelectorKey]() if result.epollFD < 0: OSError(OSLastError()) -- cgit 1.4.1-2-gfad0