diff options
author | PMunch <peterme@peterme.net> | 2021-01-07 16:09:57 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-07 16:09:57 +0100 |
commit | 0e7902b976fbd0e566f0fb5f4915695aca406451 (patch) | |
tree | 586e315bd2a20ed5ef22fae21ec0c702276f1527 /lib/std/socketstreams.nim | |
parent | 89a21e4ec71e705833d2aacd069e291cf41a19c6 (diff) | |
download | Nim-0e7902b976fbd0e566f0fb5f4915695aca406451.tar.gz |
Implements streams for sockets (#15729)
Diffstat (limited to 'lib/std/socketstreams.nim')
-rw-r--r-- | lib/std/socketstreams.nim | 181 |
1 files changed, 181 insertions, 0 deletions
diff --git a/lib/std/socketstreams.nim b/lib/std/socketstreams.nim new file mode 100644 index 000000000..5c882858d --- /dev/null +++ b/lib/std/socketstreams.nim @@ -0,0 +1,181 @@ +# +# +# Nim's Runtime Library +# (c) Copyright 2021 Nim contributors +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +## This module provides an implementation of the streams interface for sockets. +## It contains two separate implementations, a +## `ReadSocketStream <#ReadSocketStream>`_ and a +## `WriteSocketStream <#WriteSocketStream>`_. +## +## The `ReadSocketStream` only supports reading, peeking, and seeking. +## It reads into a buffer, so even by +## seeking backwards it will only read the same position a single time from the +## underlying socket. To clear the buffer and free the data read into it you +## can call `resetStream`, this will also reset the position back to 0 but +## won't do anything to the underlying socket. +## +## The `WriteSocketStream` allows both reading and writing, but it performs the +## reads on the internal buffer. So by writing to the buffer you can then read +## back what was written but without receiving anything from the socket. You +## can also set the position and overwrite parts of the buffer, and to send +## anything over the socket you need to call `flush` at which point you can't +## write anything to the buffer before the point of the flush (but it can still +## be read). Again to empty the underlying buffer you need to call +## `resetStream`. +## +## Examples +## ======== +## +## .. code-block:: Nim +## import std/socketstreams +## +## var +## socket = newSocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP) +## stream = newReadSocketStream(socket) +## socket.sendTo("127.0.0.1", Port(12345), "SOME REQUEST") +## echo stream.readLine() # Will call `recv` +## stream.setPosition(0) +## echo stream.readLine() # Will return the read line from the buffer +## stream.resetStream() # Buffer is now empty, position is 0 +## echo stream.readLine() # Will call `recv` again +## stream.close() # Closes the socket +## +## .. code-block:: Nim +## +## import std/socketstreams +## +## var socket = newSocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP) +## socket.connect("127.0.0.1", Port(12345)) +## var sendStream = newWriteSocketStream(socket) +## sendStream.write "NOM" +## sendStream.setPosition(1) +## echo sendStream.peekStr(2) # OM +## sendStream.write "I" +## sendStream.setPosition(0) +## echo sendStream.readStr(3) # NIM +## echo sendStream.getPosition() # 3 +## sendStream.flush() # This actually performs the writing to the socket +## sendStream.setPosition(1) +## sendStream.write "I" # Throws an error as we can't write into an already sent buffer + +import net, streams + +type + ReadSocketStream* = ref ReadSocketStreamObj + ReadSocketStreamObj* = object of StreamObj + data: Socket + pos: int + buf: seq[byte] + WriteSocketStream* = ref WriteSocketStreamObj + WriteSocketStreamObj* = object of ReadSocketStreamObj + lastFlush: int + +proc rsAtEnd(s: Stream): bool = + return false + +proc rsSetPosition(s: Stream, pos: int) = + var s = ReadSocketStream(s) + s.pos = pos + +proc rsGetPosition(s: Stream): int = + var s = ReadSocketStream(s) + return s.pos + +proc rsPeekData(s: Stream, buffer: pointer, bufLen: int): int = + let s = ReadSocketStream(s) + if bufLen > 0: + let oldLen = s.buf.len + s.buf.setLen(max(s.pos + bufLen, s.buf.len)) + if s.pos + bufLen > oldLen: + result = s.data.recv(s.buf[oldLen].addr, s.buf.len - oldLen) + if result > 0: + result += oldLen - s.pos + else: + result = bufLen + copyMem(buffer, s.buf[s.pos].addr, result) + +proc rsReadData(s: Stream, buffer: pointer, bufLen: int): int = + result = s.rsPeekData(buffer, bufLen) + var s = ReadSocketStream(s) + s.pos += bufLen + +proc rsReadDataStr(s: Stream, buffer: var string, slice: Slice[int]): int = + var s = ReadSocketStream(s) + result = slice.b + 1 - slice.a + if result > 0: + result = s.rsReadData(buffer[slice.a].addr, result) + inc(s.pos, result) + else: + result = 0 + +proc wsWriteData(s: Stream, buffer: pointer, bufLen: int) = + var s = WriteSocketStream(s) + if s.pos < s.lastFlush: + raise newException(IOError, "Unable to write into buffer that has already been sent") + if s.buf.len < s.pos + bufLen: + s.buf.setLen(s.pos + bufLen) + copyMem(s.buf[s.pos].addr, buffer, bufLen) + s.pos += bufLen + +proc wsPeekData(s: Stream, buffer: pointer, bufLen: int): int = + var s = WriteSocketStream(s) + result = bufLen + if result > 0: + if s.pos > s.buf.len or s.pos == s.buf.len or s.pos + bufLen > s.buf.len: + raise newException(IOError, "Unable to read past end of write buffer") + else: + copyMem(buffer, s.buf[s.pos].addr, bufLen) + +proc wsReadData(s: Stream, buffer: pointer, bufLen: int): int = + result = s.wsPeekData(buffer, bufLen) + var s = ReadSocketStream(s) + s.pos += bufLen + +proc wsAtEnd(s: Stream): bool = + var s = WriteSocketStream(s) + return s.pos == s.buf.len + +proc wsFlush(s: Stream) = + var s = WriteSocketStream(s) + discard s.data.send(s.buf[s.lastFlush].addr, s.buf.len - s.lastFlush) + s.lastFlush = s.buf.len + +proc rsClose(s: Stream) = + {.cast(tags: []).}: + var s = ReadSocketStream(s) + s.data.close() + +proc newReadSocketStream*(s: Socket): owned ReadSocketStream = + result = ReadSocketStream(data: s, pos: 0, + closeImpl: rsClose, + atEndImpl: rsAtEnd, + setPositionImpl: rsSetPosition, + getPositionImpl: rsGetPosition, + readDataImpl: rsReadData, + peekDataImpl: rsPeekData, + readDataStrImpl: rsReadDataStr) + +proc resetStream*(s: ReadSocketStream) = + s.buf = @[] + s.pos = 0 + +proc newWriteSocketStream*(s: Socket): owned WriteSocketStream = + result = WriteSocketStream(data: s, pos: 0, + closeImpl: rsClose, + atEndImpl: wsAtEnd, + setPositionImpl: rsSetPosition, + getPositionImpl: rsGetPosition, + writeDataImpl: wsWriteData, + readDataImpl: wsReadData, + peekDataImpl: wsPeekData, + flushImpl: wsFlush) + +proc resetStream*(s: WriteSocketStream) = + s.buf = @[] + s.pos = 0 + s.lastFlush = 0 |