about summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--src/loader/loader.nim75
-rw-r--r--src/loader/loaderhandle.nim1
2 files changed, 60 insertions, 16 deletions
diff --git a/src/loader/loader.nim b/src/loader/loader.nim
index f2d38d61..712db932 100644
--- a/src/loader/loader.nim
+++ b/src/loader/loader.nim
@@ -171,12 +171,8 @@ proc pushBuffer(ctx: LoaderContext, output: OutputHandle, buffer: LoaderBuffer):
     output.addBuffer(buffer)
   return pbrDone
 
-proc addFd(ctx: LoaderContext, handle: LoaderHandle, originalUrl: URL) =
+proc addFd0(ctx: LoaderContext, handle: LoaderHandle, originalUrl: URL) =
   let output = handle.output
-  output.ostream.setBlocking(false)
-  handle.istream.setBlocking(false)
-  ctx.selector.registerHandle(handle.istream.fd, {Read}, 0)
-  ctx.handleMap[handle.istream.fd] = handle
   if output.sostream != nil:
     # replace the fd with the new one in outputMap if stream was
     # redirected
@@ -193,13 +189,59 @@ proc addFd(ctx: LoaderContext, handle: LoaderHandle, originalUrl: URL) =
       ctx.cacheMap[surl] = tmpf
       handle.cacheUrl = surl
 
-proc loadStream(ctx: LoaderContext, handle: LoaderHandle, request: Request) =
+proc addFd(ctx: LoaderContext, handle: LoaderHandle, originalUrl: URL) =
+  handle.output.ostream.setBlocking(false)
+  handle.istream.setBlocking(false)
+  ctx.selector.registerHandle(handle.istream.fd, {Read}, 0)
+  ctx.handleMap[handle.istream.fd] = handle
+  ctx.addFd0(handle, originalUrl)
+
+proc loadStream(ctx: LoaderContext, handle: LoaderHandle, request: Request,
+    originalUrl: URL) =
   ctx.passedFdMap.withValue(request.url.host, fdp):
     handle.sendResult(0)
     handle.sendStatus(200)
     handle.sendHeaders(newHeaders())
-    handle.istream = newPosixStream(fdp[])
+    let ps = newPosixStream(fdp[])
+    var stats: Stat
+    doAssert fstat(fdp[], stats) != -1
+    handle.istream = ps
     ctx.passedFdMap.del(request.url.host)
+    if S_ISREG(stats.st_mode):
+      # stdin is a regular file, so we can't select on it.
+      let originalUrl = if handle.cached: originalUrl else: nil
+      let output = handle.output
+      output.ostream.setBlocking(false)
+      ctx.addFd0(handle, originalUrl)
+      var nn = 0
+      while true:
+        let buffer = newLoaderBuffer()
+        let n = ps.recvData(buffer)
+        nn += n
+        if n == 0:
+          break
+        var unregWrite: seq[OutputHandle] = @[]
+        for output in handle.outputs:
+          if ctx.pushBuffer(output, buffer) == pbrUnregister:
+            unregWrite.add(output)
+        for output in unregWrite:
+          output.parent = nil
+          let i = handle.outputs.find(output)
+          if output.registered:
+            ctx.selector.unregister(output.ostream.fd)
+            output.registered = false
+          handle.outputs.del(i)
+        if handle.outputs.len == 0:
+          # original output died and so did the cache file. (or we didn't have a
+          # cache file in the first place)
+          break
+        if n < buffer.cap:
+          break
+      for output in handle.outputs:
+        output.parent = nil
+      handle.outputs.setLen(0)
+      handle.istream.close()
+      handle.istream = nil
   do:
     handle.sendResult(ERROR_FILE_NOT_FOUND, "stream not found")
 
@@ -228,7 +270,7 @@ proc loadResource(ctx: LoaderContext, request: Request, handle: LoaderHandle) =
       else:
         handle.close()
     elif request.url.scheme == "stream":
-      ctx.loadStream(handle, request)
+      ctx.loadStream(handle, request, originalUrl)
       if handle.istream != nil:
         let originalUrl = if handle.cached: originalUrl else: nil
         ctx.addFd(handle, originalUrl)
@@ -495,14 +537,15 @@ proc finishCycle(ctx: LoaderContext, unregRead: var seq[LoaderHandle],
       output.ostream.close()
       output.ostream = nil
       let handle = output.parent
-      let i = handle.outputs.find(output)
-      handle.outputs.del(i)
-      if handle.outputs.len == 0 and handle.istream != nil:
-        # premature end of all output streams; kill istream too
-        ctx.selector.unregister(handle.istream.fd)
-        ctx.handleMap.del(handle.istream.fd)
-        handle.istream.close()
-        handle.istream = nil
+      if handle != nil: # may be nil if from loadStream S_ISREG
+        let i = handle.outputs.find(output)
+        handle.outputs.del(i)
+        if handle.outputs.len == 0 and handle.istream != nil:
+          # premature end of all output streams; kill istream too
+          ctx.selector.unregister(handle.istream.fd)
+          ctx.handleMap.del(handle.istream.fd)
+          handle.istream.close()
+          handle.istream = nil
     if output.sostream != nil:
       #TODO it is not clear what should happen when multiple outputs exist.
       #
diff --git a/src/loader/loaderhandle.nim b/src/loader/loaderhandle.nim
index b0c5747e..11a97bf4 100644
--- a/src/loader/loaderhandle.nim
+++ b/src/loader/loaderhandle.nim
@@ -149,6 +149,7 @@ proc sendData*(ps: PosixStream, buffer: LoaderBuffer, si = 0): int {.inline.} =
 
 proc close*(handle: LoaderHandle) =
   for output in handle.outputs:
+    #TODO assert not output.registered
     assert output.sostream == nil
     if output.ostream != nil:
       output.ostream.close()