summary refs log tree commit diff stats
path: root/lib/std/socketstreams.nim
blob: 5c882858dba6a79fa14c9e6b349444a40157cb4e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
pre { line-height: 125%; }
td.linenos .normal { color: inherit; background-color: transparent; padding-left: 5px; padding-right: 5px; }
span.linenos { color: inherit; background-color: transparent; padding-left: 5px; padding-right: 5px; }
td.linenos .special { color: #000000; background-color: #ffffc0; padding-left: 5px; padding-right: 5px; }
span.linenos.special { color: #000000; background-color: #ffffc0; padding-left: 5px; padding-right: 5px; }
.highlight .hll { background-color: #ffffcc }
.highlight .c { color: #888888 } /* Comment */
.highlight .err { color: #a61717; background-color: #e3d2d2 } /* Error */
.highlight .k { color: #008800; font-weight: bold } /* Keyword */
.highlight .ch { color: #888888 } /* Comment.Hashbang */
.highlight .cm { color: #888888 } /* Comment.Multiline */
.highlight .cp { color: #cc0000; font-weight: bold } /* Comment.Preproc */
.highlight .cpf { color: #888888 } /* Comment.PreprocFile */
.highlight .c1 { color: #888888 } /* Comment.Single */
.highlight .cs { color: #cc0000; font-weight: bold; background-color: #fff0f0 } /* Comment.Special */
.highlight .gd { color: #000000; background-color: #ffdddd } /* Generic.Deleted */
.highlight .ge { font-style: italic } /* Generic.Emph */
.highlight .ges { font-weight: bold; font-style: italic } /* Generic.EmphStrong */
.highlight .gr { color: #aa0000 } /* Generic.Error */
.highlight .gh { color: #333333 } /* Generic.Heading */
.highlight .gi { color: #000000; background-color: #ddffdd } /* Generic.Inserted */
.highlight .go { color: #888888 } /* Generic.Output */
.highlight .gp { color: #555555 } /* Generic.Prompt */
.highlight .gs { font-weight: bold } /* Generic.Strong */
.highlight .gu { color: #666666 } /* Generic.Subheading */
.highlight .gt { color: #aa0000 } /* Generic.Traceback */
.highlight .kc { color: #008800; font-weight: bold } /* Keyword.Constant */
.highlight .kd { color: #008800; font-weight: bold } /* Keyword.Declaration */
.highlight .kn { color: #008800; font-weight: bold } /* Keyword.Namespace */
.highlight .kp { color: #008800 } /* Keyword.Pseudo */
.highlight .kr { color: #008800; font-weight: bold } /* Keyword.Reserved */
.highlight .kt { color: #888888; font-weight: bold } /* Keyword.Type */
.highligh
#
#
#            Nim's Runtime Library
#        (c) Copyright 2021 Nim contributors
#
#    See the file "copying.txt", included in this
#    distribution, for details about the copyright.
#

## This module provides an implementation of the streams interface for sockets.
## It contains two separate implementations, a
## `ReadSocketStream <#ReadSocketStream>`_ and a
## `WriteSocketStream <#WriteSocketStream>`_.
##
## The `ReadSocketStream` only supports reading, peeking, and seeking.
## Everything it receives is kept in an internal buffer, so each position is
## read from the underlying socket only once, even if you seek backwards and
## read it again. To clear the buffer and free the data read into it, call
## `resetStream`; this also resets the position back to 0 but does not
## affect the underlying socket.
##
## The `WriteSocketStream` allows both reading and writing, but it performs the
## reads on the internal buffer. So by writing to the buffer you can then read
## back what was written but without receiving anything from the socket. You
## can also set the position and overwrite parts of the buffer, and to send
## anything over the socket you need to call `flush` at which point you can't
## write anything to the buffer before the point of the flush (but it can still
## be read). Again to empty the underlying buffer you need to call
## `resetStream`.
##
## Examples
## ========
##
## .. code-block:: Nim
##
##  import std/[net, streams, socketstreams]
##
##  var
##    socket = newSocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)
##    stream = newReadSocketStream(socket)
##  socket.sendTo("127.0.0.1", Port(12345), "SOME REQUEST")
##  echo stream.readLine() # Will call `recv`
##  stream.setPosition(0)
##  echo stream.readLine() # Will return the read line from the buffer
##  stream.resetStream() # Buffer is now empty, position is 0
##  echo stream.readLine() # Will call `recv` again
##  stream.close() # Closes the socket
##
## .. code-block:: Nim
##
##  import std/[net, streams, socketstreams]
##
##  var socket = newSocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)
##  socket.connect("127.0.0.1", Port(12345))
##  var sendStream = newWriteSocketStream(socket)
##  sendStream.write "NOM"
##  sendStream.setPosition(1)
##  echo sendStream.peekStr(2) # OM
##  sendStream.write "I"
##  sendStream.setPosition(0)
##  echo sendStream.readStr(3) # NIM
##  echo sendStream.getPosition() # 3
##  sendStream.flush() # This actually performs the writing to the socket
##  sendStream.setPosition(1)
##  sendStream.write "I" # Throws an error as we can't write into an already sent buffer

import net, streams

type
  ReadSocketStream* = ref ReadSocketStreamObj
    ## Stream that reads from a socket through an internal byte buffer.
  ReadSocketStreamObj* = object of StreamObj
    ## State shared by both socket stream flavours.
    data: Socket ## Socket used for `recv`, `send` and `close`.
    pos: int ## Current read/write position, an index into `buf`.
    buf: seq[byte] ## Bytes received (read stream) or written (write stream).
  WriteSocketStream* = ref WriteSocketStreamObj
    ## Stream that collects writes in a buffer and sends them on `flush`.
  WriteSocketStreamObj* = object of ReadSocketStreamObj
    ## Adds the bookkeeping needed to know which part of `buf` was sent.
    lastFlush: int ## Length of `buf` at the last flush; writes at a
                   ## position before this index are rejected.

proc rsAtEnd(s: Stream): bool =
  ## `atEnd` implementation for read socket streams: it never reports the
  ## end of the stream and unconditionally yields `false`.
  false

proc rsSetPosition(s: Stream, pos: int) =
  ## `setPosition` implementation: stores `pos` as the new stream position.
  ## The value is taken as-is; no clamping against the buffer is done here.
  let sockStream = ReadSocketStream(s)
  sockStream.pos = pos

proc rsGetPosition(s: Stream): int =
  ## `getPosition` implementation: yields the stream's current position.
  ReadSocketStream(s).pos

proc rsPeekData(s: Stream, buffer: pointer, bufLen: int): int =
  ## `peekData` implementation for read socket streams.
  ##
  ## Copies up to `bufLen` bytes starting at the current position into
  ## `buffer` without moving the position, and returns how many bytes were
  ## copied (never negative). Bytes already held in the internal buffer are
  ## served from it; only the missing tail is requested from the socket
  ## with `recv`.
  let s = ReadSocketStream(s)
  if bufLen <= 0:
    return 0
  let
    oldLen = s.buf.len
    wanted = s.pos + bufLen
  if wanted > oldLen:
    # Make room for the missing bytes and receive straight into the buffer.
    s.buf.setLen(wanted)
    let received = s.data.recv(s.buf[oldLen].addr, wanted - oldLen)
    # Keep only what really arrived. Leaving the zero-filled tail in place
    # would make later peeks/seeks hand out padding as if it were data.
    # A failed `recv` (negative result) contributes nothing.
    s.buf.setLen(oldLen + max(received, 0))
  # Whatever is buffered from `pos` on can be delivered, even when the
  # socket had nothing more to give.
  result = max(min(bufLen, s.buf.len - s.pos), 0)
  if result > 0:
    copyMem(buffer, s.buf[s.pos].addr, result)

proc rsReadData(s: Stream, buffer: pointer, bufLen: int): int =
  ## `readData` implementation for read socket streams: same as
  ## `rsPeekData`, but afterwards moves the position past the bytes that
  ## were returned. The position advances by the number of bytes actually
  ## delivered, not by the number requested, so a short read never skips
  ## data that arrives later.
  result = s.rsPeekData(buffer, bufLen)
  let s = ReadSocketStream(s)
  s.pos += result

proc rsReadDataStr(s: Stream, buffer: var string, slice: Slice[int]): int =
  ## `readDataStr` implementation for read socket streams.
  ##
  ## Reads into `buffer[slice]` and returns the value reported by
  ## `rsReadData`; an empty slice reads nothing and returns 0.
  let wanted = slice.b + 1 - slice.a
  if wanted > 0:
    # `rsReadData` already moves the stream position. Advancing it a second
    # time here would make the next read skip over unread bytes.
    result = s.rsReadData(buffer[slice.a].addr, wanted)
  else:
    result = 0

proc wsWriteData(s: Stream, buffer: pointer, bufLen: int) =
  ## `writeData` implementation for write socket streams.
  ##
  ## Copies `bufLen` bytes from `buffer` into the internal buffer at the
  ## current position (growing the buffer when needed) and advances the
  ## position. Nothing is sent over the socket here.
  ##
  ## Raises `IOError` when the position lies before the last flush point,
  ## since that part of the buffer has already been sent.
  let s = WriteSocketStream(s)
  if s.pos < s.lastFlush:
    raise newException(IOError,
      "Unable to write into buffer that has already been sent")
  if bufLen <= 0:
    # Nothing to copy. Returning here also avoids indexing `buf[pos]`
    # when `pos == buf.len`, which would be out of bounds.
    return
  let newEnd = s.pos + bufLen
  if s.buf.len < newEnd:
    s.buf.setLen(newEnd)
  copyMem(s.buf[s.pos].addr, buffer, bufLen)
  s.pos = newEnd

proc wsPeekData(s: Stream, buffer: pointer, bufLen: int): int =
  ## `peekData` implementation for write socket streams.
  ##
  ## Copies exactly `bufLen` bytes from the internal write buffer, starting
  ## at the current position, into `buffer`; the position is not moved and
  ## the socket is not touched. The return value is always `bufLen`.
  ##
  ## Raises `IOError` if the request reaches past the end of the buffer.
  let ws = WriteSocketStream(s)
  result = bufLen
  if bufLen <= 0:
    return
  # With a positive `bufLen` this single comparison also covers the cases
  # where the position is at or beyond the end of the buffer.
  if ws.buf.len - ws.pos < bufLen:
    raise newException(IOError, "Unable to read past end of write buffer")
  copyMem(buffer, ws.buf[ws.pos].addr, bufLen)

proc wsReadData(s: Stream, buffer: pointer, bufLen: int): int =
  ## `readData` implementation for write socket streams: peeks `bufLen`
  ## bytes out of the write buffer and then moves the position forward by
  ## `bufLen`. If the peek raises, the position is left untouched.
  result = wsPeekData(s, buffer, bufLen)
  let rs = ReadSocketStream(s)
  rs.pos = rs.pos + bufLen

proc wsAtEnd(s: Stream): bool =
  ## `atEnd` implementation for write socket streams: true exactly when the
  ## position equals the length of the internal buffer.
  let ws = WriteSocketStream(s)
  ws.buf.len == ws.pos

proc wsFlush(s: Stream) =
  ## `flush` implementation for write socket streams.
  ##
  ## Sends the part of the internal buffer that has not been sent yet
  ## (everything from the last flush point to the end) over the socket and
  ## moves the flush point forward by the number of bytes the socket
  ## accepted. Flushing when nothing is pending is a no-op.
  ##
  ## Raises `IOError` when `send` reports a failure; the flush point is
  ## left unchanged in that case.
  let s = WriteSocketStream(s)
  let pending = s.buf.len - s.lastFlush
  if pending <= 0:
    # Nothing new to send; `buf[lastFlush]` would be out of bounds here.
    return
  let sent = s.data.send(s.buf[s.lastFlush].addr, pending)
  if sent < 0:
    raise newException(IOError,
      "Unable to send buffered data over the socket")
  # Only mark as flushed what was really handed to the socket, so that a
  # partial send can be completed by a later flush.
  s.lastFlush += sent

proc rsClose(s: Stream) =
  ## `close` implementation shared by both socket stream kinds: closes the
  ## underlying socket. The effect tags of the call are cast away so the
  ## proc's tag list stays empty.
  {.cast(tags: []).}:
    let sockStream = ReadSocketStream(s)
    close(sockStream.data)

proc newReadSocketStream*(s: Socket): owned ReadSocketStream =
  ## Creates a `ReadSocketStream` that receives from the socket `s`,
  ## starting at position 0 with an empty buffer.
  result = ReadSocketStream(data: s, pos: 0)
  # Hook up the stream interface.
  result.closeImpl = rsClose
  result.atEndImpl = rsAtEnd
  result.setPositionImpl = rsSetPosition
  result.getPositionImpl = rsGetPosition
  result.readDataImpl = rsReadData
  result.peekDataImpl = rsPeekData
  result.readDataStrImpl = rsReadDataStr

proc resetStream*(s: ReadSocketStream) =
  ## Drops everything buffered so far and rewinds the position to 0.
  ## The socket itself is not touched.
  s.pos = 0
  s.buf = newSeq[byte]()

proc newWriteSocketStream*(s: Socket): owned WriteSocketStream =
  ## Creates a `WriteSocketStream` that sends over the socket `s` on flush,
  ## starting at position 0 with an empty buffer.
  result = WriteSocketStream(data: s, pos: 0)
  # Closing and positioning are shared with the read stream; reading,
  # writing and flushing use the write-buffer specific procs.
  result.closeImpl = rsClose
  result.atEndImpl = wsAtEnd
  result.setPositionImpl = rsSetPosition
  result.getPositionImpl = rsGetPosition
  result.writeDataImpl = wsWriteData
  result.readDataImpl = wsReadData
  result.peekDataImpl = wsPeekData
  result.flushImpl = wsFlush

proc resetStream*(s: WriteSocketStream) =
  ## Empties the write buffer, rewinds the position to 0 and forgets the
  ## last flush point. The socket itself is not touched.
  s.lastFlush = 0
  s.pos = 0
  s.buf = newSeq[byte]()