diff options
author | bptato <nincsnevem662@gmail.com> | 2024-03-21 23:11:18 +0100 |
---|---|---|
committer | bptato <nincsnevem662@gmail.com> | 2024-03-21 23:18:55 +0100 |
commit | 03d591d9aed833b0bdb028bfea376e0beeac8e9a (patch) | |
tree | 9a369a4af2aab1711901c06cc02d8b5497018f2a | |
parent | abb09126edcce518614efc0e2c0d55d5e42f8094 (diff) | |
download | chawan-03d591d9aed833b0bdb028bfea376e0beeac8e9a.tar.gz |
io: add bufreader
analogous to bufwriter
-rw-r--r-- | src/io/bufreader.nim | 198 | ||||
-rw-r--r-- | src/io/bufwriter.nim | 35 | ||||
-rw-r--r-- | src/io/dynstream.nim | 7 | ||||
-rw-r--r-- | src/io/serialize.nim | 53 | ||||
-rw-r--r-- | src/io/serversocket.nim | 6 | ||||
-rw-r--r-- | src/io/socketstream.nim | 9 | ||||
-rw-r--r-- | src/loader/loader.nim | 255 | ||||
-rw-r--r-- | src/local/client.nim | 2 | ||||
-rw-r--r-- | src/server/buffer.nim | 33 | ||||
-rw-r--r-- | src/server/forkserver.nim | 89 |
10 files changed, 426 insertions, 261 deletions
diff --git a/src/io/bufreader.nim b/src/io/bufreader.nim new file mode 100644 index 00000000..a8d30fb0 --- /dev/null +++ b/src/io/bufreader.nim @@ -0,0 +1,198 @@ +# Write data to streams. + +import std/options +import std/sets +import std/tables + +import io/dynstream +import types/blob +import types/formdata +import types/opt +import types/url + +type BufferedReader* = object + buffer: seq[uint8] + bufIdx: int + +proc initReader*(stream: DynStream; len: int): BufferedReader = + assert len != 0 + var reader = BufferedReader( + buffer: newSeqUninitialized[uint8](len), + bufIdx: 0 + ) + var n = 0 + while true: + n += stream.recvData(addr reader.buffer[n], len - n) + if n == len: + break + return reader + +template withPacketReader*(stream: DynStream; r, body: untyped) = + block: + var len: int + # note: this must be readData + doAssert stream.readData(addr len, sizeof(len)) == sizeof(len) + var r = stream.initReader(len) + body + +proc sread*(reader: var BufferedReader; n: var SomeNumber) +proc sread*[T](reader: var BufferedReader; s: var set[T]) +proc sread*[T: enum](reader: var BufferedReader; x: var T) +proc sread*(reader: var BufferedReader; s: var string) +proc sread*(reader: var BufferedReader; b: var bool) +proc sread*(reader: var BufferedReader; url: var URL) +proc sread*(reader: var BufferedReader; tup: var tuple) +proc sread*[I, T](reader: var BufferedReader; a: var array[I, T]) +proc sread*(reader: var BufferedReader; s: var seq) +proc sread*[U, V](reader: var BufferedReader; t: var Table[U, V]) +proc sread*(reader: var BufferedReader; obj: var object) +proc sread*(reader: var BufferedReader; obj: var ref object) +proc sread*(reader: var BufferedReader; part: var FormDataEntry) +proc sread*(reader: var BufferedReader; blob: var Blob) +proc sread*[T](reader: var BufferedReader; o: var Option[T]) +proc sread*[T, E](reader: var BufferedReader; o: var Result[T, E]) + +proc readData(reader: var BufferedReader; buffer: pointer; len: int) = + assert reader.bufIdx + len <= reader.buffer.len + copyMem(buffer, addr reader.buffer[reader.bufIdx], len) + reader.bufIdx += len + +proc sread*(reader: var BufferedReader; n: var SomeNumber) = + reader.readData(addr n, sizeof(n)) + +proc sread*[T: enum](reader: var BufferedReader; x: var T) = + var i: int + reader.sread(i) + x = cast[T](i) + +proc sread*[T](reader: var BufferedReader; s: var set[T]) = + var len: int + reader.sread(len) + for i in 0 ..< len: + var x: T + reader.sread(x) + s.incl(x) + +proc sread*(reader: var BufferedReader; s: var string) = + var len: int + reader.sread(len) + s = newString(len) + if len > 0: + reader.readData(addr s[0], len) + +proc sread*(reader: var BufferedReader; b: var bool) = + var n: uint8 + reader.sread(n) + if n == 1u8: + b = true + else: + assert n == 0u8 + b = false + +proc sread*(reader: var BufferedReader; url: var URL) = + var s: string + reader.sread(s) + if s == "": + url = nil + else: + let x = newURL(s) + if x.isSome: + url = x.get + else: + url = nil + +proc sread*(reader: var BufferedReader; tup: var tuple) = + for f in tup.fields: + reader.sread(f) + +proc sread*[I; T](reader: var BufferedReader; a: var array[I, T]) = + for x in a.mitems: + reader.sread(x) + +proc sread*(reader: var BufferedReader; s: var seq) = + var len: int + reader.sread(len) + s.setLen(len) + for x in s.mitems: + reader.sread(x) + +proc sread*[U; V](reader: var BufferedReader, t: var Table[U, V]) = + var len: int + reader.sread(len) + for i in 0..<len: + var k: U + reader.sread(k) + var v: V + reader.sread(v) + t[k] = v + +proc sread*(reader: var BufferedReader; obj: var object) = + for f in obj.fields: + reader.sread(f) + +proc sread*(reader: var BufferedReader; obj: var ref object) = + var n: bool + reader.sread(n) + if n: + new(obj) + reader.sread(obj[]) + +proc sread*(reader: var BufferedReader; part: var FormDataEntry) = + var isstr: bool + reader.sread(isstr) + if isstr: + part = FormDataEntry(isstr: true) + else: + part = FormDataEntry(isstr: false) + reader.sread(part.name) + reader.sread(part.filename) + if part.isstr: + reader.sread(part.svalue) + else: + reader.sread(part.value) + +proc sread*(reader: var BufferedReader; blob: var Blob) = + var isfile: bool + reader.sread(isfile) + if isfile: + var file = new WebFile + file.isfile = true + reader.sread(file.path) + blob = file + else: + blob = Blob() + reader.sread(blob.ctype) + reader.sread(blob.size) + let buffer = alloc(blob.size) + blob.buffer = buffer + blob.deallocFun = proc() = dealloc(buffer) + if blob.size > 0: + reader.readData(blob.buffer, int(blob.size)) + +proc sread*[T](reader: var BufferedReader; o: var Option[T]) = + var x: bool + reader.sread(x) + if x: + var m: T + reader.sread(m) + o = some(m) + else: + o = none(T) + +proc sread*[T, E](reader: var BufferedReader; o: var Result[T, E]) = + var x: bool + reader.sread(x) + if x: + when T isnot void: + var m: T + reader.sread(m) + o.ok(m) + else: + o.ok() + else: + when E isnot void: + var e: E + reader.sread(e) + o.err(e) + else: + o.err() diff --git a/src/io/bufwriter.nim b/src/io/bufwriter.nim index 57219100..cbee8b5b 100644 --- a/src/io/bufwriter.nim +++ b/src/io/bufwriter.nim @@ -16,6 +16,7 @@ type BufferedWriter* = object buffer: ptr UncheckedArray[uint8] bufSize: int bufLen: int + writeLen: bool {.warning[Deprecated]: off.}: proc `=destroy`(writer: var BufferedWriter) = @@ -23,23 +24,28 @@ type BufferedWriter* = object dealloc(writer.buffer) writer.buffer = nil -proc initWriter*(stream: DynStream; sizeInit = 64): BufferedWriter = - return BufferedWriter( +proc initWriter*(stream: DynStream; sizeInit = 64; writeLen = false): + BufferedWriter = + var w = BufferedWriter( stream: stream, buffer: cast[ptr UncheckedArray[uint8]](alloc(sizeInit)), bufSize: sizeInit, - bufLen: 0 + bufLen: 0, + writeLen: writeLen ) + if writeLen: # add space for `len' + w.bufLen += sizeof(w.bufLen) + assert w.bufLen < sizeInit + return w proc flush*(writer: var BufferedWriter) = - let stream = writer.stream - var n = 0 - while true: - n += stream.sendData(addr writer.buffer[n], writer.bufLen - n) - if n == writer.bufLen: - break + if writer.writeLen: + # subtract the length field's size + var realLen = writer.bufLen - sizeof(writer.bufLen) + copyMem(writer.buffer, addr realLen, sizeof(writer.bufLen)) + writer.stream.sendDataLoop(writer.buffer, writer.bufLen) writer.bufLen = 0 - stream.sflush() + writer.stream.sflush() proc deinit*(writer: var BufferedWriter) = dealloc(writer.buffer) @@ -49,7 +55,14 @@ proc deinit*(writer: var BufferedWriter) = template withWriter*(stream: DynStream; w, body: untyped) = block: - var w {.inject.} = stream.initWriter() + var w = stream.initWriter() + body + w.flush() + w.deinit() + +template withPacketWriter*(stream: DynStream; w, body: untyped) = + block: + var w = stream.initWriter(writeLen = true) body w.flush() w.deinit() diff --git a/src/io/dynstream.nim b/src/io/dynstream.nim index ec38c595..ac1269f5 100644 --- a/src/io/dynstream.nim +++ b/src/io/dynstream.nim @@ -39,6 +39,13 @@ proc sendData*(s: DynStream; buffer: openArray[char]): int {.inline.} = proc sendData*(s: DynStream; buffer: openArray[uint8]): int {.inline.} = return s.sendData(unsafeAddr buffer[0], buffer.len) +proc sendDataLoop*(s: DynStream; buffer: pointer; len: int) = + var n = 0 + while true: + n += s.sendData(addr cast[ptr UncheckedArray[uint8]](buffer)[n], len - n) + if n == len: + break + proc dsClose(s: Stream) = DynStream(s).sclose() diff --git a/src/io/serialize.nim b/src/io/serialize.nim index 3ffa40ac..0b54bea3 100644 --- a/src/io/serialize.nim +++ b/src/io/serialize.nim @@ -25,10 +25,8 @@ func slen*(s: string): int proc sread*(stream: Stream, b: var bool) func slen*(b: bool): int -proc sread*(stream: Stream, url: var URL) func slen*(url: URL): int -proc sread*(stream: Stream, tup: var tuple) func slen*(tup: tuple): int proc sread*[I, T](stream: Stream, a: var array[I, T]) @@ -46,10 +44,8 @@ func slen*(obj: object): int proc sread*(stream: Stream, obj: var ref object) func slen*(obj: ref object): int -proc sread*(stream: Stream, part: var FormDataEntry) func slen*(part: FormDataEntry): int -proc sread*(stream: Stream, blob: var Blob) func slen*(blob: Blob): int proc sread*[T](stream: Stream, o: var Option[T]) @@ -112,27 +108,11 @@ proc sread*(stream: Stream, b: var bool) = func slen*(b: bool): int = return sizeof(uint8) -proc sread*(stream: Stream, url: var URL) = - var s: string - stream.sread(s) - if s == "": - url = nil - else: - let x = newURL(s) - if x.isSome: - url = x.get - else: - url = nil - func slen*(url: URL): int = if url == nil: return slen("") return slen(url.serialize()) -proc sread*(stream: Stream, tup: var tuple) = - for f in tup.fields: - stream.sread(f) - func slen*(tup: tuple): int = for f in tup.fields: result += slen(f) @@ -193,20 +173,6 @@ func slen*(obj: ref object): int = if obj != nil: result += slen(obj[]) -proc sread*(stream: Stream, part: var FormDataEntry) = - var isstr: bool - stream.sread(isstr) - if isstr: - part = FormDataEntry(isstr: true) - else: - part = FormDataEntry(isstr: false) - stream.sread(part.name) - stream.sread(part.filename) - if part.isstr: - stream.sread(part.svalue) - else: - stream.sread(part.value) - func slen*(part: FormDataEntry): int = result += slen(part.isstr) result += slen(part.name) @@ -216,25 +182,6 @@ func slen*(part: FormDataEntry): int = else: result += slen(part.value) -proc sread*(stream: Stream, blob: var Blob) = - var isfile: bool - stream.sread(isfile) - if isfile: - var file = new WebFile - file.isfile = true - stream.sread(file.path) - blob = file - else: - blob = Blob() - stream.sread(blob.ctype) - stream.sread(blob.size) - let buffer = alloc(blob.size) - blob.buffer = buffer - blob.deallocFun = proc() = dealloc(buffer) - if blob.size > 0: - let n = stream.readData(blob.buffer, int(blob.size)) - assert n == int(blob.size) - func slen*(blob: Blob): int = result += slen(blob.isfile) if blob.isfile: diff --git a/src/io/serversocket.nim b/src/io/serversocket.nim index 020c5ed3..a6acc555 100644 --- a/src/io/serversocket.nim +++ b/src/io/serversocket.nim @@ -19,10 +19,10 @@ proc getSocketPath*(pid: int): string = {.compile: "bind_unix.c".} proc bind_unix_from_c(fd: cint, path: cstring, pathlen: cint): cint {.importc.} -proc initServerSocket*(pid: int; buffered = true; blocking = true): - ServerSocket = +proc initServerSocket*(pid: int; blocking = true): ServerSocket = createDir(SocketDirectory) - let sock = newSocket(Domain.AF_UNIX, SockType.SOCK_STREAM, Protocol.IPPROTO_IP, buffered) + let sock = newSocket(Domain.AF_UNIX, SockType.SOCK_STREAM, + Protocol.IPPROTO_IP, buffered = false) if not blocking: sock.getFd().setBlocking(false) let path = getSocketPath(pid) diff --git a/src/io/socketstream.nim b/src/io/socketstream.nim index e02108ce..78e7fb3e 100644 --- a/src/io/socketstream.nim +++ b/src/io/socketstream.nim @@ -61,10 +61,9 @@ method sclose*(s: SocketStream) = proc connect_unix_from_c(fd: cint, path: cstring, pathlen: cint): cint {.importc.} -proc connectSocketStream*(path: string, buffered = true, blocking = true): - SocketStream = +proc connectSocketStream*(path: string; blocking = true): SocketStream = let sock = newSocket(Domain.AF_UNIX, SockType.SOCK_STREAM, - Protocol.IPPROTO_IP, buffered) + Protocol.IPPROTO_IP, buffered = false) if not blocking: sock.getFd().setBlocking(false) if connect_unix_from_c(cint(sock.getFd()), cstring(path), @@ -77,10 +76,10 @@ proc connectSocketStream*(path: string, buffered = true, blocking = true): ) result.addStreamIface() -proc connectSocketStream*(pid: int, buffered = true, blocking = true): +proc connectSocketStream*(pid: int; blocking = true): SocketStream = try: - return connectSocketStream(getSocketPath(pid), buffered, blocking) + return connectSocketStream(getSocketPath(pid), blocking) except OSError: return nil diff --git a/src/loader/loader.nim b/src/loader/loader.nim index 9c9c2e8a..b0c5b6ab 100644 --- a/src/loader/loader.nim +++ b/src/loader/loader.nim @@ -24,6 +24,7 @@ import std/streams import std/strutils import std/tables +import io/bufreader import io/bufwriter import io/posixstream import io/promise @@ -444,9 +445,10 @@ proc setupRequestDefaults*(request: Request; config: LoaderClientConfig) = if r != "": request.headers["Referer"] = r -proc onLoad(ctx: LoaderContext; stream: SocketStream; client: ClientData) = +proc load(ctx: LoaderContext; stream: SocketStream; client: ClientData; + r: var BufferedReader) = var request: Request - stream.sread(request) + r.sread(request) let handle = newLoaderHandle(stream, ctx.getOutputId(), client.pid, request.suspended) when defined(debug): @@ -463,13 +465,14 @@ proc onLoad(ctx: LoaderContext; stream: SocketStream; client: ClientData) = request.proxy = client.config.proxy ctx.loadResource(client, request, handle) -proc addClient(ctx: LoaderContext; stream: SocketStream) = +proc addClient(ctx: LoaderContext; stream: SocketStream; + r: var BufferedReader) = var key: ClientKey var pid: int var config: LoaderClientConfig - stream.sread(key) - stream.sread(pid) - stream.sread(config) + r.sread(key) + r.sread(pid) + r.sread(config) stream.withWriter w: if pid in ctx.clientData or key == default(ClientKey): w.swrite(false) @@ -484,20 +487,22 @@ proc cleanup(client: ClientData) = if it.refc == 0: discard unlink(cstring(it.path)) -proc removeClient(ctx: LoaderContext; stream: SocketStream) = +proc removeClient(ctx: LoaderContext; stream: SocketStream; + r: var BufferedReader) = var pid: int - stream.sread(pid) + r.sread(pid) if pid in ctx.clientData: let client = ctx.clientData[pid] client.cleanup() ctx.clientData.del(pid) stream.close() -proc addCacheFile(ctx: LoaderContext; stream: SocketStream) = +proc addCacheFile(ctx: LoaderContext; stream: SocketStream; + r: var BufferedReader) = var outputId: int var targetPid: int - stream.sread(outputId) - stream.sread(targetPid) + r.sread(outputId) + r.sread(targetPid) let output = ctx.findOutput(outputId) assert output != nil let targetClient = ctx.clientData[targetPid] @@ -507,11 +512,12 @@ proc addCacheFile(ctx: LoaderContext; stream: SocketStream) = w.swrite(file) stream.close() -proc redirectToFile(ctx: LoaderContext; stream: SocketStream) = +proc redirectToFile(ctx: LoaderContext; stream: SocketStream; + r: var BufferedReader) = var outputId: int var targetPath: string - stream.sread(outputId) - stream.sread(targetPath) + r.sread(outputId) + r.sread(targetPath) let output = ctx.findOutput(outputId) var success = false if output != nil: @@ -520,15 +526,16 @@ proc redirectToFile(ctx: LoaderContext; stream: SocketStream) = w.swrite(success) stream.close() -proc shareCachedItem(ctx: LoaderContext; stream: SocketStream) = +proc shareCachedItem(ctx: LoaderContext; stream: SocketStream; + r: var BufferedReader) = # share a cached file with another buffer. this is for newBufferFrom # (i.e. view source) var sourcePid: int # pid of source client var targetPid: int # pid of target client var id: int - stream.sread(sourcePid) - stream.sread(targetPid) - stream.sread(id) + r.sread(sourcePid) + r.sread(targetPid) + r.sread(id) let sourceClient = ctx.clientData[sourcePid] let targetClient = ctx.clientData[targetPid] let n = sourceClient.cacheMap.find(id) @@ -537,17 +544,17 @@ proc shareCachedItem(ctx: LoaderContext; stream: SocketStream) = targetClient.cacheMap.add(item) stream.close() -proc passFd(ctx: LoaderContext; stream: SocketStream) = +proc passFd(ctx: LoaderContext; stream: SocketStream; r: var BufferedReader) = var id: string - stream.sread(id) + r.sread(id) let fd = stream.recvFileHandle() ctx.passedFdMap[id] = fd stream.close() proc removeCachedItem(ctx: LoaderContext; stream: SocketStream; - client: ClientData) = + client: ClientData; r: var BufferedReader) = var id: int - stream.sread(id) + r.sread(id) let n = client.cacheMap.find(id) if n != -1: let item = client.cacheMap[n] @@ -557,11 +564,12 @@ proc removeCachedItem(ctx: LoaderContext; stream: SocketStream; discard unlink(cstring(item.path)) stream.close() -proc tee(ctx: LoaderContext; stream: SocketStream; client: ClientData) = +proc tee(ctx: LoaderContext; stream: SocketStream; client: ClientData; + r: var BufferedReader) = var sourceId: int var targetPid: int - stream.sread(sourceId) - stream.sread(targetPid) + r.sread(sourceId) + r.sread(targetPid) let output = ctx.findOutput(sourceId) # only allow tee'ing outputs owned by client doAssert output.ownerPid == client.pid @@ -576,9 +584,10 @@ proc tee(ctx: LoaderContext; stream: SocketStream; client: ClientData) = w.swrite(-1) stream.close() -proc suspend(ctx: LoaderContext; stream: SocketStream; client: ClientData) = +proc suspend(ctx: LoaderContext; stream: SocketStream; client: ClientData; + r: var BufferedReader) = var ids: seq[int] - stream.sread(ids) + r.sread(ids) for id in ids: let output = ctx.findOutput(id) if output != nil: @@ -588,9 +597,10 @@ proc suspend(ctx: LoaderContext; stream: SocketStream; client: ClientData) = output.registered = false ctx.selector.unregister(output.ostream.fd) -proc resume(ctx: LoaderContext; stream: SocketStream; client: ClientData) = +proc resume(ctx: LoaderContext; stream: SocketStream; client: ClientData; + r: var BufferedReader) = var ids: seq[int] - stream.sread(ids) + r.sread(ids) for id in ids: let output = ctx.findOutput(id) if output != nil: @@ -602,52 +612,53 @@ proc resume(ctx: LoaderContext; stream: SocketStream; client: ClientData) = proc acceptConnection(ctx: LoaderContext) = let stream = ctx.ssock.acceptSocketStream() try: - var myPid: int - var key: ClientKey - stream.sread(myPid) - stream.sread(key) - if myPid notin ctx.clientData: - # possibly already removed - stream.close() - return - let client = ctx.clientData[myPid] - if client.key != key: - # ditto - stream.close() - return - var cmd: LoaderCommand - stream.sread(cmd) - template privileged_command = - doAssert client == ctx.pagerClient - case cmd - of lcAddClient: - privileged_command - ctx.addClient(stream) - of lcRemoveClient: - privileged_command - ctx.removeClient(stream) - of lcAddCacheFile: - privileged_command - ctx.addCacheFile(stream) - of lcShareCachedItem: - privileged_command - ctx.shareCachedItem(stream) - of lcPassFd: - privileged_command - ctx.passFd(stream) - of lcRedirectToFile: - privileged_command - ctx.redirectToFile(stream) - of lcRemoveCachedItem: - ctx.removeCachedItem(stream, client) - of lcLoad: - ctx.onLoad(stream, client) - of lcTee: - ctx.tee(stream, client) - of lcSuspend: - ctx.suspend(stream, client) - of lcResume: - ctx.resume(stream, client) + stream.withPacketReader r: + var myPid: int + var key: ClientKey + r.sread(myPid) + r.sread(key) + if myPid notin ctx.clientData: + # possibly already removed + stream.close() + return + let client = ctx.clientData[myPid] + if client.key != key: + # ditto + stream.close() + return + var cmd: LoaderCommand + r.sread(cmd) + template privileged_command = + doAssert client == ctx.pagerClient + case cmd + of lcAddClient: + privileged_command + ctx.addClient(stream, r) + of lcRemoveClient: + privileged_command + ctx.removeClient(stream, r) + of lcAddCacheFile: + privileged_command + ctx.addCacheFile(stream, r) + of lcShareCachedItem: + privileged_command + ctx.shareCachedItem(stream, r) + of lcPassFd: + privileged_command + ctx.passFd(stream, r) + of lcRedirectToFile: + privileged_command + ctx.redirectToFile(stream, r) + of lcRemoveCachedItem: + ctx.removeCachedItem(stream, client, r) + of lcLoad: + ctx.load(stream, client, r) + of lcTee: + ctx.tee(stream, client, r) + of lcSuspend: + ctx.suspend(stream, client, r) + of lcResume: + ctx.resume(stream, client, r) except ErrorBrokenPipe: # receiving end died while reading the file; give up. stream.close() @@ -666,10 +677,8 @@ proc initLoaderContext(fd: cint; config: LoaderConfig): LoaderContext = selector: newSelector[int]() ) gctx = ctx - #TODO ideally, buffered would be true. Unfortunately this conflicts with - # sendFileHandle/recvFileHandle. let myPid = getCurrentProcessId() - ctx.ssock = initServerSocket(myPid, buffered = false, blocking = true) + ctx.ssock = initServerSocket(myPid, blocking = true) let sfd = int(ctx.ssock.sock.getFd()) ctx.selector.registerHandle(sfd, {Read}, 0) # The server has been initialized, so the main process can resume execution. @@ -684,27 +693,28 @@ proc initLoaderContext(fd: cint; config: LoaderConfig): LoaderContext = dir &= '/' # get pager's key let stream = ctx.ssock.acceptSocketStream() - block readNullKey: - var pid: int # ignore pid - stream.sread(pid) - # pager's key is still null + stream.withPacketReader r: + block readNullKey: + var pid: int # ignore pid + r.sread(pid) + # pager's key is still null + var key: ClientKey + r.sread(key) + doAssert key == default(ClientKey) + var cmd: LoaderCommand + r.sread(cmd) + doAssert cmd == lcAddClient var key: ClientKey - stream.sread(key) - doAssert key == default(ClientKey) - var cmd: LoaderCommand - stream.sread(cmd) - doAssert cmd == lcAddClient - var key: ClientKey - var pid: int - var config: LoaderClientConfig - stream.sread(key) - stream.sread(pid) - stream.sread(config) - stream.withWriter w: - w.swrite(true) - ctx.pagerClient = ClientData(key: key, pid: pid, config: config) - ctx.clientData[pid] = ctx.pagerClient - stream.close() + var pid: int + var config: LoaderClientConfig + r.sread(key) + r.sread(pid) + r.sread(config) + stream.withWriter w: + w.swrite(true) + ctx.pagerClient = ClientData(key: key, pid: pid, config: config) + ctx.clientData[pid] = ctx.pagerClient + stream.close() # unblock main socket ctx.ssock.sock.getFd().setBlocking(false) # for CGI @@ -829,20 +839,21 @@ proc getRedirect*(response: Response; request: Request): Request = destination = request.destination) return nil -proc connect(loader: FileLoader; buffered = true): SocketStream = - let stream = connectSocketStream(loader.process, buffered, blocking = true) - if stream != nil: - stream.withWriter w: - w.swrite(loader.clientPid) - w.swrite(loader.key) - return stream - return nil +template withLoaderPacketWriter(stream: SocketStream; loader: FileLoader; + w, body: untyped) = + stream.withPacketWriter w: + w.swrite(loader.clientPid) + w.swrite(loader.key) + body + +proc connect(loader: FileLoader): SocketStream = + return connectSocketStream(loader.process, blocking = true) # Start a request. This should not block (not for a significant amount of time # anyway). proc startRequest*(loader: FileLoader; request: Request): SocketStream = - let stream = loader.connect(buffered = false) - stream.withWriter w: + let stream = loader.connect() + stream.withLoaderPacketWriter loader, w: w.swrite(lcLoad) w.swrite(request) return stream @@ -862,8 +873,8 @@ proc fetch*(loader: FileLoader; input: Request): FetchPromise = proc reconnect*(loader: FileLoader; data: ConnectData) = data.stream.close() - let stream = loader.connect(buffered = false) - stream.withWriter w: + let stream = loader.connect() + stream.withLoaderPacketWriter loader, w: w.swrite(lcLoad) w.swrite(data.request) let fd = int(stream.fd) @@ -891,21 +902,21 @@ proc switchStream*(loader: FileLoader; data: var OngoingData; proc suspend*(loader: FileLoader; fds: seq[int]) = let stream = loader.connect() - stream.withWriter w: + stream.withLoaderPacketWriter loader, w: w.swrite(lcSuspend) w.swrite(fds) stream.close() proc resume*(loader: FileLoader; fds: seq[int]) = let stream = loader.connect() - stream.withWriter w: + stream.withLoaderPacketWriter loader, w: w.swrite(lcResume) w.swrite(fds) stream.close() proc tee*(loader: FileLoader; sourceId, targetPid: int): (SocketStream, int) = - let stream = loader.connect(buffered = false) - stream.withWriter w: + let stream = loader.connect() + stream.withLoaderPacketWriter loader, w: w.swrite(lcTee) w.swrite(sourceId) w.swrite(targetPid) @@ -918,7 +929,7 @@ proc addCacheFile*(loader: FileLoader; outputId, targetPid: int): let stream = loader.connect() if stream == nil: return (-1, "") - stream.withWriter w: + stream.withLoaderPacketWriter loader, w: w.swrite(lcAddCacheFile) w.swrite(outputId) w.swrite(targetPid) @@ -933,7 +944,7 @@ proc redirectToFile*(loader: FileLoader; outputId: int; targetPath: string): let stream = loader.connect() if stream == nil: return false - stream.withWriter w: + stream.withLoaderPacketWriter loader, w: w.swrite(lcRedirectToFile) w.swrite(outputId) w.swrite(targetPath) @@ -1035,7 +1046,7 @@ proc doRequest*(loader: FileLoader; request: Request): Response = proc shareCachedItem*(loader: FileLoader; id, targetPid: int) = let stream = loader.connect() if stream != nil: - stream.withWriter w: + stream.withLoaderPacketWriter loader, w: w.swrite(lcShareCachedItem) w.swrite(loader.clientPid) w.swrite(targetPid) @@ -1043,9 +1054,9 @@ proc shareCachedItem*(loader: FileLoader; id, targetPid: int) = stream.close() proc passFd*(loader: FileLoader; id: string; fd: FileHandle) = - let stream = loader.connect(buffered = false) + let stream = loader.connect() if stream != nil: - stream.withWriter w: + stream.withLoaderPacketWriter loader, w: w.swrite(lcPassFd) w.swrite(id) stream.sendFileHandle(fd) @@ -1054,7 +1065,7 @@ proc passFd*(loader: FileLoader; id: string; fd: FileHandle) = proc removeCachedItem*(loader: FileLoader; cacheId: int) = let stream = loader.connect() if stream != nil: - stream.withWriter w: + stream.withLoaderPacketWriter loader, w: w.swrite(lcRemoveCachedItem) w.swrite(cacheId) stream.close() @@ -1062,7 +1073,7 @@ proc removeCachedItem*(loader: FileLoader; cacheId: int) = proc addClient*(loader: FileLoader; key: ClientKey; pid: int; config: LoaderClientConfig): bool = let stream = loader.connect() - stream.withWriter w: + stream.withLoaderPacketWriter loader, w: w.swrite(lcAddClient) w.swrite(key) w.swrite(pid) @@ -1073,7 +1084,7 @@ proc addClient*(loader: FileLoader; key: ClientKey; pid: int; proc removeClient*(loader: FileLoader; pid: int) = let stream = loader.connect() if stream != nil: - stream.withWriter w: + stream.withLoaderPacketWriter loader, w: w.swrite(lcRemoveClient) w.swrite(pid) stream.close() diff --git a/src/local/client.nim b/src/local/client.nim index 8e29bdc6..29eaeb73 100644 --- a/src/local/client.nim +++ b/src/local/client.nim @@ -400,7 +400,7 @@ proc acceptBuffers(client: Client) = client.selector.registerHandle(fd, {Read, Write}, 0) for item in pager.procmap: let container = item.container - let stream = connectSocketStream(container.process, buffered = false) + let stream = connectSocketStream(container.process) if stream == nil: pager.alert("Error: failed to set up buffer") continue diff --git a/src/server/buffer.nim b/src/server/buffer.nim index a3c94bf3..71deed82 100644 --- a/src/server/buffer.nim +++ b/src/server/buffer.nim @@ -27,6 +27,7 @@ import html/enums import html/env import html/event import html/formdata as formdata_impl +import io/bufreader import io/bufstream import io/bufwriter import io/posixstream @@ -121,11 +122,9 @@ type charset: Charset cacheId: int outputId: int - dummyStream: StringStream InterfaceOpaque = ref object stream: SocketStream - dummyStream: StringStream len: int BufferInterface* = ref object @@ -146,16 +145,12 @@ type proc getFromOpaque[T](opaque: pointer, res: var T) = let opaque = cast[InterfaceOpaque](opaque) if opaque.len != 0: - let dummyStream = opaque.dummyStream - dummyStream.setPosition(0) - dummyStream.data = newString(opaque.len) - let n = opaque.stream.readData(addr dummyStream.data[0], opaque.len) - assert n == opaque.len - dummyStream.sread(res) + var r = opaque.stream.initReader(opaque.len) + r.sread(res) proc newBufferInterface*(stream: SocketStream, registerFun: proc(fd: int)): BufferInterface = - let opaque = InterfaceOpaque(stream: stream, dummyStream: newStringStream()) + let opaque = InterfaceOpaque(stream: stream) result = BufferInterface( map: newPromiseMap(cast[pointer](opaque)), packetid: 1, # ids below 1 are invalid @@ -976,7 +971,7 @@ proc clone*(buffer: Buffer, newurl: URL): int {.proxy.} = # We ignore errors; not much we can do with them here :/ discard buffer.rewind(buffer.bytesRead, unregister = false) buffer.pstream.close() - let ssock = initServerSocket(myPid, buffered = false) + let ssock = initServerSocket(myPid) buffer.ssock = ssock ps.write(char(0)) buffer.url = newurl @@ -1712,7 +1707,7 @@ proc markURL*(buffer: Buffer; schemes: seq[string]) {.proxy.} = buffer.do_reshape() macro bufferDispatcher(funs: static ProxyMap; buffer: Buffer; - cmd: BufferCommand; packetid: int) = + cmd: BufferCommand; packetid: int; r: var BufferedReader) = let switch = newNimNode(nnkCaseStmt) switch.add(ident("cmd")) for k, v in funs: @@ -1727,7 +1722,7 @@ macro bufferDispatcher(funs: static ProxyMap; buffer: Buffer; let typ = param[^2] stmts.add(quote do: var `id`: `typ` - `buffer`.dummyStream.sread(`id`) + `r`.sread(`id`) ) call.add(id) var rval: NimNode @@ -1773,13 +1768,10 @@ proc readCommand(buffer: Buffer) = var len: int var packetid: int buffer.pstream.sread(len) - buffer.dummyStream.setPosition(0) - buffer.dummyStream.data = newString(len) - let n = buffer.pstream.readData(addr buffer.dummyStream.data[0], len) - assert n == len - buffer.dummyStream.sread(cmd) - buffer.dummyStream.sread(packetid) - bufferDispatcher(ProxyFunctions, buffer, cmd, packetid) + var r = buffer.pstream.initReader(len) + r.sread(cmd) + r.sread(packetid) + bufferDispatcher(ProxyFunctions, buffer, cmd, packetid, r) proc handleRead(buffer: Buffer, fd: int): bool = if fd == buffer.rfd: @@ -1869,8 +1861,7 @@ proc launchBuffer*(config: BufferConfig; url: URL; request: Request; url: url, charsetStack: charsetStack, cacheId: -1, - outputId: -1, - dummyStream: newStringStream() + outputId: -1 ) buffer.charset = buffer.charsetStack.pop() socks.sread(buffer.loader.key) diff --git a/src/server/forkserver.nim b/src/server/forkserver.nim index 4a616a2e..60981213 100644 --- a/src/server/forkserver.nim +++ b/src/server/forkserver.nim @@ -5,6 +5,7 @@ import std/streams import std/tables import config/config +import io/bufreader import io/bufwriter import io/posixstream import io/serialize @@ -24,7 +25,7 @@ type fcForkBuffer, fcForkLoader, fcRemoveChild, fcLoadConfig ForkServer* = ref object - istream: Stream + istream: PosixStream ostream: PosixStream estream*: PosixStream @@ -35,7 +36,7 @@ type loaderPid: int proc newFileLoader*(forkserver: ForkServer; config: LoaderConfig): FileLoader = - forkserver.ostream.withWriter w: + forkserver.ostream.withPacketWriter w: w.swrite(fcForkLoader) w.swrite(config) var process: int @@ -43,19 +44,19 @@ proc newFileLoader*(forkserver: ForkServer; config: LoaderConfig): FileLoader = return FileLoader(process: process, clientPid: getCurrentProcessId()) proc loadForkServerConfig*(forkserver: ForkServer, config: Config) = - forkserver.ostream.withWriter w: + forkserver.ostream.withPacketWriter w: w.swrite(fcLoadConfig) w.swrite(config.getForkServerConfig()) proc removeChild*(forkserver: ForkServer, pid: int) = - forkserver.ostream.withWriter w: + forkserver.ostream.withPacketWriter w: w.swrite(fcRemoveChild) w.swrite(pid) proc forkBuffer*(forkserver: ForkServer; config: BufferConfig; url: URL; request: Request; attrs: WindowAttributes; ishtml: bool; charsetStack: seq[Charset]): int = - forkserver.ostream.withWriter w: + forkserver.ostream.withPacketWriter w: w.swrite(fcForkBuffer) w.swrite(config) w.swrite(url) @@ -109,19 +110,19 @@ proc forkLoader(ctx: var ForkServerContext, config: LoaderConfig): int = return pid var gssock: ServerSocket -proc forkBuffer(ctx: var ForkServerContext): int = +proc forkBuffer(ctx: var ForkServerContext; r: var BufferedReader): int = var config: BufferConfig var url: URL var request: Request var attrs: WindowAttributes var ishtml: bool var charsetStack: seq[Charset] - ctx.istream.sread(config) - ctx.istream.sread(url) - ctx.istream.sread(request) - ctx.istream.sread(attrs) - ctx.istream.sread(ishtml) - ctx.istream.sread(charsetStack) + r.sread(config) + r.sread(url) + r.sread(request) + r.sread(attrs) + r.sread(ishtml) + r.sread(charsetStack) var pipefd: array[2, cint] if pipe(pipefd) == -1: raise newException(Defect, "Failed to open pipe.") @@ -137,7 +138,7 @@ proc forkBuffer(ctx: var ForkServerContext): int = zeroMem(addr ctx, sizeof(ctx)) discard close(pipefd[0]) # close read let pid = getCurrentProcessId() - let ssock = initServerSocket(pid, buffered = false) + let ssock = initServerSocket(pid) gssock = ssock onSignal SIGTERM: # This will be overridden after buffer has been set up; it is only @@ -182,34 +183,35 @@ proc runForkServer() = signal(SIGCHLD, SIG_IGN) while true: try: - var cmd: ForkCommand - ctx.istream.sread(cmd) - case cmd - of fcRemoveChild: - var pid: int - ctx.istream.sread(pid) - let i = ctx.children.find(pid) - if i != -1: - ctx.children.del(i) - of fcForkBuffer: - let r = ctx.forkBuffer() - ctx.ostream.withWriter w: - w.swrite(r) - of fcForkLoader: - assert ctx.loaderPid == 0 - var config: LoaderConfig - ctx.istream.sread(config) - let pid = ctx.forkLoader(config) - ctx.ostream.withWriter w: - w.swrite(pid) - ctx.loaderPid = pid - ctx.children.add(pid) - of fcLoadConfig: - var config: ForkServerConfig - ctx.istream.sread(config) - set_cjk_ambiguous(config.ambiguous_double) - SocketDirectory = config.tmpdir - ctx.ostream.flush() + ctx.istream.withPacketReader r: + var cmd: ForkCommand + r.sread(cmd) + case cmd + of fcRemoveChild: + var pid: int + r.sread(pid) + let i = ctx.children.find(pid) + if i != -1: + ctx.children.del(i) + of fcForkBuffer: + let r = ctx.forkBuffer(r) + ctx.ostream.withWriter w: + w.swrite(r) + of fcForkLoader: + assert ctx.loaderPid == 0 + var config: LoaderConfig + r.sread(config) + let pid = ctx.forkLoader(config) + ctx.ostream.withWriter w: + w.swrite(pid) + ctx.loaderPid = pid + ctx.children.add(pid) + of fcLoadConfig: + var config: ForkServerConfig + r.sread(config) + set_cjk_ambiguous(config.ambiguous_double) + SocketDirectory = config.tmpdir + ctx.ostream.flush() except EOFError: # EOF break @@ -255,13 +257,10 @@ proc newForkServer*(): ForkServer = discard close(pipefd_in[0]) # close read discard close(pipefd_out[1]) # close write discard close(pipefd_err[1]) # close write - var readf: File - if not open(readf, pipefd_out[0], fmRead): - raise newException(Defect, "Failed to open input handle") let estream = newPosixStream(pipefd_err[0]) estream.setBlocking(false) return ForkServer( ostream: newPosixStream(pipefd_in[1]), - istream: newFileStream(readf), + istream: newPosixStream(pipefd_out[0]), estream: estream ) |