about summary refs log tree commit diff stats
path: root/src
diff options
context:
space:
mode:
authorbptato <nincsnevem662@gmail.com>2024-03-16 23:08:57 +0100
committerbptato <nincsnevem662@gmail.com>2024-03-16 23:08:57 +0100
commit7fd73dff220f7dd5075884059f1c4edc88036813 (patch)
treeed3c758152ea78011331b49b1191e499b6ae3372 /src
parent1e81fdf28bcd25c5fb1c2638b74ddb9d51bd5b72 (diff)
downloadchawan-7fd73dff220f7dd5075884059f1c4edc88036813.tar.gz
io: add BuferedWriter
Unsurprisingly enough, calling `write` a million times is never going to
be very fast.

BufferedWriter basically does the same thing as serialize.swrite did,
but queues up writes in batches before sending them.

TODO: give sread a similar treatment
Diffstat (limited to 'src')
-rw-r--r--src/io/bufwriter.nim175
-rw-r--r--src/io/dynstream.nim3
-rw-r--r--src/io/serialize.nim106
-rw-r--r--src/loader/loader.nim113
-rw-r--r--src/loader/loaderhandle.nim21
-rw-r--r--src/local/client.nim5
-rw-r--r--src/server/buffer.nim40
-rw-r--r--src/server/forkserver.nim54
8 files changed, 310 insertions, 207 deletions
diff --git a/src/io/bufwriter.nim b/src/io/bufwriter.nim
new file mode 100644
index 00000000..99c7ed94
--- /dev/null
+++ b/src/io/bufwriter.nim
@@ -0,0 +1,175 @@
+# Write data to streams.
+
+import std/options
+import std/sets
+import std/tables
+
+import io/dynstream
+
+import types/blob
+import types/formdata
+import types/url
+import types/opt
+
+type BufferedWriter* = object
+  stream: DynStream
+  buffer: ptr UncheckedArray[uint8]
+  bufSize: int
+  bufLen: int
+
+{.warning[Deprecated]: off.}:
+  proc `=destroy`(writer: var BufferedWriter) =
+    if writer.buffer != nil:
+      dealloc(writer.buffer)
+      writer.buffer = nil
+
+proc initWriter*(stream: DynStream; sizeInit = 64): BufferedWriter =
+  return BufferedWriter(
+    stream: stream,
+    buffer: cast[ptr UncheckedArray[uint8]](alloc(sizeInit)),
+    bufSize: sizeInit,
+    bufLen: 0
+  )
+
+proc flush*(writer: var BufferedWriter) =
+  let stream = writer.stream
+  var n = 0
+  while true:
+    n += stream.sendData(addr writer.buffer[n], writer.bufLen - n)
+    if n == writer.bufLen:
+      break
+  writer.bufLen = 0
+  stream.sflush()
+
+proc deinit*(writer: var BufferedWriter) =
+  dealloc(writer.buffer)
+  writer.buffer = nil
+  writer.bufSize = 0
+  writer.bufLen = 0
+
+template withWriter*(stream: DynStream; w, body: untyped) =
+  var w {.inject.} = stream.initWriter()
+  body
+  w.flush()
+  w.deinit()
+
+proc swrite*(writer: var BufferedWriter; n: SomeNumber)
+proc swrite*[T](writer: var BufferedWriter; s: set[T])
+proc swrite*[T: enum](writer: var BufferedWriter; x: T)
+proc swrite*(writer: var BufferedWriter; s: string)
+proc swrite*(writer: var BufferedWriter; b: bool)
+proc swrite*(writer: var BufferedWriter; url: URL)
+proc swrite*(writer: var BufferedWriter; tup: tuple)
+proc swrite*[I, T](writer: var BufferedWriter; a: array[I, T])
+proc swrite*(writer: var BufferedWriter; s: seq)
+proc swrite*[U, V](writer: var BufferedWriter; t: Table[U, V])
+proc swrite*(writer: var BufferedWriter; obj: object)
+proc swrite*(writer: var BufferedWriter; obj: ref object)
+proc swrite*(writer: var BufferedWriter; part: FormDataEntry)
+proc swrite*(writer: var BufferedWriter; blob: Blob)
+proc swrite*[T](writer: var BufferedWriter; o: Option[T])
+proc swrite*[T, E](writer: var BufferedWriter; o: Result[T, E])
+
+proc writeData(writer: var BufferedWriter; buffer: pointer; len: int) =
+  let targetLen = writer.bufLen + len
+  let missing = targetLen - writer.bufSize
+  if missing > 0:
+    let target = writer.bufSize + missing
+    writer.bufSize *= 2
+    if writer.bufSize < target:
+      writer.bufSize = target
+    let p = realloc(writer.buffer, writer.bufSize)
+    writer.buffer = cast[ptr UncheckedArray[uint8]](p)
+  copyMem(addr writer.buffer[writer.bufLen], buffer, len)
+  writer.bufLen = targetLen
+
+proc swrite*(writer: var BufferedWriter; n: SomeNumber) =
+  writer.writeData(unsafeAddr n, sizeof(n))
+
+proc swrite*[T: enum](writer: var BufferedWriter; x: T) =
+  static:
+    doAssert sizeof(int) >= sizeof(T)
+  writer.swrite(int(x))
+
+proc swrite*[T](writer: var BufferedWriter; s: set[T]) =
+  writer.swrite(s.card)
+  for e in s:
+    writer.swrite(e)
+
+proc swrite*(writer: var BufferedWriter; s: string) =
+  writer.swrite(s.len)
+  if s.len > 0:
+    writer.writeData(unsafeAddr s[0], s.len)
+
+proc swrite*(writer: var BufferedWriter; b: bool) =
+  if b:
+    writer.swrite(1u8)
+  else:
+    writer.swrite(0u8)
+
+proc swrite*(writer: var BufferedWriter; url: URL) =
+  if url != nil:
+    writer.swrite(url.serialize())
+  else:
+    writer.swrite("")
+
+proc swrite*(writer: var BufferedWriter; tup: tuple) =
+  for f in tup.fields:
+    writer.swrite(f)
+
+proc swrite*[I, T](writer: var BufferedWriter; a: array[I, T]) =
+  for x in a:
+    writer.swrite(x)
+
+proc swrite*(writer: var BufferedWriter; s: seq) =
+  writer.swrite(s.len)
+  for x in s:
+    writer.swrite(x)
+
+proc swrite*[U, V](writer: var BufferedWriter; t: Table[U, V]) =
+  writer.swrite(t.len)
+  for k, v in t:
+    writer.swrite(k)
+    writer.swrite(v)
+
+proc swrite*(writer: var BufferedWriter; obj: object) =
+  for f in obj.fields:
+    writer.swrite(f)
+
+proc swrite*(writer: var BufferedWriter; obj: ref object) =
+  writer.swrite(obj != nil)
+  if obj != nil:
+    writer.swrite(obj[])
+
+proc swrite*(writer: var BufferedWriter; part: FormDataEntry) =
+  writer.swrite(part.isstr)
+  writer.swrite(part.name)
+  writer.swrite(part.filename)
+  if part.isstr:
+    writer.swrite(part.svalue)
+  else:
+    writer.swrite(part.value)
+
+#TODO clean up this mess
+proc swrite*(writer: var BufferedWriter; blob: Blob) =
+  writer.swrite(blob.isfile)
+  if blob.isfile:
+    writer.swrite(WebFile(blob).path)
+  else:
+    writer.swrite(blob.ctype)
+    writer.swrite(blob.size)
+    writer.writeData(blob.buffer, int(blob.size))
+
+proc swrite*[T](writer: var BufferedWriter; o: Option[T]) =
+  writer.swrite(o.isSome)
+  if o.isSome:
+    writer.swrite(o.get)
+
+proc swrite*[T, E](writer: var BufferedWriter; o: Result[T, E]) =
+  writer.swrite(o.isOk)
+  if o.isOk:
+    when not (T is void):
+      writer.swrite(o.get)
+  else:
+    when not (E is void):
+      writer.swrite(o.error)
diff --git a/src/io/dynstream.nim b/src/io/dynstream.nim
index d4c7760f..ec38c595 100644
--- a/src/io/dynstream.nim
+++ b/src/io/dynstream.nim
@@ -24,6 +24,9 @@ method seek*(s: DynStream; off: int) {.base.} =
 method sclose*(s: DynStream) {.base.} =
   assert false
 
+method sflush*(s: DynStream) {.base.} =
+  discard
+
 proc recvData*(s: DynStream; buffer: var openArray[uint8]): int {.inline.} =
   return s.recvData(addr buffer[0], buffer.len)
 
diff --git a/src/io/serialize.nim b/src/io/serialize.nim
index 4dcb79f0..3ffa40ac 100644
--- a/src/io/serialize.nim
+++ b/src/io/serialize.nim
@@ -10,73 +10,54 @@ import types/formdata
 import types/url
 import types/opt
 
-proc swrite*(stream: Stream, n: SomeNumber)
 proc sread*(stream: Stream, n: var SomeNumber)
 func slen*(n: SomeNumber): int
 
-proc swrite*[T](stream: Stream, s: set[T])
 proc sread*[T](stream: Stream, s: var set[T])
 func slen*[T](s: set[T]): int
 
-proc swrite*[T: enum](stream: Stream, x: T)
 proc sread*[T: enum](stream: Stream, x: var T)
 func slen*[T: enum](x: T): int
 
-proc swrite*(stream: Stream, s: string)
 proc sread*(stream: Stream, s: var string)
 func slen*(s: string): int
 
-proc swrite*(stream: Stream, b: bool)
 proc sread*(stream: Stream, b: var bool)
 func slen*(b: bool): int
 
-proc swrite*(stream: Stream, url: URL)
 proc sread*(stream: Stream, url: var URL)
 func slen*(url: URL): int
 
-proc swrite*(stream: Stream, tup: tuple)
 proc sread*(stream: Stream, tup: var tuple)
 func slen*(tup: tuple): int
 
-proc swrite*[I, T](stream: Stream, a: array[I, T])
 proc sread*[I, T](stream: Stream, a: var array[I, T])
 func slen*[I, T](a: array[I, T]): int
 
-proc swrite*(stream: Stream, s: seq)
 proc sread*(stream: Stream, s: var seq)
 func slen*(s: seq): int
 
-proc swrite*[U, V](stream: Stream, t: Table[U, V])
 proc sread*[U, V](stream: Stream, t: var Table[U, V])
 func slen*[U, V](t: Table[U, V]): int
 
-proc swrite*(stream: Stream, obj: object)
 proc sread*(stream: Stream, obj: var object)
 func slen*(obj: object): int
 
-proc swrite*(stream: Stream, obj: ref object)
 proc sread*(stream: Stream, obj: var ref object)
 func slen*(obj: ref object): int
 
-proc swrite*(stream: Stream, part: FormDataEntry)
 proc sread*(stream: Stream, part: var FormDataEntry)
 func slen*(part: FormDataEntry): int
 
-proc swrite*(stream: Stream, blob: Blob)
 proc sread*(stream: Stream, blob: var Blob)
 func slen*(blob: Blob): int
 
-proc swrite*[T](stream: Stream, o: Option[T])
 proc sread*[T](stream: Stream, o: var Option[T])
 func slen*[T](o: Option[T]): int
 
-proc swrite*[T, E](stream: Stream, o: Result[T, E])
 proc sread*[T, E](stream: Stream, o: var Result[T, E])
 func slen*[T, E](o: Result[T, E]): int
 
-proc swrite*(stream: Stream, n: SomeNumber) =
-  stream.write(n)
-
 proc sread*(stream: Stream, n: var SomeNumber) =
   if stream.readData(addr n, sizeof(n)) < sizeof(n):
     raise newException(EOFError, "eof")
@@ -84,11 +65,6 @@ proc sread*(stream: Stream, n: var SomeNumber) =
 func slen*(n: SomeNumber): int =
   return sizeof(n)
 
-proc swrite*[T: enum](stream: Stream, x: T) =
-  static:
-    doAssert sizeof(int) >= sizeof(T)
-  stream.swrite(int(x))
-
 proc sread*[T: enum](stream: Stream, x: var T) =
   var i: int
   stream.sread(i)
@@ -97,11 +73,6 @@ proc sread*[T: enum](stream: Stream, x: var T) =
 func slen*[T: enum](x: T): int =
   return sizeof(int)
 
-proc swrite*[T](stream: Stream, s: set[T]) =
-  stream.swrite(s.card)
-  for e in s:
-    stream.swrite(e)
-
 proc sread*[T](stream: Stream, s: var set[T]) =
   var len: int
   stream.sread(len)
@@ -115,10 +86,6 @@ func slen*[T](s: set[T]): int =
   for x in s:
     result += slen(x)
 
-proc swrite*(stream: Stream, s: string) =
-  stream.swrite(s.len)
-  stream.write(s)
-
 proc sread*(stream: Stream, s: var string) =
   var len: int
   stream.sread(len)
@@ -133,12 +100,6 @@ proc sread*(stream: Stream, s: var string) =
 func slen*(s: string): int =
   slen(s.len) + s.len
 
-proc swrite*(stream: Stream, b: bool) =
-  if b:
-    stream.swrite(1u8)
-  else:
-    stream.swrite(0u8)
-
 proc sread*(stream: Stream, b: var bool) =
   var n: uint8
   stream.sread(n)
@@ -151,12 +112,6 @@ proc sread*(stream: Stream, b: var bool) =
 func slen*(b: bool): int =
   return sizeof(uint8)
 
-proc swrite*(stream: Stream, url: URL) =
-  if url != nil:
-    stream.swrite(url.serialize())
-  else:
-    stream.swrite("")
-
 proc sread*(stream: Stream, url: var URL) =
   var s: string
   stream.sread(s)
@@ -174,10 +129,6 @@ func slen*(url: URL): int =
     return slen("")
   return slen(url.serialize())
 
-proc swrite*(stream: Stream, tup: tuple) =
-  for f in tup.fields:
-    stream.swrite(f)
-
 proc sread*(stream: Stream, tup: var tuple) =
   for f in tup.fields:
     stream.sread(f)
@@ -186,10 +137,6 @@ func slen*(tup: tuple): int =
   for f in tup.fields:
     result += slen(f)
 
-proc swrite*[I, T](stream: Stream; a: array[I, T]) =
-  for x in a:
-    stream.swrite(x)
-
 proc sread*[I, T](stream: Stream; a: var array[I, T]) =
   for x in a.mitems:
     stream.sread(x)
@@ -198,11 +145,6 @@ func slen*[I, T](a: array[I, T]): int =
   for x in a:
     result += slen(x)
 
-proc swrite*(stream: Stream, s: seq) =
-  stream.swrite(s.len)
-  for x in s:
-    stream.swrite(x)
-
 proc sread*(stream: Stream, s: var seq) =
   var len: int
   stream.sread(len)
@@ -215,12 +157,6 @@ func slen*(s: seq): int =
   for x in s:
     result += slen(x)
 
-proc swrite*[U, V](stream: Stream, t: Table[U, V]) =
-  stream.swrite(t.len)
-  for k, v in t:
-    stream.swrite(k)
-    stream.swrite(v)
-
 proc sread*[U, V](stream: Stream, t: var Table[U, V]) =
   var len: int
   stream.sread(len)
@@ -237,10 +173,6 @@ func slen*[U, V](t: Table[U, V]): int =
     result += slen(k)
     result += slen(v)
 
-proc swrite*(stream: Stream, obj: object) =
-  for f in obj.fields:
-    stream.swrite(f)
-
 proc sread*(stream: Stream, obj: var object) =
   for f in obj.fields:
     stream.sread(f)
@@ -249,11 +181,6 @@ func slen*(obj: object): int =
   for f in obj.fields:
     result += slen(f)
 
-proc swrite*(stream: Stream, obj: ref object) =
-  stream.swrite(obj != nil)
-  if obj != nil:
-    stream.swrite(obj[])
-
 proc sread*(stream: Stream, obj: var ref object) =
   var n: bool
   stream.sread(n)
@@ -266,15 +193,6 @@ func slen*(obj: ref object): int =
   if obj != nil:
     result += slen(obj[])
 
-proc swrite*(stream: Stream, part: FormDataEntry) =
-  stream.swrite(part.isstr)
-  stream.swrite(part.name)
-  stream.swrite(part.filename)
-  if part.isstr:
-    stream.swrite(part.svalue)
-  else:
-    stream.swrite(part.value)
-
 proc sread*(stream: Stream, part: var FormDataEntry) =
   var isstr: bool
   stream.sread(isstr)
@@ -298,16 +216,6 @@ func slen*(part: FormDataEntry): int =
   else:
     result += slen(part.value)
 
-#TODO clean up this mess
-proc swrite*(stream: Stream, blob: Blob) =
-  stream.swrite(blob.isfile)
-  if blob.isfile:
-    stream.swrite(WebFile(blob).path)
-  else:
-    stream.swrite(blob.ctype)
-    stream.swrite(blob.size)
-    stream.writeData(blob.buffer, int(blob.size))
-
 proc sread*(stream: Stream, blob: var Blob) =
   var isfile: bool
   stream.sread(isfile)
@@ -336,11 +244,6 @@ func slen*(blob: Blob): int =
     result += slen(blob.size)
     result += int(blob.size) #TODO ??
 
-proc swrite*[T](stream: Stream, o: Option[T]) =
-  stream.swrite(o.isSome)
-  if o.isSome:
-    stream.swrite(o.get)
-
 proc sread*[T](stream: Stream, o: var Option[T]) =
   var x: bool
   stream.sread(x)
@@ -356,15 +259,6 @@ func slen*[T](o: Option[T]): int =
   if o.isSome:
     result += slen(o.get)
 
-proc swrite*[T, E](stream: Stream, o: Result[T, E]) =
-  stream.swrite(o.isOk)
-  if o.isOk:
-    when not (T is void):
-      stream.swrite(o.get)
-  else:
-    when not (E is void):
-      stream.swrite(o.error)
-
 proc sread*[T, E](stream: Stream, o: var Result[T, E]) =
   var x: bool
   stream.sread(x)
diff --git a/src/loader/loader.nim b/src/loader/loader.nim
index fdc87eb2..3b454b4d 100644
--- a/src/loader/loader.nim
+++ b/src/loader/loader.nim
@@ -25,6 +25,7 @@ import std/strutils
 import std/tables
 
 import config/chapath
+import io/bufwriter
 import io/posixstream
 import io/promise
 import io/serialize
@@ -466,11 +467,12 @@ proc addClient(ctx: LoaderContext; stream: SocketStream) =
   stream.sread(key)
   stream.sread(pid)
   stream.sread(config)
-  if pid in ctx.clientData or key == default(ClientKey):
-    stream.swrite(false)
-  else:
-    ctx.clientData[pid] = ClientData(pid: pid, key: key, config: config)
-    stream.swrite(true)
+  stream.withWriter w:
+    if pid in ctx.clientData or key == default(ClientKey):
+      w.swrite(false)
+    else:
+      ctx.clientData[pid] = ClientData(pid: pid, key: key, config: config)
+      w.swrite(true)
   stream.close()
 
 proc cleanup(client: ClientData) =
@@ -497,8 +499,9 @@ proc addCacheFile(ctx: LoaderContext; stream: SocketStream) =
   assert output != nil
   let targetClient = ctx.clientData[targetPid]
   let (id, file) = ctx.addCacheFile(targetClient, output)
-  stream.swrite(id)
-  stream.swrite(file)
+  stream.withWriter w:
+    w.swrite(id)
+    w.swrite(file)
   stream.close()
 
 proc redirectToFile(ctx: LoaderContext; stream: SocketStream) =
@@ -510,7 +513,8 @@ proc redirectToFile(ctx: LoaderContext; stream: SocketStream) =
   var success = false
   if output != nil:
     success = ctx.redirectToFile(output, targetPath)
-  stream.swrite(success)
+  stream.withWriter w:
+    w.swrite(success)
   stream.close()
 
 proc shareCachedItem(ctx: LoaderContext; stream: SocketStream) =
@@ -561,10 +565,12 @@ proc tee(ctx: LoaderContext; stream: SocketStream; client: ClientData) =
   if output != nil:
     let id = ctx.getOutputId()
     output.tee(stream, id, targetPid)
-    stream.swrite(id)
+    stream.withWriter w:
+      w.swrite(id)
     stream.setBlocking(false)
   else:
-    stream.swrite(-1)
+    stream.withWriter w:
+      w.swrite(-1)
     stream.close()
 
 proc suspend(ctx: LoaderContext; stream: SocketStream; client: ClientData) =
@@ -692,7 +698,8 @@ proc initLoaderContext(fd: cint; config: LoaderConfig): LoaderContext =
   stream.sread(key)
   stream.sread(pid)
   stream.sread(config)
-  stream.swrite(true)
+  stream.withWriter w:
+    w.swrite(true)
   ctx.pagerClient = ClientData(key: key, pid: pid, config: config)
   ctx.clientData[pid] = ctx.pagerClient
   stream.close()
@@ -815,8 +822,9 @@ proc getRedirect*(response: Response; request: Request): Request =
 proc connect(loader: FileLoader; buffered = true): SocketStream =
   let stream = connectSocketStream(loader.process, buffered, blocking = true)
   if stream != nil:
-    stream.swrite(loader.clientPid)
-    stream.swrite(loader.key)
+    stream.withWriter w:
+      w.swrite(loader.clientPid)
+      w.swrite(loader.key)
     return stream
   return nil
 
@@ -824,9 +832,9 @@ proc connect(loader: FileLoader; buffered = true): SocketStream =
 # anyway).
 proc startRequest*(loader: FileLoader; request: Request): SocketStream =
   let stream = loader.connect(buffered = false)
-  stream.swrite(lcLoad)
-  stream.swrite(request)
-  stream.flush()
+  stream.withWriter w:
+    w.swrite(lcLoad)
+    w.swrite(request)
   return stream
 
 #TODO: add init
@@ -845,9 +853,9 @@ proc fetch*(loader: FileLoader; input: Request): FetchPromise =
 proc reconnect*(loader: FileLoader; data: ConnectData) =
   data.stream.close()
   let stream = loader.connect(buffered = false)
-  stream.swrite(lcLoad)
-  stream.swrite(data.request)
-  stream.flush()
+  stream.withWriter w:
+    w.swrite(lcLoad)
+    w.swrite(data.request)
   let fd = int(stream.fd)
   loader.registerFun(fd)
   loader.connecting[fd] = ConnectData(
@@ -873,21 +881,24 @@ proc switchStream*(loader: FileLoader; data: var OngoingData;
 
 proc suspend*(loader: FileLoader; fds: seq[int]) =
   let stream = loader.connect()
-  stream.swrite(lcSuspend)
-  stream.swrite(fds)
+  stream.withWriter w:
+    w.swrite(lcSuspend)
+    w.swrite(fds)
   stream.close()
 
 proc resume*(loader: FileLoader; fds: seq[int]) =
   let stream = loader.connect()
-  stream.swrite(lcResume)
-  stream.swrite(fds)
+  stream.withWriter w:
+    w.swrite(lcResume)
+    w.swrite(fds)
   stream.close()
 
 proc tee*(loader: FileLoader; sourceId, targetPid: int): (SocketStream, int) =
   let stream = loader.connect(buffered = false)
-  stream.swrite(lcTee)
-  stream.swrite(sourceId)
-  stream.swrite(targetPid)
+  stream.withWriter w:
+    w.swrite(lcTee)
+    w.swrite(sourceId)
+    w.swrite(targetPid)
   var outputId: int
   stream.sread(outputId)
   return (stream, outputId)
@@ -897,10 +908,10 @@ proc addCacheFile*(loader: FileLoader; outputId, targetPid: int):
   let stream = loader.connect()
   if stream == nil:
     return (-1, "")
-  stream.swrite(lcAddCacheFile)
-  stream.swrite(outputId)
-  stream.swrite(targetPid)
-  stream.flush()
+  stream.withWriter w:
+    w.swrite(lcAddCacheFile)
+    w.swrite(outputId)
+    w.swrite(targetPid)
   var outputId: int
   var cacheFile: string
   stream.sread(outputId)
@@ -912,10 +923,10 @@ proc redirectToFile*(loader: FileLoader; outputId: int; targetPath: string):
   let stream = loader.connect()
   if stream == nil:
     return false
-  stream.swrite(lcRedirectToFile)
-  stream.swrite(outputId)
-  stream.swrite(targetPath)
-  stream.flush()
+  stream.withWriter w:
+    w.swrite(lcRedirectToFile)
+    w.swrite(outputId)
+    w.swrite(targetPath)
   stream.sread(result)
 
 const BufferSize = 4096
@@ -1014,41 +1025,45 @@ proc doRequest*(loader: FileLoader; request: Request): Response =
 proc shareCachedItem*(loader: FileLoader; id, targetPid: int) =
   let stream = loader.connect()
   if stream != nil:
-    stream.swrite(lcShareCachedItem)
-    stream.swrite(loader.clientPid)
-    stream.swrite(targetPid)
-    stream.swrite(id)
+    stream.withWriter w:
+      w.swrite(lcShareCachedItem)
+      w.swrite(loader.clientPid)
+      w.swrite(targetPid)
+      w.swrite(id)
     stream.close()
 
 proc passFd*(loader: FileLoader; id: string; fd: FileHandle) =
   let stream = loader.connect(buffered = false)
   if stream != nil:
-    stream.swrite(lcPassFd)
-    stream.swrite(id)
+    stream.withWriter w:
+      w.swrite(lcPassFd)
+      w.swrite(id)
     stream.sendFileHandle(fd)
     stream.close()
 
 proc removeCachedItem*(loader: FileLoader; cacheId: int) =
   let stream = loader.connect()
   if stream != nil:
-    stream.swrite(lcRemoveCachedItem)
-    stream.swrite(cacheId)
+    stream.withWriter w:
+      w.swrite(lcRemoveCachedItem)
+      w.swrite(cacheId)
     stream.close()
 
 proc addClient*(loader: FileLoader; key: ClientKey; pid: int;
     config: LoaderClientConfig): bool =
   let stream = loader.connect()
-  stream.swrite(lcAddClient)
-  stream.swrite(key)
-  stream.swrite(pid)
-  stream.swrite(config)
-  stream.flush()
+  stream.withWriter w:
+    w.swrite(lcAddClient)
+    w.swrite(key)
+    w.swrite(pid)
+    w.swrite(config)
   stream.sread(result)
   stream.close()
 
 proc removeClient*(loader: FileLoader; pid: int) =
   let stream = loader.connect()
   if stream != nil:
-    stream.swrite(lcRemoveClient)
-    stream.swrite(pid)
+    stream.withWriter w:
+      w.swrite(lcRemoveClient)
+      w.swrite(pid)
     stream.close()
diff --git a/src/loader/loaderhandle.nim b/src/loader/loaderhandle.nim
index dbe998c6..6e1b53a9 100644
--- a/src/loader/loaderhandle.nim
+++ b/src/loader/loaderhandle.nim
@@ -3,8 +3,8 @@ import std/net
 import std/streams
 import std/tables
 
+import io/bufwriter
 import io/posixstream
-import io/serialize
 import loader/headers
 
 when defined(debug):
@@ -129,12 +129,13 @@ proc sendResult*(handle: LoaderHandle; res: int; msg = "") =
   let output = handle.output
   let blocking = output.ostream.blocking
   output.ostream.setBlocking(true)
-  output.ostream.swrite(res)
-  if res == 0: # success
-    assert msg == ""
-    output.ostream.swrite(output.outputId)
-  else: # error
-    output.ostream.swrite(msg)
+  output.ostream.withWriter w:
+    w.swrite(res)
+    if res == 0: # success
+      assert msg == ""
+      w.swrite(output.outputId)
+    else: # error
+      w.swrite(msg)
   output.ostream.setBlocking(blocking)
 
 proc sendStatus*(handle: LoaderHandle; status: uint16) =
@@ -142,7 +143,8 @@ proc sendStatus*(handle: LoaderHandle; status: uint16) =
   inc handle.rstate
   let blocking = handle.output.ostream.blocking
   handle.output.ostream.setBlocking(true)
-  handle.output.ostream.swrite(status)
+  handle.output.ostream.withWriter w:
+    w.swrite(status)
   handle.output.ostream.setBlocking(blocking)
 
 proc sendHeaders*(handle: LoaderHandle; headers: Headers) =
@@ -150,7 +152,8 @@ proc sendHeaders*(handle: LoaderHandle; headers: Headers) =
   inc handle.rstate
   let blocking = handle.output.ostream.blocking
   handle.output.ostream.setBlocking(true)
-  handle.output.ostream.swrite(headers)
+  handle.output.ostream.withWriter w:
+    w.swrite(headers)
   handle.output.ostream.setBlocking(blocking)
 
 proc recvData*(ps: PosixStream; buffer: LoaderBuffer): int {.inline.} =
diff --git a/src/local/client.nim b/src/local/client.nim
index 5c93e375..8633d051 100644
--- a/src/local/client.nim
+++ b/src/local/client.nim
@@ -21,9 +21,9 @@ import html/event
 import html/formdata
 import html/xmlhttprequest
 import io/bufstream
+import io/bufwriter
 import io/posixstream
 import io/promise
-import io/serialize
 import io/socketstream
 import js/base64
 import js/console
@@ -489,7 +489,8 @@ proc acceptBuffers(client: Client) =
       pager.alert("Error: failed to set up buffer")
       continue
     let key = pager.addLoaderClient(container.process, container.loaderConfig)
-    stream.swrite(key)
+    stream.withWriter w:
+      w.swrite(key)
     let loader = pager.loader
     if item.fdin != -1:
       let outputId = item.istreamOutputId
diff --git a/src/server/buffer.nim b/src/server/buffer.nim
index 96d9c5ff..7bdd46d0 100644
--- a/src/server/buffer.nim
+++ b/src/server/buffer.nim
@@ -27,6 +27,7 @@ import html/env
 import html/event
 import html/formdata as formdata_impl
 import io/bufstream
+import io/bufwriter
 import io/posixstream
 import io/promise
 import io/serialize
@@ -192,8 +193,9 @@ proc buildInterfaceProc(fun: NimNode, funid: string): tuple[fun, name: NimNode]
   let this2 = newIdentDefs(ident("iface"), ident("BufferInterface"))
   let thisval = this2[0]
   body.add(quote do:
-    `thisval`.stream.swrite(BufferCommand.`nup`)
-    `thisval`.stream.swrite(`thisval`.packetid)
+    var writer {.inject.} = `thisval`.stream.initWriter()
+    writer.swrite(BufferCommand.`nup`)
+    writer.swrite(`thisval`.packetid)
   )
   var params2: seq[NimNode]
   var retval2: NimNode
@@ -220,12 +222,14 @@ proc buildInterfaceProc(fun: NimNode, funid: string): tuple[fun, name: NimNode]
     let s = params2[i][0] # sym e.g. url
     body.add(quote do:
       when typeof(`s`) is FileHandle:
-        #TODO flush or something
+        writer.flush()
         SocketStream(`thisval`.stream.source).sendFileHandle(`s`)
       else:
-        `thisval`.stream.swrite(`s`)
+        writer.swrite(`s`)
     )
   body.add(quote do:
+    writer.flush()
+    writer.deinit()
     let promise = `addfun`
     inc `thisval`.packetid
     return promise
@@ -1099,11 +1103,11 @@ proc resolveTask[T](buffer: Buffer, cmd: BufferCommand, res: T) =
   if packetid == 0:
     return # no task to resolve (TODO this is kind of inefficient)
   let len = slen(buffer.tasks[cmd]) + slen(res)
-  buffer.pstream.swrite(len)
-  buffer.pstream.swrite(packetid)
+  buffer.pstream.withWriter w:
+    w.swrite(len)
+    w.swrite(packetid)
+    w.swrite(res)
   buffer.tasks[cmd] = 0
-  buffer.pstream.swrite(res)
-  buffer.pstream.flush()
 
 proc onload(buffer: Buffer) =
   case buffer.state
@@ -1664,7 +1668,8 @@ macro bufferDispatcher(funs: static ProxyMap, buffer: Buffer,
             let `id` = `buffer`.pstream.recvFileHandle()
           else:
             var `id`: `typ`
-            `buffer`.pstream.sread(`id`))
+            `buffer`.pstream.sread(`id`)
+        )
         call.add(id)
     var rval: NimNode
     if v.params[0].kind == nnkEmpty:
@@ -1677,15 +1682,19 @@ macro bufferDispatcher(funs: static ProxyMap, buffer: Buffer,
     if rval == nil:
       resolve.add(quote do:
         let len = slen(`packetid`)
-        buffer.pstream.swrite(len)
-        buffer.pstream.swrite(`packetid`)
+        block:
+          buffer.pstream.withWriter w:
+            w.swrite(len)
+            w.swrite(`packetid`)
       )
     else:
       resolve.add(quote do:
         let len = slen(`packetid`) + slen(`rval`)
-        buffer.pstream.swrite(len)
-        buffer.pstream.swrite(`packetid`)
-        buffer.pstream.swrite(`rval`)
+        block:
+          buffer.pstream.withWriter w:
+            w.swrite(len)
+            w.swrite(`packetid`)
+            w.swrite(`rval`)
       )
     if v.istask:
       let en = v.ename
@@ -1694,7 +1703,8 @@ macro bufferDispatcher(funs: static ProxyMap, buffer: Buffer,
           buffer.savetask = false
           buffer.tasks[BufferCommand.`en`] = `packetid`
         else:
-          `resolve`)
+          `resolve`
+      )
     else:
       stmts.add(resolve)
     ofbranch.add(stmts)
diff --git a/src/server/forkserver.nim b/src/server/forkserver.nim
index 2c00dd4e..12b25dc3 100644
--- a/src/server/forkserver.nim
+++ b/src/server/forkserver.nim
@@ -5,6 +5,7 @@ import std/streams
 import std/tables
 
 import config/config
+import io/bufwriter
 import io/posixstream
 import io/serialize
 import io/serversocket
@@ -23,44 +24,44 @@ type
 
   ForkServer* = ref object
     istream: Stream
-    ostream: Stream
+    ostream: PosixStream
     estream*: PosixStream
 
   ForkServerContext = object
-    istream: Stream
-    ostream: Stream
+    istream: PosixStream
+    ostream: PosixStream
     children: seq[int]
     loaderPid: int
 
 proc newFileLoader*(forkserver: ForkServer; config: LoaderConfig): FileLoader =
-  forkserver.ostream.swrite(fcForkLoader)
-  forkserver.ostream.swrite(config)
-  forkserver.ostream.flush()
+  forkserver.ostream.withWriter w:
+    w.swrite(fcForkLoader)
+    w.swrite(config)
   var process: int
   forkserver.istream.sread(process)
   return FileLoader(process: process, clientPid: getCurrentProcessId())
 
 proc loadForkServerConfig*(forkserver: ForkServer, config: Config) =
-  forkserver.ostream.swrite(fcLoadConfig)
-  forkserver.ostream.swrite(config.getForkServerConfig())
-  forkserver.ostream.flush()
+  forkserver.ostream.withWriter w:
+    w.swrite(fcLoadConfig)
+    w.swrite(config.getForkServerConfig())
 
 proc removeChild*(forkserver: ForkServer, pid: int) =
-  forkserver.ostream.swrite(fcRemoveChild)
-  forkserver.ostream.swrite(pid)
-  forkserver.ostream.flush()
+  forkserver.ostream.withWriter w:
+    w.swrite(fcRemoveChild)
+    w.swrite(pid)
 
 proc forkBuffer*(forkserver: ForkServer; config: BufferConfig; url: URL;
     request: Request; attrs: WindowAttributes; ishtml: bool;
     charsetStack: seq[Charset]): int =
-  forkserver.ostream.swrite(fcForkBuffer)
-  forkserver.ostream.swrite(config)
-  forkserver.ostream.swrite(url)
-  forkserver.ostream.swrite(request)
-  forkserver.ostream.swrite(attrs)
-  forkserver.ostream.swrite(ishtml)
-  forkserver.ostream.swrite(charsetStack)
-  forkserver.ostream.flush()
+  forkserver.ostream.withWriter w:
+    w.swrite(fcForkBuffer)
+    w.swrite(config)
+    w.swrite(url)
+    w.swrite(request)
+    w.swrite(attrs)
+    w.swrite(ishtml)
+    w.swrite(charsetStack)
   var bufferPid: int
   forkserver.istream.sread(bufferPid)
   bufferPid
@@ -186,13 +187,16 @@ proc runForkServer() =
         if i != -1:
           ctx.children.del(i)
       of fcForkBuffer:
-        ctx.ostream.swrite(ctx.forkBuffer())
+        let r = ctx.forkBuffer()
+        ctx.ostream.withWriter w:
+          w.swrite(r)
       of fcForkLoader:
         assert ctx.loaderPid == 0
         var config: LoaderConfig
         ctx.istream.sread(config)
         let pid = ctx.forkLoader(config)
-        ctx.ostream.swrite(pid)
+        ctx.ostream.withWriter w:
+          w.swrite(pid)
         ctx.loaderPid = pid
         ctx.children.add(pid)
       of fcLoadConfig:
@@ -246,15 +250,13 @@ proc newForkServer*(): ForkServer =
     discard close(pipefd_in[0]) # close read
     discard close(pipefd_out[1]) # close write
     discard close(pipefd_err[1]) # close write
-    var writef, readf: File
-    if not open(writef, pipefd_in[1], fmWrite):
-      raise newException(Defect, "Failed to open output handle")
+    var readf: File
     if not open(readf, pipefd_out[0], fmRead):
       raise newException(Defect, "Failed to open input handle")
     let estream = newPosixStream(pipefd_err[0])
     estream.setBlocking(false)
     return ForkServer(
-      ostream: newFileStream(writef),
+      ostream: newPosixStream(pipefd_in[1]),
       istream: newFileStream(readf),
       estream: estream
     )