diff options
author | bptato <nincsnevem662@gmail.com> | 2024-03-24 14:12:27 +0100 |
---|---|---|
committer | bptato <nincsnevem662@gmail.com> | 2024-03-24 14:21:01 +0100 |
commit | fb21b1e4f0ee0e55e9556bf1f399d00d5eae26e4 (patch) | |
tree | c532aa98ee6bedf19b840f4ea0c7360a42407fbf /src | |
parent | b27deb7672c53e3ee59f91b7091e83ab28a8318d (diff) | |
download | chawan-fb21b1e4f0ee0e55e9556bf1f399d00d5eae26e4.tar.gz |
io: derive DynStream from RootObj (not Stream)
This way they are no longer compatible, but we no longer need them to be compatible anyway. (This also forces us to throw out the old serialize module, and use packet writers everywhere.)
Diffstat (limited to 'src')
-rw-r--r-- | src/html/dom.nim | 3 | ||||
-rw-r--r-- | src/html/env.nim | 17 | ||||
-rw-r--r-- | src/io/bufstream.nim | 3 | ||||
-rw-r--r-- | src/io/bufwriter.nim | 10 | ||||
-rw-r--r-- | src/io/dynstream.nim | 70 | ||||
-rw-r--r-- | src/io/filestream.nim | 34 | ||||
-rw-r--r-- | src/io/posixstream.nim | 17 | ||||
-rw-r--r-- | src/io/serialize.nim | 234 | ||||
-rw-r--r-- | src/io/socketstream.nim | 6 | ||||
-rw-r--r-- | src/js/console.nim | 9 | ||||
-rw-r--r-- | src/js/javascript.nim | 8 | ||||
-rw-r--r-- | src/js/timeout.nim | 16 | ||||
-rw-r--r-- | src/loader/cgi.nim | 4 | ||||
-rw-r--r-- | src/loader/loader.nim | 132 | ||||
-rw-r--r-- | src/loader/loaderhandle.nim | 11 | ||||
-rw-r--r-- | src/loader/response.nim | 3 | ||||
-rw-r--r-- | src/local/client.nim | 55 | ||||
-rw-r--r-- | src/local/container.nim | 8 | ||||
-rw-r--r-- | src/local/pager.nim | 54 | ||||
-rw-r--r-- | src/server/buffer.nim | 87 | ||||
-rw-r--r-- | src/server/forkserver.nim | 31 | ||||
-rw-r--r-- | src/types/formdata.nim | 13 |
22 files changed, 291 insertions, 534 deletions
diff --git a/src/html/dom.nim b/src/html/dom.nim index 41869a95..f0cc05c9 100644 --- a/src/html/dom.nim +++ b/src/html/dom.nim @@ -18,6 +18,7 @@ import img/bitmap import img/painter import img/path import img/png +import io/dynstream import io/promise import js/console import js/domexception @@ -3515,7 +3516,7 @@ proc fetchClassicScript(element: HTMLScriptElement, url: URL, element.onComplete(ScriptResult(t: RESULT_NULL)) return #TODO make this non-blocking somehow - let s = response.body.readAll() + let s = response.body.recvAll() let source = if cs in {CHARSET_UNKNOWN, CHARSET_UTF_8}: s.toValidUTF8() else: diff --git a/src/html/env.nim b/src/html/env.nim index 3ffff116..ebd9e83e 100644 --- a/src/html/env.nim +++ b/src/html/env.nim @@ -1,5 +1,4 @@ import std/selectors -import std/streams import bindings/quickjs import html/catom @@ -9,6 +8,7 @@ import html/event import html/formdata import html/script import html/xmlhttprequest +import io/filestream import io/promise import js/base64 import js/console @@ -34,7 +34,9 @@ proc appVersion(navigator: ptr Navigator): string {.jsfget.} = "5.0 (Windows)" proc platform(navigator: ptr Navigator): string {.jsfget.} = "Win32" proc product(navigator: ptr Navigator): string {.jsfget.} = "Gecko" proc productSub(navigator: ptr Navigator): string {.jsfget.} = "20100101" -proc userAgent(navigator: ptr Navigator): string {.jsfget.} = "chawan" #TODO TODO TODO this should be configurable +proc userAgent(navigator: ptr Navigator): string {.jsfget.} = + #TODO TODO TODO this should be configurable + "chawan" proc vendor(navigator: ptr Navigator): string {.jsfget.} = "" proc vendorSub(navigator: ptr Navigator): string {.jsfget.} = "" proc taintEnabled(navigator: ptr Navigator): bool {.jsfget.} = false @@ -153,7 +155,7 @@ proc getComputedStyle(window: Window, element: Element, #TODO implement this properly return ok(element.style) -proc addScripting*(window: Window, selector: Selector[int]) = +proc addScripting*(window: Window; selector: Selector[int]) = let rt = newJSRuntime() let ctx = rt.newJSContext() window.jsrt = rt @@ -166,11 +168,10 @@ proc addScripting*(window: Window, selector: Selector[int]) = evalJSFree = (proc(src, file: string) = let ret = window.jsctx.eval(src, file, JS_EVAL_TYPE_GLOBAL) if JS_IsException(ret): - let ss = newStringStream() - window.jsctx.writeException(ss) - ss.setPosition(0) window.console.log("Exception in document", $window.document.url, - ss.readAll()) + window.jsctx.getExceptionStr()) + else: + JS_FreeValue(ctx, ret) ) ) var global = JS_GetGlobalObject(ctx) @@ -200,7 +201,7 @@ proc runJSJobs*(window: Window) = proc newWindow*(scripting, images: bool, selector: Selector[int], attrs: WindowAttributes, factory: CAtomFactory, navigate: proc(url: URL) = nil, loader = none(FileLoader)): Window = - let err = newFileStream(stderr) + let err = newDynFileStream(stderr) let window = Window( attrs: attrs, console: newConsole(err), diff --git a/src/io/bufstream.nim b/src/io/bufstream.nim index 0558b61c..118b81e5 100644 --- a/src/io/bufstream.nim +++ b/src/io/bufstream.nim @@ -46,9 +46,8 @@ proc flushWrite*(s: BufStream): bool = return false proc newBufStream*(ps: PosixStream, registerFun: proc(fd: int)): BufStream = - result = BufStream( + return BufStream( source: ps, blocking: ps.blocking, registerFun: registerFun ) - result.addStreamIface() diff --git a/src/io/bufwriter.nim b/src/io/bufwriter.nim index cbee8b5b..75da4190 100644 --- a/src/io/bufwriter.nim +++ b/src/io/bufwriter.nim @@ -1,4 +1,5 @@ -# Write data to streams. +# Write data to streams in packets. +# Each packet is prefixed with its length as a pointer-sized integer. import std/options import std/sets @@ -53,13 +54,6 @@ proc deinit*(writer: var BufferedWriter) = writer.bufSize = 0 writer.bufLen = 0 -template withWriter*(stream: DynStream; w, body: untyped) = - block: - var w = stream.initWriter() - body - w.flush() - w.deinit() - template withPacketWriter*(stream: DynStream; w, body: untyped) = block: var w = stream.initWriter(writeLen = true) diff --git a/src/io/dynstream.nim b/src/io/dynstream.nim index 10db4c64..1b0f8807 100644 --- a/src/io/dynstream.nim +++ b/src/io/dynstream.nim @@ -1,7 +1,5 @@ -import std/streams - type - DynStream* = ref object of Stream #TODO should be of RootObj + DynStream* = ref object of RootObj isend*: bool blocking*: bool #TODO move to posixstream @@ -46,6 +44,19 @@ proc sendDataLoop*(s: DynStream; buffer: pointer; len: int) = if n == len: break +proc sendDataLoop*(s: DynStream; buffer: openArray[char]) {.inline.} = + s.sendDataLoop(unsafeAddr buffer[0], buffer.len) + +proc write*(s: DynStream; buffer: openArray[char]) {.inline.} = + s.sendDataLoop(buffer) + +proc write*(s: DynStream; c: char) {.inline.} = + s.sendDataLoop(unsafeAddr c, 1) + +proc sreadChar*(s: DynStream): char = + let n = s.recvData(addr result, 1) + assert n == 1 + proc recvDataLoop*(s: DynStream; buffer: pointer; len: int) = var n = 0 while true: @@ -53,47 +64,18 @@ proc recvDataLoop*(s: DynStream; buffer: pointer; len: int) = if n == len: break -proc dsClose(s: Stream) = - DynStream(s).sclose() +proc recvDataLoop*(s: DynStream; buffer: var openArray[uint8]) {.inline.} = + s.recvDataLoop(addr buffer[0], buffer.len) -proc dsReadData(s: Stream, buffer: pointer, len: int): int = - let s = DynStream(s) - assert len != 0 and s.blocking - result = 0 - while result < len: - let p = addr cast[ptr UncheckedArray[uint8]](buffer)[result] - let n = s.recvData(p, len - result) - if n == 0: - break - result += n - -proc dsWriteData(s: Stream, buffer: pointer, len: int) = - let s = DynStream(s) - assert len != 0 and s.blocking - discard s.sendData(buffer, len) - -proc dsReadLine(s: Stream, line: var string): bool = - let s = DynStream(s) - assert s.blocking - line = "" - var c: char +proc recvAll*(s: DynStream): string = + var buffer = newString(4096) + var idx = 0 while true: - if s.recvData(addr c, 1) == 0: - return false - if c == '\r': - if s.recvData(addr c, 1) == 0: - return false - if c == '\n': + let n = s.recvData(addr buffer[idx], buffer.len - idx) + if n == 0: break - line &= c - true - -proc dsAtEnd(s: Stream): bool = - return DynStream(s).isend - -proc addStreamIface*(s: DynStream) = - s.closeImpl = cast[typeof(s.closeImpl)](dsClose) - s.readDataImpl = cast[typeof(s.readDataImpl)](dsReadData) - s.writeDataImpl = cast[typeof(s.writeDataImpl)](dsWriteData) - s.readLineImpl = cast[typeof(s.readLineImpl)](dsReadLine) - s.atEndImpl = dsAtEnd + idx += n + if idx == buffer.len: + buffer.setLen(buffer.len + 4096) + buffer.setLen(idx) + return buffer diff --git a/src/io/filestream.nim b/src/io/filestream.nim new file mode 100644 index 00000000..b1b3a296 --- /dev/null +++ b/src/io/filestream.nim @@ -0,0 +1,34 @@ +import io/dynstream + +type + DynFileStream* = ref object of DynStream + file*: File + +method recvData*(s: DynFileStream; buffer: pointer; len: int): int = + let n = s.file.readBuffer(buffer, len) + if n == 0: + if unlikely(s.isend): + raise newException(EOFError, "eof") + s.isend = true + return n + +method sendData*(s: DynFileStream; buffer: pointer; len: int): int = + return s.file.writeBuffer(buffer, len) + +method seek*(s: DynFileStream; off: int) = + s.file.setFilePos(int64(off)) + +method sclose*(s: DynFileStream) = + s.file.close() + +method sflush*(s: DynFileStream) = + s.file.flushFile() + +proc newDynFileStream*(file: File): DynFileStream = + return DynFileStream(file: file, blocking: true) + +proc newDynFileStream*(path: string): DynFileStream = + var file: File + if file.open(path): + return newDynFileStream(path) + return nil diff --git a/src/io/posixstream.nim b/src/io/posixstream.nim index 0b06c572..bdae9d50 100644 --- a/src/io/posixstream.nim +++ b/src/io/posixstream.nim @@ -1,4 +1,3 @@ -# stdlib file handling is broken, so we use this instead of FileStream. import std/posix import io/dynstream @@ -53,24 +52,12 @@ proc sreadChar*(s: PosixStream): char = s.isend = true assert n == 1 -proc recvData*(s: PosixStream, buffer: var openArray[uint8]): int {.inline.} = - return s.recvData(addr buffer[0], buffer.len) - -proc recvData*(s: PosixStream, buffer: var openArray[char]): int {.inline.} = - return s.recvData(addr buffer[0], buffer.len) - method sendData*(s: PosixStream, buffer: pointer, len: int): int = let n = write(s.fd, buffer, len) if n < 0: raisePosixIOError() return n -proc sendData*(s: PosixStream, buffer: openArray[char]): int {.inline.} = - return s.sendData(unsafeAddr buffer[0], buffer.len) - -proc sendData*(s: PosixStream, buffer: openArray[uint8]): int {.inline.} = - return s.sendData(unsafeAddr buffer[0], buffer.len) - method setBlocking*(s: PosixStream, blocking: bool) {.base.} = s.blocking = blocking let ofl = fcntl(s.fd, F_GETFL, 0) @@ -87,9 +74,7 @@ method sclose*(s: PosixStream) = discard close(s.fd) proc newPosixStream*(fd: FileHandle): PosixStream = - let ps = PosixStream(fd: fd, blocking: true) - ps.addStreamIface() - return ps + return PosixStream(fd: fd, blocking: true) proc newPosixStream*(path: string, flags, mode: cint): PosixStream = let fd = open(cstring(path), flags, mode) diff --git a/src/io/serialize.nim b/src/io/serialize.nim deleted file mode 100644 index 0b54bea3..00000000 --- a/src/io/serialize.nim +++ /dev/null @@ -1,234 +0,0 @@ -# Write data to streams. - -import std/options -import std/sets -import std/streams -import std/tables - -import types/blob -import types/formdata -import types/url -import types/opt - -proc sread*(stream: Stream, n: var SomeNumber) -func slen*(n: SomeNumber): int - -proc sread*[T](stream: Stream, s: var set[T]) -func slen*[T](s: set[T]): int - -proc sread*[T: enum](stream: Stream, x: var T) -func slen*[T: enum](x: T): int - -proc sread*(stream: Stream, s: var string) -func slen*(s: string): int - -proc sread*(stream: Stream, b: var bool) -func slen*(b: bool): int - -func slen*(url: URL): int - -func slen*(tup: tuple): int - -proc sread*[I, T](stream: Stream, a: var array[I, T]) -func slen*[I, T](a: array[I, T]): int - -proc sread*(stream: Stream, s: var seq) -func slen*(s: seq): int - -proc sread*[U, V](stream: Stream, t: var Table[U, V]) -func slen*[U, V](t: Table[U, V]): int - -proc sread*(stream: Stream, obj: var object) -func slen*(obj: object): int - -proc sread*(stream: Stream, obj: var ref object) -func slen*(obj: ref object): int - -func slen*(part: FormDataEntry): int - -func slen*(blob: Blob): int - -proc sread*[T](stream: Stream, o: var Option[T]) -func slen*[T](o: Option[T]): int - -proc sread*[T, E](stream: Stream, o: var Result[T, E]) -func slen*[T, E](o: Result[T, E]): int - -proc sread*(stream: Stream, n: var SomeNumber) = - if stream.readData(addr n, sizeof(n)) < sizeof(n): - raise newException(EOFError, "eof") - -func slen*(n: SomeNumber): int = - return sizeof(n) - -proc sread*[T: enum](stream: Stream, x: var T) = - var i: int - stream.sread(i) - x = cast[T](i) - -func slen*[T: enum](x: T): int = - return sizeof(int) - -proc sread*[T](stream: Stream, s: var set[T]) = - var len: int - stream.sread(len) - for i in 0 ..< len: - var x: T - stream.sread(x) - s.incl(x) - -func slen*[T](s: set[T]): int = - result = slen(s.card) - for x in s: - result += slen(x) - -proc sread*(stream: Stream, s: var string) = - var len: int - stream.sread(len) - if len > 0: - s = newString(len) - prepareMutation(s) - if stream.readData(addr s[0], len) < len: - raise newException(EOFError, "eof") - else: - s = "" - -func slen*(s: string): int = - slen(s.len) + s.len - -proc sread*(stream: Stream, b: var bool) = - var n: uint8 - stream.sread(n) - if n == 1u8: - b = true - else: - assert n == 0u8 - b = false - -func slen*(b: bool): int = - return sizeof(uint8) - -func slen*(url: URL): int = - if url == nil: - return slen("") - return slen(url.serialize()) - -func slen*(tup: tuple): int = - for f in tup.fields: - result += slen(f) - -proc sread*[I, T](stream: Stream; a: var array[I, T]) = - for x in a.mitems: - stream.sread(x) - -func slen*[I, T](a: array[I, T]): int = - for x in a: - result += slen(x) - -proc sread*(stream: Stream, s: var seq) = - var len: int - stream.sread(len) - s.setLen(len) - for x in s.mitems: - stream.sread(x) - -func slen*(s: seq): int = - result = slen(s.len) - for x in s: - result += slen(x) - -proc sread*[U, V](stream: Stream, t: var Table[U, V]) = - var len: int - stream.sread(len) - for i in 0..<len: - var k: U - stream.sread(k) - var v: V - stream.sread(v) - t[k] = v - -func slen*[U, V](t: Table[U, V]): int = - result = slen(t.len) - for k, v in t: - result += slen(k) - result += slen(v) - -proc sread*(stream: Stream, obj: var object) = - for f in obj.fields: - stream.sread(f) - -func slen*(obj: object): int = - for f in obj.fields: - result += slen(f) - -proc sread*(stream: Stream, obj: var ref object) = - var n: bool - stream.sread(n) - if n: - new(obj) - stream.sread(obj[]) - -func slen*(obj: ref object): int = - result = slen(obj != nil) - if obj != nil: - result += slen(obj[]) - -func slen*(part: FormDataEntry): int = - result += slen(part.isstr) - result += slen(part.name) - result += slen(part.filename) - if part.isstr: - result += slen(part.svalue) - else: - result += slen(part.value) - -func slen*(blob: Blob): int = - result += slen(blob.isfile) - if blob.isfile: - result = slen(WebFile(blob).path) - else: - result += slen(blob.ctype) - result += slen(blob.size) - result += int(blob.size) #TODO ?? - -proc sread*[T](stream: Stream, o: var Option[T]) = - var x: bool - stream.sread(x) - if x: - var m: T - stream.sread(m) - o = some(m) - else: - o = none(T) - -func slen*[T](o: Option[T]): int = - result = slen(o.isSome) - if o.isSome: - result += slen(o.get) - -proc sread*[T, E](stream: Stream, o: var Result[T, E]) = - var x: bool - stream.sread(x) - if x: - when not (T is void): - var m: T - stream.sread(m) - o.ok(m) - else: - o.ok() - else: - when not (E is void): - var e: E - stream.sread(e) - o.err(e) - else: - o.err() - -func slen*[T, E](o: Result[T, E]): int = - result = slen(o.isSome) - if o.isSome: - when not (T is void): - result += slen(o.get) - else: - when not (E is void): - result += slen(o.error) diff --git a/src/io/socketstream.nim b/src/io/socketstream.nim index 78e7fb3e..3c8e6fa6 100644 --- a/src/io/socketstream.nim +++ b/src/io/socketstream.nim @@ -69,12 +69,11 @@ proc connectSocketStream*(path: string; blocking = true): SocketStream = if connect_unix_from_c(cint(sock.getFd()), cstring(path), cint(path.len)) != 0: raiseOSError(osLastError()) - result = SocketStream( + return SocketStream( source: sock, fd: cint(sock.getFd()), blocking: blocking ) - result.addStreamIface() proc connectSocketStream*(pid: int; blocking = true): SocketStream = @@ -88,9 +87,8 @@ proc acceptSocketStream*(ssock: ServerSocket, blocking = true): SocketStream = ssock.sock.accept(sock, inheritable = true) if not blocking: sock.getFd().setBlocking(false) - result = SocketStream( + return SocketStream( blocking: blocking, source: sock, fd: cint(sock.getFd()) ) - result.addStreamIface() diff --git a/src/js/console.nim b/src/js/console.nim index e0ac2ae2..d5e074e8 100644 --- a/src/js/console.nim +++ b/src/js/console.nim @@ -1,16 +1,15 @@ -import std/streams - +import io/dynstream import js/javascript type Console* = ref object - err*: Stream + err*: DynStream clearFun: proc() showFun: proc() hideFun: proc() jsDestructor(Console) -proc newConsole*(err: Stream; clearFun: proc() = nil; showFun: proc() = nil; +proc newConsole*(err: DynStream; clearFun: proc() = nil; showFun: proc() = nil; hideFun: proc() = nil): Console = return Console( err: err, @@ -25,7 +24,7 @@ proc log*(console: Console, ss: varargs[string]) {.jsfunc.} = if i != ss.high: console.err.write(' ') console.err.write('\n') - console.err.flush() + console.err.sflush() proc clear(console: Console) {.jsfunc.} = if console.clearFun != nil: diff --git a/src/js/javascript.nim b/src/js/javascript.nim index b4c68729..cbd0b205 100644 --- a/src/js/javascript.nim +++ b/src/js/javascript.nim @@ -44,11 +44,11 @@ import std/macros import std/options import std/sets -import std/streams import std/strutils import std/tables import std/unicode +import io/dynstream import js/error import js/fromjs import js/opaque @@ -208,11 +208,11 @@ proc getExceptionStr*(ctx: JSContext): string = JS_FreeValue(ctx, stack) JS_FreeValue(ctx, ex) -proc writeException*(ctx: JSContext, s: Stream) = +proc writeException*(ctx: JSContext, s: DynStream) = s.write(ctx.getExceptionStr()) - s.flush() + s.sflush() -proc runJSJobs*(rt: JSRuntime, err: Stream) = +proc runJSJobs*(rt: JSRuntime, err: DynStream) = while JS_IsJobPending(rt): var ctx: JSContext let r = JS_ExecutePendingJob(rt, addr ctx) diff --git a/src/js/timeout.nim b/src/js/timeout.nim index 8ef7c114..3c95dada 100644 --- a/src/js/timeout.nim +++ b/src/js/timeout.nim @@ -1,7 +1,7 @@ import std/selectors -import std/streams import std/tables +import io/dynstream import js/javascript type TimeoutState* = object @@ -12,10 +12,10 @@ type TimeoutState* = object interval_fdis: Table[int, int32] selector: Selector[int] #TODO would be better with void... jsctx: JSContext - err: Stream #TODO shouldn't be needed + err: DynStream #TODO shouldn't be needed evalJSFree: proc(src, file: string) #TODO ew -func newTimeoutState*(selector: Selector[int], jsctx: JSContext, err: Stream, +func newTimeoutState*(selector: Selector[int]; jsctx: JSContext; err: DynStream; evalJSFree: proc(src, file: string)): TimeoutState = return TimeoutState( selector: selector, @@ -28,7 +28,7 @@ func empty*(state: TimeoutState): bool = return state.timeouts.len == 0 and state.intervals.len == 0 #TODO varargs -proc setTimeout*[T: JSValue|string](state: var TimeoutState, handler: T, +proc setTimeout*[T: JSValue|string](state: var TimeoutState; handler: T; timeout = 0i32): int32 = let id = state.timeoutid inc state.timeoutid @@ -52,14 +52,14 @@ proc setTimeout*[T: JSValue|string](state: var TimeoutState, handler: T, ), fdi) return id -proc clearTimeout*(state: var TimeoutState, id: int32) = +proc clearTimeout*(state: var TimeoutState; id: int32) = if id in state.timeouts: let timeout = state.timeouts[id] state.selector.unregister(timeout.fdi) state.timeout_fdis.del(timeout.fdi) state.timeouts.del(id) -proc clearInterval*(state: var TimeoutState, id: int32) = +proc clearInterval*(state: var TimeoutState; id: int32) = if id in state.intervals: let interval = state.intervals[id] state.selector.unregister(interval.fdi) @@ -68,7 +68,7 @@ proc clearInterval*(state: var TimeoutState, id: int32) = state.intervals.del(id) #TODO varargs -proc setInterval*[T: JSValue|string](state: var TimeoutState, handler: T, +proc setInterval*[T: JSValue|string](state: var TimeoutState; handler: T; interval = 0i32): int32 = let id = state.timeoutid inc state.timeoutid @@ -91,7 +91,7 @@ proc setInterval*[T: JSValue|string](state: var TimeoutState, handler: T, ), fdi, fun) return id -proc runTimeoutFd*(state: var TimeoutState, fd: int): bool = +proc runTimeoutFd*(state: var TimeoutState; fd: int): bool = if fd in state.interval_fdis: state.intervals[state.interval_fdis[fd]].handler() return true diff --git a/src/loader/cgi.nim b/src/loader/cgi.nim index 067e784d..7d08f4a2 100644 --- a/src/loader/cgi.nim +++ b/src/loader/cgi.nim @@ -1,9 +1,9 @@ import std/options import std/os import std/posix -import std/streams import std/strutils +import io/dynstream import io/posixstream import io/stdio import loader/connecterror @@ -221,7 +221,7 @@ proc loadCGI*(handle: LoaderHandle; request: Request; cgiDir: seq[string]; let multipart = request.multipart.get for entry in multipart.entries: ps.writeEntry(entry, multipart.boundary) - ps.close() + ps.sclose() handle.parser = HeaderParser(headers: newHeaders()) handle.istream = newPosixStream(pipefd[0]) diff --git a/src/loader/loader.nim b/src/loader/loader.nim index b0c5b6ab..42271ea3 100644 --- a/src/loader/loader.nim +++ b/src/loader/loader.nim @@ -5,6 +5,7 @@ # C: Request # S: res (0 => success, _ => error) # if success: +# S: output ID # S: status code # S: headers # S: response body @@ -20,15 +21,14 @@ import std/options import std/os import std/posix import std/selectors -import std/streams import std/strutils import std/tables import io/bufreader import io/bufwriter +import io/dynstream import io/posixstream import io/promise -import io/serialize import io/serversocket import io/socketstream import io/tempfile @@ -211,12 +211,12 @@ proc redirectToFile(ctx: LoaderContext; output: OutputHandle; if output.currentBuffer != nil: let n = ps.sendData(output.currentBuffer, output.currentBufferIdx) if unlikely(n < output.currentBuffer.len - output.currentBufferIdx): - ps.close() + ps.sclose() return false for buffer in output.buffers: let n = ps.sendData(buffer) if unlikely(n < buffer.len): - ps.close() + ps.sclose() return false if output.parent != nil: output.parent.outputs.add(OutputHandle( @@ -312,7 +312,7 @@ proc loadStreamRegular(ctx: LoaderContext; handle, cachedHandle: LoaderHandle) = handle.outputs.del(i) for output in handle.outputs: if r == hrrUnregister: - output.ostream.close() + output.ostream.sclose() output.ostream = nil elif cachedHandle != nil: output.parent = cachedHandle @@ -324,10 +324,10 @@ proc loadStreamRegular(ctx: LoaderContext; handle, cachedHandle: LoaderHandle) = ctx.outputMap[output.ostream.fd] = output else: assert output.ostream.fd notin ctx.outputMap - output.ostream.close() + output.ostream.sclose() output.ostream = nil handle.outputs.setLen(0) - handle.istream.close() + handle.istream.sclose() handle.istream = nil proc loadStream(ctx: LoaderContext; client: ClientData; handle: LoaderHandle; @@ -473,13 +473,13 @@ proc addClient(ctx: LoaderContext; stream: SocketStream; r.sread(key) r.sread(pid) r.sread(config) - stream.withWriter w: + stream.withPacketWriter w: if pid in ctx.clientData or key == default(ClientKey): w.swrite(false) else: ctx.clientData[pid] = ClientData(pid: pid, key: key, config: config) w.swrite(true) - stream.close() + stream.sclose() proc cleanup(client: ClientData) = for it in client.cacheMap: @@ -495,7 +495,7 @@ proc removeClient(ctx: LoaderContext; stream: SocketStream; let client = ctx.clientData[pid] client.cleanup() ctx.clientData.del(pid) - stream.close() + stream.sclose() proc addCacheFile(ctx: LoaderContext; stream: SocketStream; r: var BufferedReader) = @@ -507,10 +507,10 @@ proc addCacheFile(ctx: LoaderContext; stream: SocketStream; assert output != nil let targetClient = ctx.clientData[targetPid] let (id, file) = ctx.addCacheFile(targetClient, output) - stream.withWriter w: + stream.withPacketWriter w: w.swrite(id) w.swrite(file) - stream.close() + stream.sclose() proc redirectToFile(ctx: LoaderContext; stream: SocketStream; r: var BufferedReader) = @@ -522,9 +522,9 @@ proc redirectToFile(ctx: LoaderContext; stream: SocketStream; var success = false if output != nil: success = ctx.redirectToFile(output, targetPath) - stream.withWriter w: + stream.withPacketWriter w: w.swrite(success) - stream.close() + stream.sclose() proc shareCachedItem(ctx: LoaderContext; stream: SocketStream; r: var BufferedReader) = @@ -542,14 +542,14 @@ proc shareCachedItem(ctx: LoaderContext; stream: SocketStream; let item = sourceClient.cacheMap[n] inc item.refc targetClient.cacheMap.add(item) - stream.close() + stream.sclose() proc passFd(ctx: LoaderContext; stream: SocketStream; r: var BufferedReader) = var id: string r.sread(id) let fd = stream.recvFileHandle() ctx.passedFdMap[id] = fd - stream.close() + stream.sclose() proc removeCachedItem(ctx: LoaderContext; stream: SocketStream; client: ClientData; r: var BufferedReader) = @@ -562,7 +562,7 @@ proc removeCachedItem(ctx: LoaderContext; stream: SocketStream; dec item.refc if item.refc == 0: discard unlink(cstring(item.path)) - stream.close() + stream.sclose() proc tee(ctx: LoaderContext; stream: SocketStream; client: ClientData; r: var BufferedReader) = @@ -576,13 +576,13 @@ proc tee(ctx: LoaderContext; stream: SocketStream; client: ClientData; if output != nil: let id = ctx.getOutputId() output.tee(stream, id, targetPid) - stream.withWriter w: + stream.withPacketWriter w: w.swrite(id) stream.setBlocking(false) else: - stream.withWriter w: + stream.withPacketWriter w: w.swrite(-1) - stream.close() + stream.sclose() proc suspend(ctx: LoaderContext; stream: SocketStream; client: ClientData; r: var BufferedReader) = @@ -619,12 +619,12 @@ proc acceptConnection(ctx: LoaderContext) = r.sread(key) if myPid notin ctx.clientData: # possibly already removed - stream.close() + stream.sclose() return let client = ctx.clientData[myPid] if client.key != key: # ditto - stream.close() + stream.sclose() return var cmd: LoaderCommand r.sread(cmd) @@ -661,7 +661,7 @@ proc acceptConnection(ctx: LoaderContext) = ctx.resume(stream, client, r) except ErrorBrokenPipe: # receiving end died while reading the file; give up. - stream.close() + stream.sclose() proc exitLoader(ctx: LoaderContext) = ctx.ssock.close() @@ -684,7 +684,7 @@ proc initLoaderContext(fd: cint; config: LoaderConfig): LoaderContext = # The server has been initialized, so the main process can resume execution. let ps = newPosixStream(fd) ps.write(char(0u8)) - ps.close() + ps.sclose() onSignal SIGTERM: discard sig gctx.exitLoader() @@ -710,11 +710,11 @@ proc initLoaderContext(fd: cint; config: LoaderConfig): LoaderContext = r.sread(key) r.sread(pid) r.sread(config) - stream.withWriter w: + stream.withPacketWriter w: w.swrite(true) ctx.pagerClient = ClientData(key: key, pid: pid, config: config) ctx.clientData[pid] = ctx.pagerClient - stream.close() + stream.sclose() # unblock main socket ctx.ssock.sock.getFd().setBlocking(false) # for CGI @@ -763,7 +763,7 @@ proc finishCycle(ctx: LoaderContext; unregRead: var seq[LoaderHandle]; if handle.istream != nil: ctx.selector.unregister(handle.istream.fd) ctx.handleMap.del(handle.istream.fd) - handle.istream.close() + handle.istream.sclose() handle.istream = nil if handle.parser != nil: handle.finishParse() @@ -776,7 +776,7 @@ proc finishCycle(ctx: LoaderContext; unregRead: var seq[LoaderHandle]; if output.registered: ctx.selector.unregister(output.ostream.fd) ctx.outputMap.del(output.ostream.fd) - output.ostream.close() + output.ostream.sclose() output.ostream = nil let handle = output.parent if handle != nil: # may be nil if from loadStream S_ISREG @@ -786,7 +786,7 @@ proc finishCycle(ctx: LoaderContext; unregRead: var seq[LoaderHandle]; # premature end of all output streams; kill istream too ctx.selector.unregister(handle.istream.fd) ctx.handleMap.del(handle.istream.fd) - handle.istream.close() + handle.istream.sclose() handle.istream = nil if handle.parser != nil: handle.finishParse() @@ -872,7 +872,7 @@ proc fetch*(loader: FileLoader; input: Request): FetchPromise = return promise proc reconnect*(loader: FileLoader; data: ConnectData) = - data.stream.close() + data.stream.sclose() let stream = loader.connect() stream.withLoaderPacketWriter loader, w: w.swrite(lcLoad) @@ -892,27 +892,24 @@ proc switchStream*(loader: FileLoader; data: var OngoingData; stream: SocketStream) = data.response.body = stream let fd = int(stream.fd) - let realCloseImpl = stream.closeImpl - stream.closeImpl = nil data.response.unregisterFun = proc() = loader.ongoing.del(fd) loader.unregistered.add(fd) loader.unregisterFun(fd) - realCloseImpl(stream) proc suspend*(loader: FileLoader; fds: seq[int]) = let stream = loader.connect() stream.withLoaderPacketWriter loader, w: w.swrite(lcSuspend) w.swrite(fds) - stream.close() + stream.sclose() proc resume*(loader: FileLoader; fds: seq[int]) = let stream = loader.connect() stream.withLoaderPacketWriter loader, w: w.swrite(lcResume) w.swrite(fds) - stream.close() + stream.sclose() proc tee*(loader: FileLoader; sourceId, targetPid: int): (SocketStream, int) = let stream = loader.connect() @@ -921,7 +918,8 @@ proc tee*(loader: FileLoader; sourceId, targetPid: int): (SocketStream, int) = w.swrite(sourceId) w.swrite(targetPid) var outputId: int - stream.sread(outputId) + var r = stream.initPacketReader() + r.sread(outputId) return (stream, outputId) proc addCacheFile*(loader: FileLoader; outputId, targetPid: int): @@ -933,10 +931,11 @@ proc addCacheFile*(loader: FileLoader; outputId, targetPid: int): w.swrite(lcAddCacheFile) w.swrite(outputId) w.swrite(targetPid) + var r = stream.initPacketReader() var outputId: int var cacheFile: string - stream.sread(outputId) - stream.sread(cacheFile) + r.sread(outputId) + r.sread(cacheFile) return (outputId, cacheFile) proc redirectToFile*(loader: FileLoader; outputId: int; targetPath: string): @@ -948,35 +947,33 @@ proc redirectToFile*(loader: FileLoader; outputId: int; targetPath: string): w.swrite(lcRedirectToFile) w.swrite(outputId) w.swrite(targetPath) - stream.sread(result) + var r = stream.initPacketReader() + r.sread(result) const BufferSize = 4096 -proc handleHeaders(response: Response; request: Request; stream: SocketStream) = - stream.sread(response.outputId) - stream.sread(response.status) - stream.sread(response.headers) - proc onConnected*(loader: FileLoader, fd: int) = let connectData = loader.connecting[fd] let stream = connectData.stream let promise = connectData.promise let request = connectData.request + var r = stream.initPacketReader() var res: int - stream.sread(res) + r.sread(res) # packet 1 let response = newResponse(res, request, stream) if res == 0: - response.handleHeaders(request, stream) + r.sread(response.outputId) # packet 1 + r = stream.initPacketReader() + r.sread(response.status) # packet 2 + r = stream.initPacketReader() + r.sread(response.headers) # packet 3 # Only a stream of the response body may arrive after this point. response.body = stream assert loader.unregisterFun != nil - let realCloseImpl = stream.closeImpl - stream.closeImpl = nil response.unregisterFun = proc() = loader.ongoing.del(fd) loader.unregistered.add(fd) loader.unregisterFun(fd) - realCloseImpl(stream) loader.ongoing[fd] = OngoingData( response: response, bodyRead: response.bodyRead @@ -987,7 +984,7 @@ proc onConnected*(loader: FileLoader, fd: int) = var msg: string # msg is discarded. #TODO maybe print if called from trusted code (i.e. global == client)? - stream.sread(msg) + r.sread(msg) # packet 1 loader.unregisterFun(fd) loader.unregistered.add(fd) let err = newTypeError("NetworkError when attempting to fetch resource") @@ -997,7 +994,7 @@ proc onConnected*(loader: FileLoader, fd: int) = proc onRead*(loader: FileLoader; fd: int) = loader.ongoing.withValue(fd, buffer): let response = buffer[].response - while not response.body.atEnd(): + while not response.body.isend: let olen = buffer[].buf.len try: buffer[].buf.setLen(olen + BufferSize) @@ -1008,7 +1005,7 @@ proc onRead*(loader: FileLoader; fd: int) = except ErrorAgain: buffer[].buf.setLen(olen) break - if response.body.atEnd(): + if response.body.isend: buffer[].bodyRead.resolve(buffer[].buf) buffer[].bodyRead = nil buffer[].buf = "" @@ -1019,10 +1016,10 @@ proc onError*(loader: FileLoader; fd: int) = let response = buffer[].response when defined(debug): var lbuf {.noinit.}: array[BufferSize, char] - if not response.body.atEnd(): + if not response.body.isend: let n = response.body.recvData(addr lbuf[0], lbuf.len) assert n == 0 - assert response.body.atEnd() + assert response.body.isend buffer[].bodyRead.resolve(buffer[].buf) buffer[].bodyRead = nil buffer[].buf = "" @@ -1032,15 +1029,21 @@ proc onError*(loader: FileLoader; fd: int) = proc doRequest*(loader: FileLoader; request: Request): Response = let stream = loader.startRequest(request) let response = Response(url: request.url) - stream.sread(response.res) + var r = stream.initPacketReader() + var res: int + r.sread(res) # packet 1 if response.res == 0: - response.handleHeaders(request, stream) + r.sread(response.outputId) # packet 1 + r = stream.initPacketReader() + r.sread(response.status) # packet 2 + r = stream.initPacketReader() + r.sread(response.headers) # packet 3 # Only a stream of the response body may arrive after this point. response.body = stream else: var msg: string - stream.sread(msg) - stream.close() + r.sread(msg) # packet 1 + stream.sclose() return response proc shareCachedItem*(loader: FileLoader; id, targetPid: int) = @@ -1051,7 +1054,7 @@ proc shareCachedItem*(loader: FileLoader; id, targetPid: int) = w.swrite(loader.clientPid) w.swrite(targetPid) w.swrite(id) - stream.close() + stream.sclose() proc passFd*(loader: FileLoader; id: string; fd: FileHandle) = let stream = loader.connect() @@ -1060,7 +1063,7 @@ proc passFd*(loader: FileLoader; id: string; fd: FileHandle) = w.swrite(lcPassFd) w.swrite(id) stream.sendFileHandle(fd) - stream.close() + stream.sclose() proc removeCachedItem*(loader: FileLoader; cacheId: int) = let stream = loader.connect() @@ -1068,7 +1071,7 @@ proc removeCachedItem*(loader: FileLoader; cacheId: int) = stream.withLoaderPacketWriter loader, w: w.swrite(lcRemoveCachedItem) w.swrite(cacheId) - stream.close() + stream.sclose() proc addClient*(loader: FileLoader; key: ClientKey; pid: int; config: LoaderClientConfig): bool = @@ -1078,8 +1081,9 @@ proc addClient*(loader: FileLoader; key: ClientKey; pid: int; w.swrite(key) w.swrite(pid) w.swrite(config) - stream.sread(result) - stream.close() + var r = stream.initPacketReader() + r.sread(result) + stream.sclose() proc removeClient*(loader: FileLoader; pid: int) = let stream = loader.connect() @@ -1087,4 +1091,4 @@ proc removeClient*(loader: FileLoader; pid: int) = stream.withLoaderPacketWriter loader, w: w.swrite(lcRemoveClient) w.swrite(pid) - stream.close() + stream.sclose() diff --git a/src/loader/loaderhandle.nim b/src/loader/loaderhandle.nim index 6e1b53a9..00f6f754 100644 --- a/src/loader/loaderhandle.nim +++ b/src/loader/loaderhandle.nim @@ -1,6 +1,5 @@ import std/deques import std/net -import std/streams import std/tables import io/bufwriter @@ -129,7 +128,7 @@ proc sendResult*(handle: LoaderHandle; res: int; msg = "") = let output = handle.output let blocking = output.ostream.blocking output.ostream.setBlocking(true) - output.ostream.withWriter w: + output.ostream.withPacketWriter w: w.swrite(res) if res == 0: # success assert msg == "" @@ -143,7 +142,7 @@ proc sendStatus*(handle: LoaderHandle; status: uint16) = inc handle.rstate let blocking = handle.output.ostream.blocking handle.output.ostream.setBlocking(true) - handle.output.ostream.withWriter w: + handle.output.ostream.withPacketWriter w: w.swrite(status) handle.output.ostream.setBlocking(blocking) @@ -152,7 +151,7 @@ proc sendHeaders*(handle: LoaderHandle; headers: Headers) = inc handle.rstate let blocking = handle.output.ostream.blocking handle.output.ostream.setBlocking(true) - handle.output.ostream.withWriter w: + handle.output.ostream.withPacketWriter w: w.swrite(headers) handle.output.ostream.setBlocking(blocking) @@ -169,8 +168,8 @@ proc close*(handle: LoaderHandle) = for output in handle.outputs: #TODO assert not output.registered if output.ostream != nil: - output.ostream.close() + output.ostream.sclose() output.ostream = nil if handle.istream != nil: - handle.istream.close() + handle.istream.sclose() handle.istream = nil diff --git a/src/loader/response.nim b/src/loader/response.nim index 6b4ec64e..dbec92d2 100644 --- a/src/loader/response.nim +++ b/src/loader/response.nim @@ -1,4 +1,3 @@ -import std/streams import std/strutils import std/tables @@ -85,7 +84,7 @@ proc close*(response: Response) {.jsfunc.} = if response.unregisterFun != nil: response.unregisterFun() if response.body != nil: - response.body.close() + response.body.sclose() func getCharset*(this: Response; fallback: Charset): Charset = if "Content-Type" notin this.headers.table: diff --git a/src/local/client.nim b/src/local/client.nim index 29eaeb73..9fd6c23a 100644 --- a/src/local/client.nim +++ b/src/local/client.nim @@ -22,6 +22,8 @@ import html/formdata import html/xmlhttprequest import io/bufstream import io/bufwriter +import io/dynstream +import io/filestream import io/posixstream import io/promise import io/socketstream @@ -385,7 +387,7 @@ proc acceptBuffers(client: Client) = let fd = int(stream.source.fd) client.selector.unregister(fd) client.fdmap.del(fd) - stream.close() + stream.sclose() elif container.process != -1: # connecting to buffer process let i = pager.findProcMapItem(container.process) pager.procmap.del(i) @@ -393,7 +395,7 @@ proc acceptBuffers(client: Client) = # connecting to URL let stream = pager.connectingContainers[i].stream client.selector.unregister(stream.fd) - stream.close() + stream.sclose() pager.connectingContainers.del(i) let registerFun = proc(fd: int) = client.selector.unregister(fd) @@ -405,31 +407,32 @@ proc acceptBuffers(client: Client) = pager.alert("Error: failed to set up buffer") continue let key = pager.addLoaderClient(container.process, container.loaderConfig) - stream.withWriter w: + stream.withPacketWriter w: w.swrite(key) - let loader = pager.loader - if item.fdin != -1: - let outputId = item.istreamOutputId - if container.cacheId == -1: - (container.cacheId, container.cacheFile) = loader.addCacheFile(outputId, - loader.clientPid) - var outCacheId = container.cacheId - let pid = container.process - if item.fdout == item.fdin: - loader.shareCachedItem(container.cacheId, pid) - loader.resume(@[item.istreamOutputId]) + let loader = pager.loader + if item.fdin != -1: + let outputId = item.istreamOutputId + if container.cacheId == -1: + (container.cacheId, container.cacheFile) = + loader.addCacheFile(outputId, loader.clientPid) + var outCacheId = container.cacheId + let pid = container.process + if item.fdout == item.fdin: + loader.shareCachedItem(container.cacheId, pid) + loader.resume(@[item.istreamOutputId]) + else: + outCacheId = loader.addCacheFile(item.ostreamOutputId, pid).outputId + loader.resume(@[item.istreamOutputId, item.ostreamOutputId]) + w.swrite(outCacheId) else: - outCacheId = loader.addCacheFile(item.ostreamOutputId, pid).outputId - loader.resume(@[item.istreamOutputId, item.ostreamOutputId]) + # buffer is cloned, no need to cache anything + container.setCloneStream(stream, registerFun) + if item.fdin != -1: # pass down fdout + # must come after the previous block so the first packet is flushed stream.sendFileHandle(item.fdout) - stream.withWriter w: - w.swrite(outCacheId) discard close(item.fdout) container.setStream(stream, registerFun) - else: - # buffer is cloned, no need to cache anything - container.setCloneStream(stream, registerFun) let fd = int(stream.fd) client.fdmap[fd] = container client.selector.registerHandle(fd, {Read}, 0) @@ -466,14 +469,14 @@ proc handleRead(client: Client; fd: int) = if hadlf: client.console.err.write(prefix) if j - i > 0: - client.console.err.writeData(addr buffer[i], j - i) + client.console.err.write(buffer.toOpenArray(i, j - 1)) i = j hadlf = found except ErrorAgain: break if not hadlf: client.console.err.write('\n') - client.console.err.flush() + client.console.err.sflush() elif fd in client.loader.connecting: client.loader.onConnected(fd) client.runJSJobs() @@ -495,7 +498,7 @@ proc flushConsole*(client: Client) {.jsfunc.} = if client.console == nil: # hack for when client crashes before console has been initialized client.consoleWrapper = ConsoleWrapper( - console: newConsole(newFileStream(stderr)) + console: newConsole(newDynFileStream(stderr)) ) client.handleRead(client.forkserver.estream.fd) @@ -654,7 +657,7 @@ proc addConsole(pager: Pager; interactive: bool; clearFun, showFun, hideFun: let container = pager.readPipe0("text/plain", CHARSET_UNKNOWN, pipefd[0], url, ConsoleTitle, {}) let err = newPosixStream(pipefd[1]) - err.writeLine("Type (M-c) console.hide() to return to buffer mode.") + err.write("Type (M-c) console.hide() to return to buffer mode.\n") let console = newConsole(err, clearFun, showFun, hideFun) return ConsoleWrapper(console: console, container: container) else: @@ -673,7 +676,7 @@ proc clearConsole(client: Client) = pager.replace(client.consoleWrapper.container, replacement) client.consoleWrapper.container = replacement let console = client.consoleWrapper.console - console.err.close() + console.err.sclose() console.err = newPosixStream(pipefd[1]) proc dumpBuffers(client: Client) = diff --git a/src/local/container.nim b/src/local/container.nim index b5049edc..49526a2a 100644 --- a/src/local/container.nim +++ b/src/local/container.nim @@ -7,8 +7,8 @@ when defined(posix): import config/config import config/mimetypes +import io/dynstream import io/promise -import io/serialize import io/socketstream import js/javascript import js/jstypes @@ -1576,9 +1576,9 @@ func hoverImage(container: Container): string {.jsfget.} = proc handleCommand(container: Container) = var packetid, len: int - container.iface.stream.sread(len) - container.iface.stream.sread(packetid) - container.iface.resolve(packetid, len - slen(packetid)) + container.iface.stream.recvDataLoop(addr len, sizeof(len)) + container.iface.stream.recvDataLoop(addr packetid, sizeof(packetid)) + container.iface.resolve(packetid, len - sizeof(packetid)) proc startLoad(container: Container) = container.iface.load().then(proc(res: int) = diff --git a/src/local/pager.nim b/src/local/pager.nim index 5bd51fed..4105f08c 100644 --- a/src/local/pager.nim +++ b/src/local/pager.nim @@ -14,9 +14,10 @@ when defined(posix): import bindings/libregexp import config/config import config/mailcap +import io/bufreader +import io/dynstream import io/posixstream import io/promise -import io/serialize import io/socketstream import io/stdio import io/tempfile @@ -89,7 +90,7 @@ type LineDataDownload = ref object of LineData outputId: int - stream: Stream + stream: DynStream LineDataAuth = ref object of LineData url: URL @@ -1101,7 +1102,7 @@ proc saveTo(pager: Pager; data: LineDataDownload; path: string) = if pager.loader.redirectToFile(data.outputId, path): pager.alert("Saving file to " & path) pager.loader.resume(@[data.outputId]) - data.stream.close() + data.stream.sclose() pager.lineData = nil else: pager.ask("Failed to save to " & path & ". Retry?").then( @@ -1109,7 +1110,7 @@ proc saveTo(pager: Pager; data: LineDataDownload; path: string) = if x: pager.setLineEdit(lmDownload, path) else: - data.stream.close() + data.stream.sclose() pager.lineData = nil ) @@ -1170,7 +1171,7 @@ proc updateReadLine*(pager: Pager) = of lmCommand: pager.commandMode = false of lmDownload: let data = LineDataDownload(pager.lineData) - data.stream.close() + data.stream.sclose() else: discard pager.lineData = nil if lineedit.state in {lesCancel, lesFinish} and @@ -1299,7 +1300,7 @@ proc runMailcapReadPipe(pager: Pager; stream: SocketStream; cmd: string; # child process discard close(pipefdOut[0]) discard dup2(stream.fd, stdin.getFileHandle()) - stream.close() + stream.sclose() discard dup2(pipefdOut[1], stdout.getFileHandle()) closeStderr() discard close(pipefdOut[1]) @@ -1319,14 +1320,14 @@ proc runMailcapWritePipe(pager: Pager; stream: SocketStream; elif pid == 0: # child process discard dup2(stream.fd, stdin.getFileHandle()) - stream.close() + stream.sclose() if not needsterminal: closeStdout() closeStderr() myExec(cmd) else: # parent - stream.close() + stream.sclose() if needsterminal: var x: cint discard waitpid(pid, x, 0) @@ -1342,11 +1343,11 @@ proc writeToFile(istream: SocketStream; outpath: string): bool = if n == 0: break if ps.sendData(buffer.toOpenArray(0, n - 1)) < n: - ps.close() + ps.sclose() return false if n < buffer.len: break - ps.close() + ps.sclose() true # Save input in a file, run the command, and redirect its output to a @@ -1364,7 +1365,7 @@ proc runMailcapReadFile(pager: Pager; stream: SocketStream; if not stream.writeToFile(outpath): #TODO print error message quit(1) - stream.close() + stream.sclose() let ret = execCmd(cmd) discard tryRemoveFile(outpath) quit(ret) @@ -1395,12 +1396,12 @@ proc runMailcapWriteFile(pager: Pager; stream: SocketStream; if not stream.writeToFile(outpath): #TODO print error message (maybe in parent?) quit(1) - stream.close() + stream.sclose() let ret = execCmd(cmd) discard tryRemoveFile(outpath) quit(ret) # parent - stream.close() + stream.sclose() proc filterBuffer(pager: Pager; stream: SocketStream; cmd: string; ishtml: bool): CheckMailcapResult = @@ -1417,7 +1418,7 @@ proc filterBuffer(pager: Pager; stream: SocketStream; cmd: string; # child discard close(pipefd_out[0]) discard dup2(stream.fd, stdin.getFileHandle()) - stream.close() + stream.sclose() discard dup2(pipefd_out[1], stdout.getFileHandle()) closeStderr() discard close(pipefd_out[1]) @@ -1501,7 +1502,7 @@ proc checkMailcap(pager: Pager; container: Container; stream: SocketStream; var pipefdOut: array[2, cint] if pipe(pipefdOut) == -1: pager.alert("Error: failed to open pipe") - stream.close() # connect: false implies that we consumed the stream + stream.sclose() # connect: false implies that we consumed the stream break needsConnect let pid = if canpipe: pager.runMailcapReadPipe(stream, cmd, pipefdOut) @@ -1586,7 +1587,7 @@ proc connected(pager: Pager; container: Container; response: Response) = if response.status == 401: # unauthorized pager.setLineEdit(lmUsername) pager.lineData = LineDataAuth(url: container.url) - istream.close() + istream.sclose() return # This forces client to ask for confirmation before quitting. # (It checks a flag on container, because console buffers must not affect this @@ -1626,7 +1627,7 @@ proc connected(pager: Pager; container: Container; response: Response) = ) if mailcapRes.fdout != istream.fd: # istream has been redirected into a filter - istream.close() + istream.sclose() pager.procmap.add(ProcMapItem( container: container, fdout: FileHandle(mailcapRes.fdout), @@ -1650,17 +1651,18 @@ proc handleConnectingContainer*(pager: Pager; i: int) = let stream = item.stream case item.state of ccsBeforeResult: + var r = stream.initPacketReader() var res: int - stream.sread(res) + r.sread(res) if res == 0: - stream.sread(item.outputId) + r.sread(item.outputId) inc item.state container.loadinfo = "Connected to " & $container.url & ". Downloading..." pager.onSetLoadInfo(container) # continue else: var msg: string - stream.sread(msg) + r.sread(msg) if msg == "": msg = getLoaderErrorMessage(res) pager.fail(container, msg) @@ -1668,9 +1670,10 @@ proc handleConnectingContainer*(pager: Pager; i: int) = pager.connectingContainers.del(i) pager.selector.unregister(item.stream.fd) pager.loader.unregistered.add(item.stream.fd) - stream.close() + stream.sclose() of ccsBeforeStatus: - stream.sread(item.status) + var r = stream.initPacketReader() + r.sread(item.status) inc item.state # continue of ccsBeforeHeaders: @@ -1681,14 +1684,15 @@ proc handleConnectingContainer*(pager: Pager; i: int) = url: container.request.url, body: stream ) - stream.sread(response.headers) + var r = stream.initPacketReader() + r.sread(response.headers) # done pager.connectingContainers.del(i) pager.selector.unregister(item.stream.fd) pager.loader.unregistered.add(item.stream.fd) let redirect = response.getRedirect(container.request) if redirect != nil: - stream.close() + stream.sclose() pager.redirect(container, response, redirect) else: pager.connected(container, response) @@ -1698,7 +1702,7 @@ proc handleConnectingContainerError*(pager: Pager; i: int) = pager.fail(item.container, "loader died while loading") pager.selector.unregister(item.stream.fd) pager.loader.unregistered.add(item.stream.fd) - item.stream.close() + item.stream.sclose() pager.connectingContainers.del(i) proc handleEvent0(pager: Pager; container: Container; event: ContainerEvent): diff --git a/src/server/buffer.nim b/src/server/buffer.nim index e952d9aa..aa8a7fc7 100644 --- a/src/server/buffer.nim +++ b/src/server/buffer.nim @@ -7,7 +7,6 @@ import std/options import std/os import std/posix import std/selectors -import std/streams import std/tables import std/unicode @@ -30,9 +29,10 @@ import html/formdata as formdata_impl import io/bufreader import io/bufstream import io/bufwriter +import io/dynstream +import io/filestream import io/posixstream import io/promise -import io/serialize import io/serversocket import io/socketstream import js/fromjs @@ -106,7 +106,7 @@ type config: BufferConfig tasks: array[BufferCommand, int] #TODO this should have arguments hoverText: array[HoverType, string] - estream: Stream # error stream + estream: DynFileStream # error stream ssock: ServerSocket factory: CAtomFactory uastyle: CSSStylesheet @@ -167,11 +167,10 @@ proc cloneInterface*(stream: SocketStream, registerFun: proc(fd: int)): # We have just fork'ed the buffer process inside an interface function, # from which the new buffer is going to return as well. So we must also # consume the return value of the clone function, which is the pid 0. - var len: int + var r = stream.initPacketReader() var pid: int - stream.sread(len) - stream.sread(iface.packetid) - stream.sread(pid) + r.sread(iface.packetid) + r.sread(pid) return iface proc resolve*(iface: BufferInterface, packetid, len: int) = @@ -215,20 +214,8 @@ proc buildInterfaceProc(fun: NimNode, funid: string): tuple[fun, name: NimNode] for i in 0 ..< param.len - 2: let id2 = newIdentDefs(ident(param[i].strVal), param[^2]) params2.add(id2) - var len = ident"len" body.add(quote do: - var `len` = 0 - ) - for i in 2 ..< params2.len: - let s = params2[i][0] # sym e.g. url - body.add(quote do: - `len` += slen(`s`) - ) - body.add(quote do: - `len` += slen(BufferCommand.`nup`) - `len` += slen(`thisval`.packetid) - var writer {.inject.} = `thisval`.stream.initWriter() - writer.swrite(`len`) + var writer {.inject.} = `thisval`.stream.initWriter(writeLen = true) writer.swrite(BufferCommand.`nup`) writer.swrite(`thisval`.packetid) ) @@ -859,7 +846,7 @@ proc rewind(buffer: Buffer; offset: int; unregister = true): bool = if unregister: buffer.selector.unregister(buffer.fd) buffer.loader.unregistered.add(buffer.fd) - buffer.istream.close() + buffer.istream.sclose() buffer.istream = response.body buffer.istream.setBlocking(false) buffer.fd = response.body.fd @@ -951,7 +938,7 @@ proc clone*(buffer: Buffer, newurl: URL): int {.proxy.} = var ongoing: seq[OngoingData] = @[] for data in buffer.loader.ongoing.values: ongoing.add(data) - data.response.body.close() + data.response.body.sclose() buffer.loader.ongoing.clear() let myPid = getCurrentProcessId() for data in ongoing.mitems: @@ -970,7 +957,7 @@ proc clone*(buffer: Buffer, newurl: URL): int {.proxy.} = # the cache. (This also lets us skip suspend/resume in this case.) # We ignore errors; not much we can do with them here :/ discard buffer.rewind(buffer.bytesRead, unregister = false) - buffer.pstream.close() + buffer.pstream.sclose() let ssock = initServerSocket(myPid) buffer.ssock = ssock ps.write(char(0)) @@ -979,7 +966,8 @@ proc clone*(buffer: Buffer, newurl: URL): int {.proxy.} = it = 0 let socks = ssock.acceptSocketStream() buffer.loader.clientPid = myPid - socks.sread(buffer.loader.key) # get key for new buffer + # get key for new buffer + socks.recvDataLoop(buffer.loader.key) buffer.pstream = socks buffer.rfd = socks.fd buffer.selector.registerHandle(buffer.rfd, {Read}, 0) @@ -988,9 +976,9 @@ proc clone*(buffer: Buffer, newurl: URL): int {.proxy.} = discard close(pipefd[1]) # close write # We must wait for child to tee its ongoing streams. let ps = newPosixStream(pipefd[0]) - let c = ps.readChar() + let c = ps.sreadChar() assert c == char(0) - ps.close() + ps.sclose() buffer.loader.resume(ids) return pid @@ -1006,7 +994,8 @@ proc dispatchDOMContentLoadedEvent(buffer: Buffer) = if el.ctype == "DOMContentLoaded": let e = el.callback(event) if e.isErr: - ctx.writeException(buffer.estream) + buffer.estream.write(ctx.getExceptionStr()) + buffer.estream.sflush() called = true if called: buffer.do_reshape() @@ -1022,7 +1011,8 @@ proc dispatchLoadEvent(buffer: Buffer) = if el.ctype == "load": let e = el.callback(event) if e.isErr: - ctx.writeException(buffer.estream) + buffer.estream.write(ctx.getExceptionStr()) + buffer.estream.sflush() called = true let jsWindow = toJS(ctx, window) let jsonload = JS_GetPropertyStr(ctx, jsWindow, "onload") @@ -1052,7 +1042,8 @@ proc dispatchEvent(buffer: Buffer, ctype: string, elem: Element): tuple[ let e = el.callback(event) called = true if e.isErr: - ctx.writeException(buffer.estream) + buffer.estream.write(ctx.getExceptionStr()) + buffer.estream.sflush() if FLAG_STOP_IMMEDIATE_PROPAGATION in event.flags: stop = true break @@ -1082,7 +1073,7 @@ proc finishLoad(buffer: Buffer): EmptyPromise = buffer.cacheId = -1 buffer.fd = -1 buffer.outputId = -1 - buffer.istream.close() + buffer.istream.sclose() return buffer.loadResources() # Returns: @@ -1107,9 +1098,7 @@ proc hasTask(buffer: Buffer; cmd: BufferCommand): bool = proc resolveTask[T](buffer: Buffer; cmd: BufferCommand; res: T) = let packetid = buffer.tasks[cmd] assert packetid != 0 - let len = slen(buffer.tasks[cmd]) + slen(res) - buffer.pstream.withWriter w: - w.swrite(len) + buffer.pstream.withPacketWriter w: w.swrite(packetid) w.swrite(res) buffer.tasks[cmd] = 0 @@ -1183,7 +1172,7 @@ proc cancel*(buffer: Buffer): int {.proxy.} = for fd, data in buffer.loader.connecting: buffer.selector.unregister(fd) buffer.loader.unregistered.add(fd) - data.stream.close() + data.stream.sclose() buffer.loader.connecting.clear() for fd, data in buffer.loader.ongoing: data.response.unregisterFun() @@ -1194,7 +1183,7 @@ proc cancel*(buffer: Buffer): int {.proxy.} = buffer.fd = -1 buffer.cacheId = -1 buffer.outputId = -1 - buffer.istream.close() + buffer.istream.sclose() buffer.htmlParser.finish() buffer.document.readyState = rsInteractive buffer.state = bsLoaded @@ -1470,7 +1459,8 @@ proc evalJSURL(buffer: Buffer, url: URL): Opt[string] = let ctx = buffer.window.jsctx let ret = ctx.eval(scriptSource, $buffer.baseURL, JS_EVAL_TYPE_GLOBAL) if JS_IsException(ret): - ctx.writeException(buffer.estream) + buffer.estream.write(ctx.getExceptionStr()) + buffer.estream.sflush() return err() # error if JS_IsUndefined(ret): return err() # no need to navigate @@ -1762,16 +1752,12 @@ macro bufferDispatcher(funs: static ProxyMap; buffer: Buffer; var resolve = newStmtList() if rval == nil: resolve.add(quote do: - let len = slen(`packetid`) - buffer.pstream.withWriter w: - w.swrite(len) + buffer.pstream.withPacketWriter w: w.swrite(`packetid`) ) else: resolve.add(quote do: - let len = slen(`packetid`) + slen(`rval`) - buffer.pstream.withWriter w: - w.swrite(len) + buffer.pstream.withPacketWriter w: w.swrite(`packetid`) w.swrite(`rval`) ) @@ -1864,23 +1850,23 @@ proc runBuffer(buffer: Buffer) = buffer.loader.unregistered.setLen(0) proc cleanup(buffer: Buffer) = - buffer.pstream.close() + buffer.pstream.sclose() buffer.ssock.close() proc launchBuffer*(config: BufferConfig; url: URL; request: Request; attrs: WindowAttributes; ishtml: bool; charsetStack: seq[Charset]; loader: FileLoader; ssock: ServerSocket) = - let socks = ssock.acceptSocketStream() + let pstream = ssock.acceptSocketStream() let buffer = Buffer( attrs: attrs, config: config, - estream: newFileStream(stderr), + estream: newDynFileStream(stderr), ishtml: ishtml, loader: loader, needsBOMSniff: config.charsetOverride == CHARSET_UNKNOWN, - pstream: socks, + pstream: pstream, request: request, - rfd: socks.fd, + rfd: pstream.fd, selector: newSelector[int](), ssock: ssock, url: url, @@ -1889,10 +1875,11 @@ proc launchBuffer*(config: BufferConfig; url: URL; request: Request; outputId: -1 ) buffer.charset = buffer.charsetStack.pop() - socks.sread(buffer.loader.key) - let fd = socks.recvFileHandle() + var r = pstream.initPacketReader() + r.sread(buffer.loader.key) + r.sread(buffer.cacheId) + let fd = pstream.recvFileHandle() buffer.fd = fd - socks.sread(buffer.cacheId) buffer.istream = newPosixStream(fd) buffer.istream.setBlocking(false) buffer.selector.registerHandle(fd, {Read}, 0) diff --git a/src/server/forkserver.nim b/src/server/forkserver.nim index 60981213..f0285357 100644 --- a/src/server/forkserver.nim +++ b/src/server/forkserver.nim @@ -1,15 +1,15 @@ import std/options import std/os import std/posix -import std/streams import std/tables import config/config import io/bufreader import io/bufwriter +import io/dynstream import io/posixstream -import io/serialize import io/serversocket +import io/stdio import loader/loader import server/buffer import types/urimethodmap @@ -39,8 +39,9 @@ proc newFileLoader*(forkserver: ForkServer; config: LoaderConfig): FileLoader = forkserver.ostream.withPacketWriter w: w.swrite(fcForkLoader) w.swrite(config) + var r = forkserver.istream.initPacketReader() var process: int - forkserver.istream.sread(process) + r.sread(process) return FileLoader(process: process, clientPid: getCurrentProcessId()) proc loadForkServerConfig*(forkserver: ForkServer, config: Config) = @@ -64,9 +65,10 @@ proc forkBuffer*(forkserver: ForkServer; config: BufferConfig; url: URL; w.swrite(attrs) w.swrite(ishtml) w.swrite(charsetStack) + var r = forkserver.istream.initPacketReader() var bufferPid: int - forkserver.istream.sread(bufferPid) - bufferPid + r.sread(bufferPid) + return bufferPid proc trapSIGINT() = # trap SIGINT, so e.g. an external editor receiving an interrupt in the @@ -147,9 +149,9 @@ proc forkBuffer(ctx: var ForkServerContext; r: var BufferedReader): int = gssock.close() let ps = newPosixStream(pipefd[1]) ps.write(char(0)) - ps.close() - discard close(stdin.getFileHandle()) - discard close(stdout.getFileHandle()) + ps.sclose() + closeStdin() + closeStdout() let loader = FileLoader( process: loaderPid, clientPid: pid @@ -168,9 +170,9 @@ proc forkBuffer(ctx: var ForkServerContext; r: var BufferedReader): int = doAssert false discard close(pipefd[1]) # close write let ps = newPosixStream(pipefd[0]) - let c = ps.readChar() + let c = ps.sreadChar() assert c == char(0) - ps.close() + ps.sclose() ctx.children.add(pid) return pid @@ -195,14 +197,14 @@ proc runForkServer() = ctx.children.del(i) of fcForkBuffer: let r = ctx.forkBuffer(r) - ctx.ostream.withWriter w: + ctx.ostream.withPacketWriter w: w.swrite(r) of fcForkLoader: assert ctx.loaderPid == 0 var config: LoaderConfig r.sread(config) let pid = ctx.forkLoader(config) - ctx.ostream.withWriter w: + ctx.ostream.withPacketWriter w: w.swrite(pid) ctx.loaderPid = pid ctx.children.add(pid) @@ -211,12 +213,11 @@ proc runForkServer() = r.sread(config) set_cjk_ambiguous(config.ambiguous_double) SocketDirectory = config.tmpdir - ctx.ostream.flush() except EOFError: # EOF break - ctx.istream.close() - ctx.ostream.close() + ctx.istream.sclose() + ctx.ostream.sclose() # Clean up when the main process crashed. for child in ctx.children: discard kill(cint(child), cint(SIGTERM)) diff --git a/src/types/formdata.nim b/src/types/formdata.nim index 29817e54..b353b814 100644 --- a/src/types/formdata.nim +++ b/src/types/formdata.nim @@ -1,6 +1,7 @@ -import std/streams import std/strutils +import io/dynstream +import io/filestream import js/javascript import types/blob import utils/twtstr @@ -56,7 +57,7 @@ proc calcLength*(this: FormData): int = proc getContentType*(this: FormData): string = return "multipart/form-data; boundary=" & this.boundary -proc writeEntry*(stream: Stream, entry: FormDataEntry, boundary: string) = +proc writeEntry*(stream: DynStream; entry: FormDataEntry; boundary: string) = stream.write("--" & boundary & "\r\n") let name = percentEncode(entry.name, {'"', '\r', '\n'}) if entry.isstr: @@ -74,17 +75,17 @@ proc writeEntry*(stream: Stream, entry: FormDataEntry, boundary: string) = blob.ctype stream.write("Content-Type: " & ctype & "\r\n") if blob.isfile: - let fs = newFileStream(WebFile(blob).path) + let fs = newDynFileStream(WebFile(blob).path) if fs != nil: var buf {.noinit.}: array[4096, uint8] while true: - let n = fs.readData(addr buf[0], 4096) + let n = fs.recvData(addr buf[0], 4096) if n == 0: break - stream.writeData(addr buf[0], n) + stream.sendDataLoop(addr buf[0], n) if n < buf.len: break else: - stream.writeData(blob.buffer, int(blob.size)) + stream.sendDataLoop(blob.buffer, int(blob.size)) stream.write("\r\n") stream.write("\r\n") |