diff options
author | bptato <nincsnevem662@gmail.com> | 2024-09-14 15:58:52 +0200 |
---|---|---|
committer | bptato <nincsnevem662@gmail.com> | 2024-09-14 16:56:46 +0200 |
commit | 5b2a36579e53c69f154288a91ddc3e7c5375d7a6 (patch) | |
tree | 6b81b21aa15ef26031ce653ea972f75e00b1df0e /src/loader/loader.nim | |
parent | dfa1a4bc5ece3b8c333b9a47bab038ff6a162f5b (diff) | |
download | chawan-5b2a36579e53c69f154288a91ddc3e7c5375d7a6.tar.gz |
loader: refactor, misc optimizations & fixes
* factor out input/output handle tables; use a seq instead * add possibility to directly open cached items onto stdin (mainly an optimization for reading images, which are always cached) * close used handles on local CGI execution * make clone during load work again
Diffstat (limited to 'src/loader/loader.nim')
-rw-r--r-- | src/loader/loader.nim | 281 |
1 files changed, 180 insertions, 101 deletions
diff --git a/src/loader/loader.nim b/src/loader/loader.nim index c882c227..0816c6f8 100644 --- a/src/loader/loader.nim +++ b/src/loader/loader.nim @@ -60,8 +60,8 @@ type key*: ClientKey process*: int clientPid*: int - connecting*: Table[int, ConnectData] - ongoing*: Table[int, Response] + map*: seq[LoaderData] + mapFds*: int # number of fds in map unregistered*: seq[int] registerFun*: proc(fd: int) unregisterFun*: proc(fd: int) @@ -73,16 +73,21 @@ type ConnectDataState = enum cdsBeforeResult, cdsBeforeStatus, cdsBeforeHeaders - ConnectData = ref object + LoaderData = ref object of RootObj + stream*: SocketStream + + ConnectData* = ref object of LoaderData state: ConnectDataState status: uint16 res: int outputId: int redirectNum: int promise: Promise[JSResult[Response]] - stream*: SocketStream request: Request + OngoingData* = ref object of LoaderData + response*: Response + LoaderCommand = enum lcAddCacheFile lcAddClient @@ -119,8 +124,7 @@ type ssock: ServerSocket alive: bool config: LoaderConfig - handleMap: Table[int, LoaderHandle] - outputMap: Table[int, OutputHandle] + handleMap: seq[LoaderHandle] selector: Selector[int] # List of existing clients (buffer or pager) that may make requests. clientData: Table[int, ClientData] # pid -> data @@ -158,22 +162,32 @@ func canRewriteForCGICompat(ctx: LoaderContext; path: string): bool = return true return false -proc rejectHandle(handle: LoaderHandle; code: ConnectErrorCode; msg = "") = +proc rejectHandle(handle: InputHandle; code: ConnectErrorCode; msg = "") = handle.sendResult(code, msg) handle.close() +iterator inputHandles(ctx: LoaderContext): InputHandle {.inline.} = + for it in ctx.handleMap: + if it != nil and it of InputHandle: + yield InputHandle(it) + +iterator outputHandles(ctx: LoaderContext): OutputHandle {.inline.} = + for it in ctx.handleMap: + if it != nil and it of OutputHandle: + yield OutputHandle(it) + func findOutput(ctx: LoaderContext; id: int; client: ClientData): OutputHandle = assert id != -1 - for it in ctx.outputMap.values: + for it in ctx.outputHandles: if it.outputId == id: # verify that it's safe to access this handle. doAssert ctx.isPrivileged(client) or client.pid == it.ownerPid return it return nil -func findCachedHandle(ctx: LoaderContext; cacheId: int): LoaderHandle = +func findCachedHandle(ctx: LoaderContext; cacheId: int): InputHandle = assert cacheId != -1 - for it in ctx.handleMap.values: + for it in ctx.inputHandles: if it.cacheId == cacheId: return it return nil @@ -181,19 +195,19 @@ func findCachedHandle(ctx: LoaderContext; cacheId: int): LoaderHandle = type PushBufferResult = enum pbrDone, pbrUnregister -proc register(ctx: LoaderContext; handle: LoaderHandle) = +proc register(ctx: LoaderContext; handle: InputHandle) = assert not handle.registered - ctx.selector.registerHandle(int(handle.istream.fd), {Read}, 0) + ctx.selector.registerHandle(int(handle.stream.fd), {Read}, 0) handle.registered = true -proc unregister(ctx: LoaderContext; handle: LoaderHandle) = +proc unregister(ctx: LoaderContext; handle: InputHandle) = assert handle.registered - ctx.selector.unregister(int(handle.istream.fd)) + ctx.selector.unregister(int(handle.stream.fd)) handle.registered = false proc register(ctx: LoaderContext; output: OutputHandle) = assert not output.registered - ctx.selector.registerHandle(int(output.ostream.fd), {Write}, 0) + ctx.selector.registerHandle(int(output.stream.fd), {Write}, 0) output.registered = true const bsdPlatform = defined(macosx) or defined(freebsd) or defined(netbsd) or @@ -202,7 +216,7 @@ proc unregister(ctx: LoaderContext; output: OutputHandle) = assert output.registered # so kqueue-based selectors raise when we try to unregister a pipe whose # reader is at EOF. "solution": clean up this mess ourselves. - let fd = int(output.ostream.fd) + let fd = int(output.stream.fd) when bsdPlatform: let oc = ctx.selector.count try: @@ -240,7 +254,7 @@ proc pushBuffer(ctx: LoaderContext; output: OutputHandle; buffer: LoaderBuffer; elif output.currentBuffer == nil: var n = si try: - n += output.ostream.sendData(buffer, si) + n += output.stream.sendData(buffer, si) except ErrorAgain: discard except ErrorBrokenPipe: @@ -282,7 +296,7 @@ proc redirectToFile(ctx: LoaderContext; output: OutputHandle; elif output.parent != nil: output.parent.outputs.add(OutputHandle( parent: output.parent, - ostream: ps, + stream: ps, istreamAtEnd: output.istreamAtEnd, outputId: ctx.getOutputId() )) @@ -302,28 +316,38 @@ proc addCacheFile(ctx: LoaderContext; client: ClientData; output: OutputHandle): return cacheId return -1 -proc addFd(ctx: LoaderContext; handle: LoaderHandle) = +proc put(ctx: LoaderContext; handle: LoaderHandle) = + let fd = int(handle.stream.fd) + if ctx.handleMap.len <= fd: + ctx.handleMap.setLen(fd + 1) + assert ctx.handleMap[fd] == nil + ctx.handleMap[fd] = handle + +proc unset(ctx: LoaderContext; handle: LoaderHandle) = + let fd = int(handle.stream.fd) + if fd < ctx.handleMap.len: + ctx.handleMap[fd] = nil + +proc addFd(ctx: LoaderContext; handle: InputHandle) = let output = handle.output - output.ostream.setBlocking(false) - handle.istream.setBlocking(false) + output.stream.setBlocking(false) + handle.stream.setBlocking(false) ctx.register(handle) - assert handle.istream.fd notin ctx.handleMap - assert output.ostream.fd notin ctx.outputMap - ctx.handleMap[handle.istream.fd] = handle - ctx.outputMap[output.ostream.fd] = output + ctx.put(handle) + ctx.put(output) type HandleReadResult = enum hrrDone, hrrUnregister, hrrBrokenPipe # Called whenever there is more data available to read. -proc handleRead(ctx: LoaderContext; handle: LoaderHandle; +proc handleRead(ctx: LoaderContext; handle: InputHandle; unregWrite: var seq[OutputHandle]): HandleReadResult = var unregs = 0 let maxUnregs = handle.outputs.len while true: let buffer = newLoaderBuffer() try: - let n = handle.istream.recvData(buffer) + let n = handle.stream.recvData(buffer) if n == 0: # EOF return hrrUnregister var si = 0 @@ -356,9 +380,9 @@ proc handleRead(ctx: LoaderContext; handle: LoaderHandle; # stream is a regular file, so we can't select on it. # cachedHandle is used for attaching the output handle to a different -# LoaderHandle when loadFromCache is called while a download is still ongoing +# InputHandle when loadFromCache is called while a download is still ongoing # (and thus some parts of the document are not cached yet). -proc loadStreamRegular(ctx: LoaderContext; handle, cachedHandle: LoaderHandle) = +proc loadStreamRegular(ctx: LoaderContext; handle, cachedHandle: InputHandle) = assert handle.parser == nil # parser is only used with CGI var unregWrite: seq[OutputHandle] = @[] let r = ctx.handleRead(handle, unregWrite) @@ -374,18 +398,19 @@ proc loadStreamRegular(ctx: LoaderContext; handle, cachedHandle: LoaderHandle) = elif cachedHandle != nil: output.parent = cachedHandle cachedHandle.outputs.add(output) - ctx.outputMap[output.ostream.fd] = output + ctx.put(output) elif output.registered or output.suspended: output.parent = nil output.istreamAtEnd = true - ctx.outputMap[output.ostream.fd] = output + ctx.put(output) else: - assert output.ostream.fd notin ctx.outputMap + assert output.stream.fd < ctx.handleMap.len or + ctx.handleMap[output.stream.fd] == nil output.oclose() handle.outputs.setLen(0) handle.iclose() -proc loadStream(ctx: LoaderContext; client: ClientData; handle: LoaderHandle; +proc loadStream(ctx: LoaderContext; client: ClientData; handle: InputHandle; request: Request) = client.passedFdMap.withValue(request.url.pathname, fdp): handle.sendResult(0) @@ -394,12 +419,12 @@ proc loadStream(ctx: LoaderContext; client: ClientData; handle: LoaderHandle; let ps = newPosixStream(fdp[]) var stats: Stat doAssert fstat(fdp[], stats) != -1 - handle.istream = ps + handle.stream = ps client.passedFdMap.del(request.url.pathname) if S_ISCHR(stats.st_mode) or S_ISREG(stats.st_mode): # regular file: e.g. cha <file # or character device: e.g. cha </dev/null - handle.output.ostream.setBlocking(false) + handle.output.stream.setBlocking(false) # not loading from cache, so cachedHandle is nil ctx.loadStreamRegular(handle, nil) do: @@ -411,19 +436,24 @@ func find(cacheMap: seq[CachedItem]; id: int): int = return i -1 -proc loadFromCache(ctx: LoaderContext; client: ClientData; handle: LoaderHandle; +proc openCachedItem(client: ClientData; id: int): (PosixStream, int) = + let n = client.cacheMap.find(id) + if n != -1: + return (newPosixStream(client.cacheMap[n].path, O_RDONLY, 0), n) + return (nil, -1) + +proc loadFromCache(ctx: LoaderContext; client: ClientData; handle: InputHandle; request: Request) = let id = parseInt32(request.url.pathname).get(-1) let startFrom = if request.url.query.isSome: parseInt32(request.url.query.get).get(0) else: 0 - let n = client.cacheMap.find(id) - if n != -1: - let ps = newPosixStream(client.cacheMap[n].path, O_RDONLY, 0) + let (ps, n) = client.openCachedItem(id) + if ps != nil: if startFrom != 0: ps.seek(startFrom) - handle.istream = ps + handle.stream = ps if ps == nil: handle.rejectHandle(ERROR_FILE_NOT_IN_CACHE) client.cacheMap.del(n) @@ -431,7 +461,7 @@ proc loadFromCache(ctx: LoaderContext; client: ClientData; handle: LoaderHandle; handle.sendResult(0) handle.sendStatus(200) handle.sendHeaders(newHeaders()) - handle.output.ostream.setBlocking(false) + handle.output.stream.setBlocking(false) let cachedHandle = ctx.findCachedHandle(id) ctx.loadStreamRegular(handle, cachedHandle) else: @@ -440,7 +470,7 @@ proc loadFromCache(ctx: LoaderContext; client: ClientData; handle: LoaderHandle; # Data URL handler. # Moved back into loader from CGI, because data URLs can get extremely long # and thus no longer fit into the environment. -proc loadDataSend(ctx: LoaderContext; handle: LoaderHandle; s, ct: string) = +proc loadDataSend(ctx: LoaderContext; handle: InputHandle; s, ct: string) = handle.sendResult(0) handle.sendStatus(200) handle.sendHeaders(newHeaders({"Content-Type": ct})) @@ -448,7 +478,7 @@ proc loadDataSend(ctx: LoaderContext; handle: LoaderHandle; s, ct: string) = if s.len == 0: if output.suspended: output.istreamAtEnd = true - ctx.outputMap[output.ostream.fd] = output + ctx.put(output) else: output.oclose() return @@ -463,11 +493,11 @@ proc loadDataSend(ctx: LoaderContext; handle: LoaderHandle; s, ct: string) = of pbrDone: if output.registered or output.suspended: output.istreamAtEnd = true - ctx.outputMap[output.ostream.fd] = output + ctx.put(output) else: output.oclose() -proc loadData(ctx: LoaderContext; handle: LoaderHandle; request: Request) = +proc loadData(ctx: LoaderContext; handle: InputHandle; request: Request) = let url = request.url var ct = url.path.s.until(',') if AllChars - Ascii + Controls - {'\t', ' '} in ct: @@ -488,7 +518,7 @@ proc loadData(ctx: LoaderContext; handle: LoaderHandle; request: Request) = ctx.loadDataSend(handle, body, ct) proc loadResource(ctx: LoaderContext; client: ClientData; - config: LoaderClientConfig; request: Request; handle: LoaderHandle) = + config: LoaderClientConfig; request: Request; handle: InputHandle) = var redo = true var tries = 0 var prevurl: URL = nil @@ -504,16 +534,20 @@ proc loadResource(ctx: LoaderContext; client: ClientData; redo = true continue if request.url.scheme == "cgi-bin": - var ostream: PosixStream = nil + var istream: PosixStream = nil # for rbtCache + var ostream: PosixStream = nil # for rbtOutput + if request.body.t == rbtCache: + var n: int + (istream, n) = client.openCachedItem(request.body.cacheId) handle.loadCGI(request, ctx.config.cgiDir, prevurl, - config.insecureSSLNoVerify, ostream) - if handle.istream != nil: + config.insecureSSLNoVerify, ctx.handleMap, istream, ostream) + if handle.stream != nil: if ostream != nil: let outputIn = ctx.findOutput(request.body.outputId, client) if outputIn != nil: ostream.setBlocking(false) let output = outputIn.tee(ostream, ctx.getOutputId(), client.pid) - ctx.outputMap[ostream.fd] = output + ctx.put(output) output.suspended = false if not output.isEmpty: ctx.register(output) @@ -525,13 +559,13 @@ proc loadResource(ctx: LoaderContext; client: ClientData; handle.close() elif request.url.scheme == "stream": ctx.loadStream(client, handle, request) - if handle.istream != nil: + if handle.stream != nil: ctx.addFd(handle) else: handle.close() elif request.url.scheme == "cache": ctx.loadFromCache(client, handle, request) - assert handle.istream == nil + assert handle.stream == nil handle.close() elif request.url.scheme == "data": ctx.loadData(handle, request) @@ -564,7 +598,7 @@ proc setupRequestDefaults(request: Request; config: LoaderClientConfig) = proc load(ctx: LoaderContext; stream: SocketStream; request: Request; client: ClientData; config: LoaderClientConfig) = - let handle = newLoaderHandle(stream, ctx.getOutputId(), client.pid) + let handle = newInputHandle(stream, ctx.getOutputId(), client.pid) when defined(debug): handle.url = request.url handle.output.url = request.url @@ -720,7 +754,7 @@ proc tee(ctx: LoaderContext; stream: SocketStream; client: ClientData; if outputIn != nil: let id = ctx.getOutputId() let output = outputIn.tee(stream, id, targetPid) - ctx.outputMap[output.ostream.fd] = output + ctx.put(output) stream.withPacketWriter w: w.swrite(id) stream.setBlocking(false) @@ -821,7 +855,7 @@ proc acceptConnection(ctx: LoaderContext) = ctx.resume(stream, client, r) except ErrorBrokenPipe: # receiving end died while reading the file; give up. - assert stream.fd notin ctx.outputMap + assert stream.fd >= ctx.handleMap.len or ctx.handleMap[stream.fd] == nil stream.sclose() proc exitLoader(ctx: LoaderContext) = @@ -843,6 +877,9 @@ proc initLoaderContext(fd: cint; config: LoaderConfig): LoaderContext = ctx.ssock = initServerSocket(config.sockdir, -1, myPid, blocking = true) let sfd = int(ctx.ssock.sock.getFd()) ctx.selector.registerHandle(sfd, {Read}, 0) + if ctx.handleMap.len <= sfd: + ctx.handleMap.setLen(sfd + 1) + ctx.handleMap[sfd] = LoaderHandle() # pseudo handle # The server has been initialized, so the main process can resume execution. let ps = newPosixStream(fd) ps.write(char(0u8)) @@ -897,7 +934,7 @@ proc handleWrite(ctx: LoaderContext; output: OutputHandle; while output.currentBuffer != nil: let buffer = output.currentBuffer try: - let n = output.ostream.sendData(buffer, output.currentBufferIdx) + let n = output.stream.sendData(buffer, output.currentBufferIdx) output.currentBufferIdx += n if output.currentBufferIdx < buffer.len: break @@ -915,16 +952,16 @@ proc handleWrite(ctx: LoaderContext; output: OutputHandle; # all buffers sent, no need to select on this output again for now ctx.unregister(output) -proc finishCycle(ctx: LoaderContext; unregRead: var seq[LoaderHandle]; +proc finishCycle(ctx: LoaderContext; unregRead: var seq[InputHandle]; unregWrite: var seq[OutputHandle]) = # Unregister handles queued for unregistration. # It is possible for both unregRead and unregWrite to contain duplicates. To # avoid double-close/double-unregister, we set the istream/ostream of # unregistered handles to nil. for handle in unregRead: - if handle.istream != nil: + if handle.stream != nil: ctx.unregister(handle) - ctx.handleMap.del(handle.istream.fd) + ctx.unset(handle) if handle.parser != nil: handle.finishParse() handle.iclose() @@ -933,19 +970,19 @@ proc finishCycle(ctx: LoaderContext; unregRead: var seq[LoaderHandle]; if output.isEmpty: unregWrite.add(output) for output in unregWrite: - if output.ostream != nil: + if output.stream != nil: if output.registered: ctx.unregister(output) - ctx.outputMap.del(output.ostream.fd) + ctx.unset(output) output.oclose() let handle = output.parent if handle != nil: # may be nil if from loadStream S_ISREG let i = handle.outputs.find(output) handle.outputs.del(i) - if handle.outputs.len == 0 and handle.istream != nil: + if handle.outputs.len == 0 and handle.stream != nil: # premature end of all output streams; kill istream too ctx.unregister(handle) - ctx.handleMap.del(handle.istream.fd) + ctx.unset(handle) if handle.parser != nil: handle.finishParse() handle.iclose() @@ -956,26 +993,26 @@ proc runFileLoader*(fd: cint; config: LoaderConfig) = var keys: array[64, ReadyKey] while ctx.alive: let count = ctx.selector.selectInto(-1, keys) - var unregRead: seq[LoaderHandle] = @[] + var unregRead: seq[InputHandle] = @[] var unregWrite: seq[OutputHandle] = @[] for event in keys.toOpenArray(0, count - 1): + let handle = ctx.handleMap[event.fd] if Read in event.events: if event.fd == fd: # incoming connection ctx.acceptConnection() else: - let handle = ctx.handleMap[event.fd] + let handle = InputHandle(ctx.handleMap[event.fd]) case ctx.handleRead(handle, unregWrite) of hrrDone: discard of hrrUnregister, hrrBrokenPipe: unregRead.add(handle) if Write in event.events: - ctx.handleWrite(ctx.outputMap[event.fd], unregWrite) + ctx.handleWrite(OutputHandle(handle), unregWrite) if Error in event.events: assert event.fd != fd - ctx.outputMap.withValue(event.fd, outputp): # ostream died - unregWrite.add(outputp[]) - do: # istream died - let handle = ctx.handleMap[event.fd] - unregRead.add(handle) + if handle of InputHandle: # istream died + unregRead.add(InputHandle(handle)) + else: # ostream died + unregWrite.add(OutputHandle(handle)) ctx.finishCycle(unregRead, unregWrite) ctx.exitLoader() @@ -1023,17 +1060,43 @@ proc startRequest*(loader: FileLoader; request: Request; w.swrite(config) return stream +iterator ongoing*(loader: FileLoader): OngoingData = + for it in loader.map: + if it != nil and it of OngoingData: + yield OngoingData(it) + +func fd*(data: LoaderData): int = + return int(data.stream.fd) + +proc put*(loader: FileLoader; data: LoaderData) = + let fd = int(data.stream.fd) + if loader.map.len <= fd: + loader.map.setLen(fd + 1) + assert loader.map[fd] == nil + loader.map[fd] = data + inc loader.mapFds + +proc get*(loader: FileLoader; fd: int): LoaderData = + if fd < loader.map.len: + return loader.map[fd] + return nil + +proc unset*(loader: FileLoader; data: LoaderData) = + let fd = int(data.stream.fd) + if loader.get(fd) != nil: + dec loader.mapFds + loader.map[fd] = nil + proc fetch0(loader: FileLoader; input: Request; promise: FetchPromise; redirectNum: int) = let stream = loader.startRequest(input) - let fd = int(stream.fd) - loader.registerFun(fd) - loader.connecting[fd] = ConnectData( + loader.registerFun(int(stream.fd)) + loader.put(ConnectData( promise: promise, request: input, stream: stream, redirectNum: redirectNum - ) + )) proc fetch*(loader: FileLoader; input: Request): FetchPromise = let promise = FetchPromise() @@ -1043,13 +1106,13 @@ proc fetch*(loader: FileLoader; input: Request): FetchPromise = proc reconnect*(loader: FileLoader; data: ConnectData) = data.stream.sclose() let stream = loader.startRequest(data.request) - let fd = int(stream.fd) - loader.registerFun(fd) - loader.connecting[fd] = ConnectData( + let data = ConnectData( promise: data.promise, request: data.request, stream: stream ) + loader.put(data) + loader.registerFun(data.fd) proc suspend*(loader: FileLoader; fds: seq[int]) = let stream = loader.connect() @@ -1121,8 +1184,7 @@ proc redirectToFile*(loader: FileLoader; outputId: int; targetPath: string): stream.sclose() return res -proc onConnected*(loader: FileLoader; fd: int) = - let connectData = loader.connecting[fd] +proc onConnected(loader: FileLoader; connectData: ConnectData) = let stream = connectData.stream let promise = connectData.promise let request = connectData.request @@ -1139,11 +1201,12 @@ proc onConnected*(loader: FileLoader; fd: int) = # msg is discarded. #TODO maybe print if called from trusted code (i.e. global == client)? r.sread(msg) # packet 1 + let fd = connectData.fd loader.unregisterFun(fd) loader.unregistered.add(fd) stream.sclose() # delete before resolving the promise - loader.connecting.del(fd) + loader.unset(connectData) let err = newTypeError("NetworkError when attempting to fetch resource") promise.resolve(JSResult[Response].err(err)) of cdsBeforeStatus: @@ -1155,18 +1218,20 @@ proc onConnected*(loader: FileLoader; fd: int) = r.sread(response.headers) # packet 3 # Only a stream of the response body may arrive after this point. response.body = stream + # delete before resolving the promise + loader.unset(connectData) + let data = OngoingData(response: response, stream: stream) + loader.put(data) assert loader.unregisterFun != nil response.unregisterFun = proc() = - loader.ongoing.del(response.body.fd) - loader.unregistered.add(response.body.fd) - loader.unregisterFun(response.body.fd) + loader.unset(data) + let fd = data.fd + loader.unregistered.add(fd) + loader.unregisterFun(fd) response.resumeFun = proc(outputId: int) = loader.resume(outputId) - loader.ongoing[fd] = response stream.setBlocking(false) let redirect = response.getRedirect(request) - # delete before resolving the promise - loader.connecting.del(fd) if redirect != nil: response.unregisterFun() stream.sclose() @@ -1179,24 +1244,38 @@ proc onConnected*(loader: FileLoader; fd: int) = else: promise.resolve(JSResult[Response].ok(response)) -proc onRead*(loader: FileLoader; fd: int) = - let response = loader.ongoing.getOrDefault(fd) - if response != nil: - response.onRead(response) - if response.body.isend: - if response.onFinish != nil: - response.onFinish(response, true) - response.onFinish = nil - response.close() - -proc onError*(loader: FileLoader; fd: int) = - let response = loader.ongoing.getOrDefault(fd) - if response != nil: +proc onRead*(loader: FileLoader; data: OngoingData) = + let response = data.response + response.onRead(response) + if response.body.isend: if response.onFinish != nil: - response.onFinish(response, false) + response.onFinish(response, true) response.onFinish = nil response.close() +proc onRead*(loader: FileLoader; fd: int) = + let data = loader.map[fd] + if data of ConnectData: + loader.onConnected(ConnectData(data)) + else: + loader.onRead(OngoingData(data)) + +proc onError*(loader: FileLoader; data: OngoingData) = + let response = data.response + if response.onFinish != nil: + response.onFinish(response, false) + response.onFinish = nil + response.close() + +proc onError*(loader: FileLoader; fd: int): bool = + let data = loader.map[fd] + if data of ConnectData: + # probably shouldn't happen. TODO + return false + else: + loader.onError(OngoingData(data)) + return true + # Note: this blocks until headers are received. proc doRequest*(loader: FileLoader; request: Request): Response = let stream = loader.startRequest(request) |