about summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--src/io/posixstream.nim9
-rw-r--r--src/io/socketstream.nim6
-rw-r--r--src/loader/loader.nim47
-rw-r--r--src/loader/loaderhandle.nim66
4 files changed, 55 insertions, 73 deletions
diff --git a/src/io/posixstream.nim b/src/io/posixstream.nim
index 66b2d0d9..0cc6ff73 100644
--- a/src/io/posixstream.nim
+++ b/src/io/posixstream.nim
@@ -4,7 +4,7 @@ import std/streams
 
 type
   PosixStream* = ref object of Stream
-    fd*: FileHandle
+    fd*: cint
     isend*: bool
 
   ErrorAgain* = object of IOError
@@ -79,6 +79,13 @@ method sendData*(s: PosixStream, buffer: pointer, len: int): int {.base.} =
     raisePosixIOError()
   return n
 
+method setBlocking*(s: PosixStream, blocking: bool) {.base.} =
+  let ofl = fcntl(s.fd, F_GETFL, 0)
+  if blocking:
+    discard fcntl(s.fd, F_SETFL, ofl and not O_NONBLOCK)
+  else:
+    discard fcntl(s.fd, F_SETFL, ofl or O_NONBLOCK)
+
 proc psWriteData(s: Stream, buffer: pointer, len: int) =
   #TODO use sendData instead
   let s = cast[PosixStream](s)
diff --git a/src/io/socketstream.nim b/src/io/socketstream.nim
index 38c43a84..dd391f21 100644
--- a/src/io/socketstream.nim
+++ b/src/io/socketstream.nim
@@ -105,8 +105,8 @@ func newSocketStream*(): SocketStream =
     closeImpl: sockClose
   )
 
-proc setBlocking*(ss: SocketStream, blocking: bool) =
-  ss.source.getFd().setBlocking(blocking)
+method setBlocking*(s: SocketStream, blocking: bool) =
+  s.source.getFd().setBlocking(blocking)
 
 # see serversocket.nim for an explanation
 {.compile: "connect_unix.c".}
@@ -125,6 +125,7 @@ proc connectSocketStream*(path: string, buffered = true, blocking = true):
       cint(path.len)) != 0:
     raiseOSError(osLastError())
   result.source = sock
+  result.fd = cint(sock.getFd())
 
 proc connectSocketStream*(pid: Pid, buffered = true, blocking = true):
     SocketStream =
@@ -141,3 +142,4 @@ proc acceptSocketStream*(ssock: ServerSocket, blocking = true): SocketStream =
   result.source = sock
   if not blocking:
     sock.getFd().setBlocking(false)
+  result.fd = cint(sock.getFd())
diff --git a/src/loader/loader.nim b/src/loader/loader.nim
index 491ca095..82796e20 100644
--- a/src/loader/loader.nim
+++ b/src/loader/loader.nim
@@ -138,15 +138,18 @@ proc loadResource(ctx: LoaderContext, request: Request, handle: LoaderHandle) =
       if handle.istream == nil:
         handle.close()
       else:
-        let fd = handle.istream.fd
-        handle.setBlocking(false)
-        ctx.selector.registerHandle(fd, {Read}, 0)
-        ctx.selector.registerHandle(handle.fd, {Write}, 0)
-        let ofl = fcntl(fd, F_GETFL, 0)
-        discard fcntl(fd, F_SETFL, ofl or O_NONBLOCK)
+        handle.ostream.setBlocking(false)
+        ctx.selector.registerHandle(handle.istream.fd, {Read}, 0)
+        ctx.selector.registerHandle(handle.ostream.fd, {Write}, 0)
+        let ofl = fcntl(handle.istream.fd, F_GETFL, 0)
+        discard fcntl(handle.istream.fd, F_SETFL, ofl or O_NONBLOCK)
         # yes, this puts the istream fd in addition to the ostream fd in
         # handlemap to point to the same ref
-        ctx.handleMap[fd] = handle
+        ctx.handleMap[handle.istream.fd] = handle
+        # also put the new fd into handleMap if stream was redirected
+        if handle.sostream != nil:
+          ctx.handleMap[handle.ostream.fd] = handle
+          ctx.handleMap.del(handle.sostream.fd)
     else:
       prevurl = request.url
       case ctx.config.uriMethodMap.findAndRewrite(request.url)
@@ -213,13 +216,19 @@ proc acceptConnection(ctx: LoaderContext) =
       stream.sread(fds)
       for fd in fds:
         ctx.handleMap.withValue(fd, handlep):
-          handlep[].suspend()
+          if handlep[].ostream != nil and handlep[].ostream.fd == fd:
+            # remove from the selector, so any new reads will be just placed
+            # in the handle's buffer
+            ctx.selector.unregister(fd)
     of RESUME:
       var fds: seq[int]
       stream.sread(fds)
       for fd in fds:
         ctx.handleMap.withValue(fd, handlep):
-          handlep[].resume()
+          if handlep[].ostream != nil and handlep[].ostream.fd == fd:
+            # place the stream back into the selector, so we can write to it
+            # again
+            ctx.selector.registerHandle(fd, {Write}, 0)
     of ADDREF:
       inc ctx.refcount
     of UNREF:
@@ -282,7 +291,7 @@ proc runFileLoader*(fd: cint, config: LoaderConfig) =
           ctx.acceptConnection()
         else:
           let handle = ctx.handleMap[event.fd]
-          assert event.fd != handle.fd
+          assert event.fd == handle.istream.fd
           while true:
             let buffer = newLoaderBuffer()
             try:
@@ -302,7 +311,7 @@ proc runFileLoader*(fd: cint, config: LoaderConfig) =
               break
       if Write in event.events:
         let handle = ctx.handleMap[event.fd]
-        assert event.fd == handle.fd
+        assert event.fd == handle.ostream.fd
         while handle.currentBuffer != nil:
           let buffer = handle.currentBuffer
           try:
@@ -324,9 +333,10 @@ proc runFileLoader*(fd: cint, config: LoaderConfig) =
       if Error in event.events:
         assert event.fd != ctx.fd
         let handle = ctx.handleMap[event.fd]
-        if handle.fd == event.fd: # ostream died
+        if handle.ostream.fd == event.fd: # ostream died
           unregWrite.add(handle)
         else: # istream died
+          assert handle.istream.fd == event.fd
           unregRead.add(handle)
     # Unregister handles queued for unregistration.
     # It is possible for both unregRead and unregWrite to contain duplicates. To
@@ -342,10 +352,19 @@ proc runFileLoader*(fd: cint, config: LoaderConfig) =
           unregWrite.add(handle)
     for handle in unregWrite:
       if handle.ostream != nil:
-        ctx.selector.unregister(handle.fd)
-        ctx.handleMap.del(handle.fd)
+        ctx.selector.unregister(handle.ostream.fd)
+        ctx.handleMap.del(handle.ostream.fd)
         handle.ostream.close()
         handle.ostream = nil
+      if handle.sostream != nil:
+        try:
+          handle.sostream.swrite(true)
+        except IOError:
+          # ignore error, that just means the buffer has already closed the
+          # stream
+          discard
+        handle.sostream.close()
+        handle.sostream = nil
       if handle.istream != nil:
         ctx.handleMap.del(handle.istream.fd)
         ctx.selector.unregister(handle.istream.fd)
diff --git a/src/loader/loaderhandle.nim b/src/loader/loaderhandle.nim
index 06149e23..15336964 100644
--- a/src/loader/loaderhandle.nim
+++ b/src/loader/loaderhandle.nim
@@ -2,7 +2,6 @@ import std/deques
 import std/net
 import std/streams
 
-import io/multistream
 import io/posixstream
 import io/serialize
 import io/socketstream
@@ -20,6 +19,8 @@ type
 
   LoaderBuffer* = ptr LoaderBufferObj
 
+  OutputHandle* = object
+
   LoaderHandle* = ref object
     ostream*: PosixStream #TODO un-extern
     # Stream for taking input
@@ -28,9 +29,7 @@ type
     # redirect the first handle and b) async redirects would result in race
     # conditions that would be difficult to untangle.
     canredir: bool
-    sostream: Stream # saved ostream when redirected
-    sostream_suspend: Stream # saved ostream when suspended
-    fd*: int # ostream fd
+    sostream*: PosixStream # saved ostream when redirected
     currentBuffer*: LoaderBuffer
     currentBufferIdx*: int
     buffers: Deque[LoaderBuffer]
@@ -41,8 +40,7 @@ type
 proc newLoaderHandle*(ostream: PosixStream, canredir: bool): LoaderHandle =
   return LoaderHandle(
     ostream: ostream,
-    canredir: canredir,
-    fd: int(SocketStream(ostream).source.getFd())
+    canredir: canredir
   )
 
 func `[]`*(buffer: LoaderBuffer, i: int): var uint8 {.inline.} =
@@ -78,30 +76,10 @@ proc bufferCleared*(handle: LoaderHandle) =
     handle.currentBuffer = nil
 
 proc addOutputStream*(handle: LoaderHandle, stream: Stream) =
-  if likely(handle.sostream_suspend != nil):
-    let ms = newMultiStream(handle.sostream_suspend, stream)
-    handle.sostream_suspend = ms
-  else:
-    # In buffer, addOutputStream is used as follows:
-    # * suspend handle
-    # * tee handle (-> call addOutputStream)
-    # * resume handle
-    # This means that this code path will never be executed, as
-    # sostream_suspend is never nil when the function is called.
-    # (Feel free to remove this assertion if this changes.)
-    doAssert false
-    #TODO TODO TODO fix this
-    #let ms = newMultiStream(handle.ostream, stream)
-    #handle.ostream = ms
-
-proc setBlocking*(handle: LoaderHandle, blocking: bool) =
-  #TODO this is stupid
-  if handle.sostream_suspend != nil and handle.sostream_suspend of SocketStream:
-    SocketStream(handle.sostream_suspend).setBlocking(blocking)
-  elif handle.sostream != nil and handle.sostream of SocketStream:
-    SocketStream(handle.sostream).setBlocking(blocking)
-  else:
-    SocketStream(handle.ostream).setBlocking(blocking)
+  doAssert false
+  #TODO TODO TODO fix this
+  #let ms = newMultiStream(handle.ostream, stream)
+  #handle.ostream = ms
 
 proc sendResult*(handle: LoaderHandle, res: int, msg = "") =
   handle.ostream.swrite(res)
@@ -121,37 +99,13 @@ proc sendHeaders*(handle: LoaderHandle, headers: Headers) =
     if redir:
       let fd = SocketStream(handle.ostream).recvFileHandle()
       handle.sostream = handle.ostream
-      let stream = newPosixStream(fd)
-      handle.ostream = stream
+      handle.ostream = newPosixStream(fd)
 
 proc sendData*(handle: LoaderHandle, p: pointer, nmemb: int): int =
   return handle.ostream.sendData(p, nmemb)
 
-proc suspend*(handle: LoaderHandle) =
-  #TODO TODO TODO fix suspend
-  doAssert false
-  handle.sostream_suspend = handle.ostream
-  #handle.ostream = newStringStream()
-
-proc resume*(handle: LoaderHandle) =
-  #TODO TODO TODO fix resume
-  doAssert false
-  #[
-  let ss = handle.ostream
-  handle.ostream = handle.sostream_suspend
-  handle.sostream_suspend = nil
-  handle.sendData(ss.readAll())
-  ss.close()
-  ]#
-
 proc close*(handle: LoaderHandle) =
-  if handle.sostream != nil:
-    try:
-      handle.sostream.swrite(true)
-    except IOError:
-      # ignore error, that just means the buffer has already closed the stream
-      discard
-    handle.sostream.close()
+  assert handle.sostream == nil
   if handle.ostream != nil:
     handle.ostream.close()
     handle.ostream = nil