about summary refs log tree commit diff stats
path: root/src/loader
diff options
context:
space:
mode:
authorbptato <nincsnevem662@gmail.com>2024-03-16 23:08:57 +0100
committerbptato <nincsnevem662@gmail.com>2024-03-16 23:08:57 +0100
commit7fd73dff220f7dd5075884059f1c4edc88036813 (patch)
treeed3c758152ea78011331b49b1191e499b6ae3372 /src/loader
parent1e81fdf28bcd25c5fb1c2638b74ddb9d51bd5b72 (diff)
downloadchawan-7fd73dff220f7dd5075884059f1c4edc88036813.tar.gz
io: add BuferedWriter
Unsurprisingly enough, calling `write` a million times is never going to
be very fast.

BufferedWriter basically does the same thing as serialize.swrite did,
but queues up writes in batches before sending them.

TODO: give sread a similar treatment
Diffstat (limited to 'src/loader')
-rw-r--r--src/loader/loader.nim113
-rw-r--r--src/loader/loaderhandle.nim21
2 files changed, 76 insertions, 58 deletions
diff --git a/src/loader/loader.nim b/src/loader/loader.nim
index fdc87eb2..3b454b4d 100644
--- a/src/loader/loader.nim
+++ b/src/loader/loader.nim
@@ -25,6 +25,7 @@ import std/strutils
 import std/tables
 
 import config/chapath
+import io/bufwriter
 import io/posixstream
 import io/promise
 import io/serialize
@@ -466,11 +467,12 @@ proc addClient(ctx: LoaderContext; stream: SocketStream) =
   stream.sread(key)
   stream.sread(pid)
   stream.sread(config)
-  if pid in ctx.clientData or key == default(ClientKey):
-    stream.swrite(false)
-  else:
-    ctx.clientData[pid] = ClientData(pid: pid, key: key, config: config)
-    stream.swrite(true)
+  stream.withWriter w:
+    if pid in ctx.clientData or key == default(ClientKey):
+      w.swrite(false)
+    else:
+      ctx.clientData[pid] = ClientData(pid: pid, key: key, config: config)
+      w.swrite(true)
   stream.close()
 
 proc cleanup(client: ClientData) =
@@ -497,8 +499,9 @@ proc addCacheFile(ctx: LoaderContext; stream: SocketStream) =
   assert output != nil
   let targetClient = ctx.clientData[targetPid]
   let (id, file) = ctx.addCacheFile(targetClient, output)
-  stream.swrite(id)
-  stream.swrite(file)
+  stream.withWriter w:
+    w.swrite(id)
+    w.swrite(file)
   stream.close()
 
 proc redirectToFile(ctx: LoaderContext; stream: SocketStream) =
@@ -510,7 +513,8 @@ proc redirectToFile(ctx: LoaderContext; stream: SocketStream) =
   var success = false
   if output != nil:
     success = ctx.redirectToFile(output, targetPath)
-  stream.swrite(success)
+  stream.withWriter w:
+    w.swrite(success)
   stream.close()
 
 proc shareCachedItem(ctx: LoaderContext; stream: SocketStream) =
@@ -561,10 +565,12 @@ proc tee(ctx: LoaderContext; stream: SocketStream; client: ClientData) =
   if output != nil:
     let id = ctx.getOutputId()
     output.tee(stream, id, targetPid)
-    stream.swrite(id)
+    stream.withWriter w:
+      w.swrite(id)
     stream.setBlocking(false)
   else:
-    stream.swrite(-1)
+    stream.withWriter w:
+      w.swrite(-1)
     stream.close()
 
 proc suspend(ctx: LoaderContext; stream: SocketStream; client: ClientData) =
@@ -692,7 +698,8 @@ proc initLoaderContext(fd: cint; config: LoaderConfig): LoaderContext =
   stream.sread(key)
   stream.sread(pid)
   stream.sread(config)
-  stream.swrite(true)
+  stream.withWriter w:
+    w.swrite(true)
   ctx.pagerClient = ClientData(key: key, pid: pid, config: config)
   ctx.clientData[pid] = ctx.pagerClient
   stream.close()
@@ -815,8 +822,9 @@ proc getRedirect*(response: Response; request: Request): Request =
 proc connect(loader: FileLoader; buffered = true): SocketStream =
   let stream = connectSocketStream(loader.process, buffered, blocking = true)
   if stream != nil:
-    stream.swrite(loader.clientPid)
-    stream.swrite(loader.key)
+    stream.withWriter w:
+      w.swrite(loader.clientPid)
+      w.swrite(loader.key)
     return stream
   return nil
 
@@ -824,9 +832,9 @@ proc connect(loader: FileLoader; buffered = true): SocketStream =
 # anyway).
 proc startRequest*(loader: FileLoader; request: Request): SocketStream =
   let stream = loader.connect(buffered = false)
-  stream.swrite(lcLoad)
-  stream.swrite(request)
-  stream.flush()
+  stream.withWriter w:
+    w.swrite(lcLoad)
+    w.swrite(request)
   return stream
 
 #TODO: add init
@@ -845,9 +853,9 @@ proc fetch*(loader: FileLoader; input: Request): FetchPromise =
 proc reconnect*(loader: FileLoader; data: ConnectData) =
   data.stream.close()
   let stream = loader.connect(buffered = false)
-  stream.swrite(lcLoad)
-  stream.swrite(data.request)
-  stream.flush()
+  stream.withWriter w:
+    w.swrite(lcLoad)
+    w.swrite(data.request)
   let fd = int(stream.fd)
   loader.registerFun(fd)
   loader.connecting[fd] = ConnectData(
@@ -873,21 +881,24 @@ proc switchStream*(loader: FileLoader; data: var OngoingData;
 
 proc suspend*(loader: FileLoader; fds: seq[int]) =
   let stream = loader.connect()
-  stream.swrite(lcSuspend)
-  stream.swrite(fds)
+  stream.withWriter w:
+    w.swrite(lcSuspend)
+    w.swrite(fds)
   stream.close()
 
 proc resume*(loader: FileLoader; fds: seq[int]) =
   let stream = loader.connect()
-  stream.swrite(lcResume)
-  stream.swrite(fds)
+  stream.withWriter w:
+    w.swrite(lcResume)
+    w.swrite(fds)
   stream.close()
 
 proc tee*(loader: FileLoader; sourceId, targetPid: int): (SocketStream, int) =
   let stream = loader.connect(buffered = false)
-  stream.swrite(lcTee)
-  stream.swrite(sourceId)
-  stream.swrite(targetPid)
+  stream.withWriter w:
+    w.swrite(lcTee)
+    w.swrite(sourceId)
+    w.swrite(targetPid)
   var outputId: int
   stream.sread(outputId)
   return (stream, outputId)
@@ -897,10 +908,10 @@ proc addCacheFile*(loader: FileLoader; outputId, targetPid: int):
   let stream = loader.connect()
   if stream == nil:
     return (-1, "")
-  stream.swrite(lcAddCacheFile)
-  stream.swrite(outputId)
-  stream.swrite(targetPid)
-  stream.flush()
+  stream.withWriter w:
+    w.swrite(lcAddCacheFile)
+    w.swrite(outputId)
+    w.swrite(targetPid)
   var outputId: int
   var cacheFile: string
   stream.sread(outputId)
@@ -912,10 +923,10 @@ proc redirectToFile*(loader: FileLoader; outputId: int; targetPath: string):
   let stream = loader.connect()
   if stream == nil:
     return false
-  stream.swrite(lcRedirectToFile)
-  stream.swrite(outputId)
-  stream.swrite(targetPath)
-  stream.flush()
+  stream.withWriter w:
+    w.swrite(lcRedirectToFile)
+    w.swrite(outputId)
+    w.swrite(targetPath)
   stream.sread(result)
 
 const BufferSize = 4096
@@ -1014,41 +1025,45 @@ proc doRequest*(loader: FileLoader; request: Request): Response =
 proc shareCachedItem*(loader: FileLoader; id, targetPid: int) =
   let stream = loader.connect()
   if stream != nil:
-    stream.swrite(lcShareCachedItem)
-    stream.swrite(loader.clientPid)
-    stream.swrite(targetPid)
-    stream.swrite(id)
+    stream.withWriter w:
+      w.swrite(lcShareCachedItem)
+      w.swrite(loader.clientPid)
+      w.swrite(targetPid)
+      w.swrite(id)
     stream.close()
 
 proc passFd*(loader: FileLoader; id: string; fd: FileHandle) =
   let stream = loader.connect(buffered = false)
   if stream != nil:
-    stream.swrite(lcPassFd)
-    stream.swrite(id)
+    stream.withWriter w:
+      w.swrite(lcPassFd)
+      w.swrite(id)
     stream.sendFileHandle(fd)
     stream.close()
 
 proc removeCachedItem*(loader: FileLoader; cacheId: int) =
   let stream = loader.connect()
   if stream != nil:
-    stream.swrite(lcRemoveCachedItem)
-    stream.swrite(cacheId)
+    stream.withWriter w:
+      w.swrite(lcRemoveCachedItem)
+      w.swrite(cacheId)
     stream.close()
 
 proc addClient*(loader: FileLoader; key: ClientKey; pid: int;
     config: LoaderClientConfig): bool =
   let stream = loader.connect()
-  stream.swrite(lcAddClient)
-  stream.swrite(key)
-  stream.swrite(pid)
-  stream.swrite(config)
-  stream.flush()
+  stream.withWriter w:
+    w.swrite(lcAddClient)
+    w.swrite(key)
+    w.swrite(pid)
+    w.swrite(config)
   stream.sread(result)
   stream.close()
 
 proc removeClient*(loader: FileLoader; pid: int) =
   let stream = loader.connect()
   if stream != nil:
-    stream.swrite(lcRemoveClient)
-    stream.swrite(pid)
+    stream.withWriter w:
+      w.swrite(lcRemoveClient)
+      w.swrite(pid)
     stream.close()
diff --git a/src/loader/loaderhandle.nim b/src/loader/loaderhandle.nim
index dbe998c6..6e1b53a9 100644
--- a/src/loader/loaderhandle.nim
+++ b/src/loader/loaderhandle.nim
@@ -3,8 +3,8 @@ import std/net
 import std/streams
 import std/tables
 
+import io/bufwriter
 import io/posixstream
-import io/serialize
 import loader/headers
 
 when defined(debug):
@@ -129,12 +129,13 @@ proc sendResult*(handle: LoaderHandle; res: int; msg = "") =
   let output = handle.output
   let blocking = output.ostream.blocking
   output.ostream.setBlocking(true)
-  output.ostream.swrite(res)
-  if res == 0: # success
-    assert msg == ""
-    output.ostream.swrite(output.outputId)
-  else: # error
-    output.ostream.swrite(msg)
+  output.ostream.withWriter w:
+    w.swrite(res)
+    if res == 0: # success
+      assert msg == ""
+      w.swrite(output.outputId)
+    else: # error
+      w.swrite(msg)
   output.ostream.setBlocking(blocking)
 
 proc sendStatus*(handle: LoaderHandle; status: uint16) =
@@ -142,7 +143,8 @@ proc sendStatus*(handle: LoaderHandle; status: uint16) =
   inc handle.rstate
   let blocking = handle.output.ostream.blocking
   handle.output.ostream.setBlocking(true)
-  handle.output.ostream.swrite(status)
+  handle.output.ostream.withWriter w:
+    w.swrite(status)
   handle.output.ostream.setBlocking(blocking)
 
 proc sendHeaders*(handle: LoaderHandle; headers: Headers) =
@@ -150,7 +152,8 @@ proc sendHeaders*(handle: LoaderHandle; headers: Headers) =
   inc handle.rstate
   let blocking = handle.output.ostream.blocking
   handle.output.ostream.setBlocking(true)
-  handle.output.ostream.swrite(headers)
+  handle.output.ostream.withWriter w:
+    w.swrite(headers)
   handle.output.ostream.setBlocking(blocking)
 
 proc recvData*(ps: PosixStream; buffer: LoaderBuffer): int {.inline.} =