about summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--src/io/dynstream.nim9
-rw-r--r--src/loader/loader.nim31
-rw-r--r--src/loader/loaderhandle.nim2
-rw-r--r--src/local/pager.nim6
4 files changed, 37 insertions, 11 deletions
diff --git a/src/io/dynstream.nim b/src/io/dynstream.nim
index 9e23a8ae..2bfa02f5 100644
--- a/src/io/dynstream.nim
+++ b/src/io/dynstream.nim
@@ -9,6 +9,7 @@ type
   DynStream* = ref object of RootObj
     isend*: bool
     blocking*: bool #TODO move to posixstream
+    closed: bool
 
 # Semantics of this function are those of POSIX read(2): that is, it may return
 # a result that is lower than `len`, and that does not mean the stream is
@@ -155,7 +156,9 @@ method seek*(s: PosixStream; off: int) =
     raisePosixIOError()
 
 method sclose*(s: PosixStream) =
+  assert not s.closed
   discard close(s.fd)
+  s.closed = true
 
 proc newPosixStream*(fd: FileHandle): PosixStream =
   return PosixStream(fd: fd, blocking: true)
@@ -214,7 +217,9 @@ method seek*(s: SocketStream; off: int) =
   doAssert false
 
 method sclose*(s: SocketStream) =
+  assert not s.closed
   s.source.close()
+  s.closed = true
 
 # see serversocket.nim for an explanation
 {.compile: "connect_unix.c".}
@@ -301,7 +306,9 @@ method sendData*(s: BufStream; buffer: pointer; len: int): int =
   return len
 
 method sclose*(s: BufStream) =
+  assert not s.closed
   s.source.sclose()
+  s.closed = true
 
 proc flushWrite*(s: BufStream): bool =
   s.source.setBlocking(false)
@@ -340,7 +347,9 @@ method seek*(s: DynFileStream; off: int) =
   s.file.setFilePos(int64(off))
 
 method sclose*(s: DynFileStream) =
+  assert not s.closed
   s.file.close()
+  s.closed = true
 
 method sflush*(s: DynFileStream) =
   s.file.flushFile()
diff --git a/src/loader/loader.nim b/src/loader/loader.nim
index e3c36b01..01593a00 100644
--- a/src/loader/loader.nim
+++ b/src/loader/loader.nim
@@ -181,6 +181,16 @@ func findCachedHandle(ctx: LoaderContext; cacheId: int): LoaderHandle =
 type PushBufferResult = enum
   pbrDone, pbrUnregister
 
+proc register(ctx: LoaderContext; handle: LoaderHandle) =
+  assert not handle.registered
+  ctx.selector.registerHandle(int(handle.istream.fd), {Read}, 0)
+  handle.registered = true
+
+proc unregister(ctx: LoaderContext; handle: LoaderHandle) =
+  assert handle.registered
+  ctx.selector.unregister(int(handle.istream.fd))
+  handle.registered = false
+
 proc register(ctx: LoaderContext; output: OutputHandle) =
   assert not output.registered
   ctx.selector.registerHandle(int(output.ostream.fd), {Write}, 0)
@@ -278,7 +288,7 @@ proc addFd(ctx: LoaderContext; handle: LoaderHandle) =
   let output = handle.output
   output.ostream.setBlocking(false)
   handle.istream.setBlocking(false)
-  ctx.selector.registerHandle(int(handle.istream.fd), {Read}, 0)
+  ctx.register(handle)
   assert handle.istream.fd notin ctx.handleMap
   assert output.ostream.fd notin ctx.outputMap
   ctx.handleMap[handle.istream.fd] = handle
@@ -894,7 +904,7 @@ proc finishCycle(ctx: LoaderContext; unregRead: var seq[LoaderHandle];
   # unregistered handles to nil.
   for handle in unregRead:
     if handle.istream != nil:
-      ctx.selector.unregister(int(handle.istream.fd))
+      ctx.unregister(handle)
       ctx.handleMap.del(handle.istream.fd)
       if handle.parser != nil:
         handle.finishParse()
@@ -915,7 +925,7 @@ proc finishCycle(ctx: LoaderContext; unregRead: var seq[LoaderHandle];
         handle.outputs.del(i)
         if handle.outputs.len == 0 and handle.istream != nil:
           # premature end of all output streams; kill istream too
-          ctx.selector.unregister(int(handle.istream.fd))
+          ctx.unregister(handle)
           ctx.handleMap.del(handle.istream.fd)
           if handle.parser != nil:
             handle.finishParse()
@@ -1060,6 +1070,7 @@ proc addCacheFile*(loader: FileLoader; outputId, targetPid: int): int =
   var r = stream.initPacketReader()
   var outputId: int
   r.sread(outputId)
+  stream.sclose()
   return outputId
 
 proc getCacheFile*(loader: FileLoader; cacheId: int): string =
@@ -1072,6 +1083,7 @@ proc getCacheFile*(loader: FileLoader; cacheId: int): string =
   var r = stream.initPacketReader()
   var s: string
   r.sread(s)
+  stream.sclose()
   return s
 
 proc redirectToFile*(loader: FileLoader; outputId: int; targetPath: string):
@@ -1084,7 +1096,10 @@ proc redirectToFile*(loader: FileLoader; outputId: int; targetPath: string):
     w.swrite(outputId)
     w.swrite(targetPath)
   var r = stream.initPacketReader()
-  r.sread(result)
+  var res: bool
+  r.sread(res)
+  stream.sclose()
+  return res
 
 proc onConnected*(loader: FileLoader; fd: int) =
   let connectData = loader.connecting[fd]
@@ -1152,7 +1167,7 @@ proc onRead*(loader: FileLoader; fd: int) =
       if response.onFinish != nil:
         response.onFinish(response, true)
       response.onFinish = nil
-      response.unregisterFun()
+      response.close()
 
 proc onError*(loader: FileLoader; fd: int) =
   let response = loader.ongoing.getOrDefault(fd)
@@ -1160,7 +1175,7 @@ proc onError*(loader: FileLoader; fd: int) =
     if response.onFinish != nil:
       response.onFinish(response, false)
     response.onFinish = nil
-    response.unregisterFun()
+    response.close()
 
 # Note: this blocks until headers are received.
 proc doRequest*(loader: FileLoader; request: Request): Response =
@@ -1222,8 +1237,10 @@ proc addClient*(loader: FileLoader; key: ClientKey; pid: int;
     w.swrite(config)
     w.swrite(clonedFrom)
   var r = stream.initPacketReader()
-  r.sread(result)
+  var res: bool
+  r.sread(res)
   stream.sclose()
+  return res
 
 proc removeClient*(loader: FileLoader; pid: int) =
   let stream = loader.connect()
diff --git a/src/loader/loaderhandle.nim b/src/loader/loaderhandle.nim
index 11c33606..7637bdac 100644
--- a/src/loader/loaderhandle.nim
+++ b/src/loader/loaderhandle.nim
@@ -53,6 +53,7 @@ type
     cacheId*: int # if cached, our ID in a client cacheMap
     parser*: HeaderParser # only exists for CGI handles
     rstate: ResponseState # track response state
+    registered*: bool # track registered state
     when defined(debug):
       url*: URL
 
@@ -175,6 +176,7 @@ proc sendData*(ps: PosixStream; buffer: LoaderBuffer; si = 0): int {.inline.} =
 
 proc iclose*(handle: LoaderHandle) =
   if handle.istream != nil:
+    assert not handle.registered
     if handle.rstate notin {rsBeforeResult, rsAfterFailure, rsAfterHeaders}:
       assert handle.outputs.len == 1
       # not an ideal solution, but better than silently eating malformed
diff --git a/src/local/pager.nim b/src/local/pager.nim
index ebe48a61..c713d36a 100644
--- a/src/local/pager.nim
+++ b/src/local/pager.nim
@@ -515,8 +515,7 @@ proc loadCachedImage(pager: Pager; container: Container; image: PosBitmap;
     )
     let r = pager.loader.fetch(request)
     response.resume()
-    response.unregisterFun()
-    response.body.sclose()
+    response.close()
     return r
   ).then(proc(res: JSResult[Response]) =
     if res.isNone:
@@ -546,8 +545,7 @@ proc loadCachedImage(pager: Pager; container: Container; image: PosBitmap;
     )
     let r = pager.loader.fetch(request)
     response.resume()
-    response.unregisterFun()
-    response.body.sclose()
+    response.close()
     r.then(proc(res: JSResult[Response]): Promise[JSResult[Blob]] =
       if res.isNone:
         let p = newPromise[JSResult[Blob]]()