summary refs log tree commit diff stats
path: root/lib/pure/asyncnet.nim
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/asyncnet.nim')
-rw-r--r--lib/pure/asyncnet.nim840
1 files changed, 667 insertions, 173 deletions
diff --git a/lib/pure/asyncnet.nim b/lib/pure/asyncnet.nim
index e7325e0d7..ee07e599e 100644
--- a/lib/pure/asyncnet.nim
+++ b/lib/pure/asyncnet.nim
@@ -1,47 +1,86 @@
 #
 #
 #            Nim's Runtime Library
-#        (c) Copyright 2015 Dominik Picheta
+#        (c) Copyright 2017 Dominik Picheta
 #
 #    See the file "copying.txt", included in this
 #    distribution, for details about the copyright.
 #
 
 ## This module implements a high-level asynchronous sockets API based on the
-## asynchronous dispatcher defined in the ``asyncdispatch`` module.
+## asynchronous dispatcher defined in the `asyncdispatch` module.
+##
+## Asynchronous IO in Nim
+## ======================
+##
+## Async IO in Nim consists of multiple layers (from highest to lowest):
+##
+## * `asyncnet` module
+##
+## * Async await
+##
+## * `asyncdispatch` module (event loop)
+##
+## * `selectors` module
+##
+## Each builds on top of the layers below it. The selectors module is an
+## abstraction for the various system `select()` mechanisms such as epoll or
+## kqueue. If you wish you can use it directly, and some people have done so
+## `successfully <http://goran.krampe.se/2014/10/25/nim-socketserver/>`_.
+## But you must be aware that on Windows it only supports
+## `select()`.
+##
+## The async dispatcher implements the proactor pattern and also has an
+## implementation of IOCP. It implements the proactor pattern for other
+## OS' via the selectors module. Futures are also implemented here, and
+## indeed all the procedures return a future.
+##
+## The final layer is the async await transformation. This allows you to
+## write asynchronous code in a synchronous style and works similar to
+## C#'s await. The transformation works by converting any async procedures
+## into an iterator.
+##
+## This is all single threaded, fully non-blocking and does give you a
+## lot of control. In theory you should be able to work with any of these
+## layers interchangeably (as long as you only care about non-Windows
+## platforms).
+##
+## For most applications using `asyncnet` is the way to go as it builds
+## over all the layers, providing some extra features such as buffering.
 ##
 ## SSL
-## ---
+## ===
 ##
-## SSL can be enabled by compiling with the ``-d:ssl`` flag.
+## SSL can be enabled by compiling with the `-d:ssl` flag.
 ##
-## You must create a new SSL context with the ``newContext`` function defined
-## in the ``net`` module. You may then call ``wrapSocket`` on your socket using
+## You must create a new SSL context with the `newContext` function defined
+## in the `net` module. You may then call `wrapSocket` on your socket using
 ## the newly created SSL context to get an SSL socket.
 ##
 ## Examples
-## --------
+## ========
 ##
 ## Chat server
-## ^^^^^^^^^^^
-## 
-## The following example demonstrates a simple chat server.
+## -----------
 ##
-## .. code-block::nim
+## The following example demonstrates a simple chat server.
 ##
-##   import asyncnet, asyncdispatch
+##   ```Nim
+##   import std/[asyncnet, asyncdispatch]
 ##
 ##   var clients {.threadvar.}: seq[AsyncSocket]
 ##
 ##   proc processClient(client: AsyncSocket) {.async.} =
 ##     while true:
 ##       let line = await client.recvLine()
+##       if line.len == 0: break
 ##       for c in clients:
 ##         await c.send(line & "\c\L")
 ##
 ##   proc serve() {.async.} =
 ##     clients = @[]
 ##     var server = newAsyncSocket()
+##     server.setSockOpt(OptReuseAddr, true)
 ##     server.bindAddr(Port(12345))
 ##     server.listen()
 ##
@@ -53,72 +92,125 @@
 ##
 ##   asyncCheck serve()
 ##   runForever()
-##
+##   ```
+
+import std/private/since
 
-import asyncdispatch
-import rawsockets
-import net
-import os
+when defined(nimPreviewSlimSystem):
+  import std/[assertions, syncio]
+
+import std/[asyncdispatch, nativesockets, net, os]
 
 export SOBool
 
-when defined(ssl):
-  import openssl
+# TODO: Remove duplication introduced by PR #4683.
+
+const defineSsl = defined(ssl) or defined(nimdoc)
+const useNimNetLite = defined(nimNetLite) or defined(freertos) or defined(zephyr) or
+    defined(nuttx)
+
+when defineSsl:
+  import std/openssl
 
 type
   # TODO: I would prefer to just do:
   # AsyncSocket* {.borrow: `.`.} = distinct Socket. But that doesn't work.
-  AsyncSocketDesc  = object
+  AsyncSocketDesc = object
     fd: SocketHandle
-    closed: bool ## determines whether this socket has been closed
-    case isBuffered: bool ## determines whether this socket is buffered.
-    of true:
-      buffer: array[0..BufferSize, char]
-      currPos: int # current index in buffer
-      bufLen: int # current length of buffer
-    of false: nil
-    case isSsl: bool
-    of true:
-      when defined(ssl):
-        sslHandle: SslPtr
-        sslContext: SslContext
-        bioIn: BIO
-        bioOut: BIO
-    of false: nil
+    closed: bool     ## determines whether this socket has been closed
+    isBuffered: bool ## determines whether this socket is buffered.
+    buffer: array[0..BufferSize, char]
+    currPos: int     # current index in buffer
+    bufLen: int      # current length of buffer
+    isSsl: bool
+    when defineSsl:
+      sslHandle: SslPtr
+      sslContext: SslContext
+      bioIn: BIO
+      bioOut: BIO
+      sslNoShutdown: bool
+    domain: Domain
+    sockType: SockType
+    protocol: Protocol
   AsyncSocket* = ref AsyncSocketDesc
 
-{.deprecated: [PAsyncSocket: AsyncSocket].}
-
-# TODO: Save AF, domain etc info and reuse it in procs which need it like connect.
-
-proc newAsyncSocket*(fd: TAsyncFD, isBuff: bool): AsyncSocket =
-  ## Creates a new ``AsyncSocket`` based on the supplied params.
-  assert fd != osInvalidSocket.TAsyncFD
+proc newAsyncSocket*(fd: AsyncFD, domain: Domain = AF_INET,
+                     sockType: SockType = SOCK_STREAM,
+                     protocol: Protocol = IPPROTO_TCP,
+                     buffered = true,
+                     inheritable = defined(nimInheritHandles)): owned(AsyncSocket) =
+  ## Creates a new `AsyncSocket` based on the supplied params.
+  ##
+  ## The supplied `fd`'s non-blocking state will be enabled implicitly.
+  ##
+  ## If `inheritable` is false (the default), the supplied `fd` will not
+  ## be inheritable by child processes.
+  ##
+  ## **Note**: This procedure will **NOT** register `fd` with the global
+  ## async dispatcher. You need to do this manually. If you have used
+  ## `newAsyncNativeSocket` to create `fd` then it's already registered.
+  assert fd != osInvalidSocket.AsyncFD
   new(result)
   result.fd = fd.SocketHandle
-  result.isBuffered = isBuff
-  if isBuff:
+  fd.SocketHandle.setBlocking(false)
+  if not fd.SocketHandle.setInheritable(inheritable):
+    raiseOSError(osLastError())
+  result.isBuffered = buffered
+  result.domain = domain
+  result.sockType = sockType
+  result.protocol = protocol
+  if buffered:
     result.currPos = 0
 
-proc newAsyncSocket*(domain: Domain = AF_INET, typ: SockType = SOCK_STREAM,
-    protocol: Protocol = IPPROTO_TCP, buffered = true): AsyncSocket =
+proc newAsyncSocket*(domain: Domain = AF_INET, sockType: SockType = SOCK_STREAM,
+                     protocol: Protocol = IPPROTO_TCP, buffered = true,
+                     inheritable = defined(nimInheritHandles)): owned(AsyncSocket) =
   ## Creates a new asynchronous socket.
   ##
   ## This procedure will also create a brand new file descriptor for
   ## this socket.
-  result = newAsyncSocket(newAsyncRawSocket(domain, typ, protocol), buffered)
+  ##
+  ## If `inheritable` is false (the default), the new file descriptor will not
+  ## be inheritable by child processes.
+  let fd = createAsyncNativeSocket(domain, sockType, protocol, inheritable)
+  if fd.SocketHandle == osInvalidSocket:
+    raiseOSError(osLastError())
+  result = newAsyncSocket(fd, domain, sockType, protocol, buffered, inheritable)
+
+proc getLocalAddr*(socket: AsyncSocket): (string, Port) =
+  ## Get the socket's local address and port number.
+  ##
+  ## This is high-level interface for `getsockname`:idx:.
+  getLocalAddr(socket.fd, socket.domain)
 
-proc newAsyncSocket*(domain, typ, protocol: cint, buffered = true): AsyncSocket =
+when not useNimNetLite:
+  proc getPeerAddr*(socket: AsyncSocket): (string, Port) =
+    ## Get the socket's peer address and port number.
+    ##
+    ## This is high-level interface for `getpeername`:idx:.
+    getPeerAddr(socket.fd, socket.domain)
+
+proc newAsyncSocket*(domain, sockType, protocol: cint,
+                     buffered = true,
+                     inheritable = defined(nimInheritHandles)): owned(AsyncSocket) =
   ## Creates a new asynchronous socket.
   ##
   ## This procedure will also create a brand new file descriptor for
   ## this socket.
-  result = newAsyncSocket(newAsyncRawSocket(domain, typ, protocol), buffered)
-
-when defined(ssl):
-  proc getSslError(handle: SslPtr, err: cint): cint =
+  ##
+  ## If `inheritable` is false (the default), the new file descriptor will not
+  ## be inheritable by child processes.
+  let fd = createAsyncNativeSocket(domain, sockType, protocol, inheritable)
+  if fd.SocketHandle == osInvalidSocket:
+    raiseOSError(osLastError())
+  result = newAsyncSocket(fd, Domain(domain), SockType(sockType),
+                          Protocol(protocol), buffered, inheritable)
+
+when defineSsl:
+  proc getSslError(socket: AsyncSocket, err: cint): cint =
+    assert socket.isSsl
     assert err < 0
-    var ret = SSLGetError(handle, err.cint)
+    var ret = SSL_get_error(socket.sslHandle, err.cint)
     case ret
     of SSL_ERROR_ZERO_RETURN:
       raiseSSLError("TLS/SSL connection failed to initiate, socket closed prematurely.")
@@ -129,6 +221,7 @@ when defined(ssl):
     of SSL_ERROR_WANT_X509_LOOKUP:
       raiseSSLError("Function for x509 lookup has been called.")
     of SSL_ERROR_SYSCALL, SSL_ERROR_SSL:
+      socket.sslNoShutdown = true
       raiseSSLError()
     else: raiseSSLError("Unknown Error")
 
@@ -136,79 +229,170 @@ when defined(ssl):
       flags: set[SocketFlag]) {.async.} =
     let len = bioCtrlPending(socket.bioOut)
     if len > 0:
-      var data = newStringOfCap(len)
-      let read = bioRead(socket.bioOut, addr data[0], len)
+      var data = newString(len)
+      let read = bioRead(socket.bioOut, cast[cstring](addr data[0]), len)
       assert read != 0
       if read < 0:
-        raiseSslError()
+        raiseSSLError()
       data.setLen(read)
-      await socket.fd.TAsyncFd.send(data, flags)
+      await socket.fd.AsyncFD.send(data, flags)
 
   proc appeaseSsl(socket: AsyncSocket, flags: set[SocketFlag],
-                  sslError: cint) {.async.} =
+                  sslError: cint): owned(Future[bool]) {.async.} =
+    ## Returns `true` if `socket` is still connected, otherwise `false`.
+    result = true
     case sslError
     of SSL_ERROR_WANT_WRITE:
       await sendPendingSslData(socket, flags)
     of SSL_ERROR_WANT_READ:
-      var data = await recv(socket.fd.TAsyncFD, BufferSize, flags)
-      let ret = bioWrite(socket.bioIn, addr data[0], data.len.cint)
-      if ret < 0:
-        raiseSSLError()
+      var data = await recv(socket.fd.AsyncFD, BufferSize, flags)
+      let length = len(data)
+      if length > 0:
+        let ret = bioWrite(socket.bioIn, cast[cstring](addr data[0]), length.cint)
+        if ret < 0:
+          raiseSSLError()
+      elif length == 0:
+        # connection not properly closed by remote side or connection dropped
+        SSL_set_shutdown(socket.sslHandle, SSL_RECEIVED_SHUTDOWN)
+        result = false
     else:
       raiseSSLError("Cannot appease SSL.")
 
   template sslLoop(socket: AsyncSocket, flags: set[SocketFlag],
-                   op: expr) =
+                   op: untyped) =
     var opResult {.inject.} = -1.cint
     while opResult < 0:
+      ErrClearError()
+      # Call the desired operation.
       opResult = op
-      # Bit hackish here.
-      # TODO: Introduce an async template transformation pragma?
-      yield sendPendingSslData(socket, flags)
-      if opResult < 0:
-        let err = getSslError(socket.sslHandle, opResult.cint)
-        yield appeaseSsl(socket, flags, err.cint)
+      let err =
+        if opResult < 0:
+          getSslError(socket, opResult.cint)
+        else:
+          SSL_ERROR_NONE
+      # Send any remaining pending SSL data.
+      await sendPendingSslData(socket, flags)
 
-proc connect*(socket: AsyncSocket, address: string, port: Port,
-    af = AF_INET) {.async.} =
-  ## Connects ``socket`` to server at ``address:port``.
+      # If the operation failed, try to see if SSL has some data to read
+      # or write.
+      if opResult < 0:
+        let fut = appeaseSsl(socket, flags, err.cint)
+        yield fut
+        if not fut.read():
+          # Socket disconnected.
+          if SocketFlag.SafeDisconn in flags:
+            opResult = 0.cint
+            break
+          else:
+            raiseSSLError("Socket has been disconnected")
+
+proc dial*(address: string, port: Port, protocol = IPPROTO_TCP,
+           buffered = true): owned(Future[AsyncSocket]) {.async.} =
+  ## Establishes connection to the specified `address`:`port` pair via the
+  ## specified protocol. The procedure iterates through possible
+  ## resolutions of the `address` until it succeeds, meaning that it
+  ## seamlessly works with both IPv4 and IPv6.
+  ## Returns AsyncSocket ready to send or receive data.
+  let asyncFd = await asyncdispatch.dial(address, port, protocol)
+  let sockType = protocol.toSockType()
+  let domain = getSockDomain(asyncFd.SocketHandle)
+  result = newAsyncSocket(asyncFd, domain, sockType, protocol, buffered)
+
+proc connect*(socket: AsyncSocket, address: string, port: Port) {.async.} =
+  ## Connects `socket` to server at `address:port`.
   ##
-  ## Returns a ``Future`` which will complete when the connection succeeds
+  ## Returns a `Future` which will complete when the connection succeeds
   ## or an error occurs.
-  await connect(socket.fd.TAsyncFD, address, port, af)
+  await connect(socket.fd.AsyncFD, address, port, socket.domain)
   if socket.isSsl:
-    when defined(ssl):
+    when defineSsl:
+      if not isIpAddress(address):
+        # Set the SNI address for this connection. This call can fail if
+        # we're not using TLSv1+.
+        discard SSL_set_tlsext_host_name(socket.sslHandle, address)
+
       let flags = {SocketFlag.SafeDisconn}
       sslSetConnectState(socket.sslHandle)
       sslLoop(socket, flags, sslDoHandshake(socket.sslHandle))
 
-proc readInto(buf: cstring, size: int, socket: AsyncSocket,
-              flags: set[SocketFlag]): Future[int] {.async.} =
+template readInto(buf: pointer, size: int, socket: AsyncSocket,
+                  flags: set[SocketFlag]): int =
+  ## Reads **up to** `size` bytes from `socket` into `buf`. Note that
+  ## this is a template and not a proc.
+  assert(not socket.closed, "Cannot `recv` on a closed socket")
+  var res = 0
   if socket.isSsl:
-    when defined(ssl):
+    when defineSsl:
       # SSL mode.
       sslLoop(socket, flags,
-        sslRead(socket.sslHandle, buf, size.cint))
-      result = opResult
+        sslRead(socket.sslHandle, cast[cstring](buf), size.cint))
+      res = opResult
   else:
-    var data = await recv(socket.fd.TAsyncFD, size, flags)
-    if data.len != 0:
-      copyMem(buf, addr data[0], data.len)
     # Not in SSL mode.
-    result = data.len
+    res = await asyncdispatch.recvInto(socket.fd.AsyncFD, buf, size, flags)
+  res
 
-proc readIntoBuf(socket: AsyncSocket,
-    flags: set[SocketFlag]): Future[int] {.async.} =
-  result = await readInto(addr socket.buffer[0], BufferSize, socket, flags)
+template readIntoBuf(socket: AsyncSocket,
+    flags: set[SocketFlag]): int =
+  var size = readInto(addr socket.buffer[0], BufferSize, socket, flags)
   socket.currPos = 0
-  socket.bufLen = result
+  socket.bufLen = size
+  size
+
+proc recvInto*(socket: AsyncSocket, buf: pointer, size: int,
+           flags = {SocketFlag.SafeDisconn}): owned(Future[int]) {.async.} =
+  ## Reads **up to** `size` bytes from `socket` into `buf`.
+  ##
+  ## For buffered sockets this function will attempt to read all the requested
+  ## data. It will read this data in `BufferSize` chunks.
+  ##
+  ## For unbuffered sockets this function makes no effort to read
+  ## all the data requested. It will return as much data as the operating system
+  ## gives it.
+  ##
+  ## If socket is disconnected during the
+  ## recv operation then the future may complete with only a part of the
+  ## requested data.
+  ##
+  ## If socket is disconnected and no data is available
+  ## to be read then the future will complete with a value of `0`.
+  if socket.isBuffered:
+    let originalBufPos = socket.currPos
+
+    if socket.bufLen == 0:
+      let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
+      if res == 0:
+        return 0
+
+    var read = 0
+    var cbuf = cast[cstring](buf)
+    while read < size:
+      if socket.currPos >= socket.bufLen:
+        if SocketFlag.Peek in flags:
+          # We don't want to get another buffer if we're peeking.
+          break
+        let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
+        if res == 0:
+          break
+
+      let chunk = min(socket.bufLen-socket.currPos, size-read)
+      copyMem(addr(cbuf[read]), addr(socket.buffer[socket.currPos]), chunk)
+      read.inc(chunk)
+      socket.currPos.inc(chunk)
+
+    if SocketFlag.Peek in flags:
+      # Restore old buffer cursor position.
+      socket.currPos = originalBufPos
+    result = read
+  else:
+    result = readInto(buf, size, socket, flags)
 
 proc recv*(socket: AsyncSocket, size: int,
-           flags = {SocketFlag.SafeDisconn}): Future[string] {.async.} =
-  ## Reads **up to** ``size`` bytes from ``socket``.
+           flags = {SocketFlag.SafeDisconn}): owned(Future[string]) {.async.} =
+  ## Reads **up to** `size` bytes from `socket`.
   ##
   ## For buffered sockets this function will attempt to read all the requested
-  ## data. It will read this data in ``BufferSize`` chunks.
+  ## data. It will read this data in `BufferSize` chunks.
   ##
   ## For unbuffered sockets this function makes no effort to read
   ## all the data requested. It will return as much data as the operating system
@@ -219,13 +403,15 @@ proc recv*(socket: AsyncSocket, size: int,
   ## requested data.
   ##
   ## If socket is disconnected and no data is available
-  ## to be read then the future will complete with a value of ``""``.
+  ## to be read then the future will complete with a value of `""`.
   if socket.isBuffered:
     result = newString(size)
+    when not defined(nimSeqsV2):
+      shallow(result)
     let originalBufPos = socket.currPos
 
     if socket.bufLen == 0:
-      let res = await socket.readIntoBuf(flags - {SocketFlag.Peek})
+      let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
       if res == 0:
         result.setLen(0)
         return
@@ -236,7 +422,7 @@ proc recv*(socket: AsyncSocket, size: int,
         if SocketFlag.Peek in flags:
           # We don't want to get another buffer if we're peeking.
           break
-        let res = await socket.readIntoBuf(flags - {SocketFlag.Peek})
+        let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
         if res == 0:
           break
 
@@ -251,45 +437,67 @@ proc recv*(socket: AsyncSocket, size: int,
     result.setLen(read)
   else:
     result = newString(size)
-    let read = await readInto(addr result[0], size, socket, flags)
+    let read = readInto(addr result[0], size, socket, flags)
     result.setLen(read)
 
+proc send*(socket: AsyncSocket, buf: pointer, size: int,
+            flags = {SocketFlag.SafeDisconn}) {.async.} =
+  ## Sends `size` bytes from `buf` to `socket`. The returned future will complete once all
+  ## data has been sent.
+  assert socket != nil
+  assert(not socket.closed, "Cannot `send` on a closed socket")
+  if socket.isSsl:
+    when defineSsl:
+      sslLoop(socket, flags,
+              sslWrite(socket.sslHandle, cast[cstring](buf), size.cint))
+      await sendPendingSslData(socket, flags)
+  else:
+    await send(socket.fd.AsyncFD, buf, size, flags)
+
 proc send*(socket: AsyncSocket, data: string,
            flags = {SocketFlag.SafeDisconn}) {.async.} =
-  ## Sends ``data`` to ``socket``. The returned future will complete once all
+  ## Sends `data` to `socket`. The returned future will complete once all
   ## data has been sent.
   assert socket != nil
   if socket.isSsl:
-    when defined(ssl):
+    when defineSsl:
       var copy = data
       sslLoop(socket, flags,
-        sslWrite(socket.sslHandle, addr copy[0], copy.len.cint))
+        sslWrite(socket.sslHandle, cast[cstring](addr copy[0]), copy.len.cint))
       await sendPendingSslData(socket, flags)
   else:
-    await send(socket.fd.TAsyncFD, data, flags)
+    await send(socket.fd.AsyncFD, data, flags)
 
-proc acceptAddr*(socket: AsyncSocket, flags = {SocketFlag.SafeDisconn}):
-      Future[tuple[address: string, client: AsyncSocket]] =
+proc acceptAddr*(socket: AsyncSocket, flags = {SocketFlag.SafeDisconn},
+                 inheritable = defined(nimInheritHandles)):
+      owned(Future[tuple[address: string, client: AsyncSocket]]) =
   ## Accepts a new connection. Returns a future containing the client socket
   ## corresponding to that connection and the remote address of the client.
+  ##
+  ## If `inheritable` is false (the default), the resulting client socket will
+  ## not be inheritable by child processes.
+  ##
   ## The future will complete when the connection is successfully accepted.
   var retFuture = newFuture[tuple[address: string, client: AsyncSocket]]("asyncnet.acceptAddr")
-  var fut = acceptAddr(socket.fd.TAsyncFD, flags)
+  var fut = acceptAddr(socket.fd.AsyncFD, flags, inheritable)
   fut.callback =
-    proc (future: Future[tuple[address: string, client: TAsyncFD]]) =
+    proc (future: Future[tuple[address: string, client: AsyncFD]]) =
       assert future.finished
       if future.failed:
         retFuture.fail(future.readError)
       else:
         let resultTup = (future.read.address,
-                         newAsyncSocket(future.read.client, socket.isBuffered))
+                         newAsyncSocket(future.read.client, socket.domain,
+                         socket.sockType, socket.protocol, socket.isBuffered, inheritable))
         retFuture.complete(resultTup)
   return retFuture
 
 proc accept*(socket: AsyncSocket,
-    flags = {SocketFlag.SafeDisconn}): Future[AsyncSocket] =
+    flags = {SocketFlag.SafeDisconn}): owned(Future[AsyncSocket]) =
   ## Accepts a new connection. Returns a future containing the client socket
   ## corresponding to that connection.
+  ## If `inheritable` is false (the default), the resulting client socket will
+  ## not be inheritable by child processes.
   ## The future will complete when the connection is successfully accepted.
   var retFut = newFuture[AsyncSocket]("asyncnet.accept")
   var fut = acceptAddr(socket, flags)
@@ -302,43 +510,52 @@ proc accept*(socket: AsyncSocket,
         retFut.complete(future.read.client)
   return retFut
 
-proc recvLine*(socket: AsyncSocket,
-    flags = {SocketFlag.SafeDisconn}): Future[string] {.async.} =
-  ## Reads a line of data from ``socket``. Returned future will complete once
-  ## a full line is read or an error occurs.
+proc recvLineInto*(socket: AsyncSocket, resString: FutureVar[string],
+    flags = {SocketFlag.SafeDisconn}, maxLength = MaxLineLength) {.async.} =
+  ## Reads a line of data from `socket` into `resString`.
   ##
-  ## If a full line is read ``\r\L`` is not
-  ## added to ``line``, however if solely ``\r\L`` is read then ``line``
+  ## If a full line is read `\r\L` is not
+  ## added to `line`, however if solely `\r\L` is read then `line`
   ## will be set to it.
-  ## 
-  ## If the socket is disconnected, ``line`` will be set to ``""``.
   ##
-  ## If the socket is disconnected in the middle of a line (before ``\r\L``
-  ## is read) then line will be set to ``""``.
+  ## If the socket is disconnected, `line` will be set to `""`.
+  ##
+  ## If the socket is disconnected in the middle of a line (before `\r\L`
+  ## is read) then line will be set to `""`.
   ## The partial line **will be lost**.
   ##
-  ## **Warning**: The ``Peek`` flag is not yet implemented.
-  ## 
-  ## **Warning**: ``recvLine`` on unbuffered sockets assumes that the protocol
-  ## uses ``\r\L`` to delimit a new line.
-  template addNLIfEmpty(): stmt =
-    if result.len == 0:
-      result.add("\c\L")
+  ## The `maxLength` parameter determines the maximum amount of characters
+  ## that can be read. `resString` will be truncated after that.
+  ##
+  ## .. warning:: The `Peek` flag is not yet implemented.
+  ##
+  ## .. warning:: `recvLineInto` on unbuffered sockets assumes that the protocol uses `\r\L` to delimit a new line.
   assert SocketFlag.Peek notin flags ## TODO:
+  result = newFuture[void]("asyncnet.recvLineInto")
+
+  # TODO: Make the async transformation check for FutureVar params and complete
+  # them when the result future is completed.
+  # Can we replace the result future with the FutureVar?
+
+  template addNLIfEmpty(): untyped =
+    if resString.mget.len == 0:
+      resString.mget.add("\c\L")
+
   if socket.isBuffered:
-    result = ""
     if socket.bufLen == 0:
-      let res = await socket.readIntoBuf(flags)
+      let res = socket.readIntoBuf(flags)
       if res == 0:
+        resString.complete()
         return
 
     var lastR = false
     while true:
       if socket.currPos >= socket.bufLen:
-        let res = await socket.readIntoBuf(flags)
+        let res = socket.readIntoBuf(flags)
         if res == 0:
-          result = ""
-          break
+          resString.mget.setLen(0)
+          resString.complete()
+          return
 
       case socket.buffer[socket.currPos]
       of '\r':
@@ -347,107 +564,261 @@ proc recvLine*(socket: AsyncSocket,
       of '\L':
         addNLIfEmpty()
         socket.currPos.inc()
+        resString.complete()
         return
       else:
         if lastR:
           socket.currPos.inc()
+          resString.complete()
           return
         else:
-          result.add socket.buffer[socket.currPos]
+          resString.mget.add socket.buffer[socket.currPos]
       socket.currPos.inc()
+
+      # Verify that this isn't a DOS attack: #3847.
+      if resString.mget.len > maxLength: break
   else:
-    result = ""
     var c = ""
     while true:
       c = await recv(socket, 1, flags)
       if c.len == 0:
-        return ""
+        resString.mget.setLen(0)
+        resString.complete()
+        return
       if c == "\r":
         c = await recv(socket, 1, flags) # Skip \L
         assert c == "\L"
         addNLIfEmpty()
+        resString.complete()
         return
       elif c == "\L":
         addNLIfEmpty()
+        resString.complete()
         return
-      add(result.string, c)
+      resString.mget.add c
 
-proc listen*(socket: AsyncSocket, backlog = SOMAXCONN) {.tags: [ReadIOEffect].} =
-  ## Marks ``socket`` as accepting connections.
-  ## ``Backlog`` specifies the maximum length of the
+      # Verify that this isn't a DOS attack: #3847.
+      if resString.mget.len > maxLength: break
+  resString.complete()
+
+proc recvLine*(socket: AsyncSocket,
+    flags = {SocketFlag.SafeDisconn},
+    maxLength = MaxLineLength): owned(Future[string]) {.async.} =
+  ## Reads a line of data from `socket`. Returned future will complete once
+  ## a full line is read or an error occurs.
+  ##
+  ## If a full line is read `\r\L` is not
+  ## added to `line`, however if solely `\r\L` is read then `line`
+  ## will be set to it.
+  ##
+  ## If the socket is disconnected, `line` will be set to `""`.
+  ##
+  ## If the socket is disconnected in the middle of a line (before `\r\L`
+  ## is read) then line will be set to `""`.
+  ## The partial line **will be lost**.
+  ##
+  ## The `maxLength` parameter determines the maximum amount of characters
+  ## that can be read. The result is truncated after that.
+  ##
+  ## .. warning:: The `Peek` flag is not yet implemented.
+  ##
+  ## .. warning:: `recvLine` on unbuffered sockets assumes that the protocol uses `\r\L` to delimit a new line.
+  assert SocketFlag.Peek notin flags ## TODO:
+
+  # TODO: Optimise this
+  var resString = newFutureVar[string]("asyncnet.recvLine")
+  resString.mget() = ""
+  await socket.recvLineInto(resString, flags, maxLength)
+  result = resString.mget()
+
+proc listen*(socket: AsyncSocket, backlog = SOMAXCONN) {.tags: [
+    ReadIOEffect].} =
+  ## Marks `socket` as accepting connections.
+  ## `Backlog` specifies the maximum length of the
   ## queue of pending connections.
   ##
-  ## Raises an EOS error upon failure.
+  ## Raises an OSError error upon failure.
   if listen(socket.fd, backlog) < 0'i32: raiseOSError(osLastError())
 
 proc bindAddr*(socket: AsyncSocket, port = Port(0), address = "") {.
   tags: [ReadIOEffect].} =
-  ## Binds ``address``:``port`` to the socket.
+  ## Binds `address`:`port` to the socket.
   ##
-  ## If ``address`` is "" then ADDR_ANY will be bound.
-
-  if address == "":
-    var name: Sockaddr_in
-    when defined(Windows) or defined(nimdoc):
-      name.sin_family = toInt(AF_INET).int16
+  ## If `address` is "" then ADDR_ANY will be bound.
+  var realaddr = address
+  if realaddr == "":
+    case socket.domain
+    of AF_INET6: realaddr = "::"
+    of AF_INET: realaddr = "0.0.0.0"
     else:
-      name.sin_family = toInt(AF_INET)
-    name.sin_port = htons(int16(port))
-    name.sin_addr.s_addr = htonl(INADDR_ANY)
-    if bindAddr(socket.fd, cast[ptr SockAddr](addr(name)),
-                  sizeof(name).Socklen) < 0'i32:
-      raiseOSError(osLastError())
-  else:
-    var aiList = getAddrInfo(address, port, AF_INET)
-    if bindAddr(socket.fd, aiList.ai_addr, aiList.ai_addrlen.Socklen) < 0'i32:
-      dealloc(aiList)
-      raiseOSError(osLastError())
-    dealloc(aiList)
+      raise newException(ValueError,
+        "Unknown socket address family and no address specified to bindAddr")
+
+  var aiList = getAddrInfo(realaddr, port, socket.domain)
+  if bindAddr(socket.fd, aiList.ai_addr, aiList.ai_addrlen.SockLen) < 0'i32:
+    freeAddrInfo(aiList)
+    raiseOSError(osLastError())
+  freeAddrInfo(aiList)
+
+proc hasDataBuffered*(s: AsyncSocket): bool {.since: (1, 5).} =
+  ## Determines whether an AsyncSocket has data buffered.
+  # xxx dedup with std/net
+  s.isBuffered and s.bufLen > 0 and s.currPos != s.bufLen
+
+when defined(posix) and not useNimNetLite:
+
+  proc connectUnix*(socket: AsyncSocket, path: string): owned(Future[void]) =
+    ## Binds Unix socket to `path`.
+    ## This only works on Unix-style systems: Mac OS X, BSD and Linux
+    when not defined(nimdoc):
+      let retFuture = newFuture[void]("connectUnix")
+      result = retFuture
+
+      proc cb(fd: AsyncFD): bool =
+        let ret = SocketHandle(fd).getSockOptInt(cint(SOL_SOCKET), cint(SO_ERROR))
+        if ret == 0:
+          retFuture.complete()
+          return true
+        elif ret == EINTR:
+          return false
+        else:
+          retFuture.fail(newOSError(OSErrorCode(ret)))
+          return true
+
+      var socketAddr = makeUnixAddr(path)
+      let ret = socket.fd.connect(cast[ptr SockAddr](addr socketAddr),
+                        (offsetOf(socketAddr, sun_path) + path.len + 1).SockLen)
+      if ret == 0:
+        # Request to connect completed immediately.
+        retFuture.complete()
+      else:
+        let lastError = osLastError()
+        if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS:
+          addWrite(AsyncFD(socket.fd), cb)
+        else:
+          retFuture.fail(newOSError(lastError))
+
+  proc bindUnix*(socket: AsyncSocket, path: string) {.
+    tags: [ReadIOEffect].} =
+    ## Binds Unix socket to `path`.
+    ## This only works on Unix-style systems: Mac OS X, BSD and Linux
+    when not defined(nimdoc):
+      var socketAddr = makeUnixAddr(path)
+      if socket.fd.bindAddr(cast[ptr SockAddr](addr socketAddr),
+          (offsetOf(socketAddr, sun_path) + path.len + 1).SockLen) != 0'i32:
+        raiseOSError(osLastError())
+
+elif defined(nimdoc):
+
+  proc connectUnix*(socket: AsyncSocket, path: string): owned(Future[void]) =
+    ## Binds Unix socket to `path`.
+    ## This only works on Unix-style systems: Mac OS X, BSD and Linux
+    discard
+
+  proc bindUnix*(socket: AsyncSocket, path: string) =
+    ## Binds Unix socket to `path`.
+    ## This only works on Unix-style systems: Mac OS X, BSD and Linux
+    discard
 
 proc close*(socket: AsyncSocket) =
   ## Closes the socket.
+  if socket.closed: return
+
   defer:
-    socket.fd.TAsyncFD.closeSocket()
-  when defined(ssl):
-    if socket.isSSL:
-      let res = SslShutdown(socket.sslHandle)
+    socket.fd.AsyncFD.closeSocket()
+    socket.closed = true # TODO: Add extra debugging checks for this.
+
+  when defineSsl:
+    if socket.isSsl:
+      let res =
+        # Don't call SSL_shutdown if the connection has not been fully
+        # established, see:
+        # https://github.com/openssl/openssl/issues/710#issuecomment-253897666
+        if not socket.sslNoShutdown and SSL_in_init(socket.sslHandle) == 0:
+          ErrClearError()
+          SSL_shutdown(socket.sslHandle)
+        else:
+          0
+      SSL_free(socket.sslHandle)
       if res == 0:
         discard
       elif res != 1:
-        raiseSslError()
-  socket.closed = true # TODO: Add extra debugging checks for this.
+        raiseSSLError()
 
-when defined(ssl):
+when defineSsl:
+  proc sslHandle*(self: AsyncSocket): SslPtr =
+    ## Retrieve the ssl pointer of `socket`.
+    ## Useful for interfacing with `openssl`.
+    self.sslHandle
+  
   proc wrapSocket*(ctx: SslContext, socket: AsyncSocket) =
     ## Wraps a socket in an SSL context. This function effectively turns
-    ## ``socket`` into an SSL socket.
+    ## `socket` into an SSL socket.
     ##
     ## **Disclaimer**: This code is not well tested, may be very unsafe and
     ## prone to security vulnerabilities.
     socket.isSsl = true
     socket.sslContext = ctx
-    socket.sslHandle = SSLNew(SSLCTX(socket.sslContext))
+    socket.sslHandle = SSL_new(socket.sslContext.context)
     if socket.sslHandle == nil:
-      raiseSslError()
+      raiseSSLError()
 
-    socket.bioIn = bioNew(bio_s_mem())
-    socket.bioOut = bioNew(bio_s_mem())
+    socket.bioIn = bioNew(bioSMem())
+    socket.bioOut = bioNew(bioSMem())
     sslSetBio(socket.sslHandle, socket.bioIn, socket.bioOut)
 
+    socket.sslNoShutdown = true
+
+  proc wrapConnectedSocket*(ctx: SslContext, socket: AsyncSocket,
+                            handshake: SslHandshakeType,
+                            hostname: string = "") =
+    ## Wraps a connected socket in an SSL context. This function effectively
+    ## turns `socket` into an SSL socket.
+    ## `hostname` should be specified so that the client knows which hostname
+    ## the server certificate should be validated against.
+    ##
+    ## This should be called on a connected socket, and will perform
+    ## an SSL handshake immediately.
+    ##
+    ## **Disclaimer**: This code is not well tested, may be very unsafe and
+    ## prone to security vulnerabilities.
+    wrapSocket(ctx, socket)
+
+    case handshake
+    of handshakeAsClient:
+      if hostname.len > 0 and not isIpAddress(hostname):
+        # Set the SNI address for this connection. This call can fail if
+        # we're not using TLSv1+.
+        discard SSL_set_tlsext_host_name(socket.sslHandle, hostname)
+      sslSetConnectState(socket.sslHandle)
+    of handshakeAsServer:
+      sslSetAcceptState(socket.sslHandle)
+
+  proc getPeerCertificates*(socket: AsyncSocket): seq[Certificate] {.since: (1, 1).} =
+    ## Returns the certificate chain received by the peer we are connected to
+    ## through the given socket.
+    ## The handshake must have been completed and the certificate chain must
+    ## have been verified successfully or else an empty sequence is returned.
+    ## The chain is ordered from leaf certificate to root certificate.
+    if not socket.isSsl:
+      result = newSeq[Certificate]()
+    else:
+      result = getPeerCertificates(socket.sslHandle)
+
 proc getSockOpt*(socket: AsyncSocket, opt: SOBool, level = SOL_SOCKET): bool {.
   tags: [ReadIOEffect].} =
-  ## Retrieves option ``opt`` as a boolean value.
+  ## Retrieves option `opt` as a boolean value.
   var res = getSockOptInt(socket.fd, cint(level), toCInt(opt))
   result = res != 0
 
 proc setSockOpt*(socket: AsyncSocket, opt: SOBool, value: bool,
     level = SOL_SOCKET) {.tags: [WriteIOEffect].} =
-  ## Sets option ``opt`` to a boolean value specified by ``value``.
+  ## Sets option `opt` to a boolean value specified by `value`.
   var valuei = cint(if value: 1 else: 0)
   setSockOptInt(socket.fd, cint(level), toCInt(opt), valuei)
 
 proc isSsl*(socket: AsyncSocket): bool =
-  ## Determines whether ``socket`` is a SSL socket.
+  ## Determines whether `socket` is a SSL socket.
   socket.isSsl
 
 proc getFd*(socket: AsyncSocket): SocketHandle =
@@ -458,7 +829,131 @@ proc isClosed*(socket: AsyncSocket): bool =
   ## Determines whether the socket has been closed.
   return socket.closed
 
-when isMainModule:
+proc sendTo*(socket: AsyncSocket, address: string, port: Port, data: string,
+             flags = {SocketFlag.SafeDisconn}): owned(Future[void])
+            {.async, since: (1, 3).} =
+  ## This proc sends `data` to the specified `address`, which may be an IP
+  ## address or a hostname. If a hostname is specified this function will try
+  ## each IP of that hostname. The returned future will complete once all data
+  ## has been sent.
+  ##
+  ## If an error occurs an OSError exception will be raised.
+  ##
+  ## This proc is normally used with connectionless sockets (UDP sockets).
+  assert(socket.protocol != IPPROTO_TCP,
+         "Cannot `sendTo` on a TCP socket. Use `send` instead")
+  assert(not socket.closed, "Cannot `sendTo` on a closed socket")
+
+  let aiList = getAddrInfo(address, port, socket.domain, socket.sockType,
+                           socket.protocol)
+
+  var
+    it = aiList
+    success = false
+    lastException: ref Exception
+
+  while it != nil:
+    let fut = sendTo(socket.fd.AsyncFD, cstring(data), len(data), it.ai_addr,
+                     it.ai_addrlen.SockLen, flags)
+
+    yield fut
+
+    if not fut.failed:
+      success = true
+
+      break
+
+    lastException = fut.readError()
+
+    it = it.ai_next
+
+  freeAddrInfo(aiList)
+
+  if not success:
+    if lastException != nil:
+      raise lastException
+    else:
+      raise newException(IOError, "Couldn't resolve address: " & address)
+
+proc recvFrom*(socket: AsyncSocket, data: FutureVar[string], size: int,
+               address: FutureVar[string], port: FutureVar[Port],
+               flags = {SocketFlag.SafeDisconn}): owned(Future[int])
+              {.async, since: (1, 3).} =
+  ## Receives a datagram data from `socket` into `data`, which must be at
+  ## least of size `size`. The address and port of datagram's sender will be
+  ## stored into `address` and `port`, respectively. Returned future will
+  ## complete once one datagram has been received, and will return size of
+  ## packet received.
+  ##
+  ## If an error occurs an OSError exception will be raised.
+  ##
+  ## This proc is normally used with connectionless sockets (UDP sockets).
+  ##
+  ## **Notes**
+  ## * `data` must be initialized to the length of `size`.
+  ## * `address` must be initialized to 46 in length.
+  template adaptRecvFromToDomain(domain: Domain) =
+    var lAddr = sizeof(sAddr).SockLen
+
+    result = await recvFromInto(AsyncFD(getFd(socket)), cstring(data.mget()), size,
+                                cast[ptr SockAddr](addr sAddr), addr lAddr,
+                                flags)
+
+    data.mget().setLen(result)
+    data.complete()
+
+    getAddrString(cast[ptr SockAddr](addr sAddr), address.mget())
+
+    address.complete()
+
+    when domain == AF_INET6:
+      port.complete(ntohs(sAddr.sin6_port).Port)
+    else:
+      port.complete(ntohs(sAddr.sin_port).Port)
+
+  assert(socket.protocol != IPPROTO_TCP,
+         "Cannot `recvFrom` on a TCP socket. Use `recv` or `recvInto` instead")
+  assert(not socket.closed, "Cannot `recvFrom` on a closed socket")
+  assert(size == len(data.mget()),
+         "`date` was not initialized correctly. `size` != `len(data.mget())`")
+  assert(46 == len(address.mget()),
+         "`address` was not initialized correctly. 46 != `len(address.mget())`")
+
+  case socket.domain
+  of AF_INET6:
+    var sAddr: Sockaddr_in6
+    adaptRecvFromToDomain(AF_INET6)
+  of AF_INET:
+    var sAddr: Sockaddr_in
+    adaptRecvFromToDomain(AF_INET)
+  else:
+    raise newException(ValueError, "Unknown socket address family")
+
+proc recvFrom*(socket: AsyncSocket, size: int,
+               flags = {SocketFlag.SafeDisconn}):
+              owned(Future[tuple[data: string, address: string, port: Port]])
+              {.async, since: (1, 3).} =
+  ## Receives a datagram data from `socket`, which must be at least of size
+  ## `size`. Returned future will complete once one datagram has been received
+  ## and will return tuple with: data of packet received; and address and port
+  ## of datagram's sender.
+  ##
+  ## If an error occurs an OSError exception will be raised.
+  ##
+  ## This proc is normally used with connectionless sockets (UDP sockets).
+  var
+    data = newFutureVar[string]()
+    address = newFutureVar[string]()
+    port = newFutureVar[Port]()
+
+  data.mget().setLen(size)
+  address.mget().setLen(46)
+
+  let read = await recvFrom(socket, data, size, address, port, flags)
+
+  result = (data.mget(), address.mget(), port.mget())
+
+when not defined(testing) and isMainModule:
   type
     TestCases = enum
       HighClient, LowClient, LowServer
@@ -500,11 +995,10 @@ when isMainModule:
         proc (future: Future[void]) =
           echo("Send")
           client.close()
-      
+
       var f = accept(sock)
       f.callback = onAccept
-      
+
     var f = accept(sock)
     f.callback = onAccept
   runForever()
-