diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/io/multistream.nim | 27 | ||||
-rw-r--r-- | src/io/teestream.nim | 3 | ||||
-rw-r--r-- | src/loader/loader.nim | 104 | ||||
-rw-r--r-- | src/loader/loaderhandle.nim | 29 | ||||
-rw-r--r-- | src/local/container.nim | 72 | ||||
-rw-r--r-- | src/local/pager.nim | 24 | ||||
-rw-r--r-- | src/server/buffer.nim | 163 |
7 files changed, 375 insertions, 47 deletions
diff --git a/src/io/multistream.nim b/src/io/multistream.nim new file mode 100644 index 00000000..33038ad6 --- /dev/null +++ b/src/io/multistream.nim @@ -0,0 +1,27 @@ +# MultiStream: write to several streams at once when writing to a single +# stream. +# See TeeStream for a pull version. + +import streams + +type MultiStream = ref object of Stream + s1: Stream + s2: Stream + +proc tsClose(s: Stream) = + let s = cast[MultiStream](s) + s.s1.close() + s.s2.close() + +proc msWriteData(s: Stream, buffer: pointer, bufLen: int) = + let s = cast[MultiStream](s) + s.s1.writeData(buffer, bufLen) + s.s2.writeData(buffer, bufLen) + +proc newMultiStream*(s1, s2: Stream, closedest = true): MultiStream = + return MultiStream( + s1: s1, + s2: s2, + closeImpl: tsClose, + writeDataImpl: msWriteData + ) diff --git a/src/io/teestream.nim b/src/io/teestream.nim index 81c9e2f0..a8f5792e 100644 --- a/src/io/teestream.nim +++ b/src/io/teestream.nim @@ -1,3 +1,6 @@ +# TeeStream: write to another stream when reading from one stream. +# See MultiStream for a push version. + import streams type TeeStream = ref object of Stream diff --git a/src/loader/loader.nim b/src/loader/loader.nim index 663915fa..6ab3ee9b 100644 --- a/src/loader/loader.nim +++ b/src/loader/loader.nim @@ -73,15 +73,21 @@ type LoaderCommand = enum LOAD - QUIT + TEE + SUSPEND + RESUME + ADDREF + UNREF LoaderContext = ref object + refcount: int ssock: ServerSocket alive: bool curlm: CURLM config: LoaderConfig extra_fds: seq[curl_waitfd] handleList: seq[CurlHandle] + handleMap: Table[int, LoaderHandle] LoaderConfig* = object defaultheaders*: Headers @@ -128,7 +134,7 @@ proc loadResource(ctx: LoaderContext, request: Request, handle: LoaderHandle) = discard handle.sendResult(ERROR_UNKNOWN_SCHEME) handle.close() -proc onLoad(ctx: LoaderContext, stream: Stream) = +proc onLoad(ctx: LoaderContext, stream: SocketStream) = var request: Request stream.sread(request) if not ctx.config.filter.match(request.url): @@ -150,6 +156,8 @@ proc onLoad(ctx: LoaderContext, stream: Stream) = request.headers["Referer"] = r if request.proxy == nil or not ctx.config.acceptProxy: request.proxy = ctx.config.proxy + let fd = int(stream.source.getFd()) + ctx.handleMap[fd] = handle ctx.loadResource(request, handle) proc acceptConnection(ctx: LoaderContext) = @@ -163,9 +171,37 @@ proc acceptConnection(ctx: LoaderContext) = case cmd of LOAD: ctx.onLoad(stream) - of QUIT: - ctx.alive = false - stream.close() + of TEE: + var fd: int + stream.sread(fd) + if fd notin ctx.handleMap: + stream.swrite(false) + else: + let handle = ctx.handleMap[fd] + handle.addOutputStream(stream) + stream.swrite(true) + of ADDREF: + inc ctx.refcount + of UNREF: + dec ctx.refcount + if ctx.refcount == 0: + ctx.alive = false + stream.close() + else: + assert ctx.refcount > 0 + of SUSPEND: + var fds: seq[int] + stream.sread(fds) + for fd in fds: + ctx.handleMap.withValue(fd, handlep): + handlep[].suspend() + of RESUME: + var fds: seq[int] + stream.sread(fds) + for fd in fds: + ctx.handleMap.withValue(fd, handlep): + handlep[].resume() + except IOError: # End-of-file, broken pipe, or something else. For now we just # ignore it and pray nothing breaks. @@ -198,7 +234,8 @@ proc initLoaderContext(fd: cint, config: LoaderConfig): LoaderContext = var ctx = LoaderContext( alive: true, curlm: curlm, - config: config + config: config, + refcount: 1 ) gctx = ctx #TODO ideally, buffered would be true. Unfortunately this conflicts with @@ -314,6 +351,52 @@ proc fetch*(loader: FileLoader, input: Request): FetchPromise = ) return promise +proc reconnect*(loader: FileLoader, data: ConnectData) = + let stream = connectSocketStream(loader.process, false, blocking = true) + stream.swrite(LOAD) + stream.swrite(data.request) + stream.flush() + let fd = int(stream.source.getFd()) + loader.registerFun(fd) + loader.connecting[fd] = ConnectData( + promise: data.promise, + request: data.request, + stream: stream + ) + +proc switchStream*(data: var ConnectData, stream: Stream) = + data.stream = stream + +proc switchStream*(loader: FileLoader, data: var OngoingData, + stream: SocketStream) = + data.response.body = stream + let fd = int(stream.source.getFd()) + let realCloseImpl = stream.closeImpl + stream.closeImpl = nil + data.response.unregisterFun = proc() = + loader.ongoing.del(fd) + loader.unregistered.add(fd) + loader.unregisterFun(fd) + realCloseImpl(stream) + +proc suspend*(loader: FileLoader, fds: seq[int]) = + let stream = connectSocketStream(loader.process, false, blocking = true) + stream.swrite(SUSPEND) + stream.swrite(fds) + stream.close() + +proc resume*(loader: FileLoader, fds: seq[int]) = + let stream = connectSocketStream(loader.process, false, blocking = true) + stream.swrite(RESUME) + stream.swrite(fds) + stream.close() + +proc tee*(loader: FileLoader, fd: int): Stream = + let stream = connectSocketStream(loader.process, false, blocking = true) + stream.swrite(TEE) + stream.swrite(fd) + return stream + const BufferSize = 4096 proc handleHeaders(loader: FileLoader, request: Request, response: Response, @@ -401,7 +484,12 @@ proc doRequest*(loader: FileLoader, request: Request, blocking = true, stream.source.getFd().setBlocking(blocking) return response -proc quit*(loader: FileLoader) = +proc addref*(loader: FileLoader) = + let stream = connectSocketStream(loader.process) + if stream != nil: + stream.swrite(ADDREF) + +proc unref*(loader: FileLoader) = let stream = connectSocketStream(loader.process) if stream != nil: - stream.swrite(QUIT) + stream.swrite(UNREF) diff --git a/src/loader/loaderhandle.nim b/src/loader/loaderhandle.nim index 2c999813..d8d01bb2 100644 --- a/src/loader/loaderhandle.nim +++ b/src/loader/loaderhandle.nim @@ -1,6 +1,7 @@ import net import streams +import io/multistream import io/posixstream import io/serialize import io/socketstream @@ -13,6 +14,7 @@ type LoaderHandle* = ref object # conditions that would be difficult to untangle. canredir: bool sostream: Stream # saved ostream when redirected + sostream_suspend: Stream # saved ostream when suspended # Create a new loader handle, with the output stream ostream. proc newLoaderHandle*(ostream: Stream, canredir: bool): LoaderHandle = @@ -21,6 +23,22 @@ proc newLoaderHandle*(ostream: Stream, canredir: bool): LoaderHandle = proc getFd*(handle: LoaderHandle): int = return int(SocketStream(handle.ostream).source.getFd()) +proc addOutputStream*(handle: LoaderHandle, stream: Stream) = + if likely(handle.sostream_suspend != nil): + let ms = newMultiStream(handle.sostream_suspend, stream) + handle.sostream_suspend = ms + else: + # In buffer, addOutputStream is used as follows: + # * suspend handle + # * tee handle (-> call addOutputStream) + # * resume handle + # This means that this code path will never be executed, as + # sostream_suspend is never nil when the function is called. + # (Feel free to remove this assertion if this changes.) + doAssert false + let ms = newMultiStream(handle.ostream, stream) + handle.ostream = ms + proc sendResult*(handle: LoaderHandle, res: int): bool = try: handle.ostream.swrite(res) @@ -62,6 +80,17 @@ proc sendData*(handle: LoaderHandle, s: string): bool = return handle.sendData(unsafeAddr s[0], s.len) return true +proc suspend*(handle: LoaderHandle) = + handle.sostream_suspend = handle.ostream + handle.ostream = newStringStream() + +proc resume*(handle: LoaderHandle) = + let ss = handle.ostream + handle.ostream = handle.sostream_suspend + handle.sostream_suspend = nil + discard handle.sendData(ss.readAll()) + ss.close() + proc close*(handle: LoaderHandle) = if handle.sostream != nil: try: diff --git a/src/local/container.nim b/src/local/container.nim index f8b6f1fc..d191c6b3 100644 --- a/src/local/container.nim +++ b/src/local/container.nim @@ -98,6 +98,7 @@ type redirectdepth*: int select*: Select canreinterpret*: bool + cloned: bool jsDestructor(Container) @@ -129,15 +130,62 @@ proc newBuffer*(forkserver: ForkServer, mainproc: Pid, config: BufferConfig, canreinterpret: canreinterpret ) +func location*(container: Container): URL {.jsfget.} = + return container.source.location + +proc clone*(container: Container, newurl: URL): Promise[Container] = + let url = if newurl != nil: + newurl + else: + container.location + return container.iface.clone(url).then(proc(pid: Pid): Container = + if pid == -1: + return nil + let ncontainer = Container( + config: container.config, + iface: container.iface, # changed later in setStream + width: container.width, + height: container.height, + title: container.title, + hovertext: container.hovertext, + lastpeek: container.lastpeek, + source: container.source, + pos: container.pos, + bpos: container.bpos, + process: pid, + loadinfo: container.loadinfo, + lines: container.lines, + lineshift: container.lineshift, + numLines: container.numLines, + code: container.code, + retry: container.retry, + hlon: container.hlon, + redraw: container.redraw, + #needslines: container.needslines, + canceled: container.canceled, + events: container.events, + startpos: container.startpos, + hasstart: container.hasstart, + redirectdepth: container.redirectdepth, + select: container.select, + canreinterpret: container.canreinterpret, + cloned: true + ) + for hl in container.highlights: + var hl0 = Highlight() + hl0[] = hl[] + ncontainer.highlights.add(hl0) + if newurl != nil: + ncontainer.source.location = newurl + return ncontainer + ) + func charset*(container: Container): Charset = return container.source.charset func contentType*(container: Container): Option[string] {.jsfget.} = return container.source.contenttype -func location*(container: Container): URL {.jsfget.} = - return container.source.location - func lineLoaded(container: Container, y: int): bool = return y - container.lineshift in 0..container.lines.high @@ -931,12 +979,18 @@ proc handleCommand(container: Container) = container.iface.resolve(packetid, len - slen(packetid)) proc setStream*(container: Container, stream: Stream) = - container.iface = newBufferInterface(stream) - if container.source.t == LOAD_PIPE: - container.iface.passFd(container.source.fd).then(proc() = - discard close(container.source.fd)) - stream.flush() - container.load() + if not container.cloned: + container.iface = newBufferInterface(stream) + if container.source.t == LOAD_PIPE: + container.iface.passFd(container.source.fd).then(proc() = + discard close(container.source.fd)) + stream.flush() + container.load() + else: + container.iface = container.iface.clone(stream) + # Maybe we have to resume loading. Let's try. + discard container.iface.load().then(proc(res: LoadResult) = + container.onload(res)) proc onreadline(container: Container, w: Slice[int], handle: (proc(line: SimpleFlexibleLine)), res: GetLinesResult) = for line in res.lines: diff --git a/src/local/pager.nim b/src/local/pager.nim index 26cbba53..d6845636 100644 --- a/src/local/pager.nim +++ b/src/local/pager.nim @@ -406,7 +406,7 @@ proc newBuffer(pager: Pager, bufferConfig: BufferConfig, source: BufferSource, canreinterpret ) -proc dupeBuffer(pager: Pager, container: Container, location: URL, +proc dupeBuffer2(pager: Pager, container: Container, location: URL, contentType = ""): Container = let contentType = if contentType != "": some(contentType) @@ -426,8 +426,16 @@ proc dupeBuffer(pager: Pager, container: Container, location: URL, container.pipeBuffer(pipeTo) return pipeTo -proc dupeBuffer(pager: Pager, location: URL = nil) {.jsfunc.} = - pager.addContainer(pager.dupeBuffer(pager.container, location)) +proc dupeBuffer(pager: Pager, container: Container, location: URL) = + container.clone(location).then(proc(container: Container) = + if container == nil: + pager.alert("Failed to duplicate buffer.") + else: + pager.addContainer(container) + ) + +proc dupeBuffer(pager: Pager) {.jsfunc.} = + pager.dupeBuffer(pager.container, pager.container.location) # The prevBuffer and nextBuffer procedures emulate w3m's PREV and NEXT # commands by traversing the container tree in a depth-first order. @@ -560,7 +568,7 @@ proc toggleSource(pager: Pager) {.jsfunc.} = "text/plain" else: "text/html" - let container = pager.dupeBuffer(pager.container, nil, contenttype) + let container = pager.dupeBuffer2(pager.container, nil, contenttype) container.sourcepair = pager.container pager.container.sourcepair = container pager.addContainer(container) @@ -897,9 +905,6 @@ proc runMailcapReadPipe(pager: Pager, container: Container, let p2 = p.then(proc(): auto = discard close(fdin) let ishtml = HTMLOUTPUT in entry.flags - if ishtml: - #TODO this is a hack for dupe buffer and should be reconsidered. - container.source.contenttype = some("text/html") return container.readFromFd(fdout, ishtml) ).then(proc() = discard close(fdout) @@ -972,9 +977,6 @@ proc runMailcapReadFile(pager: Pager, container: Container, discard close(pipefd[1]) let fdout = pipefd[0] let ishtml = HTMLOUTPUT in entry.flags - if ishtml: - #TODO this is a hack for dupe buffer and should be reconsidered. - container.source.contenttype = some("text/html") return container.readFromFd(fdout, ishtml).then(proc() = discard close(fdout) ) @@ -1118,7 +1120,7 @@ proc handleEvent0(pager: Pager, container: Container, event: ContainerEvent): bo of ANCHOR: var url2 = newURL(container.source.location) url2.setHash(event.anchor) - pager.addContainer(pager.dupeBuffer(container, url2)) + pager.dupeBuffer(container, url2) of NO_ANCHOR: pager.alert("Couldn't find anchor " & event.anchor) of UPDATE: diff --git a/src/server/buffer.nim b/src/server/buffer.nim index 49cd9140..af735c41 100644 --- a/src/server/buffer.nim +++ b/src/server/buffer.nim @@ -66,7 +66,7 @@ type CLICK, FIND_NEXT_LINK, FIND_PREV_LINK, FIND_NEXT_MATCH, FIND_PREV_MATCH, GET_SOURCE, GET_LINES, UPDATE_HOVER, PASS_FD, CONNECT, CONNECT2, GOTO_ANCHOR, CANCEL, GET_TITLE, SELECT, REDIRECT_TO_FD, READ_FROM_FD, - SET_CONTENT_TYPE + SET_CONTENT_TYPE, CLONE # LOADING_PAGE: istream open # LOADING_RESOURCES: istream closed, resources open @@ -104,7 +104,7 @@ type istream: Stream sstream: Stream available: int - pstream: Stream # pipe stream + pstream: SocketStream # pipe stream srenderer: StreamRenderer connected: bool state: BufferState @@ -141,6 +141,15 @@ proc newBufferInterface*(stream: Stream): BufferInterface = stream: stream ) +proc clone*(iface: BufferInterface, stream: Stream): BufferInterface = + let iface2 = newBufferInterface(stream) + var len: int + var pid: Pid + stream.sread(len) + stream.sread(iface2.packetid) + stream.sread(pid) + return iface2 + proc resolve*(iface: BufferInterface, packetid, len: int) = iface.opaque.len = len iface.map.resolve(packetid) @@ -680,7 +689,10 @@ proc connect*(buffer: Buffer): ConnectResult {.proxy.} = var referrerpolicy: Option[ReferrerPolicy] case source.t of CLONE: - #TODO clone should probably just fork() the buffer instead. + #TODO there is only one function for CLONE left: to get the source for + # the "view buffer" operation. + # This does not belong in buffers at all, and should be requested from + # the networking module instead. let s = connectSocketStream(source.clonepid, blocking = false) buffer.istream = s buffer.fd = int(s.source.getFd()) @@ -728,7 +740,7 @@ proc connect*(buffer: Buffer): ConnectResult {.proxy.} = # * connect2, telling loader to load at last (we block loader until then) # * redirectToFd, telling loader to load into the passed fd proc connect2*(buffer: Buffer) {.proxy.} = - if buffer.source.t == LOAD_REQUEST: + if buffer.source.t == LOAD_REQUEST and buffer.istream of SocketStream: # Notify loader that we can proceed with loading the input stream. let ss = SocketStream(buffer.istream) ss.swrite(false) @@ -777,6 +789,118 @@ proc readFromFd*(buffer: Buffer, fd: FileHandle, ishtml: bool) {.proxy.} = proc setContentType*(buffer: Buffer, contentType: string) {.proxy.} = buffer.source.contenttype = some(contentType) +# Create an exact clone of the current buffer. +# This clone will share the loader process with the previous buffer. +proc clone*(buffer: Buffer, newurl: URL): Pid {.proxy.} = + var pipefd: array[2, cint] + if pipe(pipefd) == -1: + buffer.estream.write("Failed to open pipe.\n") + return -1 + # Naturally, we have to solve the problem of splitting up input streams here. + # The "cleanest" way is to get the source to duplicate the stream, and + # also send the new buffer the data over a separate stream. We do this + # for resources we retrieve with fetch(). + # This is unfortunately not possible for the main source input stream, + # because it may come from a pipe that we receive from the client. + # So for istream, we just use a TeeStream from the original buffer and + # pray that no interruptions happen along the way. + # TODO: this is fundamentally broken and should be changed once the istream + # mess is untangled. A good first step would be to remove sstream from + # buffer. + let needsPipe = not buffer.istream.atEnd + var pipefd_write: array[2, cint] + if needsPipe: + assert buffer.fd != -1 + if pipe(pipefd_write) == -1: + buffer.estream.write("Failed to open pipe.\n") + return -1 + var fds: seq[int] + for fd in buffer.loader.connecting.keys: + fds.add(fd) + for fd in buffer.loader.ongoing.keys: + fds.add(fd) + #TODO maybe we still have some data in sockets... we should probably split + # this up to be executed after the main loop is finished... + buffer.loader.suspend(fds) + buffer.loader.addref() + let pid = fork() + if pid == -1: + buffer.estream.write("Failed to clone buffer.\n") + return -1 + if pid == 0: # child + discard close(pipefd[0]) # close read + let ps = newPosixStream(pipefd[1]) + # We must allocate a new selector for this new process. (Otherwise we + # would interfere with operation of the other one.) + # Closing seems to suffice here. + buffer.selector.close() + buffer.selector = newSelector[int]() + #TODO set buffer.window.timeouts.selector + var cfds: seq[int] + for fd in buffer.loader.connecting.keys: + cfds.add(fd) + for fd in cfds: + let stream = SocketStream(buffer.loader.tee(fd)) + var success: bool + stream.sread(success) + let sfd = int(stream.source.getFd()) + if success: + switchStream(buffer.loader.connecting[fd], stream) + buffer.loader.connecting[sfd] = buffer.loader.connecting[fd] + else: + # Unlikely, but theoretically possible: our SUSPEND connection + # finished before the connection could have been completed. + #TODO for now, we get an fd even if the connection has already been + # finished. there should be a better way to do this. + buffer.loader.reconnect(buffer.loader.connecting[fd]) + buffer.loader.connecting.del(fd) + var ofds: seq[int] + for fd in buffer.loader.ongoing.keys: + ofds.add(fd) + for fd in ofds: + let stream = SocketStream(buffer.loader.tee(fd)) + var success: bool + stream.sread(success) + let sfd = int(stream.source.getFd()) + if success: + buffer.loader.switchStream(buffer.loader.ongoing[fd], stream) + buffer.loader.ongoing[sfd] = buffer.loader.ongoing[fd] + else: + # Already finished. + #TODO what to do? + discard + if needsPipe: + discard close(pipefd_write[1]) # close write + buffer.fd = pipefd_write[0] + buffer.selector.registerHandle(buffer.fd, {Read}, 0) + let ps = newPosixStream(pipefd_write[0]) + buffer.istream = newTeeStream(ps, buffer.sstream, closedest = false) + buffer.pstream.close() + let ssock = initServerSocket(buffered = false) + ps.write(char(0)) + buffer.source.location = newurl + for it in buffer.tasks.mitems: + it = 0 + let socks = ssock.acceptSocketStream() + buffer.pstream = socks + buffer.rfd = int(socks.source.getFd()) + buffer.selector.registerHandle(buffer.rfd, {Read}, 0) + return 0 + else: # parent + discard close(pipefd[1]) # close write + if needsPipe: + discard close(pipefd_write[0]) # close read + # We must wait for child to tee its ongoing streams. + let ps = newPosixStream(pipefd[0]) + let c = ps.readChar() + assert c == char(0) + ps.close() + if needsPipe: + let istrmp = newPosixStream(pipefd_write[1]) + buffer.istream = newTeeStream(buffer.istream, istrmp) + buffer.loader.resume(fds) + return pid + const BufferSize = 4096 proc finishLoad(buffer: Buffer): EmptyPromise = @@ -1346,7 +1470,7 @@ macro bufferDispatcher(funs: static ProxyMap, buffer: Buffer, let typ = param[^2] stmts.add(quote do: when `typ` is FileHandle: - let `id` = SocketStream(`buffer`.pstream).recvFileHandle() + let `id` = `buffer`.pstream.recvFileHandle() else: var `id`: `typ` `buffer`.pstream.sread(`id`)) @@ -1434,8 +1558,7 @@ proc handleError(buffer: Buffer, fd: int, err: OSErrorCode) = else: assert false, $fd & ": " & $err -proc runBuffer(buffer: Buffer, rfd: int) = - buffer.rfd = rfd +proc runBuffer(buffer: Buffer) = while buffer.alive: let events = buffer.selector.select(-1) for event in events: @@ -1454,6 +1577,7 @@ proc runBuffer(buffer: Buffer, rfd: int) = proc launchBuffer*(config: BufferConfig, source: BufferSource, attrs: WindowAttributes, loader: FileLoader, ssock: ServerSocket) = + let socks = ssock.acceptSocketStream() let buffer = Buffer( alive: true, userstyle: parseStylesheet(config.userstyle), @@ -1464,22 +1588,23 @@ proc launchBuffer*(config: BufferConfig, source: BufferSource, sstream: newStringStream(), viewport: Viewport(window: attrs), width: attrs.width, - height: attrs.height - 1 + height: attrs.height - 1, + readbufsize: BufferSize, + selector: newSelector[int](), + estream: newFileStream(stderr), + pstream: socks, + rfd: int(socks.source.getFd()) ) - buffer.readbufsize = BufferSize - buffer.selector = newSelector[int]() - loader.registerFun = proc(fd: int) = buffer.selector.registerHandle(fd, {Read}, 0) - loader.unregisterFun = proc(fd: int) = buffer.selector.unregister(fd) buffer.srenderer = newStreamRenderer(buffer.sstream, buffer.charsets) + loader.registerFun = proc(fd: int) = + buffer.selector.registerHandle(fd, {Read}, 0) + loader.unregisterFun = proc(fd: int) = + buffer.selector.unregister(fd) if buffer.config.scripting: buffer.window = newWindow(buffer.config.scripting, buffer.selector, buffer.attrs, proc(url: URL) = buffer.navigate(url), some(buffer.loader)) - let socks = ssock.acceptSocketStream() - buffer.estream = newFileStream(stderr) - buffer.pstream = socks - let rfd = int(socks.source.getFd()) - buffer.selector.registerHandle(rfd, {Read}, 0) - buffer.runBuffer(rfd) + buffer.selector.registerHandle(buffer.rfd, {Read}, 0) + buffer.runBuffer() buffer.pstream.close() - buffer.loader.quit() + buffer.loader.unref() quit(0) |