summary refs log tree commit diff stats
path: root/lib/std/socketstreams.nim
diff options
context:
space:
mode:
authorPMunch <peterme@peterme.net>2021-01-07 16:09:57 +0100
committerGitHub <noreply@github.com>2021-01-07 16:09:57 +0100
commit0e7902b976fbd0e566f0fb5f4915695aca406451 (patch)
tree586e315bd2a20ed5ef22fae21ec0c702276f1527 /lib/std/socketstreams.nim
parent89a21e4ec71e705833d2aacd069e291cf41a19c6 (diff)
downloadNim-0e7902b976fbd0e566f0fb5f4915695aca406451.tar.gz
Implements streams for sockets (#15729)
Diffstat (limited to 'lib/std/socketstreams.nim')
-rw-r--r--lib/std/socketstreams.nim181
1 files changed, 181 insertions, 0 deletions
diff --git a/lib/std/socketstreams.nim b/lib/std/socketstreams.nim
new file mode 100644
index 000000000..5c882858d
--- /dev/null
+++ b/lib/std/socketstreams.nim
@@ -0,0 +1,181 @@
+#
+#
+#            Nim's Runtime Library
+#        (c) Copyright 2021 Nim contributors
+#
+#    See the file "copying.txt", included in this
+#    distribution, for details about the copyright.
+#
+
+## This module provides an implementation of the streams interface for sockets.
+## It contains two separate implementations, a
+## `ReadSocketStream <#ReadSocketStream>`_ and a
+## `WriteSocketStream <#WriteSocketStream>`_.
+##
+## The `ReadSocketStream` only supports reading, peeking, and seeking.
+## It reads into a buffer, so even by
+## seeking backwards it will only read the same position a single time from the
+## underlying socket. To clear the buffer and free the data read into it you
+## can call `resetStream`, this will also reset the position back to 0 but
+## won't do anything to the underlying socket.
+##
+## The `WriteSocketStream` allows both reading and writing, but it performs the
+## reads on the internal buffer. So by writing to the buffer you can then read
+## back what was written but without receiving anything from the socket. You
+## can also set the position and overwrite parts of the buffer, and to send
+## anything over the socket you need to call `flush` at which point you can't
+## write anything to the buffer before the point of the flush (but it can still
+## be read). Again to empty the underlying buffer you need to call
+## `resetStream`.
+##
+## Examples
+## ========
+##
+## .. code-block:: Nim
+##  import std/socketstreams
+##
+##  var
+##    socket = newSocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)
+##    stream = newReadSocketStream(socket)
+##  socket.sendTo("127.0.0.1", Port(12345), "SOME REQUEST")
+##  echo stream.readLine() # Will call `recv`
+##  stream.setPosition(0)
+##  echo stream.readLine() # Will return the read line from the buffer
+##  stream.resetStream() # Buffer is now empty, position is 0
+##  echo stream.readLine() # Will call `recv` again
+##  stream.close() # Closes the socket
+##
+## .. code-block:: Nim
+##
+##  import std/socketstreams
+##
+##  var socket = newSocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)
+##  socket.connect("127.0.0.1", Port(12345))
+##  var sendStream = newWriteSocketStream(socket)
+##  sendStream.write "NOM"
+##  sendStream.setPosition(1)
+##  echo sendStream.peekStr(2) # OM
+##  sendStream.write "I"
+##  sendStream.setPosition(0)
+##  echo sendStream.readStr(3) # NIM
+##  echo sendStream.getPosition() # 3
+##  sendStream.flush() # This actually performs the writing to the socket
+##  sendStream.setPosition(1)
+##  sendStream.write "I" # Throws an error as we can't write into an already sent buffer
+
+import net, streams
+
+type
+  ReadSocketStream* = ref ReadSocketStreamObj
+  ReadSocketStreamObj* = object of StreamObj
+    data: Socket
+    pos: int
+    buf: seq[byte]
+  WriteSocketStream* = ref WriteSocketStreamObj
+  WriteSocketStreamObj* = object of ReadSocketStreamObj
+    lastFlush: int
+
+proc rsAtEnd(s: Stream): bool =
+  return false
+
+proc rsSetPosition(s: Stream, pos: int) =
+  var s = ReadSocketStream(s)
+  s.pos = pos
+
+proc rsGetPosition(s: Stream): int =
+  var s = ReadSocketStream(s)
+  return s.pos
+
+proc rsPeekData(s: Stream, buffer: pointer, bufLen: int): int =
+  let s = ReadSocketStream(s)
+  if bufLen > 0:
+    let oldLen = s.buf.len
+    s.buf.setLen(max(s.pos + bufLen, s.buf.len))
+    if s.pos + bufLen > oldLen:
+      result = s.data.recv(s.buf[oldLen].addr, s.buf.len - oldLen)
+      if result > 0:
+        result += oldLen - s.pos
+    else:
+      result = bufLen
+    copyMem(buffer, s.buf[s.pos].addr, result)
+
+proc rsReadData(s: Stream, buffer: pointer, bufLen: int): int =
+  result = s.rsPeekData(buffer, bufLen)
+  var s = ReadSocketStream(s)
+  s.pos += bufLen
+
+proc rsReadDataStr(s: Stream, buffer: var string, slice: Slice[int]): int =
+  var s = ReadSocketStream(s)
+  result = slice.b + 1 - slice.a
+  if result > 0:
+    result = s.rsReadData(buffer[slice.a].addr, result)
+    inc(s.pos, result)
+  else:
+    result = 0
+
+proc wsWriteData(s: Stream, buffer: pointer, bufLen: int) =
+  var s = WriteSocketStream(s)
+  if s.pos < s.lastFlush:
+    raise newException(IOError, "Unable to write into buffer that has already been sent")
+  if s.buf.len < s.pos + bufLen:
+    s.buf.setLen(s.pos + bufLen)
+  copyMem(s.buf[s.pos].addr, buffer, bufLen)
+  s.pos += bufLen
+
+proc wsPeekData(s: Stream, buffer: pointer, bufLen: int): int =
+  var s = WriteSocketStream(s)
+  result = bufLen
+  if result > 0:
+    if s.pos > s.buf.len or s.pos == s.buf.len or s.pos + bufLen > s.buf.len:
+      raise newException(IOError, "Unable to read past end of write buffer")
+    else:
+      copyMem(buffer, s.buf[s.pos].addr, bufLen)
+
+proc wsReadData(s: Stream, buffer: pointer, bufLen: int): int =
+  result = s.wsPeekData(buffer, bufLen)
+  var s = ReadSocketStream(s)
+  s.pos += bufLen
+
+proc wsAtEnd(s: Stream): bool =
+  var s = WriteSocketStream(s)
+  return s.pos == s.buf.len
+
+proc wsFlush(s: Stream) =
+  var s = WriteSocketStream(s)
+  discard s.data.send(s.buf[s.lastFlush].addr, s.buf.len - s.lastFlush)
+  s.lastFlush = s.buf.len
+
+proc rsClose(s: Stream) =
+  {.cast(tags: []).}:
+    var s = ReadSocketStream(s)
+    s.data.close()
+
+proc newReadSocketStream*(s: Socket): owned ReadSocketStream =
+  result = ReadSocketStream(data: s, pos: 0,
+    closeImpl: rsClose,
+    atEndImpl: rsAtEnd,
+    setPositionImpl: rsSetPosition,
+    getPositionImpl: rsGetPosition,
+    readDataImpl: rsReadData,
+    peekDataImpl: rsPeekData,
+    readDataStrImpl: rsReadDataStr)
+
+proc resetStream*(s: ReadSocketStream) =
+  s.buf = @[]
+  s.pos = 0
+
+proc newWriteSocketStream*(s: Socket): owned WriteSocketStream =
+  result = WriteSocketStream(data: s, pos: 0,
+    closeImpl: rsClose,
+    atEndImpl: wsAtEnd,
+    setPositionImpl: rsSetPosition,
+    getPositionImpl: rsGetPosition,
+    writeDataImpl: wsWriteData,
+    readDataImpl: wsReadData,
+    peekDataImpl: wsPeekData,
+    flushImpl: wsFlush)
+
+proc resetStream*(s: WriteSocketStream) =
+  s.buf = @[]
+  s.pos = 0
+  s.lastFlush = 0