diff options
-rw-r--r-- | changelog.md | 2 | ||||
-rw-r--r-- | lib/std/socketstreams.nim | 181 | ||||
-rw-r--r-- | tests/stdlib/tsocketstreams.nim | 64 |
3 files changed, 247 insertions, 0 deletions
diff --git a/changelog.md b/changelog.md index 4e9b8c162..afe49f1f3 100644 --- a/changelog.md +++ b/changelog.md @@ -91,6 +91,8 @@ - `math.round` now is rounded "away from zero" in JS backend which is consistent with other backends. see #9125. Use `-d:nimLegacyJsRound` for previous behavior. +- Added `socketstream` module that wraps sockets in the stream interface + diff --git a/lib/std/socketstreams.nim b/lib/std/socketstreams.nim new file mode 100644 index 000000000..5c882858d --- /dev/null +++ b/lib/std/socketstreams.nim @@ -0,0 +1,181 @@ +# +# +# Nim's Runtime Library +# (c) Copyright 2021 Nim contributors +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +## This module provides an implementation of the streams interface for sockets. +## It contains two separate implementations, a +## `ReadSocketStream <#ReadSocketStream>`_ and a +## `WriteSocketStream <#WriteSocketStream>`_. +## +## The `ReadSocketStream` only supports reading, peeking, and seeking. +## It reads into a buffer, so even by +## seeking backwards it will only read the same position a single time from the +## underlying socket. To clear the buffer and free the data read into it you +## can call `resetStream`, this will also reset the position back to 0 but +## won't do anything to the underlying socket. +## +## The `WriteSocketStream` allows both reading and writing, but it performs the +## reads on the internal buffer. So by writing to the buffer you can then read +## back what was written but without receiving anything from the socket. You +## can also set the position and overwrite parts of the buffer, and to send +## anything over the socket you need to call `flush` at which point you can't +## write anything to the buffer before the point of the flush (but it can still +## be read). Again to empty the underlying buffer you need to call +## `resetStream`. +## +## Examples +## ======== +## +## .. code-block:: Nim +## import std/socketstreams +## +## var +## socket = newSocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP) +## stream = newReadSocketStream(socket) +## socket.sendTo("127.0.0.1", Port(12345), "SOME REQUEST") +## echo stream.readLine() # Will call `recv` +## stream.setPosition(0) +## echo stream.readLine() # Will return the read line from the buffer +## stream.resetStream() # Buffer is now empty, position is 0 +## echo stream.readLine() # Will call `recv` again +## stream.close() # Closes the socket +## +## .. code-block:: Nim +## +## import std/socketstreams +## +## var socket = newSocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP) +## socket.connect("127.0.0.1", Port(12345)) +## var sendStream = newWriteSocketStream(socket) +## sendStream.write "NOM" +## sendStream.setPosition(1) +## echo sendStream.peekStr(2) # OM +## sendStream.write "I" +## sendStream.setPosition(0) +## echo sendStream.readStr(3) # NIM +## echo sendStream.getPosition() # 3 +## sendStream.flush() # This actually performs the writing to the socket +## sendStream.setPosition(1) +## sendStream.write "I" # Throws an error as we can't write into an already sent buffer + +import net, streams + +type + ReadSocketStream* = ref ReadSocketStreamObj + ReadSocketStreamObj* = object of StreamObj + data: Socket + pos: int + buf: seq[byte] + WriteSocketStream* = ref WriteSocketStreamObj + WriteSocketStreamObj* = object of ReadSocketStreamObj + lastFlush: int + +proc rsAtEnd(s: Stream): bool = + return false + +proc rsSetPosition(s: Stream, pos: int) = + var s = ReadSocketStream(s) + s.pos = pos + +proc rsGetPosition(s: Stream): int = + var s = ReadSocketStream(s) + return s.pos + +proc rsPeekData(s: Stream, buffer: pointer, bufLen: int): int = + let s = ReadSocketStream(s) + if bufLen > 0: + let oldLen = s.buf.len + s.buf.setLen(max(s.pos + bufLen, s.buf.len)) + if s.pos + bufLen > oldLen: + result = s.data.recv(s.buf[oldLen].addr, s.buf.len - oldLen) + if result > 0: + result += oldLen - s.pos + else: + result = bufLen + copyMem(buffer, s.buf[s.pos].addr, result) + +proc rsReadData(s: Stream, buffer: pointer, bufLen: int): int = + result = s.rsPeekData(buffer, bufLen) + var s = ReadSocketStream(s) + s.pos += bufLen + +proc rsReadDataStr(s: Stream, buffer: var string, slice: Slice[int]): int = + var s = ReadSocketStream(s) + result = slice.b + 1 - slice.a + if result > 0: + result = s.rsReadData(buffer[slice.a].addr, result) + inc(s.pos, result) + else: + result = 0 + +proc wsWriteData(s: Stream, buffer: pointer, bufLen: int) = + var s = WriteSocketStream(s) + if s.pos < s.lastFlush: + raise newException(IOError, "Unable to write into buffer that has already been sent") + if s.buf.len < s.pos + bufLen: + s.buf.setLen(s.pos + bufLen) + copyMem(s.buf[s.pos].addr, buffer, bufLen) + s.pos += bufLen + +proc wsPeekData(s: Stream, buffer: pointer, bufLen: int): int = + var s = WriteSocketStream(s) + result = bufLen + if result > 0: + if s.pos > s.buf.len or s.pos == s.buf.len or s.pos + bufLen > s.buf.len: + raise newException(IOError, "Unable to read past end of write buffer") + else: + copyMem(buffer, s.buf[s.pos].addr, bufLen) + +proc wsReadData(s: Stream, buffer: pointer, bufLen: int): int = + result = s.wsPeekData(buffer, bufLen) + var s = ReadSocketStream(s) + s.pos += bufLen + +proc wsAtEnd(s: Stream): bool = + var s = WriteSocketStream(s) + return s.pos == s.buf.len + +proc wsFlush(s: Stream) = + var s = WriteSocketStream(s) + discard s.data.send(s.buf[s.lastFlush].addr, s.buf.len - s.lastFlush) + s.lastFlush = s.buf.len + +proc rsClose(s: Stream) = + {.cast(tags: []).}: + var s = ReadSocketStream(s) + s.data.close() + +proc newReadSocketStream*(s: Socket): owned ReadSocketStream = + result = ReadSocketStream(data: s, pos: 0, + closeImpl: rsClose, + atEndImpl: rsAtEnd, + setPositionImpl: rsSetPosition, + getPositionImpl: rsGetPosition, + readDataImpl: rsReadData, + peekDataImpl: rsPeekData, + readDataStrImpl: rsReadDataStr) + +proc resetStream*(s: ReadSocketStream) = + s.buf = @[] + s.pos = 0 + +proc newWriteSocketStream*(s: Socket): owned WriteSocketStream = + result = WriteSocketStream(data: s, pos: 0, + closeImpl: rsClose, + atEndImpl: wsAtEnd, + setPositionImpl: rsSetPosition, + getPositionImpl: rsGetPosition, + writeDataImpl: wsWriteData, + readDataImpl: wsReadData, + peekDataImpl: wsPeekData, + flushImpl: wsFlush) + +proc resetStream*(s: WriteSocketStream) = + s.buf = @[] + s.pos = 0 + s.lastFlush = 0 diff --git a/tests/stdlib/tsocketstreams.nim b/tests/stdlib/tsocketstreams.nim new file mode 100644 index 000000000..0cf952810 --- /dev/null +++ b/tests/stdlib/tsocketstreams.nim @@ -0,0 +1,64 @@ +discard """ + output: ''' +OM +NIM +3 +NIM +NIM +Hello server! +Hi there client! +'''""" +import std/socketstreams, net, streams + +block UDP: + var recvSocket = newSocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP) + var recvStream = newReadSocketStream(recvSocket) + recvSocket.bindAddr(Port(12345), "127.0.0.1") + + var sendSocket = newSocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP) + sendSocket.connect("127.0.0.1", Port(12345)) + var sendStream = newWriteSocketStream(sendSocket) + sendStream.write "NOM\n" + sendStream.setPosition(1) + echo sendStream.peekStr(2) + sendStream.write "I" + sendStream.setPosition(0) + echo sendStream.readStr(3) + echo sendStream.getPosition() + sendStream.flush() + + echo recvStream.readLine() + recvStream.setPosition(0) + echo recvStream.readLine() + recvStream.close() + +block TCP: + var server = newSocket() + server.setSockOpt(OptReusePort, true) + server.bindAddr(Port(12345)) + server.listen() + + var + client = newSocket() + clientRequestStream = newWriteSocketStream(client) + clientResponseStream = newReadSocketStream(client) + client.connect("127.0.0.1", Port(12345)) + clientRequestStream.writeLine("Hello server!") + clientRequestStream.flush() + + var + incoming: Socket + address: string + server.acceptAddr(incoming, address) + var + serverRequestStream = newReadSocketStream(incoming) + serverResponseStream = newWriteSocketStream(incoming) + echo serverRequestStream.readLine() + serverResponseStream.writeLine("Hi there client!") + serverResponseStream.flush() + serverResponseStream.close() + serverRequestStream.close() + + echo clientResponseStream.readLine() + clientResponseStream.close() + clientRequestStream.close() |