diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/html/dom.nim | 3 | ||||
-rw-r--r-- | src/html/env.nim | 17 | ||||
-rw-r--r-- | src/io/bufstream.nim | 3 | ||||
-rw-r--r-- | src/io/bufwriter.nim | 10 | ||||
-rw-r--r-- | src/io/dynstream.nim | 70 | ||||
-rw-r--r-- | src/io/filestream.nim | 34 | ||||
-rw-r--r-- | src/io/posixstream.nim | 17 | ||||
-rw-r--r-- | src/io/serialize.nim | 234 | ||||
-rw-r--r-- | src/io/socketstream.nim | 6 | ||||
-rw-r--r-- | src/js/console.nim | 9 | ||||
-rw-r--r-- | src/js/javascript.nim | 8 | ||||
-rw-r--r-- | src/js/timeout.nim | 16 | ||||
-rw-r--r-- | src/loader/cgi.nim | 4 | ||||
-rw-r--r-- | src/loader/loader.nim | 132 | ||||
-rw-r--r-- | src/loader/loaderhandle.nim | 11 | ||||
-rw-r--r-- | src/loader/response.nim | 3 | ||||
-rw-r--r-- | src/local/client.nim | 55 | ||||
-rw-r--r-- | src/local/container.nim | 8 | ||||
-rw-r--r-- | src/local/pager.nim | 54 | ||||
-rw-r--r-- | src/server/buffer.nim | 87 | ||||
-rw-r--r-- | src/server/forkserver.nim | 31 | ||||
-rw-r--r-- | src/types/formdata.nim | 13 |
22 files changed, 291 insertions, 534 deletions
diff --git a/src/html/dom.nim b/src/html/dom.nim index 41869a95..f0cc05c9 100644 --- a/src/html/dom.nim +++ b/src/html/dom.nim @@ -18,6 +18,7 @@ import img/bitmap import img/painter import img/path import img/png +import io/dynstream import io/promise import js/console import js/domexception @@ -3515,7 +3516,7 @@ proc fetchClassicScript(element: HTMLScriptElement, url: URL, element.onComplete(ScriptResult(t: RESULT_NULL)) return #TODO make this non-blocking somehow - let s = response.body.readAll() + let s = response.body.recvAll() let source = if cs in {CHARSET_UNKNOWN, CHARSET_UTF_8}: s.toValidUTF8() else: diff --git a/src/html/env.nim b/src/html/env.nim index 3ffff116..ebd9e83e 100644 --- a/src/html/env.nim +++ b/src/html/env.nim @@ -1,5 +1,4 @@ import std/selectors -import std/streams import bindings/quickjs import html/catom @@ -9,6 +8,7 @@ import html/event import html/formdata import html/script import html/xmlhttprequest +import io/filestream import io/promise import js/base64 import js/console @@ -34,7 +34,9 @@ proc appVersion(navigator: ptr Navigator): string {.jsfget.} = "5.0 (Windows)" proc platform(navigator: ptr Navigator): string {.jsfget.} = "Win32" proc product(navigator: ptr Navigator): string {.jsfget.} = "Gecko" proc productSub(navigator: ptr Navigator): string {.jsfget.} = "20100101" -proc userAgent(navigator: ptr Navigator): string {.jsfget.} = "chawan" #TODO TODO TODO this should be configurable +proc userAgent(navigator: ptr Navigator): string {.jsfget.} = + #TODO TODO TODO this should be configurable + "chawan" proc vendor(navigator: ptr Navigator): string {.jsfget.} = "" proc vendorSub(navigator: ptr Navigator): string {.jsfget.} = "" proc taintEnabled(navigator: ptr Navigator): bool {.jsfget.} = false @@ -153,7 +155,7 @@ proc getComputedStyle(window: Window, element: Element, #TODO implement this properly return ok(element.style) -proc addScripting*(window: Window, selector: Selector[int]) = +proc addScripting*(window: Window; selector: Selector[int]) = let rt = newJSRuntime() let ctx = rt.newJSContext() window.jsrt = rt @@ -166,11 +168,10 @@ proc addScripting*(window: Window, selector: Selector[int]) = evalJSFree = (proc(src, file: string) = let ret = window.jsctx.eval(src, file, JS_EVAL_TYPE_GLOBAL) if JS_IsException(ret): - let ss = newStringStream() - window.jsctx.writeException(ss) - ss.setPosition(0) window.console.log("Exception in document", $window.document.url, - ss.readAll()) + window.jsctx.getExceptionStr()) + else: + JS_FreeValue(ctx, ret) ) ) var global = JS_GetGlobalObject(ctx) @@ -200,7 +201,7 @@ proc runJSJobs*(window: Window) = proc newWindow*(scripting, images: bool, selector: Selector[int], attrs: WindowAttributes, factory: CAtomFactory, navigate: proc(url: URL) = nil, loader = none(FileLoader)): Window = - let err = newFileStream(stderr) + let err = newDynFileStream(stderr) let window = Window( attrs: attrs, console: newConsole(err), diff --git a/src/io/bufstream.nim b/src/io/bufstream.nim index 0558b61c..118b81e5 100644 --- a/src/io/bufstream.nim +++ b/src/io/bufstream.nim @@ -46,9 +46,8 @@ proc flushWrite*(s: BufStream): bool = return false proc newBufStream*(ps: PosixStream, registerFun: proc(fd: int)): BufStream = - result = BufStream( + return BufStream( source: ps, blocking: ps.blocking, registerFun: registerFun ) - result.addStreamIface() diff --git a/src/io/bufwriter.nim b/src/io/bufwriter.nim index cbee8b5b..75da4190 100644 --- a/src/io/bufwriter.nim +++ b/src/io/bufwriter.nim @@ -1,4 +1,5 @@ -# Write data to streams. +# Write data to streams in packets. +# Each packet is prefixed with its length as a pointer-sized integer. import std/options import std/sets @@ -53,13 +54,6 @@ proc deinit*(writer: var BufferedWriter) = writer.bufSize = 0 writer.bufLen = 0 -template withWriter*(stream: DynStream; w, body: untyped) = - block: - var w = stream.initWriter() - body - w.flush() - w.deinit() - template withPacketWriter*(stream: DynStream; w, body: untyped) = block: var w = stream.initWriter(writeLen = true) diff --git a/src/io/dynstream.nim b/src/io/dynstream.nim index 10db4c64..1b0f8807 100644 --- a/src/io/dynstream.nim +++ b/src/io/dynstream.nim @@ -1,7 +1,5 @@ -import std/streams - type - DynStream* = ref object of Stream #TODO should be of RootObj + DynStream* = ref object of RootObj isend*: bool blocking*: bool #TODO move to posixstream @@ -46,6 +44,19 @@ proc sendDataLoop*(s: DynStream; buffer: pointer; len: int) = if n == len: break +proc sendDataLoop*(s: DynStream; buffer: openArray[char]) {.inline.} = + s.sendDataLoop(unsafeAddr buffer[0], buffer.len) + +proc write*(s: DynStream; buffer: openArray[char]) {.inline.} = + s.sendDataLoop(buffer) + +proc write*(s: DynStream; c: char) {.inline.} = + s.sendDataLoop(unsafeAddr c, 1) + +proc sreadChar*(s: DynStream): char = + let n = s.recvData(addr result, 1) + assert n == 1 + proc recvDataLoop*(s: DynStream; buffer: pointer; len: int) = var n = 0 while true: @@ -53,47 +64,18 @@ proc recvDataLoop*(s: DynStream; buffer: pointer; len: int) = if n == len: break -proc dsClose(s: Stream) = - DynStream(s).sclose() +proc recvDataLoop*(s: DynStream; buffer: var openArray[uint8]) {.inline.} = + s.recvDataLoop(addr buffer[0], buffer.len) -proc dsReadData(s: Stream, buffer: pointer, len: int): int = - let s = DynStream(s) - assert len != 0 and s.blocking - result = 0 - while result < len: - let p = addr cast[ptr UncheckedArray[uint8]](buffer)[result] - let n = s.recvData(p, len - result) - if n == 0: - break - result += n - -proc dsWriteData(s: Stream, buffer: pointer, len: int) = - let s = DynStream(s) - assert len != 0 and s.blocking - discard s.sendData(buffer, len) - -proc dsReadLine(s: Stream, line: var string): bool = - let s = DynStream(s) - assert s.blocking - line = "" - var c: char +proc recvAll*(s: DynStream): string = + var buffer = newString(4096) + var idx = 0 while true: - if s.recvData(addr c, 1) == 0: - return false - if c == '\r': - if s.recvData(addr c, 1) == 0: - return false - if c == '\n': + let n = s.recvData(addr buffer[idx], buffer.len - idx) + if n == 0: break - line &= c - true - -proc dsAtEnd(s: Stream): bool = - return DynStream(s).isend - -proc addStreamIface*(s: DynStream) = - s.closeImpl = cast[typeof(s.closeImpl)](dsClose) - s.readDataImpl = cast[typeof(s.readDataImpl)](dsReadData) - s.writeDataImpl = cast[typeof(s.writeDataImpl)](dsWriteData) - s.readLineImpl = cast[typeof(s.readLineImpl)](dsReadLine) - s.atEndImpl = dsAtEnd + idx += n + if idx == buffer.len: + buffer.setLen(buffer.len + 4096) + buffer.setLen(idx) + return buffer diff --git a/src/io/filestream.nim b/src/io/filestream.nim new file mode 100644 index 00000000..b1b3a296 --- /dev/null +++ b/src/io/filestream.nim @@ -0,0 +1,34 @@ +import io/dynstream + +type + DynFileStream* = ref object of DynStream + file*: File + +method recvData*(s: DynFileStream; buffer: pointer; len: int): int = + let n = s.file.readBuffer(buffer, len) + if n == 0: + if unlikely(s.isend): + raise newException(EOFError, "eof") + s.isend = true + return n + +method sendData*(s: DynFileStream; buffer: pointer; len: int): int = + return s.file.writeBuffer(buffer, len) + +method seek*(s: DynFileStream; off: int) = + s.file.setFilePos(int64(off)) + +method sclose*(s: DynFileStream) = + s.file.close() + +method sflush*(s: DynFileStream) = + s.file.flushFile() + +proc newDynFileStream*(file: File): DynFileStream = + return DynFileStream(file: file, blocking: true) + +proc newDynFileStream*(path: string): DynFileStream = + var file: File + if file.open(path): + return newDynFileStream(path) + return nil diff --git a/src/io/posixstream.nim b/src/io/posixstream.nim index 0b06c572..bdae9d50 100644 --- a/src/io/posixstream.nim +++ b/src/io/posixstream.nim @@ -1,4 +1,3 @@ -# stdlib file handling is broken, so we use this instead of FileStream. import std/posix import io/dynstream @@ -53,24 +52,12 @@ proc sreadChar*(s: PosixStream): char = s.isend = true assert n == 1 -proc recvData*(s: PosixStream, buffer: var openArray[uint8]): int {.inline.} = - return s.recvData(addr buffer[0], buffer.len) - -proc recvData*(s: PosixStream, buffer: var openArray[char]): int {.inline.} = - return s.recvData(addr buffer[0], buffer.len) - method sendData*(s: PosixStream, buffer: pointer, len: int): int = let n = write(s.fd, buffer, len) if n < 0: raisePosixIOError() return n -proc sendData*(s: PosixStream, buffer: openArray[char]): int {.inline.} = - return s.sendData(unsafeAddr buffer[0], buffer.len) - -proc sendData*(s: PosixStream, buffer: openArray[uint8]): int {.inline.} = - return s.sendData(unsafeAddr buffer[0], buffer.len) - method setBlocking*(s: PosixStream, blocking: bool) {.base.} = s.blocking = blocking let ofl = fcntl(s.fd, F_GETFL, 0) @@ -87,9 +74,7 @@ method sclose*(s: PosixStream) = discard close(s.fd) proc newPosixStream*(fd: FileHandle): PosixStream = - let ps = PosixStream(fd: fd, blocking: true) - ps.addStreamIface() - return ps + return PosixStream(fd: fd, blocking: true) proc newPosixStream*(path: string, flags, mode: cint): PosixStream = let fd = open(cstring(path), flags, mode) diff --git a/src/io/serialize.nim b/src/io/serialize.nim deleted file mode 100644 index 0b54bea3..00000000 --- a/src/io/serialize.nim +++ /dev/null @@ -1,234 +0,0 @@ -# Write data to streams. - -import std/options -import std/sets -import std/streams -import std/tables - -import types/blob -import types/formdata -import types/url -import types/opt - -proc sread*(stream: Stream, n: var SomeNumber) -func slen*(n: SomeNumber): int - -proc sread*[T](stream: Stream, s: var set[T]) -func slen*[T](s: set[T]): int - -proc sread*[T: enum](stream: Stream, x: var T) -func slen*[T: enum](x: T): int - -proc sread*(stream: Stream, s: var string) -func slen*(s: string): int - -proc sread*(stream: Stream, b: var bool) -func slen*(b: bool): int - -func slen*(url: URL): int - -func slen*(tup: tuple): int - -proc sread*[I, T](stream: Stream, a: var array[I, T]) -func slen*[I, T](a: array[I, T]): int - -proc sread*(stream: Stream, s: var seq) -func slen*(s: seq): int - -proc sread*[U, V](stream: Stream, t: var Table[U, V]) -func slen*[U, V](t: Table[U, V]): int - -proc sread*(stream: Stream, obj: var object) -func slen*(obj: object): int - -proc sread*(stream: Stream, obj: var ref object) -func slen*(obj: ref object): int - -func slen*(part: FormDataEntry): int - -func slen*(blob: Blob): int - -proc sread*[T](stream: Stream, o: var Option[T]) -func slen*[T](o: Option[T]): int - -proc sread*[T, E](stream: Stream, o: var Result[T, E]) -func slen*[T, E](o: Result[T, E]): int - -proc sread*(stream: Stream, n: var SomeNumber) = - if stream.readData(addr n, sizeof(n)) < sizeof(n): - raise newException(EOFError, "eof") - -func slen*(n: SomeNumber): int = - return sizeof(n) - -proc sread*[T: enum](stream: Stream, x: var T) = - var i: int - stream.sread(i) - x = cast[T](i) - -func slen*[T: enum](x: T): int = - return sizeof(int) - -proc sread*[T](stream: Stream, s: var set[T]) = - var len: int - stream.sread(len) - for i in 0 ..< len: - var x: T - stream.sread(x) - s.incl(x) - -func slen*[T](s: set[T]): int = - result = slen(s.card) - for x in s: - result += slen(x) - -proc sread*(stream: Stream, s: var string) = - var len: int - stream.sread(len) - if len > 0: - s = newString(len) - prepareMutation(s) - if stream.readData(addr s[0], len) < len: - raise newException(EOFError, "eof") - else: - s = "" - -func slen*(s: string): int = - slen(s.len) + s.len - -proc sread*(stream: Stream, b: var bool) = - var n: uint8 - stream.sread(n) - if n == 1u8: - b = true - else: - assert n == 0u8 - b = false - -func slen*(b: bool): int = - return sizeof(uint8) - -func slen*(url: URL): int = - if url == nil: - return slen("") - return slen(url.serialize()) - -func slen*(tup: tuple): int = - for f in tup.fields: - result += slen(f) - -proc sread*[I, T](stream: Stream; a: var array[I, T]) = - for x in a.mitems: - stream.sread(x) - -func slen*[I, T](a: array[I, T]): int = - for x in a: - result += slen(x) - -proc sread*(stream: Stream, s: var seq) = - var len: int - stream.sread(len) - s.setLen(len) - for x in s.mitems: - stream.sread(x) - -func slen*(s: seq): int = - result = slen(s.len) - for x in s: - result += slen(x) - -proc sread*[U, V](stream: Stream, t: var Table[U, V]) = - var len: int - stream.sread(len) - for i in 0..<len: - var k: U - stream.sread(k) - var v: V - stream.sread(v) - t[k] = v - -func slen*[U, V](t: Table[U, V]): int = - result = slen(t.len) - for k, v in t: - result += slen(k) - result += slen(v) - -proc sread*(stream: Stream, obj: var object) = - for f in obj.fields: - stream.sread(f) - -func slen*(obj: object): int = - for f in obj.fields: - result += slen(f) - -proc sread*(stream: Stream, obj: var ref object) = - var n: bool - stream.sread(n) - if n: - new(obj) - stream.sread(obj[]) - -func slen*(obj: ref object): int = - result = slen(obj != nil) - if obj != nil: - result += slen(obj[]) - -func slen*(part: FormDataEntry): int = - result += slen(part.isstr) - result += slen(part.name) - result += slen(part.filename) - if part.isstr: - result += slen(part.svalue) - else: - result += slen(part.value) - -func slen*(blob: Blob): int = - result += slen(blob.isfile) - if blob.isfile: - result = slen(WebFile(blob).path) - else: - result += slen(blob.ctype) - result += slen(blob.size) - result += int(blob.size) #TODO ?? - -proc sread*[T](stream: Stream, o: var Option[T]) = - var x: bool - stream.sread(x) - if x: - var m: T - stream.sread(m) - o = some(m) - else: - o = none(T) - -func slen*[T](o: Option[T]): int = - result = slen(o.isSome) - if o.isSome: - result += slen(o.get) - -proc sread*[T, E](stream: Stream, o: var Result[T, E]) = - var x: bool - stream.sread(x) - if x: - when not (T is void): - var m: T - stream.sread(m) - o.ok(m) - else: - o.ok() - else: - when not (E is void): - var e: E - stream.sread(e) - o.err(e) - else: - o.err() - -func slen*[T, E](o: Result[T, E]): int = - result = slen(o.isSome) - if o.isSome: - when not (T is void): - result += slen(o.get) - else: - when not (E is void): - result += slen(o.error) diff --git a/src/io/socketstream.nim b/src/io/socketstream.nim index 78e7fb3e..3c8e6fa6 100644 --- a/src/io/socketstream.nim +++ b/src/io/socketstream.nim @@ -69,12 +69,11 @@ proc connectSocketStream*(path: string; blocking = true): SocketStream = if connect_unix_from_c(cint(sock.getFd()), cstring(path), cint(path.len)) != 0: raiseOSError(osLastError()) - result = SocketStream( + return SocketStream( source: sock, fd: cint(sock.getFd()), blocking: blocking ) - result.addStreamIface() proc connectSocketStream*(pid: int; blocking = true): SocketStream = @@ -88,9 +87,8 @@ proc acceptSocketStream*(ssock: ServerSocket, blocking = true): SocketStream = ssock.sock.accept(sock, inheritable = true) if not blocking: sock.getFd().setBlocking(false) - result = SocketStream( + return SocketStream( blocking: blocking, source: sock, fd: cint(sock.getFd()) ) - result.addStreamIface() diff --git a/src/js/console.nim b/src/js/console.nim index e0ac2ae2..d5e074e8 100644 --- a/src/js/console.nim +++ b/src/js/console.nim @@ -1,16 +1,15 @@ -import std/streams - +import io/dynstream import js/javascript type Console* = ref object - err*: Stream + err*: DynStream clearFun: proc() showFun: proc() hideFun: proc() jsDestructor(Console) -proc newConsole*(err: Stream; clearFun: proc() = nil; showFun: proc() = nil; +proc newConsole*(err: DynStream; clearFun: proc() = nil; showFun: proc() = nil; hideFun: proc() = nil): Console = return Console( err: err, @@ -25,7 +24,7 @@ proc log*(console: Console, ss: varargs[string]) {.jsfunc.} = if i != ss.high: console.err.write(' ') console.err.write('\n') - console.err.flush() + console.err.sflush() proc clear(console: Console) {.jsfunc.} = if console.clearFun != nil: diff --git a/src/js/javascript.nim b/src/js/javascript.nim index b4c68729..cbd0b205 100644 --- a/src/js/javascript.nim +++ b/src/js/javascript.nim @@ -44,11 +44,11 @@ import std/macros import std/options import std/sets -import std/streams import std/strutils import std/tables import std/unicode +import io/dynstream import js/error import js/fromjs import js/opaque @@ -208,11 +208,11 @@ proc getExceptionStr*(ctx: JSContext): string = JS_FreeValue(ctx, stack) JS_FreeValue(ctx, ex) -proc writeException*(ctx: JSContext, s: Stream) = +proc writeException*(ctx: JSContext, s: DynStream) = s.write(ctx.getExceptionStr()) - s.flush() + s.sflush() -proc runJSJobs*(rt: JSRuntime, err: Stream) = +proc runJSJobs*(rt: JSRuntime, err: DynStream) = while JS_IsJobPending(rt): var ctx: JSContext let r = JS_ExecutePendingJob(rt, addr ctx) diff --git a/src/js/timeout.nim b/src/js/timeout.nim index 8ef7c114..3c95dada 100644 --- a/src/js/timeout.nim +++ b/src/js/timeout.nim @@ -1,7 +1,7 @@ import std/selectors -import std/streams import std/tables +import io/dynstream import js/javascript type TimeoutState* = object @@ -12,10 +12,10 @@ type TimeoutState* = object interval_fdis: Table[int, int32] selector: Selector[int] #TODO would be better with void... jsctx: JSContext - err: Stream #TODO shouldn't be needed + err: DynStream #TODO shouldn't be needed evalJSFree: proc(src, file: string) #TODO ew -func newTimeoutState*(selector: Selector[int], jsctx: JSContext, err: Stream, +func newTimeoutState*(selector: Selector[int]; jsctx: JSContext; err: DynStream; evalJSFree: proc(src, file: string)): TimeoutState = return TimeoutState( selector: selector, @@ -28,7 +28,7 @@ func empty*(state: TimeoutState): bool = return state.timeouts.len == 0 and state.intervals.len == 0 #TODO varargs -proc setTimeout*[T: JSValue|string](state: var TimeoutState, handler: T, +proc setTimeout*[T: JSValue|string](state: var TimeoutState; handler: T; timeout = 0i32): int32 = let id = state.timeoutid inc state.timeoutid @@ -52,14 +52,14 @@ proc setTimeout*[T: JSValue|string](state: var TimeoutState, handler: T, ), fdi) return id -proc clearTimeout*(state: var TimeoutState, id: int32) = +proc clearTimeout*(state: var TimeoutState; id: int32) = if id in state.timeouts: let timeout = state.timeouts[id] state.selector.unregister(timeout.fdi) state.timeout_fdis.del(timeout.fdi) state.timeouts.del(id) -proc clearInterval*(state: var TimeoutState, id: int32) = +proc clearInterval*(state: var TimeoutState; id: int32) = if id in state.intervals: let interval = state.intervals[id] state.selector.unregister(interval.fdi) @@ -68,7 +68,7 @@ proc clearInterval*(state: var TimeoutState, id: int32) = state.intervals.del(id) #TODO varargs -proc setInterval*[T: JSValue|string](state: var TimeoutState, handler: T, +proc setInterval*[T: JSValue|string](state: var TimeoutState; handler: T; interval = 0i32): int32 = let id = state.timeoutid inc state.timeoutid @@ -91,7 +91,7 @@ proc setInterval*[T: JSValue|string](state: var TimeoutState, handler: T, ), fdi, fun) return id -proc runTimeoutFd*(state: var TimeoutState, fd: int): bool = +proc runTimeoutFd*(state: var TimeoutState; fd: int): bool = if fd in state.interval_fdis: state.intervals[state.interval_fdis[fd]].handler() return true diff --git a/src/loader/cgi.nim b/src/loader/cgi.nim index 067e784d..7d08f4a2 100644 --- a/src/loader/cgi.nim +++ b/src/loader/cgi.nim @@ -1,9 +1,9 @@ import std/options import std/os import std/posix -import std/streams import std/strutils +import io/dynstream import io/posixstream import io/stdio import loader/connecterror @@ -221,7 +221,7 @@ proc loadCGI*(handle: LoaderHandle; request: Request; cgiDir: seq[string]; let multipart = request.multipart.get for entry in multipart.entries: ps.writeEntry(entry, multipart.boundary) - ps.close() + ps.sclose() handle.parser = HeaderParser(headers: newHeaders()) handle.istream = newPosixStream(pipefd[0]) diff --git a/src/loader/loader.nim b/src/loader/loader.nim index b0c5b6ab..42271ea3 100644 --- a/src/loader/loader.nim +++ b/src/loader/loader.nim @@ -5,6 +5,7 @@ # C: Request # S: res (0 => success, _ => error) # if success: +# S: output ID # S: status code # S: headers # S: response body @@ -20,15 +21,14 @@ import std/options import std/os import std/posix import std/selectors -import std/streams import std/strutils import std/tables import io/bufreader import io/bufwriter +import io/dynstream import io/posixstream import io/promise -import io/serialize import io/serversocket import io/socketstream import io/tempfile @@ -211,12 +211,12 @@ proc redirectToFile(ctx: LoaderContext; output: OutputHandle; if output.currentBuffer != nil: let n = ps.sendData(output.currentBuffer, output.currentBufferIdx) if unlikely(n < output.currentBuffer.len - output.currentBufferIdx): - ps.close() + ps.sclose() return false for buffer in output.buffers: let n = ps.sendData(buffer) if unlikely(n < buffer.len): - ps.close() + ps.sclose() return false if output.parent != nil: output.parent.outputs.add(OutputHandle( @@ -312,7 +312,7 @@ proc loadStreamRegular(ctx: LoaderContext; handle, cachedHandle: LoaderHandle) = handle.outputs.del(i) for output in handle.outputs: if r == hrrUnregister: - output.ostream.close() + output.ostream.sclose() output.ostream = nil elif cachedHandle != nil: output.parent = cachedHandle @@ -324,10 +324,10 @@ proc loadStreamRegular(ctx: LoaderContext; handle, cachedHandle: LoaderHandle) = ctx.outputMap[output.ostream.fd] = output else: assert output.ostream.fd notin ctx.outputMap - output.ostream.close() + output.ostream.sclose() output.ostream = nil handle.outputs.setLen(0) - handle.istream.close() + handle.istream.sclose() handle.istream = nil proc loadStream(ctx: LoaderContext; client: ClientData; handle: LoaderHandle; @@ -473,13 +473,13 @@ proc addClient(ctx: LoaderContext; stream: SocketStream; r.sread(key) r.sread(pid) r.sread(config) - stream.withWriter w: + stream.withPacketWriter w: if pid in ctx.clientData or key == default(ClientKey): w.swrite(false) else: ctx.clientData[pid] = ClientData(pid: pid, key: key, config: config) w.swrite(true) - stream.close() + stream.sclose() proc cleanup(client: ClientData) = for it in client.cacheMap: @@ -495,7 +495,7 @@ proc removeClient(ctx: LoaderContext; stream: SocketStream; let client = ctx.clientData[pid] client.cleanup() ctx.clientData.del(pid) - stream.close() + stream.sclose() proc addCacheFile(ctx: LoaderContext; stream: SocketStream; r: var BufferedReader) = @@ -507,10 +507,10 @@ proc addCacheFile(ctx: LoaderContext; stream: SocketStream; assert output != nil let targetClient = ctx.clientData[targetPid] let (id, file) = ctx.addCacheFile(targetClient, output) - stream.withWriter w: + stream.withPacketWriter w: w.swrite(id) w.swrite(file) - stream.close() + stream.sclose() proc redirectToFile(ctx: LoaderContext; stream: SocketStream; r: var BufferedReader) = @@ -522,9 +522,9 @@ proc redirectToFile(ctx: LoaderContext; stream: SocketStream; var success = false if output != nil: success = ctx.redirectToFile(output, targetPath) - stream.withWriter w: + stream.withPacketWriter w: w.swrite(success) - stream.close() + stream.sclose() proc shareCachedItem(ctx: LoaderContext; stream: SocketStream; r: var BufferedReader) = @@ -542,14 +542,14 @@ proc shareCachedItem(ctx: LoaderContext; stream: SocketStream; let item = sourceClient.cacheMap[n] inc item.refc targetClient.cacheMap.add(item) - stream.close() + stream.sclose() proc passFd(ctx: LoaderContext; stream: SocketStream; r: var BufferedReader) = var id: string r.sread(id) let fd = stream.recvFileHandle() ctx.passedFdMap[id] = fd - stream.close() + stream.sclose() proc removeCachedItem(ctx: LoaderContext; stream: SocketStream; client: ClientData; r: var BufferedReader) = @@ -562,7 +562,7 @@ proc removeCachedItem(ctx: LoaderContext; stream: SocketStream; dec item.refc if item.refc == 0: discard unlink(cstring(item.path)) - stream.close() + stream.sclose() proc tee(ctx: LoaderContext; stream: SocketStream; client: ClientData; r: var BufferedReader) = @@ -576,13 +576,13 @@ proc tee(ctx: LoaderContext; stream: SocketStream; client: ClientData; if output != nil: let id = ctx.getOutputId() output.tee(stream, id, targetPid) - stream.withWriter w: + stream.withPacketWriter w: w.swrite(id) stream.setBlocking(false) else: - stream.withWriter w: + stream.withPacketWriter w: w.swrite(-1) - stream.close() + stream.sclose() proc suspend(ctx: LoaderContext; stream: SocketStream; client: ClientData; r: var BufferedReader) = @@ -619,12 +619,12 @@ proc acceptConnection(ctx: LoaderContext) = r.sread(key) if myPid notin ctx.clientData: # possibly already removed - stream.close() + stream.sclose() return let client = ctx.clientData[myPid] if client.key != key: # ditto - stream.close() + stream.sclose() return var cmd: LoaderCommand r.sread(cmd) @@ -661,7 +661,7 @@ proc acceptConnection(ctx: LoaderContext) = ctx.resume(stream, client, r) except ErrorBrokenPipe: # receiving end died while reading the file; give up. - stream.close() + stream.sclose() proc exitLoader(ctx: LoaderContext) = ctx.ssock.close() @@ -684,7 +684,7 @@ proc initLoaderContext(fd: cint; config: LoaderConfig): LoaderContext = # The server has been initialized, so the main process can resume execution. let ps = newPosixStream(fd) ps.write(char(0u8)) - ps.close() + ps.sclose() onSignal SIGTERM: discard sig gctx.exitLoader() @@ -710,11 +710,11 @@ proc initLoaderContext(fd: cint; config: LoaderConfig): LoaderContext = r.sread(key) r.sread(pid) r.sread(config) - stream.withWriter w: + stream.withPacketWriter w: w.swrite(true) ctx.pagerClient = ClientData(key: key, pid: pid, config: config) ctx.clientData[pid] = ctx.pagerClient - stream.close() + stream.sclose() # unblock main socket ctx.ssock.sock.getFd().setBlocking(false) # for CGI @@ -763,7 +763,7 @@ proc finishCycle(ctx: LoaderContext; unregRead: var seq[LoaderHandle]; if handle.istream != nil: ctx.selector.unregister(handle.istream.fd) ctx.handleMap.del(handle.istream.fd) - handle.istream.close() + handle.istream.sclose() handle.istream = nil if handle.parser != nil: handle.finishParse() @@ -776,7 +776,7 @@ proc finishCycle(ctx: LoaderContext; unregRead: var seq[LoaderHandle]; if output.registered: ctx.selector.unregister(output.ostream.fd) ctx.outputMap.del(output.ostream.fd) - output.ostream.close() + output.ostream.sclose() output.ostream = nil let handle = output.parent if handle != nil: # may be nil if from loadStream S_ISREG @@ -786,7 +786,7 @@ proc finishCycle(ctx: LoaderContext; unregRead: var seq[LoaderHandle]; # premature end of all output streams; kill istream too ctx.selector.unregister(handle.istream.fd) ctx.handleMap.del(handle.istream.fd) - handle.istream.close() + handle.istream.sclose() handle.istream = nil if handle.parser != nil: handle.finishParse() @@ -872,7 +872,7 @@ proc fetch*(loader: FileLoader; input: Request): FetchPromise = return promise proc reconnect*(loader: FileLoader; data: ConnectData) = - data.stream.close() + data.stream.sclose() let stream = loader.connect() stream.withLoaderPacketWriter loader, w: w.swrite(lcLoad) @@ -892,27 +892,24 @@ proc switchStream*(loader: FileLoader; data: var OngoingData; stream: SocketStream) = data.response.body = stream let fd = int(stream.fd) - let realCloseImpl = stream.closeImpl - stream.closeImpl = nil data.response.unregisterFun = proc() = loader.ongoing.del(fd) loader.unregistered.add(fd) loader.unregisterFun(fd) - realCloseImpl(stream) proc suspend*(loader: FileLoader; fds: seq[int]) = let stream = loader.connect() stream.withLoaderPacketWriter loader, w: w.swrite(lcSuspend) w.swrite(fds) - stream.close() + stream.sclose() proc resume*(loader: FileLoader; fds: seq[int]) = let stream = loader.connect() stream.withLoaderPacketWriter loader, w: w.swrite(lcResume) w.swrite(fds) - stream.close() + stream.sclose() proc tee*(loader: FileLoader; sourceId, targetPid: int): (SocketStream, int) = let stream = loader.connect() @@ -921,7 +918,8 @@ proc tee*(loader: FileLoader; sourceId, targetPid: int): (SocketStream, int) = w.swrite(sourceId) w.swrite(targetPid) var outputId: int - stream.sread(outputId) + var r = stream.initPacketReader() + r.sread(outputId) return (stream, outputId) proc addCacheFile*(loader: FileLoader; outputId, targetPid: int): @@ -933,10 +931,11 @@ proc addCacheFile*(loader: FileLoader; outputId, targetPid: int): w.swrite(lcAddCacheFile) w.swrite(outputId) w.swrite(targetPid) + var r = stream.initPacketReader() var outputId: int var cacheFile: string - stream.sread(outputId) - stream.sread(cacheFile) + r.sread(outputId) + r.sread(cacheFile) return (outputId, cacheFile) proc redirectToFile*(loader: FileLoader; outputId: int; targetPath: string): @@ -948,35 +947,33 @@ proc redirectToFile*(loader: FileLoader; outputId: int; targetPath: string): w.swrite(lcRedirectToFile) w.swrite(outputId) w.swrite(targetPath) - stream.sread(result) + var r = stream.initPacketReader() + r.sread(result) const BufferSize = 4096 -proc handleHeaders(response: Response; request: Request; stream: SocketStream) = - stream.sread(response.outputId) - stream.sread(response.status) - stream.sread(response.headers) - proc onConnected*(loader: FileLoader, fd: int) = let connectData = loader.connecting[fd] let stream = connectData.stream let promise = connectData.promise let request = connectData.request + var r = stream.initPacketReader() var res: int - stream.sread(res) + r.sread(res) # packet 1 let response = newResponse(res, request, stream) if res == 0: - response.handleHeaders(request, stream) + r.sread(response.outputId) # packet 1 + r = stream.initPacketReader() + r.sread(response.status) # packet 2 + r = stream.initPacketReader() + r.sread(response.headers) # packet 3 # Only a stream of the response body may arrive after this point. response.body = stream assert loader.unregisterFun != nil - let realCloseImpl = stream.closeImpl - stream.closeImpl = nil response.unregisterFun = proc() = loader.ongoing.del(fd) loader.unregistered.add(fd) loader.unregisterFun(fd) - realCloseImpl(stream) loader.ongoing[fd] = OngoingData( response: response, bodyRead: response.bodyRead @@ -987,7 +984,7 @@ proc onConnected*(loader: FileLoader, fd: int) = var msg: string # msg is discarded. #TODO maybe print if called from trusted code (i.e. global == client)? - stream.sread(msg) + r.sread(msg) # packet 1 loader.unregisterFun(fd) loader.unregistered.add(fd) let err = newTypeError("NetworkError when attempting to fetch resource") @@ -997,7 +994,7 @@ proc onConnected*(loader: FileLoader, fd: int) = proc onRead*(loader: FileLoader; fd: int) = loader.ongoing.withValue(fd, buffer): let response = buffer[].response - while not response.body.atEnd(): + while not response.body.isend: let olen = buffer[].buf.len try: buffer[].buf.setLen(olen + BufferSize) @@ -1008,7 +1005,7 @@ proc onRead*(loader: FileLoader; fd: int) = except ErrorAgain: buffer[].buf.setLen(olen) break - if response.body.atEnd(): + if response.body.isend: buffer[].bodyRead.resolve(buffer[].buf) buffer[].bodyRead = nil buffer[].buf = "" @@ -1019,10 +1016,10 @@ proc onError*(loader: FileLoader; fd: int) = let response = buffer[].response when defined(debug): var lbuf {.noinit.}: array[BufferSize, char] - if not response.body.atEnd(): + if not response.body.isend: let n = response.body.recvData(addr lbuf[0], lbuf.len) assert n == 0 - assert response.body.atEnd() + assert response.body.isend buffer[].bodyRead.resolve(buffer[].buf) buffer[].bodyRead = nil buffer[].buf = "" @@ -1032,15 +1029,21 @@ proc onError*(loader: FileLoader; fd: int) = proc doRequest*(loader: FileLoader; request: Request): Response = let stream = loader.startRequest(request) let response = Response(url: request.url) - stream.sread(response.res) + var r = stream.initPacketReader() + var res: int + r.sread(res) # packet 1 if response.res == 0: - response.handleHeaders(request, stream) + r.sread(response.outputId) # packet 1 + r = stream.initPacketReader() + r.sread(response.status) # packet 2 + r = stream.initPacketReader() + r.sread(response.headers) # packet 3 # Only a stream of the response body may arrive after this point. response.body = stream else: var msg: string - stream.sread(msg) - stream.close() + r.sread(msg) # packet 1 + stream.sclose() return response proc shareCachedItem*(loader: FileLoader; id, targetPid: int) = @@ -1051,7 +1054,7 @@ proc shareCachedItem*(loader: FileLoader; id, targetPid: int) = w.swrite(loader.clientPid) w.swrite(targetPid) w.swrite(id) - stream.close() + stream.sclose() proc passFd*(loader: FileLoader; id: string; fd: FileHandle) = let stream = loader.connect() @@ -1060,7 +1063,7 @@ proc passFd*(loader: FileLoader; id: string; fd: FileHandle) = w.swrite(lcPassFd) w.swrite(id) stream.sendFileHandle(fd) - stream.close() + stream.sclose() proc removeCachedItem*(loader: FileLoader; cacheId: int) = let stream = loader.connect() @@ -1068,7 +1071,7 @@ proc removeCachedItem*(loader: FileLoader; cacheId: int) = stream.withLoaderPacketWriter loader, w: w.swrite(lcRemoveCachedItem) w.swrite(cacheId) - stream.close() + stream.sclose() proc addClient*(loader: FileLoader; key: ClientKey; pid: int; config: LoaderClientConfig): bool = @@ -1078,8 +1081,9 @@ proc addClient*(loader: FileLoader; key: ClientKey; pid: int; w.swrite(key) w.swrite(pid) w.swrite(config) - stream.sread(result) - stream.close() + var r = stream.initPacketReader() + r.sread(result) + stream.sclose() proc removeClient*(loader: FileLoader; pid: int) = let stream = loader.connect() @@ -1087,4 +1091,4 @@ proc removeClient*(loader: FileLoader; pid: int) = stream.withLoaderPacketWriter loader, w: w.swrite(lcRemoveClient) w.swrite(pid) - stream.close() + stream.sclose() diff --git a/src/loader/loaderhandle.nim b/src/loader/loaderhandle.nim index 6e1b53a9..00f6f754 100644 --- a/src/loader/loaderhandle.nim +++ b/src/loader/loaderhandle.nim @@ -1,6 +1,5 @@ import std/deques import std/net -import std/streams import std/tables import io/bufwriter @@ -129,7 +128,7 @@ proc sendResult*(handle: LoaderHandle; res: int; msg = "") = let output = handle.output let blocking = output.ostream.blocking output.ostream.setBlocking(true) - output.ostream.withWriter w: + output.ostream.withPacketWriter w: w.swrite(res) if res == 0: # success assert msg == "" @@ -143,7 +142,7 @@ proc sendStatus*(handle: LoaderHandle; status: uint16) = inc handle.rstate let blocking = handle.output.ostream.blocking handle.output.ostream.setBlocking(true) - handle.output.ostream.withWriter w: + handle.output.ostream.withPacketWriter w: w.swrite(status) handle.output.ostream.setBlocking(blocking) @@ -152,7 +151,7 @@ proc sendHeaders*(handle: LoaderHandle; headers: Headers) = inc handle.rstate let blocking = handle.output.ostream.blocking handle.output.ostream.setBlocking(true) - handle.output.ostream.withWriter w: + handle.output.ostream.withPacketWriter w: w.swrite(headers) handle.output.ostream.setBlocking(blocking) @@ -169,8 +168,8 @@ proc close*(handle: LoaderHandle) = for output in handle.outputs: #TODO assert not output.registered if output.ostream != nil: - output.ostream.close() + output.ostream.sclose() output.ostream = nil if handle.istream != nil: - handle.istream.close() + handle.istream.sclose() handle.istream = nil diff --git a/src/loader/response.nim b/src/loader/response.nim index 6b4ec64e..dbec92d2 100644 --- a/src/loader/response.nim +++ b/src/loader/response.nim @@ -1,4 +1,3 @@ -import std/streams import std/strutils import std/tables @@ -85,7 +84,7 @@ proc close*(response: Response) {.jsfunc.} = if response.unregisterFun != nil: response.unregisterFun() if response.body != nil: - response.body.close() + response.body.sclose() func getCharset*(this: Response; fallback: Charset): Charset = if "Content-Type" notin this.headers.table: diff --git a/src/local/client.nim b/src/local/client.nim index 29eaeb73..9fd6c23a 100644 --- a/src/local/client.nim +++ b/src/local/client.nim @@ -22,6 +22,8 @@ import html/formdata import html/xmlhttprequest import io/bufstream import io/bufwriter +import io/dynstream +import io/filestream import io/posixstream import io/promise import io/socketstream @@ -385,7 +387,7 @@ proc acceptBuffers(client: Client) = let fd = int(stream.source.fd) client.selector.unregister(fd) client.fdmap.del(fd) - stream.close() + stream.sclose() elif container.process != -1: # connecting to buffer process let i = pager.findProcMapItem(container.process) pager.procmap.del(i) @@ -393,7 +395,7 @@ proc acceptBuffers(client: Client) = # connecting to URL let stream = pager.connectingContainers[i].stream client.selector.unregister(stream.fd) - stream.close() + stream.sclose() pager.connectingContainers.del(i) let registerFun = proc(fd: int) = client.selector.unregister(fd) @@ -405,31 +407,32 @@ proc acceptBuffers(client: Client) = pager.alert("Error: failed to set up buffer") continue let key = pager.addLoaderClient(container.process, container.loaderConfig) - stream.withWriter w: + stream.withPacketWriter w: w.swrite(key) - let loader = pager.loader - if item.fdin != -1: - let outputId = item.istreamOutputId - if container.cacheId == -1: - (container.cacheId, container.cacheFile) = loader.addCacheFile(outputId, - loader.clientPid) - var outCacheId = container.cacheId - let pid = container.process - if item.fdout == item.fdin: - loader.shareCachedItem(container.cacheId, pid) - loader.resume(@[item.istreamOutputId]) + let loader = pager.loader + if item.fdin != -1: + let outputId = item.istreamOutputId + if container.cacheId == -1: + (container.cacheId, container.cacheFile) = + loader.addCacheFile(outputId, loader.clientPid) + var outCacheId = container.cacheId + let pid = container.process + if item.fdout == item.fdin: + loader.shareCachedItem(container.cacheId, pid) + loader.resume(@[item.istreamOutputId]) + else: + outCacheId = loader.addCacheFile(item.ostreamOutputId, pid).outputId + loader.resume(@[item.istreamOutputId, item.ostreamOutputId]) + w.swrite(outCacheId) else: - outCacheId = loader.addCacheFile(item.ostreamOutputId, pid).outputId - loader.resume(@[item.istreamOutputId, item.ostreamOutputId]) + # buffer is cloned, no need to cache anything + container.setCloneStream(stream, registerFun) + if item.fdin != -1: # pass down fdout + # must come after the previous block so the first packet is flushed stream.sendFileHandle(item.fdout) - stream.withWriter w: - w.swrite(outCacheId) discard close(item.fdout) container.setStream(stream, registerFun) - else: - # buffer is cloned, no need to cache anything - container.setCloneStream(stream, registerFun) let fd = int(stream.fd) client.fdmap[fd] = container client.selector.registerHandle(fd, {Read}, 0) @@ -466,14 +469,14 @@ proc handleRead(client: Client; fd: int) = if hadlf: client.console.err.write(prefix) if j - i > 0: - client.console.err.writeData(addr buffer[i], j - i) + client.console.err.write(buffer.toOpenArray(i, j - 1)) i = j hadlf = found except ErrorAgain: break if not hadlf: client.console.err.write('\n') - client.console.err.flush() + client.console.err.sflush() elif fd in client.loader.connecting: client.loader.onConnected(fd) client.runJSJobs() @@ -495,7 +498,7 @@ proc flushConsole*(client: Client) {.jsfunc.} = if client.console == nil: # hack for when client crashes before console has been initialized client.consoleWrapper = ConsoleWrapper( - console: newConsole(newFileStream(stderr)) + console: newConsole(newDynFileStream(stderr)) ) client.handleRead(client.forkserver.estream.fd) @@ -654,7 +657,7 @@ proc addConsole(pager: Pager; interactive: bool; clearFun, showFun, hideFun: let container = pager.readPipe0("text/plain", CHARSET_UNKNOWN, pipefd[0], url, ConsoleTitle, {}) let err = newPosixStream(pipefd[1]) - err.writeLine("Type (M-c) console.hide() to return to buffer mode.") + err.write("Type (M-c) console.hide() to return to buffer mode.\n") let console = newConsole(err, clearFun, showFun, hideFun) return ConsoleWrapper(console: console, container: container) else: @@ -673,7 +676,7 @@ proc clearConsole(client: Client) = pager.replace(client.consoleWrapper.container, replacement) client.consoleWrapper.container = replacement let console = client.consoleWrapper.console - console.err.close() + console.err.sclose() console.err = newPosixStream(pipefd[1]) proc dumpBuffers(client: Client) = diff --git a/src/local/container.nim b/src/local/container.nim index b5049edc..49526a2a 100644 --- a/src/local/container.nim +++ b/src/local/container.nim @@ -7,8 +7,8 @@ when defined(posix): import config/config import config/mimetypes +import io/dynstream import io/promise -import io/serialize import io/socketstream import js/javascript import js/jstypes @@ -1576,9 +1576,9 @@ func hoverImage(container: Container): string {.jsfget.} = proc handleCommand(container: Container) = var packetid, len: int - container.iface.stream.sread(len) - container.iface.stream.sread(packetid) - container.iface.resolve(packetid, len - slen(packetid)) + container.iface.stream.recvDataLoop(addr len, sizeof(len)) + container.iface.stream.recvDataLoop(addr packetid, sizeof(packetid)) + container.iface.resolve(packetid, len - sizeof(packetid)) proc startLoad(container: Container) = container.iface.load().then(proc(res: int) = diff --git a/src/local/pager.nim b/src/local/pager.nim index 5bd51fed..4105f08c 100644 --- a/src/local/pager.nim +++ b/src/local/pager.nim @@ -14,9 +14,10 @@ when defined(posix): import bindings/libregexp import config/config import config/mailcap +import io/bufreader +import io/dynstream import io/posixstream import io/promise -import io/serialize import io/socketstream import io/stdio import io/tempfile @@ -89,7 +90,7 @@ type LineDataDownload = ref object of LineData outputId: int - stream: Stream + stream: DynStream LineDataAuth = ref object of LineData url: URL @@ -1101,7 +1102,7 @@ proc saveTo(pager: Pager; data: LineDataDownload; path: string) = if pager.loader.redirectToFile(data.outputId, path): pager.alert("Saving file to " & path) pager.loader.resume(@[data.outputId]) - data.stream.close() + data.stream.sclose() pager.lineData = nil else: pager.ask("Failed to save to " & path & ". Retry?").then( @@ -1109,7 +1110,7 @@ proc saveTo(pager: Pager; data: LineDataDownload; path: string) = if x: pager.setLineEdit(lmDownload, path) else: - data.stream.close() + data.stream.sclose() pager.lineData = nil ) @@ -1170,7 +1171,7 @@ proc updateReadLine*(pager: Pager) = of lmCommand: pager.commandMode = false of lmDownload: let data = LineDataDownload(pager.lineData) - data.stream.close() + data.stream.sclose() else: discard pager.lineData = nil if lineedit.state in {lesCancel, lesFinish} and @@ -1299,7 +1300,7 @@ proc runMailcapReadPipe(pager: Pager; stream: SocketStream; cmd: string; # child process discard close(pipefdOut[0]) discard dup2(stream.fd, stdin.getFileHandle()) - stream.close() + stream.sclose() discard dup2(pipefdOut[1], stdout.getFileHandle()) closeStderr() discard close(pipefdOut[1]) @@ -1319,14 +1320,14 @@ proc runMailcapWritePipe(pager: Pager; stream: SocketStream; elif pid == 0: # child process discard dup2(stream.fd, stdin.getFileHandle()) - stream.close() + stream.sclose() if not needsterminal: closeStdout() closeStderr() myExec(cmd) else: # parent - stream.close() + stream.sclose() if needsterminal: var x: cint discard waitpid(pid, x, 0) @@ -1342,11 +1343,11 @@ proc writeToFile(istream: SocketStream; outpath: string): bool = if n == 0: break if ps.sendData(buffer.toOpenArray(0, n - 1)) < n: - ps.close() + ps.sclose() return false if n < buffer.len: break - ps.close() + ps.sclose() true # Save input in a file, run the command, and redirect its output to a @@ -1364,7 +1365,7 @@ proc runMailcapReadFile(pager: Pager; stream: SocketStream; if not stream.writeToFile(outpath): #TODO print error message quit(1) - stream.close() + stream.sclose() let ret = execCmd(cmd) discard tryRemoveFile(outpath) quit(ret) @@ -1395,12 +1396,12 @@ proc runMailcapWriteFile(pager: Pager; stream: SocketStream; if not stream.writeToFile(outpath): #TODO print error message (maybe in parent?) quit(1) - stream.close() + stream.sclose() let ret = execCmd(cmd) discard tryRemoveFile(outpath) quit(ret) # parent - stream.close() + stream.sclose() proc filterBuffer(pager: Pager; stream: SocketStream; cmd: string; ishtml: bool): CheckMailcapResult = @@ -1417,7 +1418,7 @@ proc filterBuffer(pager: Pager; stream: SocketStream; cmd: string; # child discard close(pipefd_out[0]) discard dup2(stream.fd, stdin.getFileHandle()) - stream.close() + stream.sclose() discard dup2(pipefd_out[1], stdout.getFileHandle()) closeStderr() discard close(pipefd_out[1]) @@ -1501,7 +1502,7 @@ proc checkMailcap(pager: Pager; container: Container; stream: SocketStream; var pipefdOut: array[2, cint] if pipe(pipefdOut) == -1: pager.alert("Error: failed to open pipe") - stream.close() # connect: false implies that we consumed the stream + stream.sclose() # connect: false implies that we consumed the stream break needsConnect let pid = if canpipe: pager.runMailcapReadPipe(stream, cmd, pipefdOut) @@ -1586,7 +1587,7 @@ proc connected(pager: Pager; container: Container; response: Response) = if response.status == 401: # unauthorized pager.setLineEdit(lmUsername) pager.lineData = LineDataAuth(url: container.url) - istream.close() + istream.sclose() return # This forces client to ask for confirmation before quitting. # (It checks a flag on container, because console buffers must not affect this @@ -1626,7 +1627,7 @@ proc connected(pager: Pager; container: Container; response: Response) = ) if mailcapRes.fdout != istream.fd: # istream has been redirected into a filter - istream.close() + istream.sclose() pager.procmap.add(ProcMapItem( container: container, fdout: FileHandle(mailcapRes.fdout), @@ -1650,17 +1651,18 @@ proc handleConnectingContainer*(pager: Pager; i: int) = let stream = item.stream case item.state of ccsBeforeResult: + var r = stream.initPacketReader() var res: int - stream.sread(res) + r.sread(res) if res == 0: - stream.sread(item.outputId) + r.sread(item.outputId) inc item.state container.loadinfo = "Connected to " & $container.url & ". Downloading..." pager.onSetLoadInfo(container) # continue else: var msg: string - stream.sread(msg) + r.sread(msg) if msg == "": msg = getLoaderErrorMessage(res) pager.fail(container, msg) @@ -1668,9 +1670,10 @@ proc handleConnectingContainer*(pager: Pager; i: int) = pager.connectingContainers.del(i) pager.selector.unregister(item.stream.fd) pager.loader.unregistered.add(item.stream.fd) - stream.close() + stream.sclose() of ccsBeforeStatus: - stream.sread(item.status) + var r = stream.initPacketReader() + r.sread(item.status) inc item.state # continue of ccsBeforeHeaders: @@ -1681,14 +1684,15 @@ proc handleConnectingContainer*(pager: Pager; i: int) = url: container.request.url, body: stream ) - stream.sread(response.headers) + var r = stream.initPacketReader() + r.sread(response.headers) # done pager.connectingContainers.del(i) pager.selector.unregister(item.stream.fd) pager.loader.unregistered.add(item.stream.fd) let redirect = response.getRedirect(container.request) if redirect != nil: - stream.close() + stream.sclose() pager.redirect(container, response, redirect) else: pager.connected(container, response) @@ -1698,7 +1702,7 @@ proc handleConnectingContainerError*(pager: Pager; i: int) = pager.fail(item.container, "loader died while loading") pager.selector.unregister(item.stream.fd) pager.loader.unregistered.add(item.stream.fd) - item.stream.close() + item.stream.sclose() pager.connectingContainers.del(i) proc handleEvent0(pager: Pager; container: Container; event: ContainerEvent): diff --git a/src/server/buffer.nim b/src/server/buffer.nim index e952d9aa..aa8a7fc7 100644 --- a/src/server/buffer.nim +++ b/src/server/buffer.nim @@ -7,7 +7,6 @@ import std/options import std/os import std/posix import std/selectors -import std/streams import std/tables import std/unicode @@ -30,9 +29,10 @@ import html/formdata as formdata_impl import io/bufreader import io/bufstream import io/bufwriter +import io/dynstream +import io/filestream import io/posixstream import io/promise -import io/serialize import io/serversocket import io/socketstream import js/fromjs @@ -106,7 +106,7 @@ type config: BufferConfig tasks: array[BufferCommand, int] #TODO this should have arguments hoverText: array[HoverType, string] - estream: Stream # error stream + estream: DynFileStream # error stream ssock: ServerSocket factory: CAtomFactory uastyle: CSSStylesheet @@ -167,11 +167,10 @@ proc cloneInterface*(stream: SocketStream, registerFun: proc(fd: int)): # We have just fork'ed the buffer process inside an interface function, # from which the new buffer is going to return as well. So we must also # consume the return value of the clone function, which is the pid 0. - var len: int + var r = stream.initPacketReader() var pid: int - stream.sread(len) - stream.sread(iface.packetid) - stream.sread(pid) + r.sread(iface.packetid) + r.sread(pid) return iface proc resolve*(iface: BufferInterface, packetid, len: int) = @@ -215,20 +214,8 @@ proc buildInterfaceProc(fun: NimNode, funid: string): tuple[fun, name: NimNode] for i in 0 ..< param.len - 2: let id2 = newIdentDefs(ident(param[i].strVal), param[^2]) params2.add(id2) - var len = ident"len" body.add(quote do: - var `len` = 0 - ) - for i in 2 ..< params2.len: - let s = params2[i][0] # sym e.g. url - body.add(quote do: - `len` += slen(`s`) - ) - body.add(quote do: - `len` += slen(BufferCommand.`nup`) - `len` += slen(`thisval`.packetid) - var writer {.inject.} = `thisval`.stream.initWriter() - writer.swrite(`len`) + var writer {.inject.} = `thisval`.stream.initWriter(writeLen = true) writer.swrite(BufferCommand.`nup`) writer.swrite(`thisval`.packetid) ) @@ -859,7 +846,7 @@ proc rewind(buffer: Buffer; offset: int; unregister = true): bool = if unregister: buffer.selector.unregister(buffer.fd) buffer.loader.unregistered.add(buffer.fd) - buffer.istream.close() + buffer.istream.sclose() buffer.istream = response.body buffer.istream.setBlocking(false) buffer.fd = response.body.fd @@ -951,7 +938,7 @@ proc clone*(buffer: Buffer, newurl: URL): int {.proxy.} = var ongoing: seq[OngoingData] = @[] for data in buffer.loader.ongoing.values: ongoing.add(data) - data.response.body.close() + data.response.body.sclose() buffer.loader.ongoing.clear() let myPid = getCurrentProcessId() for data in ongoing.mitems: @@ -970,7 +957,7 @@ proc clone*(buffer: Buffer, newurl: URL): int {.proxy.} = # the cache. (This also lets us skip suspend/resume in this case.) # We ignore errors; not much we can do with them here :/ discard buffer.rewind(buffer.bytesRead, unregister = false) - buffer.pstream.close() + buffer.pstream.sclose() let ssock = initServerSocket(myPid) buffer.ssock = ssock ps.write(char(0)) @@ -979,7 +966,8 @@ proc clone*(buffer: Buffer, newurl: URL): int {.proxy.} = it = 0 let socks = ssock.acceptSocketStream() buffer.loader.clientPid = myPid - socks.sread(buffer.loader.key) # get key for new buffer + # get key for new buffer + socks.recvDataLoop(buffer.loader.key) buffer.pstream = socks buffer.rfd = socks.fd buffer.selector.registerHandle(buffer.rfd, {Read}, 0) @@ -988,9 +976,9 @@ proc clone*(buffer: Buffer, newurl: URL): int {.proxy.} = discard close(pipefd[1]) # close write # We must wait for child to tee its ongoing streams. let ps = newPosixStream(pipefd[0]) - let c = ps.readChar() + let c = ps.sreadChar() assert c == char(0) - ps.close() + ps.sclose() buffer.loader.resume(ids) return pid @@ -1006,7 +994,8 @@ proc dispatchDOMContentLoadedEvent(buffer: Buffer) = if el.ctype == "DOMContentLoaded": let e = el.callback(event) if e.isErr: - ctx.writeException(buffer.estream) + buffer.estream.write(ctx.getExceptionStr()) + buffer.estream.sflush() called = true if called: buffer.do_reshape() @@ -1022,7 +1011,8 @@ proc dispatchLoadEvent(buffer: Buffer) = if el.ctype == "load": let e = el.callback(event) if e.isErr: - ctx.writeException(buffer.estream) + buffer.estream.write(ctx.getExceptionStr()) + buffer.estream.sflush() called = true let jsWindow = toJS(ctx, window) let jsonload = JS_GetPropertyStr(ctx, jsWindow, "onload") @@ -1052,7 +1042,8 @@ proc dispatchEvent(buffer: Buffer, ctype: string, elem: Element): tuple[ let e = el.callback(event) called = true if e.isErr: - ctx.writeException(buffer.estream) + buffer.estream.write(ctx.getExceptionStr()) + buffer.estream.sflush() if FLAG_STOP_IMMEDIATE_PROPAGATION in event.flags: stop = true break @@ -1082,7 +1073,7 @@ proc finishLoad(buffer: Buffer): EmptyPromise = buffer.cacheId = -1 buffer.fd = -1 buffer.outputId = -1 - buffer.istream.close() + buffer.istream.sclose() return buffer.loadResources() # Returns: @@ -1107,9 +1098,7 @@ proc hasTask(buffer: Buffer; cmd: BufferCommand): bool = proc resolveTask[T](buffer: Buffer; cmd: BufferCommand; res: T) = let packetid = buffer.tasks[cmd] assert packetid != 0 - let len = slen(buffer.tasks[cmd]) + slen(res) - buffer.pstream.withWriter w: - w.swrite(len) + buffer.pstream.withPacketWriter w: w.swrite(packetid) w.swrite(res) buffer.tasks[cmd] = 0 @@ -1183,7 +1172,7 @@ proc cancel*(buffer: Buffer): int {.proxy.} = for fd, data in buffer.loader.connecting: buffer.selector.unregister(fd) buffer.loader.unregistered.add(fd) - data.stream.close() + data.stream.sclose() buffer.loader.connecting.clear() for fd, data in buffer.loader.ongoing: data.response.unregisterFun() @@ -1194,7 +1183,7 @@ proc cancel*(buffer: Buffer): int {.proxy.} = buffer.fd = -1 buffer.cacheId = -1 buffer.outputId = -1 - buffer.istream.close() + buffer.istream.sclose() buffer.htmlParser.finish() buffer.document.readyState = rsInteractive buffer.state = bsLoaded @@ -1470,7 +1459,8 @@ proc evalJSURL(buffer: Buffer, url: URL): Opt[string] = let ctx = buffer.window.jsctx let ret = ctx.eval(scriptSource, $buffer.baseURL, JS_EVAL_TYPE_GLOBAL) if JS_IsException(ret): - ctx.writeException(buffer.estream) + buffer.estream.write(ctx.getExceptionStr()) + buffer.estream.sflush() return err() # error if JS_IsUndefined(ret): return err() # no need to navigate @@ -1762,16 +1752,12 @@ macro bufferDispatcher(funs: static ProxyMap; buffer: Buffer; var resolve = newStmtList() if rval == nil: resolve.add(quote do: - let len = slen(`packetid`) - buffer.pstream.withWriter w: - w.swrite(len) + buffer.pstream.withPacketWriter w: w.swrite(`packetid`) ) else: resolve.add(quote do: - let len = slen(`packetid`) + slen(`rval`) - buffer.pstream.withWriter w: - w.swrite(len) + buffer.pstream.withPacketWriter w: w.swrite(`packetid`) w.swrite(`rval`) ) @@ -1864,23 +1850,23 @@ proc runBuffer(buffer: Buffer) = buffer.loader.unregistered.setLen(0) proc cleanup(buffer: Buffer) = - buffer.pstream.close() + buffer.pstream.sclose() buffer.ssock.close() proc launchBuffer*(config: BufferConfig; url: URL; request: Request; attrs: WindowAttributes; ishtml: bool; charsetStack: seq[Charset]; loader: FileLoader; ssock: ServerSocket) = - let socks = ssock.acceptSocketStream() + let pstream = ssock.acceptSocketStream() let buffer = Buffer( attrs: attrs, config: config, - estream: newFileStream(stderr), + estream: newDynFileStream(stderr), ishtml: ishtml, loader: loader, needsBOMSniff: config.charsetOverride == CHARSET_UNKNOWN, - pstream: socks, + pstream: pstream, request: request, - rfd: socks.fd, + rfd: pstream.fd, selector: newSelector[int](), ssock: ssock, url: url, @@ -1889,10 +1875,11 @@ proc launchBuffer*(config: BufferConfig; url: URL; request: Request; outputId: -1 ) buffer.charset = buffer.charsetStack.pop() - socks.sread(buffer.loader.key) - let fd = socks.recvFileHandle() + var r = pstream.initPacketReader() + r.sread(buffer.loader.key) + r.sread(buffer.cacheId) + let fd = pstream.recvFileHandle() buffer.fd = fd - socks.sread(buffer.cacheId) buffer.istream = newPosixStream(fd) buffer.istream.setBlocking(false) buffer.selector.registerHandle(fd, {Read}, 0) diff --git a/src/server/forkserver.nim b/src/server/forkserver.nim index 60981213..f0285357 100644 --- a/src/server/forkserver.nim +++ b/src/server/forkserver.nim @@ -1,15 +1,15 @@ import std/options import std/os import std/posix -import std/streams import std/tables import config/config import io/bufreader import io/bufwriter +import io/dynstream import io/posixstream -import io/serialize import io/serversocket +import io/stdio import loader/loader import server/buffer import types/urimethodmap @@ -39,8 +39,9 @@ proc newFileLoader*(forkserver: ForkServer; config: LoaderConfig): FileLoader = forkserver.ostream.withPacketWriter w: w.swrite(fcForkLoader) w.swrite(config) + var r = forkserver.istream.initPacketReader() var process: int - forkserver.istream.sread(process) + r.sread(process) return FileLoader(process: process, clientPid: getCurrentProcessId()) proc loadForkServerConfig*(forkserver: ForkServer, config: Config) = @@ -64,9 +65,10 @@ proc forkBuffer*(forkserver: ForkServer; config: BufferConfig; url: URL; w.swrite(attrs) w.swrite(ishtml) w.swrite(charsetStack) + var r = forkserver.istream.initPacketReader() var bufferPid: int - forkserver.istream.sread(bufferPid) - bufferPid + r.sread(bufferPid) + return bufferPid proc trapSIGINT() = # trap SIGINT, so e.g. an external editor receiving an interrupt in the @@ -147,9 +149,9 @@ proc forkBuffer(ctx: var ForkServerContext; r: var BufferedReader): int = gssock.close() let ps = newPosixStream(pipefd[1]) ps.write(char(0)) - ps.close() - discard close(stdin.getFileHandle()) - discard close(stdout.getFileHandle()) + ps.sclose() + closeStdin() + closeStdout() let loader = FileLoader( process: loaderPid, clientPid: pid @@ -168,9 +170,9 @@ proc forkBuffer(ctx: var ForkServerContext; r: var BufferedReader): int = doAssert false discard close(pipefd[1]) # close write let ps = newPosixStream(pipefd[0]) - let c = ps.readChar() + let c = ps.sreadChar() assert c == char(0) - ps.close() + ps.sclose() ctx.children.add(pid) return pid @@ -195,14 +197,14 @@ proc runForkServer() = ctx.children.del(i) of fcForkBuffer: let r = ctx.forkBuffer(r) - ctx.ostream.withWriter w: + ctx.ostream.withPacketWriter w: w.swrite(r) of fcForkLoader: assert ctx.loaderPid == 0 var config: LoaderConfig r.sread(config) let pid = ctx.forkLoader(config) - ctx.ostream.withWriter w: + ctx.ostream.withPacketWriter w: w.swrite(pid) ctx.loaderPid = pid ctx.children.add(pid) @@ -211,12 +213,11 @@ proc runForkServer() = r.sread(config) set_cjk_ambiguous(config.ambiguous_double) SocketDirectory = config.tmpdir - ctx.ostream.flush() except EOFError: # EOF break - ctx.istream.close() - ctx.ostream.close() + ctx.istream.sclose() + ctx.ostream.sclose() # Clean up when the main process crashed. for child in ctx.children: discard kill(cint(child), cint(SIGTERM)) diff --git a/src/types/formdata.nim b/src/types/formdata.nim index 29817e54..b353b814 100644 --- a/src/types/formdata.nim +++ b/src/types/formdata.nim @@ -1,6 +1,7 @@ -import std/streams import std/strutils +import io/dynstream +import io/filestream import js/javascript import types/blob import utils/twtstr @@ -56,7 +57,7 @@ proc calcLength*(this: FormData): int = proc getContentType*(this: FormData): string = return "multipart/form-data; boundary=" & this.boundary -proc writeEntry*(stream: Stream, entry: FormDataEntry, boundary: string) = +proc writeEntry*(stream: DynStream; entry: FormDataEntry; boundary: string) = stream.write("--" & boundary & "\r\n") let name = percentEncode(entry.name, {'"', '\r', '\n'}) if entry.isstr: @@ -74,17 +75,17 @@ proc writeEntry*(stream: Stream, entry: FormDataEntry, boundary: string) = blob.ctype stream.write("Content-Type: " & ctype & "\r\n") if blob.isfile: - let fs = newFileStream(WebFile(blob).path) + let fs = newDynFileStream(WebFile(blob).path) if fs != nil: var buf {.noinit.}: array[4096, uint8] while true: - let n = fs.readData(addr buf[0], 4096) + let n = fs.recvData(addr buf[0], 4096) if n == 0: break - stream.writeData(addr buf[0], n) + stream.sendDataLoop(addr buf[0], n) if n < buf.len: break else: - stream.writeData(blob.buffer, int(blob.size)) + stream.sendDataLoop(blob.buffer, int(blob.size)) stream.write("\r\n") stream.write("\r\n") |