about summary refs log tree commit diff stats
path: root/src/loader/loader.nim
diff options
context:
space:
mode:
Diffstat (limited to 'src/loader/loader.nim')
-rw-r--r--src/loader/loader.nim93
1 files changed, 67 insertions, 26 deletions
diff --git a/src/loader/loader.nim b/src/loader/loader.nim
index 02585630..749f7d29 100644
--- a/src/loader/loader.nim
+++ b/src/loader/loader.nim
@@ -139,7 +139,9 @@ proc loadResource(ctx: LoaderContext, request: Request, handle: LoaderHandle) =
         handle.close()
       else:
         let fd = handle.istream.fd
+        handle.setBlocking(false)
         ctx.selector.registerHandle(fd, {Read}, 0)
+        ctx.selector.registerHandle(handle.fd, {Write}, 0)
         let ofl = fcntl(fd, F_GETFL, 0)
         discard fcntl(fd, F_SETFL, ofl or O_NONBLOCK)
         # yes, this puts the istream fd in addition to the ostream fd in
@@ -164,7 +166,7 @@ proc loadResource(ctx: LoaderContext, request: Request, handle: LoaderHandle) =
 proc onLoad(ctx: LoaderContext, stream: SocketStream) =
   var request: Request
   stream.sread(request)
-  let handle = newLoaderHandle(stream, request.canredir)
+  let handle = newLoaderHandle(stream, request.canredir, request.url)
   if not ctx.config.filter.match(request.url):
     handle.sendResult(ERROR_DISALLOWED_URL)
     handle.close()
@@ -188,9 +190,6 @@ proc onLoad(ctx: LoaderContext, stream: SocketStream) =
     ctx.loadResource(request, handle)
 
 proc acceptConnection(ctx: LoaderContext) =
-  #TODO TODO TODO acceptSocketStream should be non-blocking here,
-  # otherwise the client disconnecting between poll and accept could
-  # block this indefinitely.
   let stream = ctx.ssock.acceptSocketStream()
   try:
     var cmd: LoaderCommand
@@ -250,7 +249,7 @@ proc initLoaderContext(fd: cint, config: LoaderConfig): LoaderContext =
   gctx = ctx
   #TODO ideally, buffered would be true. Unfortunately this conflicts with
   # sendFileHandle/recvFileHandle.
-  ctx.ssock = initServerSocket(buffered = false)
+  ctx.ssock = initServerSocket(buffered = false, blocking = false)
   ctx.fd = int(ctx.ssock.sock.getFd())
   ctx.selector.registerHandle(ctx.fd, {Read}, 0)
   # The server has been initialized, so the main process can resume execution.
@@ -271,41 +270,81 @@ proc initLoaderContext(fd: cint, config: LoaderConfig): LoaderContext =
 
 proc runFileLoader*(fd: cint, config: LoaderConfig) =
   var ctx = initLoaderContext(fd, config)
-  var buffer {.noinit.}: array[16384, uint8]
   while ctx.alive:
     let events = ctx.selector.select(-1)
-    var unreg: seq[int]
+    var unregRead: seq[LoaderHandle]
+    var unregWrite: seq[LoaderHandle]
     for event in events:
       if Read in event.events:
         if event.fd == ctx.fd: # incoming connection
           ctx.acceptConnection()
         else:
           let handle = ctx.handleMap[event.fd]
-          while not handle.istream.atEnd:
+          assert event.fd != handle.fd
+          while true:
             try:
-              let n = handle.istream.readData(addr buffer[0], buffer.len)
-              handle.sendData(addr buffer[0], n)
+              let buffer = newLoaderBuffer()
+              buffer.len = handle.istream.readData(addr buffer[0], buffer.cap)
+              if buffer.len == 0:
+                dealloc(buffer)
+                break
+              handle.addBuffer(buffer)
+              if buffer.len < buffer.cap:
+                break
             except ErrorAgain, ErrorWouldBlock: # retry later
               break
-            except ErrorBrokenPipe: # receiver died; stop streaming
-              unreg.add(event.fd)
+            except ErrorBrokenPipe: # sender died; stop streaming
+              unregRead.add(handle)
               break
+      if Write in event.events:
+        let handle = ctx.handleMap[event.fd]
+        assert event.fd == handle.fd
+        while handle.currentBuffer != nil:
+          let buffer = handle.currentBuffer
+          try:
+            let i = handle.currentBufferIdx
+            assert buffer.len - i > 0
+            let n = handle.sendData(addr buffer[i], buffer.len - i)
+            handle.currentBufferIdx += n
+            if handle.currentBufferIdx < buffer.len:
+              break
+            handle.bufferCleared() # swap out buffer
+          except ErrorAgain, ErrorWouldBlock: # never mind
+            break
+          except ErrorBrokenPipe: # receiver died; stop streaming
+            unregWrite.add(handle)
+            break
+        if handle.istream == nil and handle.currentBuffer == nil and
+            (unregWrite.len == 0 or unregWrite[^1] != handle):
+          # after EOF, but not appended in this send cycle
+          unregWrite.add(handle)
       if Error in event.events:
         assert event.fd != ctx.fd
-        when defined(debug):
-          # sanity check
-          let handle = ctx.handleMap[event.fd]
-          if not handle.istream.atEnd():
-            let n = handle.istream.readData(addr buffer[0], buffer.len)
-            assert n == 0
-            assert handle.istream.atEnd()
-        unreg.add(event.fd)
-    for fd in unreg:
-      ctx.selector.unregister(fd)
-      let handle = ctx.handleMap[fd]
-      ctx.handleMap.del(fd)
-      ctx.handleMap.del(handle.getFd())
-      handle.close()
+        let handle = ctx.handleMap[event.fd]
+        if handle.fd == event.fd: # ostream died
+          unregWrite.add(handle)
+        else: # istream died
+          unregRead.add(handle)
+    for handle in unregRead:
+      ctx.selector.unregister(handle.istream.fd)
+      ctx.handleMap.del(handle.istream.fd)
+      handle.istream.close()
+      handle.istream = nil
+      if handle.currentBuffer == nil:
+        unregWrite.add(handle)
+      #TODO TODO TODO what to do about sostream
+    for handle in unregWrite:
+      ctx.selector.unregister(handle.fd)
+      ctx.handleMap.del(handle.fd)
+      handle.ostream.close()
+      handle.ostream = nil
+      if handle.istream != nil:
+        handle.istream.close()
+        ctx.handleMap.del(handle.istream.fd)
+        ctx.selector.unregister(handle.istream.fd)
+        handle.istream.close()
+        handle.istream = nil
+      #TODO TODO TODO what to do about sostream
   ctx.exitLoader()
 
 proc getAttribute(contentType, attrname: string): string =
@@ -478,6 +517,8 @@ proc onRead*(loader: FileLoader, fd: int) =
         buffer[].buf.setLen(olen + BufferSize)
         let n = response.body.readData(addr buffer[].buf[olen], BufferSize)
         buffer[].buf.setLen(olen + n)
+        if n == 0:
+          break
       except ErrorAgain, ErrorWouldBlock:
         break
     if response.body.atEnd():