# A file loader server (?) # The idea here is that we receive requests with a socket, then respond to each # with a response (ideally a document.) # For now, the protocol looks like: # C: Request # S: res (0 => success, _ => error) # if success: # S: output ID # S: status code # S: headers # C: resume # S: response body # else: # S: error message # # The body is passed to the stream as-is, so effectively nothing can follow it. # # Note: if the consumer closes the request's body after headers have been # passed, it will *not* be cleaned up until a `resume' command is # received. (This allows for passing outputIds to the pager for later # addCacheFile commands there.) import std/deques import std/nativesockets import std/net import std/options import std/os import std/posix import std/selectors import std/strutils import std/tables import io/bufreader import io/bufwriter import io/dynstream import io/promise import io/serversocket import io/tempfile import io/urlfilter import loader/cgi import loader/connecterror import loader/headers import loader/loaderhandle import loader/request import loader/response import monoucha/javascript import monoucha/jserror import types/cookie import types/opt import types/referrer import types/urimethodmap import types/url import utils/twtstr export request export response type FileLoader* = ref object key*: ClientKey process*: int clientPid*: int connecting*: Table[int, ConnectData] ongoing*: Table[int, Response] unregistered*: seq[int] registerFun*: proc(fd: int) unregisterFun*: proc(fd: int) # directory where we store UNIX domain sockets sockDir*: string # (FreeBSD only) fd for the socket directory so we can connectat() on it sockDirFd*: int ConnectDataState = enum cdsBeforeResult, cdsBeforeStatus, cdsBeforeHeaders ConnectData = ref object state: ConnectDataState status: uint16 res: int outputId: int redirectNum: int promise: Promise[JSResult[Response]] stream*: SocketStream request: Request LoaderCommand = enum lcAddCacheFile lcAddClient lcGetCacheFile lcLoad lcLoadConfig lcPassFd lcRedirectToFile lcRemoveCachedItem lcRemoveClient lcResume lcShareCachedItem lcSuspend lcTee ClientKey* = array[32, uint8] CachedItem = ref object id: int path: string refc: int ClientData = ref object pid: int key: ClientKey cacheMap: seq[CachedItem] config: LoaderClientConfig LoaderContext = ref object pagerClient: ClientData ssock: ServerSocket alive: bool config: LoaderConfig handleMap: Table[int, LoaderHandle] outputMap: Table[int, OutputHandle] selector: Selector[int] # List of file descriptors passed by the pager. passedFdMap: Table[string, FileHandle] # host -> fd # List of existing clients (buffer or pager) that may make requests. clientData: Table[int, ClientData] # pid -> data # ID of next output. TODO: find a better allocation scheme outputNum: int LoaderConfig* = object cgiDir*: seq[string] uriMethodMap*: URIMethodMap w3mCGICompat*: bool tmpdir*: string sockdir*: string LoaderClientConfig* = object cookieJar*: CookieJar defaultHeaders*: Headers filter*: URLFilter proxy*: URL referrerPolicy*: ReferrerPolicy insecureSSLNoVerify*: bool FetchPromise* = Promise[JSResult[Response]] func isPrivileged(ctx: LoaderContext; client: ClientData): bool = return ctx.pagerClient == client #TODO this may be too low if we want to use urimethodmap for everything const MaxRewrites = 4 func canRewriteForCGICompat(ctx: LoaderContext; path: string): bool = if path.startsWith("/cgi-bin/") or path.startsWith("/$LIB/"): return true for dir in ctx.config.cgiDir: if path.startsWith(dir): return true return false proc rejectHandle(handle: LoaderHandle; code: ConnectErrorCode; msg = "") = handle.sendResult(code, msg) handle.close() func findOutput(ctx: LoaderContext; id: int; client: ClientData): OutputHandle = assert id != -1 for it in ctx.outputMap.values: if it.outputId == id: # verify that it's safe to access this handle. doAssert ctx.isPrivileged(client) or client.pid == it.ownerPid return it return nil func findCachedHandle(ctx: LoaderContext; cacheId: int): LoaderHandle = assert cacheId != -1 for it in ctx.handleMap.values: if it.cacheId == cacheId: return it return nil type PushBufferResult = enum pbrDone, pbrUnregister proc register(ctx: LoaderContext; output: OutputHandle) = assert not output.registered ctx.selector.registerHandle(int(output.ostream.fd), {Write}, 0) output.registered = true proc unregister(ctx: LoaderContext; output: OutputHandle) = assert output.registered ctx.selector.unregister(int(output.ostream.fd)) output.registered = false # Either write data to the target output, or append it to the list of buffers to # write and register the output in our selector. proc pushBuffer(ctx: LoaderContext; output: OutputHandle; buffer: LoaderBuffer; si: int): PushBufferResult = if output.suspended: if output.currentBuffer == nil: output.currentBuffer = buffer output.currentBufferIdx = si else: # si must be 0 here in all cases. Why? Well, it indicates the first unread # position after reading headers, and at that point currentBuffer will # be empty. # # Obviously, this breaks down if anything is pushed into the stream # before the header parser destroys itself. For now it never does, so we # should be fine. doAssert si == 0 output.buffers.addLast(buffer) elif output.currentBuffer == nil: var n = si try: n += output.ostream.sendData(buffer, si) except ErrorAgain: discard except ErrorBrokenPipe: return pbrUnregister if n < buffer.len: output.currentBuffer = buffer output.currentBufferIdx = n ctx.register(output) else: output.buffers.addLast(buffer) pbrDone proc getOutputId(ctx: LoaderContext): int = result = ctx.outputNum inc ctx.outputNum proc redirectToFile(ctx: LoaderContext; output: OutputHandle; targetPath: string): bool = let ps = newPosixStream(targetPath, O_CREAT or O_WRONLY, 0o600) if ps == nil: return false try: if output.currentBuffer != nil: let n = ps.sendData(output.currentBuffer, output.currentBufferIdx) if unlikely(n < output.currentBuffer.len - output.currentBufferIdx): ps.sclose() return false for buffer in output.buffers: let n = ps.sendData(buffer) if unlikely(n < buffer.len): ps.sclose() return false except ErrorBrokenPipe: # ps is dead; give up. ps.sclose() return false if output.istreamAtEnd: ps.sclose() elif output.parent != nil: output.parent.outputs.add(OutputHandle( parent: output.parent, ostream: ps, istreamAtEnd: output.istreamAtEnd, outputId: ctx.getOutputId() )) return true proc addCacheFile(ctx: LoaderContext; client: ClientData; output: OutputHandle): int = if output.parent != nil and output.parent.cacheId != -1: # may happen e.g. if client tries to cache a `cache:' URL return output.parent.cacheId let tmpf = getTempFile(ctx.config.tmpdir) if ctx.redirectToFile(output, tmpf): let cacheId = output.outputId if output.parent != nil: output.parent.cacheId = cacheId client.cacheMap.add(CachedItem(id: cacheId, path: tmpf, refc: 1)) return cacheId return -1 proc addFd(ctx: LoaderContext; handle: LoaderHandle) = let output = handle.output output.ostream.setBlocking(false) handle.istream.setBlocking(false) ctx.selector.registerHandle(int(handle.istream.fd), {Read}, 0) assert handle.istream.fd notin ctx.handleMap assert output.ostream.fd notin ctx.outputMap ctx.handleMap[handle.istream.fd] = handle ctx.outputMap[output.ostream.fd] = output type HandleReadResult = enum hrrDone, hrrUnregister, hrrBrokenPipe # Called whenever there is more data available to read. proc handleRead(ctx: LoaderContext; handle: LoaderHandle; unregWrite: var seq[OutputHandle]): HandleReadResult = var unregs = 0 let maxUnregs = handle.outputs.len while true: let buffer = newLoaderBuffer() try: let n = handle.istream.recvData(buffer) if n == 0: # EOF return hrrUnregister var si = 0 if handle.parser != nil: si = handle.parseHeaders(buffer) if si == -1: # died while parsing headers; unregister return hrrUnregister if si == n: # parsed the entire buffer as headers; skip output handling continue for output in handle.outputs: if output.dead: # do not push to unregWrite candidates continue case ctx.pushBuffer(output, buffer, si) of pbrUnregister: output.dead = true unregWrite.add(output) inc unregs of pbrDone: discard if unregs == maxUnregs: # early return: no more outputs to write to break if n < buffer.cap: break except ErrorAgain: # retry later break except ErrorBrokenPipe: # sender died; stop streaming return hrrBrokenPipe hrrDone # stream is a regular file, so we can't select on it. # cachedHandle is used for attaching the output handle to a different # LoaderHandle when loadFromCache is called while a download is still ongoing # (and thus some parts of the document are not cached yet). proc loadStreamRegular(ctx: LoaderContext; handle, cachedHandle: LoaderHandle) = assert handle.parser == nil # parser is only used with CGI var unregWrite: seq[OutputHandle] = @[] let r = ctx.handleRead(handle, unregWrite) for output in unregWrite: output.parent = nil let i = handle.outputs.find(output) if output.registered: ctx.unregister(output) handle.outputs.del(i) for output in handle.outputs: if r == hrrBrokenPipe: output.oclose() elif cachedHandle != nil: output.parent = cachedHandle cachedHandle.outputs.add(output) ctx.outputMap[output.ostream.fd] = output elif output.registered or output.suspended: output.parent = nil output.istreamAtEnd = true ctx.outputMap[output.ostream.fd] = output else: assert output.ostream.fd notin ctx.outputMap output.oclose() handle.outputs.setLen(0) handle.iclose() proc loadStream(ctx: LoaderContext; handle: LoaderHandle; request: Request) = ctx.passedFdMap.withValue(request.url.pathname, fdp): handle.sendResult(0) handle.sendStatus(200) handle.sendHeaders(newHeaders()) let ps = newPosixStream(fdp[]) var stats: Stat doAssert fstat(fdp[], stats) != -1 handle.istream = ps ctx.passedFdMap.del(request.url.pathname) if S_ISCHR(stats.st_mode) or S_ISREG(stats.st_mode): # regular file: e.g. cha = MaxRewrites: handle.rejectHandle(ERROR_TOO_MANY_REWRITES) proc setupRequestDefaults(request: Request; config: LoaderClientConfig) = for k, v in config.defaultHeaders.table: if k notin request.headers.table: request.headers.table[k] = v if config.cookieJar != nil and config.cookieJar.cookies.len > 0: if "Cookie" notin request.headers.table: let cookie = config.cookieJar.serialize(request.url) if cookie != "": request.headers["Cookie"] = cookie if request.referrer != nil and "Referer" notin request.headers: let r = request.referrer.getReferrer(request.url, config.referrerPolicy) if r != "": request.headers["Referer"] = r proc load(ctx: LoaderContext; stream: SocketStream; request: Request; client: ClientData; config: LoaderClientConfig) = let handle = newLoaderHandle(stream, ctx.getOutputId(), client.pid) when defined(debug): handle.url = request.url handle.output.url = request.url if not config.filter.match(request.url): handle.rejectHandle(ERROR_DISALLOWED_URL) else: request.setupRequestDefaults(config) if request.proxy == nil or not ctx.isPrivileged(client): request.proxy = config.proxy ctx.loadResource(client, config, request, handle) proc load(ctx: LoaderContext; stream: SocketStream; client: ClientData; r: var BufferedReader) = var request: Request r.sread(request) ctx.load(stream, request, client, client.config) proc loadConfig(ctx: LoaderContext; stream: SocketStream; client: ClientData; r: var BufferedReader) = var request: Request r.sread(request) var config: LoaderClientConfig r.sread(config) ctx.load(stream, request, client, config) proc getCacheFile(ctx: LoaderContext; stream: SocketStream; client: ClientData; r: var BufferedReader) = var cacheId: int r.sread(cacheId) stream.withPacketWriter w: let n = client.cacheMap.find(cacheId) if n != -1: w.swrite(client.cacheMap[n].path) else: w.swrite("") proc addClient(ctx: LoaderContext; stream: SocketStream; r: var BufferedReader) = var key: ClientKey var pid: int var config: LoaderClientConfig var clonedFrom: int r.sread(key) r.sread(pid) r.sread(config) r.sread(clonedFrom) stream.withPacketWriter w: if pid in ctx.clientData or key == default(ClientKey): w.swrite(false) else: let client = ClientData(pid: pid, key: key, config: config) ctx.clientData[pid] = client if clonedFrom != -1: let client2 = ctx.clientData[clonedFrom] for item in client2.cacheMap: inc item.refc client.cacheMap = client2.cacheMap w.swrite(true) stream.sclose() proc cleanup(client: ClientData) = for it in client.cacheMap: dec it.refc if it.refc == 0: discard unlink(cstring(it.path)) proc removeClient(ctx: LoaderContext; stream: SocketStream; r: var BufferedReader) = var pid: int r.sread(pid) if pid in ctx.clientData: let client = ctx.clientData[pid] client.cleanup() ctx.clientData.del(pid) stream.sclose() proc addCacheFile(ctx: LoaderContext; stream: SocketStream; client: ClientData; r: var BufferedReader) = var outputId: int var targetPid: int r.sread(outputId) #TODO get rid of targetPid r.sread(targetPid) doAssert ctx.isPrivileged(client) or client.pid == targetPid let output = ctx.findOutput(outputId, client) assert output != nil let targetClient = ctx.clientData[targetPid] let id = ctx.addCacheFile(targetClient, output) stream.withPacketWriter w: w.swrite(id) stream.sclose() proc redirectToFile(ctx: LoaderContext; stream: SocketStream; r: var BufferedReader) = var outputId: int var targetPath: string r.sread(outputId) r.sread(targetPath) let output = ctx.findOutput(outputId, ctx.pagerClient) var success = false if output != nil: success = ctx.redirectToFile(output, targetPath) stream.withPacketWriter w: w.swrite(success) stream.sclose() proc shareCachedItem(ctx: LoaderContext; stream: SocketStream; r: var BufferedReader) = # share a cached file with another buffer. this is for newBufferFrom # (i.e. view source) var sourcePid: int # pid of source client var targetPid: int # pid of target client var id: int r.sread(sourcePid) r.sread(targetPid) r.sread(id) let sourceClient = ctx.clientData[sourcePid] let targetClient = ctx.clientData[targetPid] let n = sourceClient.cacheMap.find(id) let item = sourceClient.cacheMap[n] inc item.refc targetClient.cacheMap.add(item) stream.sclose() proc passFd(ctx: LoaderContext; stream: SocketStream; r: var BufferedReader) = var id: string r.sread(id) let fd = stream.recvFileHandle() ctx.passedFdMap[id] = fd stream.sclose() proc removeCachedItem(ctx: LoaderContext; stream: SocketStream; client: ClientData; r: var BufferedReader) = var id: int r.sread(id) let n = client.cacheMap.find(id) if n != -1: let item = client.cacheMap[n] client.cacheMap.del(n) dec item.refc if item.refc == 0: discard unlink(cstring(item.path)) stream.sclose() proc tee(ctx: LoaderContext; stream: SocketStream; client: ClientData; r: var BufferedReader) = var sourceId: int var targetPid: int r.sread(sourceId) r.sread(targetPid) let outputIn = ctx.findOutput(sourceId, client) if outputIn != nil: let id = ctx.getOutputId() let output = outputIn.tee(stream, id, targetPid) ctx.outputMap[output.ostream.fd] = output stream.withPacketWriter w: w.swrite(id) stream.setBlocking(false) else: stream.withPacketWriter w: w.swrite(-1) stream.sclose() proc suspend(ctx: LoaderContext; stream: SocketStream; client: ClientData; r: var BufferedReader) = var ids: seq[int] r.sread(ids) for id in ids: let output = ctx.findOutput(id, client) if output != nil: output.suspended = true if output.registered: # do not waste cycles trying to push into output ctx.unregister(output) stream.sclose() proc resume(ctx: LoaderContext; stream: SocketStream; client: ClientData; r: var BufferedReader) = var ids: seq[int] r.sread(ids) for id in ids: let output = ctx.findOutput(id, client) if output != nil: output.suspended = false ctx.register(output) stream.sclose() proc equalsConstantTime(a, b: ClientKey): bool = static: doAssert a.len == b.len {.push boundChecks:off, overflowChecks:off.} var i {.volatile.} = 0 var res {.volatile.} = 0u8 while i < a.len: res = res or (a[i] xor b[i]) inc i {.pop.} return res == 0 proc acceptConnection(ctx: LoaderContext) = let stream = ctx.ssock.acceptSocketStream() try: stream.withPacketReader r: var myPid: int var key: ClientKey r.sread(myPid) r.sread(key) if myPid notin ctx.clientData: # possibly already removed stream.sclose() return let client = ctx.clientData[myPid] if not client.key.equalsConstantTime(key): # ditto stream.sclose() return var cmd: LoaderCommand r.sread(cmd) template privileged_command = doAssert ctx.isPrivileged(client) case cmd of lcAddClient: privileged_command ctx.addClient(stream, r) of lcRemoveClient: privileged_command ctx.removeClient(stream, r) of lcShareCachedItem: privileged_command ctx.shareCachedItem(stream, r) of lcPassFd: privileged_command ctx.passFd(stream, r) of lcRedirectToFile: privileged_command ctx.redirectToFile(stream, r) of lcLoadConfig: privileged_command ctx.loadConfig(stream, client, r) of lcGetCacheFile: privileged_command ctx.getCacheFile(stream, client, r) of lcAddCacheFile: ctx.addCacheFile(stream, client, r) of lcRemoveCachedItem: ctx.removeCachedItem(stream, client, r) of lcLoad: ctx.load(stream, client, r) of lcTee: ctx.tee(stream, client, r) of lcSuspend: ctx.suspend(stream, client, r) of lcResume: ctx.resume(stream, client, r) except ErrorBrokenPipe: # receiving end died while reading the file; give up. assert stream.fd notin ctx.outputMap stream.sclose() proc exitLoader(ctx: LoaderContext) = ctx.ssock.close() for client in ctx.clientData.values: client.cleanup() exitnow(1) var gctx: LoaderContext proc initLoaderContext(fd: cint; config: LoaderConfig): LoaderContext = var ctx = LoaderContext( alive: true, config: config, selector: newSelector[int]() ) gctx = ctx let myPid = getCurrentProcessId() # we don't capsicumize loader, so -1 is appropriate here ctx.ssock = initServerSocket(config.sockdir, -1, myPid, blocking = true) let sfd = int(ctx.ssock.sock.getFd()) ctx.selector.registerHandle(sfd, {Read}, 0) # The server has been initialized, so the main process can resume execution. let ps = newPosixStream(fd) ps.write(char(0u8)) ps.sclose() onSignal SIGTERM: discard sig gctx.exitLoader() for dir in ctx.config.cgiDir.mitems: if dir.len > 0 and dir[^1] != '/': dir &= '/' # get pager's key let stream = ctx.ssock.acceptSocketStream() stream.withPacketReader r: block readNullKey: var pid: int # ignore pid r.sread(pid) # pager's key is still null var key: ClientKey r.sread(key) doAssert key == default(ClientKey) var cmd: LoaderCommand r.sread(cmd) doAssert cmd == lcAddClient var key: ClientKey var pid: int var config: LoaderClientConfig r.sread(key) r.sread(pid) r.sread(config) stream.withPacketWriter w: w.swrite(true) ctx.pagerClient = ClientData(key: key, pid: pid, config: config) ctx.clientData[pid] = ctx.pagerClient stream.sclose() # unblock main socket ctx.ssock.sock.getFd().setBlocking(false) # for CGI putEnv("SERVER_SOFTWARE", "Chawan") putEnv("SERVER_PROTOCOL", "HTTP/1.0") putEnv("SERVER_NAME", "localhost") putEnv("SERVER_PORT", "80") putEnv("REMOTE_HOST", "localhost") putEnv("REMOTE_ADDR", "127.0.0.1") putEnv("GATEWAY_INTERFACE", "CGI/1.1") putEnv("CHA_INSECURE_SSL_NO_VERIFY", "0") return ctx # This is only called when an OutputHandle could not read enough of one (or # more) buffers, and we asked select to notify us when it will be available. proc handleWrite(ctx: LoaderContext; output: OutputHandle; unregWrite: var seq[OutputHandle]) = while output.currentBuffer != nil: let buffer = output.currentBuffer try: let n = output.ostream.sendData(buffer, output.currentBufferIdx) output.currentBufferIdx += n if output.currentBufferIdx < buffer.len: break output.bufferCleared() # swap out buffer except ErrorAgain: # never mind break except ErrorBrokenPipe: # receiver died; stop streaming unregWrite.add(output) break if output.isEmpty: if output.istreamAtEnd: # after EOF, no need to send anything more here unregWrite.add(output) else: # all buffers sent, no need to select on this output again for now ctx.unregister(output) proc finishCycle(ctx: LoaderContext; unregRead: var seq[LoaderHandle]; unregWrite: var seq[OutputHandle]) = # Unregister handles queued for unregistration. # It is possible for both unregRead and unregWrite to contain duplicates. To # avoid double-close/double-unregister, we set the istream/ostream of # unregistered handles to nil. for handle in unregRead: if handle.istream != nil: ctx.selector.unregister(int(handle.istream.fd)) ctx.handleMap.del(handle.istream.fd) if handle.parser != nil: handle.finishParse() handle.iclose() for output in handle.outputs: output.istreamAtEnd = true if output.isEmpty: unregWrite.add(output) for output in unregWrite: if output.ostream != nil: if output.registered: ctx.unregister(output) ctx.outputMap.del(output.ostream.fd) output.oclose() let handle = output.parent if handle != nil: # may be nil if from loadStream S_ISREG let i = handle.outputs.find(output) handle.outputs.del(i) if handle.outputs.len == 0 and handle.istream != nil: # premature end of all output streams; kill istream too ctx.selector.unregister(int(handle.istream.fd)) ctx.handleMap.del(handle.istream.fd) if handle.parser != nil: handle.finishParse() handle.iclose() proc runFileLoader*(fd: cint; config: LoaderConfig) = var ctx = initLoaderContext(fd, config) let fd = int(ctx.ssock.sock.getFd()) while ctx.alive: let events = ctx.selector.select(-1) var unregRead: seq[LoaderHandle] = @[] var unregWrite: seq[OutputHandle] = @[] for event in events: if Read in event.events: if event.fd == fd: # incoming connection ctx.acceptConnection() else: let handle = ctx.handleMap[event.fd] case ctx.handleRead(handle, unregWrite) of hrrDone: discard of hrrUnregister, hrrBrokenPipe: unregRead.add(handle) if Write in event.events: ctx.handleWrite(ctx.outputMap[event.fd], unregWrite) if Error in event.events: assert event.fd != fd ctx.outputMap.withValue(event.fd, outputp): # ostream died unregWrite.add(outputp[]) do: # istream died let handle = ctx.handleMap[event.fd] unregRead.add(handle) ctx.finishCycle(unregRead, unregWrite) ctx.exitLoader() proc getRedirect*(response: Response; request: Request): Request = if "Location" in response.headers.table: if response.status in 301u16..303u16 or response.status in 307u16..308u16: let location = response.headers.table["Location"][0] let url = parseURL(location, option(request.url)) if url.isSome: let status = response.status if status == 303 and request.httpMethod notin {hmGet, hmHead} or status == 301 or status == 302 and request.httpMethod == hmPost: return newRequest(url.get, hmGet) else: return newRequest(url.get, request.httpMethod, body = request.body) return nil template withLoaderPacketWriter(stream: SocketStream; loader: FileLoader; w, body: untyped) = stream.withPacketWriter w: w.swrite(loader.clientPid) w.swrite(loader.key) body proc connect(loader: FileLoader): SocketStream = return connectSocketStream(loader.sockDir, loader.sockDirFd, loader.process, blocking = true) # Start a request. This should not block (not for a significant amount of time # anyway). proc startRequest(loader: FileLoader; request: Request): SocketStream = let stream = loader.connect() stream.withLoaderPacketWriter loader, w: w.swrite(lcLoad) w.swrite(request) return stream proc startRequest*(loader: FileLoader; request: Request; config: LoaderClientConfig): SocketStream = let stream = loader.connect() stream.withLoaderPacketWriter loader, w: w.swrite(lcLoadConfig) w.swrite(request) w.swrite(config) return stream proc fetch0(loader: FileLoader; input: Request; promise: FetchPromise; redirectNum: int) = let stream = loader.startRequest(input) let fd = int(stream.fd) loader.registerFun(fd) loader.connecting[fd] = ConnectData( promise: promise, request: input, stream: stream, redirectNum: redirectNum ) proc fetch*(loader: FileLoader; input: Request): FetchPromise = let promise = FetchPromise() loader.fetch0(input, promise, 0) return promise proc reconnect*(loader: FileLoader; data: ConnectData) = data.stream.sclose() let stream = loader.startRequest(data.request) let fd = int(stream.fd) loader.registerFun(fd) loader.connecting[fd] = ConnectData( promise: data.promise, request: data.request, stream: stream ) proc suspend*(loader: FileLoader; fds: seq[int]) = let stream = loader.connect() stream.withLoaderPacketWriter loader, w: w.swrite(lcSuspend) w.swrite(fds) stream.sclose() proc resume*(loader: FileLoader; fds: openArray[int]) = let stream = loader.connect() stream.withLoaderPacketWriter loader, w: w.swrite(lcResume) w.swrite(fds) stream.sclose() proc resume*(loader: FileLoader; fds: int) = loader.resume([fds]) proc tee*(loader: FileLoader; sourceId, targetPid: int): (SocketStream, int) = let stream = loader.connect() stream.withLoaderPacketWriter loader, w: w.swrite(lcTee) w.swrite(sourceId) w.swrite(targetPid) var outputId: int var r = stream.initPacketReader() r.sread(outputId) return (stream, outputId) proc addCacheFile*(loader: FileLoader; outputId, targetPid: int): int = let stream = loader.connect() if stream == nil: return -1 stream.withLoaderPacketWriter loader, w: w.swrite(lcAddCacheFile) w.swrite(outputId) w.swrite(targetPid) var r = stream.initPacketReader() var outputId: int r.sread(outputId) return outputId proc getCacheFile*(loader: FileLoader; cacheId: int): string = let stream = loader.connect() if stream == nil: return "" stream.withLoaderPacketWriter loader, w: w.swrite(lcGetCacheFile) w.swrite(cacheId) var r = stream.initPacketReader() var s: string r.sread(s) return s proc redirectToFile*(loader: FileLoader; outputId: int; targetPath: string): bool = let stream = loader.connect() if stream == nil: return false stream.withLoaderPacketWriter loader, w: w.swrite(lcRedirectToFile) w.swrite(outputId) w.swrite(targetPath) var r = stream.initPacketReader() r.sread(result) proc onConnected*(loader: FileLoader; fd: int) = let connectData = loader.connecting[fd] let stream = connectData.stream let promise = connectData.promise let request = connectData.request var r = stream.initPacketReader() case connectData.state of cdsBeforeResult: var res: int r.sread(res) # packet 1 if res == 0: r.sread(connectData.outputId) # packet 1 inc connectData.state else: var msg: string # msg is discarded. #TODO maybe print if called from trusted code (i.e. global == client)? r.sread(msg) # packet 1 loader.unregisterFun(fd) loader.unregistered.add(fd) stream.sclose() # delete before resolving the promise loader.connecting.del(fd) let err = newTypeError("NetworkError when attempting to fetch resource") promise.resolve(JSResult[Response].err(err)) of cdsBeforeStatus: r.sread(connectData.status) # packet 2 inc connectData.state of cdsBeforeHeaders: let response = newResponse(connectData.res, request, stream, connectData.outputId, connectData.status) r.sread(response.headers) # packet 3 # Only a stream of the response body may arrive after this point. response.body = stream assert loader.unregisterFun != nil response.unregisterFun = proc() = loader.ongoing.del(response.body.fd) loader.unregistered.add(response.body.fd) loader.unregisterFun(response.body.fd) response.resumeFun = proc(outputId: int) = loader.resume(outputId) loader.ongoing[fd] = response stream.setBlocking(false) let redirect = response.getRedirect(request) # delete before resolving the promise loader.connecting.del(fd) if redirect != nil: response.unregisterFun() stream.sclose() let redirectNum = connectData.redirectNum + 1 if redirectNum < 5: #TODO use config.network.max_redirect? loader.fetch0(redirect, promise, redirectNum) else: let err = newTypeError("NetworkError when attempting to fetch resource") promise.resolve(JSResult[Response].err(err)) else: promise.resolve(JSResult[Response].ok(response)) proc onRead*(loader: FileLoader; fd: int) = let response = loader.ongoing.getOrDefault(fd) if response != nil: response.onRead(response) if response.body.isend: response.bodyRead.resolve() response.bodyRead = nil response.unregisterFun() proc onError*(loader: FileLoader; fd: int) = let response = loader.ongoing.getOrDefault(fd) if response != nil: when defined(debug): var lbuf {.noinit.}: array[BufferSize, char] if not response.body.isend: let n = response.body.recvData(lbuf) assert n == 0 assert response.body.isend response.bodyRead.resolve() response.bodyRead = nil response.unregisterFun() # Note: this blocks until headers are received. proc doRequest*(loader: FileLoader; request: Request): Response = let stream = loader.startRequest(request) let response = Response(url: request.url) var r = stream.initPacketReader() r.sread(response.res) # packet 1 if response.res == 0: r.sread(response.outputId) # packet 1 r = stream.initPacketReader() r.sread(response.status) # packet 2 r = stream.initPacketReader() r.sread(response.headers) # packet 3 # Only a stream of the response body may arrive after this point. response.body = stream else: var msg: string r.sread(msg) # packet 1 stream.sclose() return response proc shareCachedItem*(loader: FileLoader; id, targetPid: int; sourcePid = -1) = let stream = loader.connect() if stream != nil: let sourcePid = if sourcePid != -1: sourcePid else: loader.clientPid stream.withLoaderPacketWriter loader, w: w.swrite(lcShareCachedItem) w.swrite(sourcePid) w.swrite(targetPid) w.swrite(id) stream.sclose() proc passFd*(loader: FileLoader; id: string; fd: FileHandle) = let stream = loader.connect() if stream != nil: stream.withLoaderPacketWriter loader, w: w.swrite(lcPassFd) w.swrite(id) stream.sendFileHandle(fd) stream.sclose() proc removeCachedItem*(loader: FileLoader; cacheId: int) = let stream = loader.connect() if stream != nil: stream.withLoaderPacketWriter loader, w: w.swrite(lcRemoveCachedItem) w.swrite(cacheId) stream.sclose() proc addClient*(loader: FileLoader; key: ClientKey; pid: int; config: LoaderClientConfig; clonedFrom: int): bool = let stream = loader.connect() stream.withLoaderPacketWriter loader, w: w.swrite(lcAddClient) w.swrite(key) w.swrite(pid) w.swrite(config) w.swrite(clonedFrom) var r = stream.initPacketReader() r.sread(result) stream.sclose() proc removeClient*(loader: FileLoader; pid: int) = let stream = loader.connect() if stream != nil: stream.withLoaderPacketWriter loader, w: w.swrite(lcRemoveClient) w.swrite(pid) stream.sclose() when defined(freebsd): let O_DIRECTORY* {.importc, header: "", noinit.}: cint proc setSocketDir*(loader: FileLoader; path: string) = loader.sockDir = path when defined(freebsd): loader.sockDirFd = open(cstring(path), O_DIRECTORY) else: loader.sockDirFd = -1