summary refs log tree commit diff stats
path: root/lib/pure/asyncnet.nim
blob: b1abf627bb7ffe32c2fa6735540777f13d174809 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
#
#
#            Nimrod's Runtime Library
#        (c) Copyright 2014 Dominik Picheta
#
#    See the file "copying.txt", included in this
#    distribution, for details about the copyright.
#

## This module implements a high-level asynchronous sockets API based on the
## asynchronous dispatcher defined in the ``asyncdispatch`` module.
##
## Example
## =======
## 
## The following example demonstrates a simple chat server.
##
## .. code-block::nimrod
##
##   import asyncnet, asyncdispatch
##
##   var clients: seq[PAsyncSocket] = @[]
##
##   proc processClient(client: PAsyncSocket) {.async.} =
##     while true:
##       let line = await client.recvLine()
##       for c in clients:
##         await c.send(line & "\c\L")
##
##   proc serve() {.async.} =
##     var server = newAsyncSocket()
##     server.bindAddr(TPort(12345))
##     server.listen()
##
##     while true:
##       let client = await server.accept()
##       clients.add client
##
##       processClient(client)
##
##   serve()
##   runForever()
##
##
## **Note:** This module is still largely experimental.

import asyncdispatch
import rawsockets
import net

when defined(ssl):
  import openssl

type
  # TODO: I would prefer to just do:
  # PAsyncSocket* {.borrow: `.`.} = distinct PSocket. But that doesn't work.
  TAsyncSocket {.borrow: `.`.} = distinct TSocketImpl
  PAsyncSocket* = ref TAsyncSocket

# TODO: Save AF, domain etc info and reuse it in procs which need it like connect.

proc newSocket(fd: TAsyncFD, isBuff: bool): PAsyncSocket =
  assert fd != osInvalidSocket.TAsyncFD
  new(result.PSocket)
  result.fd = fd.TSocketHandle
  result.isBuffered = isBuff
  if isBuff:
    result.currPos = 0

proc newAsyncSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
    protocol: TProtocol = IPPROTO_TCP, buffered = true): PAsyncSocket =
  ## Creates a new asynchronous socket.
  result = newSocket(newAsyncRawSocket(domain, typ, protocol), buffered)

proc connect*(socket: PAsyncSocket, address: string, port: TPort,
    af = AF_INET): PFuture[void] =
  ## Connects ``socket`` to server at ``address:port``.
  ##
  ## Returns a ``PFuture`` which will complete when the connection succeeds
  ## or an error occurs.
  result = connect(socket.fd.TAsyncFD, address, port, af)

proc readIntoBuf(socket: PAsyncSocket, flags: int): PFuture[int] {.async.} =
  var data = await recv(socket.fd.TAsyncFD, BufferSize, flags)
  if data.len != 0:
    copyMem(addr socket.buffer[0], addr data[0], data.len)
  socket.bufLen = data.len
  socket.currPos = 0
  result = data.len

proc recv*(socket: PAsyncSocket, size: int,
           flags: int = 0): PFuture[string] {.async.} =
  ## Reads ``size`` bytes from ``socket``. Returned future will complete once
  ## all of the requested data is read. If socket is disconnected during the
  ## recv operation then the future may complete with only a part of the
  ## requested data read. If socket is disconnected and no data is available
  ## to be read then the future will complete with a value of ``""``.
  if socket.isBuffered:
    result = newString(size)

    template returnNow(readBytes: int) =
      result.setLen(readBytes)
      # Only increase buffer position when not peeking.
      if (flags and MSG_PEEK) != MSG_PEEK:
        socket.currPos.inc(readBytes)
      return

    if socket.bufLen == 0:
      let res = await socket.readIntoBuf(flags and (not MSG_PEEK))
      if res == 0: returnNow(0)

    var read = 0
    while read < size:
      if socket.currPos >= socket.bufLen:
        let res = await socket.readIntoBuf(flags and (not MSG_PEEK))
        if res == 0: returnNow(read)

      let chunk = min(socket.bufLen-socket.currPos, size-read)
      copyMem(addr(result[read]), addr(socket.buffer[socket.currPos+read]), chunk)
      read.inc(chunk)

    returnNow(read)
  else:
    result = await recv(socket.fd.TAsyncFD, size, flags)

proc send*(socket: PAsyncSocket, data: string): PFuture[void] =
  ## Sends ``data`` to ``socket``. The returned future will complete once all
  ## data has been sent.
  assert socket != nil
  result = send(socket.fd.TAsyncFD, data)

proc acceptAddr*(socket: PAsyncSocket): 
      PFuture[tuple[address: string, client: PAsyncSocket]] =
  ## Accepts a new connection. Returns a future containing the client socket
  ## corresponding to that connection and the remote address of the client.
  ## The future will complete when the connection is successfully accepted.
  var retFuture = newFuture[tuple[address: string, client: PAsyncSocket]]()
  var fut = acceptAddr(socket.fd.TAsyncFD)
  fut.callback =
    proc (future: PFuture[tuple[address: string, client: TAsyncFD]]) =
      assert future.finished
      if future.failed:
        retFuture.fail(future.readError)
      else:
        let resultTup = (future.read.address,
                         newSocket(future.read.client, socket.isBuffered))
        retFuture.complete(resultTup)
  return retFuture

proc accept*(socket: PAsyncSocket): PFuture[PAsyncSocket] =
  ## Accepts a new connection. Returns a future containing the client socket
  ## corresponding to that connection.
  ## The future will complete when the connection is successfully accepted.
  var retFut = newFuture[PAsyncSocket]()
  var fut = acceptAddr(socket)
  fut.callback =
    proc (future: PFuture[tuple[address: string, client: PAsyncSocket]]) =
      assert future.finished
      if future.failed:
        retFut.fail(future.readError)
      else:
        retFut.complete(future.read.client)
  return retFut

proc recvLine*(socket: PAsyncSocket): PFuture[string] {.async.} =
  ## Reads a line of data from ``socket``. Returned future will complete once
  ## a full line is read or an error occurs.
  ##
  ## If a full line is read ``\r\L`` is not
  ## added to ``line``, however if solely ``\r\L`` is read then ``line``
  ## will be set to it.
  ## 
  ## If the socket is disconnected, ``line`` will be set to ``""``.
  ##
  ## If the socket is disconnected in the middle of a line (before ``\r\L``
  ## is read) then line will be set to ``""``.
  ## The partial line **will be lost**.
  
  template addNLIfEmpty(): stmt =
    if result.len == 0:
      result.add("\c\L")

  result = ""
  var c = ""
  while true:
    c = await recv(socket, 1)
    if c.len == 0:
      return ""
    if c == "\r":
      c = await recv(socket, 1, MSG_PEEK)
      if c.len > 0 and c == "\L":
        let dummy = await recv(socket, 1)
        assert dummy == "\L"
      addNLIfEmpty()
      return
    elif c == "\L":
      addNLIfEmpty()
      return
    add(result.string, c)

proc bindAddr*(socket: PAsyncSocket, port = TPort(0), address = "") =
  ## Binds ``address``:``port`` to the socket.
  ##
  ## If ``address`` is "" then ADDR_ANY will be bound.
  socket.PSocket.bindAddr(port, address)

proc listen*(socket: PAsyncSocket, backlog = SOMAXCONN) =
  ## Marks ``socket`` as accepting connections.
  ## ``Backlog`` specifies the maximum length of the
  ## queue of pending connections.
  ##
  ## Raises an EOS error upon failure.
  socket.PSocket.listen(backlog)

proc close*(socket: PAsyncSocket) =
  ## Closes the socket.
  socket.fd.TAsyncFD.close()
  # TODO SSL

when isMainModule:
  type
    TestCases = enum
      HighClient, LowClient, LowServer

  const test = HighClient

  when test == HighClient:
    proc main() {.async.} =
      var sock = newAsyncSocket()
      await sock.connect("irc.freenode.net", TPort(6667))
      while true:
        let line = await sock.recvLine()
        if line == "":
          echo("Disconnected")
          break
        else:
          echo("Got line: ", line)
    main()
  elif test == LowClient:
    var sock = newAsyncSocket()
    var f = connect(sock, "irc.freenode.net", TPort(6667))
    f.callback =
      proc (future: PFuture[void]) =
        echo("Connected in future!")
        for i in 0 .. 50:
          var recvF = recv(sock, 10)
          recvF.callback =
            proc (future: PFuture[string]) =
              echo("Read ", future.read.len, ": ", future.read.repr)
  elif test == LowServer:
    var sock = newAsyncSocket()
    sock.bindAddr(TPort(6667))
    sock.listen()
    proc onAccept(future: PFuture[PAsyncSocket]) =
      let client = future.read
      echo "Accepted ", client.fd.cint
      var t = send(client, "test\c\L")
      t.callback =
        proc (future: PFuture[void]) =
          echo("Send")
          client.close()
      
      var f = accept(sock)
      f.callback = onAccept
      
    var f = accept(sock)
    f.callback = onAccept
  runForever()