diff options
-rw-r--r-- | src/io/dynstream.nim | 9 | ||||
-rw-r--r-- | src/loader/loader.nim | 31 | ||||
-rw-r--r-- | src/loader/loaderhandle.nim | 2 | ||||
-rw-r--r-- | src/local/pager.nim | 6 |
4 files changed, 37 insertions, 11 deletions
diff --git a/src/io/dynstream.nim b/src/io/dynstream.nim index 9e23a8ae..2bfa02f5 100644 --- a/src/io/dynstream.nim +++ b/src/io/dynstream.nim @@ -9,6 +9,7 @@ type DynStream* = ref object of RootObj isend*: bool blocking*: bool #TODO move to posixstream + closed: bool # Semantics of this function are those of POSIX read(2): that is, it may return # a result that is lower than `len`, and that does not mean the stream is @@ -155,7 +156,9 @@ method seek*(s: PosixStream; off: int) = raisePosixIOError() method sclose*(s: PosixStream) = + assert not s.closed discard close(s.fd) + s.closed = true proc newPosixStream*(fd: FileHandle): PosixStream = return PosixStream(fd: fd, blocking: true) @@ -214,7 +217,9 @@ method seek*(s: SocketStream; off: int) = doAssert false method sclose*(s: SocketStream) = + assert not s.closed s.source.close() + s.closed = true # see serversocket.nim for an explanation {.compile: "connect_unix.c".} @@ -301,7 +306,9 @@ method sendData*(s: BufStream; buffer: pointer; len: int): int = return len method sclose*(s: BufStream) = + assert not s.closed s.source.sclose() + s.closed = true proc flushWrite*(s: BufStream): bool = s.source.setBlocking(false) @@ -340,7 +347,9 @@ method seek*(s: DynFileStream; off: int) = s.file.setFilePos(int64(off)) method sclose*(s: DynFileStream) = + assert not s.closed s.file.close() + s.closed = true method sflush*(s: DynFileStream) = s.file.flushFile() diff --git a/src/loader/loader.nim b/src/loader/loader.nim index e3c36b01..01593a00 100644 --- a/src/loader/loader.nim +++ b/src/loader/loader.nim @@ -181,6 +181,16 @@ func findCachedHandle(ctx: LoaderContext; cacheId: int): LoaderHandle = type PushBufferResult = enum pbrDone, pbrUnregister +proc register(ctx: LoaderContext; handle: LoaderHandle) = + assert not handle.registered + ctx.selector.registerHandle(int(handle.istream.fd), {Read}, 0) + handle.registered = true + +proc unregister(ctx: LoaderContext; handle: LoaderHandle) = + assert handle.registered + ctx.selector.unregister(int(handle.istream.fd)) + handle.registered = false + proc register(ctx: LoaderContext; output: OutputHandle) = assert not output.registered ctx.selector.registerHandle(int(output.ostream.fd), {Write}, 0) @@ -278,7 +288,7 @@ proc addFd(ctx: LoaderContext; handle: LoaderHandle) = let output = handle.output output.ostream.setBlocking(false) handle.istream.setBlocking(false) - ctx.selector.registerHandle(int(handle.istream.fd), {Read}, 0) + ctx.register(handle) assert handle.istream.fd notin ctx.handleMap assert output.ostream.fd notin ctx.outputMap ctx.handleMap[handle.istream.fd] = handle @@ -894,7 +904,7 @@ proc finishCycle(ctx: LoaderContext; unregRead: var seq[LoaderHandle]; # unregistered handles to nil. for handle in unregRead: if handle.istream != nil: - ctx.selector.unregister(int(handle.istream.fd)) + ctx.unregister(handle) ctx.handleMap.del(handle.istream.fd) if handle.parser != nil: handle.finishParse() @@ -915,7 +925,7 @@ proc finishCycle(ctx: LoaderContext; unregRead: var seq[LoaderHandle]; handle.outputs.del(i) if handle.outputs.len == 0 and handle.istream != nil: # premature end of all output streams; kill istream too - ctx.selector.unregister(int(handle.istream.fd)) + ctx.unregister(handle) ctx.handleMap.del(handle.istream.fd) if handle.parser != nil: handle.finishParse() @@ -1060,6 +1070,7 @@ proc addCacheFile*(loader: FileLoader; outputId, targetPid: int): int = var r = stream.initPacketReader() var outputId: int r.sread(outputId) + stream.sclose() return outputId proc getCacheFile*(loader: FileLoader; cacheId: int): string = @@ -1072,6 +1083,7 @@ proc getCacheFile*(loader: FileLoader; cacheId: int): string = var r = stream.initPacketReader() var s: string r.sread(s) + stream.sclose() return s proc redirectToFile*(loader: FileLoader; outputId: int; targetPath: string): @@ -1084,7 +1096,10 @@ proc redirectToFile*(loader: FileLoader; outputId: int; targetPath: string): w.swrite(outputId) w.swrite(targetPath) var r = stream.initPacketReader() - r.sread(result) + var res: bool + r.sread(res) + stream.sclose() + return res proc onConnected*(loader: FileLoader; fd: int) = let connectData = loader.connecting[fd] @@ -1152,7 +1167,7 @@ proc onRead*(loader: FileLoader; fd: int) = if response.onFinish != nil: response.onFinish(response, true) response.onFinish = nil - response.unregisterFun() + response.close() proc onError*(loader: FileLoader; fd: int) = let response = loader.ongoing.getOrDefault(fd) @@ -1160,7 +1175,7 @@ proc onError*(loader: FileLoader; fd: int) = if response.onFinish != nil: response.onFinish(response, false) response.onFinish = nil - response.unregisterFun() + response.close() # Note: this blocks until headers are received. proc doRequest*(loader: FileLoader; request: Request): Response = @@ -1222,8 +1237,10 @@ proc addClient*(loader: FileLoader; key: ClientKey; pid: int; w.swrite(config) w.swrite(clonedFrom) var r = stream.initPacketReader() - r.sread(result) + var res: bool + r.sread(res) stream.sclose() + return res proc removeClient*(loader: FileLoader; pid: int) = let stream = loader.connect() diff --git a/src/loader/loaderhandle.nim b/src/loader/loaderhandle.nim index 11c33606..7637bdac 100644 --- a/src/loader/loaderhandle.nim +++ b/src/loader/loaderhandle.nim @@ -53,6 +53,7 @@ type cacheId*: int # if cached, our ID in a client cacheMap parser*: HeaderParser # only exists for CGI handles rstate: ResponseState # track response state + registered*: bool # track registered state when defined(debug): url*: URL @@ -175,6 +176,7 @@ proc sendData*(ps: PosixStream; buffer: LoaderBuffer; si = 0): int {.inline.} = proc iclose*(handle: LoaderHandle) = if handle.istream != nil: + assert not handle.registered if handle.rstate notin {rsBeforeResult, rsAfterFailure, rsAfterHeaders}: assert handle.outputs.len == 1 # not an ideal solution, but better than silently eating malformed diff --git a/src/local/pager.nim b/src/local/pager.nim index ebe48a61..c713d36a 100644 --- a/src/local/pager.nim +++ b/src/local/pager.nim @@ -515,8 +515,7 @@ proc loadCachedImage(pager: Pager; container: Container; image: PosBitmap; ) let r = pager.loader.fetch(request) response.resume() - response.unregisterFun() - response.body.sclose() + response.close() return r ).then(proc(res: JSResult[Response]) = if res.isNone: @@ -546,8 +545,7 @@ proc loadCachedImage(pager: Pager; container: Container; image: PosBitmap; ) let r = pager.loader.fetch(request) response.resume() - response.unregisterFun() - response.body.sclose() + response.close() r.then(proc(res: JSResult[Response]): Promise[JSResult[Blob]] = if res.isNone: let p = newPromise[JSResult[Blob]]() |