summary refs log tree commit diff stats
path: root/lib
diff options
context:
space:
mode:
authorDominik Picheta <dominikpicheta@googlemail.com>2014-07-12 22:51:06 +0100
committerDominik Picheta <dominikpicheta@googlemail.com>2014-07-12 22:51:06 +0100
commitcf5c8a204e76ff9ed5edb6ec0ebe3b97ee90f553 (patch)
tree90d9ce26be982b12b55e2dfaf021eda13fb8e995 /lib
parentc260b22fbca0e8a3be903c1c6db2027cfcf1c7f2 (diff)
downloadNim-cf5c8a204e76ff9ed5edb6ec0ebe3b97ee90f553.tar.gz
Many async optimisations.
* Selectors implementation will now attempt to immediately execute an IO
  operation instead of waiting for a ready notification.
* Removed recursion in asynchttpserver.
* Improved buffered implementation of recvLine in asyncnet.
* Optimised ``respond`` in asynchttpserver removing a possible "Delayed ACK"
  situation.
Diffstat (limited to 'lib')
-rw-r--r--lib/pure/asyncdispatch.nim30
-rw-r--r--lib/pure/asynchttpserver.nim180
-rw-r--r--lib/pure/asyncnet.nim74
-rw-r--r--lib/pure/selectors.nim2
4 files changed, 168 insertions, 118 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim
index 12329951c..14667a008 100644
--- a/lib/pure/asyncdispatch.nim
+++ b/lib/pure/asyncdispatch.nim
@@ -552,7 +552,18 @@ when defined(windows) or defined(nimdoc):
   initAll()
 else:
   import selectors
-  from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK
+  when defined(windows):
+    import winlean
+    const
+      EINTR = WSAEINPROGRESS
+      EINPROGRESS = WSAEINPROGRESS
+      EWOULDBLOCK = WSAEWOULDBLOCK
+      EAGAIN = EINPROGRESS
+      MSG_NOSIGNAL = 0
+  else:
+    from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK,
+                      MSG_NOSIGNAL
+  
   type
     TAsyncFD* = distinct cint
     TCallback = proc (sock: TAsyncFD): bool {.closure,gcsafe.}
@@ -693,12 +704,12 @@ else:
 
     proc cb(sock: TAsyncFD): bool =
       result = true
-      let res = recv(sock.TSocketHandle, addr readBuffer[0], size,
+      let res = recv(sock.TSocketHandle, addr readBuffer[0], size.cint,
                      flags.cint)
       #echo("recv cb res: ", res)
       if res < 0:
         let lastError = osLastError()
-        if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: 
+        if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
           retFuture.fail(newException(EOS, osErrorMsg(lastError)))
         else:
           result = false # We still want this callback to be called.
@@ -708,8 +719,8 @@ else:
       else:
         readBuffer.setLen(res)
         retFuture.complete(readBuffer)
-  
-    addRead(socket, cb)
+    if not cb(socket):
+      addRead(socket, cb)
     return retFuture
 
   proc send*(socket: TAsyncFD, data: string): PFuture[void] =
@@ -721,7 +732,8 @@ else:
       result = true
       let netSize = data.len-written
       var d = data.cstring
-      let res = send(sock.TSocketHandle, addr d[written], netSize, 0.cint)
+      let res = send(sock.TSocketHandle, addr d[written], netSize.cint,
+                     MSG_NOSIGNAL)
       if res < 0:
         let lastError = osLastError()
         if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
@@ -734,7 +746,8 @@ else:
           result = false # We still have data to send.
         else:
           retFuture.complete()
-    addWrite(socket, cb)
+    if not cb(socket):
+      addWrite(socket, cb)
     return retFuture
 
   proc acceptAddr*(socket: TAsyncFD): 
@@ -756,7 +769,8 @@ else:
       else:
         register(client.TAsyncFD)
         retFuture.complete(($inet_ntoa(sockAddress.sin_addr), client.TAsyncFD))
-    addRead(socket, cb)
+    if not cb(socket):
+      addRead(socket, cb)
     return retFuture
 
 proc accept*(socket: TAsyncFD): PFuture[TAsyncFD] =
diff --git a/lib/pure/asynchttpserver.nim b/lib/pure/asynchttpserver.nim
index 1b47cf5f1..6273d479c 100644
--- a/lib/pure/asynchttpserver.nim
+++ b/lib/pure/asynchttpserver.nim
@@ -51,10 +51,15 @@ proc `==`*(protocol: tuple[orig: string, major, minor: int],
 proc newAsyncHttpServer*(): PAsyncHttpServer =
   new result
 
-proc sendHeaders*(req: TRequest, headers: PStringTable) {.async.} =
-  ## Sends the specified headers to the requesting client.
+proc addHeaders(msg: var string, headers: PStringTable) =
   for k, v in headers:
-    await req.client.send(k & ": " & v & "\c\L")
+    msg.add(k & ": " & v & "\c\L")
+
+proc sendHeaders*(req: TRequest, headers: PStringTable): PFuture[void] =
+  ## Sends the specified headers to the requesting client.
+  var msg = ""
+  addHeaders(msg, headers)
+  return req.client.send(msg)
 
 proc respond*(req: TRequest, code: THttpCode,
         content: string, headers: PStringTable = newStringTable()) {.async.} =
@@ -64,9 +69,9 @@ proc respond*(req: TRequest, code: THttpCode,
   ## This procedure will **not** close the client socket.
   var customHeaders = headers
   customHeaders["Content-Length"] = $content.len
-  await req.client.send("HTTP/1.1 " & $code & "\c\L")
-  await sendHeaders(req, headers)
-  await req.client.send("\c\L" & content)
+  var msg = "HTTP/1.1 " & $code & "\c\L"
+  msg.addHeaders(customHeaders)
+  await req.client.send(msg & "\c\L" & content)
 
 proc newRequest(): TRequest =
   result.headers = newStringTable(modeCaseInsensitive)
@@ -93,90 +98,91 @@ proc sendStatus(client: PAsyncSocket, status: string): PFuture[void] =
 
 proc processClient(client: PAsyncSocket, address: string,
                  callback: proc (request: TRequest): PFuture[void]) {.async.} =
-  # GET /path HTTP/1.1
-  # Header: val
-  # \n
-  var request = newRequest()
-  request.hostname = address
-  assert client != nil
-  request.client = client
-  var runCallback = true
-
-  # First line - GET /path HTTP/1.1
-  let line = await client.recvLine() # TODO: Timeouts.
-  if line == "":
-    client.close()
-    return
-  let lineParts = line.split(' ')
-  if lineParts.len != 3:
-    request.respond(Http400, "Invalid request. Got: " & line)
-    runCallback = false
-
-  let reqMethod = lineParts[0]
-  let path = lineParts[1]
-  let protocol = lineParts[2]
-
-  # Headers
-  var i = 0
   while true:
-    i = 0
-    let headerLine = await client.recvLine()
-    if headerLine == "":
-      client.close(); return
-    if headerLine == "\c\L": break
-    # TODO: Compiler crash
-    #let (key, value) = parseHeader(headerLine)
-    let kv = parseHeader(headerLine)
-    request.headers[kv.key] = kv.value
-
-  request.reqMethod = reqMethod
-  request.url = parseUrl(path)
-  try:
-    request.protocol = protocol.parseProtocol()
-  except EInvalidValue:
-    request.respond(Http400, "Invalid request protocol. Got: " & protocol)
-    runCallback = false
-
-  if reqMethod.normalize == "post":
-    # Check for Expect header
-    if request.headers.hasKey("Expect"):
-      if request.headers["Expect"].toLower == "100-continue":
-        await client.sendStatus("100 Continue")
-      else:
-        await client.sendStatus("417 Expectation Failed")
-  
-    # Read the body
-    # - Check for Content-length header
-    if request.headers.hasKey("Content-Length"):
-      var contentLength = 0
-      if parseInt(request.headers["Content-Length"], contentLength) == 0:
-        await request.respond(Http400, "Bad Request. Invalid Content-Length.")
-      else:
-        request.body = await client.recv(contentLength)
-        assert request.body.len == contentLength
-    else:
-      await request.respond(Http400, "Bad Request. No Content-Length.")
+    # GET /path HTTP/1.1
+    # Header: val
+    # \n
+    var request = newRequest()
+    request.hostname = address
+    assert client != nil
+    request.client = client
+    var runCallback = true
+
+    # First line - GET /path HTTP/1.1
+    let line = await client.recvLine() # TODO: Timeouts.
+    if line == "":
+      client.close()
+      return
+    let lineParts = line.split(' ')
+    if lineParts.len != 3:
+      request.respond(Http400, "Invalid request. Got: " & line)
+      runCallback = false
+
+    let reqMethod = lineParts[0]
+    let path = lineParts[1]
+    let protocol = lineParts[2]
+
+    # Headers
+    var i = 0
+    while true:
+      i = 0
+      let headerLine = await client.recvLine()
+      if headerLine == "":
+        client.close(); return
+      if headerLine == "\c\L": break
+      # TODO: Compiler crash
+      #let (key, value) = parseHeader(headerLine)
+      let kv = parseHeader(headerLine)
+      request.headers[kv.key] = kv.value
+
+    request.reqMethod = reqMethod
+    request.url = parseUrl(path)
+    try:
+      request.protocol = protocol.parseProtocol()
+    except EInvalidValue:
+      request.respond(Http400, "Invalid request protocol. Got: " & protocol)
       runCallback = false
 
-  case reqMethod.normalize
-  of "get", "post", "head", "put", "delete", "trace", "options", "connect", "patch":
-    if runCallback:
-      await callback(request)
-  else:
-    await request.respond(Http400, "Invalid request method. Got: " & reqMethod)
-
-  # Persistent connections
-  if (request.protocol == HttpVer11 and
-      request.headers["connection"].normalize != "close") or
-     (request.protocol == HttpVer10 and
-      request.headers["connection"].normalize == "keep-alive"):
-    # In HTTP 1.1 we assume that connection is persistent. Unless connection
-    # header states otherwise.
-    # In HTTP 1.0 we assume that the connection should not be persistent.
-    # Unless the connection header states otherwise.
-    await processClient(client, address, callback)
-  else:
-    request.client.close()
+    if reqMethod.normalize == "post":
+      # Check for Expect header
+      if request.headers.hasKey("Expect"):
+        if request.headers["Expect"].toLower == "100-continue":
+          await client.sendStatus("100 Continue")
+        else:
+          await client.sendStatus("417 Expectation Failed")
+    
+      # Read the body
+      # - Check for Content-length header
+      if request.headers.hasKey("Content-Length"):
+        var contentLength = 0
+        if parseInt(request.headers["Content-Length"], contentLength) == 0:
+          await request.respond(Http400, "Bad Request. Invalid Content-Length.")
+        else:
+          request.body = await client.recv(contentLength)
+          assert request.body.len == contentLength
+      else:
+        await request.respond(Http400, "Bad Request. No Content-Length.")
+        runCallback = false
+
+    case reqMethod.normalize
+    of "get", "post", "head", "put", "delete", "trace", "options", "connect", "patch":
+      if runCallback:
+        await callback(request)
+    else:
+      await request.respond(Http400, "Invalid request method. Got: " & reqMethod)
+
+    # Persistent connections
+    if (request.protocol == HttpVer11 and
+        request.headers["connection"].normalize != "close") or
+       (request.protocol == HttpVer10 and
+        request.headers["connection"].normalize == "keep-alive"):
+      # In HTTP 1.1 we assume that connection is persistent. Unless connection
+      # header states otherwise.
+      # In HTTP 1.0 we assume that the connection should not be persistent.
+      # Unless the connection header states otherwise.
+    else:
+      request.client.close()
+      break
 
 proc serve*(server: PAsyncHttpServer, port: TPort,
             callback: proc (request: TRequest): PFuture[void],
diff --git a/lib/pure/asyncnet.nim b/lib/pure/asyncnet.nim
index d16c85c58..6eb43b594 100644
--- a/lib/pure/asyncnet.nim
+++ b/lib/pure/asyncnet.nim
@@ -110,12 +110,10 @@ proc recv*(socket: PAsyncSocket, size: int,
       if socket.currPos >= socket.bufLen:
         if (flags and MSG_PEEK) == MSG_PEEK:
           # We don't want to get another buffer if we're peeking.
-          result.setLen(read)
-          return
+          break
         let res = await socket.readIntoBuf(flags and (not MSG_PEEK))
         if res == 0:
-          result.setLen(read)
-          return
+          break
 
       let chunk = min(socket.bufLen-socket.currPos, size-read)
       copyMem(addr(result[read]), addr(socket.buffer[socket.currPos]), chunk)
@@ -181,28 +179,60 @@ proc recvLine*(socket: PAsyncSocket): PFuture[string] {.async.} =
   ## If the socket is disconnected in the middle of a line (before ``\r\L``
   ## is read) then line will be set to ``""``.
   ## The partial line **will be lost**.
-  
   template addNLIfEmpty(): stmt =
     if result.len == 0:
       result.add("\c\L")
 
-  result = ""
-  var c = ""
-  while true:
-    c = await recv(socket, 1)
-    if c.len == 0:
-      return ""
-    if c == "\r":
-      c = await recv(socket, 1, MSG_PEEK)
-      if c.len > 0 and c == "\L":
-        let dummy = await recv(socket, 1)
-        assert dummy == "\L"
-      addNLIfEmpty()
-      return
-    elif c == "\L":
-      addNLIfEmpty()
-      return
-    add(result.string, c)
+  if socket.isBuffered:
+    result = ""
+    if socket.bufLen == 0:
+      let res = await socket.readIntoBuf(0)
+      if res == 0:
+        return
+
+    var lastR = false
+    while true:
+      if socket.currPos >= socket.bufLen:
+        let res = await socket.readIntoBuf(0)
+        if res == 0:
+          result = ""
+          break
+
+      case socket.buffer[socket.currPos]
+      of '\r':
+        lastR = true
+        addNLIfEmpty()
+      of '\L':
+        addNLIfEmpty()
+        socket.currPos.inc()
+        return
+      else:
+        if lastR:
+          socket.currPos.inc()
+          return
+        else:
+          result.add socket.buffer[socket.currPos]
+      socket.currPos.inc()
+  else:
+    
+
+    result = ""
+    var c = ""
+    while true:
+      c = await recv(socket, 1)
+      if c.len == 0:
+        return ""
+      if c == "\r":
+        c = await recv(socket, 1, MSG_PEEK)
+        if c.len > 0 and c == "\L":
+          let dummy = await recv(socket, 1)
+          assert dummy == "\L"
+        addNLIfEmpty()
+        return
+      elif c == "\L":
+        addNLIfEmpty()
+        return
+      add(result.string, c)
 
 proc bindAddr*(socket: PAsyncSocket, port = TPort(0), address = "") =
   ## Binds ``address``:``port`` to the socket.
diff --git a/lib/pure/selectors.nim b/lib/pure/selectors.nim
index 3af5f699c..bd53c2dbf 100644
--- a/lib/pure/selectors.nim
+++ b/lib/pure/selectors.nim
@@ -163,7 +163,7 @@ elif defined(linux):
   proc newSelector*(): PSelector =
     new result
     result.epollFD = epoll_create(64)
-    result.events = cast[array[64, epoll_event]](alloc0(sizeof(epoll_event)*64))
+    #result.events = cast[array[64, epoll_event]](alloc0(sizeof(epoll_event)*64))
     result.fds = initTable[TSocketHandle, PSelectorKey]()
     if result.epollFD < 0:
       OSError(OSLastError())