diff options
author | bptato <nincsnevem662@gmail.com> | 2024-05-30 00:19:48 +0200 |
---|---|---|
committer | bptato <nincsnevem662@gmail.com> | 2024-06-20 17:50:22 +0200 |
commit | 60dc37269cd2dc8cdf23d9f77680f6af9490032f (patch) | |
tree | 9a72ba24daffa546f92704e7e06cf84fded2d89d /src/loader | |
parent | a146a22b11cea39bc691417d9d9a1292b7177552 (diff) | |
download | chawan-60dc37269cd2dc8cdf23d9f77680f6af9490032f.tar.gz |
img, loader: separate out png codec into cgi, misc improvements
* multi-processed and sandboxed PNG decoding & encoding (through local CGI) * improved request body passing (including support for output id as response body) * simplified & faster blob()/text() - now every request starts suspended, and OngoingData.buf has been replaced with loader's buffering capability * image caching: we no longer pull bitmaps from the container after every single getLines call Next steps: replace our bespoke PNG decoder with something more usable, add other decoders, and make them stream.
Diffstat (limited to 'src/loader')
-rw-r--r-- | src/loader/cgi.nim | 53 | ||||
-rw-r--r-- | src/loader/loader.nim | 160 | ||||
-rw-r--r-- | src/loader/loaderhandle.nim | 41 | ||||
-rw-r--r-- | src/loader/request.nim | 49 | ||||
-rw-r--r-- | src/loader/response.nim | 124 |
5 files changed, 258 insertions, 169 deletions
diff --git a/src/loader/cgi.nim b/src/loader/cgi.nim index f3c5b1e3..ee3d3160 100644 --- a/src/loader/cgi.nim +++ b/src/loader/cgi.nim @@ -42,8 +42,8 @@ proc setupEnv(cmd, scriptName, pathInfo, requestURI, myDir: string; if url.query.isSome: putEnv("QUERY_STRING", url.query.get) if request.httpMethod == hmPost: - if request.multipart.isSome: - putEnv("CONTENT_TYPE", request.multipart.get.getContentType()) + if request.body.t == rbtMultipart: + putEnv("CONTENT_TYPE", request.body.multipart.getContentType()) else: putEnv("CONTENT_TYPE", request.headers.getOrDefault("Content-Type", "")) putEnv("CONTENT_LENGTH", $contentLen) @@ -126,7 +126,7 @@ proc handleLine(handle: LoaderHandle; line: string; headers: Headers) = headers.add(k, v) proc loadCGI*(handle: LoaderHandle; request: Request; cgiDir: seq[string]; - prevURL: URL; insecureSSLNoVerify: bool) = + prevURL: URL; insecureSSLNoVerify: bool; ostream: var PosixStream) = if cgiDir.len == 0: handle.sendResult(ERROR_NO_CGI_DIR) return @@ -181,16 +181,11 @@ proc loadCGI*(handle: LoaderHandle; request: Request; cgiDir: seq[string]; return # Pipe the request body as stdin for POST. var pipefd_read: array[0..1, cint] # parent -> child - let needsPipe = request.body.isSome or request.multipart.isSome - if needsPipe: + if request.body.t != rbtNone: if pipe(pipefd_read) == -1: handle.sendResult(ERROR_FAIL_SETUP_CGI) return - var contentLen = 0 - if request.body.isSome: - contentLen = request.body.get.len - elif request.multipart.isSome: - contentLen = request.multipart.get.calcLength() + let contentLen = request.body.contentLength() stdout.flushFile() stderr.flushFile() let pid = fork() @@ -199,7 +194,7 @@ proc loadCGI*(handle: LoaderHandle; request: Request; cgiDir: seq[string]; elif pid == 0: discard close(pipefd[0]) # close read discard dup2(pipefd[1], 1) # dup stdout - if needsPipe: + if request.body.t != rbtNone: discard close(pipefd_read[1]) # close write if pipefd_read[0] != 0: discard dup2(pipefd_read[0], 0) # dup stdin @@ -220,38 +215,32 @@ proc loadCGI*(handle: LoaderHandle; request: Request; cgiDir: seq[string]; quit(1) else: discard close(pipefd[1]) # close write - if needsPipe: + if request.body.t != rbtNone: discard close(pipefd_read[0]) # close read let ps = newPosixStream(pipefd_read[1]) - if request.body.isSome: - ps.write(request.body.get) - elif request.multipart.isSome: - let multipart = request.multipart.get - for entry in multipart.entries: - ps.writeEntry(entry, multipart.boundary) - ps.writeEnd(multipart.boundary) - ps.sclose() + case request.body.t + of rbtString: + ps.write(request.body.s) + ps.sclose() + of rbtMultipart: + let boundary = request.body.multipart.boundary + for entry in request.body.multipart.entries: + ps.writeEntry(entry, boundary) + ps.writeEnd(boundary) + ps.sclose() + of rbtOutput: + ostream = ps + of rbtNone: discard handle.parser = HeaderParser(headers: newHeaders()) handle.istream = newPosixStream(pipefd[0]) -proc killHandle(handle: LoaderHandle) = - if handle.parser.state != hpsBeforeLines: - # not an ideal solution, but better than silently eating malformed - # headers - handle.output.ostream.setBlocking(true) - handle.sendStatus(500) - handle.sendHeaders(newHeaders()) - const msg = "Error: malformed header in CGI script" - discard handle.output.ostream.sendData(msg) - handle.parser = nil - proc parseHeaders0(handle: LoaderHandle; buffer: LoaderBuffer): int = let parser = handle.parser var s = parser.lineBuffer let L = if buffer == nil: 1 else: buffer.len for i in 0 ..< L: template die = - handle.killHandle() + handle.parser = nil return -1 let c = if buffer != nil: char(buffer.page[i]) diff --git a/src/loader/loader.nim b/src/loader/loader.nim index c84f247a..89d97cde 100644 --- a/src/loader/loader.nim +++ b/src/loader/loader.nim @@ -8,11 +8,17 @@ # S: output ID # S: status code # S: headers +# C: resume # S: response body # else: # S: error message # # The body is passed to the stream as-is, so effectively nothing can follow it. +# +# Note: if the consumer closes the request's body after headers have been +# passed, it will *not* be cleaned up until a `resume' command is +# received. (This allows for passing outputIds to the pager for later +# addCacheFile commands there.) import std/deques import std/nativesockets @@ -57,7 +63,7 @@ type process*: int clientPid*: int connecting*: Table[int, ConnectData] - ongoing*: Table[int, OngoingData] + ongoing*: Table[int, Response] unregistered*: seq[int] registerFun*: proc(fd: int) unregisterFun*: proc(fd: int) @@ -71,11 +77,6 @@ type stream*: SocketStream request: Request - OngoingData* = object - buf: string - response*: Response - bodyRead: Promise[string] - LoaderCommand = enum lcAddCacheFile lcAddClient @@ -155,10 +156,12 @@ proc rejectHandle(handle: LoaderHandle; code: ConnectErrorCode; msg = "") = handle.sendResult(code, msg) handle.close() -func findOutput(ctx: LoaderContext; id: int): OutputHandle = +func findOutput(ctx: LoaderContext; id: int; client: ClientData): OutputHandle = assert id != -1 for it in ctx.outputMap.values: if it.outputId == id: + # verify that it's safe to access this handle. + doAssert ctx.isPrivileged(client) or client.pid == it.ownerPid return it return nil @@ -211,11 +214,8 @@ proc getOutputId(ctx: LoaderContext): int = result = ctx.outputNum inc ctx.outputNum -proc redirectToFile(ctx: LoaderContext; output: OutputHandle; - targetPath: string): bool = - let ps = newPosixStream(targetPath, O_CREAT or O_WRONLY, 0o600) - if ps == nil: - return false +proc redirectToStream(ctx: LoaderContext; output: OutputHandle; + ps: PosixStream): bool = if output.currentBuffer != nil: let n = ps.sendData(output.currentBuffer, output.currentBufferIdx) if unlikely(n < output.currentBuffer.len - output.currentBufferIdx): @@ -226,7 +226,9 @@ proc redirectToFile(ctx: LoaderContext; output: OutputHandle; if unlikely(n < buffer.len): ps.sclose() return false - if output.parent != nil: + if output.istreamAtEnd: + ps.sclose() + elif output.parent != nil: output.parent.outputs.add(OutputHandle( parent: output.parent, ostream: ps, @@ -235,6 +237,13 @@ proc redirectToFile(ctx: LoaderContext; output: OutputHandle; )) return true +proc redirectToFile(ctx: LoaderContext; output: OutputHandle; + targetPath: string): bool = + let ps = newPosixStream(targetPath, O_CREAT or O_WRONLY, 0o600) + if ps == nil: + return false + return ctx.redirectToStream(output, ps) + type AddCacheFileResult = tuple[outputId: int; cacheFile: string] proc addCacheFile(ctx: LoaderContext; client: ClientData; output: OutputHandle): @@ -335,8 +344,7 @@ proc loadStreamRegular(ctx: LoaderContext; handle, cachedHandle: LoaderHandle) = output.ostream.sclose() output.ostream = nil handle.outputs.setLen(0) - handle.istream.sclose() - handle.istream = nil + handle.iclose() proc loadStream(ctx: LoaderContext; handle: LoaderHandle; request: Request) = ctx.passedFdMap.withValue(request.url.pathname, fdp): @@ -406,10 +414,17 @@ proc loadResource(ctx: LoaderContext; client: ClientData; config: LoaderClientCo redo = true continue if request.url.scheme == "cgi-bin": + var ostream: PosixStream = nil handle.loadCGI(request, ctx.config.cgiDir, prevurl, - config.insecureSSLNoVerify) + config.insecureSSLNoVerify, ostream) if handle.istream != nil: ctx.addFd(handle) + if ostream != nil: + let output = ctx.findOutput(request.body.outputId, client) + if output != nil: + doAssert ctx.redirectToStream(output, ostream) + else: + ostream.sclose() else: handle.close() elif request.url.scheme == "stream": @@ -451,8 +466,7 @@ proc setupRequestDefaults(request: Request; config: LoaderClientConfig) = proc load(ctx: LoaderContext; stream: SocketStream; request: Request; client: ClientData; config: LoaderClientConfig) = - let handle = newLoaderHandle(stream, ctx.getOutputId(), client.pid, - request.suspended) + let handle = newLoaderHandle(stream, ctx.getOutputId(), client.pid) when defined(debug): handle.url = request.url handle.output.url = request.url @@ -514,9 +528,12 @@ proc addCacheFile(ctx: LoaderContext; stream: SocketStream; r: var BufferedReader) = var outputId: int var targetPid: int + var sourcePid: int r.sread(outputId) r.sread(targetPid) - let output = ctx.findOutput(outputId) + r.sread(sourcePid) + let sourceClient = ctx.clientData[sourcePid] + let output = ctx.findOutput(outputId, sourceClient) assert output != nil let targetClient = ctx.clientData[targetPid] let (id, file) = ctx.addCacheFile(targetClient, output) @@ -531,7 +548,7 @@ proc redirectToFile(ctx: LoaderContext; stream: SocketStream; var targetPath: string r.sread(outputId) r.sread(targetPath) - let output = ctx.findOutput(outputId) + let output = ctx.findOutput(outputId, ctx.pagerClient) var success = false if output != nil: success = ctx.redirectToFile(output, targetPath) @@ -583,9 +600,7 @@ proc tee(ctx: LoaderContext; stream: SocketStream; client: ClientData; var targetPid: int r.sread(sourceId) r.sread(targetPid) - let output = ctx.findOutput(sourceId) - # only allow tee'ing outputs owned by client - doAssert output.ownerPid == client.pid + let output = ctx.findOutput(sourceId, client) if output != nil: let id = ctx.getOutputId() output.tee(stream, id, targetPid) @@ -602,7 +617,7 @@ proc suspend(ctx: LoaderContext; stream: SocketStream; client: ClientData; var ids: seq[int] r.sread(ids) for id in ids: - let output = ctx.findOutput(id) + let output = ctx.findOutput(id, client) if output != nil: output.suspended = true if output.registered: @@ -615,7 +630,7 @@ proc resume(ctx: LoaderContext; stream: SocketStream; client: ClientData; var ids: seq[int] r.sread(ids) for id in ids: - let output = ctx.findOutput(id) + let output = ctx.findOutput(id, client) if output != nil: output.suspended = false assert not output.registered @@ -793,10 +808,9 @@ proc finishCycle(ctx: LoaderContext; unregRead: var seq[LoaderHandle]; if handle.istream != nil: ctx.selector.unregister(handle.istream.fd) ctx.handleMap.del(handle.istream.fd) - handle.istream.sclose() - handle.istream = nil if handle.parser != nil: handle.finishParse() + handle.iclose() for output in handle.outputs: output.istreamAtEnd = true if output.isEmpty: @@ -816,10 +830,9 @@ proc finishCycle(ctx: LoaderContext; unregRead: var seq[LoaderHandle]; # premature end of all output streams; kill istream too ctx.selector.unregister(handle.istream.fd) ctx.handleMap.del(handle.istream.fd) - handle.istream.sclose() - handle.istream = nil if handle.parser != nil: handle.finishParse() + handle.iclose() proc runFileLoader*(fd: cint; config: LoaderConfig) = var ctx = initLoaderContext(fd, config) @@ -861,12 +874,7 @@ proc getRedirect*(response: Response; request: Request): Request = status == 302 and request.httpMethod == hmPost: return newRequest(url.get, hmGet) else: - return newRequest( - url.get, - request.httpMethod, - body = request.body, - multipart = request.multipart - ) + return newRequest(url.get, request.httpMethod, body = request.body) return nil template withLoaderPacketWriter(stream: SocketStream; loader: FileLoader; @@ -898,7 +906,6 @@ proc startRequest*(loader: FileLoader; request: Request; w.swrite(config) return stream -#TODO: add init proc fetch*(loader: FileLoader; input: Request): FetchPromise = let stream = loader.startRequest(input) let fd = int(stream.fd) @@ -913,10 +920,7 @@ proc fetch*(loader: FileLoader; input: Request): FetchPromise = proc reconnect*(loader: FileLoader; data: ConnectData) = data.stream.sclose() - let stream = loader.connect() - stream.withLoaderPacketWriter loader, w: - w.swrite(lcLoad) - w.swrite(data.request) + let stream = loader.startRequest(data.request) let fd = int(stream.fd) loader.registerFun(fd) loader.connecting[fd] = ConnectData( @@ -925,18 +929,6 @@ proc reconnect*(loader: FileLoader; data: ConnectData) = stream: stream ) -proc switchStream*(data: var ConnectData; stream: SocketStream) = - data.stream = stream - -proc switchStream*(loader: FileLoader; data: var OngoingData; - stream: SocketStream) = - data.response.body = stream - let fd = int(stream.fd) - data.response.unregisterFun = proc() = - loader.ongoing.del(fd) - loader.unregistered.add(fd) - loader.unregisterFun(fd) - proc suspend*(loader: FileLoader; fds: seq[int]) = let stream = loader.connect() stream.withLoaderPacketWriter loader, w: @@ -944,13 +936,16 @@ proc suspend*(loader: FileLoader; fds: seq[int]) = w.swrite(fds) stream.sclose() -proc resume*(loader: FileLoader; fds: seq[int]) = +proc resume*(loader: FileLoader; fds: openArray[int]) = let stream = loader.connect() stream.withLoaderPacketWriter loader, w: w.swrite(lcResume) w.swrite(fds) stream.sclose() +proc resume*(loader: FileLoader; fds: int) = + loader.resume([fds]) + proc tee*(loader: FileLoader; sourceId, targetPid: int): (SocketStream, int) = let stream = loader.connect() stream.withLoaderPacketWriter loader, w: @@ -962,15 +957,20 @@ proc tee*(loader: FileLoader; sourceId, targetPid: int): (SocketStream, int) = r.sread(outputId) return (stream, outputId) -proc addCacheFile*(loader: FileLoader; outputId, targetPid: int): - AddCacheFileResult = +# sourcePid is the PID of the output's owner. This is used in pager for images, +# so that we can be sure that a container only loads images on the page that +# it owns. +proc addCacheFile*(loader: FileLoader; outputId, targetPid: int; + sourcePid = -1): AddCacheFileResult = let stream = loader.connect() if stream == nil: return (-1, "") + let sourcePid = if sourcePid == -1: loader.clientPid else: sourcePid stream.withLoaderPacketWriter loader, w: w.swrite(lcAddCacheFile) w.swrite(outputId) w.swrite(targetPid) + w.swrite(sourcePid) var r = stream.initPacketReader() var outputId: int var cacheFile: string @@ -990,18 +990,18 @@ proc redirectToFile*(loader: FileLoader; outputId: int; targetPath: string): var r = stream.initPacketReader() r.sread(result) -const BufferSize = 4096 - proc onConnected*(loader: FileLoader; fd: int) = let connectData = loader.connecting[fd] let stream = connectData.stream let promise = connectData.promise let request = connectData.request + # delete before resolving the promise + loader.connecting.del(fd) var r = stream.initPacketReader() var res: int r.sread(res) # packet 1 - let response = newResponse(res, request, stream) if res == 0: + let response = newResponse(res, request, stream) r.sread(response.outputId) # packet 1 r = stream.initPacketReader() r.sread(response.status) # packet 2 @@ -1011,13 +1011,12 @@ proc onConnected*(loader: FileLoader; fd: int) = response.body = stream assert loader.unregisterFun != nil response.unregisterFun = proc() = - loader.ongoing.del(fd) - loader.unregistered.add(fd) - loader.unregisterFun(fd) - loader.ongoing[fd] = OngoingData( - response: response, - bodyRead: response.bodyRead - ) + loader.ongoing.del(response.body.fd) + loader.unregistered.add(response.body.fd) + loader.unregisterFun(response.body.fd) + response.resumeFun = proc(outputId: int) = + loader.resume(outputId) + loader.ongoing[fd] = response stream.setBlocking(false) promise.resolve(JSResult[Response].ok(response)) else: @@ -1030,40 +1029,27 @@ proc onConnected*(loader: FileLoader; fd: int) = stream.sclose() let err = newTypeError("NetworkError when attempting to fetch resource") promise.resolve(JSResult[Response].err(err)) - loader.connecting.del(fd) proc onRead*(loader: FileLoader; fd: int) = - loader.ongoing.withValue(fd, buffer): - let response = buffer[].response - while not response.body.isend: - let olen = buffer[].buf.len - try: - buffer[].buf.setLen(olen + BufferSize) - let n = response.body.recvData(addr buffer[].buf[olen], BufferSize) - buffer[].buf.setLen(olen + n) - if n == 0: - break - except ErrorAgain: - buffer[].buf.setLen(olen) - break + let response = loader.ongoing.getOrDefault(fd) + if response != nil: + response.onRead(response) if response.body.isend: - buffer[].bodyRead.resolve(buffer[].buf) - buffer[].bodyRead = nil - buffer[].buf = "" + response.bodyRead.resolve() + response.bodyRead = nil response.unregisterFun() proc onError*(loader: FileLoader; fd: int) = - loader.ongoing.withValue(fd, buffer): - let response = buffer[].response + let response = loader.ongoing.getOrDefault(fd) + if response != nil: when defined(debug): var lbuf {.noinit.}: array[BufferSize, char] if not response.body.isend: let n = response.body.recvData(addr lbuf[0], lbuf.len) assert n == 0 assert response.body.isend - buffer[].bodyRead.resolve(buffer[].buf) - buffer[].bodyRead = nil - buffer[].buf = "" + response.bodyRead.resolve() + response.bodyRead = nil response.unregisterFun() # Note: this blocks until headers are received. diff --git a/src/loader/loaderhandle.nim b/src/loader/loaderhandle.nim index 00f6f754..31a41571 100644 --- a/src/loader/loaderhandle.nim +++ b/src/loader/loaderhandle.nim @@ -3,6 +3,7 @@ import std/net import std/tables import io/bufwriter +import io/dynstream import io/posixstream import loader/headers @@ -44,14 +45,15 @@ type status*: uint16 ResponseState = enum - rsBeforeResult, rsBeforeStatus, rsBeforeHeaders, rsAfterHeaders + rsBeforeResult, rsAfterFailure, rsBeforeStatus, rsBeforeHeaders, + rsAfterHeaders LoaderHandle* = ref object istream*: PosixStream # stream for taking input outputs*: seq[OutputHandle] # list of outputs to be streamed into cacheId*: int # if cached, our ID in a client cacheMap parser*: HeaderParser # only exists for CGI handles - rstate: ResponseState # just an enum for sanity checks + rstate: ResponseState # track response state when defined(debug): url*: URL @@ -69,15 +71,14 @@ when defined(debug): return s # Create a new loader handle, with the output stream ostream. -proc newLoaderHandle*(ostream: PosixStream; outputId, pid: int; - suspended: bool): LoaderHandle = +proc newLoaderHandle*(ostream: PosixStream; outputId, pid: int): LoaderHandle = let handle = LoaderHandle(cacheId: -1) handle.outputs.add(OutputHandle( ostream: ostream, parent: handle, outputId: outputId, ownerPid: pid, - suspended: suspended + suspended: true )) return handle @@ -108,15 +109,17 @@ proc bufferCleared*(output: OutputHandle) = output.currentBuffer = nil proc tee*(outputIn: OutputHandle; ostream: PosixStream; outputId, pid: int) = - outputIn.parent.outputs.add(OutputHandle( - parent: outputIn.parent, + let parent = outputIn.parent + parent.outputs.add(OutputHandle( + parent: parent, ostream: ostream, currentBuffer: outputIn.currentBuffer, currentBufferIdx: outputIn.currentBufferIdx, buffers: outputIn.buffers, istreamAtEnd: outputIn.istreamAtEnd, outputId: outputId, - ownerPid: pid + ownerPid: pid, + suspended: outputIn.suspended )) template output*(handle: LoaderHandle): OutputHandle = @@ -133,6 +136,7 @@ proc sendResult*(handle: LoaderHandle; res: int; msg = "") = if res == 0: # success assert msg == "" w.swrite(output.outputId) + inc handle.rstate else: # error w.swrite(msg) output.ostream.setBlocking(blocking) @@ -164,12 +168,27 @@ proc sendData*(ps: PosixStream; buffer: LoaderBuffer; si = 0): int {.inline.} = assert buffer.len - si > 0 return ps.sendData(addr buffer.page[si], buffer.len - si) +proc iclose*(handle: LoaderHandle) = + if handle.istream != nil: + if handle.rstate notin {rsBeforeResult, rsAfterFailure, rsAfterHeaders}: + assert handle.outputs.len == 1 + # not an ideal solution, but better than silently eating malformed + # headers + try: + handle.sendStatus(500) + handle.sendHeaders(newHeaders()) + handle.output.ostream.setBlocking(true) + const msg = "Error: malformed header in CGI script" + discard handle.output.ostream.sendData(msg) + except ErrorBrokenPipe: + discard # receiver is dead + handle.istream.sclose() + handle.istream = nil + proc close*(handle: LoaderHandle) = + handle.iclose() for output in handle.outputs: #TODO assert not output.registered if output.ostream != nil: output.ostream.sclose() output.ostream = nil - if handle.istream != nil: - handle.istream.sclose() - handle.istream = nil diff --git a/src/loader/request.nim b/src/loader/request.nim index 277481f1..f92098cb 100644 --- a/src/loader/request.nim +++ b/src/loader/request.nim @@ -58,17 +58,27 @@ type of rwtWindow: window*: EnvironmentSettings + RequestBodyType* = enum + rbtNone, rbtString, rbtMultipart, rbtOutput + + RequestBody* = object + case t*: RequestBodyType + of rbtNone: + discard + of rbtString: + s*: string + of rbtMultipart: + multipart*: FormData + of rbtOutput: + outputId*: int + Request* = ref object httpMethod*: HttpMethod url*: URL headers*: Headers - body*: Option[string] - multipart*: Option[FormData] + body*: RequestBody referrer*: URL proxy*: URL #TODO do something with this - # when set to true, the loader will not write data from the body (not - # headers!) into the output until a resume is received. - suspended*: bool JSRequest* = ref object request*: Request @@ -81,6 +91,13 @@ type jsDestructor(JSRequest) +proc contentLength*(body: RequestBody): int = + case body.t + of rbtNone: return 0 + of rbtString: return body.s.len + of rbtMultipart: return body.multipart.calcLength() + of rbtOutput: return 0 + func headers(this: JSRequest): Headers {.jsfget.} = return this.request.headers @@ -102,17 +119,14 @@ iterator pairs*(headers: Headers): (string, string) = yield (k, v) func newRequest*(url: URL; httpMethod = hmGet; headers = newHeaders(); - body = none(string); multipart = none(FormData); proxy: URL = nil; - referrer: URL = nil; suspended = false): Request = + body = RequestBody(); proxy: URL = nil; referrer: URL = nil): Request = return Request( url: url, httpMethod: httpMethod, headers: headers, body: body, - multipart: multipart, referrer: referrer, - proxy: proxy, - suspended: suspended + proxy: proxy ) func createPotentialCORSRequest*(url: URL; destination: RequestDestination; @@ -178,7 +192,7 @@ proc fromJSBodyInit(ctx: JSContext; val: JSValue): JSResult[BodyInit] = let x = fromJS[string](ctx, val) if x.isSome: return ok(BodyInit(t: bitString, str: x.get)) - return err(newTypeError("Invalid body init type")) + return errTypeError("Invalid body init type") func newRequest*[T: string|JSRequest](ctx: JSContext; resource: T; init = none(RequestInit)): JSResult[JSRequest] {.jsctor.} = @@ -188,13 +202,12 @@ func newRequest*[T: string|JSRequest](ctx: JSContext; resource: T; when T is string: let url = ?newURL(resource) if url.username != "" or url.password != "": - return err(newTypeError("Input URL contains a username or password")) + return errTypeError("Input URL contains a username or password") var httpMethod = hmGet var headers = newHeaders() let referrer: URL = nil var credentials = cmSameOrigin - var body: Option[string] - var multipart: Option[FormData] + var body = RequestBody() var proxyUrl: URL #TODO? let fallbackMode = opt(rmCors) var window = RequestWindow(t: rwtClient) @@ -205,7 +218,6 @@ func newRequest*[T: string|JSRequest](ctx: JSContext; resource: T; let referrer = resource.request.referrer var credentials = resource.credentialsMode var body = resource.request.body - var multipart = resource.request.multipart var proxyUrl = resource.request.proxy #TODO? let fallbackMode = none(RequestMode) var window = resource.window @@ -226,8 +238,10 @@ func newRequest*[T: string|JSRequest](ctx: JSContext; resource: T; if init.body.isSome: let ibody = init.body.get case ibody.t - of bitFormData: multipart = some(ibody.formData) - of bitString: body = some(ibody.str) + of bitFormData: + body = RequestBody(t: rbtMultipart, multipart: ibody.formData) + of bitString: + body = RequestBody(t: rbtString, s: ibody.str) else: discard #TODO if httpMethod in {hmGet, hmHead}: return errTypeError("HEAD or GET Request cannot have a body.") @@ -245,7 +259,6 @@ func newRequest*[T: string|JSRequest](ctx: JSContext; resource: T; httpMethod, headers, body, - multipart, proxy = proxyUrl, referrer = referrer ), diff --git a/src/loader/response.nim b/src/loader/response.nim index 8ea17e64..3834d5a9 100644 --- a/src/loader/response.nim +++ b/src/loader/response.nim @@ -3,6 +3,8 @@ import std/tables import chagashi/charset import chagashi/decoder +import img/bitmap +import io/posixstream import io/promise import io/socketstream import loader/headers @@ -11,6 +13,7 @@ import monoucha/javascript import monoucha/jserror import monoucha/quickjs import types/blob +import types/color import types/opt import types/url import utils/mimeguess @@ -43,9 +46,12 @@ type headersGuard: HeadersGuard url*: URL #TODO should be urllist? unregisterFun*: proc() - bodyRead*: Promise[string] + resumeFun*: proc(outputId: int) + bodyRead*: EmptyPromise internalMessage*: string # should NOT be exposed to JS! outputId*: int + onRead*: proc(response: Response) {.nimcall.} + opaque*: RootRef jsDestructor(Response) @@ -54,7 +60,7 @@ proc newResponse*(res: int; request: Request; stream: SocketStream): Response = res: res, url: request.url, body: stream, - bodyRead: Promise[string](), + bodyRead: EmptyPromise(), outputId: -1 ) @@ -66,7 +72,8 @@ func makeNetworkError*(): Response {.jsstfunc: "Response.error".} = responseType: TYPE_ERROR, status: 0, headers: newHeaders(), - headersGuard: hgImmutable + headersGuard: hgImmutable, + bodyUsed: true ) func sok(response: Response): bool {.jsfget: "ok".} = @@ -102,6 +109,25 @@ func getContentType*(this: Response): string = # override buffer mime.types return DefaultGuess.guessContentType(this.url.pathname) +type TextOpaque = ref object of RootObj + buf: string + +const BufferSize = 4096 + +proc onReadText(response: Response) = + let opaque = TextOpaque(response.opaque) + while true: + let olen = opaque.buf.len + try: + opaque.buf.setLen(olen + BufferSize) + let n = response.body.recvData(addr opaque.buf[olen], BufferSize) + opaque.buf.setLen(olen + n) + if n == 0: + break + except ErrorAgain: + opaque.buf.setLen(olen) + break + proc text*(response: Response): Promise[JSResult[string]] {.jsfunc.} = if response.body == nil: let p = newPromise[JSResult[string]]() @@ -113,40 +139,96 @@ proc text*(response: Response): Promise[JSResult[string]] {.jsfunc.} = .err(newTypeError("Body has already been consumed")) p.resolve(err) return p - let bodyRead = response.bodyRead - response.bodyRead = nil - return bodyRead.then(proc(s: string): JSResult[string] = + let opaque = TextOpaque() + response.opaque = opaque + response.onRead = onReadText + response.bodyUsed = true + response.resumeFun(response.outputId) + response.resumeFun = nil + return response.bodyRead.then(proc(): JSResult[string] = let charset = response.getCharset(CHARSET_UTF_8) - #TODO this is inefficient - # maybe add a JS type that turns a seq[char] into JS strings - ok(s.decodeAll(charset)) + ok(opaque.buf.decodeAll(charset)) ) +type BlobOpaque = ref object of RootObj + p: pointer + len: int + size: int + +proc onReadBlob(response: Response) = + let opaque = BlobOpaque(response.opaque) + while true: + try: + let targetLen = opaque.len + BufferSize + if targetLen > opaque.size: + opaque.size = targetLen + opaque.p = realloc(opaque.p, targetLen) + let p = cast[ptr UncheckedArray[uint8]](opaque.p) + let n = response.body.recvData(addr p[opaque.len], BufferSize) + opaque.len += n + if n == 0: + break + except ErrorAgain: + break + proc blob*(response: Response): Promise[JSResult[Blob]] {.jsfunc.} = - if response.bodyRead == nil: + if response.bodyUsed: let p = newPromise[JSResult[Blob]]() let err = JSResult[Blob] .err(newTypeError("Body has already been consumed")) p.resolve(err) return p - let bodyRead = response.bodyRead - response.bodyRead = nil + let opaque = BlobOpaque() + response.opaque = opaque + response.onRead = onReadBlob + response.bodyUsed = true + response.resumeFun(response.outputId) + response.resumeFun = nil let contentType = response.getContentType() - return bodyRead.then(proc(s: string): JSResult[Blob] = - if s.len == 0: + return response.bodyRead.then(proc(): JSResult[Blob] = + let p = realloc(opaque.p, opaque.len) + opaque.p = nil + if p == nil: return ok(newBlob(nil, 0, contentType, nil)) - GC_ref(s) - let deallocFun = proc() = - GC_unref(s) - let blob = newBlob(unsafeAddr s[0], s.len, contentType, deallocFun) - ok(blob)) + ok(newBlob(p, opaque.len, contentType, deallocBlob)) + ) + +type BitmapOpaque = ref object of RootObj + bmp: Bitmap + idx: int + +proc onReadBitmap(response: Response) = + let opaque = BitmapOpaque(response.opaque) + let bmp = opaque.bmp + while true: + try: + let p = cast[ptr UncheckedArray[uint8]](addr bmp.px[0]) + let L = bmp.px.len * 4 - opaque.idx + let n = response.body.recvData(addr p[opaque.idx], L) + opaque.idx += n + if n == 0: + break + except ErrorAgain: + break + +proc saveToBitmap*(response: Response; bmp: Bitmap): EmptyPromise = + assert not response.bodyUsed + let opaque = BitmapOpaque(bmp: bmp, idx: 0) + let size = bmp.width * bmp.height + bmp.px = cast[seq[ARGBColor]](newSeqUninitialized[uint32](size)) + response.opaque = opaque + response.onRead = onReadBitmap + response.bodyUsed = true + response.resumeFun(response.outputId) + response.resumeFun = nil + return response.bodyRead proc json(ctx: JSContext; this: Response): Promise[JSResult[JSValue]] {.jsfunc.} = return this.text().then(proc(s: JSResult[string]): JSResult[JSValue] = let s = ?s - return ok(JS_ParseJSON(ctx, cstring(s), cast[csize_t](s.len), - cstring"<input>"))) + return ok(JS_ParseJSON(ctx, cstring(s), csize_t(s.len), cstring"<input>")) + ) proc addResponseModule*(ctx: JSContext) = ctx.registerType(Response) |