about summary refs log tree commit diff stats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/io/packetwriter.nim21
-rw-r--r--src/server/loader.nim297
2 files changed, 175 insertions, 143 deletions
diff --git a/src/io/packetwriter.nim b/src/io/packetwriter.nim
index b6786ce0..6c99ae8a 100644
--- a/src/io/packetwriter.nim
+++ b/src/io/packetwriter.nim
@@ -13,9 +13,8 @@ import types/color
 import types/opt
 
 type PacketWriter* = object
-  stream: DynStream
-  buffer: seq[uint8]
-  bufLen: int
+  buffer*: seq[uint8]
+  bufLen*: int
   # file descriptors to send in the packet
   fds: seq[cint]
 
@@ -40,31 +39,33 @@ proc sendFd*(w: var PacketWriter; fd: cint) =
 
 const InitLen = sizeof(int) * 2
 const SizeInit = max(64, InitLen)
-proc initWriter(stream: DynStream): PacketWriter =
+proc initPacketWriter*(): PacketWriter =
   return PacketWriter(
-    stream: stream,
     buffer: newSeqUninitialized[uint8](SizeInit),
     bufLen: InitLen
   )
 
-proc flush*(w: var PacketWriter) =
+proc writeSize*(w: var PacketWriter) =
   # subtract the length field's size
   let len = [w.bufLen - InitLen, w.fds.len]
   copyMem(addr w.buffer[0], unsafeAddr len[0], sizeof(len))
-  if not w.stream.writeDataLoop(w.buffer.toOpenArray(0, w.bufLen - 1)):
+
+proc flush*(w: var PacketWriter; stream: DynStream) =
+  w.writeSize()
+  if not stream.writeDataLoop(w.buffer.toOpenArray(0, w.bufLen - 1)):
     raise newException(EOFError, "end of file")
   if w.fds.len > 0:
     w.fds.reverse()
-    let n = SocketStream(w.stream).sendMsg([0u8], w.fds)
+    let n = SocketStream(stream).sendMsg([0u8], w.fds)
     if n < 1:
       raise newException(EOFError, "end of file")
   w.bufLen = 0
   w.fds.setLen(0)
 
 template withPacketWriter*(stream: DynStream; w, body: untyped) =
-  var w = stream.initWriter()
+  var w = initPacketWriter()
   body
-  w.flush()
+  w.flush(stream)
 
 proc writeData*(w: var PacketWriter; buffer: pointer; len: int) =
   let targetLen = w.bufLen + len
diff --git a/src/server/loader.nim b/src/server/loader.nim
index fb496189..57bbb25a 100644
--- a/src/server/loader.nim
+++ b/src/server/loader.nim
@@ -1,17 +1,17 @@
 # A file loader server (?)
-# The idea here is that we receive requests with a socket, then respond to each
-# with a response (ideally a document.)
-# For now, the protocol looks like:
+# We receive various types of requests on a control socket, then respond
+# to each with a response.  In case of the "load" request, we return one
+# half of a socket pair, and then send connection information before the
+# response body so that the protocol looks like:
 # C: Request
-# S: res (0 => success, _ => error)
+# S: (packet 1) res (0 => success, _ => error)
 # if success:
-#  S: output ID
-#  S: status code
-#  S: headers
-#  C: resume
+#  S: (packet 1) output ID
+#  S: (packet 2) status code, headers
+#  C: resume (on control socket)
 #  S: response body
 # else:
-#  S: error message
+#  S: (packet 1) error message
 #
 # The body is passed to the stream as-is, so effectively nothing can follow it.
 #
@@ -19,9 +19,6 @@
 # passed, it will *not* be cleaned up until a `resume' command is
 # received. (This allows for passing outputIds to the pager for later
 # addCacheFile commands there.)
-#
-# Note 2: We also have a separate control socket that can receive
-# various messages, of which "load" is just one.
 
 import std/algorithm
 import std/deques
@@ -163,6 +160,12 @@ type
     configdir*: string
     bookmark*: string
 
+  PushBufferResult = enum
+    pbrDone, pbrUnregister
+
+proc pushBuffer(ctx: var LoaderContext; output: OutputHandle;
+  buffer: LoaderBuffer; ignoreSuspension: bool): PushBufferResult
+
 when defined(debug):
   func `$`*(buffer: LoaderBuffer): string =
     var s = newString(buffer.len)
@@ -223,15 +226,21 @@ proc tee(outputIn: OutputHandle; ostream: PosixStream; outputId, pid: int):
   return output
 
 template output(handle: InputHandle): OutputHandle =
+  assert handle.outputs.len == 1
   handle.outputs[0]
 
-proc sendResult(handle: InputHandle; res: int; msg = "") =
+template bufferFromWriter(w, body: untyped): LoaderBuffer =
+  var w = initPacketWriter()
+  body
+  w.writeSize()
+  LoaderBuffer(page: move(w.buffer), len: w.bufLen)
+
+proc sendResult(ctx: var LoaderContext; handle: InputHandle; res: int;
+    msg = ""): PushBufferResult =
   assert handle.rstate == rsBeforeResult
-  inc handle.rstate
   let output = handle.output
-  let blocking = output.stream.blocking
-  output.stream.setBlocking(true)
-  output.stream.withPacketWriter w:
+  inc handle.rstate
+  let buffer = bufferFromWriter w:
     w.swrite(res)
     if res == 0: # success
       assert msg == ""
@@ -239,20 +248,19 @@ proc sendResult(handle: InputHandle; res: int; msg = "") =
       inc handle.rstate
     else: # error
       w.swrite(msg)
-  output.stream.setBlocking(blocking)
+  return ctx.pushBuffer(output, buffer, ignoreSuspension = true)
 
-proc sendStatus(handle: InputHandle; status: uint16; headers: Headers) =
+proc sendStatus(ctx: var LoaderContext; handle: InputHandle; status: uint16;
+    headers: Headers): PushBufferResult =
   assert handle.rstate == rsBeforeStatus
   inc handle.rstate
-  let blocking = handle.output.stream.blocking
   let contentLens = headers.getOrDefault("Content-Length")
   handle.startTime = getTime()
   handle.contentLen = parseUInt64(contentLens).get(uint64.high)
-  handle.output.stream.setBlocking(true)
-  handle.output.stream.withPacketWriter w:
+  let buffer = bufferFromWriter w:
     w.swrite(status)
     w.swrite(headers)
-  handle.output.stream.setBlocking(blocking)
+  return ctx.pushBuffer(handle.output, buffer, ignoreSuspension = true)
 
 proc writeData(ps: PosixStream; buffer: LoaderBuffer; si = 0): int {.inline.} =
   assert buffer.len - si > 0
@@ -261,17 +269,6 @@ proc writeData(ps: PosixStream; buffer: LoaderBuffer; si = 0): int {.inline.} =
 proc iclose(handle: InputHandle) =
   if handle.stream != nil:
     assert not handle.registered
-    if handle.rstate == rsBeforeStatus:
-      assert handle.outputs.len == 1
-      # not an ideal solution, but better than silently eating malformed
-      # headers
-      handle.output.stream.setBlocking(true)
-      try:
-        handle.sendStatus(500, newHeaders())
-        const msg = "Error: malformed header in CGI script"
-        discard handle.output.stream.writeData(msg)
-      except EOFError:
-        discard
     handle.stream.sclose()
     handle.stream = nil
 
@@ -309,9 +306,13 @@ func canRewriteForCGICompat(ctx: LoaderContext; path: string): bool =
       return true
   return false
 
-proc rejectHandle(handle: InputHandle; code: ConnectionError; msg = "") =
-  handle.sendResult(code, msg)
-  handle.close()
+proc rejectHandle(ctx: var LoaderContext; handle: InputHandle;
+    code: ConnectionError; msg = "") =
+  case ctx.sendResult(handle, code, msg)
+  of pbrDone: discard
+  of pbrUnregister:
+    ctx.unregWrite.add(handle.output)
+    handle.output.dead = true
 
 iterator inputHandles(ctx: LoaderContext): InputHandle {.inline.} =
   for it in ctx.handleMap:
@@ -346,9 +347,6 @@ func find(cacheMap: openArray[CachedItem]; id: int): int =
       return i
   -1
 
-type PushBufferResult = enum
-  pbrDone, pbrUnregister
-
 proc register(ctx: var LoaderContext; handle: InputHandle) =
   assert not handle.registered
   ctx.pollData.register(handle.stream.fd, cshort(POLLIN))
@@ -381,37 +379,30 @@ proc unregister(ctx: var LoaderContext; client: ClientHandle) =
   ctx.pollData.unregister(int(client.stream.fd))
   client.registered = false
 
-# Either write data to the target output, or append it to the list of buffers to
-# write and register the output in our selector.
+# Either write data to the target output, or append it to the list of
+# buffers to write and register the output in our selector.
+# ignoreSuspension is meant to be used when sending the connection
+# result and headers, which are sent irrespective of whether the handle
+# is suspended or not.
 proc pushBuffer(ctx: var LoaderContext; output: OutputHandle;
-    buffer: LoaderBuffer; si: int): PushBufferResult =
-  if output.suspended:
+    buffer: LoaderBuffer; ignoreSuspension: bool): PushBufferResult =
+  if output.suspended and not ignoreSuspension:
     if output.currentBuffer == nil:
       output.currentBuffer = buffer
-      output.currentBufferIdx = si
+      output.currentBufferIdx = 0
     else:
-      # si must be 0 here in all cases. Why? Well, it indicates the first unread
-      # position after reading headers, and at that point currentBuffer will
-      # be empty.
-      #
-      # Obviously, this breaks down if anything is pushed into the stream
-      # before the header parser destroys itself. For now it never does, so we
-      # should be fine.
-      doAssert si == 0
       output.buffers.addLast(buffer)
   elif output.currentBuffer == nil:
-    var n = si
-    let m = output.stream.writeData(buffer, si)
-    if m < 0:
+    var n = output.stream.writeData(buffer)
+    if n < 0:
       let e = errno
       if e == EAGAIN or e == EWOULDBLOCK:
-        discard
+        n = 0
       else:
         assert e == EPIPE, $strerror(e)
         return pbrUnregister
     else:
-      output.bytesSent += uint64(m)
-      n += m
+      output.bytesSent += uint64(n)
     if n < buffer.len:
       output.currentBuffer = buffer
       output.currentBufferIdx = n
@@ -510,66 +501,75 @@ proc unset(ctx: var LoaderContext; handle: LoaderHandle) =
     ctx.handleMap[fd] = nil
 
 proc addFd(ctx: var LoaderContext; handle: InputHandle) =
-  let output = handle.output
-  output.stream.setBlocking(false)
   handle.stream.setBlocking(false)
   ctx.register(handle)
   ctx.put(handle)
-  ctx.put(output)
 
 type ControlResult = enum
   crDone, crContinue, crError
 
-proc handleFirstLine(handle: InputHandle; line: string): ControlResult =
+proc handleFirstLine(ctx: var LoaderContext; handle: InputHandle; line: string):
+    ControlResult =
   if line.startsWithIgnoreCase("HTTP/1.0") or
       line.startsWithIgnoreCase("HTTP/1.1"):
     let codes = line.until(' ', "HTTP/1.0 ".len)
     let code = parseUInt16(codes)
     if codes.len > 3 or code.isNone:
-      handle.sendResult(ceCGIMalformedHeader)
+      ctx.rejectHandle(handle, ceCGIMalformedHeader)
       return crError
-    handle.sendResult(0) # Success
+    case ctx.sendResult(handle, 0) # Success
+    of pbrDone: discard
+    of pbrUnregister: return crError
     handle.parser.status = code.get
     return crDone
   let k = line.until(':')
   if k.len == line.len:
     # invalid
-    handle.sendResult(ceCGIMalformedHeader)
+    ctx.rejectHandle(handle, ceCGIMalformedHeader)
     return crError
   let v = line.substr(k.len + 1).strip()
   if k.equalsIgnoreCase("Status"):
-    handle.sendResult(0) # success
+    case ctx.sendResult(handle, 0) # success
+    of pbrDone: discard
+    of pbrUnregister: return crError
     let code = parseUInt16(v)
     if v.len > 3 or code.isNone:
-      handle.sendResult(ceCGIMalformedHeader)
+      ctx.rejectHandle(handle, ceCGIMalformedHeader)
       return crError
     handle.parser.status = code.get
     return crContinue
   if k.equalsIgnoreCase("Cha-Control"):
     if v.startsWithIgnoreCase("Connected"):
-      handle.sendResult(0) # success
+      case ctx.sendResult(handle, 0) # success
+      of pbrDone: discard
+      of pbrUnregister: return crError
       return crContinue
     if v.startsWithIgnoreCase("ConnectionError"):
       let errs = v.split(' ')
-      var code = int32(ceCGIInvalidChaControl)
+      var code = ceCGIInvalidChaControl
       var message = ""
       if errs.len > 1:
         if (let x = parseInt32(errs[1]); x.isSome):
+          let n = x.get
+          if n > 0 and n <= int32(ConnectionError.high):
+            code = ConnectionError(x.get)
+        elif (let x = strictParseEnum[ConnectionError](errs[1]);
+            x.get(ceNone) != ceNone):
           code = x.get
-        elif (let x = strictParseEnum[ConnectionError](errs[1]); x.isSome):
-          code = int32(x.get)
         if errs.len > 2:
           message &= errs[2]
           for i in 3 ..< errs.len:
             message &= ' '
             message &= errs[i]
-      handle.sendResult(code, message)
+      ctx.rejectHandle(handle, code, message)
       return crError
     if v.startsWithIgnoreCase("ControlDone"):
       return crDone
-    handle.sendResult(ceCGIInvalidChaControl)
+    ctx.rejectHandle(handle, ceCGIInvalidChaControl)
     return crError
-  handle.sendResult(0) # success
+  case ctx.sendResult(handle, 0) # success
+  of pbrDone: discard
+  of pbrUnregister: return crError
   handle.parser.headers.add(k, v)
   return crDone
 
@@ -598,7 +598,8 @@ proc handleLine(handle: InputHandle; line: string) =
     let v = line.substr(k.len + 1).strip()
     handle.parser.headers.add(k, v)
 
-proc parseHeaders0(handle: InputHandle; data: openArray[char]): int =
+proc parseHeaders0(ctx: var LoaderContext; handle: InputHandle;
+    data: openArray[char]): int =
   let parser = handle.parser
   for i, c in data:
     template die =
@@ -614,13 +615,17 @@ proc parseHeaders0(handle: InputHandle; data: openArray[char]): int =
         if parser.state == hpsBeforeLines:
           # body comes immediately, so we haven't had a chance to send result
           # yet.
-          handle.sendResult(0)
-        handle.sendStatus(parser.status, parser.headers)
+          case ctx.sendResult(handle, 0)
+          of pbrDone: discard
+          of pbrUnregister: die
+        let res = ctx.sendStatus(handle, parser.status, parser.headers)
         handle.parser = nil
-        return i + 1 # +1 to skip \n
+        return case res
+        of pbrDone: i + 1 # +1 to skip \n
+        of pbrUnregister: -1
       case parser.state
       of hpsBeforeLines:
-        case handle.handleFirstLine(parser.lineBuffer)
+        case ctx.handleFirstLine(handle, parser.lineBuffer)
         of crDone: parser.state = hpsControlDone
         of crContinue: parser.state = hpsAfterFirstLine
         of crError: die
@@ -636,17 +641,14 @@ proc parseHeaders0(handle: InputHandle; data: openArray[char]): int =
       parser.lineBuffer &= c
   return data.len
 
-proc parseHeaders(handle: InputHandle; buffer: LoaderBuffer): int =
-  try:
-    if buffer == nil:
-      return handle.parseHeaders0(['\n'])
-    let p = cast[ptr UncheckedArray[char]](addr buffer.page[0])
-    return handle.parseHeaders0(p.toOpenArray(0, buffer.len - 1))
-  except EOFError:
-    handle.parser = nil
-    return -1
+proc parseHeaders(ctx: var LoaderContext; handle: InputHandle;
+    buffer: LoaderBuffer): int =
+  if buffer == nil:
+    return ctx.parseHeaders0(handle, ['\n'])
+  let p = cast[ptr UncheckedArray[char]](addr buffer.page[0])
+  return ctx.parseHeaders0(handle, p.toOpenArray(0, buffer.len - 1))
 
-proc finishParse(handle: InputHandle) =
+proc finishParse(ctx: var LoaderContext; handle: InputHandle) =
   if handle.cacheRef != nil:
     assert handle.cacheRef.offset == -1
     let ps = newPosixStream(handle.cacheRef.path, O_RDONLY, 0)
@@ -658,7 +660,7 @@ proc finishParse(handle: InputHandle) =
         if n <= 0:
           assert n == 0 or errno != EBADF
           break
-        let pn = handle.parseHeaders0(buffer.toOpenArray(0, n - 1))
+        let pn = ctx.parseHeaders0(handle, buffer.toOpenArray(0, n - 1))
         if pn == -1:
           break
         off += int64(pn)
@@ -669,7 +671,7 @@ proc finishParse(handle: InputHandle) =
       ps.sclose()
     handle.cacheRef = nil
   if handle.parser != nil:
-    discard handle.parseHeaders(nil)
+    discard ctx.parseHeaders(handle, nil)
 
 type HandleReadResult = enum
   hrrDone, hrrUnregister, hrrBrokenPipe
@@ -680,7 +682,7 @@ proc handleRead(ctx: var LoaderContext; handle: InputHandle;
   var unregs = 0
   let maxUnregs = handle.outputs.len
   while true:
-    let buffer = newLoaderBuffer()
+    var buffer = newLoaderBuffer()
     let n = handle.stream.readData(buffer.page)
     if n < 0:
       let e = errno
@@ -694,11 +696,22 @@ proc handleRead(ctx: var LoaderContext; handle: InputHandle;
     buffer.len = n
     var si = 0
     if handle.parser != nil:
-      si = handle.parseHeaders(buffer)
+      si = ctx.parseHeaders(handle, buffer)
       if si == -1: # died while parsing headers; unregister
         return hrrUnregister
       if si == n: # parsed the entire buffer as headers; skip output handling
         continue
+      if si != 0:
+        # Some parts of the buffer have been consumed as headers; others
+        # must be passed on to the client.
+        # We *could* store si as an offset to the buffer, but it would
+        # make things much more complex.  Let's just do this:
+        let nlen = buffer.len - si
+        let nbuffer = newLoaderBuffer(nlen)
+        nbuffer.len = nlen
+        copyMem(addr nbuffer.page[0], addr buffer.page[si], nbuffer.len)
+        buffer = nbuffer
+        assert nbuffer.len != 0, $si & ' ' & $buffer.len & " n " & $n
     else:
       handle.bytesSeen += uint64(n)
       #TODO stop reading if Content-Length exceeded
@@ -706,7 +719,7 @@ proc handleRead(ctx: var LoaderContext; handle: InputHandle;
       if output.dead:
         # do not push to unregWrite candidates
         continue
-      case ctx.pushBuffer(output, buffer, si)
+      case ctx.pushBuffer(output, buffer, ignoreSuspension = false)
       of pbrUnregister:
         output.dead = true
         unregWrite.add(output)
@@ -741,11 +754,9 @@ proc loadStreamRegular(ctx: var LoaderContext;
     elif cachedHandle != nil:
       output.parent = cachedHandle
       cachedHandle.outputs.add(output)
-      ctx.put(output)
     elif output.registered or output.suspended:
       output.parent = nil
       output.istreamAtEnd = true
-      ctx.put(output)
     else:
       assert output.stream.fd >= ctx.handleMap.len or
         ctx.handleMap[output.stream.fd] == nil
@@ -856,15 +867,15 @@ proc loadCGI(ctx: var LoaderContext; client: ClientHandle; handle: InputHandle;
   let cpath = ctx.parseCGIPath(request)
   if cpath.cmd == "" or cpath.basename in ["", ".", ".."] or
       cpath.basename[0] == '~':
-    handle.sendResult(ceInvalidCGIPath)
+    ctx.rejectHandle(handle, ceInvalidCGIPath)
     return
   if not fileExists(cpath.cmd):
-    handle.sendResult(ceCGIFileNotFound)
+    ctx.rejectHandle(handle, ceCGIFileNotFound)
     return
   # Pipe the response body as stdout.
   var pipefd: array[2, cint] # child -> parent
   if pipe(pipefd) == -1:
-    handle.sendResult(ceFailedToSetUpCGI)
+    ctx.rejectHandle(handle, ceFailedToSetUpCGI)
     return
   let istreamOut = newPosixStream(pipefd[0]) # read by loader
   var ostreamOut = newPosixStream(pipefd[1]) # written by child
@@ -877,7 +888,7 @@ proc loadCGI(ctx: var LoaderContext; client: ClientHandle; handle: InputHandle;
     # RDWR, otherwise mmap won't work
     ostreamOut = newPosixStream(tmpf, O_CREAT or O_RDWR, 0o600)
     if ostreamOut == nil:
-      handle.sendResult(ceCGIFailedToOpenCacheOutput)
+      ctx.rejectHandle(handle, ceCGIFailedToOpenCacheOutput)
       return
     let cacheId = handle.output.outputId # welp
     let item = CachedItem(
@@ -898,24 +909,24 @@ proc loadCGI(ctx: var LoaderContext; client: ClientHandle; handle: InputHandle;
     var n: int
     (istream, n) = client.openCachedItem(request.body.cacheId)
     if istream == nil:
-      handle.sendResult(ceCGICachedBodyNotFound)
+      ctx.rejectHandle(handle, ceCGICachedBodyNotFound)
       return
     cachedHandle = ctx.findCachedHandle(request.body.cacheId)
     if cachedHandle != nil: # cached item still open, switch to streaming mode
       if client.cacheMap[n].offset == -1:
-        handle.sendResult(ceCGICachedBodyUnavailable)
+        ctx.rejectHandle(handle, ceCGICachedBodyUnavailable)
         return
       istream2 = istream
   elif request.body.t == rbtOutput:
     outputIn = ctx.findOutput(request.body.outputId, client)
     if outputIn == nil:
-      handle.sendResult(ceCGIOutputHandleNotFound)
+      ctx.rejectHandle(handle, ceCGIOutputHandleNotFound)
       return
   if request.body.t in {rbtString, rbtMultipart, rbtOutput} or
       request.body.t == rbtCache and istream2 != nil:
     var pipefdRead: array[2, cint] # parent -> child
     if pipe(pipefdRead) == -1:
-      handle.sendResult(ceFailedToSetUpCGI)
+      ctx.rejectHandle(handle, ceFailedToSetUpCGI)
       return
     istream = newPosixStream(pipefdRead[0])
     ostream = newPosixStream(pipefdRead[1])
@@ -923,7 +934,7 @@ proc loadCGI(ctx: var LoaderContext; client: ClientHandle; handle: InputHandle;
   stderr.flushFile()
   let pid = fork()
   if pid == -1:
-    handle.sendResult(ceFailedToSetUpCGI)
+    ctx.rejectHandle(handle, ceFailedToSetUpCGI)
   elif pid == 0:
     istreamOut.sclose() # close read
     ostreamOut.moveFd(STDOUT_FILENO) # dup stdout
@@ -1001,10 +1012,14 @@ proc loadStream(ctx: var LoaderContext; client: ClientHandle;
     handle: InputHandle; request: Request) =
   let i = client.findPassedFd(request.url.pathname)
   if i == -1:
-    handle.sendResult(ceFileNotFound, "stream not found")
+    ctx.rejectHandle(handle, ceFileNotFound, "stream not found")
     return
-  handle.sendResult(0)
-  handle.sendStatus(200, newHeaders())
+  case ctx.sendResult(handle, 0)
+  of pbrDone: discard
+  of pbrUnregister: return
+  case ctx.sendStatus(handle, 200, newHeaders())
+  of pbrDone: discard
+  of pbrUnregister: return
   let ps = client.passedFdMap[i].ps
   var stats: Stat
   doAssert fstat(ps.fd, stats) != -1
@@ -1027,35 +1042,52 @@ proc loadFromCache(ctx: var LoaderContext; client: ClientHandle;
       discard ps.seek(startFrom)
     handle.stream = ps
     if ps == nil:
-      handle.rejectHandle(ceFileNotInCache)
+      ctx.rejectHandle(handle, ceFileNotInCache)
       client.cacheMap.del(n)
       return
-    handle.sendResult(0)
-    handle.sendStatus(200, newHeaders())
+    case ctx.sendResult(handle, 0)
+    of pbrDone: discard
+    of pbrUnregister:
+      client.cacheMap.del(n)
+      handle.close()
+      return
+    case ctx.sendStatus(handle, 200, newHeaders())
+    of pbrDone: discard
+    of pbrUnregister:
+      client.cacheMap.del(n)
+      handle.close()
+      return
     handle.output.stream.setBlocking(false)
     let cachedHandle = ctx.findCachedHandle(id)
     ctx.loadStreamRegular(handle, cachedHandle)
   else:
-    handle.sendResult(ceURLNotInCache)
+    ctx.rejectHandle(handle, ceURLNotInCache)
 
 # Data URL handler.
 # Moved back into loader from CGI, because data URLs can get extremely long
 # and thus no longer fit into the environment.
 proc loadDataSend(ctx: var LoaderContext; handle: InputHandle; s, ct: string) =
-  handle.sendResult(0)
-  handle.sendStatus(200, newHeaders({"Content-Type": ct}))
+  case ctx.sendResult(handle, 0)
+  of pbrDone: discard
+  of pbrUnregister:
+    handle.close()
+    return
+  case ctx.sendStatus(handle, 200, newHeaders({"Content-Type": ct}))
+  of pbrDone: discard
+  of pbrUnregister:
+    handle.close()
+    return
   let output = handle.output
   if s.len == 0:
     if output.suspended:
       output.istreamAtEnd = true
-      ctx.put(output)
     else:
       output.oclose()
     return
   let buffer = newLoaderBuffer(s.len)
   buffer.len = s.len
   copyMem(addr buffer.page[0], unsafeAddr s[0], s.len)
-  case ctx.pushBuffer(output, buffer, 0)
+  case ctx.pushBuffer(output, buffer, ignoreSuspension = false)
   of pbrUnregister:
     if output.registered:
       ctx.unregister(output)
@@ -1063,7 +1095,6 @@ proc loadDataSend(ctx: var LoaderContext; handle: InputHandle; s, ct: string) =
   of pbrDone:
     if output.registered or output.suspended:
       output.istreamAtEnd = true
-      ctx.put(output)
     else:
       output.oclose()
 
@@ -1071,15 +1102,14 @@ proc loadData(ctx: var LoaderContext; handle: InputHandle; request: Request) =
   let url = request.url
   var ct = url.pathname.until(',')
   if AllChars - Ascii + Controls - {'\t'} in ct:
-    handle.sendResult(ceInvalidURL, "invalid data URL")
-    handle.close()
+    ctx.rejectHandle(handle, ceInvalidURL, "invalid data URL")
     return
   let sd = ct.len + 1 # data start
   let body = percentDecode(url.pathname.toOpenArray(sd, url.pathname.high))
   if ct.endsWith(";base64"):
     var d: string
     if d.atob(body).isNone:
-      handle.rejectHandle(ceInvalidURL, "invalid data URL")
+      ctx.rejectHandle(handle, ceInvalidURL, "invalid data URL")
       return
     ct.setLen(ct.len - ";base64".len) # remove base64 indicator
     ctx.loadDataSend(handle, d, ct)
@@ -1185,7 +1215,7 @@ proc loadAbout(ctx: var LoaderContext; handle: InputHandle; request: Request) =
     if request.httpMethod == hmPost:
       # OK/STOP/PAUSE/RESUME clicked
       if request.body.t != rbtString:
-        handle.rejectHandle(ceInvalidURL, "wat")
+        ctx.rejectHandle(handle, ceInvalidURL, "wat")
         return
       for it in ctx.parseDownloadActions(request.body.s):
         let dl = ctx.downloadList[it.n]
@@ -1229,7 +1259,7 @@ proc loadAbout(ctx: var LoaderContext; handle: InputHandle; request: Request) =
     const body = staticRead"res/license.md"
     ctx.loadDataSend(handle, body, "text/markdown")
   else:
-    handle.rejectHandle(ceInvalidURL, "invalid download URL")
+    ctx.rejectHandle(handle, ceInvalidURL, "invalid download URL")
 
 proc loadResource(ctx: var LoaderContext; client: ClientHandle;
     config: LoaderClientConfig; request: Request; handle: InputHandle) =
@@ -1263,7 +1293,6 @@ proc loadResource(ctx: var LoaderContext; client: ClientHandle;
     of "cache":
       ctx.loadFromCache(client, handle, request)
       assert handle.stream == nil
-      handle.close()
     of "data":
       ctx.loadData(handle, request)
     of "about":
@@ -1275,11 +1304,11 @@ proc loadResource(ctx: var LoaderContext; client: ClientHandle;
         inc tries
         redo = true
       of ummrWrongURL:
-        handle.rejectHandle(ceInvalidURIMethodEntry)
+        ctx.rejectHandle(handle, ceInvalidURIMethodEntry)
       of ummrNotFound:
-        handle.rejectHandle(ceUnknownScheme)
+        ctx.rejectHandle(handle, ceUnknownScheme)
   if tries >= MaxRewrites:
-    handle.rejectHandle(ceTooManyRewrites)
+    ctx.rejectHandle(handle, ceTooManyRewrites)
 
 proc setupRequestDefaults(request: Request; config: LoaderClientConfig) =
   for k, v in config.defaultHeaders.table:
@@ -1309,12 +1338,14 @@ proc load(ctx: var LoaderContext; stream: SocketStream; request: Request;
   if not fail:
     discard close(sv[1])
     let stream = newSocketStream(sv[0])
+    stream.setBlocking(false)
     let handle = newInputHandle(stream, ctx.getOutputId(), client.pid)
+    ctx.put(handle.output)
     when defined(debug):
       handle.url = request.url
       handle.output.url = request.url
     if not config.filter.match(request.url):
-      handle.rejectHandle(ceDisallowedURL)
+      ctx.rejectHandle(handle, ceDisallowedURL)
     else:
       request.setupRequestDefaults(config)
       ctx.loadResource(client, config, request, handle)
@@ -1403,14 +1434,14 @@ proc addCacheFile(ctx: var LoaderContext; stream: SocketStream;
     w.swrite(id)
 
 proc redirectToFile(ctx: var LoaderContext; stream: SocketStream;
-    r: var PacketReader) =
+    client: ClientHandle; r: var PacketReader) =
   var outputId: int
   var targetPath: string
   var displayUrl: string
   r.sread(outputId)
   r.sread(targetPath)
   r.sread(displayUrl)
-  let output = ctx.findOutput(outputId, ctx.pagerClient)
+  let output = ctx.findOutput(outputId, client)
   var success = false
   if output != nil:
     var fileOutput: OutputHandle
@@ -1599,7 +1630,7 @@ proc readCommand(ctx: var LoaderContext; client: ClientHandle) =
         ctx.openCachedItem(stream, client, r)
       of lcRedirectToFile:
         privileged_command
-        ctx.redirectToFile(stream, r)
+        ctx.redirectToFile(stream, client, r)
       of lcLoadConfig:
         privileged_command
         ctx.loadConfig(stream, client, r)
@@ -1677,7 +1708,7 @@ proc finishCycle(ctx: var LoaderContext) =
       ctx.unregister(handle)
       ctx.unset(handle)
       if handle.parser != nil:
-        handle.finishParse()
+        ctx.finishParse(handle)
       handle.iclose()
       for output in handle.outputs:
         output.istreamAtEnd = true
@@ -1698,7 +1729,7 @@ proc finishCycle(ctx: var LoaderContext) =
           ctx.unregister(handle)
           ctx.unset(handle)
           if handle.parser != nil:
-            handle.finishParse()
+            ctx.finishParse(handle)
           handle.iclose()
   for client in ctx.unregClient:
     if client.stream != nil: