about summary refs log tree commit diff stats
path: root/src
diff options
context:
space:
mode:
authorbptato <nincsnevem662@gmail.com>2025-03-05 21:45:02 +0100
committerbptato <nincsnevem662@gmail.com>2025-03-05 21:51:31 +0100
commit747d8aeafe86421e1d6a4ee989d075be918b630c (patch)
treeae922fd9d7c72086b839c576c969491be032e94e /src
parentdd522de024f19c864a6929a19a99837002eaaea0 (diff)
downloadchawan-747d8aeafe86421e1d6a4ee989d075be918b630c.tar.gz
loader: asyncify inputhandle status responses
I've also removed the rsBeforeStatus error check from iclose.  I'm not
sure if it's still needed at all, but if it is, then it was implemented
in the wrong place.
Diffstat (limited to 'src')
-rw-r--r--src/io/packetwriter.nim21
-rw-r--r--src/server/loader.nim297
2 files changed, 175 insertions, 143 deletions
diff --git a/src/io/packetwriter.nim b/src/io/packetwriter.nim
index b6786ce0..6c99ae8a 100644
--- a/src/io/packetwriter.nim
+++ b/src/io/packetwriter.nim
@@ -13,9 +13,8 @@ import types/color
 import types/opt
 
 type PacketWriter* = object
-  stream: DynStream
-  buffer: seq[uint8]
-  bufLen: int
+  buffer*: seq[uint8]
+  bufLen*: int
   # file descriptors to send in the packet
   fds: seq[cint]
 
@@ -40,31 +39,33 @@ proc sendFd*(w: var PacketWriter; fd: cint) =
 
 const InitLen = sizeof(int) * 2
 const SizeInit = max(64, InitLen)
-proc initWriter(stream: DynStream): PacketWriter =
+proc initPacketWriter*(): PacketWriter =
   return PacketWriter(
-    stream: stream,
     buffer: newSeqUninitialized[uint8](SizeInit),
     bufLen: InitLen
   )
 
-proc flush*(w: var PacketWriter) =
+proc writeSize*(w: var PacketWriter) =
   # subtract the length field's size
   let len = [w.bufLen - InitLen, w.fds.len]
   copyMem(addr w.buffer[0], unsafeAddr len[0], sizeof(len))
-  if not w.stream.writeDataLoop(w.buffer.toOpenArray(0, w.bufLen - 1)):
+
+proc flush*(w: var PacketWriter; stream: DynStream) =
+  w.writeSize()
+  if not stream.writeDataLoop(w.buffer.toOpenArray(0, w.bufLen - 1)):
     raise newException(EOFError, "end of file")
   if w.fds.len > 0:
     w.fds.reverse()
-    let n = SocketStream(w.stream).sendMsg([0u8], w.fds)
+    let n = SocketStream(stream).sendMsg([0u8], w.fds)
     if n < 1:
       raise newException(EOFError, "end of file")
   w.bufLen = 0
   w.fds.setLen(0)
 
 template withPacketWriter*(stream: DynStream; w, body: untyped) =
-  var w = stream.initWriter()
+  var w = initPacketWriter()
   body
-  w.flush()
+  w.flush(stream)
 
 proc writeData*(w: var PacketWriter; buffer: pointer; len: int) =
   let targetLen = w.bufLen + len
diff --git a/src/server/loader.nim b/src/server/loader.nim
index fb496189..57bbb25a 100644
--- a/src/server/loader.nim
+++ b/src/server/loader.nim
@@ -1,17 +1,17 @@
 # A file loader server (?)
-# The idea here is that we receive requests with a socket, then respond to each
-# with a response (ideally a document.)
-# For now, the protocol looks like:
+# We receive various types of requests on a control socket, then respond
+# to each with a response.  In case of the "load" request, we return one
+# half of a socket pair, and then send connection information before the
+# response body so that the protocol looks like:
 # C: Request
-# S: res (0 => success, _ => error)
+# S: (packet 1) res (0 => success, _ => error)
 # if success:
-#  S: output ID
-#  S: status code
-#  S: headers
-#  C: resume
+#  S: (packet 1) output ID
+#  S: (packet 2) status code, headers
+#  C: resume (on control socket)
 #  S: response body
 # else:
-#  S: error message
+#  S: (packet 1) error message
 #
 # The body is passed to the stream as-is, so effectively nothing can follow it.
 #
@@ -19,9 +19,6 @@
 # passed, it will *not* be cleaned up until a `resume' command is
 # received. (This allows for passing outputIds to the pager for later
 # addCacheFile commands there.)
-#
-# Note 2: We also have a separate control socket that can receive
-# various messages, of which "load" is just one.
 
 import std/algorithm
 import std/deques
@@ -163,6 +160,12 @@ type
     configdir*: string
     bookmark*: string
 
+  PushBufferResult = enum
+    pbrDone, pbrUnregister
+
+proc pushBuffer(ctx: var LoaderContext; output: OutputHandle;
+  buffer: LoaderBuffer; ignoreSuspension: bool): PushBufferResult
+
 when defined(debug):
   func `$`*(buffer: LoaderBuffer): string =
     var s = newString(buffer.len)
@@ -223,15 +226,21 @@ proc tee(outputIn: OutputHandle; ostream: PosixStream; outputId, pid: int):
   return output
 
 template output(handle: InputHandle): OutputHandle =
+  assert handle.outputs.len == 1
   handle.outputs[0]
 
-proc sendResult(handle: InputHandle; res: int; msg = "") =
+template bufferFromWriter(w, body: untyped): LoaderBuffer =
+  var w = initPacketWriter()
+  body
+  w.writeSize()
+  LoaderBuffer(page: move(w.buffer), len: w.bufLen)
+
+proc sendResult(ctx: var LoaderContext; handle: InputHandle; res: int;
+    msg = ""): PushBufferResult =
   assert handle.rstate == rsBeforeResult
-  inc handle.rstate
   let output = handle.output
-  let blocking = output.stream.blocking
-  output.stream.setBlocking(true)
-  output.stream.withPacketWriter w:
+  inc handle.rstate
+  let buffer = bufferFromWriter w:
     w.swrite(res)
     if res == 0: # success
       assert msg == ""
@@ -239,20 +248,19 @@ proc sendResult(handle: InputHandle; res: int; msg = "") =
       inc handle.rstate
     else: # error
       w.swrite(msg)
-  output.stream.setBlocking(blocking)
+  return ctx.pushBuffer(output, buffer, ignoreSuspension = true)
 
-proc sendStatus(handle: InputHandle; status: uint16; headers: Headers) =
+proc sendStatus(ctx: var LoaderContext; handle: InputHandle; status: uint16;
+    headers: Headers): PushBufferResult =
   assert handle.rstate == rsBeforeStatus
   inc handle.rstate
-  let blocking = handle.output.stream.blocking
   let contentLens = headers.getOrDefault("Content-Length")
   handle.startTime = getTime()
   handle.contentLen = parseUInt64(contentLens).get(uint64.high)
-  handle.output.stream.setBlocking(true)
-  handle.output.stream.withPacketWriter w:
+  let buffer = bufferFromWriter w:
     w.swrite(status)
     w.swrite(headers)
-  handle.output.stream.setBlocking(blocking)
+  return ctx.pushBuffer(handle.output, buffer, ignoreSuspension = true)
 
 proc writeData(ps: PosixStream; buffer: LoaderBuffer; si = 0): int {.inline.} =
   assert buffer.len - si > 0
@@ -261,17 +269,6 @@ proc writeData(ps: PosixStream; buffer: LoaderBuffer; si = 0): int {.inline.} =
 proc iclose(handle: InputHandle) =
   if handle.stream != nil:
     assert not handle.registered
-    if handle.rstate == rsBeforeStatus:
-      assert handle.outputs.len == 1
-      # not an ideal solution, but better than silently eating malformed
-      # headers
-      handle.output.stream.setBlocking(true)
-      try:
-        handle.sendStatus(500, newHeaders())
-        const msg = "Error: malformed header in CGI script"
-        discard handle.output.stream.writeData(msg)
-      except EOFError:
-        discard
     handle.stream.sclose()
     handle.stream = nil
 
@@ -309,9 +306,13 @@ func canRewriteForCGICompat(ctx: LoaderContext; path: string): bool =
       return true
   return false
 
-proc rejectHandle(handle: InputHandle; code: ConnectionError; msg = "") =
-  handle.sendResult(code, msg)
-  handle.close()
+proc rejectHandle(ctx: var LoaderContext; handle: InputHandle;
+    code: ConnectionError; msg = "") =
+  case ctx.sendResult(handle, code, msg)
+  of pbrDone: discard
+  of pbrUnregister:
+    ctx.unregWrite.add(handle.output)
+    handle.output.dead = true
 
 iterator inputHandles(ctx: LoaderContext): InputHandle {.inline.} =
   for it in ctx.handleMap:
@@ -346,9 +347,6 @@ func find(cacheMap: openArray[CachedItem]; id: int): int =
       return i
   -1
 
-type PushBufferResult = enum
-  pbrDone, pbrUnregister
-
 proc register(ctx: var LoaderContext; handle: InputHandle) =
   assert not handle.registered
   ctx.pollData.register(handle.stream.fd, cshort(POLLIN))
@@ -381,37 +379,30 @@ proc unregister(ctx: var LoaderContext; client: ClientHandle) =
   ctx.pollData.unregister(int(client.stream.fd))
   client.registered = false
 
-# Either write data to the target output, or append it to the list of buffers to
-# write and register the output in our selector.
+# Either write data to the target output, or append it to the list of
+# buffers to write and register the output in our selector.
+# ignoreSuspension is meant to be used when sending the connection
+# result and headers, which are sent irrespective of whether the handle
+# is suspended or not.
 proc pushBuffer(ctx: var LoaderContext; output: OutputHandle;
-    buffer: LoaderBuffer; si: int): PushBufferResult =
-  if output.suspended:
+    buffer: LoaderBuffer; ignoreSuspension: bool): PushBufferResult =
+  if output.suspended and not ignoreSuspension:
     if output.currentBuffer == nil:
       output.currentBuffer = buffer
-      output.currentBufferIdx = si
+      output.currentBufferIdx = 0
     else:
-      # si must be 0 here in all cases. Why? Well, it indicates the first unread
-      # position after reading headers, and at that point currentBuffer will
-      # be empty.
-      #
-      # Obviously, this breaks down if anything is pushed into the stream
-      # before the header parser destroys itself. For now it never does, so we
-      # should be fine.
-      doAssert si == 0
       output.buffers.addLast(buffer)
   elif output.currentBuffer == nil:
-    var n = si
-    let m = output.stream.writeData(buffer, si)
-    if m < 0:
+    var n = output.stream.writeData(buffer)
+    if n < 0:
       let e = errno
       if e == EAGAIN or e == EWOULDBLOCK:
-        discard
+        n = 0
       else:
         assert e == EPIPE, $strerror(e)
         return pbrUnregister
     else:
-      output.bytesSent += uint64(m)
-      n += m
+      output.bytesSent += uint64(n)
     if n < buffer.len:
       output.currentBuffer = buffer
       output.currentBufferIdx = n
@@ -510,66 +501,75 @@ proc unset(ctx: var LoaderContext; handle: LoaderHandle) =
     ctx.handleMap[fd] = nil
 
 proc addFd(ctx: var LoaderContext; handle: InputHandle) =
-  let output = handle.output
-  output.stream.setBlocking(false)
   handle.stream.setBlocking(false)
   ctx.register(handle)
   ctx.put(handle)
-  ctx.put(output)
 
 type ControlResult = enum
   crDone, crContinue, crError
 
-proc handleFirstLine(handle: InputHandle; line: string): ControlResult =
+proc handleFirstLine(ctx: var LoaderContext; handle: InputHandle; line: string):
+    ControlResult =
   if line.startsWithIgnoreCase("HTTP/1.0") or
       line.startsWithIgnoreCase("HTTP/1.1"):
     let codes = line.until(' ', "HTTP/1.0 ".len)
     let code = parseUInt16(codes)
     if codes.len > 3 or code.isNone:
-      handle.sendResult(ceCGIMalformedHeader)
+      ctx.rejectHandle(handle, ceCGIMalformedHeader)
       return crError
-    handle.sendResult(0) # Success
+    case ctx.sendResult(handle, 0) # Success
+    of pbrDone: discard
+    of pbrUnregister: return crError
     handle.parser.status = code.get
     return crDone
   let k = line.until(':')
   if k.len == line.len:
     # invalid
-    handle.sendResult(ceCGIMalformedHeader)
+    ctx.rejectHandle(handle, ceCGIMalformedHeader)
     return crError
   let v = line.substr(k.len + 1).strip()
   if k.equalsIgnoreCase("Status"):
-    handle.sendResult(0) # success
+    case ctx.sendResult(handle, 0) # success
+    of pbrDone: discard
+    of pbrUnregister: return crError
     let code = parseUInt16(v)
     if v.len > 3 or code.isNone:
-      handle.sendResult(ceCGIMalformedHeader)
+      ctx.rejectHandle(handle, ceCGIMalformedHeader)
       return crError
     handle.parser.status = code.get
     return crContinue
   if k.equalsIgnoreCase("Cha-Control"):
     if v.startsWithIgnoreCase("Connected"):
-      handle.sendResult(0) # success
+      case ctx.sendResult(handle, 0) # success
+      of pbrDone: discard
+      of pbrUnregister: return crError
       return crContinue
     if v.startsWithIgnoreCase("ConnectionError"):
       let errs = v.split(' ')
-      var code = int32(ceCGIInvalidChaControl)
+      var code = ceCGIInvalidChaControl
       var message = ""
       if errs.len > 1:
         if (let x = parseInt32(errs[1]); x.isSome):
+          let n = x.get
+          if n > 0 and n <= int32(ConnectionError.high):
+            code = ConnectionError(x.get)
+        elif (let x = strictParseEnum[ConnectionError](errs[1]);
+            x.get(ceNone) != ceNone):
           code = x.get
-        elif (let x = strictParseEnum[ConnectionError](errs[1]); x.isSome):
-          code = int32(x.get)
         if errs.len > 2:
           message &= errs[2]
           for i in 3 ..< errs.len:
             message &= ' '
             message &= errs[i]
-      handle.sendResult(code, message)
+      ctx.rejectHandle(handle, code, message)
       return crError
     if v.startsWithIgnoreCase("ControlDone"):
       return crDone
-    handle.sendResult(ceCGIInvalidChaControl)
+    ctx.rejectHandle(handle, ceCGIInvalidChaControl)
     return crError
-  handle.sendResult(0) # success
+  case ctx.sendResult(handle, 0) # success
+  of pbrDone: discard
+  of pbrUnregister: return crError
   handle.parser.headers.add(k, v)
   return crDone
 
@@ -598,7 +598,8 @@ proc handleLine(handle: InputHandle; line: string) =
     let v = line.substr(k.len + 1).strip()
     handle.parser.headers.add(k, v)
 
-proc parseHeaders0(handle: InputHandle; data: openArray[char]): int =
+proc parseHeaders0(ctx: var LoaderContext; handle: InputHandle;
+    data: openArray[char]): int =
   let parser = handle.parser
   for i, c in data:
     template die =
@@ -614,13 +615,17 @@ proc parseHeaders0(handle: InputHandle; data: openArray[char]): int =
         if parser.state == hpsBeforeLines:
           # body comes immediately, so we haven't had a chance to send result
           # yet.
-          handle.sendResult(0)
-        handle.sendStatus(parser.status, parser.headers)
+          case ctx.sendResult(handle, 0)
+          of pbrDone: discard
+          of pbrUnregister: die
+        let res = ctx.sendStatus(handle, parser.status, parser.headers)
         handle.parser = nil
-        return i + 1 # +1 to skip \n
+        return case res
+        of pbrDone: i + 1 # +1 to skip \n
+        of pbrUnregister: -1
       case parser.state
       of hpsBeforeLines:
-        case handle.handleFirstLine(parser.lineBuffer)
+        case ctx.handleFirstLine(handle, parser.lineBuffer)
         of crDone: parser.state = hpsControlDone
         of crContinue: parser.state = hpsAfterFirstLine
         of crError: die
@@ -636,17 +641,14 @@ proc parseHeaders0(handle: InputHandle; data: openArray[char]): int =
       parser.lineBuffer &= c
   return data.len
 
-proc parseHeaders(handle: InputHandle; buffer: LoaderBuffer): int =
-  try:
-    if buffer == nil:
-      return handle.parseHeaders0(['\n'])
-    let p = cast[ptr UncheckedArray[char]](addr buffer.page[0])
-    return handle.parseHeaders0(p.toOpenArray(0, buffer.len - 1))
-  except EOFError:
-    handle.parser = nil
-    return -1
+proc parseHeaders(ctx: var LoaderContext; handle: InputHandle;
+    buffer: LoaderBuffer): int =
+  if buffer == nil:
+    return ctx.parseHeaders0(handle, ['\n'])
+  let p = cast[ptr UncheckedArray[char]](addr buffer.page[0])
+  return ctx.parseHeaders0(handle, p.toOpenArray(0, buffer.len - 1))
 
-proc finishParse(handle: InputHandle) =
+proc finishParse(ctx: var LoaderContext; handle: InputHandle) =
   if handle.cacheRef != nil:
     assert handle.cacheRef.offset == -1
     let ps = newPosixStream(handle.cacheRef.path, O_RDONLY, 0)
@@ -658,7 +660,7 @@ proc finishParse(handle: InputHandle) =
         if n <= 0:
           assert n == 0 or errno != EBADF
           break
-        let pn = handle.parseHeaders0(buffer.toOpenArray(0, n - 1))
+        let pn = ctx.parseHeaders0(handle, buffer.toOpenArray(0, n - 1))
         if pn == -1:
           break
         off += int64(pn)
@@ -669,7 +671,7 @@ proc finishParse(handle: InputHandle) =
       ps.sclose()
     handle.cacheRef = nil
   if handle.parser != nil:
-    discard handle.parseHeaders(nil)
+    discard ctx.parseHeaders(handle, nil)
 
 type HandleReadResult = enum
   hrrDone, hrrUnregister, hrrBrokenPipe
@@ -680,7 +682,7 @@ proc handleRead(ctx: var LoaderContext; handle: InputHandle;
   var unregs = 0
   let maxUnregs = handle.outputs.len
   while true:
-    let buffer = newLoaderBuffer()
+    var buffer = newLoaderBuffer()
     let n = handle.stream.readData(buffer.page)
     if n < 0:
       let e = errno
@@ -694,11 +696,22 @@ proc handleRead(ctx: var LoaderContext; handle: InputHandle;
     buffer.len = n
     var si = 0
     if handle.parser != nil:
-      si = handle.parseHeaders(buffer)
+      si = ctx.parseHeaders(handle, buffer)
       if si == -1: # died while parsing headers; unregister
         return hrrUnregister
       if si == n: # parsed the entire buffer as headers; skip output handling
         continue
+      if si != 0:
+        # Some parts of the buffer have been consumed as headers; others
+        # must be passed on to the client.
+        # We *could* store si as an offset to the buffer, but it would
+        # make things much more complex.  Let's just do this:
+        let nlen = buffer.len - si
+        let nbuffer = newLoaderBuffer(nlen)
+        nbuffer.len = nlen
+        copyMem(addr nbuffer.page[0], addr buffer.page[si], nbuffer.len)
+        buffer = nbuffer
+        assert nbuffer.len != 0, $si & ' ' & $buffer.len & " n " & $n
     else:
       handle.bytesSeen += uint64(n)
       #TODO stop reading if Content-Length exceeded
@@ -706,7 +719,7 @@ proc handleRead(ctx: var LoaderContext; handle: InputHandle;
       if output.dead:
         # do not push to unregWrite candidates
         continue
-      case ctx.pushBuffer(output, buffer, si)
+      case ctx.pushBuffer(output, buffer, ignoreSuspension = false)
       of pbrUnregister:
         output.dead = true
         unregWrite.add(output)
@@ -741,11 +754,9 @@ proc loadStreamRegular(ctx: var LoaderContext;
     elif cachedHandle != nil:
       output.parent = cachedHandle
       cachedHandle.outputs.add(output)
-      ctx.put(output)
     elif output.registered or output.suspended:
       output.parent = nil
       output.istreamAtEnd = true
-      ctx.put(output)
     else:
       assert output.stream.fd >= ctx.handleMap.len or
         ctx.handleMap[output.stream.fd] == nil
@@ -856,15 +867,15 @@ proc loadCGI(ctx: var LoaderContext; client: ClientHandle; handle: InputHandle;
   let cpath = ctx.parseCGIPath(request)
   if cpath.cmd == "" or cpath.basename in ["", ".", ".."] or
       cpath.basename[0] == '~':
-    handle.sendResult(ceInvalidCGIPath)
+    ctx.rejectHandle(handle, ceInvalidCGIPath)
     return
   if not fileExists(cpath.cmd):
-    handle.sendResult(ceCGIFileNotFound)
+    ctx.rejectHandle(handle, ceCGIFileNotFound)
     return
   # Pipe the response body as stdout.
   var pipefd: array[2, cint] # child -> parent
   if pipe(pipefd) == -1:
-    handle.sendResult(ceFailedToSetUpCGI)
+    ctx.rejectHandle(handle, ceFailedToSetUpCGI)
     return
   let istreamOut = newPosixStream(pipefd[0]) # read by loader
   var ostreamOut = newPosixStream(pipefd[1]) # written by child
@@ -877,7 +888,7 @@ proc loadCGI(ctx: var LoaderContext; client: ClientHandle; handle: InputHandle;
     # RDWR, otherwise mmap won't work
     ostreamOut = newPosixStream(tmpf, O_CREAT or O_RDWR, 0o600)
     if ostreamOut == nil:
-      handle.sendResult(ceCGIFailedToOpenCacheOutput)
+      ctx.rejectHandle(handle, ceCGIFailedToOpenCacheOutput)
       return
     let cacheId = handle.output.outputId # welp
     let item = CachedItem(
@@ -898,24 +909,24 @@ proc loadCGI(ctx: var LoaderContext; client: ClientHandle; handle: InputHandle;
     var n: int
     (istream, n) = client.openCachedItem(request.body.cacheId)
     if istream == nil:
-      handle.sendResult(ceCGICachedBodyNotFound)
+      ctx.rejectHandle(handle, ceCGICachedBodyNotFound)
       return
     cachedHandle = ctx.findCachedHandle(request.body.cacheId)
     if cachedHandle != nil: # cached item still open, switch to streaming mode
       if client.cacheMap[n].offset == -1:
-        handle.sendResult(ceCGICachedBodyUnavailable)
+        ctx.rejectHandle(handle, ceCGICachedBodyUnavailable)
         return
       istream2 = istream
   elif request.body.t == rbtOutput:
     outputIn = ctx.findOutput(request.body.outputId, client)
     if outputIn == nil:
-      handle.sendResult(ceCGIOutputHandleNotFound)
+      ctx.rejectHandle(handle, ceCGIOutputHandleNotFound)
       return
   if request.body.t in {rbtString, rbtMultipart, rbtOutput} or
       request.body.t == rbtCache and istream2 != nil:
     var pipefdRead: array[2, cint] # parent -> child
     if pipe(pipefdRead) == -1:
-      handle.sendResult(ceFailedToSetUpCGI)
+      ctx.rejectHandle(handle, ceFailedToSetUpCGI)
       return
     istream = newPosixStream(pipefdRead[0])
     ostream = newPosixStream(pipefdRead[1])
@@ -923,7 +934,7 @@ proc loadCGI(ctx: var LoaderContext; client: ClientHandle; handle: InputHandle;
   stderr.flushFile()
   let pid = fork()
   if pid == -1:
-    handle.sendResult(ceFailedToSetUpCGI)
+    ctx.rejectHandle(handle, ceFailedToSetUpCGI)
   elif pid == 0:
     istreamOut.sclose() # close read
     ostreamOut.moveFd(STDOUT_FILENO) # dup stdout
@@ -1001,10 +1012,14 @@ proc loadStream(ctx: var LoaderContext; client: ClientHandle;
     handle: InputHandle; request: Request) =
   let i = client.findPassedFd(request.url.pathname)
   if i == -1:
-    handle.sendResult(ceFileNotFound, "stream not found")
+    ctx.rejectHandle(handle, ceFileNotFound, "stream not found")
     return
-  handle.sendResult(0)
-  handle.sendStatus(200, newHeaders())
+  case ctx.sendResult(handle, 0)
+  of pbrDone: discard
+  of pbrUnregister: return
+  case ctx.sendStatus(handle, 200, newHeaders())
+  of pbrDone: discard
+  of pbrUnregister: return
   let ps = client.passedFdMap[i].ps
   var stats: Stat
   doAssert fstat(ps.fd, stats) != -1
@@ -1027,35 +1042,52 @@ proc loadFromCache(ctx: var LoaderContext; client: ClientHandle;
       discard ps.seek(startFrom)
     handle.stream = ps
     if ps == nil:
-      handle.rejectHandle(ceFileNotInCache)
+      ctx.rejectHandle(handle, ceFileNotInCache)
       client.cacheMap.del(n)
       return
-    handle.sendResult(0)
-    handle.sendStatus(200, newHeaders())
+    case ctx.sendResult(handle, 0)
+    of pbrDone: discard
+    of pbrUnregister:
+      client.cacheMap.del(n)
+      handle.close()
+      return
+    case ctx.sendStatus(handle, 200, newHeaders())
+    of pbrDone: discard
+    of pbrUnregister:
+      client.cacheMap.del(n)
+      handle.close()
+      return
     handle.output.stream.setBlocking(false)
     let cachedHandle = ctx.findCachedHandle(id)
     ctx.loadStreamRegular(handle, cachedHandle)
   else:
-    handle.sendResult(ceURLNotInCache)
+    ctx.rejectHandle(handle, ceURLNotInCache)
 
 # Data URL handler.
 # Moved back into loader from CGI, because data URLs can get extremely long
 # and thus no longer fit into the environment.
 proc loadDataSend(ctx: var LoaderContext; handle: InputHandle; s, ct: string) =
-  handle.sendResult(0)
-  handle.sendStatus(200, newHeaders({"Content-Type": ct}))
+  case ctx.sendResult(handle, 0)
+  of pbrDone: discard
+  of pbrUnregister:
+    handle.close()
+    return
+  case ctx.sendStatus(handle, 200, newHeaders({"Content-Type": ct}))
+  of pbrDone: discard
+  of pbrUnregister:
+    handle.close()
+    return
   let output = handle.output
   if s.len == 0:
     if output.suspended:
       output.istreamAtEnd = true
-      ctx.put(output)
     else:
       output.oclose()
     return
   let buffer = newLoaderBuffer(s.len)
   buffer.len = s.len
   copyMem(addr buffer.page[0], unsafeAddr s[0], s.len)
-  case ctx.pushBuffer(output, buffer, 0)
+  case ctx.pushBuffer(output, buffer, ignoreSuspension = false)
   of pbrUnregister:
     if output.registered:
       ctx.unregister(output)
@@ -1063,7 +1095,6 @@ proc loadDataSend(ctx: var LoaderContext; handle: InputHandle; s, ct: string) =
   of pbrDone:
     if output.registered or output.suspended:
       output.istreamAtEnd = true
-      ctx.put(output)
     else:
       output.oclose()
 
@@ -1071,15 +1102,14 @@ proc loadData(ctx: var LoaderContext; handle: InputHandle; request: Request) =
   let url = request.url
   var ct = url.pathname.until(',')
   if AllChars - Ascii + Controls - {'\t'} in ct:
-    handle.sendResult(ceInvalidURL, "invalid data URL")
-    handle.close()
+    ctx.rejectHandle(handle, ceInvalidURL, "invalid data URL")
     return
   let sd = ct.len + 1 # data start
   let body = percentDecode(url.pathname.toOpenArray(sd, url.pathname.high))
   if ct.endsWith(";base64"):
     var d: string
     if d.atob(body).isNone:
-      handle.rejectHandle(ceInvalidURL, "invalid data URL")
+      ctx.rejectHandle(handle, ceInvalidURL, "invalid data URL")
       return
     ct.setLen(ct.len - ";base64".len) # remove base64 indicator
     ctx.loadDataSend(handle, d, ct)
@@ -1185,7 +1215,7 @@ proc loadAbout(ctx: var LoaderContext; handle: InputHandle; request: Request) =
     if request.httpMethod == hmPost:
       # OK/STOP/PAUSE/RESUME clicked
       if request.body.t != rbtString:
-        handle.rejectHandle(ceInvalidURL, "wat")
+        ctx.rejectHandle(handle, ceInvalidURL, "wat")
         return
       for it in ctx.parseDownloadActions(request.body.s):
         let dl = ctx.downloadList[it.n]
@@ -1229,7 +1259,7 @@ proc loadAbout(ctx: var LoaderContext; handle: InputHandle; request: Request) =
     const body = staticRead"res/license.md"
     ctx.loadDataSend(handle, body, "text/markdown")
   else:
-    handle.rejectHandle(ceInvalidURL, "invalid download URL")
+    ctx.rejectHandle(handle, ceInvalidURL, "invalid download URL")
 
 proc loadResource(ctx: var LoaderContext; client: ClientHandle;
     config: LoaderClientConfig; request: Request; handle: InputHandle) =
@@ -1263,7 +1293,6 @@ proc loadResource(ctx: var LoaderContext; client: ClientHandle;
     of "cache":
       ctx.loadFromCache(client, handle, request)
       assert handle.stream == nil
-      handle.close()
     of "data":
       ctx.loadData(handle, request)
     of "about":
@@ -1275,11 +1304,11 @@ proc loadResource(ctx: var LoaderContext; client: ClientHandle;
         inc tries
         redo = true
       of ummrWrongURL:
-        handle.rejectHandle(ceInvalidURIMethodEntry)
+        ctx.rejectHandle(handle, ceInvalidURIMethodEntry)
       of ummrNotFound:
-        handle.rejectHandle(ceUnknownScheme)
+        ctx.rejectHandle(handle, ceUnknownScheme)
   if tries >= MaxRewrites:
-    handle.rejectHandle(ceTooManyRewrites)
+    ctx.rejectHandle(handle, ceTooManyRewrites)
 
 proc setupRequestDefaults(request: Request; config: LoaderClientConfig) =
   for k, v in config.defaultHeaders.table:
@@ -1309,12 +1338,14 @@ proc load(ctx: var LoaderContext; stream: SocketStream; request: Request;
   if not fail:
     discard close(sv[1])
     let stream = newSocketStream(sv[0])
+    stream.setBlocking(false)
     let handle = newInputHandle(stream, ctx.getOutputId(), client.pid)
+    ctx.put(handle.output)
     when defined(debug):
       handle.url = request.url
       handle.output.url = request.url
     if not config.filter.match(request.url):
-      handle.rejectHandle(ceDisallowedURL)
+      ctx.rejectHandle(handle, ceDisallowedURL)
     else:
       request.setupRequestDefaults(config)
       ctx.loadResource(client, config, request, handle)
@@ -1403,14 +1434,14 @@ proc addCacheFile(ctx: var LoaderContext; stream: SocketStream;
     w.swrite(id)
 
 proc redirectToFile(ctx: var LoaderContext; stream: SocketStream;
-    r: var PacketReader) =
+    client: ClientHandle; r: var PacketReader) =
   var outputId: int
   var targetPath: string
   var displayUrl: string
   r.sread(outputId)
   r.sread(targetPath)
   r.sread(displayUrl)
-  let output = ctx.findOutput(outputId, ctx.pagerClient)
+  let output = ctx.findOutput(outputId, client)
   var success = false
   if output != nil:
     var fileOutput: OutputHandle
@@ -1599,7 +1630,7 @@ proc readCommand(ctx: var LoaderContext; client: ClientHandle) =
         ctx.openCachedItem(stream, client, r)
       of lcRedirectToFile:
         privileged_command
-        ctx.redirectToFile(stream, r)
+        ctx.redirectToFile(stream, client, r)
       of lcLoadConfig:
         privileged_command
         ctx.loadConfig(stream, client, r)
@@ -1677,7 +1708,7 @@ proc finishCycle(ctx: var LoaderContext) =
       ctx.unregister(handle)
       ctx.unset(handle)
       if handle.parser != nil:
-        handle.finishParse()
+        ctx.finishParse(handle)
       handle.iclose()
       for output in handle.outputs:
         output.istreamAtEnd = true
@@ -1698,7 +1729,7 @@ proc finishCycle(ctx: var LoaderContext) =
           ctx.unregister(handle)
           ctx.unset(handle)
           if handle.parser != nil:
-            handle.finishParse()
+            ctx.finishParse(handle)
           handle.iclose()
   for client in ctx.unregClient:
     if client.stream != nil: