summary refs log tree commit diff stats
path: root/lib
diff options
context:
space:
mode:
authorAraq <rumpf_a@web.de>2020-11-13 13:03:14 +0100
committerAraq <rumpf_a@web.de>2020-11-13 13:03:14 +0100
commit8778d4a6f36fee5a4c31f6f8f355b8607f2a592c (patch)
treedc6deabd418ce991893793d995dc1cc72e0be724 /lib
parent778914c3ca9d6c57b0158d8bf69f92b01b0bd2e1 (diff)
downloadNim-8778d4a6f36fee5a4c31f6f8f355b8607f2a592c.tar.gz
attempt to make asynchttpserver better; fixes #15925; [backport:1.0]
Diffstat (limited to 'lib')
-rw-r--r--lib/pure/asyncdispatch.nim22
-rw-r--r--lib/pure/asynchttpserver.nim125
-rw-r--r--lib/pure/ioselects/ioselectors_epoll.nim4
-rw-r--r--lib/pure/ioselects/ioselectors_kqueue.nim8
-rw-r--r--lib/pure/ioselects/ioselectors_poll.nim4
-rw-r--r--lib/pure/ioselects/ioselectors_select.nim4
6 files changed, 117 insertions, 50 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim
index d8b274c34..0afc60864 100644
--- a/lib/pure/asyncdispatch.nim
+++ b/lib/pure/asyncdispatch.nim
@@ -1934,3 +1934,25 @@ proc waitFor*[T](fut: Future[T]): T =
     poll()
 
   fut.read
+
+proc activeDescriptors*(): int {.inline.} =
+  ## Returns the current number of active file descriptors for the current
+  ## event loop. This is a cheap operation that does not involve a system call.
+  when defined(windows):
+    result = getGlobalDispatcher().handles.len
+  else:
+    result = getGlobalDispatcher().selector.count
+
+when defined(posix):
+  import posix
+
+proc maxDescriptors*(): int {.raises: OSError.} =
+  ## Returns the maximum number of active file descriptors for the current
+  ## process. This involves a system call.
+  when defined(windows):
+    result = 16_700_000
+  else:
+    var fdLim: RLimit
+    if getrlimit(RLIMIT_NOFILE, fdLim) < 0:
+      raiseOSError(osLastError())
+    result = int(fdLim.rlim_cur) - 1
diff --git a/lib/pure/asynchttpserver.nim b/lib/pure/asynchttpserver.nim
index ec7f2a0de..74b7d17b3 100644
--- a/lib/pure/asynchttpserver.nim
+++ b/lib/pure/asynchttpserver.nim
@@ -7,28 +7,44 @@
 #    distribution, for details about the copyright.
 #
 
-## This module implements a high performance asynchronous HTTP server.
-##
-## This HTTP server has not been designed to be used in production, but
-## for testing applications locally. Because of this, when deploying your
-## application in production you should use a reverse proxy (for example nginx) 
-## instead of allowing users to connect directly to this server.
-##
-## Basic usage
-## ===========
-##
-## This example will create an HTTP server on port 8080. The server will
-## respond to all requests with a ``200 OK`` response code and "Hello World"
-## as the response body.
-##
-## .. code-block::nim
-##    import asynchttpserver, asyncdispatch
-##
-##    var server = newAsyncHttpServer()
-##    proc cb(req: Request) {.async.} =
-##      await req.respond(Http200, "Hello World")
-##
-##    waitFor server.serve(Port(8080), cb)
+##[ This module implements a high performance asynchronous HTTP server.
+
+This HTTP server has not been designed to be used in production, but
+for testing applications locally. Because of this, when deploying your
+application in production you should use a reverse proxy (for example nginx)
+instead of allowing users to connect directly to this server.
+
+Basic usage
+===========
+
+This example will create an HTTP server on port 8080. The server will
+respond to all requests with a ``200 OK`` response code and "Hello World"
+as the response body.
+
+.. code-block::nim
+   import asynchttpserver, asyncdispatch
+
+  proc main {.async.} =
+    var server = newAsyncHttpServer()
+    proc cb(req: Request) {.async.} =
+      #echo(req.reqMethod, " ", req.url)
+      #echo(req.headers)
+      let headers = {"Date": "Tue, 29 Apr 2014 23:40:08 GMT",
+          "Content-type": "text/plain; charset=utf-8"}
+      await req.respond(Http200, "Hello World", headers.newHttpHeaders())
+
+    server.listen Port(5555)
+    while true:
+      if server.shouldAcceptRequest(5):
+        var (address, client) = await server.socket.acceptAddr()
+        asyncCheck processClient(server, client, address, cb)
+      else:
+        poll()
+
+  asyncCheck main()
+  runForever()
+
+]##
 
 import asyncnet, asyncdispatch, parseutils, uri, strutils
 import httpcore
@@ -58,14 +74,12 @@ type
     reuseAddr: bool
     reusePort: bool
     maxBody: int ## The maximum content-length that will be read for the body.
+    maxFDs: int
 
 proc newAsyncHttpServer*(reuseAddr = true, reusePort = false,
                          maxBody = 8388608): AsyncHttpServer =
   ## Creates a new ``AsyncHttpServer`` instance.
-  new result
-  result.reuseAddr = reuseAddr
-  result.reusePort = reusePort
-  result.maxBody = maxBody
+  result = AsyncHttpServer(reuseAddr: reuseAddr, reusePort: reusePort, maxBody: maxBody)
 
 proc addHeaders(msg: var string, headers: HttpHeaders) =
   for k, v in headers:
@@ -279,7 +293,7 @@ proc processRequest(
     request.client.close()
     return false
 
-proc processClient(server: AsyncHttpServer, client: AsyncSocket, address: string,
+proc processClient*(server: AsyncHttpServer, client: AsyncSocket, address: string,
                    callback: proc (request: Request):
                       Future[void] {.closure, gcsafe.}) {.async.} =
   var request = newFutureVar[Request]("asynchttpserver.processClient")
@@ -294,13 +308,8 @@ proc processClient(server: AsyncHttpServer, client: AsyncSocket, address: string
     )
     if not retry: break
 
-proc serve*(server: AsyncHttpServer, port: Port,
-            callback: proc (request: Request): Future[void] {.closure, gcsafe.},
-            address = "") {.async.} =
-  ## Starts the process of listening for incoming HTTP connections on the
-  ## specified address and port.
-  ##
-  ## When a request is made by a client the specified callback will be called.
+proc listen*(server: AsyncHttpServer; port: Port; address = "") =
+  server.maxFDs = maxDescriptors()
   server.socket = newAsyncSocket()
   if server.reuseAddr:
     server.socket.setSockOpt(OptReuseAddr, true)
@@ -309,9 +318,38 @@ proc serve*(server: AsyncHttpServer, port: Port,
   server.socket.bindAddr(port, address)
   server.socket.listen()
 
+proc shouldAcceptRequest*(server: AsyncHttpServer;
+                          assumedDescriptorsPerRequest = 5): bool {.inline.} =
+  ## Returns true if the process's current number of opened file
+  ## descriptors is still within the maximum limit and so it's reasonable to
+  ## accept yet another request.
+  result = assumedDescriptorsPerRequest < 0 or
+    (activeDescriptors() + assumedDescriptorsPerRequest < server.maxFDs)
+
+proc acceptRequest*(server: AsyncHttpServer, port: Port,
+            callback: proc (request: Request): Future[void] {.closure, gcsafe.}) {.async.} =
+  ## Accepts a single request.
+  var (address, client) = await server.socket.acceptAddr()
+  asyncCheck processClient(server, client, address, callback)
+
+proc serve*(server: AsyncHttpServer, port: Port,
+            callback: proc (request: Request): Future[void] {.closure, gcsafe.},
+            address = "";
+            assumedDescriptorsPerRequest = 5) {.async.} =
+  ## Starts the process of listening for incoming HTTP connections on the
+  ## specified address and port.
+  ##
+  ## When a request is made by a client the specified callback will be called.
+  ##
+  ## If `flowControl` is true the server cares about the process's maximum
+  ## file descriptor limit.
+  listen server, port, address
   while true:
-    var (address, client) = await server.socket.acceptAddr()
-    asyncCheck processClient(server, client, address, callback)
+    if shouldAcceptRequest(server, assumedDescriptorsPerRequest):
+      var (address, client) = await server.socket.acceptAddr()
+      asyncCheck processClient(server, client, address, callback)
+    else:
+      poll()
     #echo(f.isNil)
     #echo(f.repr)
 
@@ -320,7 +358,7 @@ proc close*(server: AsyncHttpServer) =
   server.socket.close()
 
 when not defined(testing) and isMainModule:
-  proc main =
+  proc main {.async.} =
     var server = newAsyncHttpServer()
     proc cb(req: Request) {.async.} =
       #echo(req.reqMethod, " ", req.url)
@@ -329,6 +367,13 @@ when not defined(testing) and isMainModule:
           "Content-type": "text/plain; charset=utf-8"}
       await req.respond(Http200, "Hello World", headers.newHttpHeaders())
 
-    asyncCheck server.serve(Port(5555), cb)
-    runForever()
-  main()
+    server.listen Port(5555)
+    while true:
+      if server.shouldAcceptRequest(5):
+        var (address, client) = await server.socket.acceptAddr()
+        asyncCheck processClient(server, client, address, cb)
+      else:
+        poll()
+
+  asyncCheck main()
+  runForever()
diff --git a/lib/pure/ioselects/ioselectors_epoll.nim b/lib/pure/ioselects/ioselectors_epoll.nim
index 3dcf547bd..b62b4c2db 100644
--- a/lib/pure/ioselects/ioselectors_epoll.nim
+++ b/lib/pure/ioselects/ioselectors_epoll.nim
@@ -55,7 +55,7 @@ when hasThreadSupport:
       maxFD: int
       numFD: int
       fds: ptr SharedArray[SelectorKey[T]]
-      count: int
+      count*: int
     Selector*[T] = ptr SelectorImpl[T]
 else:
   type
@@ -64,7 +64,7 @@ else:
       maxFD: int
       numFD: int
       fds: seq[SelectorKey[T]]
-      count: int
+      count*: int
     Selector*[T] = ref SelectorImpl[T]
 type
   SelectEventImpl = object
diff --git a/lib/pure/ioselects/ioselectors_kqueue.nim b/lib/pure/ioselects/ioselectors_kqueue.nim
index a65be9842..68be969c7 100644
--- a/lib/pure/ioselects/ioselectors_kqueue.nim
+++ b/lib/pure/ioselects/ioselectors_kqueue.nim
@@ -30,7 +30,7 @@ when defined(macosx) or defined(freebsd) or defined(dragonfly):
   proc sysctl(name: ptr cint, namelen: cuint, oldp: pointer, oldplen: ptr csize_t,
               newp: pointer, newplen: csize_t): cint
        {.importc: "sysctl",header: """#include <sys/types.h>
-                                      #include <sys/sysctl.h>"""}
+                                      #include <sys/sysctl.h>""".}
 elif defined(netbsd) or defined(openbsd):
   # OpenBSD and NetBSD don't have KERN_MAXFILESPERPROC, so we are using
   # KERN_MAXFILES, because KERN_MAXFILES is always bigger,
@@ -39,7 +39,7 @@ elif defined(netbsd) or defined(openbsd):
   proc sysctl(name: ptr cint, namelen: cuint, oldp: pointer, oldplen: ptr csize_t,
               newp: pointer, newplen: csize_t): cint
        {.importc: "sysctl",header: """#include <sys/param.h>
-                                      #include <sys/sysctl.h>"""}
+                                      #include <sys/sysctl.h>""".}
 
 when hasThreadSupport:
   type
@@ -48,7 +48,7 @@ when hasThreadSupport:
       maxFD: int
       changes: ptr SharedArray[KEvent]
       fds: ptr SharedArray[SelectorKey[T]]
-      count: int
+      count*: int
       changesLock: Lock
       changesSize: int
       changesLength: int
@@ -61,7 +61,7 @@ else:
       maxFD: int
       changes: seq[KEvent]
       fds: seq[SelectorKey[T]]
-      count: int
+      count*: int
       sock: cint
     Selector*[T] = ref SelectorImpl[T]
 
diff --git a/lib/pure/ioselects/ioselectors_poll.nim b/lib/pure/ioselects/ioselectors_poll.nim
index 1af2a46db..00e2f3fe9 100644
--- a/lib/pure/ioselects/ioselectors_poll.nim
+++ b/lib/pure/ioselects/ioselectors_poll.nim
@@ -21,7 +21,7 @@ when hasThreadSupport:
       pollcnt: int
       fds: ptr SharedArray[SelectorKey[T]]
       pollfds: ptr SharedArray[TPollFd]
-      count: int
+      count*: int
       lock: Lock
     Selector*[T] = ptr SelectorImpl[T]
 else:
@@ -31,7 +31,7 @@ else:
       pollcnt: int
       fds: seq[SelectorKey[T]]
       pollfds: seq[TPollFd]
-      count: int
+      count*: int
     Selector*[T] = ref SelectorImpl[T]
 
 type
diff --git a/lib/pure/ioselects/ioselectors_select.nim b/lib/pure/ioselects/ioselectors_select.nim
index eed64a34d..2fd9ac0ba 100644
--- a/lib/pure/ioselects/ioselectors_select.nim
+++ b/lib/pure/ioselects/ioselectors_select.nim
@@ -58,7 +58,7 @@ when hasThreadSupport:
       eSet: FdSet
       maxFD: int
       fds: ptr SharedArray[SelectorKey[T]]
-      count: int
+      count*: int
       lock: Lock
     Selector*[T] = ptr SelectorImpl[T]
 else:
@@ -69,7 +69,7 @@ else:
       eSet: FdSet
       maxFD: int
       fds: seq[SelectorKey[T]]
-      count: int
+      count*: int
     Selector*[T] = ref SelectorImpl[T]
 
 type