diff options
Diffstat (limited to 'src/loader/loader.nim')
-rw-r--r-- | src/loader/loader.nim | 93 |
1 files changed, 67 insertions, 26 deletions
diff --git a/src/loader/loader.nim b/src/loader/loader.nim index 02585630..749f7d29 100644 --- a/src/loader/loader.nim +++ b/src/loader/loader.nim @@ -139,7 +139,9 @@ proc loadResource(ctx: LoaderContext, request: Request, handle: LoaderHandle) = handle.close() else: let fd = handle.istream.fd + handle.setBlocking(false) ctx.selector.registerHandle(fd, {Read}, 0) + ctx.selector.registerHandle(handle.fd, {Write}, 0) let ofl = fcntl(fd, F_GETFL, 0) discard fcntl(fd, F_SETFL, ofl or O_NONBLOCK) # yes, this puts the istream fd in addition to the ostream fd in @@ -164,7 +166,7 @@ proc loadResource(ctx: LoaderContext, request: Request, handle: LoaderHandle) = proc onLoad(ctx: LoaderContext, stream: SocketStream) = var request: Request stream.sread(request) - let handle = newLoaderHandle(stream, request.canredir) + let handle = newLoaderHandle(stream, request.canredir, request.url) if not ctx.config.filter.match(request.url): handle.sendResult(ERROR_DISALLOWED_URL) handle.close() @@ -188,9 +190,6 @@ proc onLoad(ctx: LoaderContext, stream: SocketStream) = ctx.loadResource(request, handle) proc acceptConnection(ctx: LoaderContext) = - #TODO TODO TODO acceptSocketStream should be non-blocking here, - # otherwise the client disconnecting between poll and accept could - # block this indefinitely. let stream = ctx.ssock.acceptSocketStream() try: var cmd: LoaderCommand @@ -250,7 +249,7 @@ proc initLoaderContext(fd: cint, config: LoaderConfig): LoaderContext = gctx = ctx #TODO ideally, buffered would be true. Unfortunately this conflicts with # sendFileHandle/recvFileHandle. - ctx.ssock = initServerSocket(buffered = false) + ctx.ssock = initServerSocket(buffered = false, blocking = false) ctx.fd = int(ctx.ssock.sock.getFd()) ctx.selector.registerHandle(ctx.fd, {Read}, 0) # The server has been initialized, so the main process can resume execution. @@ -271,41 +270,81 @@ proc initLoaderContext(fd: cint, config: LoaderConfig): LoaderContext = proc runFileLoader*(fd: cint, config: LoaderConfig) = var ctx = initLoaderContext(fd, config) - var buffer {.noinit.}: array[16384, uint8] while ctx.alive: let events = ctx.selector.select(-1) - var unreg: seq[int] + var unregRead: seq[LoaderHandle] + var unregWrite: seq[LoaderHandle] for event in events: if Read in event.events: if event.fd == ctx.fd: # incoming connection ctx.acceptConnection() else: let handle = ctx.handleMap[event.fd] - while not handle.istream.atEnd: + assert event.fd != handle.fd + while true: try: - let n = handle.istream.readData(addr buffer[0], buffer.len) - handle.sendData(addr buffer[0], n) + let buffer = newLoaderBuffer() + buffer.len = handle.istream.readData(addr buffer[0], buffer.cap) + if buffer.len == 0: + dealloc(buffer) + break + handle.addBuffer(buffer) + if buffer.len < buffer.cap: + break except ErrorAgain, ErrorWouldBlock: # retry later break - except ErrorBrokenPipe: # receiver died; stop streaming - unreg.add(event.fd) + except ErrorBrokenPipe: # sender died; stop streaming + unregRead.add(handle) break + if Write in event.events: + let handle = ctx.handleMap[event.fd] + assert event.fd == handle.fd + while handle.currentBuffer != nil: + let buffer = handle.currentBuffer + try: + let i = handle.currentBufferIdx + assert buffer.len - i > 0 + let n = handle.sendData(addr buffer[i], buffer.len - i) + handle.currentBufferIdx += n + if handle.currentBufferIdx < buffer.len: + break + handle.bufferCleared() # swap out buffer + except ErrorAgain, ErrorWouldBlock: # never mind + break + except ErrorBrokenPipe: # receiver died; stop streaming + unregWrite.add(handle) + break + if handle.istream == nil and handle.currentBuffer == nil and + (unregWrite.len == 0 or unregWrite[^1] != handle): + # after EOF, but not appended in this send cycle + unregWrite.add(handle) if Error in event.events: assert event.fd != ctx.fd - when defined(debug): - # sanity check - let handle = ctx.handleMap[event.fd] - if not handle.istream.atEnd(): - let n = handle.istream.readData(addr buffer[0], buffer.len) - assert n == 0 - assert handle.istream.atEnd() - unreg.add(event.fd) - for fd in unreg: - ctx.selector.unregister(fd) - let handle = ctx.handleMap[fd] - ctx.handleMap.del(fd) - ctx.handleMap.del(handle.getFd()) - handle.close() + let handle = ctx.handleMap[event.fd] + if handle.fd == event.fd: # ostream died + unregWrite.add(handle) + else: # istream died + unregRead.add(handle) + for handle in unregRead: + ctx.selector.unregister(handle.istream.fd) + ctx.handleMap.del(handle.istream.fd) + handle.istream.close() + handle.istream = nil + if handle.currentBuffer == nil: + unregWrite.add(handle) + #TODO TODO TODO what to do about sostream + for handle in unregWrite: + ctx.selector.unregister(handle.fd) + ctx.handleMap.del(handle.fd) + handle.ostream.close() + handle.ostream = nil + if handle.istream != nil: + handle.istream.close() + ctx.handleMap.del(handle.istream.fd) + ctx.selector.unregister(handle.istream.fd) + handle.istream.close() + handle.istream = nil + #TODO TODO TODO what to do about sostream ctx.exitLoader() proc getAttribute(contentType, attrname: string): string = @@ -478,6 +517,8 @@ proc onRead*(loader: FileLoader, fd: int) = buffer[].buf.setLen(olen + BufferSize) let n = response.body.readData(addr buffer[].buf[olen], BufferSize) buffer[].buf.setLen(olen + n) + if n == 0: + break except ErrorAgain, ErrorWouldBlock: break if response.body.atEnd(): |