about summary refs log tree commit diff stats
path: root/src/loader
diff options
context:
space:
mode:
authorbptato <nincsnevem662@gmail.com>2024-05-30 00:19:48 +0200
committerbptato <nincsnevem662@gmail.com>2024-06-20 17:50:22 +0200
commit60dc37269cd2dc8cdf23d9f77680f6af9490032f (patch)
tree9a72ba24daffa546f92704e7e06cf84fded2d89d /src/loader
parenta146a22b11cea39bc691417d9d9a1292b7177552 (diff)
downloadchawan-60dc37269cd2dc8cdf23d9f77680f6af9490032f.tar.gz
img, loader: separate out png codec into cgi, misc improvements
* multi-processed and sandboxed PNG decoding & encoding (through local
  CGI)
* improved request body passing (including support for output id as
  response body)
* simplified & faster blob()/text() - now every request starts
  suspended, and OngoingData.buf has been replaced with loader's
  buffering capability
* image caching: we no longer pull bitmaps from the container after
  every single getLines call

Next steps: replace our bespoke PNG decoder with something more usable,
add other decoders, and make them stream.
Diffstat (limited to 'src/loader')
-rw-r--r--src/loader/cgi.nim53
-rw-r--r--src/loader/loader.nim160
-rw-r--r--src/loader/loaderhandle.nim41
-rw-r--r--src/loader/request.nim49
-rw-r--r--src/loader/response.nim124
5 files changed, 258 insertions, 169 deletions
diff --git a/src/loader/cgi.nim b/src/loader/cgi.nim
index f3c5b1e3..ee3d3160 100644
--- a/src/loader/cgi.nim
+++ b/src/loader/cgi.nim
@@ -42,8 +42,8 @@ proc setupEnv(cmd, scriptName, pathInfo, requestURI, myDir: string;
   if url.query.isSome:
     putEnv("QUERY_STRING", url.query.get)
   if request.httpMethod == hmPost:
-    if request.multipart.isSome:
-      putEnv("CONTENT_TYPE", request.multipart.get.getContentType())
+    if request.body.t == rbtMultipart:
+      putEnv("CONTENT_TYPE", request.body.multipart.getContentType())
     else:
       putEnv("CONTENT_TYPE", request.headers.getOrDefault("Content-Type", ""))
     putEnv("CONTENT_LENGTH", $contentLen)
@@ -126,7 +126,7 @@ proc handleLine(handle: LoaderHandle; line: string; headers: Headers) =
   headers.add(k, v)
 
 proc loadCGI*(handle: LoaderHandle; request: Request; cgiDir: seq[string];
-    prevURL: URL; insecureSSLNoVerify: bool) =
+    prevURL: URL; insecureSSLNoVerify: bool; ostream: var PosixStream) =
   if cgiDir.len == 0:
     handle.sendResult(ERROR_NO_CGI_DIR)
     return
@@ -181,16 +181,11 @@ proc loadCGI*(handle: LoaderHandle; request: Request; cgiDir: seq[string];
     return
   # Pipe the request body as stdin for POST.
   var pipefd_read: array[0..1, cint] # parent -> child
-  let needsPipe = request.body.isSome or request.multipart.isSome
-  if needsPipe:
+  if request.body.t != rbtNone:
     if pipe(pipefd_read) == -1:
       handle.sendResult(ERROR_FAIL_SETUP_CGI)
       return
-  var contentLen = 0
-  if request.body.isSome:
-    contentLen = request.body.get.len
-  elif request.multipart.isSome:
-    contentLen = request.multipart.get.calcLength()
+  let contentLen = request.body.contentLength()
   stdout.flushFile()
   stderr.flushFile()
   let pid = fork()
@@ -199,7 +194,7 @@ proc loadCGI*(handle: LoaderHandle; request: Request; cgiDir: seq[string];
   elif pid == 0:
     discard close(pipefd[0]) # close read
     discard dup2(pipefd[1], 1) # dup stdout
-    if needsPipe:
+    if request.body.t != rbtNone:
       discard close(pipefd_read[1]) # close write
       if pipefd_read[0] != 0:
         discard dup2(pipefd_read[0], 0) # dup stdin
@@ -220,38 +215,32 @@ proc loadCGI*(handle: LoaderHandle; request: Request; cgiDir: seq[string];
     quit(1)
   else:
     discard close(pipefd[1]) # close write
-    if needsPipe:
+    if request.body.t != rbtNone:
       discard close(pipefd_read[0]) # close read
       let ps = newPosixStream(pipefd_read[1])
-      if request.body.isSome:
-        ps.write(request.body.get)
-      elif request.multipart.isSome:
-        let multipart = request.multipart.get
-        for entry in multipart.entries:
-          ps.writeEntry(entry, multipart.boundary)
-        ps.writeEnd(multipart.boundary)
-      ps.sclose()
+      case request.body.t
+      of rbtString:
+        ps.write(request.body.s)
+        ps.sclose()
+      of rbtMultipart:
+        let boundary = request.body.multipart.boundary
+        for entry in request.body.multipart.entries:
+          ps.writeEntry(entry, boundary)
+        ps.writeEnd(boundary)
+        ps.sclose()
+      of rbtOutput:
+        ostream = ps
+      of rbtNone: discard
     handle.parser = HeaderParser(headers: newHeaders())
     handle.istream = newPosixStream(pipefd[0])
 
-proc killHandle(handle: LoaderHandle) =
-  if handle.parser.state != hpsBeforeLines:
-    # not an ideal solution, but better than silently eating malformed
-    # headers
-    handle.output.ostream.setBlocking(true)
-    handle.sendStatus(500)
-    handle.sendHeaders(newHeaders())
-    const msg = "Error: malformed header in CGI script"
-    discard handle.output.ostream.sendData(msg)
-  handle.parser = nil
-
 proc parseHeaders0(handle: LoaderHandle; buffer: LoaderBuffer): int =
   let parser = handle.parser
   var s = parser.lineBuffer
   let L = if buffer == nil: 1 else: buffer.len
   for i in 0 ..< L:
     template die =
-      handle.killHandle()
+      handle.parser = nil
       return -1
     let c = if buffer != nil:
       char(buffer.page[i])
diff --git a/src/loader/loader.nim b/src/loader/loader.nim
index c84f247a..89d97cde 100644
--- a/src/loader/loader.nim
+++ b/src/loader/loader.nim
@@ -8,11 +8,17 @@
 #  S: output ID
 #  S: status code
 #  S: headers
+#  C: resume
 #  S: response body
 # else:
 #  S: error message
 #
 # The body is passed to the stream as-is, so effectively nothing can follow it.
+#
+# Note: if the consumer closes the request's body after headers have been
+# passed, it will *not* be cleaned up until a `resume' command is
+# received. (This allows for passing outputIds to the pager for later
+# addCacheFile commands there.)
 
 import std/deques
 import std/nativesockets
@@ -57,7 +63,7 @@ type
     process*: int
     clientPid*: int
     connecting*: Table[int, ConnectData]
-    ongoing*: Table[int, OngoingData]
+    ongoing*: Table[int, Response]
     unregistered*: seq[int]
     registerFun*: proc(fd: int)
     unregisterFun*: proc(fd: int)
@@ -71,11 +77,6 @@ type
     stream*: SocketStream
     request: Request
 
-  OngoingData* = object
-    buf: string
-    response*: Response
-    bodyRead: Promise[string]
-
   LoaderCommand = enum
     lcAddCacheFile
     lcAddClient
@@ -155,10 +156,12 @@ proc rejectHandle(handle: LoaderHandle; code: ConnectErrorCode; msg = "") =
   handle.sendResult(code, msg)
   handle.close()
 
-func findOutput(ctx: LoaderContext; id: int): OutputHandle =
+func findOutput(ctx: LoaderContext; id: int; client: ClientData): OutputHandle =
   assert id != -1
   for it in ctx.outputMap.values:
     if it.outputId == id:
+      # verify that it's safe to access this handle.
+      doAssert ctx.isPrivileged(client) or client.pid == it.ownerPid
       return it
   return nil
 
@@ -211,11 +214,8 @@ proc getOutputId(ctx: LoaderContext): int =
   result = ctx.outputNum
   inc ctx.outputNum
 
-proc redirectToFile(ctx: LoaderContext; output: OutputHandle;
-    targetPath: string): bool =
-  let ps = newPosixStream(targetPath, O_CREAT or O_WRONLY, 0o600)
-  if ps == nil:
-    return false
+proc redirectToStream(ctx: LoaderContext; output: OutputHandle;
+    ps: PosixStream): bool =
   if output.currentBuffer != nil:
     let n = ps.sendData(output.currentBuffer, output.currentBufferIdx)
     if unlikely(n < output.currentBuffer.len - output.currentBufferIdx):
@@ -226,7 +226,9 @@ proc redirectToFile(ctx: LoaderContext; output: OutputHandle;
     if unlikely(n < buffer.len):
       ps.sclose()
       return false
-  if output.parent != nil:
+  if output.istreamAtEnd:
+    ps.sclose()
+  elif output.parent != nil:
     output.parent.outputs.add(OutputHandle(
       parent: output.parent,
       ostream: ps,
@@ -235,6 +237,13 @@ proc redirectToFile(ctx: LoaderContext; output: OutputHandle;
     ))
   return true
 
+proc redirectToFile(ctx: LoaderContext; output: OutputHandle;
+    targetPath: string): bool =
+  let ps = newPosixStream(targetPath, O_CREAT or O_WRONLY, 0o600)
+  if ps == nil:
+    return false
+  return ctx.redirectToStream(output, ps)
+
 type AddCacheFileResult = tuple[outputId: int; cacheFile: string]
 
 proc addCacheFile(ctx: LoaderContext; client: ClientData; output: OutputHandle):
@@ -335,8 +344,7 @@ proc loadStreamRegular(ctx: LoaderContext; handle, cachedHandle: LoaderHandle) =
       output.ostream.sclose()
       output.ostream = nil
   handle.outputs.setLen(0)
-  handle.istream.sclose()
-  handle.istream = nil
+  handle.iclose()
 
 proc loadStream(ctx: LoaderContext; handle: LoaderHandle; request: Request) =
   ctx.passedFdMap.withValue(request.url.pathname, fdp):
@@ -406,10 +414,17 @@ proc loadResource(ctx: LoaderContext; client: ClientData; config: LoaderClientCo
           redo = true
           continue
     if request.url.scheme == "cgi-bin":
+      var ostream: PosixStream = nil
       handle.loadCGI(request, ctx.config.cgiDir, prevurl,
-        config.insecureSSLNoVerify)
+        config.insecureSSLNoVerify, ostream)
       if handle.istream != nil:
         ctx.addFd(handle)
+        if ostream != nil:
+          let output = ctx.findOutput(request.body.outputId, client)
+          if output != nil:
+            doAssert ctx.redirectToStream(output, ostream)
+          else:
+            ostream.sclose()
       else:
         handle.close()
     elif request.url.scheme == "stream":
@@ -451,8 +466,7 @@ proc setupRequestDefaults(request: Request; config: LoaderClientConfig) =
 
 proc load(ctx: LoaderContext; stream: SocketStream; request: Request;
     client: ClientData; config: LoaderClientConfig) =
-  let handle = newLoaderHandle(stream, ctx.getOutputId(), client.pid,
-    request.suspended)
+  let handle = newLoaderHandle(stream, ctx.getOutputId(), client.pid)
   when defined(debug):
     handle.url = request.url
     handle.output.url = request.url
@@ -514,9 +528,12 @@ proc addCacheFile(ctx: LoaderContext; stream: SocketStream;
     r: var BufferedReader) =
   var outputId: int
   var targetPid: int
+  var sourcePid: int
   r.sread(outputId)
   r.sread(targetPid)
-  let output = ctx.findOutput(outputId)
+  r.sread(sourcePid)
+  let sourceClient = ctx.clientData[sourcePid]
+  let output = ctx.findOutput(outputId, sourceClient)
   assert output != nil
   let targetClient = ctx.clientData[targetPid]
   let (id, file) = ctx.addCacheFile(targetClient, output)
@@ -531,7 +548,7 @@ proc redirectToFile(ctx: LoaderContext; stream: SocketStream;
   var targetPath: string
   r.sread(outputId)
   r.sread(targetPath)
-  let output = ctx.findOutput(outputId)
+  let output = ctx.findOutput(outputId, ctx.pagerClient)
   var success = false
   if output != nil:
     success = ctx.redirectToFile(output, targetPath)
@@ -583,9 +600,7 @@ proc tee(ctx: LoaderContext; stream: SocketStream; client: ClientData;
   var targetPid: int
   r.sread(sourceId)
   r.sread(targetPid)
-  let output = ctx.findOutput(sourceId)
-  # only allow tee'ing outputs owned by client
-  doAssert output.ownerPid == client.pid
+  let output = ctx.findOutput(sourceId, client)
   if output != nil:
     let id = ctx.getOutputId()
     output.tee(stream, id, targetPid)
@@ -602,7 +617,7 @@ proc suspend(ctx: LoaderContext; stream: SocketStream; client: ClientData;
   var ids: seq[int]
   r.sread(ids)
   for id in ids:
-    let output = ctx.findOutput(id)
+    let output = ctx.findOutput(id, client)
     if output != nil:
       output.suspended = true
       if output.registered:
@@ -615,7 +630,7 @@ proc resume(ctx: LoaderContext; stream: SocketStream; client: ClientData;
   var ids: seq[int]
   r.sread(ids)
   for id in ids:
-    let output = ctx.findOutput(id)
+    let output = ctx.findOutput(id, client)
     if output != nil:
       output.suspended = false
       assert not output.registered
@@ -793,10 +808,9 @@ proc finishCycle(ctx: LoaderContext; unregRead: var seq[LoaderHandle];
     if handle.istream != nil:
       ctx.selector.unregister(handle.istream.fd)
       ctx.handleMap.del(handle.istream.fd)
-      handle.istream.sclose()
-      handle.istream = nil
       if handle.parser != nil:
         handle.finishParse()
+      handle.iclose()
       for output in handle.outputs:
         output.istreamAtEnd = true
         if output.isEmpty:
@@ -816,10 +830,9 @@ proc finishCycle(ctx: LoaderContext; unregRead: var seq[LoaderHandle];
           # premature end of all output streams; kill istream too
           ctx.selector.unregister(handle.istream.fd)
           ctx.handleMap.del(handle.istream.fd)
-          handle.istream.sclose()
-          handle.istream = nil
           if handle.parser != nil:
             handle.finishParse()
+          handle.iclose()
 
 proc runFileLoader*(fd: cint; config: LoaderConfig) =
   var ctx = initLoaderContext(fd, config)
@@ -861,12 +874,7 @@ proc getRedirect*(response: Response; request: Request): Request =
             status == 302 and request.httpMethod == hmPost:
           return newRequest(url.get, hmGet)
         else:
-          return newRequest(
-            url.get,
-            request.httpMethod,
-            body = request.body,
-            multipart = request.multipart
-          )
+          return newRequest(url.get, request.httpMethod, body = request.body)
   return nil
 
 template withLoaderPacketWriter(stream: SocketStream; loader: FileLoader;
@@ -898,7 +906,6 @@ proc startRequest*(loader: FileLoader; request: Request;
     w.swrite(config)
   return stream
 
-#TODO: add init
 proc fetch*(loader: FileLoader; input: Request): FetchPromise =
   let stream = loader.startRequest(input)
   let fd = int(stream.fd)
@@ -913,10 +920,7 @@ proc fetch*(loader: FileLoader; input: Request): FetchPromise =
 
 proc reconnect*(loader: FileLoader; data: ConnectData) =
   data.stream.sclose()
-  let stream = loader.connect()
-  stream.withLoaderPacketWriter loader, w:
-    w.swrite(lcLoad)
-    w.swrite(data.request)
+  let stream = loader.startRequest(data.request)
   let fd = int(stream.fd)
   loader.registerFun(fd)
   loader.connecting[fd] = ConnectData(
@@ -925,18 +929,6 @@ proc reconnect*(loader: FileLoader; data: ConnectData) =
     stream: stream
   )
 
-proc switchStream*(data: var ConnectData; stream: SocketStream) =
-  data.stream = stream
-
-proc switchStream*(loader: FileLoader; data: var OngoingData;
-    stream: SocketStream) =
-  data.response.body = stream
-  let fd = int(stream.fd)
-  data.response.unregisterFun = proc() =
-    loader.ongoing.del(fd)
-    loader.unregistered.add(fd)
-    loader.unregisterFun(fd)
-
 proc suspend*(loader: FileLoader; fds: seq[int]) =
   let stream = loader.connect()
   stream.withLoaderPacketWriter loader, w:
@@ -944,13 +936,16 @@ proc suspend*(loader: FileLoader; fds: seq[int]) =
     w.swrite(fds)
   stream.sclose()
 
-proc resume*(loader: FileLoader; fds: seq[int]) =
+proc resume*(loader: FileLoader; fds: openArray[int]) =
   let stream = loader.connect()
   stream.withLoaderPacketWriter loader, w:
     w.swrite(lcResume)
     w.swrite(fds)
   stream.sclose()
 
+proc resume*(loader: FileLoader; fds: int) =
+  loader.resume([fds])
+
 proc tee*(loader: FileLoader; sourceId, targetPid: int): (SocketStream, int) =
   let stream = loader.connect()
   stream.withLoaderPacketWriter loader, w:
@@ -962,15 +957,20 @@ proc tee*(loader: FileLoader; sourceId, targetPid: int): (SocketStream, int) =
   r.sread(outputId)
   return (stream, outputId)
 
-proc addCacheFile*(loader: FileLoader; outputId, targetPid: int):
-    AddCacheFileResult =
+# sourcePid is the PID of the output's owner. This is used in pager for images,
+# so that we can be sure that a container only loads images on the page that
+# it owns.
+proc addCacheFile*(loader: FileLoader; outputId, targetPid: int;
+    sourcePid = -1): AddCacheFileResult =
   let stream = loader.connect()
   if stream == nil:
     return (-1, "")
+  let sourcePid = if sourcePid == -1: loader.clientPid else: sourcePid
   stream.withLoaderPacketWriter loader, w:
     w.swrite(lcAddCacheFile)
     w.swrite(outputId)
     w.swrite(targetPid)
+    w.swrite(sourcePid)
   var r = stream.initPacketReader()
   var outputId: int
   var cacheFile: string
@@ -990,18 +990,18 @@ proc redirectToFile*(loader: FileLoader; outputId: int; targetPath: string):
   var r = stream.initPacketReader()
   r.sread(result)
 
-const BufferSize = 4096
-
 proc onConnected*(loader: FileLoader; fd: int) =
   let connectData = loader.connecting[fd]
   let stream = connectData.stream
   let promise = connectData.promise
   let request = connectData.request
+  # delete before resolving the promise
+  loader.connecting.del(fd)
   var r = stream.initPacketReader()
   var res: int
   r.sread(res) # packet 1
-  let response = newResponse(res, request, stream)
   if res == 0:
+    let response = newResponse(res, request, stream)
     r.sread(response.outputId) # packet 1
     r = stream.initPacketReader()
     r.sread(response.status) # packet 2
@@ -1011,13 +1011,12 @@ proc onConnected*(loader: FileLoader; fd: int) =
     response.body = stream
     assert loader.unregisterFun != nil
     response.unregisterFun = proc() =
-      loader.ongoing.del(fd)
-      loader.unregistered.add(fd)
-      loader.unregisterFun(fd)
-    loader.ongoing[fd] = OngoingData(
-      response: response,
-      bodyRead: response.bodyRead
-    )
+      loader.ongoing.del(response.body.fd)
+      loader.unregistered.add(response.body.fd)
+      loader.unregisterFun(response.body.fd)
+    response.resumeFun = proc(outputId: int) =
+      loader.resume(outputId)
+    loader.ongoing[fd] = response
     stream.setBlocking(false)
     promise.resolve(JSResult[Response].ok(response))
   else:
@@ -1030,40 +1029,27 @@ proc onConnected*(loader: FileLoader; fd: int) =
     stream.sclose()
     let err = newTypeError("NetworkError when attempting to fetch resource")
     promise.resolve(JSResult[Response].err(err))
-  loader.connecting.del(fd)
 
 proc onRead*(loader: FileLoader; fd: int) =
-  loader.ongoing.withValue(fd, buffer):
-    let response = buffer[].response
-    while not response.body.isend:
-      let olen = buffer[].buf.len
-      try:
-        buffer[].buf.setLen(olen + BufferSize)
-        let n = response.body.recvData(addr buffer[].buf[olen], BufferSize)
-        buffer[].buf.setLen(olen + n)
-        if n == 0:
-          break
-      except ErrorAgain:
-        buffer[].buf.setLen(olen)
-        break
+  let response = loader.ongoing.getOrDefault(fd)
+  if response != nil:
+    response.onRead(response)
     if response.body.isend:
-      buffer[].bodyRead.resolve(buffer[].buf)
-      buffer[].bodyRead = nil
-      buffer[].buf = ""
+      response.bodyRead.resolve()
+      response.bodyRead = nil
       response.unregisterFun()
 
 proc onError*(loader: FileLoader; fd: int) =
-  loader.ongoing.withValue(fd, buffer):
-    let response = buffer[].response
+  let response = loader.ongoing.getOrDefault(fd)
+  if response != nil:
     when defined(debug):
       var lbuf {.noinit.}: array[BufferSize, char]
       if not response.body.isend:
         let n = response.body.recvData(addr lbuf[0], lbuf.len)
         assert n == 0
       assert response.body.isend
-    buffer[].bodyRead.resolve(buffer[].buf)
-    buffer[].bodyRead = nil
-    buffer[].buf = ""
+    response.bodyRead.resolve()
+    response.bodyRead = nil
     response.unregisterFun()
 
 # Note: this blocks until headers are received.
diff --git a/src/loader/loaderhandle.nim b/src/loader/loaderhandle.nim
index 00f6f754..31a41571 100644
--- a/src/loader/loaderhandle.nim
+++ b/src/loader/loaderhandle.nim
@@ -3,6 +3,7 @@ import std/net
 import std/tables
 
 import io/bufwriter
+import io/dynstream
 import io/posixstream
 import loader/headers
 
@@ -44,14 +45,15 @@ type
     status*: uint16
 
   ResponseState = enum
-    rsBeforeResult, rsBeforeStatus, rsBeforeHeaders, rsAfterHeaders
+    rsBeforeResult, rsAfterFailure, rsBeforeStatus, rsBeforeHeaders,
+    rsAfterHeaders
 
   LoaderHandle* = ref object
     istream*: PosixStream # stream for taking input
     outputs*: seq[OutputHandle] # list of outputs to be streamed into
     cacheId*: int # if cached, our ID in a client cacheMap
     parser*: HeaderParser # only exists for CGI handles
-    rstate: ResponseState # just an enum for sanity checks
+    rstate: ResponseState # track response state
     when defined(debug):
       url*: URL
 
@@ -69,15 +71,14 @@ when defined(debug):
     return s
 
 # Create a new loader handle, with the output stream ostream.
-proc newLoaderHandle*(ostream: PosixStream; outputId, pid: int;
-    suspended: bool): LoaderHandle =
+proc newLoaderHandle*(ostream: PosixStream; outputId, pid: int): LoaderHandle =
   let handle = LoaderHandle(cacheId: -1)
   handle.outputs.add(OutputHandle(
     ostream: ostream,
     parent: handle,
     outputId: outputId,
     ownerPid: pid,
-    suspended: suspended
+    suspended: true
   ))
   return handle
 
@@ -108,15 +109,17 @@ proc bufferCleared*(output: OutputHandle) =
     output.currentBuffer = nil
 
 proc tee*(outputIn: OutputHandle; ostream: PosixStream; outputId, pid: int) =
-  outputIn.parent.outputs.add(OutputHandle(
-    parent: outputIn.parent,
+  let parent = outputIn.parent
+  parent.outputs.add(OutputHandle(
+    parent: parent,
     ostream: ostream,
     currentBuffer: outputIn.currentBuffer,
     currentBufferIdx: outputIn.currentBufferIdx,
     buffers: outputIn.buffers,
     istreamAtEnd: outputIn.istreamAtEnd,
     outputId: outputId,
-    ownerPid: pid
+    ownerPid: pid,
+    suspended: outputIn.suspended
   ))
 
 template output*(handle: LoaderHandle): OutputHandle =
@@ -133,6 +136,7 @@ proc sendResult*(handle: LoaderHandle; res: int; msg = "") =
     if res == 0: # success
       assert msg == ""
       w.swrite(output.outputId)
+      inc handle.rstate
     else: # error
       w.swrite(msg)
   output.ostream.setBlocking(blocking)
@@ -164,12 +168,27 @@ proc sendData*(ps: PosixStream; buffer: LoaderBuffer; si = 0): int {.inline.} =
   assert buffer.len - si > 0
   return ps.sendData(addr buffer.page[si], buffer.len - si)
 
+proc iclose*(handle: LoaderHandle) =
+  if handle.istream != nil:
+    if handle.rstate notin {rsBeforeResult, rsAfterFailure, rsAfterHeaders}:
+      assert handle.outputs.len == 1
+      # not an ideal solution, but better than silently eating malformed
+      # headers
+      try:
+        handle.sendStatus(500)
+        handle.sendHeaders(newHeaders())
+        handle.output.ostream.setBlocking(true)
+        const msg = "Error: malformed header in CGI script"
+        discard handle.output.ostream.sendData(msg)
+      except ErrorBrokenPipe:
+        discard # receiver is dead
+    handle.istream.sclose()
+    handle.istream = nil
+
 proc close*(handle: LoaderHandle) =
+  handle.iclose()
   for output in handle.outputs:
     #TODO assert not output.registered
     if output.ostream != nil:
       output.ostream.sclose()
       output.ostream = nil
-  if handle.istream != nil:
-    handle.istream.sclose()
-    handle.istream = nil
diff --git a/src/loader/request.nim b/src/loader/request.nim
index 277481f1..f92098cb 100644
--- a/src/loader/request.nim
+++ b/src/loader/request.nim
@@ -58,17 +58,27 @@ type
     of rwtWindow:
       window*: EnvironmentSettings
 
+  RequestBodyType* = enum
+    rbtNone, rbtString, rbtMultipart, rbtOutput
+
+  RequestBody* = object
+    case t*: RequestBodyType
+    of rbtNone:
+      discard
+    of rbtString:
+      s*: string
+    of rbtMultipart:
+      multipart*: FormData
+    of rbtOutput:
+      outputId*: int
+
   Request* = ref object
     httpMethod*: HttpMethod
     url*: URL
     headers*: Headers
-    body*: Option[string]
-    multipart*: Option[FormData]
+    body*: RequestBody
     referrer*: URL
     proxy*: URL #TODO do something with this
-    # when set to true, the loader will not write data from the body (not
-    # headers!) into the output until a resume is received.
-    suspended*: bool
 
   JSRequest* = ref object
     request*: Request
@@ -81,6 +91,13 @@ type
 
 jsDestructor(JSRequest)
 
+proc contentLength*(body: RequestBody): int =
+  case body.t
+  of rbtNone: return 0
+  of rbtString: return body.s.len
+  of rbtMultipart: return body.multipart.calcLength()
+  of rbtOutput: return 0
+
 func headers(this: JSRequest): Headers {.jsfget.} =
   return this.request.headers
 
@@ -102,17 +119,14 @@ iterator pairs*(headers: Headers): (string, string) =
       yield (k, v)
 
 func newRequest*(url: URL; httpMethod = hmGet; headers = newHeaders();
-    body = none(string); multipart = none(FormData); proxy: URL = nil;
-    referrer: URL = nil; suspended = false): Request =
+    body = RequestBody(); proxy: URL = nil; referrer: URL = nil): Request =
   return Request(
     url: url,
     httpMethod: httpMethod,
     headers: headers,
     body: body,
-    multipart: multipart,
     referrer: referrer,
-    proxy: proxy,
-    suspended: suspended
+    proxy: proxy
   )
 
 func createPotentialCORSRequest*(url: URL; destination: RequestDestination;
@@ -178,7 +192,7 @@ proc fromJSBodyInit(ctx: JSContext; val: JSValue): JSResult[BodyInit] =
     let x = fromJS[string](ctx, val)
     if x.isSome:
       return ok(BodyInit(t: bitString, str: x.get))
-  return err(newTypeError("Invalid body init type"))
+  return errTypeError("Invalid body init type")
 
 func newRequest*[T: string|JSRequest](ctx: JSContext; resource: T;
     init = none(RequestInit)): JSResult[JSRequest] {.jsctor.} =
@@ -188,13 +202,12 @@ func newRequest*[T: string|JSRequest](ctx: JSContext; resource: T;
   when T is string:
     let url = ?newURL(resource)
     if url.username != "" or url.password != "":
-      return err(newTypeError("Input URL contains a username or password"))
+      return errTypeError("Input URL contains a username or password")
     var httpMethod = hmGet
     var headers = newHeaders()
     let referrer: URL = nil
     var credentials = cmSameOrigin
-    var body: Option[string]
-    var multipart: Option[FormData]
+    var body = RequestBody()
     var proxyUrl: URL #TODO?
     let fallbackMode = opt(rmCors)
     var window = RequestWindow(t: rwtClient)
@@ -205,7 +218,6 @@ func newRequest*[T: string|JSRequest](ctx: JSContext; resource: T;
     let referrer = resource.request.referrer
     var credentials = resource.credentialsMode
     var body = resource.request.body
-    var multipart = resource.request.multipart
     var proxyUrl = resource.request.proxy #TODO?
     let fallbackMode = none(RequestMode)
     var window = resource.window
@@ -226,8 +238,10 @@ func newRequest*[T: string|JSRequest](ctx: JSContext; resource: T;
     if init.body.isSome:
       let ibody = init.body.get
       case ibody.t
-      of bitFormData: multipart = some(ibody.formData)
-      of bitString: body = some(ibody.str)
+      of bitFormData:
+        body = RequestBody(t: rbtMultipart, multipart: ibody.formData)
+      of bitString:
+        body = RequestBody(t: rbtString, s: ibody.str)
       else: discard #TODO
       if httpMethod in {hmGet, hmHead}:
         return errTypeError("HEAD or GET Request cannot have a body.")
@@ -245,7 +259,6 @@ func newRequest*[T: string|JSRequest](ctx: JSContext; resource: T;
       httpMethod,
       headers,
       body,
-      multipart,
       proxy = proxyUrl,
       referrer = referrer
     ),
diff --git a/src/loader/response.nim b/src/loader/response.nim
index 8ea17e64..3834d5a9 100644
--- a/src/loader/response.nim
+++ b/src/loader/response.nim
@@ -3,6 +3,8 @@ import std/tables
 
 import chagashi/charset
 import chagashi/decoder
+import img/bitmap
+import io/posixstream
 import io/promise
 import io/socketstream
 import loader/headers
@@ -11,6 +13,7 @@ import monoucha/javascript
 import monoucha/jserror
 import monoucha/quickjs
 import types/blob
+import types/color
 import types/opt
 import types/url
 import utils/mimeguess
@@ -43,9 +46,12 @@ type
     headersGuard: HeadersGuard
     url*: URL #TODO should be urllist?
     unregisterFun*: proc()
-    bodyRead*: Promise[string]
+    resumeFun*: proc(outputId: int)
+    bodyRead*: EmptyPromise
     internalMessage*: string # should NOT be exposed to JS!
     outputId*: int
+    onRead*: proc(response: Response) {.nimcall.}
+    opaque*: RootRef
 
 jsDestructor(Response)
 
@@ -54,7 +60,7 @@ proc newResponse*(res: int; request: Request; stream: SocketStream): Response =
     res: res,
     url: request.url,
     body: stream,
-    bodyRead: Promise[string](),
+    bodyRead: EmptyPromise(),
     outputId: -1
   )
 
@@ -66,7 +72,8 @@ func makeNetworkError*(): Response {.jsstfunc: "Response.error".} =
     responseType: TYPE_ERROR,
     status: 0,
     headers: newHeaders(),
-    headersGuard: hgImmutable
+    headersGuard: hgImmutable,
+    bodyUsed: true
   )
 
 func sok(response: Response): bool {.jsfget: "ok".} =
@@ -102,6 +109,25 @@ func getContentType*(this: Response): string =
   # override buffer mime.types
   return DefaultGuess.guessContentType(this.url.pathname)
 
+type TextOpaque = ref object of RootObj
+  buf: string
+
+const BufferSize = 4096
+
+proc onReadText(response: Response) =
+  let opaque = TextOpaque(response.opaque)
+  while true:
+    let olen = opaque.buf.len
+    try:
+      opaque.buf.setLen(olen + BufferSize)
+      let n = response.body.recvData(addr opaque.buf[olen], BufferSize)
+      opaque.buf.setLen(olen + n)
+      if n == 0:
+        break
+    except ErrorAgain:
+      opaque.buf.setLen(olen)
+      break
+
 proc text*(response: Response): Promise[JSResult[string]] {.jsfunc.} =
   if response.body == nil:
     let p = newPromise[JSResult[string]]()
@@ -113,40 +139,96 @@ proc text*(response: Response): Promise[JSResult[string]] {.jsfunc.} =
       .err(newTypeError("Body has already been consumed"))
     p.resolve(err)
     return p
-  let bodyRead = response.bodyRead
-  response.bodyRead = nil
-  return bodyRead.then(proc(s: string): JSResult[string] =
+  let opaque = TextOpaque()
+  response.opaque = opaque
+  response.onRead = onReadText
+  response.bodyUsed = true
+  response.resumeFun(response.outputId)
+  response.resumeFun = nil
+  return response.bodyRead.then(proc(): JSResult[string] =
     let charset = response.getCharset(CHARSET_UTF_8)
-    #TODO this is inefficient
-    # maybe add a JS type that turns a seq[char] into JS strings
-    ok(s.decodeAll(charset))
+    ok(opaque.buf.decodeAll(charset))
   )
 
+type BlobOpaque = ref object of RootObj
+  p: pointer
+  len: int
+  size: int
+
+proc onReadBlob(response: Response) =
+  let opaque = BlobOpaque(response.opaque)
+  while true:
+    try:
+      let targetLen = opaque.len + BufferSize
+      if targetLen > opaque.size:
+        opaque.size = targetLen
+        opaque.p = realloc(opaque.p, targetLen)
+      let p = cast[ptr UncheckedArray[uint8]](opaque.p)
+      let n = response.body.recvData(addr p[opaque.len], BufferSize)
+      opaque.len += n
+      if n == 0:
+        break
+    except ErrorAgain:
+      break
+
 proc blob*(response: Response): Promise[JSResult[Blob]] {.jsfunc.} =
-  if response.bodyRead == nil:
+  if response.bodyUsed:
     let p = newPromise[JSResult[Blob]]()
     let err = JSResult[Blob]
       .err(newTypeError("Body has already been consumed"))
     p.resolve(err)
     return p
-  let bodyRead = response.bodyRead
-  response.bodyRead = nil
+  let opaque = BlobOpaque()
+  response.opaque = opaque
+  response.onRead = onReadBlob
+  response.bodyUsed = true
+  response.resumeFun(response.outputId)
+  response.resumeFun = nil
   let contentType = response.getContentType()
-  return bodyRead.then(proc(s: string): JSResult[Blob] =
-    if s.len == 0:
+  return response.bodyRead.then(proc(): JSResult[Blob] =
+    let p = realloc(opaque.p, opaque.len)
+    opaque.p = nil
+    if p == nil:
       return ok(newBlob(nil, 0, contentType, nil))
-    GC_ref(s)
-    let deallocFun = proc() =
-      GC_unref(s)
-    let blob = newBlob(unsafeAddr s[0], s.len, contentType, deallocFun)
-    ok(blob))
+    ok(newBlob(p, opaque.len, contentType, deallocBlob))
+  )
+
+type BitmapOpaque = ref object of RootObj
+  bmp: Bitmap
+  idx: int
+
+proc onReadBitmap(response: Response) =
+  let opaque = BitmapOpaque(response.opaque)
+  let bmp = opaque.bmp
+  while true:
+    try:
+      let p = cast[ptr UncheckedArray[uint8]](addr bmp.px[0])
+      let L = bmp.px.len * 4 - opaque.idx
+      let n = response.body.recvData(addr p[opaque.idx], L)
+      opaque.idx += n
+      if n == 0:
+        break
+    except ErrorAgain:
+      break
+
+proc saveToBitmap*(response: Response; bmp: Bitmap): EmptyPromise =
+  assert not response.bodyUsed
+  let opaque = BitmapOpaque(bmp: bmp, idx: 0)
+  let size = bmp.width * bmp.height
+  bmp.px = cast[seq[ARGBColor]](newSeqUninitialized[uint32](size))
+  response.opaque = opaque
+  response.onRead = onReadBitmap
+  response.bodyUsed = true
+  response.resumeFun(response.outputId)
+  response.resumeFun = nil
+  return response.bodyRead
 
 proc json(ctx: JSContext; this: Response): Promise[JSResult[JSValue]]
     {.jsfunc.} =
   return this.text().then(proc(s: JSResult[string]): JSResult[JSValue] =
     let s = ?s
-    return ok(JS_ParseJSON(ctx, cstring(s), cast[csize_t](s.len),
-      cstring"<input>")))
+    return ok(JS_ParseJSON(ctx, cstring(s), csize_t(s.len), cstring"<input>"))
+  )
 
 proc addResponseModule*(ctx: JSContext) =
   ctx.registerType(Response)