diff options
author | bptato <nincsnevem662@gmail.com> | 2024-02-13 16:48:58 +0100 |
---|---|---|
committer | bptato <nincsnevem662@gmail.com> | 2024-02-13 16:48:58 +0100 |
commit | aadbaffac2430abf01a849b1f7bbfb154c27d229 (patch) | |
tree | ea88c5a91af1544dddd26d5d41e200dc4907936d /src/loader | |
parent | 492a9e7cf80ae8e26b03602b5996f471c5edf4be (diff) | |
download | chawan-aadbaffac2430abf01a849b1f7bbfb154c27d229.tar.gz |
loader: fixes & improvements
* factor out pushBuffer to make loadFromCache async * fix incorrect cache path * replace rewind with loadFromCache (it does the same thing except actually works) * remove rewindImpl callback, rewind in buffer instead
Diffstat (limited to 'src/loader')
-rw-r--r-- | src/loader/loader.nim | 169 | ||||
-rw-r--r-- | src/loader/loaderhandle.nim | 31 |
2 files changed, 79 insertions, 121 deletions
diff --git a/src/loader/loader.nim b/src/loader/loader.nim index 6183e771..31f5be10 100644 --- a/src/loader/loader.nim +++ b/src/loader/loader.nim @@ -81,7 +81,6 @@ type TEE SUSPEND RESUME - REWIND ADDREF UNREF SET_REFERRER_POLICY @@ -141,10 +140,10 @@ func findOutput(ctx: LoaderContext, id: StreamId): OutputHandle = return nil #TODO linear search over strings :( -func findCachedHandle(ctx: LoaderContext, cachepath: string): LoaderHandle = - assert cachepath != "" +func findCachedHandle(ctx: LoaderContext, cacheUrl: string): LoaderHandle = + assert cacheUrl != "" for it in ctx.handleMap.values: - if it.cached and it.cachepath == cachepath: + if it.cached and it.cacheUrl == cacheUrl: return it return nil @@ -153,6 +152,30 @@ proc delOutput(ctx: LoaderContext, id: StreamId) = if output != nil: ctx.outputMap.del(output.ostream.fd) +type PushBufferResult = enum + pbrDone, pbrUnregister + +# Either write data to the target output, or append it to the list of buffers to +# write and register the output in our selector. +proc pushBuffer(ctx: LoaderContext, output: OutputHandle, buffer: LoaderBuffer): + PushBufferResult = + if output.currentBuffer == nil: + var n = 0 + try: + n = output.ostream.sendData(buffer) + except ErrorAgain, ErrorWouldBlock: + discard + except ErrorBrokenPipe: + return pbrUnregister + if n < buffer.len: + output.currentBuffer = buffer + output.currentBufferIdx = n + ctx.selector.registerHandle(output.ostream.fd, {Write}, 0) + output.registered = true + else: + output.addBuffer(buffer) + return pbrDone + proc addFd(ctx: LoaderContext, handle: LoaderHandle, originalUrl: URL) = let output = handle.output output.ostream.setBlocking(false) @@ -174,9 +197,9 @@ proc addFd(ctx: LoaderContext, handle: LoaderHandle, originalUrl: URL) = let ps = newPosixStream(tmpf, O_CREAT or O_WRONLY, 0o600) if ps != nil: output.tee(ps, NullStreamId) - let path = $originalUrl - ctx.cacheMap[path] = tmpf - handle.cachepath = path + let surl = $originalUrl + ctx.cacheMap[surl] = tmpf + handle.cacheUrl = surl proc loadStream(ctx: LoaderContext, handle: LoaderHandle, request: Request) = ctx.passedFdMap.withValue(request.url.host, fdp): @@ -236,52 +259,47 @@ proc loadFromCache(ctx: LoaderContext, stream: SocketStream, request: Request) = let handle = newLoaderHandle(stream, request.canredir, request.clientId) let surl = $request.url let cachedHandle = ctx.findCachedHandle(surl) + let output = handle.output ctx.cacheMap.withValue(surl, p): let ps = newPosixStream(p[], O_RDONLY, 0) if ps == nil: handle.rejectHandle(ERROR_FILE_NOT_IN_CACHE) ctx.cacheMap.del(surl) + handle.close() return handle.sendResult(0) handle.sendStatus(200) handle.sendHeaders(newHeaders()) - var buffer {.noinit.}: array[BufferSize, uint8] - try: - while true: - let n = ps.recvData(addr buffer[0], buffer.len) - if buffer.len == 0: - break - if handle.output.sendData(addr buffer[0], n) < n: - break - if n < buffer.len: - break - except ErrorBrokenPipe: - handle.close() - raise + if handle.cached: + handle.cacheUrl = surl + output.ostream.setBlocking(false) + while true: + let buffer = newLoaderBuffer() + let n = ps.recvData(buffer) + if n == 0: + break + if ctx.pushBuffer(output, buffer) == pbrUnregister: + if output.registered: + ctx.selector.unregister(output.ostream.fd) + ps.close() + return + if n < buffer.cap: + break ps.close() do: if cachedHandle == nil: - handle.sendResult(ERROR_URL_NOT_IN_CACHE) + handle.rejectHandle(ERROR_URL_NOT_IN_CACHE) + return if cachedHandle != nil: # download is still ongoing; move output to the original handle - let output = handle.output - output.ostream.setBlocking(false) handle.outputs.setLen(0) output.parent = cachedHandle cachedHandle.outputs.add(output) + elif output.registered: + output.istreamAtEnd = true ctx.outputMap[output.ostream.fd] = output - if handle.outputs.len > 0: - let output = handle.output - if output.sostream != nil: - try: - handle.output.sostream.swrite(true) - except IOError: - # ignore error, that just means the buffer has already closed the - # stream - discard - output.sostream.close() - output.sostream = nil - handle.close() + else: + output.ostream.close() proc onLoad(ctx: LoaderContext, stream: SocketStream) = var request: Request @@ -317,33 +335,6 @@ proc onLoad(ctx: LoaderContext, stream: SocketStream) = ctx.outputMap[fd] = handle.output ctx.loadResource(request, handle) -proc rewind(ctx: LoaderContext, stream: PosixStream, clientId: StreamId) = - let output = ctx.findOutput(clientId) - if output == nil or output.ostream == nil: - stream.swrite(false) - return - let handle = output.parent - if not handle.cached: - stream.swrite(false) - return - assert handle.cachepath != "" - let ps = newPosixStream(handle.cachepath, O_RDONLY, 0) - if ps == nil: - stream.swrite(false) - return - stream.swrite(true) - output.ostream.setBlocking(true) #TODO - var buffer {.noinit.}: array[BufferSize, uint8] - while true: - let n = ps.recvData(addr buffer[0], BufferSize) - if n == 0: - break - if output.sendData(addr buffer[0], n) < n: - break - if n < BufferSize: - break - ps.close() - proc acceptConnection(ctx: LoaderContext) = let stream = ctx.ssock.acceptSocketStream() try: @@ -384,10 +375,6 @@ proc acceptConnection(ctx: LoaderContext) = # place the stream back into the selector, so we can write to it # again ctx.selector.registerHandle(output.ostream.fd, {Write}, 0) - of REWIND: - var targetId: StreamId - stream.sread(targetId) - ctx.rewind(stream, targetId) of ADDREF: inc ctx.refcount of UNREF: @@ -446,39 +433,19 @@ proc initLoaderContext(fd: cint, config: LoaderConfig): LoaderContext = dir &= '/' return ctx -# Either write data to the target output, or append it to the list of buffers to -# write and register the output in our selector. -proc pushBuffer(ctx: LoaderContext, handle: LoaderHandle, - buffer: LoaderBuffer, unregWrite: var seq[OutputHandle]) = - for output in handle.outputs: - if output.currentBuffer == nil: - var n = 0 - try: - n = output.sendData(addr buffer[0], buffer.len) - except ErrorAgain, ErrorWouldBlock: - discard - except ErrorBrokenPipe: - unregWrite.add(output) - break - if n < buffer.len: - output.currentBuffer = buffer - output.currentBufferIdx = n - ctx.selector.registerHandle(output.ostream.fd, {Write}, 0) - output.registered = true - else: - output.addBuffer(buffer) - # Called whenever there is more data available to read. proc handleRead(ctx: LoaderContext, handle: LoaderHandle, unregRead: var seq[LoaderHandle], unregWrite: var seq[OutputHandle]) = while true: let buffer = newLoaderBuffer() try: - buffer.len = handle.istream.recvData(addr buffer[0], buffer.cap) - if buffer.len == 0: + let n = handle.istream.recvData(buffer) + if n == 0: break - ctx.pushBuffer(handle, buffer, unregWrite) - if buffer.len < buffer.cap: + for output in handle.outputs: + if ctx.pushBuffer(output, buffer) == pbrUnregister: + unregWrite.add(output) + if n < buffer.cap: break except ErrorAgain, ErrorWouldBlock: # retry later break @@ -493,9 +460,7 @@ proc handleWrite(ctx: LoaderContext, output: OutputHandle, while output.currentBuffer != nil: let buffer = output.currentBuffer try: - let i = output.currentBufferIdx - assert buffer.len - i > 0 - let n = output.sendData(addr buffer[i], buffer.len - i) + let n = output.ostream.sendData(buffer, output.currentBufferIdx) output.currentBufferIdx += n if output.currentBufferIdx < buffer.len: break @@ -714,15 +679,6 @@ proc tee*(loader: FileLoader, targetId: StreamId): SocketStream = stream.swrite(clientId) return stream -proc rewind*(loader: FileLoader, fd: int): bool = - let stream = connectSocketStream(loader.process, false, blocking = true) - stream.swrite(REWIND) - let id: StreamId = (loader.clientPid, fd) - stream.swrite(id) - var res: bool - stream.sread(res) - return res - const BufferSize = 4096 proc handleHeaders(loader: FileLoader, request: Request, response: Response, @@ -816,10 +772,7 @@ proc doRequest*(loader: FileLoader, request: Request): Response = if response.res == 0: loader.handleHeaders(request, response, stream) else: - var msg: string - stream.sread(msg) - if msg != "": - response.internalMessage = msg + stream.sread(response.internalMessage) return response proc addref*(loader: FileLoader) = diff --git a/src/loader/loaderhandle.nim b/src/loader/loaderhandle.nim index fe8c6435..b0c5747e 100644 --- a/src/loader/loaderhandle.nim +++ b/src/loader/loaderhandle.nim @@ -17,7 +17,7 @@ const LoaderBufferPageSize = 4064 # 4096 - 32 type LoaderBufferObj = object page: ptr UncheckedArray[uint8] - len: int + len*: int LoaderBuffer* = ref LoaderBufferObj @@ -41,7 +41,7 @@ type canredir: bool outputs*: seq[OutputHandle] cached*: bool - cachepath*: string + cacheUrl*: string when defined(debug): url*: URL @@ -70,18 +70,9 @@ proc findOutputHandle*(handle: LoaderHandle, fd: int): OutputHandle = return output return nil -func `[]`*(buffer: LoaderBuffer, i: int): var uint8 {.inline.} = - return buffer[].page[i] - func cap*(buffer: LoaderBuffer): int {.inline.} = return LoaderBufferPageSize -func len*(buffer: LoaderBuffer): var int {.inline.} = - return buffer[].len - -proc `len=`*(buffer: LoaderBuffer, i: int) {.inline.} = - buffer[].len = i - proc newLoaderBuffer*(): LoaderBuffer = return LoaderBuffer( page: cast[ptr UncheckedArray[uint8]](alloc(LoaderBufferPageSize)), @@ -102,6 +93,14 @@ proc bufferCleared*(output: OutputHandle) = else: output.currentBuffer = nil +proc clearBuffers*(output: OutputHandle) = + if output.currentBuffer != nil: + output.currentBuffer = nil + output.currentBufferIdx = 0 + output.buffers.clear() + else: + assert output.buffers.len == 0 + proc tee*(outputIn: OutputHandle, ostream: PosixStream, clientId: StreamId) = outputIn.parent.outputs.add(OutputHandle( parent: outputIn.parent, @@ -139,8 +138,14 @@ proc sendHeaders*(handle: LoaderHandle, headers: Headers) = output.sostream = sostream output.ostream = newPosixStream(fd) -proc sendData*(output: OutputHandle, p: pointer, nmemb: int): int = - return output.ostream.sendData(p, nmemb) +proc recvData*(ps: PosixStream, buffer: LoaderBuffer): int {.inline.} = + let n = ps.recvData(addr buffer.page[0], buffer.cap) + buffer.len = n + return n + +proc sendData*(ps: PosixStream, buffer: LoaderBuffer, si = 0): int {.inline.} = + assert buffer.len - si > 0 + return ps.sendData(addr buffer.page[si], buffer.len - si) proc close*(handle: LoaderHandle) = for output in handle.outputs: |