diff options
author | bptato <nincsnevem662@gmail.com> | 2024-02-11 15:04:03 +0100 |
---|---|---|
committer | bptato <nincsnevem662@gmail.com> | 2024-02-11 15:04:03 +0100 |
commit | 27a47bc9b7ba2fa0e2134bfd14565f11d94289fc (patch) | |
tree | 9ab2a2deb1a900912fda2ad5860818cefc418983 /src | |
parent | b36116d9e7004282803070877278cb10fbdfae57 (diff) | |
download | chawan-27a47bc9b7ba2fa0e2134bfd14565f11d94289fc.tar.gz |
loader: significantly more efficient loading
The previous version was running the processor on 100% because select would immediately return for writes even when no buffers to send were available. (This has been the case since I added asynchronous sending, but the previous commit put the console buffer's fd in loader too and that made the problem quite obvious.)
Diffstat (limited to 'src')
-rw-r--r-- | src/loader/loader.nim | 223 | ||||
-rw-r--r-- | src/loader/loaderhandle.nim | 12 |
2 files changed, 137 insertions, 98 deletions
diff --git a/src/loader/loader.nim b/src/loader/loader.nim index 3ca56cba..95d119e2 100644 --- a/src/loader/loader.nim +++ b/src/loader/loader.nim @@ -150,7 +150,6 @@ proc addFd(ctx: LoaderContext, handle: LoaderHandle) = let output = handle.output output.ostream.setBlocking(false) ctx.selector.registerHandle(handle.istream.fd, {Read}, 0) - ctx.selector.registerHandle(output.ostream.fd, {Write}, 0) let ofl = fcntl(handle.istream.fd, F_GETFL, 0) discard fcntl(handle.istream.fd, F_SETFL, ofl or O_NONBLOCK) ctx.handleMap[handle.istream.fd] = handle @@ -346,53 +345,146 @@ proc initLoaderContext(fd: cint, config: LoaderConfig): LoaderContext = dir &= '/' return ctx +# Either write data to the target output, or append it to the list of buffers to +# write and register the output in our selector. +proc pushBuffer(ctx: LoaderContext, handle: LoaderHandle, + buffer: LoaderBuffer, unregWrite: var seq[OutputHandle]) = + for output in handle.outputs: + if output.currentBuffer == nil: + var n = 0 + try: + n = output.sendData(addr buffer[0], buffer.len) + except ErrorAgain, ErrorWouldBlock: + discard + except ErrorBrokenPipe: + unregWrite.add(output) + break + if n < buffer.len: + output.currentBuffer = buffer + output.currentBufferIdx = n + ctx.selector.registerHandle(output.ostream.fd, {Write}, 0) + output.registered = true + else: + output.addBuffer(buffer) + +# Called whenever there is more data available to read. +proc handleRead(ctx: LoaderContext, handle: LoaderHandle, + unregRead: var seq[LoaderHandle], unregWrite: var seq[OutputHandle]) = + while true: + let buffer = newLoaderBuffer() + try: + buffer.len = handle.istream.recvData(addr buffer[0], buffer.cap) + if buffer.len == 0: + break + ctx.pushBuffer(handle, buffer, unregWrite) + if buffer.len < buffer.cap: + break + except ErrorAgain, ErrorWouldBlock: # retry later + break + except ErrorBrokenPipe: # sender died; stop streaming + unregRead.add(handle) + break + +# This is only called when an OutputHandle could not read enough of one (or +# more) buffers, and we asked select to notify us when it will be available. +proc handleWrite(ctx: LoaderContext, output: OutputHandle, + unregWrite: var seq[OutputHandle]) = + while output.currentBuffer != nil: + let buffer = output.currentBuffer + try: + let i = output.currentBufferIdx + assert buffer.len - i > 0 + let n = output.sendData(addr buffer[i], buffer.len - i) + output.currentBufferIdx += n + if output.currentBufferIdx < buffer.len: + break + output.bufferCleared() # swap out buffer + except ErrorAgain, ErrorWouldBlock: # never mind + break + except ErrorBrokenPipe: # receiver died; stop streaming + unregWrite.add(output) + break + if output.currentBuffer == nil: + if output.istreamAtEnd: + # after EOF, no need to send anything more here + unregWrite.add(output) + else: + # all buffers sent, no need to select on this output again for now + output.registered = false + ctx.selector.unregister(output.ostream.fd) + +proc finishCycle(ctx: LoaderContext, unregRead: var seq[LoaderHandle], + unregWrite: var seq[OutputHandle]) = + # Unregister handles queued for unregistration. + # It is possible for both unregRead and unregWrite to contain duplicates. To + # avoid double-close/double-unregister, we set the istream/ostream of + # unregistered handles to nil. + for handle in unregRead: + if handle.istream != nil: + ctx.selector.unregister(handle.istream.fd) + ctx.handleMap.del(handle.istream.fd) + handle.istream.close() + handle.istream = nil + for output in handle.outputs: + output.istreamAtEnd = true + if output.currentBuffer == nil: + unregWrite.add(output) + for output in unregWrite: + if output.ostream != nil: + if output.registered: + ctx.selector.unregister(output.ostream.fd) + ctx.outputMap.del(output.ostream.fd) + if output.clientFd != -1: + ctx.clientFdMap.delOutput(output.clientPid, output.clientFd) + output.ostream.close() + output.ostream = nil + let handle = output.parent + let i = handle.outputs.find(output) + handle.outputs.del(i) + if handle.outputs.len == 0 and handle.istream != nil: + # premature end of all output streams; kill istream too + ctx.selector.unregister(handle.istream.fd) + ctx.handleMap.del(handle.istream.fd) + handle.istream.close() + handle.istream = nil + if output.sostream != nil: + #TODO it is not clear what should happen when multiple outputs exist. + # + # Normally, sostream is created after redirection, and must be written + # to & closed after the input has completely been written into the + # output stream. e.g. runMailcapEntryFile uses this to wait for the file + # to be completely downloaded before executing an entry that takes a + # file parameter. + # + # We should either block clone in this case, or find a better way to + # wait for file downloads to finish. (Note that the buffer remaining + # opened until the file has been downloaded is a somewhat useful visual + # indication; while it does not show progress (bad), it does at least + # show that *something* has been opened. An alternative should probably + # add a temporary entry to a file download screen or something.) + try: + output.sostream.swrite(true) + except IOError: + # ignore error, that just means the buffer has already closed the + # stream + discard + output.sostream.close() + output.sostream = nil + proc runFileLoader*(fd: cint, config: LoaderConfig) = var ctx = initLoaderContext(fd, config) while ctx.alive: let events = ctx.selector.select(-1) - var unregRead: seq[LoaderHandle] - var unregWrite: seq[OutputHandle] + var unregRead: seq[LoaderHandle] = @[] + var unregWrite: seq[OutputHandle] = @[] for event in events: if Read in event.events: if event.fd == ctx.fd: # incoming connection ctx.acceptConnection() else: - let handle = ctx.handleMap[event.fd] - assert event.fd == handle.istream.fd - while true: - let buffer = newLoaderBuffer() - try: - buffer.len = handle.istream.recvData(addr buffer[0], buffer.cap) - if buffer.len == 0: - break - handle.addBuffer(buffer) - if buffer.len < buffer.cap: - break - except ErrorAgain, ErrorWouldBlock: # retry later - break - except ErrorBrokenPipe: # sender died; stop streaming - unregRead.add(handle) - break + ctx.handleRead(ctx.handleMap[event.fd], unregRead, unregWrite) if Write in event.events: - let output = ctx.outputMap[event.fd] - while output.currentBuffer != nil: - let buffer = output.currentBuffer - try: - let i = output.currentBufferIdx - assert buffer.len - i > 0 - let n = output.sendData(addr buffer[i], buffer.len - i) - output.currentBufferIdx += n - if output.currentBufferIdx < buffer.len: - break - output.bufferCleared() # swap out buffer - except ErrorAgain, ErrorWouldBlock: # never mind - break - except ErrorBrokenPipe: # receiver died; stop streaming - unregWrite.add(output) - break - if output.istreamAtEnd and output.currentBuffer == nil: - # after EOF, but not appended in this send cycle - unregWrite.add(output) + ctx.handleWrite(ctx.outputMap[event.fd], unregWrite) if Error in event.events: assert event.fd != ctx.fd ctx.outputMap.withValue(event.fd, outputp): # ostream died @@ -400,60 +492,7 @@ proc runFileLoader*(fd: cint, config: LoaderConfig) = do: # istream died let handle = ctx.handleMap[event.fd] unregRead.add(handle) - # Unregister handles queued for unregistration. - # It is possible for both unregRead and unregWrite to contain duplicates. To - # avoid double-close/double-unregister, we set the istream/ostream of - # unregistered handles to nil. - for handle in unregRead: - if handle.istream != nil: - ctx.selector.unregister(handle.istream.fd) - ctx.handleMap.del(handle.istream.fd) - handle.istream.close() - handle.istream = nil - for output in handle.outputs: - output.istreamAtEnd = true - if output.currentBuffer == nil: - unregWrite.add(output) - for output in unregWrite: - if output.ostream != nil: - ctx.selector.unregister(output.ostream.fd) - ctx.outputMap.del(output.ostream.fd) - if output.clientFd != -1: - ctx.clientFdMap.delOutput(output.clientPid, output.clientFd) - output.ostream.close() - output.ostream = nil - let handle = output.parent - let i = handle.outputs.find(output) - handle.outputs.del(i) - if handle.outputs.len == 0 and handle.istream != nil: - # premature end of all output streams; kill istream too - ctx.selector.unregister(handle.istream.fd) - ctx.handleMap.del(handle.istream.fd) - handle.istream.close() - handle.istream = nil - if output.sostream != nil: - #TODO it is not clear what should happen when multiple outputs exist. - # - # Normally, sostream is created after redirection, and must be written - # to & closed after the input has completely been written into the - # output stream. e.g. runMailcapEntryFile uses this to wait for the file - # to be completely downloaded before executing an entry that takes a - # file parameter. - # - # We should either block clone in this case, or find a better way to - # wait for file downloads to finish. (Note that the buffer remaining - # opened until the file has been downloaded is a somewhat useful visual - # indication; while it does not show progress (bad), it does at least - # show that *something* has been opened. An alternative should probably - # add a temporary entry to a file download screen or something.) - try: - output.sostream.swrite(true) - except IOError: - # ignore error, that just means the buffer has already closed the - # stream - discard - output.sostream.close() - output.sostream = nil + ctx.finishCycle(unregRead, unregWrite) ctx.exitLoader() proc getAttribute(contentType, attrname: string): string = diff --git a/src/loader/loaderhandle.nim b/src/loader/loaderhandle.nim index 7f583d2b..579c553d 100644 --- a/src/loader/loaderhandle.nim +++ b/src/loader/loaderhandle.nim @@ -29,6 +29,7 @@ type sostream*: PosixStream # saved ostream when redirected clientFd*: int clientPid*: int + registered*: bool LoaderHandle* = ref object # Stream for taking input @@ -86,12 +87,11 @@ proc newLoaderBuffer*(): LoaderBuffer = buffer.len = 0 return buffer -proc addBuffer*(handle: LoaderHandle, buffer: LoaderBuffer) = - for output in handle.outputs.mitems: - if output.currentBuffer == nil: - output.currentBuffer = buffer - else: - output.buffers.addLast(buffer) +proc addBuffer*(output: OutputHandle, buffer: LoaderBuffer) = + if output.currentBuffer == nil: + output.currentBuffer = buffer + else: + output.buffers.addLast(buffer) proc bufferCleared*(output: OutputHandle) = assert output.currentBuffer != nil |