diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/io/bufstream.nim | 54 | ||||
-rw-r--r-- | src/io/posixstream.nim | 5 | ||||
-rw-r--r-- | src/local/client.nim | 17 | ||||
-rw-r--r-- | src/local/container.nim | 13 | ||||
-rw-r--r-- | src/local/pager.nim | 6 | ||||
-rw-r--r-- | src/server/buffer.nim | 33 |
6 files changed, 104 insertions, 24 deletions
diff --git a/src/io/bufstream.nim b/src/io/bufstream.nim new file mode 100644 index 00000000..eb9d6d6b --- /dev/null +++ b/src/io/bufstream.nim @@ -0,0 +1,54 @@ +import io/posixstream + +type + BufStream* = ref object of PosixStream + source*: PosixStream + registerFun: proc(fd: int) + registered: bool + writeBuffer: string + +method recvData*(s: BufStream, buffer: pointer, len: int): int = + s.source.recvData(buffer, len) + +method sendData*(s: BufStream, buffer: pointer, len: int): int = + s.source.setBlocking(false) + block nobuf: + var n: int + if not s.registered: + try: + n = s.source.sendData(buffer, len) + if n == len: + break nobuf + except ErrorAgain: + discard + s.registerFun(s.source.fd) + s.registered = true + let olen = s.writeBuffer.len + s.writeBuffer.setLen(s.writeBuffer.len + len - n) + let buffer = cast[ptr UncheckedArray[uint8]](buffer) + copyMem(addr s.writeBuffer[olen], addr buffer[n], len - n) + s.source.setBlocking(true) + return len + +method sclose*(s: BufStream) = + s.source.sclose() + +proc flushWrite*(s: BufStream): bool = + s.source.setBlocking(false) + let n = s.source.sendData(s.writeBuffer) + s.source.setBlocking(true) + if n == s.writeBuffer.len: + s.writeBuffer = "" + s.registered = false + return true + s.writeBuffer = s.writeBuffer.substr(n) + return false + +proc newBufStream*(ps: PosixStream, registerFun: proc(fd: int)): BufStream = + result = BufStream( + fd: ps.fd, + source: ps, + blocking: ps.blocking, + registerFun: registerFun + ) + result.addStreamIface() diff --git a/src/io/posixstream.nim b/src/io/posixstream.nim index da1a8a62..80407b27 100644 --- a/src/io/posixstream.nim +++ b/src/io/posixstream.nim @@ -56,6 +56,9 @@ method sendData*(s: PosixStream, buffer: pointer, len: int): int {.base.} = raisePosixIOError() return n +proc sendData*(s: PosixStream, buffer: openArray[char]): int {.inline.} = + return s.sendData(unsafeAddr buffer[0], buffer.len) + method setBlocking*(s: PosixStream, blocking: bool) {.base.} = s.blocking = blocking let ofl = fcntl(s.fd, F_GETFL, 0) @@ -83,7 +86,7 @@ proc psReadData(s: Stream, buffer: pointer, len: int): int = proc psWriteData(s: Stream, buffer: pointer, len: int) = let s = PosixStream(s) - assert len != 0 and s.blocking + #TODO assert len != 0 and s.blocking discard s.sendData(buffer, len) proc psAtEnd(s: Stream): bool = diff --git a/src/local/client.nim b/src/local/client.nim index 76a61bfa..3a80b4bf 100644 --- a/src/local/client.nim +++ b/src/local/client.nim @@ -21,6 +21,7 @@ import display/term import html/chadombuilder import html/dom import html/event +import io/bufstream import io/posixstream import io/promise import io/socketstream @@ -41,6 +42,7 @@ import loader/loader import loader/request import local/container import local/pager +import server/buffer import server/forkserver import types/blob import types/cookie @@ -450,12 +452,15 @@ proc acceptBuffers(client: Client) = client.pager.procmap.del(pid) stream.close() var accepted: seq[Pid] + let registerFun = proc(fd: int) = + client.selector.unregister(fd) + client.selector.registerHandle(fd, {Read, Write}, 0) for pid, container in client.pager.procmap: let stream = connectSocketStream(pid, buffered = false, blocking = true) if stream == nil: client.pager.alert("Error: failed to set up buffer") continue - container.setStream(stream) + container.setStream(stream, registerFun) let fd = int(stream.fd) client.fdmap[fd] = container client.selector.registerHandle(fd, {Read}, 0) @@ -513,6 +518,12 @@ proc handleRead(client: Client, fd: int) = let container = client.fdmap[fd] client.pager.handleEvent(container) +proc handleWrite(client: Client, fd: int) = + let container = client.fdmap[fd] + if container.iface.stream.flushWrite(): + client.selector.unregister(fd) + client.selector.registerHandle(fd, {Read}, 0) + proc flushConsole*(client: Client) {.jsfunc.} = if client.console == nil: # hack for when client crashes before console has been initialized @@ -561,6 +572,8 @@ proc inputLoop(client: Client) = for event in events: if Read in event.events: client.handleRead(event.fd) + if Write in event.events: + client.handleWrite(event.fd) if Error in event.events: client.handleError(event.fd) if Signal in event.events: @@ -597,6 +610,8 @@ proc headlessLoop(client: Client) = for event in events: if Read in event.events: client.handleRead(event.fd) + if Write in event.events: + client.handleWrite(event.fd) if Error in event.events: client.handleError(event.fd) if selectors.Event.Timer in event.events: diff --git a/src/local/container.nim b/src/local/container.nim index 03cad675..5d5e6669 100644 --- a/src/local/container.nim +++ b/src/local/container.nim @@ -1,6 +1,5 @@ import std/deques import std/options -import std/streams import std/unicode when defined(posix): @@ -11,6 +10,7 @@ import display/term import extern/stdio import io/promise import io/serialize +import io/socketstream import js/javascript import js/jstypes import js/regex @@ -144,8 +144,8 @@ jsDestructor(Highlight) jsDestructor(Container) proc newBuffer*(forkserver: ForkServer, config: BufferConfig, - request: Request, attrs: WindowAttributes, title = "", - redirectdepth = 0, canreinterpret = true, fd = FileHandle(-1), + request: Request, attrs: WindowAttributes, title: string, + redirectdepth: int, canreinterpret: bool, fd: FileHandle, contentType: Option[string]): Container = let (process, loaderPid) = forkserver.forkBuffer(request, config, attrs) if fd != -1: @@ -1575,12 +1575,13 @@ proc handleCommand(container: Container) = container.iface.stream.sread(packetid) container.iface.resolve(packetid, len - slen(packetid)) -proc setStream*(container: Container, stream: Stream) = +proc setStream*(container: Container, stream: SocketStream, + registerFun: proc(fd: int)) = if not container.cloned: - container.iface = newBufferInterface(stream) + container.iface = newBufferInterface(stream, registerFun) container.load() else: - container.iface = cloneInterface(stream) + container.iface = cloneInterface(stream, registerFun) # Maybe we have to resume loading. Let's try. discard container.iface.load().then(proc(res: int) = container.onload(res) diff --git a/src/local/pager.nim b/src/local/pager.nim index 1040c259..d8f7c5dd 100644 --- a/src/local/pager.nim +++ b/src/local/pager.nim @@ -21,8 +21,8 @@ import extern/editor import extern/runproc import extern/stdio import extern/tempfile +import io/posixstream import io/promise -import io/socketstream import js/error import js/javascript import js/jstypes @@ -86,7 +86,7 @@ type statusgrid*: FixedGrid term*: Terminal tmpdir: string - unreg*: seq[(Pid, SocketStream)] + unreg*: seq[(Pid, PosixStream)] urimethodmap: URIMethodMap username: string @@ -583,7 +583,7 @@ proc deleteContainer(pager: Pager, container: Container) = pager.setContainer(nil) container.parent = nil container.children.setLen(0) - pager.unreg.add((container.process, SocketStream(container.iface.stream))) + pager.unreg.add((container.process, container.iface.stream)) pager.forkserver.removeChild(container.process) proc discardBuffer(pager: Pager, container = none(Container)) {.jsfunc.} = diff --git a/src/server/buffer.nim b/src/server/buffer.nim index 26ce08db..c0efeb65 100644 --- a/src/server/buffer.nim +++ b/src/server/buffer.nim @@ -26,6 +26,7 @@ import html/dom import html/enums import html/env import html/event +import io/bufstream import io/posixstream import io/promise import io/serialize @@ -127,26 +128,29 @@ type map: PromiseMap packetid: int opaque: InterfaceOpaque - stream*: Stream + stream*: BufStream proc getFromOpaque[T](opaque: pointer, res: var T) = let opaque = cast[InterfaceOpaque](opaque) if opaque.len != 0: opaque.stream.sread(res) -proc newBufferInterface*(stream: Stream): BufferInterface = +proc newBufferInterface*(stream: SocketStream, registerFun: proc(fd: int)): + BufferInterface = let opaque = InterfaceOpaque(stream: stream) result = BufferInterface( map: newPromiseMap(cast[pointer](opaque)), packetid: 1, # ids below 1 are invalid opaque: opaque, - stream: stream + stream: newBufStream(stream, registerFun) ) # After cloning a buffer, we need a new interface to the new buffer process. # Here we create a new interface for that clone. -proc cloneInterface*(stream: Stream): BufferInterface = - let iface = newBufferInterface(stream) +proc cloneInterface*(stream: SocketStream, registerFun: proc(fd: int)): + BufferInterface = + let iface = newBufferInterface(stream, registerFun) + #TODO buffered data should probably be copied here # We have just fork'ed the buffer process inside an interface function, # from which the new buffer is going to return as well. So we must also # consume the return value of the clone function, which is the pid 0. @@ -180,7 +184,8 @@ proc buildInterfaceProc(fun: NimNode, funid: string): tuple[fun, name: NimNode] let thisval = this2[0] body.add(quote do: `thisval`.stream.swrite(BufferCommand.`nup`) - `thisval`.stream.swrite(`thisval`.packetid)) + `thisval`.stream.swrite(`thisval`.packetid) + ) var params2: seq[NimNode] var retval2: NimNode var addfun: NimNode @@ -196,6 +201,7 @@ proc buildInterfaceProc(fun: NimNode, funid: string): tuple[fun, name: NimNode] retval) params2.add(retval2) params2.add(this2) + # flatten args for i in 2 ..< params.len: let param = params[i] for i in 0 ..< param.len - 2: @@ -205,15 +211,16 @@ proc buildInterfaceProc(fun: NimNode, funid: string): tuple[fun, name: NimNode] let s = params2[i][0] # sym e.g. url body.add(quote do: when typeof(`s`) is FileHandle: - SocketStream(`thisval`.stream).sendFileHandle(`s`) + #TODO flush or something + SocketStream(`thisval`.stream.source).sendFileHandle(`s`) else: - `thisval`.stream.swrite(`s`)) - body.add(quote do: - `thisval`.stream.flush()) + `thisval`.stream.swrite(`s`) + ) body.add(quote do: let promise = `addfun` inc `thisval`.packetid - return promise) + return promise + ) var pragmas: NimNode if retval.kind == nnkEmpty: pragmas = newNimNode(nnkPragma).add(ident("discardable")) @@ -1749,14 +1756,14 @@ macro bufferDispatcher(funs: static ProxyMap, buffer: Buffer, let len = slen(`packetid`) buffer.pstream.swrite(len) buffer.pstream.swrite(`packetid`) - buffer.pstream.flush()) + ) else: resolve.add(quote do: let len = slen(`packetid`) + slen(`rval`) buffer.pstream.swrite(len) buffer.pstream.swrite(`packetid`) buffer.pstream.swrite(`rval`) - buffer.pstream.flush()) + ) if v.istask: let en = v.ename stmts.add(quote do: |