about summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorbptato <nincsnevem662@gmail.com>2024-03-21 23:11:18 +0100
committerbptato <nincsnevem662@gmail.com>2024-03-21 23:18:55 +0100
commit03d591d9aed833b0bdb028bfea376e0beeac8e9a (patch)
tree9a369a4af2aab1711901c06cc02d8b5497018f2a
parentabb09126edcce518614efc0e2c0d55d5e42f8094 (diff)
downloadchawan-03d591d9aed833b0bdb028bfea376e0beeac8e9a.tar.gz
io: add bufreader
analogous to bufwriter
-rw-r--r--src/io/bufreader.nim198
-rw-r--r--src/io/bufwriter.nim35
-rw-r--r--src/io/dynstream.nim7
-rw-r--r--src/io/serialize.nim53
-rw-r--r--src/io/serversocket.nim6
-rw-r--r--src/io/socketstream.nim9
-rw-r--r--src/loader/loader.nim255
-rw-r--r--src/local/client.nim2
-rw-r--r--src/server/buffer.nim33
-rw-r--r--src/server/forkserver.nim89
10 files changed, 426 insertions, 261 deletions
diff --git a/src/io/bufreader.nim b/src/io/bufreader.nim
new file mode 100644
index 00000000..a8d30fb0
--- /dev/null
+++ b/src/io/bufreader.nim
@@ -0,0 +1,198 @@
+# Write data to streams.
+
+import std/options
+import std/sets
+import std/tables
+
+import io/dynstream
+import types/blob
+import types/formdata
+import types/opt
+import types/url
+
+type BufferedReader* = object
+  buffer: seq[uint8]
+  bufIdx: int
+
+proc initReader*(stream: DynStream; len: int): BufferedReader =
+  assert len != 0
+  var reader = BufferedReader(
+    buffer: newSeqUninitialized[uint8](len),
+    bufIdx: 0
+  )
+  var n = 0
+  while true:
+    n += stream.recvData(addr reader.buffer[n], len - n)
+    if n == len:
+      break
+  return reader
+
+template withPacketReader*(stream: DynStream; r, body: untyped) =
+  block:
+    var len: int
+    # note: this must be readData
+    doAssert stream.readData(addr len, sizeof(len)) == sizeof(len)
+    var r = stream.initReader(len)
+    body
+
+proc sread*(reader: var BufferedReader; n: var SomeNumber)
+proc sread*[T](reader: var BufferedReader; s: var set[T])
+proc sread*[T: enum](reader: var BufferedReader; x: var T)
+proc sread*(reader: var BufferedReader; s: var string)
+proc sread*(reader: var BufferedReader; b: var bool)
+proc sread*(reader: var BufferedReader; url: var URL)
+proc sread*(reader: var BufferedReader; tup: var tuple)
+proc sread*[I, T](reader: var BufferedReader; a: var array[I, T])
+proc sread*(reader: var BufferedReader; s: var seq)
+proc sread*[U, V](reader: var BufferedReader; t: var Table[U, V])
+proc sread*(reader: var BufferedReader; obj: var object)
+proc sread*(reader: var BufferedReader; obj: var ref object)
+proc sread*(reader: var BufferedReader; part: var FormDataEntry)
+proc sread*(reader: var BufferedReader; blob: var Blob)
+proc sread*[T](reader: var BufferedReader; o: var Option[T])
+proc sread*[T, E](reader: var BufferedReader; o: var Result[T, E])
+
+proc readData(reader: var BufferedReader; buffer: pointer; len: int) =
+  assert reader.bufIdx + len <= reader.buffer.len
+  copyMem(buffer, addr reader.buffer[reader.bufIdx], len)
+  reader.bufIdx += len
+
+proc sread*(reader: var BufferedReader; n: var SomeNumber) =
+  reader.readData(addr n, sizeof(n))
+
+proc sread*[T: enum](reader: var BufferedReader; x: var T) =
+  var i: int
+  reader.sread(i)
+  x = cast[T](i)
+
+proc sread*[T](reader: var BufferedReader; s: var set[T]) =
+  var len: int
+  reader.sread(len)
+  for i in 0 ..< len:
+    var x: T
+    reader.sread(x)
+    s.incl(x)
+
+proc sread*(reader: var BufferedReader; s: var string) =
+  var len: int
+  reader.sread(len)
+  s = newString(len)
+  if len > 0:
+    reader.readData(addr s[0], len)
+
+proc sread*(reader: var BufferedReader; b: var bool) =
+  var n: uint8
+  reader.sread(n)
+  if n == 1u8:
+    b = true
+  else:
+    assert n == 0u8
+    b = false
+
+proc sread*(reader: var BufferedReader; url: var URL) =
+  var s: string
+  reader.sread(s)
+  if s == "":
+    url = nil
+  else:
+    let x = newURL(s)
+    if x.isSome:
+      url = x.get
+    else:
+      url = nil
+
+proc sread*(reader: var BufferedReader; tup: var tuple) =
+  for f in tup.fields:
+    reader.sread(f)
+
+proc sread*[I; T](reader: var BufferedReader; a: var array[I, T]) =
+  for x in a.mitems:
+    reader.sread(x)
+
+proc sread*(reader: var BufferedReader; s: var seq) =
+  var len: int
+  reader.sread(len)
+  s.setLen(len)
+  for x in s.mitems:
+    reader.sread(x)
+
+proc sread*[U; V](reader: var BufferedReader, t: var Table[U, V]) =
+  var len: int
+  reader.sread(len)
+  for i in 0..<len:
+    var k: U
+    reader.sread(k)
+    var v: V
+    reader.sread(v)
+    t[k] = v
+
+proc sread*(reader: var BufferedReader; obj: var object) =
+  for f in obj.fields:
+    reader.sread(f)
+
+proc sread*(reader: var BufferedReader; obj: var ref object) =
+  var n: bool
+  reader.sread(n)
+  if n:
+    new(obj)
+    reader.sread(obj[])
+
+proc sread*(reader: var BufferedReader; part: var FormDataEntry) =
+  var isstr: bool
+  reader.sread(isstr)
+  if isstr:
+    part = FormDataEntry(isstr: true)
+  else:
+    part = FormDataEntry(isstr: false)
+  reader.sread(part.name)
+  reader.sread(part.filename)
+  if part.isstr:
+    reader.sread(part.svalue)
+  else:
+    reader.sread(part.value)
+
+proc sread*(reader: var BufferedReader; blob: var Blob) =
+  var isfile: bool
+  reader.sread(isfile)
+  if isfile:
+    var file = new WebFile
+    file.isfile = true
+    reader.sread(file.path)
+    blob = file
+  else:
+    blob = Blob()
+    reader.sread(blob.ctype)
+    reader.sread(blob.size)
+    let buffer = alloc(blob.size)
+    blob.buffer = buffer
+    blob.deallocFun = proc() = dealloc(buffer)
+    if blob.size > 0:
+      reader.readData(blob.buffer, int(blob.size))
+
+proc sread*[T](reader: var BufferedReader; o: var Option[T]) =
+  var x: bool
+  reader.sread(x)
+  if x:
+    var m: T
+    reader.sread(m)
+    o = some(m)
+  else:
+    o = none(T)
+
+proc sread*[T, E](reader: var BufferedReader; o: var Result[T, E]) =
+  var x: bool
+  reader.sread(x)
+  if x:
+    when T isnot void:
+      var m: T
+      reader.sread(m)
+      o.ok(m)
+    else:
+      o.ok()
+  else:
+    when E isnot void:
+      var e: E
+      reader.sread(e)
+      o.err(e)
+    else:
+      o.err()
diff --git a/src/io/bufwriter.nim b/src/io/bufwriter.nim
index 57219100..cbee8b5b 100644
--- a/src/io/bufwriter.nim
+++ b/src/io/bufwriter.nim
@@ -16,6 +16,7 @@ type BufferedWriter* = object
   buffer: ptr UncheckedArray[uint8]
   bufSize: int
   bufLen: int
+  writeLen: bool
 
 {.warning[Deprecated]: off.}:
   proc `=destroy`(writer: var BufferedWriter) =
@@ -23,23 +24,28 @@ type BufferedWriter* = object
       dealloc(writer.buffer)
       writer.buffer = nil
 
-proc initWriter*(stream: DynStream; sizeInit = 64): BufferedWriter =
-  return BufferedWriter(
+proc initWriter*(stream: DynStream; sizeInit = 64; writeLen = false):
+    BufferedWriter =
+  var w = BufferedWriter(
     stream: stream,
     buffer: cast[ptr UncheckedArray[uint8]](alloc(sizeInit)),
     bufSize: sizeInit,
-    bufLen: 0
+    bufLen: 0,
+    writeLen: writeLen
   )
+  if writeLen: # add space for `len'
+    w.bufLen += sizeof(w.bufLen)
+    assert w.bufLen < sizeInit
+  return w
 
 proc flush*(writer: var BufferedWriter) =
-  let stream = writer.stream
-  var n = 0
-  while true:
-    n += stream.sendData(addr writer.buffer[n], writer.bufLen - n)
-    if n == writer.bufLen:
-      break
+  if writer.writeLen:
+    # subtract the length field's size
+    var realLen = writer.bufLen - sizeof(writer.bufLen)
+    copyMem(writer.buffer, addr realLen, sizeof(writer.bufLen))
+  writer.stream.sendDataLoop(writer.buffer, writer.bufLen)
   writer.bufLen = 0
-  stream.sflush()
+  writer.stream.sflush()
 
 proc deinit*(writer: var BufferedWriter) =
   dealloc(writer.buffer)
@@ -49,7 +55,14 @@ proc deinit*(writer: var BufferedWriter) =
 
 template withWriter*(stream: DynStream; w, body: untyped) =
   block:
-    var w {.inject.} = stream.initWriter()
+    var w = stream.initWriter()
+    body
+    w.flush()
+    w.deinit()
+
+template withPacketWriter*(stream: DynStream; w, body: untyped) =
+  block:
+    var w = stream.initWriter(writeLen = true)
     body
     w.flush()
     w.deinit()
diff --git a/src/io/dynstream.nim b/src/io/dynstream.nim
index ec38c595..ac1269f5 100644
--- a/src/io/dynstream.nim
+++ b/src/io/dynstream.nim
@@ -39,6 +39,13 @@ proc sendData*(s: DynStream; buffer: openArray[char]): int {.inline.} =
 proc sendData*(s: DynStream; buffer: openArray[uint8]): int {.inline.} =
   return s.sendData(unsafeAddr buffer[0], buffer.len)
 
+proc sendDataLoop*(s: DynStream; buffer: pointer; len: int) =
+  var n = 0
+  while true:
+    n += s.sendData(addr cast[ptr UncheckedArray[uint8]](buffer)[n], len - n)
+    if n == len:
+      break
+
 proc dsClose(s: Stream) =
   DynStream(s).sclose()
 
diff --git a/src/io/serialize.nim b/src/io/serialize.nim
index 3ffa40ac..0b54bea3 100644
--- a/src/io/serialize.nim
+++ b/src/io/serialize.nim
@@ -25,10 +25,8 @@ func slen*(s: string): int
 proc sread*(stream: Stream, b: var bool)
 func slen*(b: bool): int
 
-proc sread*(stream: Stream, url: var URL)
 func slen*(url: URL): int
 
-proc sread*(stream: Stream, tup: var tuple)
 func slen*(tup: tuple): int
 
 proc sread*[I, T](stream: Stream, a: var array[I, T])
@@ -46,10 +44,8 @@ func slen*(obj: object): int
 proc sread*(stream: Stream, obj: var ref object)
 func slen*(obj: ref object): int
 
-proc sread*(stream: Stream, part: var FormDataEntry)
 func slen*(part: FormDataEntry): int
 
-proc sread*(stream: Stream, blob: var Blob)
 func slen*(blob: Blob): int
 
 proc sread*[T](stream: Stream, o: var Option[T])
@@ -112,27 +108,11 @@ proc sread*(stream: Stream, b: var bool) =
 func slen*(b: bool): int =
   return sizeof(uint8)
 
-proc sread*(stream: Stream, url: var URL) =
-  var s: string
-  stream.sread(s)
-  if s == "":
-    url = nil
-  else:
-    let x = newURL(s)
-    if x.isSome:
-      url = x.get
-    else:
-      url = nil
-
 func slen*(url: URL): int =
   if url == nil:
     return slen("")
   return slen(url.serialize())
 
-proc sread*(stream: Stream, tup: var tuple) =
-  for f in tup.fields:
-    stream.sread(f)
-
 func slen*(tup: tuple): int =
   for f in tup.fields:
     result += slen(f)
@@ -193,20 +173,6 @@ func slen*(obj: ref object): int =
   if obj != nil:
     result += slen(obj[])
 
-proc sread*(stream: Stream, part: var FormDataEntry) =
-  var isstr: bool
-  stream.sread(isstr)
-  if isstr:
-    part = FormDataEntry(isstr: true)
-  else:
-    part = FormDataEntry(isstr: false)
-  stream.sread(part.name)
-  stream.sread(part.filename)
-  if part.isstr:
-    stream.sread(part.svalue)
-  else:
-    stream.sread(part.value)
-
 func slen*(part: FormDataEntry): int =
   result += slen(part.isstr)
   result += slen(part.name)
@@ -216,25 +182,6 @@ func slen*(part: FormDataEntry): int =
   else:
     result += slen(part.value)
 
-proc sread*(stream: Stream, blob: var Blob) =
-  var isfile: bool
-  stream.sread(isfile)
-  if isfile:
-    var file = new WebFile
-    file.isfile = true
-    stream.sread(file.path)
-    blob = file
-  else:
-    blob = Blob()
-    stream.sread(blob.ctype)
-    stream.sread(blob.size)
-    let buffer = alloc(blob.size)
-    blob.buffer = buffer
-    blob.deallocFun = proc() = dealloc(buffer)
-    if blob.size > 0:
-      let n = stream.readData(blob.buffer, int(blob.size))
-      assert n == int(blob.size)
-
 func slen*(blob: Blob): int =
   result += slen(blob.isfile)
   if blob.isfile:
diff --git a/src/io/serversocket.nim b/src/io/serversocket.nim
index 020c5ed3..a6acc555 100644
--- a/src/io/serversocket.nim
+++ b/src/io/serversocket.nim
@@ -19,10 +19,10 @@ proc getSocketPath*(pid: int): string =
 {.compile: "bind_unix.c".}
 proc bind_unix_from_c(fd: cint, path: cstring, pathlen: cint): cint {.importc.}
 
-proc initServerSocket*(pid: int; buffered = true; blocking = true):
-    ServerSocket =
+proc initServerSocket*(pid: int; blocking = true): ServerSocket =
   createDir(SocketDirectory)
-  let sock = newSocket(Domain.AF_UNIX, SockType.SOCK_STREAM, Protocol.IPPROTO_IP, buffered)
+  let sock = newSocket(Domain.AF_UNIX, SockType.SOCK_STREAM,
+    Protocol.IPPROTO_IP, buffered = false)
   if not blocking:
     sock.getFd().setBlocking(false)
   let path = getSocketPath(pid)
diff --git a/src/io/socketstream.nim b/src/io/socketstream.nim
index e02108ce..78e7fb3e 100644
--- a/src/io/socketstream.nim
+++ b/src/io/socketstream.nim
@@ -61,10 +61,9 @@ method sclose*(s: SocketStream) =
 proc connect_unix_from_c(fd: cint, path: cstring, pathlen: cint): cint
   {.importc.}
 
-proc connectSocketStream*(path: string, buffered = true, blocking = true):
-    SocketStream =
+proc connectSocketStream*(path: string; blocking = true): SocketStream =
   let sock = newSocket(Domain.AF_UNIX, SockType.SOCK_STREAM,
-    Protocol.IPPROTO_IP, buffered)
+    Protocol.IPPROTO_IP, buffered = false)
   if not blocking:
     sock.getFd().setBlocking(false)
   if connect_unix_from_c(cint(sock.getFd()), cstring(path),
@@ -77,10 +76,10 @@ proc connectSocketStream*(path: string, buffered = true, blocking = true):
   )
   result.addStreamIface()
 
-proc connectSocketStream*(pid: int, buffered = true, blocking = true):
+proc connectSocketStream*(pid: int; blocking = true):
     SocketStream =
   try:
-    return connectSocketStream(getSocketPath(pid), buffered, blocking)
+    return connectSocketStream(getSocketPath(pid), blocking)
   except OSError:
     return nil
 
diff --git a/src/loader/loader.nim b/src/loader/loader.nim
index 9c9c2e8a..b0c5b6ab 100644
--- a/src/loader/loader.nim
+++ b/src/loader/loader.nim
@@ -24,6 +24,7 @@ import std/streams
 import std/strutils
 import std/tables
 
+import io/bufreader
 import io/bufwriter
 import io/posixstream
 import io/promise
@@ -444,9 +445,10 @@ proc setupRequestDefaults*(request: Request; config: LoaderClientConfig) =
     if r != "":
       request.headers["Referer"] = r
 
-proc onLoad(ctx: LoaderContext; stream: SocketStream; client: ClientData) =
+proc load(ctx: LoaderContext; stream: SocketStream; client: ClientData;
+    r: var BufferedReader) =
   var request: Request
-  stream.sread(request)
+  r.sread(request)
   let handle = newLoaderHandle(stream, ctx.getOutputId(), client.pid,
     request.suspended)
   when defined(debug):
@@ -463,13 +465,14 @@ proc onLoad(ctx: LoaderContext; stream: SocketStream; client: ClientData) =
       request.proxy = client.config.proxy
     ctx.loadResource(client, request, handle)
 
-proc addClient(ctx: LoaderContext; stream: SocketStream) =
+proc addClient(ctx: LoaderContext; stream: SocketStream;
+    r: var BufferedReader) =
   var key: ClientKey
   var pid: int
   var config: LoaderClientConfig
-  stream.sread(key)
-  stream.sread(pid)
-  stream.sread(config)
+  r.sread(key)
+  r.sread(pid)
+  r.sread(config)
   stream.withWriter w:
     if pid in ctx.clientData or key == default(ClientKey):
       w.swrite(false)
@@ -484,20 +487,22 @@ proc cleanup(client: ClientData) =
     if it.refc == 0:
       discard unlink(cstring(it.path))
 
-proc removeClient(ctx: LoaderContext; stream: SocketStream) =
+proc removeClient(ctx: LoaderContext; stream: SocketStream;
+    r: var BufferedReader) =
   var pid: int
-  stream.sread(pid)
+  r.sread(pid)
   if pid in ctx.clientData:
     let client = ctx.clientData[pid]
     client.cleanup()
     ctx.clientData.del(pid)
   stream.close()
 
-proc addCacheFile(ctx: LoaderContext; stream: SocketStream) =
+proc addCacheFile(ctx: LoaderContext; stream: SocketStream;
+    r: var BufferedReader) =
   var outputId: int
   var targetPid: int
-  stream.sread(outputId)
-  stream.sread(targetPid)
+  r.sread(outputId)
+  r.sread(targetPid)
   let output = ctx.findOutput(outputId)
   assert output != nil
   let targetClient = ctx.clientData[targetPid]
@@ -507,11 +512,12 @@ proc addCacheFile(ctx: LoaderContext; stream: SocketStream) =
     w.swrite(file)
   stream.close()
 
-proc redirectToFile(ctx: LoaderContext; stream: SocketStream) =
+proc redirectToFile(ctx: LoaderContext; stream: SocketStream;
+    r: var BufferedReader) =
   var outputId: int
   var targetPath: string
-  stream.sread(outputId)
-  stream.sread(targetPath)
+  r.sread(outputId)
+  r.sread(targetPath)
   let output = ctx.findOutput(outputId)
   var success = false
   if output != nil:
@@ -520,15 +526,16 @@ proc redirectToFile(ctx: LoaderContext; stream: SocketStream) =
     w.swrite(success)
   stream.close()
 
-proc shareCachedItem(ctx: LoaderContext; stream: SocketStream) =
+proc shareCachedItem(ctx: LoaderContext; stream: SocketStream;
+    r: var BufferedReader) =
   # share a cached file with another buffer. this is for newBufferFrom
   # (i.e. view source)
   var sourcePid: int # pid of source client
   var targetPid: int # pid of target client
   var id: int
-  stream.sread(sourcePid)
-  stream.sread(targetPid)
-  stream.sread(id)
+  r.sread(sourcePid)
+  r.sread(targetPid)
+  r.sread(id)
   let sourceClient = ctx.clientData[sourcePid]
   let targetClient = ctx.clientData[targetPid]
   let n = sourceClient.cacheMap.find(id)
@@ -537,17 +544,17 @@ proc shareCachedItem(ctx: LoaderContext; stream: SocketStream) =
   targetClient.cacheMap.add(item)
   stream.close()
 
-proc passFd(ctx: LoaderContext; stream: SocketStream) =
+proc passFd(ctx: LoaderContext; stream: SocketStream; r: var BufferedReader) =
   var id: string
-  stream.sread(id)
+  r.sread(id)
   let fd = stream.recvFileHandle()
   ctx.passedFdMap[id] = fd
   stream.close()
 
 proc removeCachedItem(ctx: LoaderContext; stream: SocketStream;
-    client: ClientData) =
+    client: ClientData; r: var BufferedReader) =
   var id: int
-  stream.sread(id)
+  r.sread(id)
   let n = client.cacheMap.find(id)
   if n != -1:
     let item = client.cacheMap[n]
@@ -557,11 +564,12 @@ proc removeCachedItem(ctx: LoaderContext; stream: SocketStream;
       discard unlink(cstring(item.path))
   stream.close()
 
-proc tee(ctx: LoaderContext; stream: SocketStream; client: ClientData) =
+proc tee(ctx: LoaderContext; stream: SocketStream; client: ClientData;
+    r: var BufferedReader) =
   var sourceId: int
   var targetPid: int
-  stream.sread(sourceId)
-  stream.sread(targetPid)
+  r.sread(sourceId)
+  r.sread(targetPid)
   let output = ctx.findOutput(sourceId)
   # only allow tee'ing outputs owned by client
   doAssert output.ownerPid == client.pid
@@ -576,9 +584,10 @@ proc tee(ctx: LoaderContext; stream: SocketStream; client: ClientData) =
       w.swrite(-1)
     stream.close()
 
-proc suspend(ctx: LoaderContext; stream: SocketStream; client: ClientData) =
+proc suspend(ctx: LoaderContext; stream: SocketStream; client: ClientData;
+    r: var BufferedReader) =
   var ids: seq[int]
-  stream.sread(ids)
+  r.sread(ids)
   for id in ids:
     let output = ctx.findOutput(id)
     if output != nil:
@@ -588,9 +597,10 @@ proc suspend(ctx: LoaderContext; stream: SocketStream; client: ClientData) =
         output.registered = false
         ctx.selector.unregister(output.ostream.fd)
 
-proc resume(ctx: LoaderContext; stream: SocketStream; client: ClientData) =
+proc resume(ctx: LoaderContext; stream: SocketStream; client: ClientData;
+    r: var BufferedReader) =
   var ids: seq[int]
-  stream.sread(ids)
+  r.sread(ids)
   for id in ids:
     let output = ctx.findOutput(id)
     if output != nil:
@@ -602,52 +612,53 @@ proc resume(ctx: LoaderContext; stream: SocketStream; client: ClientData) =
 proc acceptConnection(ctx: LoaderContext) =
   let stream = ctx.ssock.acceptSocketStream()
   try:
-    var myPid: int
-    var key: ClientKey
-    stream.sread(myPid)
-    stream.sread(key)
-    if myPid notin ctx.clientData:
-      # possibly already removed
-      stream.close()
-      return
-    let client = ctx.clientData[myPid]
-    if client.key != key:
-      # ditto
-      stream.close()
-      return
-    var cmd: LoaderCommand
-    stream.sread(cmd)
-    template privileged_command =
-      doAssert client == ctx.pagerClient
-    case cmd
-    of lcAddClient:
-      privileged_command
-      ctx.addClient(stream)
-    of lcRemoveClient:
-      privileged_command
-      ctx.removeClient(stream)
-    of lcAddCacheFile:
-      privileged_command
-      ctx.addCacheFile(stream)
-    of lcShareCachedItem:
-      privileged_command
-      ctx.shareCachedItem(stream)
-    of lcPassFd:
-      privileged_command
-      ctx.passFd(stream)
-    of lcRedirectToFile:
-      privileged_command
-      ctx.redirectToFile(stream)
-    of lcRemoveCachedItem:
-      ctx.removeCachedItem(stream, client)
-    of lcLoad:
-      ctx.onLoad(stream, client)
-    of lcTee:
-      ctx.tee(stream, client)
-    of lcSuspend:
-      ctx.suspend(stream, client)
-    of lcResume:
-      ctx.resume(stream, client)
+    stream.withPacketReader r:
+      var myPid: int
+      var key: ClientKey
+      r.sread(myPid)
+      r.sread(key)
+      if myPid notin ctx.clientData:
+        # possibly already removed
+        stream.close()
+        return
+      let client = ctx.clientData[myPid]
+      if client.key != key:
+        # ditto
+        stream.close()
+        return
+      var cmd: LoaderCommand
+      r.sread(cmd)
+      template privileged_command =
+        doAssert client == ctx.pagerClient
+      case cmd
+      of lcAddClient:
+        privileged_command
+        ctx.addClient(stream, r)
+      of lcRemoveClient:
+        privileged_command
+        ctx.removeClient(stream, r)
+      of lcAddCacheFile:
+        privileged_command
+        ctx.addCacheFile(stream, r)
+      of lcShareCachedItem:
+        privileged_command
+        ctx.shareCachedItem(stream, r)
+      of lcPassFd:
+        privileged_command
+        ctx.passFd(stream, r)
+      of lcRedirectToFile:
+        privileged_command
+        ctx.redirectToFile(stream, r)
+      of lcRemoveCachedItem:
+        ctx.removeCachedItem(stream, client, r)
+      of lcLoad:
+        ctx.load(stream, client, r)
+      of lcTee:
+        ctx.tee(stream, client, r)
+      of lcSuspend:
+        ctx.suspend(stream, client, r)
+      of lcResume:
+        ctx.resume(stream, client, r)
   except ErrorBrokenPipe:
     # receiving end died while reading the file; give up.
     stream.close()
@@ -666,10 +677,8 @@ proc initLoaderContext(fd: cint; config: LoaderConfig): LoaderContext =
     selector: newSelector[int]()
   )
   gctx = ctx
-  #TODO ideally, buffered would be true. Unfortunately this conflicts with
-  # sendFileHandle/recvFileHandle.
   let myPid = getCurrentProcessId()
-  ctx.ssock = initServerSocket(myPid, buffered = false, blocking = true)
+  ctx.ssock = initServerSocket(myPid, blocking = true)
   let sfd = int(ctx.ssock.sock.getFd())
   ctx.selector.registerHandle(sfd, {Read}, 0)
   # The server has been initialized, so the main process can resume execution.
@@ -684,27 +693,28 @@ proc initLoaderContext(fd: cint; config: LoaderConfig): LoaderContext =
       dir &= '/'
   # get pager's key
   let stream = ctx.ssock.acceptSocketStream()
-  block readNullKey:
-    var pid: int # ignore pid
-    stream.sread(pid)
-    # pager's key is still null
+  stream.withPacketReader r:
+    block readNullKey:
+      var pid: int # ignore pid
+      r.sread(pid)
+      # pager's key is still null
+      var key: ClientKey
+      r.sread(key)
+      doAssert key == default(ClientKey)
+    var cmd: LoaderCommand
+    r.sread(cmd)
+    doAssert cmd == lcAddClient
     var key: ClientKey
-    stream.sread(key)
-    doAssert key == default(ClientKey)
-  var cmd: LoaderCommand
-  stream.sread(cmd)
-  doAssert cmd == lcAddClient
-  var key: ClientKey
-  var pid: int
-  var config: LoaderClientConfig
-  stream.sread(key)
-  stream.sread(pid)
-  stream.sread(config)
-  stream.withWriter w:
-    w.swrite(true)
-  ctx.pagerClient = ClientData(key: key, pid: pid, config: config)
-  ctx.clientData[pid] = ctx.pagerClient
-  stream.close()
+    var pid: int
+    var config: LoaderClientConfig
+    r.sread(key)
+    r.sread(pid)
+    r.sread(config)
+    stream.withWriter w:
+      w.swrite(true)
+    ctx.pagerClient = ClientData(key: key, pid: pid, config: config)
+    ctx.clientData[pid] = ctx.pagerClient
+    stream.close()
   # unblock main socket
   ctx.ssock.sock.getFd().setBlocking(false)
   # for CGI
@@ -829,20 +839,21 @@ proc getRedirect*(response: Response; request: Request): Request =
             destination = request.destination)
   return nil
 
-proc connect(loader: FileLoader; buffered = true): SocketStream =
-  let stream = connectSocketStream(loader.process, buffered, blocking = true)
-  if stream != nil:
-    stream.withWriter w:
-      w.swrite(loader.clientPid)
-      w.swrite(loader.key)
-    return stream
-  return nil
+template withLoaderPacketWriter(stream: SocketStream; loader: FileLoader;
+    w, body: untyped) =
+  stream.withPacketWriter w:
+    w.swrite(loader.clientPid)
+    w.swrite(loader.key)
+    body
+
+proc connect(loader: FileLoader): SocketStream =
+  return connectSocketStream(loader.process, blocking = true)
 
 # Start a request. This should not block (not for a significant amount of time
 # anyway).
 proc startRequest*(loader: FileLoader; request: Request): SocketStream =
-  let stream = loader.connect(buffered = false)
-  stream.withWriter w:
+  let stream = loader.connect()
+  stream.withLoaderPacketWriter loader, w:
     w.swrite(lcLoad)
     w.swrite(request)
   return stream
@@ -862,8 +873,8 @@ proc fetch*(loader: FileLoader; input: Request): FetchPromise =
 
 proc reconnect*(loader: FileLoader; data: ConnectData) =
   data.stream.close()
-  let stream = loader.connect(buffered = false)
-  stream.withWriter w:
+  let stream = loader.connect()
+  stream.withLoaderPacketWriter loader, w:
     w.swrite(lcLoad)
     w.swrite(data.request)
   let fd = int(stream.fd)
@@ -891,21 +902,21 @@ proc switchStream*(loader: FileLoader; data: var OngoingData;
 
 proc suspend*(loader: FileLoader; fds: seq[int]) =
   let stream = loader.connect()
-  stream.withWriter w:
+  stream.withLoaderPacketWriter loader, w:
     w.swrite(lcSuspend)
     w.swrite(fds)
   stream.close()
 
 proc resume*(loader: FileLoader; fds: seq[int]) =
   let stream = loader.connect()
-  stream.withWriter w:
+  stream.withLoaderPacketWriter loader, w:
     w.swrite(lcResume)
     w.swrite(fds)
   stream.close()
 
 proc tee*(loader: FileLoader; sourceId, targetPid: int): (SocketStream, int) =
-  let stream = loader.connect(buffered = false)
-  stream.withWriter w:
+  let stream = loader.connect()
+  stream.withLoaderPacketWriter loader, w:
     w.swrite(lcTee)
     w.swrite(sourceId)
     w.swrite(targetPid)
@@ -918,7 +929,7 @@ proc addCacheFile*(loader: FileLoader; outputId, targetPid: int):
   let stream = loader.connect()
   if stream == nil:
     return (-1, "")
-  stream.withWriter w:
+  stream.withLoaderPacketWriter loader, w:
     w.swrite(lcAddCacheFile)
     w.swrite(outputId)
     w.swrite(targetPid)
@@ -933,7 +944,7 @@ proc redirectToFile*(loader: FileLoader; outputId: int; targetPath: string):
   let stream = loader.connect()
   if stream == nil:
     return false
-  stream.withWriter w:
+  stream.withLoaderPacketWriter loader, w:
     w.swrite(lcRedirectToFile)
     w.swrite(outputId)
     w.swrite(targetPath)
@@ -1035,7 +1046,7 @@ proc doRequest*(loader: FileLoader; request: Request): Response =
 proc shareCachedItem*(loader: FileLoader; id, targetPid: int) =
   let stream = loader.connect()
   if stream != nil:
-    stream.withWriter w:
+    stream.withLoaderPacketWriter loader, w:
       w.swrite(lcShareCachedItem)
       w.swrite(loader.clientPid)
       w.swrite(targetPid)
@@ -1043,9 +1054,9 @@ proc shareCachedItem*(loader: FileLoader; id, targetPid: int) =
     stream.close()
 
 proc passFd*(loader: FileLoader; id: string; fd: FileHandle) =
-  let stream = loader.connect(buffered = false)
+  let stream = loader.connect()
   if stream != nil:
-    stream.withWriter w:
+    stream.withLoaderPacketWriter loader, w:
       w.swrite(lcPassFd)
       w.swrite(id)
     stream.sendFileHandle(fd)
@@ -1054,7 +1065,7 @@ proc passFd*(loader: FileLoader; id: string; fd: FileHandle) =
 proc removeCachedItem*(loader: FileLoader; cacheId: int) =
   let stream = loader.connect()
   if stream != nil:
-    stream.withWriter w:
+    stream.withLoaderPacketWriter loader, w:
       w.swrite(lcRemoveCachedItem)
       w.swrite(cacheId)
     stream.close()
@@ -1062,7 +1073,7 @@ proc removeCachedItem*(loader: FileLoader; cacheId: int) =
 proc addClient*(loader: FileLoader; key: ClientKey; pid: int;
     config: LoaderClientConfig): bool =
   let stream = loader.connect()
-  stream.withWriter w:
+  stream.withLoaderPacketWriter loader, w:
     w.swrite(lcAddClient)
     w.swrite(key)
     w.swrite(pid)
@@ -1073,7 +1084,7 @@ proc addClient*(loader: FileLoader; key: ClientKey; pid: int;
 proc removeClient*(loader: FileLoader; pid: int) =
   let stream = loader.connect()
   if stream != nil:
-    stream.withWriter w:
+    stream.withLoaderPacketWriter loader, w:
       w.swrite(lcRemoveClient)
       w.swrite(pid)
     stream.close()
diff --git a/src/local/client.nim b/src/local/client.nim
index 8e29bdc6..29eaeb73 100644
--- a/src/local/client.nim
+++ b/src/local/client.nim
@@ -400,7 +400,7 @@ proc acceptBuffers(client: Client) =
     client.selector.registerHandle(fd, {Read, Write}, 0)
   for item in pager.procmap:
     let container = item.container
-    let stream = connectSocketStream(container.process, buffered = false)
+    let stream = connectSocketStream(container.process)
     if stream == nil:
       pager.alert("Error: failed to set up buffer")
       continue
diff --git a/src/server/buffer.nim b/src/server/buffer.nim
index a3c94bf3..71deed82 100644
--- a/src/server/buffer.nim
+++ b/src/server/buffer.nim
@@ -27,6 +27,7 @@ import html/enums
 import html/env
 import html/event
 import html/formdata as formdata_impl
+import io/bufreader
 import io/bufstream
 import io/bufwriter
 import io/posixstream
@@ -121,11 +122,9 @@ type
     charset: Charset
     cacheId: int
     outputId: int
-    dummyStream: StringStream
 
   InterfaceOpaque = ref object
     stream: SocketStream
-    dummyStream: StringStream
     len: int
 
   BufferInterface* = ref object
@@ -146,16 +145,12 @@ type
 proc getFromOpaque[T](opaque: pointer, res: var T) =
   let opaque = cast[InterfaceOpaque](opaque)
   if opaque.len != 0:
-    let dummyStream = opaque.dummyStream
-    dummyStream.setPosition(0)
-    dummyStream.data = newString(opaque.len)
-    let n = opaque.stream.readData(addr dummyStream.data[0], opaque.len)
-    assert n == opaque.len
-    dummyStream.sread(res)
+    var r = opaque.stream.initReader(opaque.len)
+    r.sread(res)
 
 proc newBufferInterface*(stream: SocketStream, registerFun: proc(fd: int)):
     BufferInterface =
-  let opaque = InterfaceOpaque(stream: stream, dummyStream: newStringStream())
+  let opaque = InterfaceOpaque(stream: stream)
   result = BufferInterface(
     map: newPromiseMap(cast[pointer](opaque)),
     packetid: 1, # ids below 1 are invalid
@@ -976,7 +971,7 @@ proc clone*(buffer: Buffer, newurl: URL): int {.proxy.} =
       # We ignore errors; not much we can do with them here :/
       discard buffer.rewind(buffer.bytesRead, unregister = false)
     buffer.pstream.close()
-    let ssock = initServerSocket(myPid, buffered = false)
+    let ssock = initServerSocket(myPid)
     buffer.ssock = ssock
     ps.write(char(0))
     buffer.url = newurl
@@ -1712,7 +1707,7 @@ proc markURL*(buffer: Buffer; schemes: seq[string]) {.proxy.} =
   buffer.do_reshape()
 
 macro bufferDispatcher(funs: static ProxyMap; buffer: Buffer;
-    cmd: BufferCommand; packetid: int) =
+    cmd: BufferCommand; packetid: int; r: var BufferedReader) =
   let switch = newNimNode(nnkCaseStmt)
   switch.add(ident("cmd"))
   for k, v in funs:
@@ -1727,7 +1722,7 @@ macro bufferDispatcher(funs: static ProxyMap; buffer: Buffer;
         let typ = param[^2]
         stmts.add(quote do:
           var `id`: `typ`
-          `buffer`.dummyStream.sread(`id`)
+          `r`.sread(`id`)
         )
         call.add(id)
     var rval: NimNode
@@ -1773,13 +1768,10 @@ proc readCommand(buffer: Buffer) =
   var len: int
   var packetid: int
   buffer.pstream.sread(len)
-  buffer.dummyStream.setPosition(0)
-  buffer.dummyStream.data = newString(len)
-  let n = buffer.pstream.readData(addr buffer.dummyStream.data[0], len)
-  assert n == len
-  buffer.dummyStream.sread(cmd)
-  buffer.dummyStream.sread(packetid)
-  bufferDispatcher(ProxyFunctions, buffer, cmd, packetid)
+  var r = buffer.pstream.initReader(len)
+  r.sread(cmd)
+  r.sread(packetid)
+  bufferDispatcher(ProxyFunctions, buffer, cmd, packetid, r)
 
 proc handleRead(buffer: Buffer, fd: int): bool =
   if fd == buffer.rfd:
@@ -1869,8 +1861,7 @@ proc launchBuffer*(config: BufferConfig; url: URL; request: Request;
     url: url,
     charsetStack: charsetStack,
     cacheId: -1,
-    outputId: -1,
-    dummyStream: newStringStream()
+    outputId: -1
   )
   buffer.charset = buffer.charsetStack.pop()
   socks.sread(buffer.loader.key)
diff --git a/src/server/forkserver.nim b/src/server/forkserver.nim
index 4a616a2e..60981213 100644
--- a/src/server/forkserver.nim
+++ b/src/server/forkserver.nim
@@ -5,6 +5,7 @@ import std/streams
 import std/tables
 
 import config/config
+import io/bufreader
 import io/bufwriter
 import io/posixstream
 import io/serialize
@@ -24,7 +25,7 @@ type
     fcForkBuffer, fcForkLoader, fcRemoveChild, fcLoadConfig
 
   ForkServer* = ref object
-    istream: Stream
+    istream: PosixStream
     ostream: PosixStream
     estream*: PosixStream
 
@@ -35,7 +36,7 @@ type
     loaderPid: int
 
 proc newFileLoader*(forkserver: ForkServer; config: LoaderConfig): FileLoader =
-  forkserver.ostream.withWriter w:
+  forkserver.ostream.withPacketWriter w:
     w.swrite(fcForkLoader)
     w.swrite(config)
   var process: int
@@ -43,19 +44,19 @@ proc newFileLoader*(forkserver: ForkServer; config: LoaderConfig): FileLoader =
   return FileLoader(process: process, clientPid: getCurrentProcessId())
 
 proc loadForkServerConfig*(forkserver: ForkServer, config: Config) =
-  forkserver.ostream.withWriter w:
+  forkserver.ostream.withPacketWriter w:
     w.swrite(fcLoadConfig)
     w.swrite(config.getForkServerConfig())
 
 proc removeChild*(forkserver: ForkServer, pid: int) =
-  forkserver.ostream.withWriter w:
+  forkserver.ostream.withPacketWriter w:
     w.swrite(fcRemoveChild)
     w.swrite(pid)
 
 proc forkBuffer*(forkserver: ForkServer; config: BufferConfig; url: URL;
     request: Request; attrs: WindowAttributes; ishtml: bool;
     charsetStack: seq[Charset]): int =
-  forkserver.ostream.withWriter w:
+  forkserver.ostream.withPacketWriter w:
     w.swrite(fcForkBuffer)
     w.swrite(config)
     w.swrite(url)
@@ -109,19 +110,19 @@ proc forkLoader(ctx: var ForkServerContext, config: LoaderConfig): int =
   return pid
 
 var gssock: ServerSocket
-proc forkBuffer(ctx: var ForkServerContext): int =
+proc forkBuffer(ctx: var ForkServerContext; r: var BufferedReader): int =
   var config: BufferConfig
   var url: URL
   var request: Request
   var attrs: WindowAttributes
   var ishtml: bool
   var charsetStack: seq[Charset]
-  ctx.istream.sread(config)
-  ctx.istream.sread(url)
-  ctx.istream.sread(request)
-  ctx.istream.sread(attrs)
-  ctx.istream.sread(ishtml)
-  ctx.istream.sread(charsetStack)
+  r.sread(config)
+  r.sread(url)
+  r.sread(request)
+  r.sread(attrs)
+  r.sread(ishtml)
+  r.sread(charsetStack)
   var pipefd: array[2, cint]
   if pipe(pipefd) == -1:
     raise newException(Defect, "Failed to open pipe.")
@@ -137,7 +138,7 @@ proc forkBuffer(ctx: var ForkServerContext): int =
     zeroMem(addr ctx, sizeof(ctx))
     discard close(pipefd[0]) # close read
     let pid = getCurrentProcessId()
-    let ssock = initServerSocket(pid, buffered = false)
+    let ssock = initServerSocket(pid)
     gssock = ssock
     onSignal SIGTERM:
       # This will be overridden after buffer has been set up; it is only
@@ -182,34 +183,35 @@ proc runForkServer() =
   signal(SIGCHLD, SIG_IGN)
   while true:
     try:
-      var cmd: ForkCommand
-      ctx.istream.sread(cmd)
-      case cmd
-      of fcRemoveChild:
-        var pid: int
-        ctx.istream.sread(pid)
-        let i = ctx.children.find(pid)
-        if i != -1:
-          ctx.children.del(i)
-      of fcForkBuffer:
-        let r = ctx.forkBuffer()
-        ctx.ostream.withWriter w:
-          w.swrite(r)
-      of fcForkLoader:
-        assert ctx.loaderPid == 0
-        var config: LoaderConfig
-        ctx.istream.sread(config)
-        let pid = ctx.forkLoader(config)
-        ctx.ostream.withWriter w:
-          w.swrite(pid)
-        ctx.loaderPid = pid
-        ctx.children.add(pid)
-      of fcLoadConfig:
-        var config: ForkServerConfig
-        ctx.istream.sread(config)
-        set_cjk_ambiguous(config.ambiguous_double)
-        SocketDirectory = config.tmpdir
-      ctx.ostream.flush()
+      ctx.istream.withPacketReader r:
+        var cmd: ForkCommand
+        r.sread(cmd)
+        case cmd
+        of fcRemoveChild:
+          var pid: int
+          r.sread(pid)
+          let i = ctx.children.find(pid)
+          if i != -1:
+            ctx.children.del(i)
+        of fcForkBuffer:
+          let r = ctx.forkBuffer(r)
+          ctx.ostream.withWriter w:
+            w.swrite(r)
+        of fcForkLoader:
+          assert ctx.loaderPid == 0
+          var config: LoaderConfig
+          r.sread(config)
+          let pid = ctx.forkLoader(config)
+          ctx.ostream.withWriter w:
+            w.swrite(pid)
+          ctx.loaderPid = pid
+          ctx.children.add(pid)
+        of fcLoadConfig:
+          var config: ForkServerConfig
+          r.sread(config)
+          set_cjk_ambiguous(config.ambiguous_double)
+          SocketDirectory = config.tmpdir
+        ctx.ostream.flush()
     except EOFError:
       # EOF
       break
@@ -255,13 +257,10 @@ proc newForkServer*(): ForkServer =
     discard close(pipefd_in[0]) # close read
     discard close(pipefd_out[1]) # close write
     discard close(pipefd_err[1]) # close write
-    var readf: File
-    if not open(readf, pipefd_out[0], fmRead):
-      raise newException(Defect, "Failed to open input handle")
     let estream = newPosixStream(pipefd_err[0])
     estream.setBlocking(false)
     return ForkServer(
       ostream: newPosixStream(pipefd_in[1]),
-      istream: newFileStream(readf),
+      istream: newPosixStream(pipefd_out[0]),
       estream: estream
     )