From 5b2a36579e53c69f154288a91ddc3e7c5375d7a6 Mon Sep 17 00:00:00 2001 From: bptato Date: Sat, 14 Sep 2024 15:58:52 +0200 Subject: loader: refactor, misc optimizations & fixes * factor out input/output handle tables; use a seq instead * add possibility to directly open cached items onto stdin (mainly an optimization for reading images, which are always cached) * close used handles on local CGI execution * make clone during load work again --- src/server/buffer.nim | 78 ++++++++++++++++++++++++--------------------------- 1 file changed, 37 insertions(+), 41 deletions(-) (limited to 'src/server/buffer.nim') diff --git a/src/server/buffer.nim b/src/server/buffer.nim index 4324944e..ea17cbfc 100644 --- a/src/server/buffer.nim +++ b/src/server/buffer.nim @@ -929,15 +929,15 @@ proc clone*(buffer: Buffer; newurl: URL): int {.proxy.} = return -1 # suspend outputs before tee'ing var ids: seq[int] = @[] - for response in buffer.loader.ongoing.values: - if response.onRead != nil: - ids.add(response.outputId) + for it in buffer.loader.ongoing: + if it.response.onRead != nil: + ids.add(it.response.outputId) buffer.loader.suspend(ids) # ongoing transfers are now suspended; exhaust all data in the internal buffer # just to be safe. - for fd, response in buffer.loader.ongoing: - if response.onRead != nil: - buffer.loader.onRead(fd) + for it in buffer.loader.ongoing: + if it.response.onRead != nil: + buffer.loader.onRead(it.fd) let pid = fork() if pid == -1: buffer.estream.write("Failed to clone buffer.\n") @@ -973,22 +973,31 @@ proc clone*(buffer: Buffer; newurl: URL): int {.proxy.} = else: buffer.selector = newSelector[int]() #TODO set buffer.window.timeouts.selector - var ongoing: seq[Response] = @[] - for response in buffer.loader.ongoing.values: - ongoing.add(response) - response.body.sclose() - buffer.loader.ongoing.clear() + var connecting: seq[ConnectData] = @[] + var ongoing: seq[OngoingData] = @[] + for it in buffer.loader.map: + if it != nil: + if it of ConnectData: + connecting.add(ConnectData(it)) + else: + let it = OngoingData(it) + ongoing.add(it) + it.response.body.sclose() + buffer.loader.unregistered.add(it.fd) + buffer.loader.unset(it) let myPid = getCurrentProcessId() - for response in ongoing.mitems: + for it in ongoing.mitems: + let response = it.response # tee ongoing streams let (stream, outputId) = buffer.loader.tee(response.outputId, myPid) # if -1, well, this side hasn't exhausted the socket's buffer doAssert outputId != -1 and stream != nil response.outputId = outputId response.body = stream - let fd = int(response.body.fd) - buffer.loader.ongoing[fd] = response + let data = OngoingData(response: response, stream: stream) + let fd = data.fd buffer.selector.registerHandle(fd, {Read}, 0) + buffer.loader.put(data) if buffer.istream != nil: # We do not own our input stream, so we can't tee it. # Luckily it is cached, so what we *can* do is to load the same thing from @@ -1015,14 +1024,9 @@ proc clone*(buffer: Buffer; newurl: URL): int {.proxy.} = buffer.selector.registerHandle(buffer.rfd, {Read}, 0) # must reconnect after the new client is set up, or the client pids get # mixed up. - var cfds: seq[int] = @[] - for fd in buffer.loader.connecting.keys: - cfds.add(fd) - for fd in cfds: + for it in connecting: # connecting: just reconnect - let data = buffer.loader.connecting[fd] - buffer.loader.connecting.del(fd) - buffer.loader.reconnect(data) + buffer.loader.reconnect(it) return 0 else: # parent discard close(pipefd[1]) # close write @@ -1171,16 +1175,13 @@ proc forceRender*(buffer: Buffer) {.proxy.} = proc cancel*(buffer: Buffer) {.proxy.} = if buffer.state == bsLoaded: return - for fd, data in buffer.loader.connecting: - buffer.selector.unregister(fd) - buffer.loader.unregistered.add(fd) - data.stream.sclose() - buffer.loader.connecting.clear() - for fd, response in buffer.loader.ongoing: - buffer.selector.unregister(fd) - buffer.loader.unregistered.add(fd) - response.body.sclose() - buffer.loader.ongoing.clear() + for it in buffer.loader.map: + if it != nil: + let fd = it.fd + buffer.selector.unregister(fd) + buffer.loader.unregistered.add(fd) + it.stream.sclose() + buffer.loader.unset(it) if buffer.istream != nil: buffer.selector.unregister(buffer.fd) buffer.loader.unregistered.add(buffer.fd) @@ -1792,11 +1793,7 @@ proc handleRead(buffer: Buffer; fd: int): bool = return false elif fd == buffer.fd: buffer.onload() - elif fd in buffer.loader.connecting: - buffer.loader.onConnected(fd) - if buffer.config.scripting: - buffer.window.runJSJobs() - elif fd in buffer.loader.ongoing: + elif buffer.loader.get(fd) != nil: buffer.loader.onRead(fd) if buffer.config.scripting: buffer.window.runJSJobs() @@ -1812,11 +1809,10 @@ proc handleError(buffer: Buffer; fd: int; err: OSErrorCode): bool = return false elif fd == buffer.fd: buffer.onload() - elif fd in buffer.loader.connecting: - # probably shouldn't happen. TODO - assert false, $fd & ": " & $err - elif fd in buffer.loader.ongoing: - buffer.loader.onError(fd) + elif buffer.loader.get(fd) != nil: + if not buffer.loader.onError(fd): + #TODO handle connection error + assert false, $fd & ": " & $err if buffer.config.scripting: buffer.window.runJSJobs() elif fd in buffer.loader.unregistered: -- cgit 1.4.1-2-gfad0 id='n91' href='#n91'>91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134