about summary refs log tree commit diff stats
path: root/src/loader
diff options
context:
space:
mode:
authorbptato <nincsnevem662@gmail.com>2024-02-13 16:48:58 +0100
committerbptato <nincsnevem662@gmail.com>2024-02-13 16:48:58 +0100
commitaadbaffac2430abf01a849b1f7bbfb154c27d229 (patch)
treeea88c5a91af1544dddd26d5d41e200dc4907936d /src/loader
parent492a9e7cf80ae8e26b03602b5996f471c5edf4be (diff)
downloadchawan-aadbaffac2430abf01a849b1f7bbfb154c27d229.tar.gz
loader: fixes & improvements
* factor out pushBuffer to make loadFromCache async
* fix incorrect cache path
* replace rewind with loadFromCache (it does the same thing except
  actually works)
* remove rewindImpl callback, rewind in buffer instead
Diffstat (limited to 'src/loader')
-rw-r--r--src/loader/loader.nim169
-rw-r--r--src/loader/loaderhandle.nim31
2 files changed, 79 insertions, 121 deletions
diff --git a/src/loader/loader.nim b/src/loader/loader.nim
index 6183e771..31f5be10 100644
--- a/src/loader/loader.nim
+++ b/src/loader/loader.nim
@@ -81,7 +81,6 @@ type
     TEE
     SUSPEND
     RESUME
-    REWIND
     ADDREF
     UNREF
     SET_REFERRER_POLICY
@@ -141,10 +140,10 @@ func findOutput(ctx: LoaderContext, id: StreamId): OutputHandle =
   return nil
 
 #TODO linear search over strings :(
-func findCachedHandle(ctx: LoaderContext, cachepath: string): LoaderHandle =
-  assert cachepath != ""
+func findCachedHandle(ctx: LoaderContext, cacheUrl: string): LoaderHandle =
+  assert cacheUrl != ""
   for it in ctx.handleMap.values:
-    if it.cached and it.cachepath == cachepath:
+    if it.cached and it.cacheUrl == cacheUrl:
       return it
   return nil
 
@@ -153,6 +152,30 @@ proc delOutput(ctx: LoaderContext, id: StreamId) =
   if output != nil:
     ctx.outputMap.del(output.ostream.fd)
 
+type PushBufferResult = enum
+  pbrDone, pbrUnregister
+
+# Either write data to the target output, or append it to the list of buffers to
+# write and register the output in our selector.
+proc pushBuffer(ctx: LoaderContext, output: OutputHandle, buffer: LoaderBuffer):
+    PushBufferResult =
+  if output.currentBuffer == nil:
+    var n = 0
+    try:
+      n = output.ostream.sendData(buffer)
+    except ErrorAgain, ErrorWouldBlock:
+      discard
+    except ErrorBrokenPipe:
+      return pbrUnregister
+    if n < buffer.len:
+      output.currentBuffer = buffer
+      output.currentBufferIdx = n
+      ctx.selector.registerHandle(output.ostream.fd, {Write}, 0)
+      output.registered = true
+  else:
+    output.addBuffer(buffer)
+  return pbrDone
+
 proc addFd(ctx: LoaderContext, handle: LoaderHandle, originalUrl: URL) =
   let output = handle.output
   output.ostream.setBlocking(false)
@@ -174,9 +197,9 @@ proc addFd(ctx: LoaderContext, handle: LoaderHandle, originalUrl: URL) =
     let ps = newPosixStream(tmpf, O_CREAT or O_WRONLY, 0o600)
     if ps != nil:
       output.tee(ps, NullStreamId)
-      let path = $originalUrl
-      ctx.cacheMap[path] = tmpf
-      handle.cachepath = path
+      let surl = $originalUrl
+      ctx.cacheMap[surl] = tmpf
+      handle.cacheUrl = surl
 
 proc loadStream(ctx: LoaderContext, handle: LoaderHandle, request: Request) =
   ctx.passedFdMap.withValue(request.url.host, fdp):
@@ -236,52 +259,47 @@ proc loadFromCache(ctx: LoaderContext, stream: SocketStream, request: Request) =
   let handle = newLoaderHandle(stream, request.canredir, request.clientId)
   let surl = $request.url
   let cachedHandle = ctx.findCachedHandle(surl)
+  let output = handle.output
   ctx.cacheMap.withValue(surl, p):
     let ps = newPosixStream(p[], O_RDONLY, 0)
     if ps == nil:
       handle.rejectHandle(ERROR_FILE_NOT_IN_CACHE)
       ctx.cacheMap.del(surl)
+      handle.close()
       return
     handle.sendResult(0)
     handle.sendStatus(200)
     handle.sendHeaders(newHeaders())
-    var buffer {.noinit.}: array[BufferSize, uint8]
-    try:
-      while true:
-        let n = ps.recvData(addr buffer[0], buffer.len)
-        if buffer.len == 0:
-          break
-        if handle.output.sendData(addr buffer[0], n) < n:
-          break
-        if n < buffer.len:
-          break
-    except ErrorBrokenPipe:
-      handle.close()
-      raise
+    if handle.cached:
+      handle.cacheUrl = surl
+    output.ostream.setBlocking(false)
+    while true:
+      let buffer = newLoaderBuffer()
+      let n = ps.recvData(buffer)
+      if n == 0:
+        break
+      if ctx.pushBuffer(output, buffer) == pbrUnregister:
+        if output.registered:
+          ctx.selector.unregister(output.ostream.fd)
+        ps.close()
+        return
+      if n < buffer.cap:
+        break
     ps.close()
   do:
     if cachedHandle == nil:
-      handle.sendResult(ERROR_URL_NOT_IN_CACHE)
+      handle.rejectHandle(ERROR_URL_NOT_IN_CACHE)
+      return
   if cachedHandle != nil:
     # download is still ongoing; move output to the original handle
-    let output = handle.output
-    output.ostream.setBlocking(false)
     handle.outputs.setLen(0)
     output.parent = cachedHandle
     cachedHandle.outputs.add(output)
+  elif output.registered:
+    output.istreamAtEnd = true
     ctx.outputMap[output.ostream.fd] = output
-  if handle.outputs.len > 0:
-    let output = handle.output
-    if output.sostream != nil:
-      try:
-        handle.output.sostream.swrite(true)
-      except IOError:
-        # ignore error, that just means the buffer has already closed the
-        # stream
-        discard
-      output.sostream.close()
-      output.sostream = nil
-  handle.close()
+  else:
+    output.ostream.close()
 
 proc onLoad(ctx: LoaderContext, stream: SocketStream) =
   var request: Request
@@ -317,33 +335,6 @@ proc onLoad(ctx: LoaderContext, stream: SocketStream) =
     ctx.outputMap[fd] = handle.output
     ctx.loadResource(request, handle)
 
-proc rewind(ctx: LoaderContext, stream: PosixStream, clientId: StreamId) =
-  let output = ctx.findOutput(clientId)
-  if output == nil or output.ostream == nil:
-    stream.swrite(false)
-    return
-  let handle = output.parent
-  if not handle.cached:
-    stream.swrite(false)
-    return
-  assert handle.cachepath != ""
-  let ps = newPosixStream(handle.cachepath, O_RDONLY, 0)
-  if ps == nil:
-    stream.swrite(false)
-    return
-  stream.swrite(true)
-  output.ostream.setBlocking(true) #TODO
-  var buffer {.noinit.}: array[BufferSize, uint8]
-  while true:
-    let n = ps.recvData(addr buffer[0], BufferSize)
-    if n == 0:
-      break
-    if output.sendData(addr buffer[0], n) < n:
-      break
-    if n < BufferSize:
-      break
-  ps.close()
-
 proc acceptConnection(ctx: LoaderContext) =
   let stream = ctx.ssock.acceptSocketStream()
   try:
@@ -384,10 +375,6 @@ proc acceptConnection(ctx: LoaderContext) =
           # place the stream back into the selector, so we can write to it
           # again
           ctx.selector.registerHandle(output.ostream.fd, {Write}, 0)
-    of REWIND:
-      var targetId: StreamId
-      stream.sread(targetId)
-      ctx.rewind(stream, targetId)
     of ADDREF:
       inc ctx.refcount
     of UNREF:
@@ -446,39 +433,19 @@ proc initLoaderContext(fd: cint, config: LoaderConfig): LoaderContext =
       dir &= '/'
   return ctx
 
-# Either write data to the target output, or append it to the list of buffers to
-# write and register the output in our selector.
-proc pushBuffer(ctx: LoaderContext, handle: LoaderHandle,
-    buffer: LoaderBuffer, unregWrite: var seq[OutputHandle]) =
-  for output in handle.outputs:
-    if output.currentBuffer == nil:
-      var n = 0
-      try:
-        n = output.sendData(addr buffer[0], buffer.len)
-      except ErrorAgain, ErrorWouldBlock:
-        discard
-      except ErrorBrokenPipe:
-        unregWrite.add(output)
-        break
-      if n < buffer.len:
-        output.currentBuffer = buffer
-        output.currentBufferIdx = n
-        ctx.selector.registerHandle(output.ostream.fd, {Write}, 0)
-        output.registered = true
-    else:
-      output.addBuffer(buffer)
-
 # Called whenever there is more data available to read.
 proc handleRead(ctx: LoaderContext, handle: LoaderHandle,
     unregRead: var seq[LoaderHandle], unregWrite: var seq[OutputHandle]) =
   while true:
     let buffer = newLoaderBuffer()
     try:
-      buffer.len = handle.istream.recvData(addr buffer[0], buffer.cap)
-      if buffer.len == 0:
+      let n = handle.istream.recvData(buffer)
+      if n == 0:
         break
-      ctx.pushBuffer(handle, buffer, unregWrite)
-      if buffer.len < buffer.cap:
+      for output in handle.outputs:
+        if ctx.pushBuffer(output, buffer) == pbrUnregister:
+          unregWrite.add(output)
+      if n < buffer.cap:
         break
     except ErrorAgain, ErrorWouldBlock: # retry later
       break
@@ -493,9 +460,7 @@ proc handleWrite(ctx: LoaderContext, output: OutputHandle,
   while output.currentBuffer != nil:
     let buffer = output.currentBuffer
     try:
-      let i = output.currentBufferIdx
-      assert buffer.len - i > 0
-      let n = output.sendData(addr buffer[i], buffer.len - i)
+      let n = output.ostream.sendData(buffer, output.currentBufferIdx)
       output.currentBufferIdx += n
       if output.currentBufferIdx < buffer.len:
         break
@@ -714,15 +679,6 @@ proc tee*(loader: FileLoader, targetId: StreamId): SocketStream =
   stream.swrite(clientId)
   return stream
 
-proc rewind*(loader: FileLoader, fd: int): bool =
-  let stream = connectSocketStream(loader.process, false, blocking = true)
-  stream.swrite(REWIND)
-  let id: StreamId = (loader.clientPid, fd)
-  stream.swrite(id)
-  var res: bool
-  stream.sread(res)
-  return res
-
 const BufferSize = 4096
 
 proc handleHeaders(loader: FileLoader, request: Request, response: Response,
@@ -816,10 +772,7 @@ proc doRequest*(loader: FileLoader, request: Request): Response =
   if response.res == 0:
     loader.handleHeaders(request, response, stream)
   else:
-    var msg: string
-    stream.sread(msg)
-    if msg != "":
-      response.internalMessage = msg
+    stream.sread(response.internalMessage)
   return response
 
 proc addref*(loader: FileLoader) =
diff --git a/src/loader/loaderhandle.nim b/src/loader/loaderhandle.nim
index fe8c6435..b0c5747e 100644
--- a/src/loader/loaderhandle.nim
+++ b/src/loader/loaderhandle.nim
@@ -17,7 +17,7 @@ const LoaderBufferPageSize = 4064 # 4096 - 32
 type
   LoaderBufferObj = object
     page: ptr UncheckedArray[uint8]
-    len: int
+    len*: int
 
   LoaderBuffer* = ref LoaderBufferObj
 
@@ -41,7 +41,7 @@ type
     canredir: bool
     outputs*: seq[OutputHandle]
     cached*: bool
-    cachepath*: string
+    cacheUrl*: string
     when defined(debug):
       url*: URL
 
@@ -70,18 +70,9 @@ proc findOutputHandle*(handle: LoaderHandle, fd: int): OutputHandle =
       return output
   return nil
 
-func `[]`*(buffer: LoaderBuffer, i: int): var uint8 {.inline.} =
-  return buffer[].page[i]
-
 func cap*(buffer: LoaderBuffer): int {.inline.} =
   return LoaderBufferPageSize
 
-func len*(buffer: LoaderBuffer): var int {.inline.} =
-  return buffer[].len
-
-proc `len=`*(buffer: LoaderBuffer, i: int) {.inline.} =
-  buffer[].len = i
-
 proc newLoaderBuffer*(): LoaderBuffer =
   return LoaderBuffer(
     page: cast[ptr UncheckedArray[uint8]](alloc(LoaderBufferPageSize)),
@@ -102,6 +93,14 @@ proc bufferCleared*(output: OutputHandle) =
   else:
     output.currentBuffer = nil
 
+proc clearBuffers*(output: OutputHandle) =
+  if output.currentBuffer != nil:
+    output.currentBuffer = nil
+    output.currentBufferIdx = 0
+    output.buffers.clear()
+  else:
+    assert output.buffers.len == 0
+
 proc tee*(outputIn: OutputHandle, ostream: PosixStream, clientId: StreamId) =
   outputIn.parent.outputs.add(OutputHandle(
     parent: outputIn.parent,
@@ -139,8 +138,14 @@ proc sendHeaders*(handle: LoaderHandle, headers: Headers) =
       output.sostream = sostream
       output.ostream = newPosixStream(fd)
 
-proc sendData*(output: OutputHandle, p: pointer, nmemb: int): int =
-  return output.ostream.sendData(p, nmemb)
+proc recvData*(ps: PosixStream, buffer: LoaderBuffer): int {.inline.} =
+  let n = ps.recvData(addr buffer.page[0], buffer.cap)
+  buffer.len = n
+  return n
+
+proc sendData*(ps: PosixStream, buffer: LoaderBuffer, si = 0): int {.inline.} =
+  assert buffer.len - si > 0
+  return ps.sendData(addr buffer.page[si], buffer.len - si)
 
 proc close*(handle: LoaderHandle) =
   for output in handle.outputs: