diff options
author | bptato <nincsnevem662@gmail.com> | 2024-09-14 15:58:52 +0200 |
---|---|---|
committer | bptato <nincsnevem662@gmail.com> | 2024-09-14 16:56:46 +0200 |
commit | 5b2a36579e53c69f154288a91ddc3e7c5375d7a6 (patch) | |
tree | 6b81b21aa15ef26031ce653ea972f75e00b1df0e | |
parent | dfa1a4bc5ece3b8c333b9a47bab038ff6a162f5b (diff) | |
download | chawan-5b2a36579e53c69f154288a91ddc3e7c5375d7a6.tar.gz |
loader: refactor, misc optimizations & fixes
* factor out input/output handle tables; use a seq instead * add possibility to directly open cached items onto stdin (mainly an optimization for reading images, which are always cached) * close used handles on local CGI execution * make clone during load work again
-rw-r--r-- | src/html/dom.nim | 21 | ||||
-rw-r--r-- | src/loader/cgi.nim | 64 | ||||
-rw-r--r-- | src/loader/loader.nim | 281 | ||||
-rw-r--r-- | src/loader/loaderhandle.nim | 94 | ||||
-rw-r--r-- | src/loader/request.nim | 7 | ||||
-rw-r--r-- | src/local/client.nim | 17 | ||||
-rw-r--r-- | src/local/container.nim | 6 | ||||
-rw-r--r-- | src/local/pager.nim | 32 | ||||
-rw-r--r-- | src/server/buffer.nim | 78 |
9 files changed, 334 insertions, 266 deletions
diff --git a/src/html/dom.nim b/src/html/dom.nim index b31802bc..146056a9 100644 --- a/src/html/dom.nim +++ b/src/html/dom.nim @@ -4565,6 +4565,7 @@ proc getContext*(jctx: JSContext; this: HTMLCanvasElement; contextId: string; # backwards compat, but I don't care. proc toBlob(ctx: JSContext; this: HTMLCanvasElement; callback: JSValue; contentType = "image/png"; quality = none(float64)) {.jsfunc.} = + let contentType = contentType.toLowerAscii() if not contentType.startsWith("image/") or this.bitmap.cacheId == 0: return let url0 = newURL("img-codec+" & contentType.after('/') & ":encode") @@ -4582,21 +4583,11 @@ proc toBlob(ctx: JSContext; this: HTMLCanvasElement; callback: JSValue; let callback = JS_DupValue(ctx, callback) let window = this.document.window let loader = window.loader - let contentType = contentType.toLowerAscii() - let cacheReq = newRequest(newURL("cache:" & $this.bitmap.cacheId).get) - loader.fetch(cacheReq).then(proc(res: JSResult[Response]): FetchPromise = - if res.isNone: - return newResolvedPromise(res) - let res = res.get - let p = loader.fetch(newRequest( - newURL("img-codec+x-cha-canvas:decode").get, - httpMethod = hmPost, - body = RequestBody(t: rbtOutput, outputId: res.outputId) - )) - res.resume() - res.close() - return p - ).then(proc(res: JSResult[Response]): FetchPromise = + loader.fetch(newRequest( + newURL("img-codec+x-cha-canvas:decode").get, + httpMethod = hmPost, + body = RequestBody(t: rbtCache, cacheId: this.bitmap.cacheId) + )).then(proc(res: JSResult[Response]): FetchPromise = if res.isNone: return newResolvedPromise(res) let res = res.get diff --git a/src/loader/cgi.nim b/src/loader/cgi.nim index e5fa33c4..7e294eae 100644 --- a/src/loader/cgi.nim +++ b/src/loader/cgi.nim @@ -59,7 +59,7 @@ proc setupEnv(cmd, scriptName, pathInfo, requestURI, myDir: string; type ControlResult = enum crDone, crContinue, crError -proc handleFirstLine(handle: LoaderHandle; line: string; headers: Headers; +proc handleFirstLine(handle: InputHandle; line: string; headers: Headers; status: var uint16): ControlResult = let k = line.until(':') if k.len == line.len: @@ -98,7 +98,7 @@ proc handleFirstLine(handle: LoaderHandle; line: string; headers: Headers; headers.add(k, v) return crDone -proc handleControlLine(handle: LoaderHandle; line: string; headers: Headers; +proc handleControlLine(handle: InputHandle; line: string; headers: Headers; status: var uint16): ControlResult = let k = line.until(':') if k.len == line.len: @@ -116,7 +116,7 @@ proc handleControlLine(handle: LoaderHandle; line: string; headers: Headers; return crDone # returns false if transfer was interrupted -proc handleLine(handle: LoaderHandle; line: string; headers: Headers) = +proc handleLine(handle: InputHandle; line: string; headers: Headers) = let k = line.until(':') if k.len == line.len: # invalid @@ -124,8 +124,9 @@ proc handleLine(handle: LoaderHandle; line: string; headers: Headers) = let v = line.substr(k.len + 1).strip() headers.add(k, v) -proc loadCGI*(handle: LoaderHandle; request: Request; cgiDir: seq[string]; - prevURL: URL; insecureSSLNoVerify: bool; ostream: var PosixStream) = +proc loadCGI*(handle: InputHandle; request: Request; cgiDir: seq[string]; + prevURL: URL; insecureSSLNoVerify: bool; handleMap: openArray[LoaderHandle]; + istream: PosixStream; ostream: var PosixStream) = if cgiDir.len == 0: handle.sendResult(ERROR_NO_CGI_DIR) return @@ -180,7 +181,7 @@ proc loadCGI*(handle: LoaderHandle; request: Request; cgiDir: seq[string]; return # Pipe the request body as stdin for POST. var pipefd_read: array[0..1, cint] # parent -> child - if request.body.t != rbtNone: + if request.body.t notin {rbtNone, rbtCache}: if pipe(pipefd_read) == -1: handle.sendResult(ERROR_FAIL_SETUP_CGI) return @@ -193,7 +194,11 @@ proc loadCGI*(handle: LoaderHandle; request: Request; cgiDir: seq[string]; elif pid == 0: discard close(pipefd[0]) # close read discard dup2(pipefd[1], 1) # dup stdout - if request.body.t != rbtNone: + discard close(pipefd[1]) + if istream != nil: # cached input (file) + discard dup2(istream.fd, 0) # dup stdin + istream.sclose() + elif request.body.t notin {rbtNone, rbtCache}: discard close(pipefd_read[1]) # close write if pipefd_read[0] != 0: discard dup2(pipefd_read[0], 0) # dup stdin @@ -207,6 +212,10 @@ proc loadCGI*(handle: LoaderHandle; request: Request; cgiDir: seq[string]; # expects SIGCHLD to be untouched. (e.g. git dies a horrible death with # SIGCHLD as SIG_IGN) signal(SIGCHLD, SIG_DFL) + # close the parent handles + for i in 0 ..< handleMap.len: + if handleMap[i] != nil: + discard close(cint(i)) discard execl(cstring(cmd), cstring(basename), nil) let code = int(ERROR_FAILED_TO_EXECUTE_CGI_SCRIPT) stdout.write("Cha-Control: ConnectionError " & $code & " " & @@ -214,26 +223,29 @@ proc loadCGI*(handle: LoaderHandle; request: Request; cgiDir: seq[string]; quit(1) else: discard close(pipefd[1]) # close write - if request.body.t != rbtNone: + var ps: PosixStream = nil + if request.body.t notin {rbtNone, rbtCache}: discard close(pipefd_read[0]) # close read - let ps = newPosixStream(pipefd_read[1]) - case request.body.t - of rbtString: - ps.write(request.body.s) - ps.sclose() - of rbtMultipart: - let boundary = request.body.multipart.boundary - for entry in request.body.multipart.entries: - ps.writeEntry(entry, boundary) - ps.writeEnd(boundary) - ps.sclose() - of rbtOutput: - ostream = ps - of rbtNone: discard + ps = newPosixStream(pipefd_read[1]) + case request.body.t + of rbtString: + ps.write(request.body.s) + ps.sclose() + of rbtMultipart: + let boundary = request.body.multipart.boundary + for entry in request.body.multipart.entries: + ps.writeEntry(entry, boundary) + ps.writeEnd(boundary) + ps.sclose() + of rbtOutput: + ostream = ps + of rbtCache: + istream.sclose() + of rbtNone: discard handle.parser = HeaderParser(headers: newHeaders()) - handle.istream = newPosixStream(pipefd[0]) + handle.stream = newPosixStream(pipefd[0]) -proc parseHeaders0(handle: LoaderHandle; buffer: LoaderBuffer): int = +proc parseHeaders0(handle: InputHandle; buffer: LoaderBuffer): int = let parser = handle.parser var s = parser.lineBuffer let L = if buffer == nil: 1 else: buffer.len @@ -280,12 +292,12 @@ proc parseHeaders0(handle: LoaderHandle; buffer: LoaderBuffer): int = parser.lineBuffer = s return L -proc parseHeaders*(handle: LoaderHandle; buffer: LoaderBuffer): int = +proc parseHeaders*(handle: InputHandle; buffer: LoaderBuffer): int = try: return handle.parseHeaders0(buffer) except ErrorBrokenPipe: handle.parser = nil return -1 -proc finishParse*(handle: LoaderHandle) = +proc finishParse*(handle: InputHandle) = discard handle.parseHeaders(nil) diff --git a/src/loader/loader.nim b/src/loader/loader.nim index c882c227..0816c6f8 100644 --- a/src/loader/loader.nim +++ b/src/loader/loader.nim @@ -60,8 +60,8 @@ type key*: ClientKey process*: int clientPid*: int - connecting*: Table[int, ConnectData] - ongoing*: Table[int, Response] + map*: seq[LoaderData] + mapFds*: int # number of fds in map unregistered*: seq[int] registerFun*: proc(fd: int) unregisterFun*: proc(fd: int) @@ -73,16 +73,21 @@ type ConnectDataState = enum cdsBeforeResult, cdsBeforeStatus, cdsBeforeHeaders - ConnectData = ref object + LoaderData = ref object of RootObj + stream*: SocketStream + + ConnectData* = ref object of LoaderData state: ConnectDataState status: uint16 res: int outputId: int redirectNum: int promise: Promise[JSResult[Response]] - stream*: SocketStream request: Request + OngoingData* = ref object of LoaderData + response*: Response + LoaderCommand = enum lcAddCacheFile lcAddClient @@ -119,8 +124,7 @@ type ssock: ServerSocket alive: bool config: LoaderConfig - handleMap: Table[int, LoaderHandle] - outputMap: Table[int, OutputHandle] + handleMap: seq[LoaderHandle] selector: Selector[int] # List of existing clients (buffer or pager) that may make requests. clientData: Table[int, ClientData] # pid -> data @@ -158,22 +162,32 @@ func canRewriteForCGICompat(ctx: LoaderContext; path: string): bool = return true return false -proc rejectHandle(handle: LoaderHandle; code: ConnectErrorCode; msg = "") = +proc rejectHandle(handle: InputHandle; code: ConnectErrorCode; msg = "") = handle.sendResult(code, msg) handle.close() +iterator inputHandles(ctx: LoaderContext): InputHandle {.inline.} = + for it in ctx.handleMap: + if it != nil and it of InputHandle: + yield InputHandle(it) + +iterator outputHandles(ctx: LoaderContext): OutputHandle {.inline.} = + for it in ctx.handleMap: + if it != nil and it of OutputHandle: + yield OutputHandle(it) + func findOutput(ctx: LoaderContext; id: int; client: ClientData): OutputHandle = assert id != -1 - for it in ctx.outputMap.values: + for it in ctx.outputHandles: if it.outputId == id: # verify that it's safe to access this handle. doAssert ctx.isPrivileged(client) or client.pid == it.ownerPid return it return nil -func findCachedHandle(ctx: LoaderContext; cacheId: int): LoaderHandle = +func findCachedHandle(ctx: LoaderContext; cacheId: int): InputHandle = assert cacheId != -1 - for it in ctx.handleMap.values: + for it in ctx.inputHandles: if it.cacheId == cacheId: return it return nil @@ -181,19 +195,19 @@ func findCachedHandle(ctx: LoaderContext; cacheId: int): LoaderHandle = type PushBufferResult = enum pbrDone, pbrUnregister -proc register(ctx: LoaderContext; handle: LoaderHandle) = +proc register(ctx: LoaderContext; handle: InputHandle) = assert not handle.registered - ctx.selector.registerHandle(int(handle.istream.fd), {Read}, 0) + ctx.selector.registerHandle(int(handle.stream.fd), {Read}, 0) handle.registered = true -proc unregister(ctx: LoaderContext; handle: LoaderHandle) = +proc unregister(ctx: LoaderContext; handle: InputHandle) = assert handle.registered - ctx.selector.unregister(int(handle.istream.fd)) + ctx.selector.unregister(int(handle.stream.fd)) handle.registered = false proc register(ctx: LoaderContext; output: OutputHandle) = assert not output.registered - ctx.selector.registerHandle(int(output.ostream.fd), {Write}, 0) + ctx.selector.registerHandle(int(output.stream.fd), {Write}, 0) output.registered = true const bsdPlatform = defined(macosx) or defined(freebsd) or defined(netbsd) or @@ -202,7 +216,7 @@ proc unregister(ctx: LoaderContext; output: OutputHandle) = assert output.registered # so kqueue-based selectors raise when we try to unregister a pipe whose # reader is at EOF. "solution": clean up this mess ourselves. - let fd = int(output.ostream.fd) + let fd = int(output.stream.fd) when bsdPlatform: let oc = ctx.selector.count try: @@ -240,7 +254,7 @@ proc pushBuffer(ctx: LoaderContext; output: OutputHandle; buffer: LoaderBuffer; elif output.currentBuffer == nil: var n = si try: - n += output.ostream.sendData(buffer, si) + n += output.stream.sendData(buffer, si) except ErrorAgain: discard except ErrorBrokenPipe: @@ -282,7 +296,7 @@ proc redirectToFile(ctx: LoaderContext; output: OutputHandle; elif output.parent != nil: output.parent.outputs.add(OutputHandle( parent: output.parent, - ostream: ps, + stream: ps, istreamAtEnd: output.istreamAtEnd, outputId: ctx.getOutputId() )) @@ -302,28 +316,38 @@ proc addCacheFile(ctx: LoaderContext; client: ClientData; output: OutputHandle): return cacheId return -1 -proc addFd(ctx: LoaderContext; handle: LoaderHandle) = +proc put(ctx: LoaderContext; handle: LoaderHandle) = + let fd = int(handle.stream.fd) + if ctx.handleMap.len <= fd: + ctx.handleMap.setLen(fd + 1) + assert ctx.handleMap[fd] == nil + ctx.handleMap[fd] = handle + +proc unset(ctx: LoaderContext; handle: LoaderHandle) = + let fd = int(handle.stream.fd) + if fd < ctx.handleMap.len: + ctx.handleMap[fd] = nil + +proc addFd(ctx: LoaderContext; handle: InputHandle) = let output = handle.output - output.ostream.setBlocking(false) - handle.istream.setBlocking(false) + output.stream.setBlocking(false) + handle.stream.setBlocking(false) ctx.register(handle) - assert handle.istream.fd notin ctx.handleMap - assert output.ostream.fd notin ctx.outputMap - ctx.handleMap[handle.istream.fd] = handle - ctx.outputMap[output.ostream.fd] = output + ctx.put(handle) + ctx.put(output) type HandleReadResult = enum hrrDone, hrrUnregister, hrrBrokenPipe # Called whenever there is more data available to read. -proc handleRead(ctx: LoaderContext; handle: LoaderHandle; +proc handleRead(ctx: LoaderContext; handle: InputHandle; unregWrite: var seq[OutputHandle]): HandleReadResult = var unregs = 0 let maxUnregs = handle.outputs.len while true: let buffer = newLoaderBuffer() try: - let n = handle.istream.recvData(buffer) + let n = handle.stream.recvData(buffer) if n == 0: # EOF return hrrUnregister var si = 0 @@ -356,9 +380,9 @@ proc handleRead(ctx: LoaderContext; handle: LoaderHandle; # stream is a regular file, so we can't select on it. # cachedHandle is used for attaching the output handle to a different -# LoaderHandle when loadFromCache is called while a download is still ongoing +# InputHandle when loadFromCache is called while a download is still ongoing # (and thus some parts of the document are not cached yet). -proc loadStreamRegular(ctx: LoaderContext; handle, cachedHandle: LoaderHandle) = +proc loadStreamRegular(ctx: LoaderContext; handle, cachedHandle: InputHandle) = assert handle.parser == nil # parser is only used with CGI var unregWrite: seq[OutputHandle] = @[] let r = ctx.handleRead(handle, unregWrite) @@ -374,18 +398,19 @@ proc loadStreamRegular(ctx: LoaderContext; handle, cachedHandle: LoaderHandle) = elif cachedHandle != nil: output.parent = cachedHandle cachedHandle.outputs.add(output) - ctx.outputMap[output.ostream.fd] = output + ctx.put(output) elif output.registered or output.suspended: output.parent = nil output.istreamAtEnd = true - ctx.outputMap[output.ostream.fd] = output + ctx.put(output) else: - assert output.ostream.fd notin ctx.outputMap + assert output.stream.fd < ctx.handleMap.len or + ctx.handleMap[output.stream.fd] == nil output.oclose() handle.outputs.setLen(0) handle.iclose() -proc loadStream(ctx: LoaderContext; client: ClientData; handle: LoaderHandle; +proc loadStream(ctx: LoaderContext; client: ClientData; handle: InputHandle; request: Request) = client.passedFdMap.withValue(request.url.pathname, fdp): handle.sendResult(0) @@ -394,12 +419,12 @@ proc loadStream(ctx: LoaderContext; client: ClientData; handle: LoaderHandle; let ps = newPosixStream(fdp[]) var stats: Stat doAssert fstat(fdp[], stats) != -1 - handle.istream = ps + handle.stream = ps client.passedFdMap.del(request.url.pathname) if S_ISCHR(stats.st_mode) or S_ISREG(stats.st_mode): # regular file: e.g. cha <file # or character device: e.g. cha </dev/null - handle.output.ostream.setBlocking(false) + handle.output.stream.setBlocking(false) # not loading from cache, so cachedHandle is nil ctx.loadStreamRegular(handle, nil) do: @@ -411,19 +436,24 @@ func find(cacheMap: seq[CachedItem]; id: int): int = return i -1 -proc loadFromCache(ctx: LoaderContext; client: ClientData; handle: LoaderHandle; +proc openCachedItem(client: ClientData; id: int): (PosixStream, int) = + let n = client.cacheMap.find(id) + if n != -1: + return (newPosixStream(client.cacheMap[n].path, O_RDONLY, 0), n) + return (nil, -1) + +proc loadFromCache(ctx: LoaderContext; client: ClientData; handle: InputHandle; request: Request) = let id = parseInt32(request.url.pathname).get(-1) let startFrom = if request.url.query.isSome: parseInt32(request.url.query.get).get(0) else: 0 - let n = client.cacheMap.find(id) - if n != -1: - let ps = newPosixStream(client.cacheMap[n].path, O_RDONLY, 0) + let (ps, n) = client.openCachedItem(id) + if ps != nil: if startFrom != 0: ps.seek(startFrom) - handle.istream = ps + handle.stream = ps if ps == nil: handle.rejectHandle(ERROR_FILE_NOT_IN_CACHE) client.cacheMap.del(n) @@ -431,7 +461,7 @@ proc loadFromCache(ctx: LoaderContext; client: ClientData; handle: LoaderHandle; handle.sendResult(0) handle.sendStatus(200) handle.sendHeaders(newHeaders()) - handle.output.ostream.setBlocking(false) + handle.output.stream.setBlocking(false) let cachedHandle = ctx.findCachedHandle(id) ctx.loadStreamRegular(handle, cachedHandle) else: @@ -440,7 +470,7 @@ proc loadFromCache(ctx: LoaderContext; client: ClientData; handle: LoaderHandle; # Data URL handler. # Moved back into loader from CGI, because data URLs can get extremely long # and thus no longer fit into the environment. -proc loadDataSend(ctx: LoaderContext; handle: LoaderHandle; s, ct: string) = +proc loadDataSend(ctx: LoaderContext; handle: InputHandle; s, ct: string) = handle.sendResult(0) handle.sendStatus(200) handle.sendHeaders(newHeaders({"Content-Type": ct})) @@ -448,7 +478,7 @@ proc loadDataSend(ctx: LoaderContext; handle: LoaderHandle; s, ct: string) = if s.len == 0: if output.suspended: output.istreamAtEnd = true - ctx.outputMap[output.ostream.fd] = output + ctx.put(output) else: output.oclose() return @@ -463,11 +493,11 @@ proc loadDataSend(ctx: LoaderContext; handle: LoaderHandle; s, ct: string) = of pbrDone: if output.registered or output.suspended: output.istreamAtEnd = true - ctx.outputMap[output.ostream.fd] = output + ctx.put(output) else: output.oclose() -proc loadData(ctx: LoaderContext; handle: LoaderHandle; request: Request) = +proc loadData(ctx: LoaderContext; handle: InputHandle; request: Request) = let url = request.url var ct = url.path.s.until(',') if AllChars - Ascii + Controls - {'\t', ' '} in ct: @@ -488,7 +518,7 @@ proc loadData(ctx: LoaderContext; handle: LoaderHandle; request: Request) = ctx.loadDataSend(handle, body, ct) proc loadResource(ctx: LoaderContext; client: ClientData; - config: LoaderClientConfig; request: Request; handle: LoaderHandle) = + config: LoaderClientConfig; request: Request; handle: InputHandle) = var redo = true var tries = 0 var prevurl: URL = nil @@ -504,16 +534,20 @@ proc loadResource(ctx: LoaderContext; client: ClientData; redo = true continue if request.url.scheme == "cgi-bin": - var ostream: PosixStream = nil + var istream: PosixStream = nil # for rbtCache + var ostream: PosixStream = nil # for rbtOutput + if request.body.t == rbtCache: + var n: int + (istream, n) = client.openCachedItem(request.body.cacheId) handle.loadCGI(request, ctx.config.cgiDir, prevurl, - config.insecureSSLNoVerify, ostream) - if handle.istream != nil: + config.insecureSSLNoVerify, ctx.handleMap, istream, ostream) + if handle.stream != nil: if ostream != nil: let outputIn = ctx.findOutput(request.body.outputId, client) if outputIn != nil: ostream.setBlocking(false) let output = outputIn.tee(ostream, ctx.getOutputId(), client.pid) - ctx.outputMap[ostream.fd] = output + ctx.put(output) output.suspended = false if not output.isEmpty: ctx.register(output) @@ -525,13 +559,13 @@ proc loadResource(ctx: LoaderContext; client: ClientData; handle.close() elif request.url.scheme == "stream": ctx.loadStream(client, handle, request) - if handle.istream != nil: + if handle.stream != nil: ctx.addFd(handle) else: handle.close() elif request.url.scheme == "cache": ctx.loadFromCache(client, handle, request) - assert handle.istream == nil + assert handle.stream == nil handle.close() elif request.url.scheme == "data": ctx.loadData(handle, request) @@ -564,7 +598,7 @@ proc setupRequestDefaults(request: Request; config: LoaderClientConfig) = proc load(ctx: LoaderContext; stream: SocketStream; request: Request; client: ClientData; config: LoaderClientConfig) = - let handle = newLoaderHandle(stream, ctx.getOutputId(), client.pid) + let handle = newInputHandle(stream, ctx.getOutputId(), client.pid) when defined(debug): handle.url = request.url handle.output.url = request.url @@ -720,7 +754,7 @@ proc tee(ctx: LoaderContext; stream: SocketStream; client: ClientData; if outputIn != nil: let id = ctx.getOutputId() let output = outputIn.tee(stream, id, targetPid) - ctx.outputMap[output.ostream.fd] = output + ctx.put(output) stream.withPacketWriter w: w.swrite(id) stream.setBlocking(false) @@ -821,7 +855,7 @@ proc acceptConnection(ctx: LoaderContext) = ctx.resume(stream, client, r) except ErrorBrokenPipe: # receiving end died while reading the file; give up. - assert stream.fd notin ctx.outputMap + assert stream.fd >= ctx.handleMap.len or ctx.handleMap[stream.fd] == nil stream.sclose() proc exitLoader(ctx: LoaderContext) = @@ -843,6 +877,9 @@ proc initLoaderContext(fd: cint; config: LoaderConfig): LoaderContext = ctx.ssock = initServerSocket(config.sockdir, -1, myPid, blocking = true) let sfd = int(ctx.ssock.sock.getFd()) ctx.selector.registerHandle(sfd, {Read}, 0) + if ctx.handleMap.len <= sfd: + ctx.handleMap.setLen(sfd + 1) + ctx.handleMap[sfd] = LoaderHandle() # pseudo handle # The server has been initialized, so the main process can resume execution. let ps = newPosixStream(fd) ps.write(char(0u8)) @@ -897,7 +934,7 @@ proc handleWrite(ctx: LoaderContext; output: OutputHandle; while output.currentBuffer != nil: let buffer = output.currentBuffer try: - let n = output.ostream.sendData(buffer, output.currentBufferIdx) + let n = output.stream.sendData(buffer, output.currentBufferIdx) output.currentBufferIdx += n if output.currentBufferIdx < buffer.len: break @@ -915,16 +952,16 @@ proc handleWrite(ctx: LoaderContext; output: OutputHandle; # all buffers sent, no need to select on this output again for now ctx.unregister(output) -proc finishCycle(ctx: LoaderContext; unregRead: var seq[LoaderHandle]; +proc finishCycle(ctx: LoaderContext; unregRead: var seq[InputHandle]; unregWrite: var seq[OutputHandle]) = # Unregister handles queued for unregistration. # It is possible for both unregRead and unregWrite to contain duplicates. To # avoid double-close/double-unregister, we set the istream/ostream of # unregistered handles to nil. for handle in unregRead: - if handle.istream != nil: + if handle.stream != nil: ctx.unregister(handle) - ctx.handleMap.del(handle.istream.fd) + ctx.unset(handle) if handle.parser != nil: handle.finishParse() handle.iclose() @@ -933,19 +970,19 @@ proc finishCycle(ctx: LoaderContext; unregRead: var seq[LoaderHandle]; if output.isEmpty: unregWrite.add(output) for output in unregWrite: - if output.ostream != nil: + if output.stream != nil: if output.registered: ctx.unregister(output) - ctx.outputMap.del(output.ostream.fd) + ctx.unset(output) output.oclose() let handle = output.parent if handle != nil: # may be nil if from loadStream S_ISREG let i = handle.outputs.find(output) handle.outputs.del(i) - if handle.outputs.len == 0 and handle.istream != nil: + if handle.outputs.len == 0 and handle.stream != nil: # premature end of all output streams; kill istream too ctx.unregister(handle) - ctx.handleMap.del(handle.istream.fd) + ctx.unset(handle) if handle.parser != nil: handle.finishParse() handle.iclose() @@ -956,26 +993,26 @@ proc runFileLoader*(fd: cint; config: LoaderConfig) = var keys: array[64, ReadyKey] while ctx.alive: let count = ctx.selector.selectInto(-1, keys) - var unregRead: seq[LoaderHandle] = @[] + var unregRead: seq[InputHandle] = @[] var unregWrite: seq[OutputHandle] = @[] for event in keys.toOpenArray(0, count - 1): + let handle = ctx.handleMap[event.fd] if Read in event.events: if event.fd == fd: # incoming connection ctx.acceptConnection() else: - let handle = ctx.handleMap[event.fd] + let handle = InputHandle(ctx.handleMap[event.fd]) case ctx.handleRead(handle, unregWrite) of hrrDone: discard of hrrUnregister, hrrBrokenPipe: unregRead.add(handle) if Write in event.events: - ctx.handleWrite(ctx.outputMap[event.fd], unregWrite) + ctx.handleWrite(OutputHandle(handle), unregWrite) if Error in event.events: assert event.fd != fd - ctx.outputMap.withValue(event.fd, outputp): # ostream died - unregWrite.add(outputp[]) - do: # istream died - let handle = ctx.handleMap[event.fd] - unregRead.add(handle) + if handle of InputHandle: # istream died + unregRead.add(InputHandle(handle)) + else: # ostream died + unregWrite.add(OutputHandle(handle)) ctx.finishCycle(unregRead, unregWrite) ctx.exitLoader() @@ -1023,17 +1060,43 @@ proc startRequest*(loader: FileLoader; request: Request; w.swrite(config) return stream +iterator ongoing*(loader: FileLoader): OngoingData = + for it in loader.map: + if it != nil and it of OngoingData: + yield OngoingData(it) + +func fd*(data: LoaderData): int = + return int(data.stream.fd) + +proc put*(loader: FileLoader; data: LoaderData) = + let fd = int(data.stream.fd) + if loader.map.len <= fd: + loader.map.setLen(fd + 1) + assert loader.map[fd] == nil + loader.map[fd] = data + inc loader.mapFds + +proc get*(loader: FileLoader; fd: int): LoaderData = + if fd < loader.map.len: + return loader.map[fd] + return nil + +proc unset*(loader: FileLoader; data: LoaderData) = + let fd = int(data.stream.fd) + if loader.get(fd) != nil: + dec loader.mapFds + loader.map[fd] = nil + proc fetch0(loader: FileLoader; input: Request; promise: FetchPromise; redirectNum: int) = let stream = loader.startRequest(input) - let fd = int(stream.fd) - loader.registerFun(fd) - loader.connecting[fd] = ConnectData( + loader.registerFun(int(stream.fd)) + loader.put(ConnectData( promise: promise, request: input, stream: stream, redirectNum: redirectNum - ) + )) proc fetch*(loader: FileLoader; input: Request): FetchPromise = let promise = FetchPromise() @@ -1043,13 +1106,13 @@ proc fetch*(loader: FileLoader; input: Request): FetchPromise = proc reconnect*(loader: FileLoader; data: ConnectData) = data.stream.sclose() let stream = loader.startRequest(data.request) - let fd = int(stream.fd) - loader.registerFun(fd) - loader.connecting[fd] = ConnectData( + let data = ConnectData( promise: data.promise, request: data.request, stream: stream ) + loader.put(data) + loader.registerFun(data.fd) proc suspend*(loader: FileLoader; fds: seq[int]) = let stream = loader.connect() @@ -1121,8 +1184,7 @@ proc redirectToFile*(loader: FileLoader; outputId: int; targetPath: string): stream.sclose() return res -proc onConnected*(loader: FileLoader; fd: int) = - let connectData = loader.connecting[fd] +proc onConnected(loader: FileLoader; connectData: ConnectData) = let stream = connectData.stream let promise = connectData.promise let request = connectData.request @@ -1139,11 +1201,12 @@ proc onConnected*(loader: FileLoader; fd: int) = # msg is discarded. #TODO maybe print if called from trusted code (i.e. global == client)? r.sread(msg) # packet 1 + let fd = connectData.fd loader.unregisterFun(fd) loader.unregistered.add(fd) stream.sclose() # delete before resolving the promise - loader.connecting.del(fd) + loader.unset(connectData) let err = newTypeError("NetworkError when attempting to fetch resource") promise.resolve(JSResult[Response].err(err)) of cdsBeforeStatus: @@ -1155,18 +1218,20 @@ proc onConnected*(loader: FileLoader; fd: int) = r.sread(response.headers) # packet 3 # Only a stream of the response body may arrive after this point. response.body = stream + # delete before resolving the promise + loader.unset(connectData) + let data = OngoingData(response: response, stream: stream) + loader.put(data) assert loader.unregisterFun != nil response.unregisterFun = proc() = - loader.ongoing.del(response.body.fd) - loader.unregistered.add(response.body.fd) - loader.unregisterFun(response.body.fd) + loader.unset(data) + let fd = data.fd + loader.unregistered.add(fd) + loader.unregisterFun(fd) response.resumeFun = proc(outputId: int) = loader.resume(outputId) - loader.ongoing[fd] = response stream.setBlocking(false) let redirect = response.getRedirect(request) - # delete before resolving the promise - loader.connecting.del(fd) if redirect != nil: response.unregisterFun() stream.sclose() @@ -1179,24 +1244,38 @@ proc onConnected*(loader: FileLoader; fd: int) = else: promise.resolve(JSResult[Response].ok(response)) -proc onRead*(loader: FileLoader; fd: int) = - let response = loader.ongoing.getOrDefault(fd) - if response != nil: - response.onRead(response) - if response.body.isend: - if response.onFinish != nil: - response.onFinish(response, true) - response.onFinish = nil - response.close() - -proc onError*(loader: FileLoader; fd: int) = - let response = loader.ongoing.getOrDefault(fd) - if response != nil: +proc onRead*(loader: FileLoader; data: OngoingData) = + let response = data.response + response.onRead(response) + if response.body.isend: if response.onFinish != nil: - response.onFinish(response, false) + response.onFinish(response, true) response.onFinish = nil response.close() +proc onRead*(loader: FileLoader; fd: int) = + let data = loader.map[fd] + if data of ConnectData: + loader.onConnected(ConnectData(data)) + else: + loader.onRead(OngoingData(data)) + +proc onError*(loader: FileLoader; data: OngoingData) = + let response = data.response + if response.onFinish != nil: + response.onFinish(response, false) + response.onFinish = nil + response.close() + +proc onError*(loader: FileLoader; fd: int): bool = + let data = loader.map[fd] + if data of ConnectData: + # probably shouldn't happen. TODO + return false + else: + loader.onError(OngoingData(data)) + return true + # Note: this blocks until headers are received. proc doRequest*(loader: FileLoader; request: Request): Response = let stream = loader.startRequest(request) diff --git a/src/loader/loaderhandle.nim b/src/loader/loaderhandle.nim index 156bd22c..acca6e34 100644 --- a/src/loader/loaderhandle.nim +++ b/src/loader/loaderhandle.nim @@ -18,20 +18,28 @@ type LoaderBuffer* = ref LoaderBufferObj - OutputHandle* = ref object - parent*: LoaderHandle + LoaderHandle* = ref object of RootObj + registered*: bool # track registered state + stream*: PosixStream # input/output stream depending on type + when defined(debug): + url*: URL + + InputHandle* = ref object of LoaderHandle + outputs*: seq[OutputHandle] # list of outputs to be streamed into + cacheId*: int # if cached, our ID in a client cacheMap + parser*: HeaderParser # only exists for CGI handles + rstate: ResponseState # track response state + + OutputHandle* = ref object of LoaderHandle + parent*: InputHandle currentBuffer*: LoaderBuffer currentBufferIdx*: int buffers*: Deque[LoaderBuffer] - ostream*: PosixStream istreamAtEnd*: bool ownerPid*: int outputId*: int - registered*: bool suspended*: bool dead*: bool - when defined(debug): - url*: URL HandleParserState* = enum hpsBeforeLines, hpsAfterFirstLine, hpsControlDone @@ -47,16 +55,6 @@ type rsBeforeResult, rsAfterFailure, rsBeforeStatus, rsBeforeHeaders, rsAfterHeaders - LoaderHandle* = ref object - istream*: PosixStream # stream for taking input - outputs*: seq[OutputHandle] # list of outputs to be streamed into - cacheId*: int # if cached, our ID in a client cacheMap - parser*: HeaderParser # only exists for CGI handles - rstate: ResponseState # track response state - registered*: bool # track registered state - when defined(debug): - url*: URL - proc `=destroy`(buffer: var LoaderBufferObj) = if buffer.page != nil: dealloc(buffer.page) @@ -70,10 +68,10 @@ when defined(debug): return s # Create a new loader handle, with the output stream ostream. -proc newLoaderHandle*(ostream: PosixStream; outputId, pid: int): LoaderHandle = - let handle = LoaderHandle(cacheId: -1) +proc newInputHandle*(ostream: PosixStream; outputId, pid: int): InputHandle = + let handle = InputHandle(cacheId: -1) handle.outputs.add(OutputHandle( - ostream: ostream, + stream: ostream, parent: handle, outputId: outputId, ownerPid: pid, @@ -81,9 +79,9 @@ proc newLoaderHandle*(ostream: PosixStream; outputId, pid: int): LoaderHandle = )) return handle -proc findOutputHandle*(handle: LoaderHandle; fd: int): OutputHandle = +proc findOutputHandle*(handle: InputHandle; fd: int): OutputHandle = for output in handle.outputs: - if output.ostream.fd == fd: + if output.stream.fd == fd: return output return nil @@ -112,7 +110,7 @@ proc tee*(outputIn: OutputHandle; ostream: PosixStream; outputId, pid: int): assert outputIn.suspended let output = OutputHandle( parent: outputIn.parent, - ostream: ostream, + stream: ostream, currentBuffer: outputIn.currentBuffer, currentBufferIdx: outputIn.currentBufferIdx, buffers: outputIn.buffers, @@ -128,16 +126,16 @@ proc tee*(outputIn: OutputHandle; ostream: PosixStream; outputId, pid: int): outputIn.parent.outputs.add(output) return output -template output*(handle: LoaderHandle): OutputHandle = +template output*(handle: InputHandle): OutputHandle = handle.outputs[0] -proc sendResult*(handle: LoaderHandle; res: int; msg = "") = +proc sendResult*(handle: InputHandle; res: int; msg = "") = assert handle.rstate == rsBeforeResult inc handle.rstate let output = handle.output - let blocking = output.ostream.blocking - output.ostream.setBlocking(true) - output.ostream.withPacketWriter w: + let blocking = output.stream.blocking + output.stream.setBlocking(true) + output.stream.withPacketWriter w: w.swrite(res) if res == 0: # success assert msg == "" @@ -145,25 +143,25 @@ proc sendResult*(handle: LoaderHandle; res: int; msg = "") = inc handle.rstate else: # error w.swrite(msg) - output.ostream.setBlocking(blocking) + output.stream.setBlocking(blocking) -proc sendStatus*(handle: LoaderHandle; status: uint16) = +proc sendStatus*(handle: InputHandle; status: uint16) = assert handle.rstate == rsBeforeStatus inc handle.rstate - let blocking = handle.output.ostream.blocking - handle.output.ostream.setBlocking(true) - handle.output.ostream.withPacketWriter w: + let blocking = handle.output.stream.blocking + handle.output.stream.setBlocking(true) + handle.output.stream.withPacketWriter w: w.swrite(status) - handle.output.ostream.setBlocking(blocking) + handle.output.stream.setBlocking(blocking) -proc sendHeaders*(handle: LoaderHandle; headers: Headers) = +proc sendHeaders*(handle: InputHandle; headers: Headers) = assert handle.rstate == rsBeforeHeaders inc handle.rstate - let blocking = handle.output.ostream.blocking - handle.output.ostream.setBlocking(true) - handle.output.ostream.withPacketWriter w: + let blocking = handle.output.stream.blocking + handle.output.stream.setBlocking(true) + handle.output.stream.withPacketWriter w: w.swrite(headers) - handle.output.ostream.setBlocking(blocking) + handle.output.stream.setBlocking(blocking) proc recvData*(ps: PosixStream; buffer: LoaderBuffer): int {.inline.} = let n = ps.recvData(addr buffer.page[0], buffer.cap) @@ -174,8 +172,8 @@ proc sendData*(ps: PosixStream; buffer: LoaderBuffer; si = 0): int {.inline.} = assert buffer.len - si > 0 return ps.sendData(addr buffer.page[si], buffer.len - si) -proc iclose*(handle: LoaderHandle) = - if handle.istream != nil: +proc iclose*(handle: InputHandle) = + if handle.stream != nil: assert not handle.registered if handle.rstate notin {rsBeforeResult, rsAfterFailure, rsAfterHeaders}: assert handle.outputs.len == 1 @@ -186,21 +184,21 @@ proc iclose*(handle: LoaderHandle) = handle.sendStatus(500) if handle.rstate == rsBeforeHeaders: handle.sendHeaders(newHeaders()) - handle.output.ostream.setBlocking(true) + handle.output.stream.setBlocking(true) const msg = "Error: malformed header in CGI script" - discard handle.output.ostream.sendData(msg) + discard handle.output.stream.sendData(msg) except ErrorBrokenPipe: discard # receiver is dead - handle.istream.sclose() - handle.istream = nil + handle.stream.sclose() + handle.stream = nil proc oclose*(output: OutputHandle) = assert not output.registered - output.ostream.sclose() - output.ostream = nil + output.stream.sclose() + output.stream = nil -proc close*(handle: LoaderHandle) = +proc close*(handle: InputHandle) = handle.iclose() for output in handle.outputs: - if output.ostream != nil: + if output.stream != nil: output.oclose() diff --git a/src/loader/request.nim b/src/loader/request.nim index c210898d..bd102093 100644 --- a/src/loader/request.nim +++ b/src/loader/request.nim @@ -60,7 +60,7 @@ type window*: EnvironmentSettings RequestBodyType* = enum - rbtNone, rbtString, rbtMultipart, rbtOutput + rbtNone, rbtString, rbtMultipart, rbtOutput, rbtCache RequestBody* = object case t*: RequestBodyType @@ -72,6 +72,8 @@ type multipart*: FormData of rbtOutput: outputId*: int + of rbtCache: + cacheId*: int Request* = ref object httpMethod*: HttpMethod @@ -99,6 +101,7 @@ proc swrite*(writer: var BufferedWriter; o: RequestBody) = of rbtString: writer.swrite(o.s) of rbtMultipart: writer.swrite(o.multipart) of rbtOutput: writer.swrite(o.outputId) + of rbtCache: writer.swrite(o.cacheId) proc sread*(reader: var BufferedReader; o: var RequestBody) = var t: RequestBodyType @@ -109,6 +112,7 @@ proc sread*(reader: var BufferedReader; o: var RequestBody) = of rbtString: reader.sread(o.s) of rbtMultipart: reader.sread(o.multipart) of rbtOutput: reader.sread(o.outputId) + of rbtCache: reader.sread(o.cacheId) proc contentLength*(body: RequestBody): int = case body.t @@ -116,6 +120,7 @@ proc contentLength*(body: RequestBody): int = of rbtString: return body.s.len of rbtMultipart: return body.multipart.calcLength() of rbtOutput: return 0 + of rbtCache: return 0 func headers(this: JSRequest): Headers {.jsfget.} = return this.request.headers diff --git a/src/local/client.nim b/src/local/client.nim index 02f497ff..e96b5163 100644 --- a/src/local/client.nim +++ b/src/local/client.nim @@ -483,11 +483,10 @@ proc handleRead(client: Client; fd: int) = if not hadlf: client.console.err.write('\n') client.console.err.sflush() - elif fd in client.loader.connecting: - client.loader.onConnected(fd) - client.runJSJobs() - elif fd in client.loader.ongoing: + elif (let data = client.loader.get(fd); data != nil): client.loader.onRead(fd) + if data of ConnectData: + client.runJSJobs() elif fd in client.loader.unregistered: discard # ignore else: @@ -517,11 +516,8 @@ proc handleError(client: Client; fd: int) = #TODO do something here... stderr.write("Fork server crashed :(\n") client.quit(1) - elif fd in client.loader.connecting: - #TODO handle error? - discard - elif fd in client.loader.ongoing: - client.loader.onError(fd) + elif client.loader.map[fd] != nil: + discard client.loader.onError(fd) #TODO handle connection error? elif fd in client.loader.unregistered: discard # already unregistered... elif (let i = client.pager.findConnectingContainer(fd); i != -1): @@ -594,8 +590,7 @@ proc inputLoop(client: Client) = func hasSelectFds(client: Client): bool = return not client.timeouts.empty or client.pager.numload > 0 or - client.loader.connecting.len > 0 or - client.loader.ongoing.len > 0 or + client.loader.mapFds > 0 or client.pager.procmap.len > 0 proc headlessLoop(client: Client) = diff --git a/src/local/container.nim b/src/local/container.nim index 4a31d054..54d9bedd 100644 --- a/src/local/container.nim +++ b/src/local/container.nim @@ -505,6 +505,9 @@ proc newContainer*(config: BufferConfig; loaderConfig: LoaderClientConfig; func location(container: Container): URL {.jsfget.} = return container.url +proc c_rename(oldname, newname: cstring): cint {.importc: "rename", + header: "<stdio.h>".} + proc clone*(container: Container; newurl: URL; loader: FileLoader): Promise[Container] = if container.iface == nil: @@ -527,7 +530,8 @@ proc clone*(container: Container; newurl: URL; loader: FileLoader): return nil let newPath = getSocketPath(loader.sockDir, pid) let oldPath = getSocketPath(loader.sockDir, loader.clientPid) - moveFile(oldPath, newPath) + if c_rename(cstring(oldPath), cstring(newPath)) == -1: + return nil let nc = Container() nc[] = container[] nc.url = url diff --git a/src/local/pager.nim b/src/local/pager.nim index bba25c23..c0034a7f 100644 --- a/src/local/pager.nim +++ b/src/local/pager.nim @@ -484,7 +484,6 @@ proc redraw(pager: Pager) {.jsfunc.} = proc loadCachedImage(pager: Pager; container: Container; image: PosBitmap; offx, erry, dispw: int) = let bmp = image.bmp - let request = newRequest(newURL("cache:" & $bmp.cacheId).get) let cachedImage = CachedImage( bmp: bmp, width: image.width, @@ -496,27 +495,16 @@ proc loadCachedImage(pager: Pager; container: Container; image: PosBitmap; pager.loader.shareCachedItem(bmp.cacheId, pager.loader.clientPid, container.process) let imageMode = pager.term.imageMode - pager.loader.fetch(request).then(proc(res: JSResult[Response]): - Promise[JSResult[Response]] = - if res.isNone: - pager.loader.removeCachedItem(bmp.cacheId) - return - let response = res.get - let headers = newHeaders() - if image.width != bmp.width or image.height != bmp.height: - headers.add("Cha-Image-Target-Dimensions", $image.width & 'x' & - $image.height) - let request = newRequest( - newURL("img-codec+" & bmp.contentType.after('/') & ":decode").get, - httpMethod = hmPost, - headers = headers, - body = RequestBody(t: rbtOutput, outputId: response.outputId), - ) - let r = pager.loader.fetch(request) - response.resume() - response.close() - return r - ).then(proc(res: JSResult[Response]) = + let headers = newHeaders() + if image.width != bmp.width or image.height != bmp.height: + headers.add("Cha-Image-Target-Dimensions", $image.width & 'x' & + $image.height) + pager.loader.fetch(newRequest( + newURL("img-codec+" & bmp.contentType.after('/') & ":decode").get, + httpMethod = hmPost, + headers = headers, + body = RequestBody(t: rbtCache, cacheId: bmp.cacheId), + )).then(proc(res: JSResult[Response]) = if res.isNone: pager.loader.removeCachedItem(bmp.cacheId) return diff --git a/src/server/buffer.nim b/src/server/buffer.nim index 4324944e..ea17cbfc 100644 --- a/src/server/buffer.nim +++ b/src/server/buffer.nim @@ -929,15 +929,15 @@ proc clone*(buffer: Buffer; newurl: URL): int {.proxy.} = return -1 # suspend outputs before tee'ing var ids: seq[int] = @[] - for response in buffer.loader.ongoing.values: - if response.onRead != nil: - ids.add(response.outputId) + for it in buffer.loader.ongoing: + if it.response.onRead != nil: + ids.add(it.response.outputId) buffer.loader.suspend(ids) # ongoing transfers are now suspended; exhaust all data in the internal buffer # just to be safe. - for fd, response in buffer.loader.ongoing: - if response.onRead != nil: - buffer.loader.onRead(fd) + for it in buffer.loader.ongoing: + if it.response.onRead != nil: + buffer.loader.onRead(it.fd) let pid = fork() if pid == -1: buffer.estream.write("Failed to clone buffer.\n") @@ -973,22 +973,31 @@ proc clone*(buffer: Buffer; newurl: URL): int {.proxy.} = else: buffer.selector = newSelector[int]() #TODO set buffer.window.timeouts.selector - var ongoing: seq[Response] = @[] - for response in buffer.loader.ongoing.values: - ongoing.add(response) - response.body.sclose() - buffer.loader.ongoing.clear() + var connecting: seq[ConnectData] = @[] + var ongoing: seq[OngoingData] = @[] + for it in buffer.loader.map: + if it != nil: + if it of ConnectData: + connecting.add(ConnectData(it)) + else: + let it = OngoingData(it) + ongoing.add(it) + it.response.body.sclose() + buffer.loader.unregistered.add(it.fd) + buffer.loader.unset(it) let myPid = getCurrentProcessId() - for response in ongoing.mitems: + for it in ongoing.mitems: + let response = it.response # tee ongoing streams let (stream, outputId) = buffer.loader.tee(response.outputId, myPid) # if -1, well, this side hasn't exhausted the socket's buffer doAssert outputId != -1 and stream != nil response.outputId = outputId response.body = stream - let fd = int(response.body.fd) - buffer.loader.ongoing[fd] = response + let data = OngoingData(response: response, stream: stream) + let fd = data.fd buffer.selector.registerHandle(fd, {Read}, 0) + buffer.loader.put(data) if buffer.istream != nil: # We do not own our input stream, so we can't tee it. # Luckily it is cached, so what we *can* do is to load the same thing from @@ -1015,14 +1024,9 @@ proc clone*(buffer: Buffer; newurl: URL): int {.proxy.} = buffer.selector.registerHandle(buffer.rfd, {Read}, 0) # must reconnect after the new client is set up, or the client pids get # mixed up. - var cfds: seq[int] = @[] - for fd in buffer.loader.connecting.keys: - cfds.add(fd) - for fd in cfds: + for it in connecting: # connecting: just reconnect - let data = buffer.loader.connecting[fd] - buffer.loader.connecting.del(fd) - buffer.loader.reconnect(data) + buffer.loader.reconnect(it) return 0 else: # parent discard close(pipefd[1]) # close write @@ -1171,16 +1175,13 @@ proc forceRender*(buffer: Buffer) {.proxy.} = proc cancel*(buffer: Buffer) {.proxy.} = if buffer.state == bsLoaded: return - for fd, data in buffer.loader.connecting: - buffer.selector.unregister(fd) - buffer.loader.unregistered.add(fd) - data.stream.sclose() - buffer.loader.connecting.clear() - for fd, response in buffer.loader.ongoing: - buffer.selector.unregister(fd) - buffer.loader.unregistered.add(fd) - response.body.sclose() - buffer.loader.ongoing.clear() + for it in buffer.loader.map: + if it != nil: + let fd = it.fd + buffer.selector.unregister(fd) + buffer.loader.unregistered.add(fd) + it.stream.sclose() + buffer.loader.unset(it) if buffer.istream != nil: buffer.selector.unregister(buffer.fd) buffer.loader.unregistered.add(buffer.fd) @@ -1792,11 +1793,7 @@ proc handleRead(buffer: Buffer; fd: int): bool = return false elif fd == buffer.fd: buffer.onload() - elif fd in buffer.loader.connecting: - buffer.loader.onConnected(fd) - if buffer.config.scripting: - buffer.window.runJSJobs() - elif fd in buffer.loader.ongoing: + elif buffer.loader.get(fd) != nil: buffer.loader.onRead(fd) if buffer.config.scripting: buffer.window.runJSJobs() @@ -1812,11 +1809,10 @@ proc handleError(buffer: Buffer; fd: int; err: OSErrorCode): bool = return false elif fd == buffer.fd: buffer.onload() - elif fd in buffer.loader.connecting: - # probably shouldn't happen. TODO - assert false, $fd & ": " & $err - elif fd in buffer.loader.ongoing: - buffer.loader.onError(fd) + elif buffer.loader.get(fd) != nil: + if not buffer.loader.onError(fd): + #TODO handle connection error + assert false, $fd & ": " & $err if buffer.config.scripting: buffer.window.runJSJobs() elif fd in buffer.loader.unregistered: |