about summary refs log tree commit diff stats
path: root/src/loader
diff options
context:
space:
mode:
authorbptato <nincsnevem662@gmail.com>2024-02-10 22:52:13 +0100
committerbptato <nincsnevem662@gmail.com>2024-02-10 22:54:31 +0100
commitc4f0423e1a786fef840fd2f8c5c6bba550b353ab (patch)
tree9eb8007ca3af03f466dd3eedbb1e8b7e29e8ff94 /src/loader
parentd8c4b0979c6d1ff9f6edea650e3aeb1ca1e4a104 (diff)
downloadchawan-c4f0423e1a786fef840fd2f8c5c6bba550b353ab.tar.gz
loader: fix tee
My eyes are bleeding, but at least there is a chance that this does what
I wanted.

The previous tee implementation mixed buffer and loader fds, so it was
fundamentally broken. Also, it used MultiStream which makes asynchronous
streaming impossible.

This time we use a flat array of output handles and link to them any
buffers not written to the target yet.
Diffstat (limited to 'src/loader')
-rw-r--r--src/loader/loader.nim213
-rw-r--r--src/loader/loaderhandle.nim132
-rw-r--r--src/loader/request.nim2
3 files changed, 225 insertions, 122 deletions
diff --git a/src/loader/loader.nim b/src/loader/loader.nim
index 82796e20..53aeb498 100644
--- a/src/loader/loader.nim
+++ b/src/loader/loader.nim
@@ -88,6 +88,8 @@ type
     alive: bool
     config: LoaderConfig
     handleMap: Table[int, LoaderHandle]
+    outputMap: Table[int, OutputHandle]
+    clientFdMap: seq[tuple[pid, fd: int, output: OutputHandle]]
     referrerpolicy: ReferrerPolicy
     selector: Selector[int]
     fd: int
@@ -138,18 +140,23 @@ proc loadResource(ctx: LoaderContext, request: Request, handle: LoaderHandle) =
       if handle.istream == nil:
         handle.close()
       else:
-        handle.ostream.setBlocking(false)
+        let output = handle.output
+        output.ostream.setBlocking(false)
         ctx.selector.registerHandle(handle.istream.fd, {Read}, 0)
-        ctx.selector.registerHandle(handle.ostream.fd, {Write}, 0)
+        ctx.selector.registerHandle(output.ostream.fd, {Write}, 0)
         let ofl = fcntl(handle.istream.fd, F_GETFL, 0)
         discard fcntl(handle.istream.fd, F_SETFL, ofl or O_NONBLOCK)
-        # yes, this puts the istream fd in addition to the ostream fd in
-        # handlemap to point to the same ref
         ctx.handleMap[handle.istream.fd] = handle
-        # also put the new fd into handleMap if stream was redirected
-        if handle.sostream != nil:
-          ctx.handleMap[handle.ostream.fd] = handle
-          ctx.handleMap.del(handle.sostream.fd)
+        if output.sostream != nil:
+          # replace the fd with the new one in outputMap if stream was
+          # redirected
+          # (kind of a hack, but should always work)
+          ctx.outputMap[output.ostream.fd] = output
+          ctx.outputMap.del(output.sostream.fd)
+          # currently only the main buffer stream can have redirects, and we
+          # don't suspend/resume it; if we did, we would have to put the new
+          # output stream's clientFd in clientFdMap too.
+          ctx.clientFdMap.del(output.sostream.fd)
     else:
       prevurl = request.url
       case ctx.config.uriMethodMap.findAndRewrite(request.url)
@@ -169,7 +176,13 @@ proc loadResource(ctx: LoaderContext, request: Request, handle: LoaderHandle) =
 proc onLoad(ctx: LoaderContext, stream: SocketStream) =
   var request: Request
   stream.sread(request)
-  let handle = newLoaderHandle(stream, request.canredir)
+  let handle = newLoaderHandle(
+    stream,
+    request.canredir,
+    request.clientPid,
+    request.clientFd
+  )
+  assert request.clientPid != 0
   when defined(debug):
     handle.url = request.url
   if not ctx.config.filter.match(request.url):
@@ -191,9 +204,22 @@ proc onLoad(ctx: LoaderContext, stream: SocketStream) =
     if request.proxy == nil or not ctx.config.acceptProxy:
       request.proxy = ctx.config.proxy
     let fd = int(stream.source.getFd())
-    ctx.handleMap[fd] = handle
+    ctx.outputMap[fd] = handle.output
+    ctx.clientFdMap.add((request.clientPid, request.clientFd, handle.output))
     ctx.loadResource(request, handle)
 
+func findClientFdEntry(ctx: LoaderContext, pid, fd: int): int =
+  for i, (itpid, itfd, _) in ctx.clientFdMap:
+    if pid == itpid and fd == itfd:
+      return i
+  return -1
+
+func findOutputByClientFd(ctx: LoaderContext, pid, fd: int): OutputHandle =
+  let i = ctx.findClientFdEntry(pid, fd)
+  if i != -1:
+    return ctx.clientFdMap[i].output
+  return nil
+
 proc acceptConnection(ctx: LoaderContext) =
   let stream = ctx.ssock.acceptSocketStream()
   try:
@@ -203,32 +229,40 @@ proc acceptConnection(ctx: LoaderContext) =
     of LOAD:
       ctx.onLoad(stream)
     of TEE:
+      var clientPid: int
+      var clientFd: int
+      var pid: int
       var fd: int
+      stream.sread(pid)
       stream.sread(fd)
-      if fd notin ctx.handleMap:
-        stream.swrite(false)
-      else:
-        let handle = ctx.handleMap[fd]
-        handle.addOutputStream(stream)
-        stream.swrite(true)
+      stream.sread(clientPid)
+      stream.sread(clientFd)
+      let output = ctx.findOutputByClientFd(pid, fd)
+      if output != nil:
+        output.tee(stream, clientPid, clientFd)
+      stream.swrite(output != nil)
     of SUSPEND:
+      var pid: int
       var fds: seq[int]
+      stream.sread(pid)
       stream.sread(fds)
       for fd in fds:
-        ctx.handleMap.withValue(fd, handlep):
-          if handlep[].ostream != nil and handlep[].ostream.fd == fd:
-            # remove from the selector, so any new reads will be just placed
-            # in the handle's buffer
-            ctx.selector.unregister(fd)
+        let output = ctx.findOutputByClientFd(pid, fd)
+        if output != nil:
+          # remove from the selector, so any new reads will be just placed
+          # in the handle's buffer
+          ctx.selector.unregister(output.ostream.fd)
     of RESUME:
+      var pid: int
       var fds: seq[int]
+      stream.sread(pid)
       stream.sread(fds)
       for fd in fds:
-        ctx.handleMap.withValue(fd, handlep):
-          if handlep[].ostream != nil and handlep[].ostream.fd == fd:
-            # place the stream back into the selector, so we can write to it
-            # again
-            ctx.selector.registerHandle(fd, {Write}, 0)
+        let output = ctx.findOutputByClientFd(pid, fd)
+        if output != nil:
+          # place the stream back into the selector, so we can write to it
+          # again
+          ctx.selector.registerHandle(output.ostream.fd, {Write}, 0)
     of ADDREF:
       inc ctx.refcount
     of UNREF:
@@ -284,7 +318,7 @@ proc runFileLoader*(fd: cint, config: LoaderConfig) =
   while ctx.alive:
     let events = ctx.selector.select(-1)
     var unregRead: seq[LoaderHandle]
-    var unregWrite: seq[LoaderHandle]
+    var unregWrite: seq[OutputHandle]
     for event in events:
       if Read in event.events:
         if event.fd == ctx.fd: # incoming connection
@@ -297,46 +331,41 @@ proc runFileLoader*(fd: cint, config: LoaderConfig) =
             try:
               buffer.len = handle.istream.recvData(addr buffer[0], buffer.cap)
               if buffer.len == 0:
-                dealloc(buffer)
                 break
               handle.addBuffer(buffer)
               if buffer.len < buffer.cap:
                 break
             except ErrorAgain, ErrorWouldBlock: # retry later
-              dealloc(buffer)
               break
             except ErrorBrokenPipe: # sender died; stop streaming
-              dealloc(buffer)
               unregRead.add(handle)
               break
       if Write in event.events:
-        let handle = ctx.handleMap[event.fd]
-        assert event.fd == handle.ostream.fd
-        while handle.currentBuffer != nil:
-          let buffer = handle.currentBuffer
+        let output = ctx.outputMap[event.fd]
+        while output.currentBuffer != nil:
+          let buffer = output.currentBuffer
           try:
-            let i = handle.currentBufferIdx
+            let i = output.currentBufferIdx
             assert buffer.len - i > 0
-            let n = handle.sendData(addr buffer[i], buffer.len - i)
-            handle.currentBufferIdx += n
-            if handle.currentBufferIdx < buffer.len:
+            let n = output.sendData(addr buffer[i], buffer.len - i)
+            output.currentBufferIdx += n
+            if output.currentBufferIdx < buffer.len:
               break
-            handle.bufferCleared() # swap out buffer
+            output.bufferCleared() # swap out buffer
           except ErrorAgain, ErrorWouldBlock: # never mind
             break
           except ErrorBrokenPipe: # receiver died; stop streaming
-            unregWrite.add(handle)
+            unregWrite.add(output)
             break
-        if handle.istream == nil and handle.currentBuffer == nil:
+        if output.istreamAtEnd and output.currentBuffer == nil:
           # after EOF, but not appended in this send cycle
-          unregWrite.add(handle)
+          unregWrite.add(output)
       if Error in event.events:
         assert event.fd != ctx.fd
-        let handle = ctx.handleMap[event.fd]
-        if handle.ostream.fd == event.fd: # ostream died
-          unregWrite.add(handle)
-        else: # istream died
-          assert handle.istream.fd == event.fd
+        ctx.outputMap.withValue(event.fd, outputp): # ostream died
+          unregWrite.add(outputp[])
+        do: # istream died
+          let handle = ctx.handleMap[event.fd]
           unregRead.add(handle)
     # Unregister handles queued for unregistration.
     # It is possible for both unregRead and unregWrite to contain duplicates. To
@@ -348,29 +377,51 @@ proc runFileLoader*(fd: cint, config: LoaderConfig) =
         ctx.handleMap.del(handle.istream.fd)
         handle.istream.close()
         handle.istream = nil
-        if handle.currentBuffer == nil:
-          unregWrite.add(handle)
-    for handle in unregWrite:
-      if handle.ostream != nil:
-        ctx.selector.unregister(handle.ostream.fd)
-        ctx.handleMap.del(handle.ostream.fd)
-        handle.ostream.close()
-        handle.ostream = nil
-      if handle.sostream != nil:
+        for output in handle.outputs:
+          output.istreamAtEnd = true
+          if output.currentBuffer == nil:
+            unregWrite.add(output)
+    for output in unregWrite:
+      if output.ostream != nil:
+        ctx.selector.unregister(output.ostream.fd)
+        ctx.outputMap.del(output.ostream.fd)
+        if output.clientFd != -1:
+          let i = ctx.findClientFdEntry(output.clientPid, output.clientFd)
+          ctx.clientFdMap.del(i)
+        output.ostream.close()
+        output.ostream = nil
+        let handle = output.parent
+        let i = handle.outputs.find(output)
+        handle.outputs.del(i)
+        if handle.outputs.len == 0 and handle.istream != nil:
+          # premature end of all output streams; kill istream too
+          ctx.selector.unregister(handle.istream.fd)
+          ctx.handleMap.del(handle.istream.fd)
+          handle.istream.close()
+          handle.istream = nil
+      if output.sostream != nil:
+        #TODO it is not clear what should happen when multiple outputs exist.
+        #
+        # Normally, sostream is created after redirection, and must be written
+        # to & closed after the input has completely been written into the
+        # output stream. e.g. runMailcapEntryFile uses this to wait for the file
+        # to be completely downloaded before executing an entry that takes a
+        # file parameter.
+        #
+        # We should either block clone in this case, or find a better way to
+        # wait for file downloads to finish. (Note that the buffer remaining
+        # opened until the file has been downloaded is a somewhat useful visual
+        # indication; while it does not show progress (bad), it does at least
+        # show that *something* has been opened. An alternative should probably
+        # add a temporary entry to a file download screen or something.)
         try:
-          handle.sostream.swrite(true)
+          output.sostream.swrite(true)
         except IOError:
           # ignore error, that just means the buffer has already closed the
           # stream
           discard
-        handle.sostream.close()
-        handle.sostream = nil
-      if handle.istream != nil:
-        ctx.handleMap.del(handle.istream.fd)
-        ctx.selector.unregister(handle.istream.fd)
-        handle.istream.close()
-        handle.istream = nil
-      #TODO TODO TODO what to do about sostream
+        output.sostream.close()
+        output.sostream = nil
   ctx.exitLoader()
 
 proc getAttribute(contentType, attrname: string): string =
@@ -383,16 +434,15 @@ proc getAttribute(contentType, attrname: string): string =
     while i < kvs.len and kvs[i] in AsciiWhitespace:
       inc i
     var q = false
-    for j in i ..< kvs.len:
+    for j, c in kvs.toOpenArray(i, kvs.high):
       if q:
-        s &= kvs[j]
+        s &= c
+      elif c == '\\':
+        q = true
+      elif c == ';' or c in AsciiWhitespace:
+        break
       else:
-        if kvs[j] == '\\':
-          q = true
-        elif kvs[j] == ';' or kvs[j] in AsciiWhitespace:
-          break
-        else:
-          s &= kvs[j]
+        s &= c
   return s
 
 proc applyHeaders(loader: FileLoader, request: Request, response: Response) =
@@ -427,6 +477,8 @@ proc applyHeaders(loader: FileLoader, request: Request, response: Response) =
 #TODO: add init
 proc fetch*(loader: FileLoader, input: Request): FetchPromise =
   let stream = connectSocketStream(loader.process, false, blocking = true)
+  input.clientPid = getpid()
+  input.clientFd = int(stream.fd)
   stream.swrite(LOAD)
   stream.swrite(input)
   stream.flush()
@@ -442,6 +494,8 @@ proc fetch*(loader: FileLoader, input: Request): FetchPromise =
 
 proc reconnect*(loader: FileLoader, data: ConnectData) =
   let stream = connectSocketStream(loader.process, false, blocking = true)
+  data.request.clientPid = getpid()
+  data.request.clientFd = int(stream.fd)
   stream.swrite(LOAD)
   stream.swrite(data.request)
   stream.flush()
@@ -468,22 +522,27 @@ proc switchStream*(loader: FileLoader, data: var OngoingData,
     loader.unregisterFun(fd)
     realCloseImpl(stream)
 
-proc suspend*(loader: FileLoader, fds: seq[int]) =
+proc suspend*(loader: FileLoader, pid: int, fds: seq[int]) =
   let stream = connectSocketStream(loader.process, false, blocking = true)
   stream.swrite(SUSPEND)
+  stream.swrite(pid)
   stream.swrite(fds)
   stream.close()
 
-proc resume*(loader: FileLoader, fds: seq[int]) =
+proc resume*(loader: FileLoader, pid: int, fds: seq[int]) =
   let stream = connectSocketStream(loader.process, false, blocking = true)
   stream.swrite(RESUME)
+  stream.swrite(pid)
   stream.swrite(fds)
   stream.close()
 
-proc tee*(loader: FileLoader, fd: int): Stream =
+proc tee*(loader: FileLoader, pid, fd: int): Stream =
   let stream = connectSocketStream(loader.process, false, blocking = true)
   stream.swrite(TEE)
+  stream.swrite(pid)
   stream.swrite(fd)
+  stream.swrite(int(getpid()))
+  stream.swrite(int(stream.fd))
   return stream
 
 const BufferSize = 4096
@@ -574,6 +633,8 @@ proc doRequest*(loader: FileLoader, request: Request, blocking = true,
   let stream = connectSocketStream(loader.process, false, blocking = true)
   if canredir:
     request.canredir = true #TODO set this somewhere else?
+  request.clientPid = getpid()
+  request.clientFd = int(stream.fd)
   stream.swrite(LOAD)
   stream.swrite(request)
   stream.flush()
diff --git a/src/loader/loaderhandle.nim b/src/loader/loaderhandle.nim
index 15336964..6cdfd19f 100644
--- a/src/loader/loaderhandle.nim
+++ b/src/loader/loaderhandle.nim
@@ -10,44 +10,68 @@ import loader/headers
 when defined(debug):
   import types/url
 
-type
-  LoaderBufferPage = array[4056, uint8] # 4096 - 8 - 32
+const LoaderBufferPageSize = 4064 # 4096 - 32
 
+type
   LoaderBufferObj = object
-    page*: LoaderBufferPage
+    page: ptr UncheckedArray[uint8]
     len: int
 
-  LoaderBuffer* = ptr LoaderBufferObj
+  LoaderBuffer* = ref LoaderBufferObj
 
-  OutputHandle* = object
+  OutputHandle* = ref object
+    parent*: LoaderHandle
+    currentBuffer*: LoaderBuffer
+    currentBufferIdx*: int
+    buffers: Deque[LoaderBuffer]
+    ostream*: PosixStream
+    istreamAtEnd*: bool
+    sostream*: PosixStream # saved ostream when redirected
+    clientFd*: int
+    clientPid*: int
 
   LoaderHandle* = ref object
-    ostream*: PosixStream #TODO un-extern
     # Stream for taking input
     istream*: PosixStream
     # Only the first handle can be redirected, because a) mailcap can only
     # redirect the first handle and b) async redirects would result in race
     # conditions that would be difficult to untangle.
     canredir: bool
-    sostream*: PosixStream # saved ostream when redirected
-    currentBuffer*: LoaderBuffer
-    currentBufferIdx*: int
-    buffers: Deque[LoaderBuffer]
+    outputs*: seq[OutputHandle]
     when defined(debug):
       url*: URL
 
+{.warning[Deprecated]:off.}:
+  proc `=destroy`(buffer: var LoaderBufferObj) =
+    if buffer.page != nil:
+      dealloc(buffer.page)
+      buffer.page = nil
+
 # Create a new loader handle, with the output stream ostream.
-proc newLoaderHandle*(ostream: PosixStream, canredir: bool): LoaderHandle =
-  return LoaderHandle(
-    ostream: ostream,
+proc newLoaderHandle*(ostream: PosixStream, canredir: bool,
+    clientPid, clientFd: int): LoaderHandle =
+  let handle = LoaderHandle(
     canredir: canredir
   )
+  handle.outputs.add(OutputHandle(
+    ostream: ostream,
+    parent: handle,
+    clientPid: clientPid,
+    clientFd: clientFd
+  ))
+  return handle
+
+proc findOutputHandle*(handle: LoaderHandle, fd: int): OutputHandle =
+  for output in handle.outputs:
+    if output.ostream.fd == fd:
+      return output
+  return nil
 
 func `[]`*(buffer: LoaderBuffer, i: int): var uint8 {.inline.} =
   return buffer[].page[i]
 
 func cap*(buffer: LoaderBuffer): int {.inline.} =
-  return buffer[].page.len
+  return LoaderBufferPageSize
 
 func len*(buffer: LoaderBuffer): var int {.inline.} =
   return buffer[].len
@@ -56,59 +80,75 @@ proc `len=`*(buffer: LoaderBuffer, i: int) {.inline.} =
   buffer[].len = i
 
 proc newLoaderBuffer*(): LoaderBuffer =
-  let buffer = cast[LoaderBuffer](alloc(sizeof(LoaderBufferObj)))
+  let buffer = LoaderBuffer(
+    page: cast[ptr UncheckedArray[uint8]](alloc(LoaderBufferPageSize))
+  )
   buffer.len = 0
   return buffer
 
 proc addBuffer*(handle: LoaderHandle, buffer: LoaderBuffer) =
-  if handle.currentBuffer == nil:
-    handle.currentBuffer = buffer
-  else:
-    handle.buffers.addLast(buffer)
-
-proc bufferCleared*(handle: LoaderHandle) =
-  assert handle.currentBuffer != nil
-  handle.currentBufferIdx = 0
-  dealloc(handle.currentBuffer)
-  if handle.buffers.len > 0:
-    handle.currentBuffer = handle.buffers.popFirst()
+  for output in handle.outputs.mitems:
+    if output.currentBuffer == nil:
+      output.currentBuffer = buffer
+    else:
+      output.buffers.addLast(buffer)
+
+proc bufferCleared*(output: OutputHandle) =
+  assert output.currentBuffer != nil
+  output.currentBufferIdx = 0
+  if output.buffers.len > 0:
+    output.currentBuffer = output.buffers.popFirst()
   else:
-    handle.currentBuffer = nil
+    output.currentBuffer = nil
+
+proc tee*(outputIn: OutputHandle, ostream: PosixStream,
+    clientFd, clientPid: int) =
+  outputIn.parent.outputs.add(OutputHandle(
+    parent: outputIn.parent,
+    ostream: ostream,
+    currentBuffer: outputIn.currentBuffer,
+    currentBufferIdx: outputIn.currentBufferIdx,
+    buffers: outputIn.buffers,
+    istreamAtEnd: outputIn.istreamAtEnd,
+    clientFd: clientFd,
+    clientPid: clientPid
+  ))
 
-proc addOutputStream*(handle: LoaderHandle, stream: Stream) =
-  doAssert false
-  #TODO TODO TODO fix this
-  #let ms = newMultiStream(handle.ostream, stream)
-  #handle.ostream = ms
+template output*(handle: LoaderHandle): OutputHandle =
+  handle.outputs[0]
 
 proc sendResult*(handle: LoaderHandle, res: int, msg = "") =
-  handle.ostream.swrite(res)
+  handle.output.ostream.swrite(res)
   if res == 0: # success
     assert msg == ""
   else: # error
-    handle.ostream.swrite(msg)
+    handle.output.ostream.swrite(msg)
 
 proc sendStatus*(handle: LoaderHandle, status: int) =
-  handle.ostream.swrite(status)
+  handle.output.ostream.swrite(status)
 
 proc sendHeaders*(handle: LoaderHandle, headers: Headers) =
-  handle.ostream.swrite(headers)
+  let output = handle.output
+  output.ostream.swrite(headers)
   if handle.canredir:
     var redir: bool
-    handle.ostream.sread(redir)
+    output.ostream.sread(redir)
     if redir:
-      let fd = SocketStream(handle.ostream).recvFileHandle()
-      handle.sostream = handle.ostream
-      handle.ostream = newPosixStream(fd)
+      let fd = SocketStream(output.ostream).recvFileHandle()
+      output.sostream = output.ostream
+      output.ostream = newPosixStream(fd)
+      output.clientFd = -1
+      output.clientPid = -1
 
-proc sendData*(handle: LoaderHandle, p: pointer, nmemb: int): int =
-  return handle.ostream.sendData(p, nmemb)
+proc sendData*(output: OutputHandle, p: pointer, nmemb: int): int =
+  return output.ostream.sendData(p, nmemb)
 
 proc close*(handle: LoaderHandle) =
-  assert handle.sostream == nil
-  if handle.ostream != nil:
-    handle.ostream.close()
-    handle.ostream = nil
+  for output in handle.outputs:
+    assert output.sostream == nil
+    if output.ostream != nil:
+      output.ostream.close()
+      output.ostream = nil
   if handle.istream != nil:
     handle.istream.close()
     handle.istream = nil
diff --git a/src/loader/request.nim b/src/loader/request.nim
index a91731bb..805345f3 100644
--- a/src/loader/request.nim
+++ b/src/loader/request.nim
@@ -80,6 +80,8 @@ type
     credentialsMode* {.jsget.}: CredentialsMode
     proxy*: URL #TODO do something with this
     canredir*: bool
+    clientFd*: int
+    clientPid*: int
 
   ReadableStream* = ref object of Stream
     isource*: Stream