about summary refs log tree commit diff stats
path: root/src/loader
diff options
context:
space:
mode:
authorbptato <nincsnevem662@gmail.com>2024-02-05 15:32:05 +0100
committerbptato <nincsnevem662@gmail.com>2024-02-07 22:21:48 +0100
commit407b525332106d84f18d74f6b51ae2f7a1ed3475 (patch)
treec4ca2ad55df079b9c49d9361bc3c495a7a5977d0 /src/loader
parent168bd542d989c76ce3ff09a29b8d77af448c3c12 (diff)
downloadchawan-407b525332106d84f18d74f6b51ae2f7a1ed3475.tar.gz
Incremental rendering
Yay!

Admittedly, it is not very useful in its current form, except maybe on
very slow networks.

The problem is that renderDocument is *slow*, so we only run it when
onload fails to consume all bytes from the network in a single pass.
Even then, we are guaranteed to get a FOUC, since CSS is only downloaded
in finishLoad(). Well, I think it's cool, anyway.
Diffstat (limited to 'src/loader')
-rw-r--r--src/loader/loader.nim93
-rw-r--r--src/loader/loaderhandle.nim110
2 files changed, 152 insertions, 51 deletions
diff --git a/src/loader/loader.nim b/src/loader/loader.nim
index 02585630..749f7d29 100644
--- a/src/loader/loader.nim
+++ b/src/loader/loader.nim
@@ -139,7 +139,9 @@ proc loadResource(ctx: LoaderContext, request: Request, handle: LoaderHandle) =
         handle.close()
       else:
         let fd = handle.istream.fd
+        handle.setBlocking(false)
         ctx.selector.registerHandle(fd, {Read}, 0)
+        ctx.selector.registerHandle(handle.fd, {Write}, 0)
         let ofl = fcntl(fd, F_GETFL, 0)
         discard fcntl(fd, F_SETFL, ofl or O_NONBLOCK)
         # yes, this puts the istream fd in addition to the ostream fd in
@@ -164,7 +166,7 @@ proc loadResource(ctx: LoaderContext, request: Request, handle: LoaderHandle) =
 proc onLoad(ctx: LoaderContext, stream: SocketStream) =
   var request: Request
   stream.sread(request)
-  let handle = newLoaderHandle(stream, request.canredir)
+  let handle = newLoaderHandle(stream, request.canredir, request.url)
   if not ctx.config.filter.match(request.url):
     handle.sendResult(ERROR_DISALLOWED_URL)
     handle.close()
@@ -188,9 +190,6 @@ proc onLoad(ctx: LoaderContext, stream: SocketStream) =
     ctx.loadResource(request, handle)
 
 proc acceptConnection(ctx: LoaderContext) =
-  #TODO TODO TODO acceptSocketStream should be non-blocking here,
-  # otherwise the client disconnecting between poll and accept could
-  # block this indefinitely.
   let stream = ctx.ssock.acceptSocketStream()
   try:
     var cmd: LoaderCommand
@@ -250,7 +249,7 @@ proc initLoaderContext(fd: cint, config: LoaderConfig): LoaderContext =
   gctx = ctx
   #TODO ideally, buffered would be true. Unfortunately this conflicts with
   # sendFileHandle/recvFileHandle.
-  ctx.ssock = initServerSocket(buffered = false)
+  ctx.ssock = initServerSocket(buffered = false, blocking = false)
   ctx.fd = int(ctx.ssock.sock.getFd())
   ctx.selector.registerHandle(ctx.fd, {Read}, 0)
   # The server has been initialized, so the main process can resume execution.
@@ -271,41 +270,81 @@ proc initLoaderContext(fd: cint, config: LoaderConfig): LoaderContext =
 
 proc runFileLoader*(fd: cint, config: LoaderConfig) =
   var ctx = initLoaderContext(fd, config)
-  var buffer {.noinit.}: array[16384, uint8]
   while ctx.alive:
     let events = ctx.selector.select(-1)
-    var unreg: seq[int]
+    var unregRead: seq[LoaderHandle]
+    var unregWrite: seq[LoaderHandle]
     for event in events:
       if Read in event.events:
         if event.fd == ctx.fd: # incoming connection
           ctx.acceptConnection()
         else:
           let handle = ctx.handleMap[event.fd]
-          while not handle.istream.atEnd:
+          assert event.fd != handle.fd
+          while true:
             try:
-              let n = handle.istream.readData(addr buffer[0], buffer.len)
-              handle.sendData(addr buffer[0], n)
+              let buffer = newLoaderBuffer()
+              buffer.len = handle.istream.readData(addr buffer[0], buffer.cap)
+              if buffer.len == 0:
+                dealloc(buffer)
+                break
+              handle.addBuffer(buffer)
+              if buffer.len < buffer.cap:
+                break
             except ErrorAgain, ErrorWouldBlock: # retry later
               break
-            except ErrorBrokenPipe: # receiver died; stop streaming
-              unreg.add(event.fd)
+            except ErrorBrokenPipe: # sender died; stop streaming
+              unregRead.add(handle)
               break
+      if Write in event.events:
+        let handle = ctx.handleMap[event.fd]
+        assert event.fd == handle.fd
+        while handle.currentBuffer != nil:
+          let buffer = handle.currentBuffer
+          try:
+            let i = handle.currentBufferIdx
+            assert buffer.len - i > 0
+            let n = handle.sendData(addr buffer[i], buffer.len - i)
+            handle.currentBufferIdx += n
+            if handle.currentBufferIdx < buffer.len:
+              break
+            handle.bufferCleared() # swap out buffer
+          except ErrorAgain, ErrorWouldBlock: # never mind
+            break
+          except ErrorBrokenPipe: # receiver died; stop streaming
+            unregWrite.add(handle)
+            break
+        if handle.istream == nil and handle.currentBuffer == nil and
+            (unregWrite.len == 0 or unregWrite[^1] != handle):
+          # after EOF, but not appended in this send cycle
+          unregWrite.add(handle)
       if Error in event.events:
         assert event.fd != ctx.fd
-        when defined(debug):
-          # sanity check
-          let handle = ctx.handleMap[event.fd]
-          if not handle.istream.atEnd():
-            let n = handle.istream.readData(addr buffer[0], buffer.len)
-            assert n == 0
-            assert handle.istream.atEnd()
-        unreg.add(event.fd)
-    for fd in unreg:
-      ctx.selector.unregister(fd)
-      let handle = ctx.handleMap[fd]
-      ctx.handleMap.del(fd)
-      ctx.handleMap.del(handle.getFd())
-      handle.close()
+        let handle = ctx.handleMap[event.fd]
+        if handle.fd == event.fd: # ostream died
+          unregWrite.add(handle)
+        else: # istream died
+          unregRead.add(handle)
+    for handle in unregRead:
+      ctx.selector.unregister(handle.istream.fd)
+      ctx.handleMap.del(handle.istream.fd)
+      handle.istream.close()
+      handle.istream = nil
+      if handle.currentBuffer == nil:
+        unregWrite.add(handle)
+      #TODO TODO TODO what to do about sostream
+    for handle in unregWrite:
+      ctx.selector.unregister(handle.fd)
+      ctx.handleMap.del(handle.fd)
+      handle.ostream.close()
+      handle.ostream = nil
+      if handle.istream != nil:
+        handle.istream.close()
+        ctx.handleMap.del(handle.istream.fd)
+        ctx.selector.unregister(handle.istream.fd)
+        handle.istream.close()
+        handle.istream = nil
+      #TODO TODO TODO what to do about sostream
   ctx.exitLoader()
 
 proc getAttribute(contentType, attrname: string): string =
@@ -478,6 +517,8 @@ proc onRead*(loader: FileLoader, fd: int) =
         buffer[].buf.setLen(olen + BufferSize)
         let n = response.body.readData(addr buffer[].buf[olen], BufferSize)
         buffer[].buf.setLen(olen + n)
+        if n == 0:
+          break
       except ErrorAgain, ErrorWouldBlock:
         break
     if response.body.atEnd():
diff --git a/src/loader/loaderhandle.nim b/src/loader/loaderhandle.nim
index 7a9b3434..5d2dee4e 100644
--- a/src/loader/loaderhandle.nim
+++ b/src/loader/loaderhandle.nim
@@ -1,3 +1,4 @@
+import std/deques
 import std/net
 import std/streams
 
@@ -7,28 +8,72 @@ import io/serialize
 import io/socketstream
 import loader/headers
 
-type LoaderHandle* = ref object
-  ostream: Stream
-  # Stream for taking input
-  istream*: PosixStream
-  # Only the first handle can be redirected, because a) mailcap can only
-  # redirect the first handle and b) async redirects would result in race
-  # conditions that would be difficult to untangle.
-  canredir: bool
-  sostream: Stream # saved ostream when redirected
-  sostream_suspend: Stream # saved ostream when suspended
-  fd: int
+import types/url
+type
+  LoaderBufferPage = array[4056, uint8] # 4096 - 8 - 32
+
+  LoaderBufferObj = object
+    page*: LoaderBufferPage
+    len: int
+
+  LoaderBuffer* = ptr LoaderBufferObj
+
+  LoaderHandle* = ref object
+    ostream*: PosixStream #TODO un-extern
+    # Stream for taking input
+    istream*: PosixStream
+    # Only the first handle can be redirected, because a) mailcap can only
+    # redirect the first handle and b) async redirects would result in race
+    # conditions that would be difficult to untangle.
+    canredir: bool
+    sostream: Stream # saved ostream when redirected
+    sostream_suspend: Stream # saved ostream when suspended
+    fd*: int # ostream fd
+    currentBuffer*: LoaderBuffer
+    currentBufferIdx*: int
+    buffers: Deque[LoaderBuffer]
+    url*: URL #TODO TODO TODO debug
 
 # Create a new loader handle, with the output stream ostream.
-proc newLoaderHandle*(ostream: Stream, canredir: bool): LoaderHandle =
+proc newLoaderHandle*(ostream: PosixStream, canredir: bool, url: URL): LoaderHandle =
   return LoaderHandle(
     ostream: ostream,
     canredir: canredir,
-    fd: int(SocketStream(ostream).source.getFd())
+    fd: int(SocketStream(ostream).source.getFd()),
+    url: url
   )
 
-proc getFd*(handle: LoaderHandle): int =
-  return handle.fd
+func `[]`*(buffer: LoaderBuffer, i: int): var uint8 {.inline.} =
+  return buffer[].page[i]
+
+func cap*(buffer: LoaderBuffer): int {.inline.} =
+  return buffer[].page.len
+
+func len*(buffer: LoaderBuffer): var int {.inline.} =
+  return buffer[].len
+
+proc `len=`*(buffer: LoaderBuffer, i: int) {.inline.} =
+  buffer[].len = i
+
+proc newLoaderBuffer*(): LoaderBuffer =
+  let buffer = cast[LoaderBuffer](alloc(sizeof(LoaderBufferObj)))
+  buffer.len = 0
+  return buffer
+
+proc addBuffer*(handle: LoaderHandle, buffer: LoaderBuffer) =
+  if handle.currentBuffer == nil:
+    handle.currentBuffer = buffer
+  else:
+    handle.buffers.addLast(buffer)
+
+proc bufferCleared*(handle: LoaderHandle) =
+  assert handle.currentBuffer != nil
+  handle.currentBufferIdx = 0
+  dealloc(handle.currentBuffer)
+  if handle.buffers.len > 0:
+    handle.currentBuffer = handle.buffers.popFirst()
+  else:
+    handle.currentBuffer = nil
 
 proc addOutputStream*(handle: LoaderHandle, stream: Stream) =
   if likely(handle.sostream_suspend != nil):
@@ -43,8 +88,18 @@ proc addOutputStream*(handle: LoaderHandle, stream: Stream) =
     # sostream_suspend is never nil when the function is called.
     # (Feel free to remove this assertion if this changes.)
     doAssert false
-    let ms = newMultiStream(handle.ostream, stream)
-    handle.ostream = ms
+    #TODO TODO TODO fix this
+    #let ms = newMultiStream(handle.ostream, stream)
+    #handle.ostream = ms
+
+proc setBlocking*(handle: LoaderHandle, blocking: bool) =
+  #TODO this is stupid
+  if handle.sostream_suspend != nil and handle.sostream_suspend of SocketStream:
+    SocketStream(handle.sostream_suspend).setBlocking(blocking)
+  elif handle.sostream != nil and handle.sostream of SocketStream:
+    SocketStream(handle.sostream).setBlocking(blocking)
+  else:
+    SocketStream(handle.ostream).setBlocking(blocking)
 
 proc sendResult*(handle: LoaderHandle, res: int, msg = "") =
   handle.ostream.swrite(res)
@@ -67,23 +122,25 @@ proc sendHeaders*(handle: LoaderHandle, headers: Headers) =
       let stream = newPosixStream(fd)
       handle.ostream = stream
 
-proc sendData*(handle: LoaderHandle, p: pointer, nmemb: int) =
-  handle.ostream.writeData(p, nmemb)
-
-proc sendData*(handle: LoaderHandle, s: string) =
-  if s.len > 0:
-    handle.sendData(unsafeAddr s[0], s.len)
+proc sendData*(handle: LoaderHandle, p: pointer, nmemb: int): int =
+  return handle.ostream.sendData(p, nmemb)
 
 proc suspend*(handle: LoaderHandle) =
+  #TODO TODO TODO fix suspend
+  doAssert false
   handle.sostream_suspend = handle.ostream
-  handle.ostream = newStringStream()
+  #handle.ostream = newStringStream()
 
 proc resume*(handle: LoaderHandle) =
+  #TODO TODO TODO fix resume
+  doAssert false
+  #[
   let ss = handle.ostream
   handle.ostream = handle.sostream_suspend
   handle.sostream_suspend = nil
   handle.sendData(ss.readAll())
   ss.close()
+  ]#
 
 proc close*(handle: LoaderHandle) =
   if handle.sostream != nil:
@@ -93,6 +150,9 @@ proc close*(handle: LoaderHandle) =
       # ignore error, that just means the buffer has already closed the stream
       discard
     handle.sostream.close()
-  handle.ostream.close()
+  if handle.ostream != nil:
+    handle.ostream.close()
+    handle.ostream = nil
   if handle.istream != nil:
     handle.istream.close()
+    handle.istream = nil