diff options
-rw-r--r-- | src/io/posixstream.nim | 9 | ||||
-rw-r--r-- | src/io/socketstream.nim | 6 | ||||
-rw-r--r-- | src/loader/loader.nim | 47 | ||||
-rw-r--r-- | src/loader/loaderhandle.nim | 66 |
4 files changed, 55 insertions, 73 deletions
diff --git a/src/io/posixstream.nim b/src/io/posixstream.nim index 66b2d0d9..0cc6ff73 100644 --- a/src/io/posixstream.nim +++ b/src/io/posixstream.nim @@ -4,7 +4,7 @@ import std/streams type PosixStream* = ref object of Stream - fd*: FileHandle + fd*: cint isend*: bool ErrorAgain* = object of IOError @@ -79,6 +79,13 @@ method sendData*(s: PosixStream, buffer: pointer, len: int): int {.base.} = raisePosixIOError() return n +method setBlocking*(s: PosixStream, blocking: bool) {.base.} = + let ofl = fcntl(s.fd, F_GETFL, 0) + if blocking: + discard fcntl(s.fd, F_SETFL, ofl and not O_NONBLOCK) + else: + discard fcntl(s.fd, F_SETFL, ofl or O_NONBLOCK) + proc psWriteData(s: Stream, buffer: pointer, len: int) = #TODO use sendData instead let s = cast[PosixStream](s) diff --git a/src/io/socketstream.nim b/src/io/socketstream.nim index 38c43a84..dd391f21 100644 --- a/src/io/socketstream.nim +++ b/src/io/socketstream.nim @@ -105,8 +105,8 @@ func newSocketStream*(): SocketStream = closeImpl: sockClose ) -proc setBlocking*(ss: SocketStream, blocking: bool) = - ss.source.getFd().setBlocking(blocking) +method setBlocking*(s: SocketStream, blocking: bool) = + s.source.getFd().setBlocking(blocking) # see serversocket.nim for an explanation {.compile: "connect_unix.c".} @@ -125,6 +125,7 @@ proc connectSocketStream*(path: string, buffered = true, blocking = true): cint(path.len)) != 0: raiseOSError(osLastError()) result.source = sock + result.fd = cint(sock.getFd()) proc connectSocketStream*(pid: Pid, buffered = true, blocking = true): SocketStream = @@ -141,3 +142,4 @@ proc acceptSocketStream*(ssock: ServerSocket, blocking = true): SocketStream = result.source = sock if not blocking: sock.getFd().setBlocking(false) + result.fd = cint(sock.getFd()) diff --git a/src/loader/loader.nim b/src/loader/loader.nim index 491ca095..82796e20 100644 --- a/src/loader/loader.nim +++ b/src/loader/loader.nim @@ -138,15 +138,18 @@ proc loadResource(ctx: LoaderContext, request: Request, handle: LoaderHandle) = if handle.istream == nil: handle.close() else: - let fd = handle.istream.fd - handle.setBlocking(false) - ctx.selector.registerHandle(fd, {Read}, 0) - ctx.selector.registerHandle(handle.fd, {Write}, 0) - let ofl = fcntl(fd, F_GETFL, 0) - discard fcntl(fd, F_SETFL, ofl or O_NONBLOCK) + handle.ostream.setBlocking(false) + ctx.selector.registerHandle(handle.istream.fd, {Read}, 0) + ctx.selector.registerHandle(handle.ostream.fd, {Write}, 0) + let ofl = fcntl(handle.istream.fd, F_GETFL, 0) + discard fcntl(handle.istream.fd, F_SETFL, ofl or O_NONBLOCK) # yes, this puts the istream fd in addition to the ostream fd in # handlemap to point to the same ref - ctx.handleMap[fd] = handle + ctx.handleMap[handle.istream.fd] = handle + # also put the new fd into handleMap if stream was redirected + if handle.sostream != nil: + ctx.handleMap[handle.ostream.fd] = handle + ctx.handleMap.del(handle.sostream.fd) else: prevurl = request.url case ctx.config.uriMethodMap.findAndRewrite(request.url) @@ -213,13 +216,19 @@ proc acceptConnection(ctx: LoaderContext) = stream.sread(fds) for fd in fds: ctx.handleMap.withValue(fd, handlep): - handlep[].suspend() + if handlep[].ostream != nil and handlep[].ostream.fd == fd: + # remove from the selector, so any new reads will be just placed + # in the handle's buffer + ctx.selector.unregister(fd) of RESUME: var fds: seq[int] stream.sread(fds) for fd in fds: ctx.handleMap.withValue(fd, handlep): - handlep[].resume() + if handlep[].ostream != nil and handlep[].ostream.fd == fd: + # place the stream back into the selector, so we can write to it + # again + ctx.selector.registerHandle(fd, {Write}, 0) of ADDREF: inc ctx.refcount of UNREF: @@ -282,7 +291,7 @@ proc runFileLoader*(fd: cint, config: LoaderConfig) = ctx.acceptConnection() else: let handle = ctx.handleMap[event.fd] - assert event.fd != handle.fd + assert event.fd == handle.istream.fd while true: let buffer = newLoaderBuffer() try: @@ -302,7 +311,7 @@ proc runFileLoader*(fd: cint, config: LoaderConfig) = break if Write in event.events: let handle = ctx.handleMap[event.fd] - assert event.fd == handle.fd + assert event.fd == handle.ostream.fd while handle.currentBuffer != nil: let buffer = handle.currentBuffer try: @@ -324,9 +333,10 @@ proc runFileLoader*(fd: cint, config: LoaderConfig) = if Error in event.events: assert event.fd != ctx.fd let handle = ctx.handleMap[event.fd] - if handle.fd == event.fd: # ostream died + if handle.ostream.fd == event.fd: # ostream died unregWrite.add(handle) else: # istream died + assert handle.istream.fd == event.fd unregRead.add(handle) # Unregister handles queued for unregistration. # It is possible for both unregRead and unregWrite to contain duplicates. To @@ -342,10 +352,19 @@ proc runFileLoader*(fd: cint, config: LoaderConfig) = unregWrite.add(handle) for handle in unregWrite: if handle.ostream != nil: - ctx.selector.unregister(handle.fd) - ctx.handleMap.del(handle.fd) + ctx.selector.unregister(handle.ostream.fd) + ctx.handleMap.del(handle.ostream.fd) handle.ostream.close() handle.ostream = nil + if handle.sostream != nil: + try: + handle.sostream.swrite(true) + except IOError: + # ignore error, that just means the buffer has already closed the + # stream + discard + handle.sostream.close() + handle.sostream = nil if handle.istream != nil: ctx.handleMap.del(handle.istream.fd) ctx.selector.unregister(handle.istream.fd) diff --git a/src/loader/loaderhandle.nim b/src/loader/loaderhandle.nim index 06149e23..15336964 100644 --- a/src/loader/loaderhandle.nim +++ b/src/loader/loaderhandle.nim @@ -2,7 +2,6 @@ import std/deques import std/net import std/streams -import io/multistream import io/posixstream import io/serialize import io/socketstream @@ -20,6 +19,8 @@ type LoaderBuffer* = ptr LoaderBufferObj + OutputHandle* = object + LoaderHandle* = ref object ostream*: PosixStream #TODO un-extern # Stream for taking input @@ -28,9 +29,7 @@ type # redirect the first handle and b) async redirects would result in race # conditions that would be difficult to untangle. canredir: bool - sostream: Stream # saved ostream when redirected - sostream_suspend: Stream # saved ostream when suspended - fd*: int # ostream fd + sostream*: PosixStream # saved ostream when redirected currentBuffer*: LoaderBuffer currentBufferIdx*: int buffers: Deque[LoaderBuffer] @@ -41,8 +40,7 @@ type proc newLoaderHandle*(ostream: PosixStream, canredir: bool): LoaderHandle = return LoaderHandle( ostream: ostream, - canredir: canredir, - fd: int(SocketStream(ostream).source.getFd()) + canredir: canredir ) func `[]`*(buffer: LoaderBuffer, i: int): var uint8 {.inline.} = @@ -78,30 +76,10 @@ proc bufferCleared*(handle: LoaderHandle) = handle.currentBuffer = nil proc addOutputStream*(handle: LoaderHandle, stream: Stream) = - if likely(handle.sostream_suspend != nil): - let ms = newMultiStream(handle.sostream_suspend, stream) - handle.sostream_suspend = ms - else: - # In buffer, addOutputStream is used as follows: - # * suspend handle - # * tee handle (-> call addOutputStream) - # * resume handle - # This means that this code path will never be executed, as - # sostream_suspend is never nil when the function is called. - # (Feel free to remove this assertion if this changes.) - doAssert false - #TODO TODO TODO fix this - #let ms = newMultiStream(handle.ostream, stream) - #handle.ostream = ms - -proc setBlocking*(handle: LoaderHandle, blocking: bool) = - #TODO this is stupid - if handle.sostream_suspend != nil and handle.sostream_suspend of SocketStream: - SocketStream(handle.sostream_suspend).setBlocking(blocking) - elif handle.sostream != nil and handle.sostream of SocketStream: - SocketStream(handle.sostream).setBlocking(blocking) - else: - SocketStream(handle.ostream).setBlocking(blocking) + doAssert false + #TODO TODO TODO fix this + #let ms = newMultiStream(handle.ostream, stream) + #handle.ostream = ms proc sendResult*(handle: LoaderHandle, res: int, msg = "") = handle.ostream.swrite(res) @@ -121,37 +99,13 @@ proc sendHeaders*(handle: LoaderHandle, headers: Headers) = if redir: let fd = SocketStream(handle.ostream).recvFileHandle() handle.sostream = handle.ostream - let stream = newPosixStream(fd) - handle.ostream = stream + handle.ostream = newPosixStream(fd) proc sendData*(handle: LoaderHandle, p: pointer, nmemb: int): int = return handle.ostream.sendData(p, nmemb) -proc suspend*(handle: LoaderHandle) = - #TODO TODO TODO fix suspend - doAssert false - handle.sostream_suspend = handle.ostream - #handle.ostream = newStringStream() - -proc resume*(handle: LoaderHandle) = - #TODO TODO TODO fix resume - doAssert false - #[ - let ss = handle.ostream - handle.ostream = handle.sostream_suspend - handle.sostream_suspend = nil - handle.sendData(ss.readAll()) - ss.close() - ]# - proc close*(handle: LoaderHandle) = - if handle.sostream != nil: - try: - handle.sostream.swrite(true) - except IOError: - # ignore error, that just means the buffer has already closed the stream - discard - handle.sostream.close() + assert handle.sostream == nil if handle.ostream != nil: handle.ostream.close() handle.ostream = nil |