diff options
author | bptato <nincsnevem662@gmail.com> | 2024-02-10 22:52:13 +0100 |
---|---|---|
committer | bptato <nincsnevem662@gmail.com> | 2024-02-10 22:54:31 +0100 |
commit | c4f0423e1a786fef840fd2f8c5c6bba550b353ab (patch) | |
tree | 9eb8007ca3af03f466dd3eedbb1e8b7e29e8ff94 /src | |
parent | d8c4b0979c6d1ff9f6edea650e3aeb1ca1e4a104 (diff) | |
download | chawan-c4f0423e1a786fef840fd2f8c5c6bba550b353ab.tar.gz |
loader: fix tee
My eyes are bleeding, but at least there is a chance that this does what I wanted. The previous tee implementation mixed buffer and loader fds, so it was fundamentally broken. Also, it used MultiStream which makes asynchronous streaming impossible. This time we use a flat array of output handles and link to them any buffers not written to the target yet.
Diffstat (limited to 'src')
-rw-r--r-- | src/loader/loader.nim | 213 | ||||
-rw-r--r-- | src/loader/loaderhandle.nim | 132 | ||||
-rw-r--r-- | src/loader/request.nim | 2 | ||||
-rw-r--r-- | src/server/buffer.nim | 9 |
4 files changed, 230 insertions, 126 deletions
diff --git a/src/loader/loader.nim b/src/loader/loader.nim index 82796e20..53aeb498 100644 --- a/src/loader/loader.nim +++ b/src/loader/loader.nim @@ -88,6 +88,8 @@ type alive: bool config: LoaderConfig handleMap: Table[int, LoaderHandle] + outputMap: Table[int, OutputHandle] + clientFdMap: seq[tuple[pid, fd: int, output: OutputHandle]] referrerpolicy: ReferrerPolicy selector: Selector[int] fd: int @@ -138,18 +140,23 @@ proc loadResource(ctx: LoaderContext, request: Request, handle: LoaderHandle) = if handle.istream == nil: handle.close() else: - handle.ostream.setBlocking(false) + let output = handle.output + output.ostream.setBlocking(false) ctx.selector.registerHandle(handle.istream.fd, {Read}, 0) - ctx.selector.registerHandle(handle.ostream.fd, {Write}, 0) + ctx.selector.registerHandle(output.ostream.fd, {Write}, 0) let ofl = fcntl(handle.istream.fd, F_GETFL, 0) discard fcntl(handle.istream.fd, F_SETFL, ofl or O_NONBLOCK) - # yes, this puts the istream fd in addition to the ostream fd in - # handlemap to point to the same ref ctx.handleMap[handle.istream.fd] = handle - # also put the new fd into handleMap if stream was redirected - if handle.sostream != nil: - ctx.handleMap[handle.ostream.fd] = handle - ctx.handleMap.del(handle.sostream.fd) + if output.sostream != nil: + # replace the fd with the new one in outputMap if stream was + # redirected + # (kind of a hack, but should always work) + ctx.outputMap[output.ostream.fd] = output + ctx.outputMap.del(output.sostream.fd) + # currently only the main buffer stream can have redirects, and we + # don't suspend/resume it; if we did, we would have to put the new + # output stream's clientFd in clientFdMap too. + ctx.clientFdMap.del(output.sostream.fd) else: prevurl = request.url case ctx.config.uriMethodMap.findAndRewrite(request.url) @@ -169,7 +176,13 @@ proc loadResource(ctx: LoaderContext, request: Request, handle: LoaderHandle) = proc onLoad(ctx: LoaderContext, stream: SocketStream) = var request: Request stream.sread(request) - let handle = newLoaderHandle(stream, request.canredir) + let handle = newLoaderHandle( + stream, + request.canredir, + request.clientPid, + request.clientFd + ) + assert request.clientPid != 0 when defined(debug): handle.url = request.url if not ctx.config.filter.match(request.url): @@ -191,9 +204,22 @@ proc onLoad(ctx: LoaderContext, stream: SocketStream) = if request.proxy == nil or not ctx.config.acceptProxy: request.proxy = ctx.config.proxy let fd = int(stream.source.getFd()) - ctx.handleMap[fd] = handle + ctx.outputMap[fd] = handle.output + ctx.clientFdMap.add((request.clientPid, request.clientFd, handle.output)) ctx.loadResource(request, handle) +func findClientFdEntry(ctx: LoaderContext, pid, fd: int): int = + for i, (itpid, itfd, _) in ctx.clientFdMap: + if pid == itpid and fd == itfd: + return i + return -1 + +func findOutputByClientFd(ctx: LoaderContext, pid, fd: int): OutputHandle = + let i = ctx.findClientFdEntry(pid, fd) + if i != -1: + return ctx.clientFdMap[i].output + return nil + proc acceptConnection(ctx: LoaderContext) = let stream = ctx.ssock.acceptSocketStream() try: @@ -203,32 +229,40 @@ proc acceptConnection(ctx: LoaderContext) = of LOAD: ctx.onLoad(stream) of TEE: + var clientPid: int + var clientFd: int + var pid: int var fd: int + stream.sread(pid) stream.sread(fd) - if fd notin ctx.handleMap: - stream.swrite(false) - else: - let handle = ctx.handleMap[fd] - handle.addOutputStream(stream) - stream.swrite(true) + stream.sread(clientPid) + stream.sread(clientFd) + let output = ctx.findOutputByClientFd(pid, fd) + if output != nil: + output.tee(stream, clientPid, clientFd) + stream.swrite(output != nil) of SUSPEND: + var pid: int var fds: seq[int] + stream.sread(pid) stream.sread(fds) for fd in fds: - ctx.handleMap.withValue(fd, handlep): - if handlep[].ostream != nil and handlep[].ostream.fd == fd: - # remove from the selector, so any new reads will be just placed - # in the handle's buffer - ctx.selector.unregister(fd) + let output = ctx.findOutputByClientFd(pid, fd) + if output != nil: + # remove from the selector, so any new reads will be just placed + # in the handle's buffer + ctx.selector.unregister(output.ostream.fd) of RESUME: + var pid: int var fds: seq[int] + stream.sread(pid) stream.sread(fds) for fd in fds: - ctx.handleMap.withValue(fd, handlep): - if handlep[].ostream != nil and handlep[].ostream.fd == fd: - # place the stream back into the selector, so we can write to it - # again - ctx.selector.registerHandle(fd, {Write}, 0) + let output = ctx.findOutputByClientFd(pid, fd) + if output != nil: + # place the stream back into the selector, so we can write to it + # again + ctx.selector.registerHandle(output.ostream.fd, {Write}, 0) of ADDREF: inc ctx.refcount of UNREF: @@ -284,7 +318,7 @@ proc runFileLoader*(fd: cint, config: LoaderConfig) = while ctx.alive: let events = ctx.selector.select(-1) var unregRead: seq[LoaderHandle] - var unregWrite: seq[LoaderHandle] + var unregWrite: seq[OutputHandle] for event in events: if Read in event.events: if event.fd == ctx.fd: # incoming connection @@ -297,46 +331,41 @@ proc runFileLoader*(fd: cint, config: LoaderConfig) = try: buffer.len = handle.istream.recvData(addr buffer[0], buffer.cap) if buffer.len == 0: - dealloc(buffer) break handle.addBuffer(buffer) if buffer.len < buffer.cap: break except ErrorAgain, ErrorWouldBlock: # retry later - dealloc(buffer) break except ErrorBrokenPipe: # sender died; stop streaming - dealloc(buffer) unregRead.add(handle) break if Write in event.events: - let handle = ctx.handleMap[event.fd] - assert event.fd == handle.ostream.fd - while handle.currentBuffer != nil: - let buffer = handle.currentBuffer + let output = ctx.outputMap[event.fd] + while output.currentBuffer != nil: + let buffer = output.currentBuffer try: - let i = handle.currentBufferIdx + let i = output.currentBufferIdx assert buffer.len - i > 0 - let n = handle.sendData(addr buffer[i], buffer.len - i) - handle.currentBufferIdx += n - if handle.currentBufferIdx < buffer.len: + let n = output.sendData(addr buffer[i], buffer.len - i) + output.currentBufferIdx += n + if output.currentBufferIdx < buffer.len: break - handle.bufferCleared() # swap out buffer + output.bufferCleared() # swap out buffer except ErrorAgain, ErrorWouldBlock: # never mind break except ErrorBrokenPipe: # receiver died; stop streaming - unregWrite.add(handle) + unregWrite.add(output) break - if handle.istream == nil and handle.currentBuffer == nil: + if output.istreamAtEnd and output.currentBuffer == nil: # after EOF, but not appended in this send cycle - unregWrite.add(handle) + unregWrite.add(output) if Error in event.events: assert event.fd != ctx.fd - let handle = ctx.handleMap[event.fd] - if handle.ostream.fd == event.fd: # ostream died - unregWrite.add(handle) - else: # istream died - assert handle.istream.fd == event.fd + ctx.outputMap.withValue(event.fd, outputp): # ostream died + unregWrite.add(outputp[]) + do: # istream died + let handle = ctx.handleMap[event.fd] unregRead.add(handle) # Unregister handles queued for unregistration. # It is possible for both unregRead and unregWrite to contain duplicates. To @@ -348,29 +377,51 @@ proc runFileLoader*(fd: cint, config: LoaderConfig) = ctx.handleMap.del(handle.istream.fd) handle.istream.close() handle.istream = nil - if handle.currentBuffer == nil: - unregWrite.add(handle) - for handle in unregWrite: - if handle.ostream != nil: - ctx.selector.unregister(handle.ostream.fd) - ctx.handleMap.del(handle.ostream.fd) - handle.ostream.close() - handle.ostream = nil - if handle.sostream != nil: + for output in handle.outputs: + output.istreamAtEnd = true + if output.currentBuffer == nil: + unregWrite.add(output) + for output in unregWrite: + if output.ostream != nil: + ctx.selector.unregister(output.ostream.fd) + ctx.outputMap.del(output.ostream.fd) + if output.clientFd != -1: + let i = ctx.findClientFdEntry(output.clientPid, output.clientFd) + ctx.clientFdMap.del(i) + output.ostream.close() + output.ostream = nil + let handle = output.parent + let i = handle.outputs.find(output) + handle.outputs.del(i) + if handle.outputs.len == 0 and handle.istream != nil: + # premature end of all output streams; kill istream too + ctx.selector.unregister(handle.istream.fd) + ctx.handleMap.del(handle.istream.fd) + handle.istream.close() + handle.istream = nil + if output.sostream != nil: + #TODO it is not clear what should happen when multiple outputs exist. + # + # Normally, sostream is created after redirection, and must be written + # to & closed after the input has completely been written into the + # output stream. e.g. runMailcapEntryFile uses this to wait for the file + # to be completely downloaded before executing an entry that takes a + # file parameter. + # + # We should either block clone in this case, or find a better way to + # wait for file downloads to finish. (Note that the buffer remaining + # opened until the file has been downloaded is a somewhat useful visual + # indication; while it does not show progress (bad), it does at least + # show that *something* has been opened. An alternative should probably + # add a temporary entry to a file download screen or something.) try: - handle.sostream.swrite(true) + output.sostream.swrite(true) except IOError: # ignore error, that just means the buffer has already closed the # stream discard - handle.sostream.close() - handle.sostream = nil - if handle.istream != nil: - ctx.handleMap.del(handle.istream.fd) - ctx.selector.unregister(handle.istream.fd) - handle.istream.close() - handle.istream = nil - #TODO TODO TODO what to do about sostream + output.sostream.close() + output.sostream = nil ctx.exitLoader() proc getAttribute(contentType, attrname: string): string = @@ -383,16 +434,15 @@ proc getAttribute(contentType, attrname: string): string = while i < kvs.len and kvs[i] in AsciiWhitespace: inc i var q = false - for j in i ..< kvs.len: + for j, c in kvs.toOpenArray(i, kvs.high): if q: - s &= kvs[j] + s &= c + elif c == '\\': + q = true + elif c == ';' or c in AsciiWhitespace: + break else: - if kvs[j] == '\\': - q = true - elif kvs[j] == ';' or kvs[j] in AsciiWhitespace: - break - else: - s &= kvs[j] + s &= c return s proc applyHeaders(loader: FileLoader, request: Request, response: Response) = @@ -427,6 +477,8 @@ proc applyHeaders(loader: FileLoader, request: Request, response: Response) = #TODO: add init proc fetch*(loader: FileLoader, input: Request): FetchPromise = let stream = connectSocketStream(loader.process, false, blocking = true) + input.clientPid = getpid() + input.clientFd = int(stream.fd) stream.swrite(LOAD) stream.swrite(input) stream.flush() @@ -442,6 +494,8 @@ proc fetch*(loader: FileLoader, input: Request): FetchPromise = proc reconnect*(loader: FileLoader, data: ConnectData) = let stream = connectSocketStream(loader.process, false, blocking = true) + data.request.clientPid = getpid() + data.request.clientFd = int(stream.fd) stream.swrite(LOAD) stream.swrite(data.request) stream.flush() @@ -468,22 +522,27 @@ proc switchStream*(loader: FileLoader, data: var OngoingData, loader.unregisterFun(fd) realCloseImpl(stream) -proc suspend*(loader: FileLoader, fds: seq[int]) = +proc suspend*(loader: FileLoader, pid: int, fds: seq[int]) = let stream = connectSocketStream(loader.process, false, blocking = true) stream.swrite(SUSPEND) + stream.swrite(pid) stream.swrite(fds) stream.close() -proc resume*(loader: FileLoader, fds: seq[int]) = +proc resume*(loader: FileLoader, pid: int, fds: seq[int]) = let stream = connectSocketStream(loader.process, false, blocking = true) stream.swrite(RESUME) + stream.swrite(pid) stream.swrite(fds) stream.close() -proc tee*(loader: FileLoader, fd: int): Stream = +proc tee*(loader: FileLoader, pid, fd: int): Stream = let stream = connectSocketStream(loader.process, false, blocking = true) stream.swrite(TEE) + stream.swrite(pid) stream.swrite(fd) + stream.swrite(int(getpid())) + stream.swrite(int(stream.fd)) return stream const BufferSize = 4096 @@ -574,6 +633,8 @@ proc doRequest*(loader: FileLoader, request: Request, blocking = true, let stream = connectSocketStream(loader.process, false, blocking = true) if canredir: request.canredir = true #TODO set this somewhere else? + request.clientPid = getpid() + request.clientFd = int(stream.fd) stream.swrite(LOAD) stream.swrite(request) stream.flush() diff --git a/src/loader/loaderhandle.nim b/src/loader/loaderhandle.nim index 15336964..6cdfd19f 100644 --- a/src/loader/loaderhandle.nim +++ b/src/loader/loaderhandle.nim @@ -10,44 +10,68 @@ import loader/headers when defined(debug): import types/url -type - LoaderBufferPage = array[4056, uint8] # 4096 - 8 - 32 +const LoaderBufferPageSize = 4064 # 4096 - 32 +type LoaderBufferObj = object - page*: LoaderBufferPage + page: ptr UncheckedArray[uint8] len: int - LoaderBuffer* = ptr LoaderBufferObj + LoaderBuffer* = ref LoaderBufferObj - OutputHandle* = object + OutputHandle* = ref object + parent*: LoaderHandle + currentBuffer*: LoaderBuffer + currentBufferIdx*: int + buffers: Deque[LoaderBuffer] + ostream*: PosixStream + istreamAtEnd*: bool + sostream*: PosixStream # saved ostream when redirected + clientFd*: int + clientPid*: int LoaderHandle* = ref object - ostream*: PosixStream #TODO un-extern # Stream for taking input istream*: PosixStream # Only the first handle can be redirected, because a) mailcap can only # redirect the first handle and b) async redirects would result in race # conditions that would be difficult to untangle. canredir: bool - sostream*: PosixStream # saved ostream when redirected - currentBuffer*: LoaderBuffer - currentBufferIdx*: int - buffers: Deque[LoaderBuffer] + outputs*: seq[OutputHandle] when defined(debug): url*: URL +{.warning[Deprecated]:off.}: + proc `=destroy`(buffer: var LoaderBufferObj) = + if buffer.page != nil: + dealloc(buffer.page) + buffer.page = nil + # Create a new loader handle, with the output stream ostream. -proc newLoaderHandle*(ostream: PosixStream, canredir: bool): LoaderHandle = - return LoaderHandle( - ostream: ostream, +proc newLoaderHandle*(ostream: PosixStream, canredir: bool, + clientPid, clientFd: int): LoaderHandle = + let handle = LoaderHandle( canredir: canredir ) + handle.outputs.add(OutputHandle( + ostream: ostream, + parent: handle, + clientPid: clientPid, + clientFd: clientFd + )) + return handle + +proc findOutputHandle*(handle: LoaderHandle, fd: int): OutputHandle = + for output in handle.outputs: + if output.ostream.fd == fd: + return output + return nil func `[]`*(buffer: LoaderBuffer, i: int): var uint8 {.inline.} = return buffer[].page[i] func cap*(buffer: LoaderBuffer): int {.inline.} = - return buffer[].page.len + return LoaderBufferPageSize func len*(buffer: LoaderBuffer): var int {.inline.} = return buffer[].len @@ -56,59 +80,75 @@ proc `len=`*(buffer: LoaderBuffer, i: int) {.inline.} = buffer[].len = i proc newLoaderBuffer*(): LoaderBuffer = - let buffer = cast[LoaderBuffer](alloc(sizeof(LoaderBufferObj))) + let buffer = LoaderBuffer( + page: cast[ptr UncheckedArray[uint8]](alloc(LoaderBufferPageSize)) + ) buffer.len = 0 return buffer proc addBuffer*(handle: LoaderHandle, buffer: LoaderBuffer) = - if handle.currentBuffer == nil: - handle.currentBuffer = buffer - else: - handle.buffers.addLast(buffer) - -proc bufferCleared*(handle: LoaderHandle) = - assert handle.currentBuffer != nil - handle.currentBufferIdx = 0 - dealloc(handle.currentBuffer) - if handle.buffers.len > 0: - handle.currentBuffer = handle.buffers.popFirst() + for output in handle.outputs.mitems: + if output.currentBuffer == nil: + output.currentBuffer = buffer + else: + output.buffers.addLast(buffer) + +proc bufferCleared*(output: OutputHandle) = + assert output.currentBuffer != nil + output.currentBufferIdx = 0 + if output.buffers.len > 0: + output.currentBuffer = output.buffers.popFirst() else: - handle.currentBuffer = nil + output.currentBuffer = nil + +proc tee*(outputIn: OutputHandle, ostream: PosixStream, + clientFd, clientPid: int) = + outputIn.parent.outputs.add(OutputHandle( + parent: outputIn.parent, + ostream: ostream, + currentBuffer: outputIn.currentBuffer, + currentBufferIdx: outputIn.currentBufferIdx, + buffers: outputIn.buffers, + istreamAtEnd: outputIn.istreamAtEnd, + clientFd: clientFd, + clientPid: clientPid + )) -proc addOutputStream*(handle: LoaderHandle, stream: Stream) = - doAssert false - #TODO TODO TODO fix this - #let ms = newMultiStream(handle.ostream, stream) - #handle.ostream = ms +template output*(handle: LoaderHandle): OutputHandle = + handle.outputs[0] proc sendResult*(handle: LoaderHandle, res: int, msg = "") = - handle.ostream.swrite(res) + handle.output.ostream.swrite(res) if res == 0: # success assert msg == "" else: # error - handle.ostream.swrite(msg) + handle.output.ostream.swrite(msg) proc sendStatus*(handle: LoaderHandle, status: int) = - handle.ostream.swrite(status) + handle.output.ostream.swrite(status) proc sendHeaders*(handle: LoaderHandle, headers: Headers) = - handle.ostream.swrite(headers) + let output = handle.output + output.ostream.swrite(headers) if handle.canredir: var redir: bool - handle.ostream.sread(redir) + output.ostream.sread(redir) if redir: - let fd = SocketStream(handle.ostream).recvFileHandle() - handle.sostream = handle.ostream - handle.ostream = newPosixStream(fd) + let fd = SocketStream(output.ostream).recvFileHandle() + output.sostream = output.ostream + output.ostream = newPosixStream(fd) + output.clientFd = -1 + output.clientPid = -1 -proc sendData*(handle: LoaderHandle, p: pointer, nmemb: int): int = - return handle.ostream.sendData(p, nmemb) +proc sendData*(output: OutputHandle, p: pointer, nmemb: int): int = + return output.ostream.sendData(p, nmemb) proc close*(handle: LoaderHandle) = - assert handle.sostream == nil - if handle.ostream != nil: - handle.ostream.close() - handle.ostream = nil + for output in handle.outputs: + assert output.sostream == nil + if output.ostream != nil: + output.ostream.close() + output.ostream = nil if handle.istream != nil: handle.istream.close() handle.istream = nil diff --git a/src/loader/request.nim b/src/loader/request.nim index a91731bb..805345f3 100644 --- a/src/loader/request.nim +++ b/src/loader/request.nim @@ -80,6 +80,8 @@ type credentialsMode* {.jsget.}: CredentialsMode proxy*: URL #TODO do something with this canredir*: bool + clientFd*: int + clientPid*: int ReadableStream* = ref object of Stream isource*: Stream diff --git a/src/server/buffer.nim b/src/server/buffer.nim index e5cdf50d..851f04f1 100644 --- a/src/server/buffer.nim +++ b/src/server/buffer.nim @@ -919,7 +919,8 @@ proc clone*(buffer: Buffer, newurl: URL): Pid {.proxy.} = fds.add(fd) #TODO maybe we still have some data in sockets... we should probably split # this up to be executed after the main loop is finished... - buffer.loader.suspend(fds) + let parentPid = getpid() + buffer.loader.suspend(parentPid, fds) buffer.loader.addref() let pid = fork() if pid == -1: @@ -939,7 +940,7 @@ proc clone*(buffer: Buffer, newurl: URL): Pid {.proxy.} = for fd in buffer.loader.connecting.keys: cfds.add(fd) for fd in cfds: - let stream = SocketStream(buffer.loader.tee(fd)) + let stream = SocketStream(buffer.loader.tee(parentPid, fd)) var success: bool stream.sread(success) let sfd = int(stream.source.getFd()) @@ -957,7 +958,7 @@ proc clone*(buffer: Buffer, newurl: URL): Pid {.proxy.} = for fd in buffer.loader.ongoing.keys: ofds.add(fd) for fd in ofds: - let stream = SocketStream(buffer.loader.tee(fd)) + let stream = SocketStream(buffer.loader.tee(parentPid, fd)) var success: bool stream.sread(success) let sfd = int(stream.source.getFd()) @@ -997,7 +998,7 @@ proc clone*(buffer: Buffer, newurl: URL): Pid {.proxy.} = if needsPipe: let istrmp = newPosixStream(pipefd_write[1]) buffer.istream = newTeeStream(buffer.istream, istrmp) - buffer.loader.resume(fds) + buffer.loader.resume(parentPid, fds) return pid proc dispatchDOMContentLoadedEvent(buffer: Buffer) = |