summary refs log blame commit diff stats
path: root/lib/pure/asyncnet.nim
blob: b1abf627bb7ffe32c2fa6735540777f13d174809 (plain) (tree)
1
2
3
4
5
6
7
8







                                                   





































                                                                            







                    
                                    
                                                                            
                                                     



                                                                                  

                                                          

                              



                            
                                                                         

                                                                       
                                                                        






                                                                           
                                                         
 







                                                                            
                                           
                                                       




                                                                             


























                                                                                  



                                                                             
                      
                                         






                                                                            
                                          
                
                                                                      


















































                                                                             

                                         






                         


















                                                                     
                  



                                      
                         













                                                         
                               


























                                                                    

              
#
#
#            Nimrod's Runtime Library
#        (c) Copyright 2014 Dominik Picheta
#
#    See the file "copying.txt", included in this
#    distribution, for details about the copyright.
#

## This module implements a high-level asynchronous sockets API based on the
## asynchronous dispatcher defined in the ``asyncdispatch`` module.
##
## Example
## =======
## 
## The following example demonstrates a simple chat server.
##
## .. code-block::nimrod
##
##   import asyncnet, asyncdispatch
##
##   var clients: seq[PAsyncSocket] = @[]
##
##   proc processClient(client: PAsyncSocket) {.async.} =
##     while true:
##       let line = await client.recvLine()
##       for c in clients:
##         await c.send(line & "\c\L")
##
##   proc serve() {.async.} =
##     var server = newAsyncSocket()
##     server.bindAddr(TPort(12345))
##     server.listen()
##
##     while true:
##       let client = await server.accept()
##       clients.add client
##
##       processClient(client)
##
##   serve()
##   runForever()
##
##
## **Note:** This module is still largely experimental.

import asyncdispatch
import rawsockets
import net

when defined(ssl):
  import openssl

type
  # TODO: I would prefer to just do:
  # PAsyncSocket* {.borrow: `.`.} = distinct PSocket. But that doesn't work.
  #
  # ``TAsyncSocket`` is a distinct type over ``TSocketImpl``. Borrowing the
  # `.` operator lets the procs in this module read and write the underlying
  # socket fields (``fd``, ``isBuffered``, ``buffer``, ``currPos``,
  # ``bufLen``) directly on an async socket.
  TAsyncSocket {.borrow: `.`.} = distinct TSocketImpl
  # Exported reference type; this is the handle taken and returned by the
  # public procs of this module.
  PAsyncSocket* = ref TAsyncSocket

# TODO: Save AF, domain etc info and reuse it in procs which need it like connect.

proc newSocket(fd: TAsyncFD, isBuff: bool): PAsyncSocket =
  # Wraps an already-created asynchronous descriptor ``fd`` in a fresh
  # ``PAsyncSocket``. ``isBuff`` selects whether reads go through the
  # socket's internal buffer.
  assert(fd != TAsyncFD(osInvalidSocket))
  # Allocate through the underlying ``PSocket`` view of the result.
  new(PSocket(result))
  result.isBuffered = isBuff
  result.fd = TSocketHandle(fd)
  if isBuff:
    # A buffered socket starts reading at the beginning of its buffer.
    result.currPos = 0

proc newAsyncSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
    protocol: TProtocol = IPPROTO_TCP, buffered = true): PAsyncSocket =
  ## Creates a new asynchronous socket for the given ``domain``, ``typ`` and
  ## ``protocol``. When ``buffered`` is true, reads on the socket go through
  ## an internal buffer.
  let rawFd = newAsyncRawSocket(domain, typ, protocol)
  return newSocket(rawFd, buffered)

proc connect*(socket: PAsyncSocket, address: string, port: TPort,
    af = AF_INET): PFuture[void] =
  ## Starts connecting ``socket`` to the server at ``address:port``.
  ##
  ## The returned ``PFuture`` completes once the connection has been
  ## established, or fails if an error occurs.
  let asyncFd = TAsyncFD(socket.fd)
  return connect(asyncFd, address, port, af)

proc readIntoBuf(socket: PAsyncSocket, flags: int): PFuture[int] {.async.} =
  # Refills the socket's internal buffer with up to ``BufferSize`` bytes from
  # the descriptor, resets the read cursor to the start of the buffer and
  # yields the number of bytes now available (0 when nothing was received).
  var incoming = await recv(TAsyncFD(socket.fd), BufferSize, flags)
  let count = incoming.len
  if count != 0:
    copyMem(addr socket.buffer[0], addr incoming[0], count)
  socket.currPos = 0
  socket.bufLen = count
  result = count

proc recv*(socket: PAsyncSocket, size: int,
           flags: int = 0): PFuture[string] {.async.} =
  ## Reads ``size`` bytes from ``socket``. Returned future will complete once
  ## all of the requested data is read. If socket is disconnected during the
  ## recv operation then the future may complete with only a part of the
  ## requested data read. If socket is disconnected and no data is available
  ## to be read then the future will complete with a value of ``""``.
  ##
  ## When ``MSG_PEEK`` is set in ``flags`` on a buffered socket, the data is
  ## returned without being consumed. A peek never spans more than the data
  ## currently held in the internal buffer, so it may return fewer than
  ## ``size`` bytes.
  if socket.isBuffered:
    # Peeking is emulated on top of the internal buffer, so MSG_PEEK must
    # never reach the OS-level recv that refills that buffer.
    let peeking = (flags and MSG_PEEK) == MSG_PEEK
    let fillFlags = flags and (not MSG_PEEK)
    result = newString(size)

    if socket.bufLen == 0:
      let initial = await socket.readIntoBuf(fillFlags)
      if initial == 0:
        result.setLen(0)
        return

    var read = 0
    while read < size:
      if socket.currPos >= socket.bufLen:
        # Buffer exhausted. Refilling overwrites the buffer, so it is only
        # safe while peeking if nothing has been copied out yet; otherwise
        # the bytes already peeked could not be handed out again later.
        if peeking and read > 0:
          break
        let refilled = await socket.readIntoBuf(fillFlags)
        if refilled == 0:
          break

      let chunk = min(socket.bufLen - socket.currPos, size - read)
      # Copy from the current cursor and advance it by the amount taken so
      # the exhaustion check above sees the real position on the next pass.
      copyMem(addr(result[read]), addr(socket.buffer[socket.currPos]), chunk)
      read.inc(chunk)
      socket.currPos.inc(chunk)

    if peeking:
      # Rewind so the peeked bytes are delivered again by the next read.
      socket.currPos.dec(read)
    result.setLen(read)
  else:
    result = await recv(socket.fd.TAsyncFD, size, flags)

proc send*(socket: PAsyncSocket, data: string): PFuture[void] =
  ## Writes ``data`` to ``socket``. The future that is returned completes
  ## after all of ``data`` has been sent.
  assert(socket != nil)
  let asyncFd = TAsyncFD(socket.fd)
  return send(asyncFd, data)

proc acceptAddr*(socket: PAsyncSocket):
      PFuture[tuple[address: string, client: PAsyncSocket]] =
  ## Accepts a new connection on ``socket``. The returned future yields the
  ## remote address of the client together with a new socket for that
  ## connection, and completes once the connection has been accepted.
  ## The client socket uses the same buffering mode as ``socket``.
  var outcome = newFuture[tuple[address: string, client: PAsyncSocket]]()
  var pending = acceptAddr(TAsyncFD(socket.fd))
  pending.callback =
    proc (done: PFuture[tuple[address: string, client: TAsyncFD]]) =
      assert done.finished
      if not done.failed:
        # Wrap the raw descriptor in a high-level async socket.
        let wrapped = newSocket(done.read.client, socket.isBuffered)
        let accepted = (done.read.address, wrapped)
        outcome.complete(accepted)
      else:
        # Forward the low-level error to the caller's future.
        outcome.fail(done.readError)
  result = outcome

proc accept*(socket: PAsyncSocket): PFuture[PAsyncSocket] =
  ## Accepts a new connection on ``socket``. The returned future yields the
  ## client socket for that connection and completes once the connection has
  ## been accepted. The remote address is discarded.
  var outcome = newFuture[PAsyncSocket]()
  var pending = acceptAddr(socket)
  pending.callback =
    proc (done: PFuture[tuple[address: string, client: PAsyncSocket]]) =
      assert done.finished
      if not done.failed:
        outcome.complete(done.read.client)
      else:
        outcome.fail(done.readError)
  result = outcome

proc recvLine*(socket: PAsyncSocket): PFuture[string] {.async.} =
  ## Reads one line from ``socket``. The future completes when a line
  ## terminator (``\r``, ``\L`` or ``\r\L``) has been read or an error occurs.
  ##
  ## The terminator is not included in the returned line, except when the
  ## line is otherwise empty: in that case ``"\r\L"`` is returned so that an
  ## empty line can be told apart from a disconnect.
  ##
  ## If the socket is disconnected, ``""`` is returned. This also applies
  ## when the disconnect happens in the middle of a line (before the
  ## terminator is read): the partial line **will be lost**.
  result = ""
  var ch = ""
  while true:
    ch = await recv(socket, 1)
    if ch.len == 0:
      # Disconnected: drop whatever was collected so far.
      return ""
    if ch == "\r" or ch == "\L":
      if ch == "\r":
        # Look ahead for the "\L" of a "\r\L" pair and consume it if present.
        ch = await recv(socket, 1, MSG_PEEK)
        if ch.len > 0 and ch == "\L":
          let consumed = await recv(socket, 1)
          assert consumed == "\L"
      # An empty line is reported as the terminator itself.
      if result.len == 0:
        result.add("\c\L")
      return
    add(result.string, ch)

proc bindAddr*(socket: PAsyncSocket, port = TPort(0), address = "") =
  ## Binds the socket to ``address``:``port``.
  ##
  ## An empty ``address`` binds ADDR_ANY.
  bindAddr(PSocket(socket), port, address)

proc listen*(socket: PAsyncSocket, backlog = SOMAXCONN) =
  ## Puts ``socket`` into the listening state so that it can accept
  ## connections. ``backlog`` is the maximum length of the queue of pending
  ## connections.
  ##
  ## Raises an EOS error upon failure.
  listen(PSocket(socket), backlog)

proc close*(socket: PAsyncSocket) =
  ## Closes the socket's underlying asynchronous descriptor.
  let asyncFd = TAsyncFD(socket.fd)
  close(asyncFd)
  # TODO SSL

# Manual test harness, compiled only when this file is the main module.
# Exactly one scenario is compiled in, chosen by the ``test`` constant below.
when isMainModule:
  type
    TestCases = enum
      HighClient, LowClient, LowServer

  # Compile-time switch selecting which scenario to build.
  const test = HighClient

  when test == HighClient:
    # Client written with async/await: connects, then echoes every received
    # line until ``recvLine`` returns "" (disconnect).
    proc main() {.async.} =
      var sock = newAsyncSocket()
      await sock.connect("irc.freenode.net", TPort(6667))
      while true:
        let line = await sock.recvLine()
        if line == "":
          echo("Disconnected")
          break
        else:
          echo("Got line: ", line)
    main()
  elif test == LowClient:
    # Client written with explicit future callbacks instead of await.
    var sock = newAsyncSocket()
    var f = connect(sock, "irc.freenode.net", TPort(6667))
    f.callback =
      proc (future: PFuture[void]) =
        echo("Connected in future!")
        # Issue 51 receives of 10 bytes each and print each result.
        for i in 0 .. 50:
          var recvF = recv(sock, 10)
          recvF.callback =
            proc (future: PFuture[string]) =
              echo("Read ", future.read.len, ": ", future.read.repr)
  elif test == LowServer:
    # Server written with explicit callbacks: sends a greeting to every
    # accepted client and closes the connection once the send completes.
    var sock = newAsyncSocket()
    sock.bindAddr(TPort(6667))
    sock.listen()
    proc onAccept(future: PFuture[PAsyncSocket]) =
      let client = future.read
      echo "Accepted ", client.fd.cint
      var t = send(client, "test\c\L")
      t.callback =
        proc (future: PFuture[void]) =
          echo("Send")
          client.close()
      
      # Re-arm the accept so the next client is handled by this same proc.
      var f = accept(sock)
      f.callback = onAccept
      
    # Start accepting the first client.
    var f = accept(sock)
    f.callback = onAccept
  # Drive the dispatcher so the futures registered above make progress.
  runForever()