diff options
author | bptato <nincsnevem662@gmail.com> | 2024-03-16 23:08:57 +0100 |
---|---|---|
committer | bptato <nincsnevem662@gmail.com> | 2024-03-16 23:08:57 +0100 |
commit | 7fd73dff220f7dd5075884059f1c4edc88036813 (patch) | |
tree | ed3c758152ea78011331b49b1191e499b6ae3372 /src/loader | |
parent | 1e81fdf28bcd25c5fb1c2638b74ddb9d51bd5b72 (diff) | |
download | chawan-7fd73dff220f7dd5075884059f1c4edc88036813.tar.gz |
io: add BuferedWriter
Unsurprisingly enough, calling `write` a million times is never going to be very fast. BufferedWriter basically does the same thing as serialize.swrite did, but queues up writes in batches before sending them. TODO: give sread a similar treatment
Diffstat (limited to 'src/loader')
-rw-r--r-- | src/loader/loader.nim | 113 | ||||
-rw-r--r-- | src/loader/loaderhandle.nim | 21 |
2 files changed, 76 insertions, 58 deletions
diff --git a/src/loader/loader.nim b/src/loader/loader.nim index fdc87eb2..3b454b4d 100644 --- a/src/loader/loader.nim +++ b/src/loader/loader.nim @@ -25,6 +25,7 @@ import std/strutils import std/tables import config/chapath +import io/bufwriter import io/posixstream import io/promise import io/serialize @@ -466,11 +467,12 @@ proc addClient(ctx: LoaderContext; stream: SocketStream) = stream.sread(key) stream.sread(pid) stream.sread(config) - if pid in ctx.clientData or key == default(ClientKey): - stream.swrite(false) - else: - ctx.clientData[pid] = ClientData(pid: pid, key: key, config: config) - stream.swrite(true) + stream.withWriter w: + if pid in ctx.clientData or key == default(ClientKey): + w.swrite(false) + else: + ctx.clientData[pid] = ClientData(pid: pid, key: key, config: config) + w.swrite(true) stream.close() proc cleanup(client: ClientData) = @@ -497,8 +499,9 @@ proc addCacheFile(ctx: LoaderContext; stream: SocketStream) = assert output != nil let targetClient = ctx.clientData[targetPid] let (id, file) = ctx.addCacheFile(targetClient, output) - stream.swrite(id) - stream.swrite(file) + stream.withWriter w: + w.swrite(id) + w.swrite(file) stream.close() proc redirectToFile(ctx: LoaderContext; stream: SocketStream) = @@ -510,7 +513,8 @@ proc redirectToFile(ctx: LoaderContext; stream: SocketStream) = var success = false if output != nil: success = ctx.redirectToFile(output, targetPath) - stream.swrite(success) + stream.withWriter w: + w.swrite(success) stream.close() proc shareCachedItem(ctx: LoaderContext; stream: SocketStream) = @@ -561,10 +565,12 @@ proc tee(ctx: LoaderContext; stream: SocketStream; client: ClientData) = if output != nil: let id = ctx.getOutputId() output.tee(stream, id, targetPid) - stream.swrite(id) + stream.withWriter w: + w.swrite(id) stream.setBlocking(false) else: - stream.swrite(-1) + stream.withWriter w: + w.swrite(-1) stream.close() proc suspend(ctx: LoaderContext; stream: SocketStream; client: ClientData) = @@ -692,7 +698,8 @@ proc initLoaderContext(fd: cint; config: LoaderConfig): LoaderContext = stream.sread(key) stream.sread(pid) stream.sread(config) - stream.swrite(true) + stream.withWriter w: + w.swrite(true) ctx.pagerClient = ClientData(key: key, pid: pid, config: config) ctx.clientData[pid] = ctx.pagerClient stream.close() @@ -815,8 +822,9 @@ proc getRedirect*(response: Response; request: Request): Request = proc connect(loader: FileLoader; buffered = true): SocketStream = let stream = connectSocketStream(loader.process, buffered, blocking = true) if stream != nil: - stream.swrite(loader.clientPid) - stream.swrite(loader.key) + stream.withWriter w: + w.swrite(loader.clientPid) + w.swrite(loader.key) return stream return nil @@ -824,9 +832,9 @@ proc connect(loader: FileLoader; buffered = true): SocketStream = # anyway). proc startRequest*(loader: FileLoader; request: Request): SocketStream = let stream = loader.connect(buffered = false) - stream.swrite(lcLoad) - stream.swrite(request) - stream.flush() + stream.withWriter w: + w.swrite(lcLoad) + w.swrite(request) return stream #TODO: add init @@ -845,9 +853,9 @@ proc fetch*(loader: FileLoader; input: Request): FetchPromise = proc reconnect*(loader: FileLoader; data: ConnectData) = data.stream.close() let stream = loader.connect(buffered = false) - stream.swrite(lcLoad) - stream.swrite(data.request) - stream.flush() + stream.withWriter w: + w.swrite(lcLoad) + w.swrite(data.request) let fd = int(stream.fd) loader.registerFun(fd) loader.connecting[fd] = ConnectData( @@ -873,21 +881,24 @@ proc switchStream*(loader: FileLoader; data: var OngoingData; proc suspend*(loader: FileLoader; fds: seq[int]) = let stream = loader.connect() - stream.swrite(lcSuspend) - stream.swrite(fds) + stream.withWriter w: + w.swrite(lcSuspend) + w.swrite(fds) stream.close() proc resume*(loader: FileLoader; fds: seq[int]) = let stream = loader.connect() - stream.swrite(lcResume) - stream.swrite(fds) + stream.withWriter w: + w.swrite(lcResume) + w.swrite(fds) stream.close() proc tee*(loader: FileLoader; sourceId, targetPid: int): (SocketStream, int) = let stream = loader.connect(buffered = false) - stream.swrite(lcTee) - stream.swrite(sourceId) - stream.swrite(targetPid) + stream.withWriter w: + w.swrite(lcTee) + w.swrite(sourceId) + w.swrite(targetPid) var outputId: int stream.sread(outputId) return (stream, outputId) @@ -897,10 +908,10 @@ proc addCacheFile*(loader: FileLoader; outputId, targetPid: int): let stream = loader.connect() if stream == nil: return (-1, "") - stream.swrite(lcAddCacheFile) - stream.swrite(outputId) - stream.swrite(targetPid) - stream.flush() + stream.withWriter w: + w.swrite(lcAddCacheFile) + w.swrite(outputId) + w.swrite(targetPid) var outputId: int var cacheFile: string stream.sread(outputId) @@ -912,10 +923,10 @@ proc redirectToFile*(loader: FileLoader; outputId: int; targetPath: string): let stream = loader.connect() if stream == nil: return false - stream.swrite(lcRedirectToFile) - stream.swrite(outputId) - stream.swrite(targetPath) - stream.flush() + stream.withWriter w: + w.swrite(lcRedirectToFile) + w.swrite(outputId) + w.swrite(targetPath) stream.sread(result) const BufferSize = 4096 @@ -1014,41 +1025,45 @@ proc doRequest*(loader: FileLoader; request: Request): Response = proc shareCachedItem*(loader: FileLoader; id, targetPid: int) = let stream = loader.connect() if stream != nil: - stream.swrite(lcShareCachedItem) - stream.swrite(loader.clientPid) - stream.swrite(targetPid) - stream.swrite(id) + stream.withWriter w: + w.swrite(lcShareCachedItem) + w.swrite(loader.clientPid) + w.swrite(targetPid) + w.swrite(id) stream.close() proc passFd*(loader: FileLoader; id: string; fd: FileHandle) = let stream = loader.connect(buffered = false) if stream != nil: - stream.swrite(lcPassFd) - stream.swrite(id) + stream.withWriter w: + w.swrite(lcPassFd) + w.swrite(id) stream.sendFileHandle(fd) stream.close() proc removeCachedItem*(loader: FileLoader; cacheId: int) = let stream = loader.connect() if stream != nil: - stream.swrite(lcRemoveCachedItem) - stream.swrite(cacheId) + stream.withWriter w: + w.swrite(lcRemoveCachedItem) + w.swrite(cacheId) stream.close() proc addClient*(loader: FileLoader; key: ClientKey; pid: int; config: LoaderClientConfig): bool = let stream = loader.connect() - stream.swrite(lcAddClient) - stream.swrite(key) - stream.swrite(pid) - stream.swrite(config) - stream.flush() + stream.withWriter w: + w.swrite(lcAddClient) + w.swrite(key) + w.swrite(pid) + w.swrite(config) stream.sread(result) stream.close() proc removeClient*(loader: FileLoader; pid: int) = let stream = loader.connect() if stream != nil: - stream.swrite(lcRemoveClient) - stream.swrite(pid) + stream.withWriter w: + w.swrite(lcRemoveClient) + w.swrite(pid) stream.close() diff --git a/src/loader/loaderhandle.nim b/src/loader/loaderhandle.nim index dbe998c6..6e1b53a9 100644 --- a/src/loader/loaderhandle.nim +++ b/src/loader/loaderhandle.nim @@ -3,8 +3,8 @@ import std/net import std/streams import std/tables +import io/bufwriter import io/posixstream -import io/serialize import loader/headers when defined(debug): @@ -129,12 +129,13 @@ proc sendResult*(handle: LoaderHandle; res: int; msg = "") = let output = handle.output let blocking = output.ostream.blocking output.ostream.setBlocking(true) - output.ostream.swrite(res) - if res == 0: # success - assert msg == "" - output.ostream.swrite(output.outputId) - else: # error - output.ostream.swrite(msg) + output.ostream.withWriter w: + w.swrite(res) + if res == 0: # success + assert msg == "" + w.swrite(output.outputId) + else: # error + w.swrite(msg) output.ostream.setBlocking(blocking) proc sendStatus*(handle: LoaderHandle; status: uint16) = @@ -142,7 +143,8 @@ proc sendStatus*(handle: LoaderHandle; status: uint16) = inc handle.rstate let blocking = handle.output.ostream.blocking handle.output.ostream.setBlocking(true) - handle.output.ostream.swrite(status) + handle.output.ostream.withWriter w: + w.swrite(status) handle.output.ostream.setBlocking(blocking) proc sendHeaders*(handle: LoaderHandle; headers: Headers) = @@ -150,7 +152,8 @@ proc sendHeaders*(handle: LoaderHandle; headers: Headers) = inc handle.rstate let blocking = handle.output.ostream.blocking handle.output.ostream.setBlocking(true) - handle.output.ostream.swrite(headers) + handle.output.ostream.withWriter w: + w.swrite(headers) handle.output.ostream.setBlocking(blocking) proc recvData*(ps: PosixStream; buffer: LoaderBuffer): int {.inline.} = |