diff options
author | bptato <nincsnevem662@gmail.com> | 2025-03-05 21:45:02 +0100 |
---|---|---|
committer | bptato <nincsnevem662@gmail.com> | 2025-03-05 21:51:31 +0100 |
commit | 747d8aeafe86421e1d6a4ee989d075be918b630c (patch) | |
tree | ae922fd9d7c72086b839c576c969491be032e94e /src | |
parent | dd522de024f19c864a6929a19a99837002eaaea0 (diff) | |
download | chawan-747d8aeafe86421e1d6a4ee989d075be918b630c.tar.gz |
loader: asyncify inputhandle status responses
I've also removed the rsBeforeStatus error check from iclose. I'm not sure if it's still needed at all, but if it is, then it was implemented in the wrong place.
Diffstat (limited to 'src')
-rw-r--r-- | src/io/packetwriter.nim | 21 | ||||
-rw-r--r-- | src/server/loader.nim | 297 |
2 files changed, 175 insertions, 143 deletions
diff --git a/src/io/packetwriter.nim b/src/io/packetwriter.nim index b6786ce0..6c99ae8a 100644 --- a/src/io/packetwriter.nim +++ b/src/io/packetwriter.nim @@ -13,9 +13,8 @@ import types/color import types/opt type PacketWriter* = object - stream: DynStream - buffer: seq[uint8] - bufLen: int + buffer*: seq[uint8] + bufLen*: int # file descriptors to send in the packet fds: seq[cint] @@ -40,31 +39,33 @@ proc sendFd*(w: var PacketWriter; fd: cint) = const InitLen = sizeof(int) * 2 const SizeInit = max(64, InitLen) -proc initWriter(stream: DynStream): PacketWriter = +proc initPacketWriter*(): PacketWriter = return PacketWriter( - stream: stream, buffer: newSeqUninitialized[uint8](SizeInit), bufLen: InitLen ) -proc flush*(w: var PacketWriter) = +proc writeSize*(w: var PacketWriter) = # subtract the length field's size let len = [w.bufLen - InitLen, w.fds.len] copyMem(addr w.buffer[0], unsafeAddr len[0], sizeof(len)) - if not w.stream.writeDataLoop(w.buffer.toOpenArray(0, w.bufLen - 1)): + +proc flush*(w: var PacketWriter; stream: DynStream) = + w.writeSize() + if not stream.writeDataLoop(w.buffer.toOpenArray(0, w.bufLen - 1)): raise newException(EOFError, "end of file") if w.fds.len > 0: w.fds.reverse() - let n = SocketStream(w.stream).sendMsg([0u8], w.fds) + let n = SocketStream(stream).sendMsg([0u8], w.fds) if n < 1: raise newException(EOFError, "end of file") w.bufLen = 0 w.fds.setLen(0) template withPacketWriter*(stream: DynStream; w, body: untyped) = - var w = stream.initWriter() + var w = initPacketWriter() body - w.flush() + w.flush(stream) proc writeData*(w: var PacketWriter; buffer: pointer; len: int) = let targetLen = w.bufLen + len diff --git a/src/server/loader.nim b/src/server/loader.nim index fb496189..57bbb25a 100644 --- a/src/server/loader.nim +++ b/src/server/loader.nim @@ -1,17 +1,17 @@ # A file loader server (?) -# The idea here is that we receive requests with a socket, then respond to each -# with a response (ideally a document.) -# For now, the protocol looks like: +# We receive various types of requests on a control socket, then respond +# to each with a response. In case of the "load" request, we return one +# half of a socket pair, and then send connection information before the +# response body so that the protocol looks like: # C: Request -# S: res (0 => success, _ => error) +# S: (packet 1) res (0 => success, _ => error) # if success: -# S: output ID -# S: status code -# S: headers -# C: resume +# S: (packet 1) output ID +# S: (packet 2) status code, headers +# C: resume (on control socket) # S: response body # else: -# S: error message +# S: (packet 1) error message # # The body is passed to the stream as-is, so effectively nothing can follow it. # @@ -19,9 +19,6 @@ # passed, it will *not* be cleaned up until a `resume' command is # received. (This allows for passing outputIds to the pager for later # addCacheFile commands there.) -# -# Note 2: We also have a separate control socket that can receive -# various messages, of which "load" is just one. import std/algorithm import std/deques @@ -163,6 +160,12 @@ type configdir*: string bookmark*: string + PushBufferResult = enum + pbrDone, pbrUnregister + +proc pushBuffer(ctx: var LoaderContext; output: OutputHandle; + buffer: LoaderBuffer; ignoreSuspension: bool): PushBufferResult + when defined(debug): func `$`*(buffer: LoaderBuffer): string = var s = newString(buffer.len) @@ -223,15 +226,21 @@ proc tee(outputIn: OutputHandle; ostream: PosixStream; outputId, pid: int): return output template output(handle: InputHandle): OutputHandle = + assert handle.outputs.len == 1 handle.outputs[0] -proc sendResult(handle: InputHandle; res: int; msg = "") = +template bufferFromWriter(w, body: untyped): LoaderBuffer = + var w = initPacketWriter() + body + w.writeSize() + LoaderBuffer(page: move(w.buffer), len: w.bufLen) + +proc sendResult(ctx: var LoaderContext; handle: InputHandle; res: int; + msg = ""): PushBufferResult = assert handle.rstate == rsBeforeResult - inc handle.rstate let output = handle.output - let blocking = output.stream.blocking - output.stream.setBlocking(true) - output.stream.withPacketWriter w: + inc handle.rstate + let buffer = bufferFromWriter w: w.swrite(res) if res == 0: # success assert msg == "" @@ -239,20 +248,19 @@ proc sendResult(handle: InputHandle; res: int; msg = "") = inc handle.rstate else: # error w.swrite(msg) - output.stream.setBlocking(blocking) + return ctx.pushBuffer(output, buffer, ignoreSuspension = true) -proc sendStatus(handle: InputHandle; status: uint16; headers: Headers) = +proc sendStatus(ctx: var LoaderContext; handle: InputHandle; status: uint16; + headers: Headers): PushBufferResult = assert handle.rstate == rsBeforeStatus inc handle.rstate - let blocking = handle.output.stream.blocking let contentLens = headers.getOrDefault("Content-Length") handle.startTime = getTime() handle.contentLen = parseUInt64(contentLens).get(uint64.high) - handle.output.stream.setBlocking(true) - handle.output.stream.withPacketWriter w: + let buffer = bufferFromWriter w: w.swrite(status) w.swrite(headers) - handle.output.stream.setBlocking(blocking) + return ctx.pushBuffer(handle.output, buffer, ignoreSuspension = true) proc writeData(ps: PosixStream; buffer: LoaderBuffer; si = 0): int {.inline.} = assert buffer.len - si > 0 @@ -261,17 +269,6 @@ proc writeData(ps: PosixStream; buffer: LoaderBuffer; si = 0): int {.inline.} = proc iclose(handle: InputHandle) = if handle.stream != nil: assert not handle.registered - if handle.rstate == rsBeforeStatus: - assert handle.outputs.len == 1 - # not an ideal solution, but better than silently eating malformed - # headers - handle.output.stream.setBlocking(true) - try: - handle.sendStatus(500, newHeaders()) - const msg = "Error: malformed header in CGI script" - discard handle.output.stream.writeData(msg) - except EOFError: - discard handle.stream.sclose() handle.stream = nil @@ -309,9 +306,13 @@ func canRewriteForCGICompat(ctx: LoaderContext; path: string): bool = return true return false -proc rejectHandle(handle: InputHandle; code: ConnectionError; msg = "") = - handle.sendResult(code, msg) - handle.close() +proc rejectHandle(ctx: var LoaderContext; handle: InputHandle; + code: ConnectionError; msg = "") = + case ctx.sendResult(handle, code, msg) + of pbrDone: discard + of pbrUnregister: + ctx.unregWrite.add(handle.output) + handle.output.dead = true iterator inputHandles(ctx: LoaderContext): InputHandle {.inline.} = for it in ctx.handleMap: @@ -346,9 +347,6 @@ func find(cacheMap: openArray[CachedItem]; id: int): int = return i -1 -type PushBufferResult = enum - pbrDone, pbrUnregister - proc register(ctx: var LoaderContext; handle: InputHandle) = assert not handle.registered ctx.pollData.register(handle.stream.fd, cshort(POLLIN)) @@ -381,37 +379,30 @@ proc unregister(ctx: var LoaderContext; client: ClientHandle) = ctx.pollData.unregister(int(client.stream.fd)) client.registered = false -# Either write data to the target output, or append it to the list of buffers to -# write and register the output in our selector. +# Either write data to the target output, or append it to the list of +# buffers to write and register the output in our selector. +# ignoreSuspension is meant to be used when sending the connection +# result and headers, which are sent irrespective of whether the handle +# is suspended or not. proc pushBuffer(ctx: var LoaderContext; output: OutputHandle; - buffer: LoaderBuffer; si: int): PushBufferResult = - if output.suspended: + buffer: LoaderBuffer; ignoreSuspension: bool): PushBufferResult = + if output.suspended and not ignoreSuspension: if output.currentBuffer == nil: output.currentBuffer = buffer - output.currentBufferIdx = si + output.currentBufferIdx = 0 else: - # si must be 0 here in all cases. Why? Well, it indicates the first unread - # position after reading headers, and at that point currentBuffer will - # be empty. - # - # Obviously, this breaks down if anything is pushed into the stream - # before the header parser destroys itself. For now it never does, so we - # should be fine. - doAssert si == 0 output.buffers.addLast(buffer) elif output.currentBuffer == nil: - var n = si - let m = output.stream.writeData(buffer, si) - if m < 0: + var n = output.stream.writeData(buffer) + if n < 0: let e = errno if e == EAGAIN or e == EWOULDBLOCK: - discard + n = 0 else: assert e == EPIPE, $strerror(e) return pbrUnregister else: - output.bytesSent += uint64(m) - n += m + output.bytesSent += uint64(n) if n < buffer.len: output.currentBuffer = buffer output.currentBufferIdx = n @@ -510,66 +501,75 @@ proc unset(ctx: var LoaderContext; handle: LoaderHandle) = ctx.handleMap[fd] = nil proc addFd(ctx: var LoaderContext; handle: InputHandle) = - let output = handle.output - output.stream.setBlocking(false) handle.stream.setBlocking(false) ctx.register(handle) ctx.put(handle) - ctx.put(output) type ControlResult = enum crDone, crContinue, crError -proc handleFirstLine(handle: InputHandle; line: string): ControlResult = +proc handleFirstLine(ctx: var LoaderContext; handle: InputHandle; line: string): + ControlResult = if line.startsWithIgnoreCase("HTTP/1.0") or line.startsWithIgnoreCase("HTTP/1.1"): let codes = line.until(' ', "HTTP/1.0 ".len) let code = parseUInt16(codes) if codes.len > 3 or code.isNone: - handle.sendResult(ceCGIMalformedHeader) + ctx.rejectHandle(handle, ceCGIMalformedHeader) return crError - handle.sendResult(0) # Success + case ctx.sendResult(handle, 0) # Success + of pbrDone: discard + of pbrUnregister: return crError handle.parser.status = code.get return crDone let k = line.until(':') if k.len == line.len: # invalid - handle.sendResult(ceCGIMalformedHeader) + ctx.rejectHandle(handle, ceCGIMalformedHeader) return crError let v = line.substr(k.len + 1).strip() if k.equalsIgnoreCase("Status"): - handle.sendResult(0) # success + case ctx.sendResult(handle, 0) # success + of pbrDone: discard + of pbrUnregister: return crError let code = parseUInt16(v) if v.len > 3 or code.isNone: - handle.sendResult(ceCGIMalformedHeader) + ctx.rejectHandle(handle, ceCGIMalformedHeader) return crError handle.parser.status = code.get return crContinue if k.equalsIgnoreCase("Cha-Control"): if v.startsWithIgnoreCase("Connected"): - handle.sendResult(0) # success + case ctx.sendResult(handle, 0) # success + of pbrDone: discard + of pbrUnregister: return crError return crContinue if v.startsWithIgnoreCase("ConnectionError"): let errs = v.split(' ') - var code = int32(ceCGIInvalidChaControl) + var code = ceCGIInvalidChaControl var message = "" if errs.len > 1: if (let x = parseInt32(errs[1]); x.isSome): + let n = x.get + if n > 0 and n <= int32(ConnectionError.high): + code = ConnectionError(x.get) + elif (let x = strictParseEnum[ConnectionError](errs[1]); + x.get(ceNone) != ceNone): code = x.get - elif (let x = strictParseEnum[ConnectionError](errs[1]); x.isSome): - code = int32(x.get) if errs.len > 2: message &= errs[2] for i in 3 ..< errs.len: message &= ' ' message &= errs[i] - handle.sendResult(code, message) + ctx.rejectHandle(handle, code, message) return crError if v.startsWithIgnoreCase("ControlDone"): return crDone - handle.sendResult(ceCGIInvalidChaControl) + ctx.rejectHandle(handle, ceCGIInvalidChaControl) return crError - handle.sendResult(0) # success + case ctx.sendResult(handle, 0) # success + of pbrDone: discard + of pbrUnregister: return crError handle.parser.headers.add(k, v) return crDone @@ -598,7 +598,8 @@ proc handleLine(handle: InputHandle; line: string) = let v = line.substr(k.len + 1).strip() handle.parser.headers.add(k, v) -proc parseHeaders0(handle: InputHandle; data: openArray[char]): int = +proc parseHeaders0(ctx: var LoaderContext; handle: InputHandle; + data: openArray[char]): int = let parser = handle.parser for i, c in data: template die = @@ -614,13 +615,17 @@ proc parseHeaders0(handle: InputHandle; data: openArray[char]): int = if parser.state == hpsBeforeLines: # body comes immediately, so we haven't had a chance to send result # yet. - handle.sendResult(0) - handle.sendStatus(parser.status, parser.headers) + case ctx.sendResult(handle, 0) + of pbrDone: discard + of pbrUnregister: die + let res = ctx.sendStatus(handle, parser.status, parser.headers) handle.parser = nil - return i + 1 # +1 to skip \n + return case res + of pbrDone: i + 1 # +1 to skip \n + of pbrUnregister: -1 case parser.state of hpsBeforeLines: - case handle.handleFirstLine(parser.lineBuffer) + case ctx.handleFirstLine(handle, parser.lineBuffer) of crDone: parser.state = hpsControlDone of crContinue: parser.state = hpsAfterFirstLine of crError: die @@ -636,17 +641,14 @@ proc parseHeaders0(handle: InputHandle; data: openArray[char]): int = parser.lineBuffer &= c return data.len -proc parseHeaders(handle: InputHandle; buffer: LoaderBuffer): int = - try: - if buffer == nil: - return handle.parseHeaders0(['\n']) - let p = cast[ptr UncheckedArray[char]](addr buffer.page[0]) - return handle.parseHeaders0(p.toOpenArray(0, buffer.len - 1)) - except EOFError: - handle.parser = nil - return -1 +proc parseHeaders(ctx: var LoaderContext; handle: InputHandle; + buffer: LoaderBuffer): int = + if buffer == nil: + return ctx.parseHeaders0(handle, ['\n']) + let p = cast[ptr UncheckedArray[char]](addr buffer.page[0]) + return ctx.parseHeaders0(handle, p.toOpenArray(0, buffer.len - 1)) -proc finishParse(handle: InputHandle) = +proc finishParse(ctx: var LoaderContext; handle: InputHandle) = if handle.cacheRef != nil: assert handle.cacheRef.offset == -1 let ps = newPosixStream(handle.cacheRef.path, O_RDONLY, 0) @@ -658,7 +660,7 @@ proc finishParse(handle: InputHandle) = if n <= 0: assert n == 0 or errno != EBADF break - let pn = handle.parseHeaders0(buffer.toOpenArray(0, n - 1)) + let pn = ctx.parseHeaders0(handle, buffer.toOpenArray(0, n - 1)) if pn == -1: break off += int64(pn) @@ -669,7 +671,7 @@ proc finishParse(handle: InputHandle) = ps.sclose() handle.cacheRef = nil if handle.parser != nil: - discard handle.parseHeaders(nil) + discard ctx.parseHeaders(handle, nil) type HandleReadResult = enum hrrDone, hrrUnregister, hrrBrokenPipe @@ -680,7 +682,7 @@ proc handleRead(ctx: var LoaderContext; handle: InputHandle; var unregs = 0 let maxUnregs = handle.outputs.len while true: - let buffer = newLoaderBuffer() + var buffer = newLoaderBuffer() let n = handle.stream.readData(buffer.page) if n < 0: let e = errno @@ -694,11 +696,22 @@ proc handleRead(ctx: var LoaderContext; handle: InputHandle; buffer.len = n var si = 0 if handle.parser != nil: - si = handle.parseHeaders(buffer) + si = ctx.parseHeaders(handle, buffer) if si == -1: # died while parsing headers; unregister return hrrUnregister if si == n: # parsed the entire buffer as headers; skip output handling continue + if si != 0: + # Some parts of the buffer have been consumed as headers; others + # must be passed on to the client. + # We *could* store si as an offset to the buffer, but it would + # make things much more complex. Let's just do this: + let nlen = buffer.len - si + let nbuffer = newLoaderBuffer(nlen) + nbuffer.len = nlen + copyMem(addr nbuffer.page[0], addr buffer.page[si], nbuffer.len) + buffer = nbuffer + assert nbuffer.len != 0, $si & ' ' & $buffer.len & " n " & $n else: handle.bytesSeen += uint64(n) #TODO stop reading if Content-Length exceeded @@ -706,7 +719,7 @@ proc handleRead(ctx: var LoaderContext; handle: InputHandle; if output.dead: # do not push to unregWrite candidates continue - case ctx.pushBuffer(output, buffer, si) + case ctx.pushBuffer(output, buffer, ignoreSuspension = false) of pbrUnregister: output.dead = true unregWrite.add(output) @@ -741,11 +754,9 @@ proc loadStreamRegular(ctx: var LoaderContext; elif cachedHandle != nil: output.parent = cachedHandle cachedHandle.outputs.add(output) - ctx.put(output) elif output.registered or output.suspended: output.parent = nil output.istreamAtEnd = true - ctx.put(output) else: assert output.stream.fd >= ctx.handleMap.len or ctx.handleMap[output.stream.fd] == nil @@ -856,15 +867,15 @@ proc loadCGI(ctx: var LoaderContext; client: ClientHandle; handle: InputHandle; let cpath = ctx.parseCGIPath(request) if cpath.cmd == "" or cpath.basename in ["", ".", ".."] or cpath.basename[0] == '~': - handle.sendResult(ceInvalidCGIPath) + ctx.rejectHandle(handle, ceInvalidCGIPath) return if not fileExists(cpath.cmd): - handle.sendResult(ceCGIFileNotFound) + ctx.rejectHandle(handle, ceCGIFileNotFound) return # Pipe the response body as stdout. var pipefd: array[2, cint] # child -> parent if pipe(pipefd) == -1: - handle.sendResult(ceFailedToSetUpCGI) + ctx.rejectHandle(handle, ceFailedToSetUpCGI) return let istreamOut = newPosixStream(pipefd[0]) # read by loader var ostreamOut = newPosixStream(pipefd[1]) # written by child @@ -877,7 +888,7 @@ proc loadCGI(ctx: var LoaderContext; client: ClientHandle; handle: InputHandle; # RDWR, otherwise mmap won't work ostreamOut = newPosixStream(tmpf, O_CREAT or O_RDWR, 0o600) if ostreamOut == nil: - handle.sendResult(ceCGIFailedToOpenCacheOutput) + ctx.rejectHandle(handle, ceCGIFailedToOpenCacheOutput) return let cacheId = handle.output.outputId # welp let item = CachedItem( @@ -898,24 +909,24 @@ proc loadCGI(ctx: var LoaderContext; client: ClientHandle; handle: InputHandle; var n: int (istream, n) = client.openCachedItem(request.body.cacheId) if istream == nil: - handle.sendResult(ceCGICachedBodyNotFound) + ctx.rejectHandle(handle, ceCGICachedBodyNotFound) return cachedHandle = ctx.findCachedHandle(request.body.cacheId) if cachedHandle != nil: # cached item still open, switch to streaming mode if client.cacheMap[n].offset == -1: - handle.sendResult(ceCGICachedBodyUnavailable) + ctx.rejectHandle(handle, ceCGICachedBodyUnavailable) return istream2 = istream elif request.body.t == rbtOutput: outputIn = ctx.findOutput(request.body.outputId, client) if outputIn == nil: - handle.sendResult(ceCGIOutputHandleNotFound) + ctx.rejectHandle(handle, ceCGIOutputHandleNotFound) return if request.body.t in {rbtString, rbtMultipart, rbtOutput} or request.body.t == rbtCache and istream2 != nil: var pipefdRead: array[2, cint] # parent -> child if pipe(pipefdRead) == -1: - handle.sendResult(ceFailedToSetUpCGI) + ctx.rejectHandle(handle, ceFailedToSetUpCGI) return istream = newPosixStream(pipefdRead[0]) ostream = newPosixStream(pipefdRead[1]) @@ -923,7 +934,7 @@ proc loadCGI(ctx: var LoaderContext; client: ClientHandle; handle: InputHandle; stderr.flushFile() let pid = fork() if pid == -1: - handle.sendResult(ceFailedToSetUpCGI) + ctx.rejectHandle(handle, ceFailedToSetUpCGI) elif pid == 0: istreamOut.sclose() # close read ostreamOut.moveFd(STDOUT_FILENO) # dup stdout @@ -1001,10 +1012,14 @@ proc loadStream(ctx: var LoaderContext; client: ClientHandle; handle: InputHandle; request: Request) = let i = client.findPassedFd(request.url.pathname) if i == -1: - handle.sendResult(ceFileNotFound, "stream not found") + ctx.rejectHandle(handle, ceFileNotFound, "stream not found") return - handle.sendResult(0) - handle.sendStatus(200, newHeaders()) + case ctx.sendResult(handle, 0) + of pbrDone: discard + of pbrUnregister: return + case ctx.sendStatus(handle, 200, newHeaders()) + of pbrDone: discard + of pbrUnregister: return let ps = client.passedFdMap[i].ps var stats: Stat doAssert fstat(ps.fd, stats) != -1 @@ -1027,35 +1042,52 @@ proc loadFromCache(ctx: var LoaderContext; client: ClientHandle; discard ps.seek(startFrom) handle.stream = ps if ps == nil: - handle.rejectHandle(ceFileNotInCache) + ctx.rejectHandle(handle, ceFileNotInCache) client.cacheMap.del(n) return - handle.sendResult(0) - handle.sendStatus(200, newHeaders()) + case ctx.sendResult(handle, 0) + of pbrDone: discard + of pbrUnregister: + client.cacheMap.del(n) + handle.close() + return + case ctx.sendStatus(handle, 200, newHeaders()) + of pbrDone: discard + of pbrUnregister: + client.cacheMap.del(n) + handle.close() + return handle.output.stream.setBlocking(false) let cachedHandle = ctx.findCachedHandle(id) ctx.loadStreamRegular(handle, cachedHandle) else: - handle.sendResult(ceURLNotInCache) + ctx.rejectHandle(handle, ceURLNotInCache) # Data URL handler. # Moved back into loader from CGI, because data URLs can get extremely long # and thus no longer fit into the environment. proc loadDataSend(ctx: var LoaderContext; handle: InputHandle; s, ct: string) = - handle.sendResult(0) - handle.sendStatus(200, newHeaders({"Content-Type": ct})) + case ctx.sendResult(handle, 0) + of pbrDone: discard + of pbrUnregister: + handle.close() + return + case ctx.sendStatus(handle, 200, newHeaders({"Content-Type": ct})) + of pbrDone: discard + of pbrUnregister: + handle.close() + return let output = handle.output if s.len == 0: if output.suspended: output.istreamAtEnd = true - ctx.put(output) else: output.oclose() return let buffer = newLoaderBuffer(s.len) buffer.len = s.len copyMem(addr buffer.page[0], unsafeAddr s[0], s.len) - case ctx.pushBuffer(output, buffer, 0) + case ctx.pushBuffer(output, buffer, ignoreSuspension = false) of pbrUnregister: if output.registered: ctx.unregister(output) @@ -1063,7 +1095,6 @@ proc loadDataSend(ctx: var LoaderContext; handle: InputHandle; s, ct: string) = of pbrDone: if output.registered or output.suspended: output.istreamAtEnd = true - ctx.put(output) else: output.oclose() @@ -1071,15 +1102,14 @@ proc loadData(ctx: var LoaderContext; handle: InputHandle; request: Request) = let url = request.url var ct = url.pathname.until(',') if AllChars - Ascii + Controls - {'\t'} in ct: - handle.sendResult(ceInvalidURL, "invalid data URL") - handle.close() + ctx.rejectHandle(handle, ceInvalidURL, "invalid data URL") return let sd = ct.len + 1 # data start let body = percentDecode(url.pathname.toOpenArray(sd, url.pathname.high)) if ct.endsWith(";base64"): var d: string if d.atob(body).isNone: - handle.rejectHandle(ceInvalidURL, "invalid data URL") + ctx.rejectHandle(handle, ceInvalidURL, "invalid data URL") return ct.setLen(ct.len - ";base64".len) # remove base64 indicator ctx.loadDataSend(handle, d, ct) @@ -1185,7 +1215,7 @@ proc loadAbout(ctx: var LoaderContext; handle: InputHandle; request: Request) = if request.httpMethod == hmPost: # OK/STOP/PAUSE/RESUME clicked if request.body.t != rbtString: - handle.rejectHandle(ceInvalidURL, "wat") + ctx.rejectHandle(handle, ceInvalidURL, "wat") return for it in ctx.parseDownloadActions(request.body.s): let dl = ctx.downloadList[it.n] @@ -1229,7 +1259,7 @@ proc loadAbout(ctx: var LoaderContext; handle: InputHandle; request: Request) = const body = staticRead"res/license.md" ctx.loadDataSend(handle, body, "text/markdown") else: - handle.rejectHandle(ceInvalidURL, "invalid download URL") + ctx.rejectHandle(handle, ceInvalidURL, "invalid download URL") proc loadResource(ctx: var LoaderContext; client: ClientHandle; config: LoaderClientConfig; request: Request; handle: InputHandle) = @@ -1263,7 +1293,6 @@ proc loadResource(ctx: var LoaderContext; client: ClientHandle; of "cache": ctx.loadFromCache(client, handle, request) assert handle.stream == nil - handle.close() of "data": ctx.loadData(handle, request) of "about": @@ -1275,11 +1304,11 @@ proc loadResource(ctx: var LoaderContext; client: ClientHandle; inc tries redo = true of ummrWrongURL: - handle.rejectHandle(ceInvalidURIMethodEntry) + ctx.rejectHandle(handle, ceInvalidURIMethodEntry) of ummrNotFound: - handle.rejectHandle(ceUnknownScheme) + ctx.rejectHandle(handle, ceUnknownScheme) if tries >= MaxRewrites: - handle.rejectHandle(ceTooManyRewrites) + ctx.rejectHandle(handle, ceTooManyRewrites) proc setupRequestDefaults(request: Request; config: LoaderClientConfig) = for k, v in config.defaultHeaders.table: @@ -1309,12 +1338,14 @@ proc load(ctx: var LoaderContext; stream: SocketStream; request: Request; if not fail: discard close(sv[1]) let stream = newSocketStream(sv[0]) + stream.setBlocking(false) let handle = newInputHandle(stream, ctx.getOutputId(), client.pid) + ctx.put(handle.output) when defined(debug): handle.url = request.url handle.output.url = request.url if not config.filter.match(request.url): - handle.rejectHandle(ceDisallowedURL) + ctx.rejectHandle(handle, ceDisallowedURL) else: request.setupRequestDefaults(config) ctx.loadResource(client, config, request, handle) @@ -1403,14 +1434,14 @@ proc addCacheFile(ctx: var LoaderContext; stream: SocketStream; w.swrite(id) proc redirectToFile(ctx: var LoaderContext; stream: SocketStream; - r: var PacketReader) = + client: ClientHandle; r: var PacketReader) = var outputId: int var targetPath: string var displayUrl: string r.sread(outputId) r.sread(targetPath) r.sread(displayUrl) - let output = ctx.findOutput(outputId, ctx.pagerClient) + let output = ctx.findOutput(outputId, client) var success = false if output != nil: var fileOutput: OutputHandle @@ -1599,7 +1630,7 @@ proc readCommand(ctx: var LoaderContext; client: ClientHandle) = ctx.openCachedItem(stream, client, r) of lcRedirectToFile: privileged_command - ctx.redirectToFile(stream, r) + ctx.redirectToFile(stream, client, r) of lcLoadConfig: privileged_command ctx.loadConfig(stream, client, r) @@ -1677,7 +1708,7 @@ proc finishCycle(ctx: var LoaderContext) = ctx.unregister(handle) ctx.unset(handle) if handle.parser != nil: - handle.finishParse() + ctx.finishParse(handle) handle.iclose() for output in handle.outputs: output.istreamAtEnd = true @@ -1698,7 +1729,7 @@ proc finishCycle(ctx: var LoaderContext) = ctx.unregister(handle) ctx.unset(handle) if handle.parser != nil: - handle.finishParse() + ctx.finishParse(handle) handle.iclose() for client in ctx.unregClient: if client.stream != nil: |