about summary refs log tree commit diff stats
path: root/src/loader/loader.nim
diff options
context:
space:
mode:
author bptato <nincsnevem662@gmail.com> 2024-02-12 17:03:35 +0100
committer bptato <nincsnevem662@gmail.com> 2024-02-12 17:03:35 +0100
commit 8e6783a45fba48dd8f63fe7486e4691f05220b52 (patch)
tree 5aae9f9f95432609a497eea858c4a3401dac172b /src/loader/loader.nim
parent 69b1a7e7f6e0a675cd70805768162de5621e8279 (diff)
download chawan-8e6783a45fba48dd8f63fe7486e4691f05220b52.tar.gz
Remove CLONE BufferSource; cache document sources in tmpdir
At last all BufferSources are unified.

To achieve the same effect as the previous CLONE source type, we now
use the "fromcache" flag in Request. This *forces* the document to be
streamed from the disk; if the file no longer exists for some reason,
an error is returned (i.e. the document is not re-downloaded).

For a document to be cached, it has to be the main document of the
buffer (i.e. no additional resources requested with fetch()), and
also not an x-htmloutput HTML file (for those, the original source is
saved). The result is that toggleSource now always returns the actual
source for e.g. markdown files, not the HTML-transformed version.

Also, it is now possible to view the source of a document that is
still being downloaded.

buffer.sstream has almost been eliminated; it still exists, but only as
a pseudo-buffer to interface with EncoderStream and DecoderStream. It no
longer holds the entire source of a buffer at any point, and is cleared
as soon as the buffer is completely loaded.
Diffstat (limited to 'src/loader/loader.nim')
-rw-r--r-- src/loader/loader.nim 238
1 files changed, 166 insertions, 72 deletions
diff --git a/src/loader/loader.nim b/src/loader/loader.nim
index 95d119e2..0755d427 100644
--- a/src/loader/loader.nim
+++ b/src/loader/loader.nim
@@ -28,6 +28,7 @@ import std/streams
 import std/strutils
 import std/tables
 
+import extern/tempfile
 import io/posixstream
 import io/promise
 import io/serialize
@@ -42,6 +43,7 @@ import loader/headers
 import loader/loaderhandle
 import loader/request
 import loader/response
+import loader/streamid
 import types/cookie
 import types/referer
 import types/urimethodmap
@@ -57,6 +59,7 @@ export response
 type
   FileLoader* = ref object
     process*: Pid
+    clientPid*: int
     connecting*: Table[int, ConnectData]
     ongoing*: Table[int, OngoingData]
     unregistered*: seq[int]
@@ -65,7 +68,7 @@ type
 
   ConnectData = object
     promise: Promise[JSResult[Response]]
-    stream: Stream
+    stream: SocketStream
     request: Request
 
   OngoingData = object
@@ -78,13 +81,12 @@ type
     TEE
     SUSPEND
     RESUME
+    REWIND
     ADDREF
     UNREF
     SET_REFERRER_POLICY
     PASS_FD
 
-  ClientFdMap = seq[tuple[pid, fd: int, output: OutputHandle]]
-
   LoaderContext = ref object
     refcount: int
     ssock: ServerSocket
@@ -92,10 +94,11 @@ type
     config: LoaderConfig
     handleMap: Table[int, LoaderHandle]
     outputMap: Table[int, OutputHandle]
-    clientFdMap: ClientFdMap
     referrerpolicy: ReferrerPolicy
     selector: Selector[int]
     fd: int
+    # List of cached files. Note that fds from passFd are never cached.
+    cacheMap: Table[string, string] # URL -> path
     # List of file descriptors passed by the pager.
     passedFdMap: Table[string, FileHandle]
 
@@ -111,6 +114,7 @@ type
     uriMethodMap*: URIMethodMap
     w3mCGICompat*: bool
     libexecPath*: string
+    tmpdir*: string
 
   FetchPromise* = Promise[JSResult[Response]]
 
@@ -129,24 +133,27 @@ proc rejectHandle(handle: LoaderHandle, code: ConnectErrorCode, msg = "") =
   handle.sendResult(code, msg)
   handle.close()
 
-func findOutputIdx(clientFdMap: ClientFdMap, pid, fd: int): int =
-  for i, (itpid, itfd, _) in clientFdMap:
-    if pid == itpid and fd == itfd:
-      return i
-  return -1
-
-proc delOutput(clientFdMap: var ClientFdMap, pid, fd: int) =
-  let i = clientFdMap.findOutputIdx(pid, fd)
-  if i != -1:
-    clientFdMap.del(i)
-
-func findOutput(clientFdMap: ClientFdMap, pid, fd: int): OutputHandle =
-  let i = clientFdMap.findOutputIdx(pid, fd)
-  if i != -1:
-    return clientFdMap[i].output
+func findOutput(ctx: LoaderContext, id: StreamId): OutputHandle =
+  ## Returns the output in `ctx.outputMap` whose `clientId` equals `id`,
+  ## or nil when no output matches. Both fields of `id` must be set.
+  assert id.pid != -1 and id.fd != -1
+  result = nil
+  for output in ctx.outputMap.values:
+    if output.clientId == id:
+      return output
+
+#TODO linear search over strings :(
+func findCachedHandle(ctx: LoaderContext, cachepath: string): LoaderHandle =
+  ## Returns the first handle in `ctx.handleMap` that is flagged `cached`
+  ## and whose `cachepath` equals the (non-empty) `cachepath` argument,
+  ## or nil when there is no such handle.
+  assert cachepath != ""
+  for it in ctx.handleMap.values:
+    if it.cached and it.cachepath == cachepath:
+      return it
   return nil
 
-proc addFd(ctx: LoaderContext, handle: LoaderHandle) =
+proc delOutput(ctx: LoaderContext, id: StreamId) =
+  ## Removes the output registered under client stream `id` from
+  ## `ctx.outputMap` (keyed by the output stream's fd). Does nothing when
+  ## no output matches `id`.
+  let found = ctx.findOutput(id)
+  if found == nil:
+    return
+  ctx.outputMap.del(found.ostream.fd)
+
+proc addFd(ctx: LoaderContext, handle: LoaderHandle, originalUrl: URL) =
   let output = handle.output
   output.ostream.setBlocking(false)
   ctx.selector.registerHandle(handle.istream.fd, {Read}, 0)
@@ -159,10 +166,17 @@ proc addFd(ctx: LoaderContext, handle: LoaderHandle) =
     # (kind of a hack, but should always work)
     ctx.outputMap[output.ostream.fd] = output
     ctx.outputMap.del(output.sostream.fd)
-    if output.clientPid != -1:
-      ctx.clientFdMap.delOutput(output.clientPid, output.clientFd)
-      output.clientFd = -1
-      output.clientPid = -1
+    if output.clientId != NullStreamId:
+      ctx.delOutput(output.clientId)
+      output.clientId = NullStreamId
+  if originalUrl != nil:
+    let tmpf = getTempFile(ctx.config.tmpdir)
+    let ps = newPosixStream(tmpf, O_CREAT or O_WRONLY, 0o600)
+    if ps != nil:
+      output.tee(ps, NullStreamId)
+      let path = $originalUrl
+      ctx.cacheMap[path] = tmpf
+      handle.cachepath = path
 
 proc loadStream(ctx: LoaderContext, handle: LoaderHandle, request: Request) =
   ctx.passedFdMap.withValue(request.url.host, fdp):
@@ -172,12 +186,13 @@ proc loadStream(ctx: LoaderContext, handle: LoaderHandle, request: Request) =
     handle.istream = newPosixStream(fdp[])
     ctx.passedFdMap.del(request.url.host)
   do:
-    handle.rejectHandle(ERROR_FILE_NOT_FOUND, "stream not found")
+    handle.sendResult(ERROR_FILE_NOT_FOUND, "stream not found")
 
 proc loadResource(ctx: LoaderContext, request: Request, handle: LoaderHandle) =
   var redo = true
   var tries = 0
   var prevurl: URL = nil
+  let originalUrl = request.url
   while redo and tries < MaxRewrites:
     redo = false
     if ctx.config.w3mCGICompat and request.url.scheme == "file":
@@ -190,15 +205,20 @@ proc loadResource(ctx: LoaderContext, request: Request, handle: LoaderHandle) =
           redo = true
           continue
     if request.url.scheme == "cgi-bin":
-      handle.loadCGI(request, ctx.config.cgiDir, ctx.config.libexecPath, prevurl)
+      handle.loadCGI(request, ctx.config.cgiDir, ctx.config.libexecPath,
+        prevurl)
       if handle.istream != nil:
-        ctx.addFd(handle)
+        let originalUrl = if handle.cached: originalUrl else: nil
+        ctx.addFd(handle, originalUrl)
       else:
         handle.close()
     elif request.url.scheme == "stream":
       ctx.loadStream(handle, request)
       if handle.istream != nil:
-        ctx.addFd(handle)
+        let originalUrl = if handle.cached: originalUrl else: nil
+        ctx.addFd(handle, originalUrl)
+      else:
+        handle.close()
     else:
       prevurl = request.url
       case ctx.config.uriMethodMap.findAndRewrite(request.url)
@@ -212,20 +232,61 @@ proc loadResource(ctx: LoaderContext, request: Request, handle: LoaderHandle) =
   if tries >= MaxRewrites:
     handle.rejectHandle(ERROR_TOO_MANY_REWRITES)
 
+proc loadFromCache(ctx: LoaderContext, stream: SocketStream, request: Request) =
+  ## Serves `request` from the on-disk cache instead of fetching it.
+  ##
+  ## If the request URL has an entry in `ctx.cacheMap`, the result code 0,
+  ## status 200 and empty headers are sent, followed by the contents of the
+  ## cache file. If the file cannot be opened, the handle is rejected with
+  ## ERROR_FILE_NOT_IN_CACHE and the stale entry is dropped. If the URL is
+  ## neither in the map nor being cached by a live handle,
+  ## ERROR_URL_NOT_IN_CACHE is sent.
+  ##
+  ## When a handle caching the same URL still exists, the output is moved
+  ## over to that handle so that it keeps receiving the ongoing download.
+  ##
+  ## ErrorBrokenPipe raised while copying is re-raised after the handle has
+  ## been closed.
+  let handle = newLoaderHandle(stream, false, request.clientId)
+  let surl = $request.url
+  let cachedHandle = ctx.findCachedHandle(surl)
+  ctx.cacheMap.withValue(surl, p):
+    let ps = newPosixStream(p[], O_RDONLY, 0)
+    if ps == nil:
+      handle.rejectHandle(ERROR_FILE_NOT_IN_CACHE)
+      ctx.cacheMap.del(surl)
+      return
+    handle.sendResult(0)
+    handle.sendStatus(200)
+    handle.sendHeaders(newHeaders())
+    var buffer {.noinit.}: array[BufferSize, uint8]
+    try:
+      while true:
+        let n = ps.recvData(addr buffer[0], buffer.len)
+        # Stop at end of file. The check must look at the number of bytes
+        # read; buffer.len is a constant and can never be zero.
+        if n == 0:
+          break
+        if handle.output.sendData(addr buffer[0], n) < n:
+          break
+        # A short read means there is nothing more to copy right now.
+        if n < buffer.len:
+          break
+    except ErrorBrokenPipe:
+      handle.close()
+      raise
+    finally:
+      # Release the cache file on every path, including a broken pipe.
+      ps.close()
+  do:
+    if cachedHandle == nil:
+      handle.sendResult(ERROR_URL_NOT_IN_CACHE)
+  if cachedHandle != nil:
+    # download is still ongoing; move output to the original handle
+    let output = handle.output
+    output.ostream.setBlocking(false)
+    handle.outputs.setLen(0)
+    output.parent = cachedHandle
+    cachedHandle.outputs.add(output)
+    ctx.outputMap[output.ostream.fd] = output
+  handle.close()
+
 proc onLoad(ctx: LoaderContext, stream: SocketStream) =
   var request: Request
   stream.sread(request)
   let handle = newLoaderHandle(
     stream,
     request.canredir,
-    request.clientPid,
-    request.clientFd
+    request.clientId
   )
-  assert request.clientPid != 0
+  assert request.clientId.pid != 0
   when defined(debug):
     handle.url = request.url
   if not ctx.config.filter.match(request.url):
     handle.rejectHandle(ERROR_DISALLOWED_URL)
+  elif request.fromcache:
+    ctx.loadFromCache(stream, request)
   else:
     for k, v in ctx.config.defaultheaders.table:
       if k notin request.headers.table:
@@ -243,9 +304,35 @@ proc onLoad(ctx: LoaderContext, stream: SocketStream) =
       request.proxy = ctx.config.proxy
     let fd = int(stream.source.getFd())
     ctx.outputMap[fd] = handle.output
-    ctx.clientFdMap.add((request.clientPid, request.clientFd, handle.output))
     ctx.loadResource(request, handle)
 
+proc rewind(ctx: LoaderContext, stream: PosixStream, clientId: StreamId) =
+  ## Replays the cached source of the output registered under `clientId`.
+  ##
+  ## Writes `false` to `stream` when the output is unknown or already
+  ## closed, when its parent handle is not cached, or when the cache file
+  ## cannot be opened. Otherwise writes `true` to `stream` and then copies
+  ## the cache file into the output.
+  let output = ctx.findOutput(clientId)
+  if output == nil or output.ostream == nil:
+    stream.swrite(false)
+    return
+  let handle = output.parent
+  if not handle.cached:
+    stream.swrite(false)
+    return
+  assert handle.cachepath != ""
+  # addFd stores the URL string in handle.cachepath and uses it as the
+  # cacheMap key; the file on disk is the value stored under that key.
+  # Fall back to the raw value if it has no cacheMap entry.
+  let tmpf = ctx.cacheMap.getOrDefault(handle.cachepath, handle.cachepath)
+  let ps = newPosixStream(tmpf, O_RDONLY, 0)
+  if ps == nil:
+    stream.swrite(false)
+    return
+  stream.swrite(true)
+  output.ostream.setBlocking(true) #TODO
+  var buffer {.noinit.}: array[BufferSize, uint8]
+  try:
+    while true:
+      let n = ps.recvData(addr buffer[0], BufferSize)
+      if n == 0:
+        break
+      if output.sendData(addr buffer[0], n) < n:
+        break
+      # A short read means there is nothing more to copy right now.
+      if n < BufferSize:
+        break
+  finally:
+    # Do not leak the cache file descriptor if copying raises.
+    ps.close()
+
 proc acceptConnection(ctx: LoaderContext) =
   let stream = ctx.ssock.acceptSocketStream()
   try:
@@ -255,25 +342,22 @@ proc acceptConnection(ctx: LoaderContext) =
     of LOAD:
       ctx.onLoad(stream)
     of TEE:
-      var clientPid: int
-      var clientFd: int
-      var pid: int
-      var fd: int
-      stream.sread(pid)
-      stream.sread(fd)
-      stream.sread(clientPid)
-      stream.sread(clientFd)
-      let output = ctx.clientFdMap.findOutput(pid, fd)
+      var targetId: StreamId
+      var clientId: StreamId
+      stream.sread(targetId)
+      stream.sread(clientId)
+      let output = ctx.findOutput(targetId)
       if output != nil:
-        output.tee(stream, clientPid, clientFd)
+        output.tee(stream, clientId)
       stream.swrite(output != nil)
+      stream.setBlocking(false)
     of SUSPEND:
       var pid: int
       var fds: seq[int]
       stream.sread(pid)
       stream.sread(fds)
       for fd in fds:
-        let output = ctx.clientFdMap.findOutput(pid, fd)
+        let output = ctx.findOutput((pid, fd))
         if output != nil:
           # remove from the selector, so any new reads will be just placed
           # in the handle's buffer
@@ -284,11 +368,15 @@ proc acceptConnection(ctx: LoaderContext) =
       stream.sread(pid)
       stream.sread(fds)
       for fd in fds:
-        let output = ctx.clientFdMap.findOutput(pid, fd)
+        let output = ctx.findOutput((pid, fd))
         if output != nil:
           # place the stream back into the selector, so we can write to it
           # again
           ctx.selector.registerHandle(output.ostream.fd, {Write}, 0)
+    of REWIND:
+      var targetId: StreamId
+      stream.sread(targetId)
+      ctx.rewind(stream, targetId)
     of ADDREF:
       inc ctx.refcount
     of UNREF:
@@ -313,6 +401,8 @@ proc acceptConnection(ctx: LoaderContext) =
 
 proc exitLoader(ctx: LoaderContext) =
+  ## Shuts the loader down: closes the server socket, unlinks every file
+  ## recorded in `ctx.cacheMap`, then exits the process with status 0.
   ctx.ssock.close()
+  # Cached files are removed on a best-effort basis; unlink failures are
+  # ignored.
+  for path in ctx.cacheMap.values:
+    discard unlink(cstring(path))
   quit(0)
 
 var gctx: LoaderContext
@@ -434,8 +524,8 @@ proc finishCycle(ctx: LoaderContext, unregRead: var seq[LoaderHandle],
       if output.registered:
         ctx.selector.unregister(output.ostream.fd)
       ctx.outputMap.del(output.ostream.fd)
-      if output.clientFd != -1:
-        ctx.clientFdMap.delOutput(output.clientPid, output.clientFd)
+      if output.clientId != NullStreamId:
+        ctx.delOutput(output.clientId)
       output.ostream.close()
       output.ostream = nil
       let handle = output.parent
@@ -548,8 +638,7 @@ proc applyHeaders(loader: FileLoader, request: Request, response: Response) =
 #TODO: add init
 proc fetch*(loader: FileLoader, input: Request): FetchPromise =
   let stream = connectSocketStream(loader.process, false, blocking = true)
-  input.clientPid = getpid()
-  input.clientFd = int(stream.fd)
+  input.clientId = (loader.clientPid, int(stream.fd))
   stream.swrite(LOAD)
   stream.swrite(input)
   stream.flush()
@@ -565,8 +654,7 @@ proc fetch*(loader: FileLoader, input: Request): FetchPromise =
 
 proc reconnect*(loader: FileLoader, data: ConnectData) =
   let stream = connectSocketStream(loader.process, false, blocking = true)
-  data.request.clientPid = getpid()
-  data.request.clientFd = int(stream.fd)
+  data.request.clientId = (loader.clientPid, int(stream.fd))
   stream.swrite(LOAD)
   stream.swrite(data.request)
   stream.flush()
@@ -578,7 +666,7 @@ proc reconnect*(loader: FileLoader, data: ConnectData) =
     stream: stream
   )
 
-proc switchStream*(data: var ConnectData, stream: Stream) =
+proc switchStream*(data: var ConnectData, stream: SocketStream) =
   data.stream = stream
 
 proc switchStream*(loader: FileLoader, data: var OngoingData,
@@ -593,33 +681,41 @@ proc switchStream*(loader: FileLoader, data: var OngoingData,
     loader.unregisterFun(fd)
     realCloseImpl(stream)
 
-proc suspend*(loader: FileLoader, pid: int, fds: seq[int]) =
+proc suspend*(loader: FileLoader, fds: seq[int]) =
   let stream = connectSocketStream(loader.process, false, blocking = true)
   stream.swrite(SUSPEND)
-  stream.swrite(pid)
+  stream.swrite(loader.clientPid)
   stream.swrite(fds)
   stream.close()
 
-proc resume*(loader: FileLoader, pid: int, fds: seq[int]) =
+proc resume*(loader: FileLoader, fds: seq[int]) =
   let stream = connectSocketStream(loader.process, false, blocking = true)
   stream.swrite(RESUME)
-  stream.swrite(pid)
+  stream.swrite(loader.clientPid)
   stream.swrite(fds)
   stream.close()
 
-proc tee*(loader: FileLoader, pid, fd: int): Stream =
+proc tee*(loader: FileLoader, targetId: StreamId): SocketStream =
   let stream = connectSocketStream(loader.process, false, blocking = true)
   stream.swrite(TEE)
-  stream.swrite(pid)
-  stream.swrite(fd)
-  stream.swrite(int(getpid()))
-  stream.swrite(int(stream.fd))
+  stream.swrite(targetId)
+  let clientId: StreamId = (loader.clientPid, int(stream.fd))
+  stream.swrite(clientId)
   return stream
 
+proc rewind*(loader: FileLoader, fd: int): bool =
+  ## Sends a REWIND command for the stream identified by this client's pid
+  ## and `fd`, and returns the boolean the loader writes back.
+  ##
+  ## The connection opened here only carries the command and its reply, so
+  ## it is closed before returning, also when reading the reply raises.
+  let stream = connectSocketStream(loader.process, false, blocking = true)
+  try:
+    stream.swrite(REWIND)
+    let id: StreamId = (loader.clientPid, fd)
+    stream.swrite(id)
+    var res: bool
+    stream.sread(res)
+    return res
+  finally:
+    stream.close()
+
 const BufferSize = 4096
 
 proc handleHeaders(loader: FileLoader, request: Request, response: Response,
-    stream: Stream): bool =
+    stream: SocketStream) =
   var status: int
   stream.sread(status)
   response.status = cast[uint16](status)
@@ -628,7 +724,6 @@ proc handleHeaders(loader: FileLoader, request: Request, response: Response,
   loader.applyHeaders(request, response)
   # Only a stream of the response body may arrive after this point.
   response.body = stream
-  return true # success
 
 proc onConnected*(loader: FileLoader, fd: int) =
   let connectData = loader.connecting[fd]
@@ -638,7 +733,8 @@ proc onConnected*(loader: FileLoader, fd: int) =
   var res: int
   stream.sread(res)
   let response = newResponse(res, request, fd, stream)
-  if res == 0 and loader.handleHeaders(request, response, stream):
+  if res == 0:
+    loader.handleHeaders(request, response, stream)
     assert loader.unregisterFun != nil
     let realCloseImpl = stream.closeImpl
     stream.closeImpl = nil
@@ -651,7 +747,7 @@ proc onConnected*(loader: FileLoader, fd: int) =
       response: response,
       bodyRead: response.bodyRead
     )
-    SocketStream(stream).source.getFd().setBlocking(false)
+    stream.source.getFd().setBlocking(false)
     promise.resolve(JSResult[Response].ok(response))
   else:
     var msg: string
@@ -698,22 +794,19 @@ proc onError*(loader: FileLoader, fd: int) =
     buffer[].buf = ""
     response.unregisterFun()
 
-proc doRequest*(loader: FileLoader, request: Request, blocking = true,
-    canredir = false): Response =
+proc doRequest*(loader: FileLoader, request: Request, canredir = false):
+    Response =
   let response = Response(url: request.url)
   let stream = connectSocketStream(loader.process, false, blocking = true)
   if canredir:
     request.canredir = true #TODO set this somewhere else?
-  request.clientPid = getpid()
-  request.clientFd = int(stream.fd)
+  request.clientId = (loader.clientPid, int(stream.fd))
   stream.swrite(LOAD)
   stream.swrite(request)
   stream.flush()
   stream.sread(response.res)
   if response.res == 0:
-    if loader.handleHeaders(request, response, stream):
-      if not blocking:
-        stream.source.getFd().setBlocking(blocking)
+    loader.handleHeaders(request, response, stream)
   else:
     var msg: string
     stream.sread(msg)
@@ -725,12 +818,13 @@ proc addref*(loader: FileLoader) =
   let stream = connectSocketStream(loader.process)
   if stream != nil:
     stream.swrite(ADDREF)
-  stream.close()
+    stream.close()
 
 proc unref*(loader: FileLoader) =
+  ## Opens a connection to the loader process, sends the UNREF command and
+  ## closes the connection again. Does nothing when no connection could be
+  ## established (`connectSocketStream` returned nil).
   let stream = connectSocketStream(loader.process)
   if stream != nil:
     stream.swrite(UNREF)
+    stream.close()
 
 proc setReferrerPolicy*(loader: FileLoader, referrerpolicy: ReferrerPolicy) =
   let stream = connectSocketStream(loader.process)