about summary refs log tree commit diff stats
path: root/src/loader
diff options
context:
space:
mode:
authorbptato <nincsnevem662@gmail.com>2024-02-11 15:04:03 +0100
committerbptato <nincsnevem662@gmail.com>2024-02-11 15:04:03 +0100
commit27a47bc9b7ba2fa0e2134bfd14565f11d94289fc (patch)
tree9ab2a2deb1a900912fda2ad5860818cefc418983 /src/loader
parentb36116d9e7004282803070877278cb10fbdfae57 (diff)
downloadchawan-27a47bc9b7ba2fa0e2134bfd14565f11d94289fc.tar.gz
loader: significantly more efficient loading
The previous version was running the processor on 100% because select
would immediately return for writes even when no buffers to send were
available.

(This has been the case since I added asynchronous sending, but the
previous commit put the console buffer's fd in loader too and that made
the problem quite obvious.)
Diffstat (limited to 'src/loader')
-rw-r--r--src/loader/loader.nim223
-rw-r--r--src/loader/loaderhandle.nim12
2 files changed, 137 insertions, 98 deletions
diff --git a/src/loader/loader.nim b/src/loader/loader.nim
index 3ca56cba..95d119e2 100644
--- a/src/loader/loader.nim
+++ b/src/loader/loader.nim
@@ -150,7 +150,6 @@ proc addFd(ctx: LoaderContext, handle: LoaderHandle) =
   let output = handle.output
   output.ostream.setBlocking(false)
   ctx.selector.registerHandle(handle.istream.fd, {Read}, 0)
-  ctx.selector.registerHandle(output.ostream.fd, {Write}, 0)
   let ofl = fcntl(handle.istream.fd, F_GETFL, 0)
   discard fcntl(handle.istream.fd, F_SETFL, ofl or O_NONBLOCK)
   ctx.handleMap[handle.istream.fd] = handle
@@ -346,53 +345,146 @@ proc initLoaderContext(fd: cint, config: LoaderConfig): LoaderContext =
       dir &= '/'
   return ctx
 
+# Either write data to the target output, or append it to the list of buffers to
+# write and register the output in our selector.
+proc pushBuffer(ctx: LoaderContext, handle: LoaderHandle,
+    buffer: LoaderBuffer, unregWrite: var seq[OutputHandle]) =
+  for output in handle.outputs:
+    if output.currentBuffer == nil:
+      var n = 0
+      try:
+        n = output.sendData(addr buffer[0], buffer.len)
+      except ErrorAgain, ErrorWouldBlock:
+        discard
+      except ErrorBrokenPipe:
+        unregWrite.add(output)
+        break
+      if n < buffer.len:
+        output.currentBuffer = buffer
+        output.currentBufferIdx = n
+        ctx.selector.registerHandle(output.ostream.fd, {Write}, 0)
+        output.registered = true
+    else:
+      output.addBuffer(buffer)
+
+# Called whenever there is more data available to read.
+proc handleRead(ctx: LoaderContext, handle: LoaderHandle,
+    unregRead: var seq[LoaderHandle], unregWrite: var seq[OutputHandle]) =
+  while true:
+    let buffer = newLoaderBuffer()
+    try:
+      buffer.len = handle.istream.recvData(addr buffer[0], buffer.cap)
+      if buffer.len == 0:
+        break
+      ctx.pushBuffer(handle, buffer, unregWrite)
+      if buffer.len < buffer.cap:
+        break
+    except ErrorAgain, ErrorWouldBlock: # retry later
+      break
+    except ErrorBrokenPipe: # sender died; stop streaming
+      unregRead.add(handle)
+      break
+
+# This is only called when an OutputHandle could not read enough of one (or
+# more) buffers, and we asked select to notify us when it will be available.
+proc handleWrite(ctx: LoaderContext, output: OutputHandle,
+    unregWrite: var seq[OutputHandle]) =
+  while output.currentBuffer != nil:
+    let buffer = output.currentBuffer
+    try:
+      let i = output.currentBufferIdx
+      assert buffer.len - i > 0
+      let n = output.sendData(addr buffer[i], buffer.len - i)
+      output.currentBufferIdx += n
+      if output.currentBufferIdx < buffer.len:
+        break
+      output.bufferCleared() # swap out buffer
+    except ErrorAgain, ErrorWouldBlock: # never mind
+      break
+    except ErrorBrokenPipe: # receiver died; stop streaming
+      unregWrite.add(output)
+      break
+  if output.currentBuffer == nil:
+    if output.istreamAtEnd:
+      # after EOF, no need to send anything more here
+      unregWrite.add(output)
+    else:
+      # all buffers sent, no need to select on this output again for now
+      output.registered = false
+      ctx.selector.unregister(output.ostream.fd)
+
+proc finishCycle(ctx: LoaderContext, unregRead: var seq[LoaderHandle],
+    unregWrite: var seq[OutputHandle]) =
+  # Unregister handles queued for unregistration.
+  # It is possible for both unregRead and unregWrite to contain duplicates. To
+  # avoid double-close/double-unregister, we set the istream/ostream of
+  # unregistered handles to nil.
+  for handle in unregRead:
+    if handle.istream != nil:
+      ctx.selector.unregister(handle.istream.fd)
+      ctx.handleMap.del(handle.istream.fd)
+      handle.istream.close()
+      handle.istream = nil
+      for output in handle.outputs:
+        output.istreamAtEnd = true
+        if output.currentBuffer == nil:
+          unregWrite.add(output)
+  for output in unregWrite:
+    if output.ostream != nil:
+      if output.registered:
+        ctx.selector.unregister(output.ostream.fd)
+      ctx.outputMap.del(output.ostream.fd)
+      if output.clientFd != -1:
+        ctx.clientFdMap.delOutput(output.clientPid, output.clientFd)
+      output.ostream.close()
+      output.ostream = nil
+      let handle = output.parent
+      let i = handle.outputs.find(output)
+      handle.outputs.del(i)
+      if handle.outputs.len == 0 and handle.istream != nil:
+        # premature end of all output streams; kill istream too
+        ctx.selector.unregister(handle.istream.fd)
+        ctx.handleMap.del(handle.istream.fd)
+        handle.istream.close()
+        handle.istream = nil
+    if output.sostream != nil:
+      #TODO it is not clear what should happen when multiple outputs exist.
+      #
+      # Normally, sostream is created after redirection, and must be written
+      # to & closed after the input has completely been written into the
+      # output stream. e.g. runMailcapEntryFile uses this to wait for the file
+      # to be completely downloaded before executing an entry that takes a
+      # file parameter.
+      #
+      # We should either block clone in this case, or find a better way to
+      # wait for file downloads to finish. (Note that the buffer remaining
+      # opened until the file has been downloaded is a somewhat useful visual
+      # indication; while it does not show progress (bad), it does at least
+      # show that *something* has been opened. An alternative should probably
+      # add a temporary entry to a file download screen or something.)
+      try:
+        output.sostream.swrite(true)
+      except IOError:
+        # ignore error, that just means the buffer has already closed the
+        # stream
+        discard
+      output.sostream.close()
+      output.sostream = nil
+
 proc runFileLoader*(fd: cint, config: LoaderConfig) =
   var ctx = initLoaderContext(fd, config)
   while ctx.alive:
     let events = ctx.selector.select(-1)
-    var unregRead: seq[LoaderHandle]
-    var unregWrite: seq[OutputHandle]
+    var unregRead: seq[LoaderHandle] = @[]
+    var unregWrite: seq[OutputHandle] = @[]
     for event in events:
       if Read in event.events:
         if event.fd == ctx.fd: # incoming connection
           ctx.acceptConnection()
         else:
-          let handle = ctx.handleMap[event.fd]
-          assert event.fd == handle.istream.fd
-          while true:
-            let buffer = newLoaderBuffer()
-            try:
-              buffer.len = handle.istream.recvData(addr buffer[0], buffer.cap)
-              if buffer.len == 0:
-                break
-              handle.addBuffer(buffer)
-              if buffer.len < buffer.cap:
-                break
-            except ErrorAgain, ErrorWouldBlock: # retry later
-              break
-            except ErrorBrokenPipe: # sender died; stop streaming
-              unregRead.add(handle)
-              break
+          ctx.handleRead(ctx.handleMap[event.fd], unregRead, unregWrite)
       if Write in event.events:
-        let output = ctx.outputMap[event.fd]
-        while output.currentBuffer != nil:
-          let buffer = output.currentBuffer
-          try:
-            let i = output.currentBufferIdx
-            assert buffer.len - i > 0
-            let n = output.sendData(addr buffer[i], buffer.len - i)
-            output.currentBufferIdx += n
-            if output.currentBufferIdx < buffer.len:
-              break
-            output.bufferCleared() # swap out buffer
-          except ErrorAgain, ErrorWouldBlock: # never mind
-            break
-          except ErrorBrokenPipe: # receiver died; stop streaming
-            unregWrite.add(output)
-            break
-        if output.istreamAtEnd and output.currentBuffer == nil:
-          # after EOF, but not appended in this send cycle
-          unregWrite.add(output)
+        ctx.handleWrite(ctx.outputMap[event.fd], unregWrite)
       if Error in event.events:
         assert event.fd != ctx.fd
         ctx.outputMap.withValue(event.fd, outputp): # ostream died
@@ -400,60 +492,7 @@ proc runFileLoader*(fd: cint, config: LoaderConfig) =
         do: # istream died
           let handle = ctx.handleMap[event.fd]
           unregRead.add(handle)
-    # Unregister handles queued for unregistration.
-    # It is possible for both unregRead and unregWrite to contain duplicates. To
-    # avoid double-close/double-unregister, we set the istream/ostream of
-    # unregistered handles to nil.
-    for handle in unregRead:
-      if handle.istream != nil:
-        ctx.selector.unregister(handle.istream.fd)
-        ctx.handleMap.del(handle.istream.fd)
-        handle.istream.close()
-        handle.istream = nil
-        for output in handle.outputs:
-          output.istreamAtEnd = true
-          if output.currentBuffer == nil:
-            unregWrite.add(output)
-    for output in unregWrite:
-      if output.ostream != nil:
-        ctx.selector.unregister(output.ostream.fd)
-        ctx.outputMap.del(output.ostream.fd)
-        if output.clientFd != -1:
-          ctx.clientFdMap.delOutput(output.clientPid, output.clientFd)
-        output.ostream.close()
-        output.ostream = nil
-        let handle = output.parent
-        let i = handle.outputs.find(output)
-        handle.outputs.del(i)
-        if handle.outputs.len == 0 and handle.istream != nil:
-          # premature end of all output streams; kill istream too
-          ctx.selector.unregister(handle.istream.fd)
-          ctx.handleMap.del(handle.istream.fd)
-          handle.istream.close()
-          handle.istream = nil
-      if output.sostream != nil:
-        #TODO it is not clear what should happen when multiple outputs exist.
-        #
-        # Normally, sostream is created after redirection, and must be written
-        # to & closed after the input has completely been written into the
-        # output stream. e.g. runMailcapEntryFile uses this to wait for the file
-        # to be completely downloaded before executing an entry that takes a
-        # file parameter.
-        #
-        # We should either block clone in this case, or find a better way to
-        # wait for file downloads to finish. (Note that the buffer remaining
-        # opened until the file has been downloaded is a somewhat useful visual
-        # indication; while it does not show progress (bad), it does at least
-        # show that *something* has been opened. An alternative should probably
-        # add a temporary entry to a file download screen or something.)
-        try:
-          output.sostream.swrite(true)
-        except IOError:
-          # ignore error, that just means the buffer has already closed the
-          # stream
-          discard
-        output.sostream.close()
-        output.sostream = nil
+    ctx.finishCycle(unregRead, unregWrite)
   ctx.exitLoader()
 
 proc getAttribute(contentType, attrname: string): string =
diff --git a/src/loader/loaderhandle.nim b/src/loader/loaderhandle.nim
index 7f583d2b..579c553d 100644
--- a/src/loader/loaderhandle.nim
+++ b/src/loader/loaderhandle.nim
@@ -29,6 +29,7 @@ type
     sostream*: PosixStream # saved ostream when redirected
     clientFd*: int
     clientPid*: int
+    registered*: bool
 
   LoaderHandle* = ref object
     # Stream for taking input
@@ -86,12 +87,11 @@ proc newLoaderBuffer*(): LoaderBuffer =
   buffer.len = 0
   return buffer
 
-proc addBuffer*(handle: LoaderHandle, buffer: LoaderBuffer) =
-  for output in handle.outputs.mitems:
-    if output.currentBuffer == nil:
-      output.currentBuffer = buffer
-    else:
-      output.buffers.addLast(buffer)
+proc addBuffer*(output: OutputHandle, buffer: LoaderBuffer) =
+  if output.currentBuffer == nil:
+    output.currentBuffer = buffer
+  else:
+    output.buffers.addLast(buffer)
 
 proc bufferCleared*(output: OutputHandle) =
   assert output.currentBuffer != nil