summary refs log blame commit diff stats
path: root/lib/pure/asyncnet.nim
blob: 93334d0e2dec419c070d262b6d9561a5fcac9cdd (plain) (tree)
1
2
3
4
5
6
7
8

 
                                  
                                           



                                                   



                                                                            
                         
                         



































                                                                              
      
      







                                                                               
           

              
              
  

                                                           
                     


                                   
                                                
  
                                                        

                                           
                                



                                      
                    
                                    
                                            
                                   





                                           
                                         
  
                       

                 
 

                                            
 

             

                                                  


                                                 


                
                                    
                                                                          
                          
                    
                                                                      

                                                                   

                                               





                            


                      

                                    

                                                           
                                                                            
                                                                





                                                                            
                                      
             
                             
                                    
                              


                            
              

                      
                                                                                
                                                                            
                                       


                                                                    



                                                                   
 











                                                         
                                                      
                                          
                                       


                                                                    



                                                                 
                                                       
 
               

                                                     
                                             













                                                                                        
                                         

                                           
                               


                                                          
                       
                       
                                               
 
                                                              
                                                                  

                                                                              



                                             
                                                                 

                            
                                                                   




                                                                             
                      


                                          
                                                               
                                 

                                     
                                   


                                                                

                                            
                                             


                                                                        

                                                              




                                                     
                             



                                                         
                                                               
                                                                   








                                                                              
 
                                                                           

                                                       
                                                                          
                        
                                                                
                  
                   




                                                                        
                                          

                                                              
 
                                                               
                                                

                                                                           
                                                               
             
                  
                   
                 
                            
                                                                 
                    
       
                                                                                 
                     
                      

                            
 
                                         

                                                                       
                    

                      
 
                                                            
                                                                            













































                                                                                
                                          
                                                                               









                                                                                
                                                                        


                                                       
                                                                     

                            
                   
                                       

                          
                                                             


                        



                                         
                                    
                                                                 
               
                                                               
                    
               

                                                              
                                                                             
                     
                               
 
                                


                                           
       
                            
                                                            
                       
 
                                                        



                                                                                                
                                                               


                            
                                                                        

                                             
                                                   
 

                                                        

                                                                             
                      
                  
                   




                                                                
                                              
 
                                                                        
                                                                  


                                                                            
                                                                                               
                                                
                
                                                                    




                                             

                                                                              


                                     
                                 
                                                                   


                                                                            
                                                        
                                     
                
                                                                        






                                           
                                                                     
                                                                            
                                                             



                                                                        
    




                                                                           
    
                                                                            
                                                                  
    
                                                           
    

                                                                         
                                             
                                                   
 



                                                                               
                                    

                                
 
                       
                          
                                         
                  
                            




                                         
                                           
                    

                                  
                







                                        
                            



                              
                              

                
                                                          
                          

                                                   
                                              
       

               
                                      
                    

                                
              
                   
                                                  
                        
                      
                            


                      
                            
              
                          

                                                   
                                              
                      
 
                                   
                                     
                                                                 





                                                                             
    




                                                                           
    
                                                                            
                                                          
    
                                                           
    

                                                                              
                                             
 

                                                           
                       
                                                        
                           
 

                                                                



                                                    
                                          

                                                                    
                                                                    



                                                      

                        
                      
                                
                                    
         

                                                                             
 
                                                         

                                                                            
                               
                      
 

                    
                                                                             




                                                                     
 









                                                                                  
 

                                                                      
                                                                           









                                                                      
                                                      





                                                                     
                                                                       



                                   
                                                                             


                                                                     
 




                                                                     
                                  
                       
        
                                   
                 


                                              
                  
               
                    
                       
                                                                   
 
               







                                                                           
                                                         
                               
                     
 

                                     

                                                            
                                                                 
                                                        
                                                    

                                                                            

                                                                               





                                                                           

                           
                  
                         
                                                        


                                                                        



                                          










                                                                                      










                                                                               
 







                                                   



                                                   

                                                                            









                                                                              

                                                                 
 

                                                                         






                                
                                                                             
















                                                  
                         
         
                                                                         
 


                                                                             
                                        



                                                                                



                                                                          
                                                  

                                     


                                                                             
    
                    
    

                                                                  
 



                                               
 


                                                                               
 
                            
  








                                                                   
 
                                           



                                      
                         



                                 
                                                        






                                        
                     
                         
                               
                                                         
                
                                   



                                    
                                           


                                                                    
                             
                 
                                                



                                      
                                     

                        
 

                           
 

                         
              
#
#
#            Nim's Runtime Library
#        (c) Copyright 2017 Dominik Picheta
#
#    See the file "copying.txt", included in this
#    distribution, for details about the copyright.
#

## This module implements a high-level asynchronous sockets API based on the
## asynchronous dispatcher defined in the ``asyncdispatch`` module.
##
## Asynchronous IO in Nim
## ======================
##
## Async IO in Nim consists of multiple layers (from highest to lowest):
##
## * ``asyncnet`` module
##
## * Async await
##
## * ``asyncdispatch`` module (event loop)
##
## * ``selectors`` module
##
## Each builds on top of the layers below it. The selectors module is an
## abstraction for the various system ``select()`` mechanisms such as epoll or
## kqueue. If you wish you can use it directly, and some people have done so
## `successfully <http://goran.krampe.se/2014/10/25/nim-socketserver/>`_.
## But you must be aware that on Windows it only supports
## ``select()``.
##
## The async dispatcher implements the proactor pattern and also has an
## implementation of IOCP. It implements the proactor pattern for other
## OS' via the selectors module. Futures are also implemented here, and
## indeed all the procedures return a future.
##
## The final layer is the async await transformation. This allows you to
## write asynchronous code in a synchronous style and works similar to
## C#'s await. The transformation works by converting any async procedures
## into an iterator.
##
## This is all single threaded, fully non-blocking and does give you a
## lot of control. In theory you should be able to work with any of these
## layers interchangeably (as long as you only care about non-Windows
## platforms).
##
## For most applications using ``asyncnet`` is the way to go as it builds
## over all the layers, providing some extra features such as buffering.
##
## SSL
## ===
##
## SSL can be enabled by compiling with the ``-d:ssl`` flag.
##
## You must create a new SSL context with the ``newContext`` function defined
## in the ``net`` module. You may then call ``wrapSocket`` on your socket using
## the newly created SSL context to get an SSL socket.
##
## Examples
## ========
##
## Chat server
## -----------
##
## The following example demonstrates a simple chat server.
##
## .. code-block::nim
##
##   import asyncnet, asyncdispatch
##
##   var clients {.threadvar.}: seq[AsyncSocket]
##
##   proc processClient(client: AsyncSocket) {.async.} =
##     while true:
##       let line = await client.recvLine()
##       if line.len == 0: break
##       for c in clients:
##         await c.send(line & "\c\L")
##
##   proc serve() {.async.} =
##     clients = @[]
##     var server = newAsyncSocket()
##     server.setSockOpt(OptReuseAddr, true)
##     server.bindAddr(Port(12345))
##     server.listen()
##
##     while true:
##       let client = await server.accept()
##       clients.add client
##
##       asyncCheck processClient(client)
##
##   asyncCheck serve()
##   runForever()
##

import std/private/since
import asyncdispatch, nativesockets, net, os

export SOBool

# TODO: Remove duplication introduced by PR #4683.

const defineSsl = defined(ssl) or defined(nimdoc)

when defineSsl:
  import openssl

type
  # TODO: I would prefer to just do:
  # AsyncSocket* {.borrow: `.`.} = distinct Socket. But that doesn't work.
  AsyncSocketDesc = object
    fd: SocketHandle
    closed: bool     ## determines whether this socket has been closed
    isBuffered: bool ## determines whether this socket is buffered.
    buffer: array[0..BufferSize, char]
    currPos: int     # current index in buffer
    bufLen: int      # current length of buffer
    isSsl: bool
    when defineSsl:
      sslHandle: SslPtr
      sslContext: SslContext
      bioIn: BIO
      bioOut: BIO
    domain: Domain
    sockType: SockType
    protocol: Protocol
  AsyncSocket* = ref AsyncSocketDesc

proc newAsyncSocket*(fd: AsyncFD, domain: Domain = AF_INET,
    sockType: SockType = SOCK_STREAM,
    protocol: Protocol = IPPROTO_TCP, buffered = true): owned(AsyncSocket) =
  ## Creates a new ``AsyncSocket`` based on the supplied params.
  ##
  ## The supplied ``fd``'s non-blocking state will be enabled implicitly.
  ##
  ## **Note**: This procedure will **NOT** register ``fd`` with the global
  ## async dispatcher. You need to do this manually. If you have used
  ## ``newAsyncNativeSocket`` to create ``fd`` then it's already registered.
  assert fd != osInvalidSocket.AsyncFD
  new(result)
  result.fd = fd.SocketHandle
  fd.SocketHandle.setBlocking(false)
  result.isBuffered = buffered
  result.domain = domain
  result.sockType = sockType
  result.protocol = protocol
  if buffered:
    result.currPos = 0

proc newAsyncSocket*(domain: Domain = AF_INET, sockType: SockType = SOCK_STREAM,
    protocol: Protocol = IPPROTO_TCP, buffered = true): owned(AsyncSocket) =
  ## Creates a new asynchronous socket.
  ##
  ## This procedure will also create a brand new file descriptor for
  ## this socket.
  let fd = createAsyncNativeSocket(domain, sockType, protocol)
  if fd.SocketHandle == osInvalidSocket:
    raiseOSError(osLastError())
  result = newAsyncSocket(fd, domain, sockType, protocol, buffered)

proc getLocalAddr*(socket: AsyncSocket): (string, Port) =
  ## Get the socket's local address and port number.
  ##
  ## This is high-level interface for `getsockname`:idx:.
  getLocalAddr(socket.fd, socket.domain)

proc getPeerAddr*(socket: AsyncSocket): (string, Port) =
  ## Get the socket's peer address and port number.
  ##
  ## This is high-level interface for `getpeername`:idx:.
  getPeerAddr(socket.fd, socket.domain)

proc newAsyncSocket*(domain, sockType, protocol: cint,
    buffered = true): owned(AsyncSocket) =
  ## Creates a new asynchronous socket.
  ##
  ## This procedure will also create a brand new file descriptor for
  ## this socket.
  let fd = createAsyncNativeSocket(domain, sockType, protocol)
  if fd.SocketHandle == osInvalidSocket:
    raiseOSError(osLastError())
  result = newAsyncSocket(fd, Domain(domain), SockType(sockType),
                          Protocol(protocol), buffered)

when defineSsl:
  proc getSslError(handle: SslPtr, err: cint): cint =
    assert err < 0
    var ret = SSL_get_error(handle, err.cint)
    case ret
    of SSL_ERROR_ZERO_RETURN:
      raiseSSLError("TLS/SSL connection failed to initiate, socket closed prematurely.")
    of SSL_ERROR_WANT_CONNECT, SSL_ERROR_WANT_ACCEPT:
      return ret
    of SSL_ERROR_WANT_WRITE, SSL_ERROR_WANT_READ:
      return ret
    of SSL_ERROR_WANT_X509_LOOKUP:
      raiseSSLError("Function for x509 lookup has been called.")
    of SSL_ERROR_SYSCALL, SSL_ERROR_SSL:
      raiseSSLError()
    else: raiseSSLError("Unknown Error")

  proc sendPendingSslData(socket: AsyncSocket,
      flags: set[SocketFlag]) {.async.} =
    let len = bioCtrlPending(socket.bioOut)
    if len > 0:
      var data = newString(len)
      let read = bioRead(socket.bioOut, addr data[0], len)
      assert read != 0
      if read < 0:
        raiseSSLError()
      data.setLen(read)
      await socket.fd.AsyncFD.send(data, flags)

  proc appeaseSsl(socket: AsyncSocket, flags: set[SocketFlag],
                  sslError: cint): owned(Future[bool]) {.async.} =
    ## Returns ``true`` if ``socket`` is still connected, otherwise ``false``.
    result = true
    case sslError
    of SSL_ERROR_WANT_WRITE:
      await sendPendingSslData(socket, flags)
    of SSL_ERROR_WANT_READ:
      var data = await recv(socket.fd.AsyncFD, BufferSize, flags)
      let length = len(data)
      if length > 0:
        let ret = bioWrite(socket.bioIn, addr data[0], length.cint)
        if ret < 0:
          raiseSSLError()
      elif length == 0:
        # connection not properly closed by remote side or connection dropped
        SSL_set_shutdown(socket.sslHandle, SSL_RECEIVED_SHUTDOWN)
        result = false
    else:
      raiseSSLError("Cannot appease SSL.")

  template sslLoop(socket: AsyncSocket, flags: set[SocketFlag],
                   op: untyped) =
    var opResult {.inject.} = -1.cint
    while opResult < 0:
      # Call the desired operation.
      opResult = op
      # Bit hackish here.
      # TODO: Introduce an async template transformation pragma?

      # Send any remaining pending SSL data.
      yield sendPendingSslData(socket, flags)

      # If the operation failed, try to see if SSL has some data to read
      # or write.
      if opResult < 0:
        let err = getSslError(socket.sslHandle, opResult.cint)
        let fut = appeaseSsl(socket, flags, err.cint)
        yield fut
        if not fut.read():
          # Socket disconnected.
          if SocketFlag.SafeDisconn in flags:
            opResult = 0.cint
            break
          else:
            raiseSSLError("Socket has been disconnected")

proc dial*(address: string, port: Port, protocol = IPPROTO_TCP,
           buffered = true): owned(Future[AsyncSocket]) {.async.} =
  ## Establishes connection to the specified ``address``:``port`` pair via the
  ## specified protocol. The procedure iterates through possible
  ## resolutions of the ``address`` until it succeeds, meaning that it
  ## seamlessly works with both IPv4 and IPv6.
  ## Returns AsyncSocket ready to send or receive data.
  let asyncFd = await asyncdispatch.dial(address, port, protocol)
  let sockType = protocol.toSockType()
  let domain = getSockDomain(asyncFd.SocketHandle)
  result = newAsyncSocket(asyncFd, domain, sockType, protocol, buffered)

proc connect*(socket: AsyncSocket, address: string, port: Port) {.async.} =
  ## Connects ``socket`` to server at ``address:port``.
  ##
  ## Returns a ``Future`` which will complete when the connection succeeds
  ## or an error occurs.
  await connect(socket.fd.AsyncFD, address, port, socket.domain)
  if socket.isSsl:
    when defineSsl:
      if not isIpAddress(address):
        # Set the SNI address for this connection. This call can fail if
        # we're not using TLSv1+.
        discard SSL_set_tlsext_host_name(socket.sslHandle, address)

      let flags = {SocketFlag.SafeDisconn}
      sslSetConnectState(socket.sslHandle)
      sslLoop(socket, flags, sslDoHandshake(socket.sslHandle))

template readInto(buf: pointer, size: int, socket: AsyncSocket,
                  flags: set[SocketFlag]): int =
  ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``. Note that
  ## this is a template and not a proc.
  assert(not socket.closed, "Cannot `recv` on a closed socket")
  var res = 0
  if socket.isSsl:
    when defineSsl:
      # SSL mode.
      sslLoop(socket, flags,
        sslRead(socket.sslHandle, cast[cstring](buf), size.cint))
      res = opResult
  else:
    var recvIntoFut = asyncdispatch.recvInto(socket.fd.AsyncFD, buf, size, flags)
    yield recvIntoFut
    # Not in SSL mode.
    res = recvIntoFut.read()
  res

template readIntoBuf(socket: AsyncSocket,
    flags: set[SocketFlag]): int =
  var size = readInto(addr socket.buffer[0], BufferSize, socket, flags)
  socket.currPos = 0
  socket.bufLen = size
  size

proc recvInto*(socket: AsyncSocket, buf: pointer, size: int,
           flags = {SocketFlag.SafeDisconn}): owned(Future[int]) {.async.} =
  ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``.
  ##
  ## For buffered sockets this function will attempt to read all the requested
  ## data. It will read this data in ``BufferSize`` chunks.
  ##
  ## For unbuffered sockets this function makes no effort to read
  ## all the data requested. It will return as much data as the operating system
  ## gives it.
  ##
  ## If socket is disconnected during the
  ## recv operation then the future may complete with only a part of the
  ## requested data.
  ##
  ## If socket is disconnected and no data is available
  ## to be read then the future will complete with a value of ``0``.
  if socket.isBuffered:
    let originalBufPos = socket.currPos

    if socket.bufLen == 0:
      let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
      if res == 0:
        return 0

    var read = 0
    var cbuf = cast[cstring](buf)
    while read < size:
      if socket.currPos >= socket.bufLen:
        if SocketFlag.Peek in flags:
          # We don't want to get another buffer if we're peeking.
          break
        let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
        if res == 0:
          break

      let chunk = min(socket.bufLen-socket.currPos, size-read)
      copyMem(addr(cbuf[read]), addr(socket.buffer[socket.currPos]), chunk)
      read.inc(chunk)
      socket.currPos.inc(chunk)

    if SocketFlag.Peek in flags:
      # Restore old buffer cursor position.
      socket.currPos = originalBufPos
    result = read
  else:
    result = readInto(buf, size, socket, flags)

proc recv*(socket: AsyncSocket, size: int,
           flags = {SocketFlag.SafeDisconn}): owned(Future[string]) {.async.} =
  ## Reads **up to** ``size`` bytes from ``socket``.
  ##
  ## For buffered sockets this function will attempt to read all the requested
  ## data. It will read this data in ``BufferSize`` chunks.
  ##
  ## For unbuffered sockets this function makes no effort to read
  ## all the data requested. It will return as much data as the operating system
  ## gives it.
  ##
  ## If socket is disconnected during the
  ## recv operation then the future may complete with only a part of the
  ## requested data.
  ##
  ## If socket is disconnected and no data is available
  ## to be read then the future will complete with a value of ``""``.
  if socket.isBuffered:
    result = newString(size)
    shallow(result)
    let originalBufPos = socket.currPos

    if socket.bufLen == 0:
      let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
      if res == 0:
        result.setLen(0)
        return

    var read = 0
    while read < size:
      if socket.currPos >= socket.bufLen:
        if SocketFlag.Peek in flags:
          # We don't want to get another buffer if we're peeking.
          break
        let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
        if res == 0:
          break

      let chunk = min(socket.bufLen-socket.currPos, size-read)
      copyMem(addr(result[read]), addr(socket.buffer[socket.currPos]), chunk)
      read.inc(chunk)
      socket.currPos.inc(chunk)

    if SocketFlag.Peek in flags:
      # Restore old buffer cursor position.
      socket.currPos = originalBufPos
    result.setLen(read)
  else:
    result = newString(size)
    let read = readInto(addr result[0], size, socket, flags)
    result.setLen(read)

proc send*(socket: AsyncSocket, buf: pointer, size: int,
            flags = {SocketFlag.SafeDisconn}) {.async.} =
  ## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future will complete once all
  ## data has been sent.
  assert socket != nil
  assert(not socket.closed, "Cannot `send` on a closed socket")
  if socket.isSsl:
    when defineSsl:
      sslLoop(socket, flags,
              sslWrite(socket.sslHandle, cast[cstring](buf), size.cint))
      await sendPendingSslData(socket, flags)
  else:
    await send(socket.fd.AsyncFD, buf, size, flags)

proc send*(socket: AsyncSocket, data: string,
           flags = {SocketFlag.SafeDisconn}) {.async.} =
  ## Sends ``data`` to ``socket``. The returned future will complete once all
  ## data has been sent.
  assert socket != nil
  if socket.isSsl:
    when defineSsl:
      var copy = data
      sslLoop(socket, flags,
        sslWrite(socket.sslHandle, addr copy[0], copy.len.cint))
      await sendPendingSslData(socket, flags)
  else:
    await send(socket.fd.AsyncFD, data, flags)

proc acceptAddr*(socket: AsyncSocket, flags = {SocketFlag.SafeDisconn}):
      owned(Future[tuple[address: string, client: AsyncSocket]]) =
  ## Accepts a new connection. Returns a future containing the client socket
  ## corresponding to that connection and the remote address of the client.
  ## The future will complete when the connection is successfully accepted.
  var retFuture = newFuture[tuple[address: string, client: AsyncSocket]]("asyncnet.acceptAddr")
  var fut = acceptAddr(socket.fd.AsyncFD, flags)
  fut.callback =
    proc (future: Future[tuple[address: string, client: AsyncFD]]) =
      assert future.finished
      if future.failed:
        retFuture.fail(future.readError)
      else:
        let resultTup = (future.read.address,
                         newAsyncSocket(future.read.client, socket.domain,
                         socket.sockType, socket.protocol, socket.isBuffered))
        retFuture.complete(resultTup)
  return retFuture

proc accept*(socket: AsyncSocket,
    flags = {SocketFlag.SafeDisconn}): owned(Future[AsyncSocket]) =
  ## Accepts a new connection. Returns a future containing the client socket
  ## corresponding to that connection.
  ## The future will complete when the connection is successfully accepted.
  var retFut = newFuture[AsyncSocket]("asyncnet.accept")
  var fut = acceptAddr(socket, flags)
  fut.callback =
    proc (future: Future[tuple[address: string, client: AsyncSocket]]) =
      assert future.finished
      if future.failed:
        retFut.fail(future.readError)
      else:
        retFut.complete(future.read.client)
  return retFut

proc recvLineInto*(socket: AsyncSocket, resString: FutureVar[string],
    flags = {SocketFlag.SafeDisconn}, maxLength = MaxLineLength) {.async.} =
  ## Reads a line of data from ``socket`` into ``resString``.
  ##
  ## If a full line is read ``\r\L`` is not
  ## added to ``line``, however if solely ``\r\L`` is read then ``line``
  ## will be set to it.
  ##
  ## If the socket is disconnected, ``line`` will be set to ``""``.
  ##
  ## If the socket is disconnected in the middle of a line (before ``\r\L``
  ## is read) then line will be set to ``""``.
  ## The partial line **will be lost**.
  ##
  ## The ``maxLength`` parameter determines the maximum amount of characters
  ## that can be read. ``resString`` will be truncated after that.
  ##
  ## **Warning**: The ``Peek`` flag is not yet implemented.
  ##
  ## **Warning**: ``recvLineInto`` on unbuffered sockets assumes that the
  ## protocol uses ``\r\L`` to delimit a new line.
  assert SocketFlag.Peek notin flags ## TODO:
  result = newFuture[void]("asyncnet.recvLineInto")

  # TODO: Make the async transformation check for FutureVar params and complete
  # them when the result future is completed.
  # Can we replace the result future with the FutureVar?

  template addNLIfEmpty(): untyped =
    if resString.mget.len == 0:
      resString.mget.add("\c\L")

  if socket.isBuffered:
    if socket.bufLen == 0:
      let res = socket.readIntoBuf(flags)
      if res == 0:
        resString.complete()
        return

    var lastR = false
    while true:
      if socket.currPos >= socket.bufLen:
        let res = socket.readIntoBuf(flags)
        if res == 0:
          resString.mget.setLen(0)
          resString.complete()
          return

      case socket.buffer[socket.currPos]
      of '\r':
        lastR = true
        addNLIfEmpty()
      of '\L':
        addNLIfEmpty()
        socket.currPos.inc()
        resString.complete()
        return
      else:
        if lastR:
          socket.currPos.inc()
          resString.complete()
          return
        else:
          resString.mget.add socket.buffer[socket.currPos]
      socket.currPos.inc()

      # Verify that this isn't a DOS attack: #3847.
      if resString.mget.len > maxLength: break
  else:
    var c = ""
    while true:
      c = await recv(socket, 1, flags)
      if c.len == 0:
        resString.mget.setLen(0)
        resString.complete()
        return
      if c == "\r":
        c = await recv(socket, 1, flags) # Skip \L
        assert c == "\L"
        addNLIfEmpty()
        resString.complete()
        return
      elif c == "\L":
        addNLIfEmpty()
        resString.complete()
        return
      resString.mget.add c

      # Verify that this isn't a DOS attack: #3847.
      if resString.mget.len > maxLength: break
  resString.complete()

proc recvLine*(socket: AsyncSocket,
    flags = {SocketFlag.SafeDisconn},
    maxLength = MaxLineLength): owned(Future[string]) {.async.} =
  ## Reads a line of data from ``socket``. Returned future will complete once
  ## a full line is read or an error occurs.
  ##
  ## If a full line is read ``\r\L`` is not
  ## added to ``line``, however if solely ``\r\L`` is read then ``line``
  ## will be set to it.
  ##
  ## If the socket is disconnected, ``line`` will be set to ``""``.
  ##
  ## If the socket is disconnected in the middle of a line (before ``\r\L``
  ## is read) then line will be set to ``""``.
  ## The partial line **will be lost**.
  ##
  ## The ``maxLength`` parameter determines the maximum amount of characters
  ## that can be read. The result is truncated after that.
  ##
  ## **Warning**: The ``Peek`` flag is not yet implemented.
  ##
  ## **Warning**: ``recvLine`` on unbuffered sockets assumes that the protocol
  ## uses ``\r\L`` to delimit a new line.
  assert SocketFlag.Peek notin flags ## TODO:

  # TODO: Optimise this
  var resString = newFutureVar[string]("asyncnet.recvLine")
  resString.mget() = ""
  await socket.recvLineInto(resString, flags, maxLength)
  result = resString.mget()

proc listen*(socket: AsyncSocket, backlog = SOMAXCONN) {.tags: [
    ReadIOEffect].} =
  ## Marks ``socket`` as accepting connections.
  ## ``Backlog`` specifies the maximum length of the
  ## queue of pending connections.
  ##
  ## Raises an OSError error upon failure.
  if listen(socket.fd, backlog) < 0'i32: raiseOSError(osLastError())

proc bindAddr*(socket: AsyncSocket, port = Port(0), address = "") {.
  tags: [ReadIOEffect].} =
  ## Binds ``address``:``port`` to the socket.
  ##
  ## If ``address`` is "" then ADDR_ANY will be bound.
  var realaddr = address
  if realaddr == "":
    case socket.domain
    of AF_INET6: realaddr = "::"
    of AF_INET: realaddr = "0.0.0.0"
    else:
      raise newException(ValueError,
        "Unknown socket address family and no address specified to bindAddr")

  var aiList = getAddrInfo(realaddr, port, socket.domain)
  if bindAddr(socket.fd, aiList.ai_addr, aiList.ai_addrlen.SockLen) < 0'i32:
    freeaddrinfo(aiList)
    raiseOSError(osLastError())
  freeaddrinfo(aiList)

when defined(posix):

  proc connectUnix*(socket: AsyncSocket, path: string): owned(Future[void]) =
    ## Binds Unix socket to `path`.
    ## This only works on Unix-style systems: Mac OS X, BSD and Linux
    when not defined(nimdoc):
      let retFuture = newFuture[void]("connectUnix")
      result = retFuture

      proc cb(fd: AsyncFD): bool =
        let ret = SocketHandle(fd).getSockOptInt(cint(SOL_SOCKET), cint(SO_ERROR))
        if ret == 0:
          retFuture.complete()
          return true
        elif ret == EINTR:
          return false
        else:
          retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret))))
          return true

      var socketAddr = makeUnixAddr(path)
      let ret = socket.fd.connect(cast[ptr SockAddr](addr socketAddr),
                        (sizeof(socketAddr.sun_family) + path.len).SockLen)
      if ret == 0:
        # Request to connect completed immediately.
        retFuture.complete()
      else:
        let lastError = osLastError()
        if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS:
          addWrite(AsyncFD(socket.fd), cb)
        else:
          retFuture.fail(newException(OSError, osErrorMsg(lastError)))

  proc bindUnix*(socket: AsyncSocket, path: string) {.
    tags: [ReadIOEffect].} =
    ## Binds Unix socket to `path`.
    ## This only works on Unix-style systems: Mac OS X, BSD and Linux
    when not defined(nimdoc):
      var socketAddr = makeUnixAddr(path)
      if socket.fd.bindAddr(cast[ptr SockAddr](addr socketAddr),
          (sizeof(socketAddr.sun_family) + path.len).SockLen) != 0'i32:
        raiseOSError(osLastError())

elif defined(nimdoc):

  proc connectUnix*(socket: AsyncSocket, path: string): owned(Future[void]) =
    ## Binds Unix socket to `path`.
    ## This only works on Unix-style systems: Mac OS X, BSD and Linux
    discard

  proc bindUnix*(socket: AsyncSocket, path: string) =
    ## Binds Unix socket to `path`.
    ## This only works on Unix-style systems: Mac OS X, BSD and Linux
    discard

proc close*(socket: AsyncSocket) =
  ## Closes the socket.
  defer:
    socket.fd.AsyncFD.closeSocket()
  when defineSsl:
    if socket.isSsl:
      let res = SSL_shutdown(socket.sslHandle)
      SSL_free(socket.sslHandle)
      if res == 0:
        discard
      elif res != 1:
        raiseSSLError()
  socket.closed = true # TODO: Add extra debugging checks for this.

when defineSsl:
  proc wrapSocket*(ctx: SslContext, socket: AsyncSocket) =
    ## Wraps a socket in an SSL context. This function effectively turns
    ## ``socket`` into an SSL socket.
    ##
    ## **Disclaimer**: This code is not well tested, may be very unsafe and
    ## prone to security vulnerabilities.
    socket.isSsl = true
    socket.sslContext = ctx
    socket.sslHandle = SSL_new(socket.sslContext.context)
    if socket.sslHandle == nil:
      raiseSSLError()

    socket.bioIn = bioNew(bioSMem())
    socket.bioOut = bioNew(bioSMem())
    sslSetBio(socket.sslHandle, socket.bioIn, socket.bioOut)

  proc wrapConnectedSocket*(ctx: SslContext, socket: AsyncSocket,
                            handshake: SslHandshakeType,
                            hostname: string = "") =
    ## Wraps a connected socket in an SSL context. This function effectively
    ## turns ``socket`` into an SSL socket.
    ## ``hostname`` should be specified so that the client knows which hostname
    ## the server certificate should be validated against.
    ##
    ## This should be called on a connected socket, and will perform
    ## an SSL handshake immediately.
    ##
    ## **Disclaimer**: This code is not well tested, may be very unsafe and
    ## prone to security vulnerabilities.
    wrapSocket(ctx, socket)

    case handshake
    of handshakeAsClient:
      if hostname.len > 0 and not isIpAddress(hostname):
        # Set the SNI address for this connection. This call can fail if
        # we're not using TLSv1+.
        discard SSL_set_tlsext_host_name(socket.sslHandle, hostname)
      sslSetConnectState(socket.sslHandle)
    of handshakeAsServer:
      sslSetAcceptState(socket.sslHandle)

  proc getPeerCertificates*(socket: AsyncSocket): seq[Certificate] {.since: (1, 1).} =
    ## Returns the certificate chain received by the peer we are connected to
    ## through the given socket.
    ## The handshake must have been completed and the certificate chain must
    ## have been verified successfully or else an empty sequence is returned.
    ## The chain is ordered from leaf certificate to root certificate.
    if not socket.isSsl:
      result = newSeq[Certificate]()
    else:
      result = getPeerCertificates(socket.sslHandle)

proc getSockOpt*(socket: AsyncSocket, opt: SOBool, level = SOL_SOCKET): bool {.
  tags: [ReadIOEffect].} =
  ## Retrieves option ``opt`` as a boolean value.
  var res = getSockOptInt(socket.fd, cint(level), toCInt(opt))
  result = res != 0

proc setSockOpt*(socket: AsyncSocket, opt: SOBool, value: bool,
    level = SOL_SOCKET) {.tags: [WriteIOEffect].} =
  ## Sets option ``opt`` to a boolean value specified by ``value``.
  var valuei = cint(if value: 1 else: 0)
  setSockOptInt(socket.fd, cint(level), toCInt(opt), valuei)

proc isSsl*(socket: AsyncSocket): bool =
  ## Determines whether ``socket`` is a SSL socket.
  socket.isSsl

proc getFd*(socket: AsyncSocket): SocketHandle =
  ## Returns the socket's file descriptor.
  return socket.fd

proc isClosed*(socket: AsyncSocket): bool =
  ## Determines whether the socket has been closed.
  return socket.closed

proc sendTo*(socket: AsyncSocket, address: string, port: Port, data: string,
             flags = {SocketFlag.SafeDisconn}): owned(Future[void])
            {.async, since: (1, 3).} =
  ## This proc sends ``data`` to the specified ``address``, which may be an IP
  ## address or a hostname. If a hostname is specified this function will try
  ## each IP of that hostname. The returned future will complete once all data
  ## has been sent.
  ## 
  ## If an error occurs an OSError exception will be raised.
  ## 
  ## This proc is normally used with connectionless sockets (UDP sockets).
  assert(socket.protocol != IPPROTO_TCP,
         "Cannot `sendTo` on a TCP socket. Use `send` instead")
  assert(not socket.closed, "Cannot `sendTo` on a closed socket")

  let aiList = getAddrInfo(address, port, socket.domain, socket.sockType,
                           socket.protocol)

  var
    it = aiList
    success = false
    lastException: ref Exception
  
  while it != nil:
    let fut = sendTo(socket.fd.AsyncFD, cstring(data), len(data), it.ai_addr,
                     it.ai_addrlen.SockLen, flags)
    
    yield fut

    if not fut.failed:
      success = true

      break
    
    lastException = fut.readError()

    it = it.ai_next
  
  freeaddrinfo(aiList)

  if not success:
    if lastException != nil:
      raise lastException
    else:
      raise newException(IOError, "Couldn't resolve address: " & address)

proc recvFrom*(socket: AsyncSocket, size: int,
               flags = {SocketFlag.SafeDisconn}):
              owned(Future[tuple[data: string, address: string, port: Port]])
              {.async, since: (1, 3).} =
  ## Receives a datagram data from ``socket``, which must be at least of size
  ## ``size``. Returned future will complete once one datagram has been received
  ## and will return tuple with: data of packet received; and address and port
  ## of datagram's sender.
  ## 
  ## If an error occurs an OSError exception will be raised.
  ## 
  ## This proc is normally used with connectionless sockets (UDP sockets).
  template adaptRecvFromToDomain(domain: Domain) =
    var lAddr = sizeof(sAddr).SockLen
    
    let fut = await recvFromInto(AsyncFD(getFd(socket)), cstring(data), size,
                                 cast[ptr SockAddr](addr sAddr), addr lAddr,
                                 flags)
    
    data.setLen(fut)
    
    result.data = data
    result.address = getAddrString(cast[ptr SockAddr](addr sAddr))

    when domain == AF_INET6:
      result.port = ntohs(sAddr.sin6_port).Port
    else:
      result.port = ntohs(sAddr.sin_port).Port

  assert(socket.protocol != IPPROTO_TCP,
         "Cannot `recvFrom` on a TCP socket. Use `recv` or `recvInto` instead")
  assert(not socket.closed, "Cannot `recvFrom` on a closed socket")

  var data = newString(size)
  
  case socket.domain
  of AF_INET6:
    var sAddr: Sockaddr_in6
    adaptRecvFromToDomain(AF_INET6)
  of AF_INET:
    var sAddr: Sockaddr_in
    adaptRecvFromToDomain(AF_INET)
  else:
    raise newException(ValueError, "Unknown socket address family")

when not defined(testing) and isMainModule:
  type
    TestCases = enum
      HighClient, LowClient, LowServer

  const test = HighClient

  when test == HighClient:
    proc main() {.async.} =
      var sock = newAsyncSocket()
      await sock.connect("irc.freenode.net", Port(6667))
      while true:
        let line = await sock.recvLine()
        if line == "":
          echo("Disconnected")
          break
        else:
          echo("Got line: ", line)
    asyncCheck main()
  elif test == LowClient:
    var sock = newAsyncSocket()
    var f = connect(sock, "irc.freenode.net", Port(6667))
    f.callback =
      proc (future: Future[void]) =
        echo("Connected in future!")
        for i in 0 .. 50:
          var recvF = recv(sock, 10)
          recvF.callback =
            proc (future: Future[string]) =
              echo("Read ", future.read.len, ": ", future.read.repr)
  elif test == LowServer:
    var sock = newAsyncSocket()
    sock.bindAddr(Port(6667))
    sock.listen()
    proc onAccept(future: Future[AsyncSocket]) =
      let client = future.read
      echo "Accepted ", client.fd.cint
      var t = send(client, "test\c\L")
      t.callback =
        proc (future: Future[void]) =
          echo("Send")
          client.close()

      var f = accept(sock)
      f.callback = onAccept

    var f = accept(sock)
    f.callback = onAccept
  runForever()