about summary refs log tree commit diff stats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/io/bufstream.nim54
-rw-r--r--src/io/posixstream.nim5
-rw-r--r--src/local/client.nim17
-rw-r--r--src/local/container.nim13
-rw-r--r--src/local/pager.nim6
-rw-r--r--src/server/buffer.nim33
6 files changed, 104 insertions, 24 deletions
diff --git a/src/io/bufstream.nim b/src/io/bufstream.nim
new file mode 100644
index 00000000..eb9d6d6b
--- /dev/null
+++ b/src/io/bufstream.nim
@@ -0,0 +1,54 @@
+import io/posixstream
+
+type
+  BufStream* = ref object of PosixStream
+    source*: PosixStream
+    registerFun: proc(fd: int)
+    registered: bool
+    writeBuffer: string
+
+method recvData*(s: BufStream, buffer: pointer, len: int): int =
+  s.source.recvData(buffer, len)
+
+method sendData*(s: BufStream, buffer: pointer, len: int): int =
+  s.source.setBlocking(false)
+  block nobuf:
+    var n: int
+    if not s.registered:
+      try:
+        n = s.source.sendData(buffer, len)
+        if n == len:
+          break nobuf
+      except ErrorAgain:
+        discard
+      s.registerFun(s.source.fd)
+      s.registered = true
+    let olen = s.writeBuffer.len
+    s.writeBuffer.setLen(s.writeBuffer.len + len - n)
+    let buffer = cast[ptr UncheckedArray[uint8]](buffer)
+    copyMem(addr s.writeBuffer[olen], addr buffer[n], len - n)
+  s.source.setBlocking(true)
+  return len
+
+method sclose*(s: BufStream) =
+  s.source.sclose()
+
+proc flushWrite*(s: BufStream): bool =
+  s.source.setBlocking(false)
+  let n = s.source.sendData(s.writeBuffer)
+  s.source.setBlocking(true)
+  if n == s.writeBuffer.len:
+    s.writeBuffer = ""
+    s.registered = false
+    return true
+  s.writeBuffer = s.writeBuffer.substr(n)
+  return false
+
+proc newBufStream*(ps: PosixStream, registerFun: proc(fd: int)): BufStream =
+  result = BufStream(
+    fd: ps.fd,
+    source: ps,
+    blocking: ps.blocking,
+    registerFun: registerFun
+  )
+  result.addStreamIface()
diff --git a/src/io/posixstream.nim b/src/io/posixstream.nim
index da1a8a62..80407b27 100644
--- a/src/io/posixstream.nim
+++ b/src/io/posixstream.nim
@@ -56,6 +56,9 @@ method sendData*(s: PosixStream, buffer: pointer, len: int): int {.base.} =
     raisePosixIOError()
   return n
 
+proc sendData*(s: PosixStream, buffer: openArray[char]): int {.inline.} =
+  return s.sendData(unsafeAddr buffer[0], buffer.len)
+
 method setBlocking*(s: PosixStream, blocking: bool) {.base.} =
   s.blocking = blocking
   let ofl = fcntl(s.fd, F_GETFL, 0)
@@ -83,7 +86,7 @@ proc psReadData(s: Stream, buffer: pointer, len: int): int =
 
 proc psWriteData(s: Stream, buffer: pointer, len: int) =
   let s = PosixStream(s)
-  assert len != 0 and s.blocking
+  #TODO assert len != 0 and s.blocking
   discard s.sendData(buffer, len)
 
 proc psAtEnd(s: Stream): bool =
diff --git a/src/local/client.nim b/src/local/client.nim
index 76a61bfa..3a80b4bf 100644
--- a/src/local/client.nim
+++ b/src/local/client.nim
@@ -21,6 +21,7 @@ import display/term
 import html/chadombuilder
 import html/dom
 import html/event
+import io/bufstream
 import io/posixstream
 import io/promise
 import io/socketstream
@@ -41,6 +42,7 @@ import loader/loader
 import loader/request
 import local/container
 import local/pager
+import server/buffer
 import server/forkserver
 import types/blob
 import types/cookie
@@ -450,12 +452,15 @@ proc acceptBuffers(client: Client) =
       client.pager.procmap.del(pid)
     stream.close()
   var accepted: seq[Pid]
+  let registerFun = proc(fd: int) =
+    client.selector.unregister(fd)
+    client.selector.registerHandle(fd, {Read, Write}, 0)
   for pid, container in client.pager.procmap:
     let stream = connectSocketStream(pid, buffered = false, blocking = true)
     if stream == nil:
       client.pager.alert("Error: failed to set up buffer")
       continue
-    container.setStream(stream)
+    container.setStream(stream, registerFun)
     let fd = int(stream.fd)
     client.fdmap[fd] = container
     client.selector.registerHandle(fd, {Read}, 0)
@@ -513,6 +518,12 @@ proc handleRead(client: Client, fd: int) =
     let container = client.fdmap[fd]
     client.pager.handleEvent(container)
 
+proc handleWrite(client: Client, fd: int) =
+  let container = client.fdmap[fd]
+  if container.iface.stream.flushWrite():
+    client.selector.unregister(fd)
+    client.selector.registerHandle(fd, {Read}, 0)
+
 proc flushConsole*(client: Client) {.jsfunc.} =
   if client.console == nil:
     # hack for when client crashes before console has been initialized
@@ -561,6 +572,8 @@ proc inputLoop(client: Client) =
     for event in events:
       if Read in event.events:
         client.handleRead(event.fd)
+      if Write in event.events:
+        client.handleWrite(event.fd)
       if Error in event.events:
         client.handleError(event.fd)
       if Signal in event.events:
@@ -597,6 +610,8 @@ proc headlessLoop(client: Client) =
     for event in events:
       if Read in event.events:
         client.handleRead(event.fd)
+      if Write in event.events:
+        client.handleWrite(event.fd)
       if Error in event.events:
         client.handleError(event.fd)
       if selectors.Event.Timer in event.events:
diff --git a/src/local/container.nim b/src/local/container.nim
index 03cad675..5d5e6669 100644
--- a/src/local/container.nim
+++ b/src/local/container.nim
@@ -1,6 +1,5 @@
 import std/deques
 import std/options
-import std/streams
 import std/unicode
 
 when defined(posix):
@@ -11,6 +10,7 @@ import display/term
 import extern/stdio
 import io/promise
 import io/serialize
+import io/socketstream
 import js/javascript
 import js/jstypes
 import js/regex
@@ -144,8 +144,8 @@ jsDestructor(Highlight)
 jsDestructor(Container)
 
 proc newBuffer*(forkserver: ForkServer, config: BufferConfig,
-    request: Request, attrs: WindowAttributes, title = "",
-    redirectdepth = 0, canreinterpret = true, fd = FileHandle(-1),
+    request: Request, attrs: WindowAttributes, title: string,
+    redirectdepth: int, canreinterpret: bool, fd: FileHandle,
     contentType: Option[string]): Container =
   let (process, loaderPid) = forkserver.forkBuffer(request, config, attrs)
   if fd != -1:
@@ -1575,12 +1575,13 @@ proc handleCommand(container: Container) =
   container.iface.stream.sread(packetid)
   container.iface.resolve(packetid, len - slen(packetid))
 
-proc setStream*(container: Container, stream: Stream) =
+proc setStream*(container: Container, stream: SocketStream,
+    registerFun: proc(fd: int)) =
   if not container.cloned:
-    container.iface = newBufferInterface(stream)
+    container.iface = newBufferInterface(stream, registerFun)
     container.load()
   else:
-    container.iface = cloneInterface(stream)
+    container.iface = cloneInterface(stream, registerFun)
     # Maybe we have to resume loading. Let's try.
     discard container.iface.load().then(proc(res: int) =
       container.onload(res)
diff --git a/src/local/pager.nim b/src/local/pager.nim
index 1040c259..d8f7c5dd 100644
--- a/src/local/pager.nim
+++ b/src/local/pager.nim
@@ -21,8 +21,8 @@ import extern/editor
 import extern/runproc
 import extern/stdio
 import extern/tempfile
+import io/posixstream
 import io/promise
-import io/socketstream
 import js/error
 import js/javascript
 import js/jstypes
@@ -86,7 +86,7 @@ type
     statusgrid*: FixedGrid
     term*: Terminal
     tmpdir: string
-    unreg*: seq[(Pid, SocketStream)]
+    unreg*: seq[(Pid, PosixStream)]
     urimethodmap: URIMethodMap
     username: string
 
@@ -583,7 +583,7 @@ proc deleteContainer(pager: Pager, container: Container) =
       pager.setContainer(nil)
   container.parent = nil
   container.children.setLen(0)
-  pager.unreg.add((container.process, SocketStream(container.iface.stream)))
+  pager.unreg.add((container.process, container.iface.stream))
   pager.forkserver.removeChild(container.process)
 
 proc discardBuffer(pager: Pager, container = none(Container)) {.jsfunc.} =
diff --git a/src/server/buffer.nim b/src/server/buffer.nim
index 26ce08db..c0efeb65 100644
--- a/src/server/buffer.nim
+++ b/src/server/buffer.nim
@@ -26,6 +26,7 @@ import html/dom
 import html/enums
 import html/env
 import html/event
+import io/bufstream
 import io/posixstream
 import io/promise
 import io/serialize
@@ -127,26 +128,29 @@ type
     map: PromiseMap
     packetid: int
     opaque: InterfaceOpaque
-    stream*: Stream
+    stream*: BufStream
 
 proc getFromOpaque[T](opaque: pointer, res: var T) =
   let opaque = cast[InterfaceOpaque](opaque)
   if opaque.len != 0:
     opaque.stream.sread(res)
 
-proc newBufferInterface*(stream: Stream): BufferInterface =
+proc newBufferInterface*(stream: SocketStream, registerFun: proc(fd: int)):
+    BufferInterface =
   let opaque = InterfaceOpaque(stream: stream)
   result = BufferInterface(
     map: newPromiseMap(cast[pointer](opaque)),
     packetid: 1, # ids below 1 are invalid
     opaque: opaque,
-    stream: stream
+    stream: newBufStream(stream, registerFun)
   )
 
 # After cloning a buffer, we need a new interface to the new buffer process.
 # Here we create a new interface for that clone.
-proc cloneInterface*(stream: Stream): BufferInterface =
-  let iface = newBufferInterface(stream)
+proc cloneInterface*(stream: SocketStream, registerFun: proc(fd: int)):
+    BufferInterface =
+  let iface = newBufferInterface(stream, registerFun)
+  #TODO buffered data should probably be copied here
   # We have just fork'ed the buffer process inside an interface function,
   # from which the new buffer is going to return as well. So we must also
   # consume the return value of the clone function, which is the pid 0.
@@ -180,7 +184,8 @@ proc buildInterfaceProc(fun: NimNode, funid: string): tuple[fun, name: NimNode]
   let thisval = this2[0]
   body.add(quote do:
     `thisval`.stream.swrite(BufferCommand.`nup`)
-    `thisval`.stream.swrite(`thisval`.packetid))
+    `thisval`.stream.swrite(`thisval`.packetid)
+  )
   var params2: seq[NimNode]
   var retval2: NimNode
   var addfun: NimNode
@@ -196,6 +201,7 @@ proc buildInterfaceProc(fun: NimNode, funid: string): tuple[fun, name: NimNode]
       retval)
   params2.add(retval2)
   params2.add(this2)
+  # flatten args
   for i in 2 ..< params.len:
     let param = params[i]
     for i in 0 ..< param.len - 2:
@@ -205,15 +211,16 @@ proc buildInterfaceProc(fun: NimNode, funid: string): tuple[fun, name: NimNode]
     let s = params2[i][0] # sym e.g. url
     body.add(quote do:
       when typeof(`s`) is FileHandle:
-        SocketStream(`thisval`.stream).sendFileHandle(`s`)
+        #TODO flush or something
+        SocketStream(`thisval`.stream.source).sendFileHandle(`s`)
       else:
-        `thisval`.stream.swrite(`s`))
-  body.add(quote do:
-    `thisval`.stream.flush())
+        `thisval`.stream.swrite(`s`)
+    )
   body.add(quote do:
     let promise = `addfun`
     inc `thisval`.packetid
-    return promise)
+    return promise
+  )
   var pragmas: NimNode
   if retval.kind == nnkEmpty:
     pragmas = newNimNode(nnkPragma).add(ident("discardable"))
@@ -1749,14 +1756,14 @@ macro bufferDispatcher(funs: static ProxyMap, buffer: Buffer,
         let len = slen(`packetid`)
         buffer.pstream.swrite(len)
         buffer.pstream.swrite(`packetid`)
-        buffer.pstream.flush())
+      )
     else:
       resolve.add(quote do:
         let len = slen(`packetid`) + slen(`rval`)
         buffer.pstream.swrite(len)
         buffer.pstream.swrite(`packetid`)
         buffer.pstream.swrite(`rval`)
-        buffer.pstream.flush())
+      )
     if v.istask:
       let en = v.ename
       stmts.add(quote do: