diff options
author | bptato <nincsnevem662@gmail.com> | 2024-03-16 23:08:57 +0100 |
---|---|---|
committer | bptato <nincsnevem662@gmail.com> | 2024-03-16 23:08:57 +0100 |
commit | 7fd73dff220f7dd5075884059f1c4edc88036813 (patch) | |
tree | ed3c758152ea78011331b49b1191e499b6ae3372 /src | |
parent | 1e81fdf28bcd25c5fb1c2638b74ddb9d51bd5b72 (diff) | |
download | chawan-7fd73dff220f7dd5075884059f1c4edc88036813.tar.gz |
io: add BuferedWriter
Unsurprisingly enough, calling `write` a million times is never going to be very fast. BufferedWriter basically does the same thing as serialize.swrite did, but queues up writes in batches before sending them. TODO: give sread a similar treatment
Diffstat (limited to 'src')
-rw-r--r-- | src/io/bufwriter.nim | 175 | ||||
-rw-r--r-- | src/io/dynstream.nim | 3 | ||||
-rw-r--r-- | src/io/serialize.nim | 106 | ||||
-rw-r--r-- | src/loader/loader.nim | 113 | ||||
-rw-r--r-- | src/loader/loaderhandle.nim | 21 | ||||
-rw-r--r-- | src/local/client.nim | 5 | ||||
-rw-r--r-- | src/server/buffer.nim | 40 | ||||
-rw-r--r-- | src/server/forkserver.nim | 54 |
8 files changed, 310 insertions, 207 deletions
diff --git a/src/io/bufwriter.nim b/src/io/bufwriter.nim new file mode 100644 index 00000000..99c7ed94 --- /dev/null +++ b/src/io/bufwriter.nim @@ -0,0 +1,175 @@ +# Write data to streams. + +import std/options +import std/sets +import std/tables + +import io/dynstream + +import types/blob +import types/formdata +import types/url +import types/opt + +type BufferedWriter* = object + stream: DynStream + buffer: ptr UncheckedArray[uint8] + bufSize: int + bufLen: int + +{.warning[Deprecated]: off.}: + proc `=destroy`(writer: var BufferedWriter) = + if writer.buffer != nil: + dealloc(writer.buffer) + writer.buffer = nil + +proc initWriter*(stream: DynStream; sizeInit = 64): BufferedWriter = + return BufferedWriter( + stream: stream, + buffer: cast[ptr UncheckedArray[uint8]](alloc(sizeInit)), + bufSize: sizeInit, + bufLen: 0 + ) + +proc flush*(writer: var BufferedWriter) = + let stream = writer.stream + var n = 0 + while true: + n += stream.sendData(addr writer.buffer[n], writer.bufLen - n) + if n == writer.bufLen: + break + writer.bufLen = 0 + stream.sflush() + +proc deinit*(writer: var BufferedWriter) = + dealloc(writer.buffer) + writer.buffer = nil + writer.bufSize = 0 + writer.bufLen = 0 + +template withWriter*(stream: DynStream; w, body: untyped) = + var w {.inject.} = stream.initWriter() + body + w.flush() + w.deinit() + +proc swrite*(writer: var BufferedWriter; n: SomeNumber) +proc swrite*[T](writer: var BufferedWriter; s: set[T]) +proc swrite*[T: enum](writer: var BufferedWriter; x: T) +proc swrite*(writer: var BufferedWriter; s: string) +proc swrite*(writer: var BufferedWriter; b: bool) +proc swrite*(writer: var BufferedWriter; url: URL) +proc swrite*(writer: var BufferedWriter; tup: tuple) +proc swrite*[I, T](writer: var BufferedWriter; a: array[I, T]) +proc swrite*(writer: var BufferedWriter; s: seq) +proc swrite*[U, V](writer: var BufferedWriter; t: Table[U, V]) +proc swrite*(writer: var BufferedWriter; obj: object) +proc swrite*(writer: var BufferedWriter; obj: ref object) +proc swrite*(writer: var BufferedWriter; part: FormDataEntry) +proc swrite*(writer: var BufferedWriter; blob: Blob) +proc swrite*[T](writer: var BufferedWriter; o: Option[T]) +proc swrite*[T, E](writer: var BufferedWriter; o: Result[T, E]) + +proc writeData(writer: var BufferedWriter; buffer: pointer; len: int) = + let targetLen = writer.bufLen + len + let missing = targetLen - writer.bufSize + if missing > 0: + let target = writer.bufSize + missing + writer.bufSize *= 2 + if writer.bufSize < target: + writer.bufSize = target + let p = realloc(writer.buffer, writer.bufSize) + writer.buffer = cast[ptr UncheckedArray[uint8]](p) + copyMem(addr writer.buffer[writer.bufLen], buffer, len) + writer.bufLen = targetLen + +proc swrite*(writer: var BufferedWriter; n: SomeNumber) = + writer.writeData(unsafeAddr n, sizeof(n)) + +proc swrite*[T: enum](writer: var BufferedWriter; x: T) = + static: + doAssert sizeof(int) >= sizeof(T) + writer.swrite(int(x)) + +proc swrite*[T](writer: var BufferedWriter; s: set[T]) = + writer.swrite(s.card) + for e in s: + writer.swrite(e) + +proc swrite*(writer: var BufferedWriter; s: string) = + writer.swrite(s.len) + if s.len > 0: + writer.writeData(unsafeAddr s[0], s.len) + +proc swrite*(writer: var BufferedWriter; b: bool) = + if b: + writer.swrite(1u8) + else: + writer.swrite(0u8) + +proc swrite*(writer: var BufferedWriter; url: URL) = + if url != nil: + writer.swrite(url.serialize()) + else: + writer.swrite("") + +proc swrite*(writer: var BufferedWriter; tup: tuple) = + for f in tup.fields: + writer.swrite(f) + +proc swrite*[I, T](writer: var BufferedWriter; a: array[I, T]) = + for x in a: + writer.swrite(x) + +proc swrite*(writer: var BufferedWriter; s: seq) = + writer.swrite(s.len) + for x in s: + writer.swrite(x) + +proc swrite*[U, V](writer: var BufferedWriter; t: Table[U, V]) = + writer.swrite(t.len) + for k, v in t: + writer.swrite(k) + writer.swrite(v) + +proc swrite*(writer: var BufferedWriter; obj: object) = + for f in obj.fields: + writer.swrite(f) + +proc swrite*(writer: var BufferedWriter; obj: ref object) = + writer.swrite(obj != nil) + if obj != nil: + writer.swrite(obj[]) + +proc swrite*(writer: var BufferedWriter; part: FormDataEntry) = + writer.swrite(part.isstr) + writer.swrite(part.name) + writer.swrite(part.filename) + if part.isstr: + writer.swrite(part.svalue) + else: + writer.swrite(part.value) + +#TODO clean up this mess +proc swrite*(writer: var BufferedWriter; blob: Blob) = + writer.swrite(blob.isfile) + if blob.isfile: + writer.swrite(WebFile(blob).path) + else: + writer.swrite(blob.ctype) + writer.swrite(blob.size) + writer.writeData(blob.buffer, int(blob.size)) + +proc swrite*[T](writer: var BufferedWriter; o: Option[T]) = + writer.swrite(o.isSome) + if o.isSome: + writer.swrite(o.get) + +proc swrite*[T, E](writer: var BufferedWriter; o: Result[T, E]) = + writer.swrite(o.isOk) + if o.isOk: + when not (T is void): + writer.swrite(o.get) + else: + when not (E is void): + writer.swrite(o.error) diff --git a/src/io/dynstream.nim b/src/io/dynstream.nim index d4c7760f..ec38c595 100644 --- a/src/io/dynstream.nim +++ b/src/io/dynstream.nim @@ -24,6 +24,9 @@ method seek*(s: DynStream; off: int) {.base.} = method sclose*(s: DynStream) {.base.} = assert false +method sflush*(s: DynStream) {.base.} = + discard + proc recvData*(s: DynStream; buffer: var openArray[uint8]): int {.inline.} = return s.recvData(addr buffer[0], buffer.len) diff --git a/src/io/serialize.nim b/src/io/serialize.nim index 4dcb79f0..3ffa40ac 100644 --- a/src/io/serialize.nim +++ b/src/io/serialize.nim @@ -10,73 +10,54 @@ import types/formdata import types/url import types/opt -proc swrite*(stream: Stream, n: SomeNumber) proc sread*(stream: Stream, n: var SomeNumber) func slen*(n: SomeNumber): int -proc swrite*[T](stream: Stream, s: set[T]) proc sread*[T](stream: Stream, s: var set[T]) func slen*[T](s: set[T]): int -proc swrite*[T: enum](stream: Stream, x: T) proc sread*[T: enum](stream: Stream, x: var T) func slen*[T: enum](x: T): int -proc swrite*(stream: Stream, s: string) proc sread*(stream: Stream, s: var string) func slen*(s: string): int -proc swrite*(stream: Stream, b: bool) proc sread*(stream: Stream, b: var bool) func slen*(b: bool): int -proc swrite*(stream: Stream, url: URL) proc sread*(stream: Stream, url: var URL) func slen*(url: URL): int -proc swrite*(stream: Stream, tup: tuple) proc sread*(stream: Stream, tup: var tuple) func slen*(tup: tuple): int -proc swrite*[I, T](stream: Stream, a: array[I, T]) proc sread*[I, T](stream: Stream, a: var array[I, T]) func slen*[I, T](a: array[I, T]): int -proc swrite*(stream: Stream, s: seq) proc sread*(stream: Stream, s: var seq) func slen*(s: seq): int -proc swrite*[U, V](stream: Stream, t: Table[U, V]) proc sread*[U, V](stream: Stream, t: var Table[U, V]) func slen*[U, V](t: Table[U, V]): int -proc swrite*(stream: Stream, obj: object) proc sread*(stream: Stream, obj: var object) func slen*(obj: object): int -proc swrite*(stream: Stream, obj: ref object) proc sread*(stream: Stream, obj: var ref object) func slen*(obj: ref object): int -proc swrite*(stream: Stream, part: FormDataEntry) proc sread*(stream: Stream, part: var FormDataEntry) func slen*(part: FormDataEntry): int -proc swrite*(stream: Stream, blob: Blob) proc sread*(stream: Stream, blob: var Blob) func slen*(blob: Blob): int -proc swrite*[T](stream: Stream, o: Option[T]) proc sread*[T](stream: Stream, o: var Option[T]) func slen*[T](o: Option[T]): int -proc swrite*[T, E](stream: Stream, o: Result[T, E]) proc sread*[T, E](stream: Stream, o: var Result[T, E]) func slen*[T, E](o: Result[T, E]): int -proc swrite*(stream: Stream, n: SomeNumber) = - stream.write(n) - proc sread*(stream: Stream, n: var SomeNumber) = if stream.readData(addr n, sizeof(n)) < sizeof(n): raise newException(EOFError, "eof") @@ -84,11 +65,6 @@ proc sread*(stream: Stream, n: var SomeNumber) = func slen*(n: SomeNumber): int = return sizeof(n) -proc swrite*[T: enum](stream: Stream, x: T) = - static: - doAssert sizeof(int) >= sizeof(T) - stream.swrite(int(x)) - proc sread*[T: enum](stream: Stream, x: var T) = var i: int stream.sread(i) @@ -97,11 +73,6 @@ proc sread*[T: enum](stream: Stream, x: var T) = func slen*[T: enum](x: T): int = return sizeof(int) -proc swrite*[T](stream: Stream, s: set[T]) = - stream.swrite(s.card) - for e in s: - stream.swrite(e) - proc sread*[T](stream: Stream, s: var set[T]) = var len: int stream.sread(len) @@ -115,10 +86,6 @@ func slen*[T](s: set[T]): int = for x in s: result += slen(x) -proc swrite*(stream: Stream, s: string) = - stream.swrite(s.len) - stream.write(s) - proc sread*(stream: Stream, s: var string) = var len: int stream.sread(len) @@ -133,12 +100,6 @@ proc sread*(stream: Stream, s: var string) = func slen*(s: string): int = slen(s.len) + s.len -proc swrite*(stream: Stream, b: bool) = - if b: - stream.swrite(1u8) - else: - stream.swrite(0u8) - proc sread*(stream: Stream, b: var bool) = var n: uint8 stream.sread(n) @@ -151,12 +112,6 @@ proc sread*(stream: Stream, b: var bool) = func slen*(b: bool): int = return sizeof(uint8) -proc swrite*(stream: Stream, url: URL) = - if url != nil: - stream.swrite(url.serialize()) - else: - stream.swrite("") - proc sread*(stream: Stream, url: var URL) = var s: string stream.sread(s) @@ -174,10 +129,6 @@ func slen*(url: URL): int = return slen("") return slen(url.serialize()) -proc swrite*(stream: Stream, tup: tuple) = - for f in tup.fields: - stream.swrite(f) - proc sread*(stream: Stream, tup: var tuple) = for f in tup.fields: stream.sread(f) @@ -186,10 +137,6 @@ func slen*(tup: tuple): int = for f in tup.fields: result += slen(f) -proc swrite*[I, T](stream: Stream; a: array[I, T]) = - for x in a: - stream.swrite(x) - proc sread*[I, T](stream: Stream; a: var array[I, T]) = for x in a.mitems: stream.sread(x) @@ -198,11 +145,6 @@ func slen*[I, T](a: array[I, T]): int = for x in a: result += slen(x) -proc swrite*(stream: Stream, s: seq) = - stream.swrite(s.len) - for x in s: - stream.swrite(x) - proc sread*(stream: Stream, s: var seq) = var len: int stream.sread(len) @@ -215,12 +157,6 @@ func slen*(s: seq): int = for x in s: result += slen(x) -proc swrite*[U, V](stream: Stream, t: Table[U, V]) = - stream.swrite(t.len) - for k, v in t: - stream.swrite(k) - stream.swrite(v) - proc sread*[U, V](stream: Stream, t: var Table[U, V]) = var len: int stream.sread(len) @@ -237,10 +173,6 @@ func slen*[U, V](t: Table[U, V]): int = result += slen(k) result += slen(v) -proc swrite*(stream: Stream, obj: object) = - for f in obj.fields: - stream.swrite(f) - proc sread*(stream: Stream, obj: var object) = for f in obj.fields: stream.sread(f) @@ -249,11 +181,6 @@ func slen*(obj: object): int = for f in obj.fields: result += slen(f) -proc swrite*(stream: Stream, obj: ref object) = - stream.swrite(obj != nil) - if obj != nil: - stream.swrite(obj[]) - proc sread*(stream: Stream, obj: var ref object) = var n: bool stream.sread(n) @@ -266,15 +193,6 @@ func slen*(obj: ref object): int = if obj != nil: result += slen(obj[]) -proc swrite*(stream: Stream, part: FormDataEntry) = - stream.swrite(part.isstr) - stream.swrite(part.name) - stream.swrite(part.filename) - if part.isstr: - stream.swrite(part.svalue) - else: - stream.swrite(part.value) - proc sread*(stream: Stream, part: var FormDataEntry) = var isstr: bool stream.sread(isstr) @@ -298,16 +216,6 @@ func slen*(part: FormDataEntry): int = else: result += slen(part.value) -#TODO clean up this mess -proc swrite*(stream: Stream, blob: Blob) = - stream.swrite(blob.isfile) - if blob.isfile: - stream.swrite(WebFile(blob).path) - else: - stream.swrite(blob.ctype) - stream.swrite(blob.size) - stream.writeData(blob.buffer, int(blob.size)) - proc sread*(stream: Stream, blob: var Blob) = var isfile: bool stream.sread(isfile) @@ -336,11 +244,6 @@ func slen*(blob: Blob): int = result += slen(blob.size) result += int(blob.size) #TODO ?? -proc swrite*[T](stream: Stream, o: Option[T]) = - stream.swrite(o.isSome) - if o.isSome: - stream.swrite(o.get) - proc sread*[T](stream: Stream, o: var Option[T]) = var x: bool stream.sread(x) @@ -356,15 +259,6 @@ func slen*[T](o: Option[T]): int = if o.isSome: result += slen(o.get) -proc swrite*[T, E](stream: Stream, o: Result[T, E]) = - stream.swrite(o.isOk) - if o.isOk: - when not (T is void): - stream.swrite(o.get) - else: - when not (E is void): - stream.swrite(o.error) - proc sread*[T, E](stream: Stream, o: var Result[T, E]) = var x: bool stream.sread(x) diff --git a/src/loader/loader.nim b/src/loader/loader.nim index fdc87eb2..3b454b4d 100644 --- a/src/loader/loader.nim +++ b/src/loader/loader.nim @@ -25,6 +25,7 @@ import std/strutils import std/tables import config/chapath +import io/bufwriter import io/posixstream import io/promise import io/serialize @@ -466,11 +467,12 @@ proc addClient(ctx: LoaderContext; stream: SocketStream) = stream.sread(key) stream.sread(pid) stream.sread(config) - if pid in ctx.clientData or key == default(ClientKey): - stream.swrite(false) - else: - ctx.clientData[pid] = ClientData(pid: pid, key: key, config: config) - stream.swrite(true) + stream.withWriter w: + if pid in ctx.clientData or key == default(ClientKey): + w.swrite(false) + else: + ctx.clientData[pid] = ClientData(pid: pid, key: key, config: config) + w.swrite(true) stream.close() proc cleanup(client: ClientData) = @@ -497,8 +499,9 @@ proc addCacheFile(ctx: LoaderContext; stream: SocketStream) = assert output != nil let targetClient = ctx.clientData[targetPid] let (id, file) = ctx.addCacheFile(targetClient, output) - stream.swrite(id) - stream.swrite(file) + stream.withWriter w: + w.swrite(id) + w.swrite(file) stream.close() proc redirectToFile(ctx: LoaderContext; stream: SocketStream) = @@ -510,7 +513,8 @@ proc redirectToFile(ctx: LoaderContext; stream: SocketStream) = var success = false if output != nil: success = ctx.redirectToFile(output, targetPath) - stream.swrite(success) + stream.withWriter w: + w.swrite(success) stream.close() proc shareCachedItem(ctx: LoaderContext; stream: SocketStream) = @@ -561,10 +565,12 @@ proc tee(ctx: LoaderContext; stream: SocketStream; client: ClientData) = if output != nil: let id = ctx.getOutputId() output.tee(stream, id, targetPid) - stream.swrite(id) + stream.withWriter w: + w.swrite(id) stream.setBlocking(false) else: - stream.swrite(-1) + stream.withWriter w: + w.swrite(-1) stream.close() proc suspend(ctx: LoaderContext; stream: SocketStream; client: ClientData) = @@ -692,7 +698,8 @@ proc initLoaderContext(fd: cint; config: LoaderConfig): LoaderContext = stream.sread(key) stream.sread(pid) stream.sread(config) - stream.swrite(true) + stream.withWriter w: + w.swrite(true) ctx.pagerClient = ClientData(key: key, pid: pid, config: config) ctx.clientData[pid] = ctx.pagerClient stream.close() @@ -815,8 +822,9 @@ proc getRedirect*(response: Response; request: Request): Request = proc connect(loader: FileLoader; buffered = true): SocketStream = let stream = connectSocketStream(loader.process, buffered, blocking = true) if stream != nil: - stream.swrite(loader.clientPid) - stream.swrite(loader.key) + stream.withWriter w: + w.swrite(loader.clientPid) + w.swrite(loader.key) return stream return nil @@ -824,9 +832,9 @@ proc connect(loader: FileLoader; buffered = true): SocketStream = # anyway). proc startRequest*(loader: FileLoader; request: Request): SocketStream = let stream = loader.connect(buffered = false) - stream.swrite(lcLoad) - stream.swrite(request) - stream.flush() + stream.withWriter w: + w.swrite(lcLoad) + w.swrite(request) return stream #TODO: add init @@ -845,9 +853,9 @@ proc fetch*(loader: FileLoader; input: Request): FetchPromise = proc reconnect*(loader: FileLoader; data: ConnectData) = data.stream.close() let stream = loader.connect(buffered = false) - stream.swrite(lcLoad) - stream.swrite(data.request) - stream.flush() + stream.withWriter w: + w.swrite(lcLoad) + w.swrite(data.request) let fd = int(stream.fd) loader.registerFun(fd) loader.connecting[fd] = ConnectData( @@ -873,21 +881,24 @@ proc switchStream*(loader: FileLoader; data: var OngoingData; proc suspend*(loader: FileLoader; fds: seq[int]) = let stream = loader.connect() - stream.swrite(lcSuspend) - stream.swrite(fds) + stream.withWriter w: + w.swrite(lcSuspend) + w.swrite(fds) stream.close() proc resume*(loader: FileLoader; fds: seq[int]) = let stream = loader.connect() - stream.swrite(lcResume) - stream.swrite(fds) + stream.withWriter w: + w.swrite(lcResume) + w.swrite(fds) stream.close() proc tee*(loader: FileLoader; sourceId, targetPid: int): (SocketStream, int) = let stream = loader.connect(buffered = false) - stream.swrite(lcTee) - stream.swrite(sourceId) - stream.swrite(targetPid) + stream.withWriter w: + w.swrite(lcTee) + w.swrite(sourceId) + w.swrite(targetPid) var outputId: int stream.sread(outputId) return (stream, outputId) @@ -897,10 +908,10 @@ proc addCacheFile*(loader: FileLoader; outputId, targetPid: int): let stream = loader.connect() if stream == nil: return (-1, "") - stream.swrite(lcAddCacheFile) - stream.swrite(outputId) - stream.swrite(targetPid) - stream.flush() + stream.withWriter w: + w.swrite(lcAddCacheFile) + w.swrite(outputId) + w.swrite(targetPid) var outputId: int var cacheFile: string stream.sread(outputId) @@ -912,10 +923,10 @@ proc redirectToFile*(loader: FileLoader; outputId: int; targetPath: string): let stream = loader.connect() if stream == nil: return false - stream.swrite(lcRedirectToFile) - stream.swrite(outputId) - stream.swrite(targetPath) - stream.flush() + stream.withWriter w: + w.swrite(lcRedirectToFile) + w.swrite(outputId) + w.swrite(targetPath) stream.sread(result) const BufferSize = 4096 @@ -1014,41 +1025,45 @@ proc doRequest*(loader: FileLoader; request: Request): Response = proc shareCachedItem*(loader: FileLoader; id, targetPid: int) = let stream = loader.connect() if stream != nil: - stream.swrite(lcShareCachedItem) - stream.swrite(loader.clientPid) - stream.swrite(targetPid) - stream.swrite(id) + stream.withWriter w: + w.swrite(lcShareCachedItem) + w.swrite(loader.clientPid) + w.swrite(targetPid) + w.swrite(id) stream.close() proc passFd*(loader: FileLoader; id: string; fd: FileHandle) = let stream = loader.connect(buffered = false) if stream != nil: - stream.swrite(lcPassFd) - stream.swrite(id) + stream.withWriter w: + w.swrite(lcPassFd) + w.swrite(id) stream.sendFileHandle(fd) stream.close() proc removeCachedItem*(loader: FileLoader; cacheId: int) = let stream = loader.connect() if stream != nil: - stream.swrite(lcRemoveCachedItem) - stream.swrite(cacheId) + stream.withWriter w: + w.swrite(lcRemoveCachedItem) + w.swrite(cacheId) stream.close() proc addClient*(loader: FileLoader; key: ClientKey; pid: int; config: LoaderClientConfig): bool = let stream = loader.connect() - stream.swrite(lcAddClient) - stream.swrite(key) - stream.swrite(pid) - stream.swrite(config) - stream.flush() + stream.withWriter w: + w.swrite(lcAddClient) + w.swrite(key) + w.swrite(pid) + w.swrite(config) stream.sread(result) stream.close() proc removeClient*(loader: FileLoader; pid: int) = let stream = loader.connect() if stream != nil: - stream.swrite(lcRemoveClient) - stream.swrite(pid) + stream.withWriter w: + w.swrite(lcRemoveClient) + w.swrite(pid) stream.close() diff --git a/src/loader/loaderhandle.nim b/src/loader/loaderhandle.nim index dbe998c6..6e1b53a9 100644 --- a/src/loader/loaderhandle.nim +++ b/src/loader/loaderhandle.nim @@ -3,8 +3,8 @@ import std/net import std/streams import std/tables +import io/bufwriter import io/posixstream -import io/serialize import loader/headers when defined(debug): @@ -129,12 +129,13 @@ proc sendResult*(handle: LoaderHandle; res: int; msg = "") = let output = handle.output let blocking = output.ostream.blocking output.ostream.setBlocking(true) - output.ostream.swrite(res) - if res == 0: # success - assert msg == "" - output.ostream.swrite(output.outputId) - else: # error - output.ostream.swrite(msg) + output.ostream.withWriter w: + w.swrite(res) + if res == 0: # success + assert msg == "" + w.swrite(output.outputId) + else: # error + w.swrite(msg) output.ostream.setBlocking(blocking) proc sendStatus*(handle: LoaderHandle; status: uint16) = @@ -142,7 +143,8 @@ proc sendStatus*(handle: LoaderHandle; status: uint16) = inc handle.rstate let blocking = handle.output.ostream.blocking handle.output.ostream.setBlocking(true) - handle.output.ostream.swrite(status) + handle.output.ostream.withWriter w: + w.swrite(status) handle.output.ostream.setBlocking(blocking) proc sendHeaders*(handle: LoaderHandle; headers: Headers) = @@ -150,7 +152,8 @@ proc sendHeaders*(handle: LoaderHandle; headers: Headers) = inc handle.rstate let blocking = handle.output.ostream.blocking handle.output.ostream.setBlocking(true) - handle.output.ostream.swrite(headers) + handle.output.ostream.withWriter w: + w.swrite(headers) handle.output.ostream.setBlocking(blocking) proc recvData*(ps: PosixStream; buffer: LoaderBuffer): int {.inline.} = diff --git a/src/local/client.nim b/src/local/client.nim index 5c93e375..8633d051 100644 --- a/src/local/client.nim +++ b/src/local/client.nim @@ -21,9 +21,9 @@ import html/event import html/formdata import html/xmlhttprequest import io/bufstream +import io/bufwriter import io/posixstream import io/promise -import io/serialize import io/socketstream import js/base64 import js/console @@ -489,7 +489,8 @@ proc acceptBuffers(client: Client) = pager.alert("Error: failed to set up buffer") continue let key = pager.addLoaderClient(container.process, container.loaderConfig) - stream.swrite(key) + stream.withWriter w: + w.swrite(key) let loader = pager.loader if item.fdin != -1: let outputId = item.istreamOutputId diff --git a/src/server/buffer.nim b/src/server/buffer.nim index 96d9c5ff..7bdd46d0 100644 --- a/src/server/buffer.nim +++ b/src/server/buffer.nim @@ -27,6 +27,7 @@ import html/env import html/event import html/formdata as formdata_impl import io/bufstream +import io/bufwriter import io/posixstream import io/promise import io/serialize @@ -192,8 +193,9 @@ proc buildInterfaceProc(fun: NimNode, funid: string): tuple[fun, name: NimNode] let this2 = newIdentDefs(ident("iface"), ident("BufferInterface")) let thisval = this2[0] body.add(quote do: - `thisval`.stream.swrite(BufferCommand.`nup`) - `thisval`.stream.swrite(`thisval`.packetid) + var writer {.inject.} = `thisval`.stream.initWriter() + writer.swrite(BufferCommand.`nup`) + writer.swrite(`thisval`.packetid) ) var params2: seq[NimNode] var retval2: NimNode @@ -220,12 +222,14 @@ proc buildInterfaceProc(fun: NimNode, funid: string): tuple[fun, name: NimNode] let s = params2[i][0] # sym e.g. url body.add(quote do: when typeof(`s`) is FileHandle: - #TODO flush or something + writer.flush() SocketStream(`thisval`.stream.source).sendFileHandle(`s`) else: - `thisval`.stream.swrite(`s`) + writer.swrite(`s`) ) body.add(quote do: + writer.flush() + writer.deinit() let promise = `addfun` inc `thisval`.packetid return promise @@ -1099,11 +1103,11 @@ proc resolveTask[T](buffer: Buffer, cmd: BufferCommand, res: T) = if packetid == 0: return # no task to resolve (TODO this is kind of inefficient) let len = slen(buffer.tasks[cmd]) + slen(res) - buffer.pstream.swrite(len) - buffer.pstream.swrite(packetid) + buffer.pstream.withWriter w: + w.swrite(len) + w.swrite(packetid) + w.swrite(res) buffer.tasks[cmd] = 0 - buffer.pstream.swrite(res) - buffer.pstream.flush() proc onload(buffer: Buffer) = case buffer.state @@ -1664,7 +1668,8 @@ macro bufferDispatcher(funs: static ProxyMap, buffer: Buffer, let `id` = `buffer`.pstream.recvFileHandle() else: var `id`: `typ` - `buffer`.pstream.sread(`id`)) + `buffer`.pstream.sread(`id`) + ) call.add(id) var rval: NimNode if v.params[0].kind == nnkEmpty: @@ -1677,15 +1682,19 @@ macro bufferDispatcher(funs: static ProxyMap, buffer: Buffer, if rval == nil: resolve.add(quote do: let len = slen(`packetid`) - buffer.pstream.swrite(len) - buffer.pstream.swrite(`packetid`) + block: + buffer.pstream.withWriter w: + w.swrite(len) + w.swrite(`packetid`) ) else: resolve.add(quote do: let len = slen(`packetid`) + slen(`rval`) - buffer.pstream.swrite(len) - buffer.pstream.swrite(`packetid`) - buffer.pstream.swrite(`rval`) + block: + buffer.pstream.withWriter w: + w.swrite(len) + w.swrite(`packetid`) + w.swrite(`rval`) ) if v.istask: let en = v.ename @@ -1694,7 +1703,8 @@ macro bufferDispatcher(funs: static ProxyMap, buffer: Buffer, buffer.savetask = false buffer.tasks[BufferCommand.`en`] = `packetid` else: - `resolve`) + `resolve` + ) else: stmts.add(resolve) ofbranch.add(stmts) diff --git a/src/server/forkserver.nim b/src/server/forkserver.nim index 2c00dd4e..12b25dc3 100644 --- a/src/server/forkserver.nim +++ b/src/server/forkserver.nim @@ -5,6 +5,7 @@ import std/streams import std/tables import config/config +import io/bufwriter import io/posixstream import io/serialize import io/serversocket @@ -23,44 +24,44 @@ type ForkServer* = ref object istream: Stream - ostream: Stream + ostream: PosixStream estream*: PosixStream ForkServerContext = object - istream: Stream - ostream: Stream + istream: PosixStream + ostream: PosixStream children: seq[int] loaderPid: int proc newFileLoader*(forkserver: ForkServer; config: LoaderConfig): FileLoader = - forkserver.ostream.swrite(fcForkLoader) - forkserver.ostream.swrite(config) - forkserver.ostream.flush() + forkserver.ostream.withWriter w: + w.swrite(fcForkLoader) + w.swrite(config) var process: int forkserver.istream.sread(process) return FileLoader(process: process, clientPid: getCurrentProcessId()) proc loadForkServerConfig*(forkserver: ForkServer, config: Config) = - forkserver.ostream.swrite(fcLoadConfig) - forkserver.ostream.swrite(config.getForkServerConfig()) - forkserver.ostream.flush() + forkserver.ostream.withWriter w: + w.swrite(fcLoadConfig) + w.swrite(config.getForkServerConfig()) proc removeChild*(forkserver: ForkServer, pid: int) = - forkserver.ostream.swrite(fcRemoveChild) - forkserver.ostream.swrite(pid) - forkserver.ostream.flush() + forkserver.ostream.withWriter w: + w.swrite(fcRemoveChild) + w.swrite(pid) proc forkBuffer*(forkserver: ForkServer; config: BufferConfig; url: URL; request: Request; attrs: WindowAttributes; ishtml: bool; charsetStack: seq[Charset]): int = - forkserver.ostream.swrite(fcForkBuffer) - forkserver.ostream.swrite(config) - forkserver.ostream.swrite(url) - forkserver.ostream.swrite(request) - forkserver.ostream.swrite(attrs) - forkserver.ostream.swrite(ishtml) - forkserver.ostream.swrite(charsetStack) - forkserver.ostream.flush() + forkserver.ostream.withWriter w: + w.swrite(fcForkBuffer) + w.swrite(config) + w.swrite(url) + w.swrite(request) + w.swrite(attrs) + w.swrite(ishtml) + w.swrite(charsetStack) var bufferPid: int forkserver.istream.sread(bufferPid) bufferPid @@ -186,13 +187,16 @@ proc runForkServer() = if i != -1: ctx.children.del(i) of fcForkBuffer: - ctx.ostream.swrite(ctx.forkBuffer()) + let r = ctx.forkBuffer() + ctx.ostream.withWriter w: + w.swrite(r) of fcForkLoader: assert ctx.loaderPid == 0 var config: LoaderConfig ctx.istream.sread(config) let pid = ctx.forkLoader(config) - ctx.ostream.swrite(pid) + ctx.ostream.withWriter w: + w.swrite(pid) ctx.loaderPid = pid ctx.children.add(pid) of fcLoadConfig: @@ -246,15 +250,13 @@ proc newForkServer*(): ForkServer = discard close(pipefd_in[0]) # close read discard close(pipefd_out[1]) # close write discard close(pipefd_err[1]) # close write - var writef, readf: File - if not open(writef, pipefd_in[1], fmWrite): - raise newException(Defect, "Failed to open output handle") + var readf: File if not open(readf, pipefd_out[0], fmRead): raise newException(Defect, "Failed to open input handle") let estream = newPosixStream(pipefd_err[0]) estream.setBlocking(false) return ForkServer( - ostream: newFileStream(writef), + ostream: newPosixStream(pipefd_in[1]), istream: newFileStream(readf), estream: estream ) |