summary refs log tree commit diff stats
path: root/lib/pure/asyncdispatch.nim
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r--lib/pure/asyncdispatch.nim465
1 files changed, 328 insertions, 137 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim
index 1b9887098..cc337452f 100644
--- a/lib/pure/asyncdispatch.nim
+++ b/lib/pure/asyncdispatch.nim
@@ -11,7 +11,7 @@ include "system/inclrtl"
 
 import os, oids, tables, strutils, macros, times
 
-import rawsockets, net
+import nativesockets, net
 
 export Port, SocketFlag
 
@@ -122,10 +122,11 @@ export Port, SocketFlag
 ## Limitations/Bugs
 ## ----------------
 ##
-## * ``except`` statement (without `try`) does not work inside async procedures.
 ## * The effect system (``raises: []``) does not work with async procedures.
 ## * Can't await in a ``except`` body
-
+## * Forward declarations for async procs are broken,
+##   link includes workaround: https://github.com/nim-lang/Nim/issues/3182.
+## * FutureVar[T] needs to be completed manually.
 
 # TODO: Check if yielded future is nil and throw a more meaningful exception
 
@@ -145,10 +146,15 @@ type
   Future*[T] = ref object of FutureBase ## Typed future.
     value: T ## Stored value
 
-{.deprecated: [PFutureBase: FutureBase, PFuture: Future].}
+  FutureVar*[T] = distinct Future[T]
 
+  FutureError* = object of Exception
+    cause*: FutureBase
+
+{.deprecated: [PFutureBase: FutureBase, PFuture: Future].}
 
-var currentID = 0
+when not defined(release):
+  var currentID = 0
 proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
   ## Creates a new future.
   ##
@@ -162,18 +168,39 @@ proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
     result.fromProc = fromProc
     currentID.inc()
 
+proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] =
+  ## Create a new ``FutureVar``. This Future type is ideally suited for
+  ## situations where you want to avoid unnecessary allocations of Futures.
+  ##
+  ## Specifying ``fromProc``, which is a string specifying the name of the proc
+  ## that this future belongs to, is a good habit as it helps with debugging.
+  result = FutureVar[T](newFuture[T](fromProc))
+
+proc clean*[T](future: FutureVar[T]) =
+  ## Resets the ``finished`` status of ``future``.
+  Future[T](future).finished = false
+  Future[T](future).error = nil
+
 proc checkFinished[T](future: Future[T]) =
+  ## Checks whether `future` is finished. If it is then raises a
+  ## ``FutureError``.
   when not defined(release):
     if future.finished:
-      echo("<-----> ", future.id, " ", future.fromProc)
-      echo(future.stackTrace)
-      echo("-----")
+      var msg = ""
+      msg.add("An attempt was made to complete a Future more than once. ")
+      msg.add("Details:")
+      msg.add("\n  Future ID: " & $future.id)
+      msg.add("\n  Created in proc: " & future.fromProc)
+      msg.add("\n  Stack trace to moment of creation:")
+      msg.add("\n" & indent(future.stackTrace.strip(), 4))
       when T is string:
-        echo("Contents: ", future.value.repr)
-      echo("<----->")
-      echo("Future already finished, cannot finish twice.")
-      echo getStackTrace()
-      assert false
+        msg.add("\n  Contents (string): ")
+        msg.add("\n" & indent(future.value.repr, 4))
+      msg.add("\n  Stack trace to moment of secondary completion:")
+      msg.add("\n" & indent(getStackTrace().strip(), 4))
+      var err = newException(FutureError, msg)
+      err.cause = future
+      raise err
 
 proc complete*[T](future: Future[T], val: T) =
   ## Completes ``future`` with value ``val``.
@@ -194,6 +221,15 @@ proc complete*(future: Future[void]) =
   if future.cb != nil:
     future.cb()
 
+proc complete*[T](future: FutureVar[T]) =
+  ## Completes a ``FutureVar``.
+  template fut: expr = Future[T](future)
+  checkFinished(fut)
+  assert(fut.error == nil)
+  fut.finished = true
+  if fut.cb != nil:
+    fut.cb()
+
 proc fail*[T](future: Future[T], error: ref Exception) =
   ## Completes ``future`` with ``error``.
   #assert(not future.finished, "Future already finished, cannot finish twice.")
@@ -230,15 +266,17 @@ proc `callback=`*[T](future: Future[T],
   ## If future has already completed then ``cb`` will be called immediately.
   future.callback = proc () = cb(future)
 
-proc echoOriginalStackTrace[T](future: Future[T]) =
+proc injectStacktrace[T](future: Future[T]) =
   # TODO: Come up with something better.
   when not defined(release):
-    echo("Original stack trace in ", future.fromProc, ":")
+    var msg = ""
+    msg.add("\n  " & future.fromProc & "'s lead up to read of failed Future:")
+
     if not future.errorStackTrace.isNil and future.errorStackTrace != "":
-      echo(future.errorStackTrace)
+      msg.add("\n" & indent(future.errorStackTrace.strip(), 4))
     else:
-      echo("Empty or nil stack trace.")
-    echo("Continuing...")
+      msg.add("\n    Empty or nil stack trace.")
+    future.error.msg.add(msg)
 
 proc read*[T](future: Future[T]): T =
   ## Retrieves the value of ``future``. Future must be finished otherwise
@@ -247,7 +285,7 @@ proc read*[T](future: Future[T]): T =
   ## If the result of the future is an error then that error will be raised.
   if future.finished:
     if future.error != nil:
-      echoOriginalStackTrace(future)
+      injectStacktrace(future)
       raise future.error
     when T isnot void:
       return future.value
@@ -264,6 +302,13 @@ proc readError*[T](future: Future[T]): ref Exception =
   else:
     raise newException(ValueError, "No error in future.")
 
+proc mget*[T](future: FutureVar[T]): var T =
+  ## Returns a mutable value stored in ``future``.
+  ##
+  ## Unlike ``read``, this function will not raise an exception if the
+  ## Future has not been finished.
+  result = Future[T](future).value
+
 proc finished*[T](future: Future[T]): bool =
   ## Determines whether ``future`` has completed.
   ##
@@ -282,7 +327,7 @@ proc asyncCheck*[T](future: Future[T]) =
   future.callback =
     proc () =
       if future.failed:
-        echoOriginalStackTrace(future)
+        injectStacktrace(future)
         raise future.error
 
 proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
@@ -323,32 +368,34 @@ proc processTimers(p: PDispatcherBase) =
 when defined(windows) or defined(nimdoc):
   import winlean, sets, hashes
   type
-    TCompletionKey = Dword
+    CompletionKey = Dword
 
-    TCompletionData* = object
-      fd*: TAsyncFD # TODO: Rename this.
-      cb*: proc (fd: TAsyncFD, bytesTransferred: Dword,
+    CompletionData* = object
+      fd*: AsyncFD # TODO: Rename this.
+      cb*: proc (fd: AsyncFD, bytesTransferred: Dword,
                 errcode: OSErrorCode) {.closure,gcsafe.}
 
     PDispatcher* = ref object of PDispatcherBase
-      ioPort: THandle
-      handles: HashSet[TAsyncFD]
+      ioPort: Handle
+      handles: HashSet[AsyncFD]
 
-    TCustomOverlapped = object of TOVERLAPPED
-      data*: TCompletionData
+    CustomOverlapped = object of OVERLAPPED
+      data*: CompletionData
 
-    PCustomOverlapped* = ref TCustomOverlapped
+    PCustomOverlapped* = ref CustomOverlapped
 
-    TAsyncFD* = distinct int
+    AsyncFD* = distinct int
+  {.deprecated: [TCompletionKey: CompletionKey, TAsyncFD: AsyncFD,
+                TCustomOverlapped: CustomOverlapped, TCompletionData: CompletionData].}
 
-  proc hash(x: TAsyncFD): THash {.borrow.}
-  proc `==`*(x: TAsyncFD, y: TAsyncFD): bool {.borrow.}
+  proc hash(x: AsyncFD): Hash {.borrow.}
+  proc `==`*(x: AsyncFD, y: AsyncFD): bool {.borrow.}
 
   proc newDispatcher*(): PDispatcher =
     ## Creates a new Dispatcher instance.
     new result
     result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
-    result.handles = initSet[TAsyncFD]()
+    result.handles = initSet[AsyncFD]()
     result.timers = @[]
 
   var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
@@ -357,15 +404,15 @@ when defined(windows) or defined(nimdoc):
     if gDisp.isNil: gDisp = newDispatcher()
     result = gDisp
 
-  proc register*(fd: TAsyncFD) =
+  proc register*(fd: AsyncFD) =
     ## Registers ``fd`` with the dispatcher.
     let p = getGlobalDispatcher()
-    if createIoCompletionPort(fd.THandle, p.ioPort,
-                              cast[TCompletionKey](fd), 1) == 0:
+    if createIoCompletionPort(fd.Handle, p.ioPort,
+                              cast[CompletionKey](fd), 1) == 0:
       raiseOSError(osLastError())
     p.handles.incl(fd)
 
-  proc verifyPresence(fd: TAsyncFD) =
+  proc verifyPresence(fd: AsyncFD) =
     ## Ensures that file descriptor has been registered with the dispatcher.
     let p = getGlobalDispatcher()
     if fd notin p.handles:
@@ -394,7 +441,7 @@ when defined(windows) or defined(nimdoc):
     # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html
     if res:
       # This is useful for ensuring the reliability of the overlapped struct.
-      assert customOverlapped.data.fd == lpCompletionKey.TAsyncFD
+      assert customOverlapped.data.fd == lpCompletionKey.AsyncFD
 
       customOverlapped.data.cb(customOverlapped.data.fd,
           lpNumberOfBytesTransferred, OSErrorCode(-1))
@@ -402,7 +449,7 @@ when defined(windows) or defined(nimdoc):
     else:
       let errCode = osLastError()
       if customOverlapped != nil:
-        assert customOverlapped.data.fd == lpCompletionKey.TAsyncFD
+        assert customOverlapped.data.fd == lpCompletionKey.AsyncFD
         customOverlapped.data.cb(customOverlapped.data.fd,
             lpNumberOfBytesTransferred, errCode)
         GC_unref(customOverlapped)
@@ -419,16 +466,16 @@ when defined(windows) or defined(nimdoc):
   var acceptExPtr: pointer = nil
   var getAcceptExSockAddrsPtr: pointer = nil
 
-  proc initPointer(s: SocketHandle, fun: var pointer, guid: var TGUID): bool =
+  proc initPointer(s: SocketHandle, fun: var pointer, guid: var GUID): bool =
     # Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c
     var bytesRet: Dword
     fun = nil
     result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid,
-                      sizeof(TGUID).Dword, addr fun, sizeof(pointer).Dword,
+                      sizeof(GUID).Dword, addr fun, sizeof(pointer).Dword,
                       addr bytesRet, nil, nil) == 0
 
   proc initAll() =
-    let dummySock = newRawSocket()
+    let dummySock = newNativeSocket()
     if not initPointer(dummySock, connectExPtr, WSAID_CONNECTEX):
       raiseOSError(osLastError())
     if not initPointer(dummySock, acceptExPtr, WSAID_ACCEPTEX):
@@ -480,8 +527,8 @@ when defined(windows) or defined(nimdoc):
                   dwRemoteAddressLength, LocalSockaddr, LocalSockaddrLength,
                   RemoteSockaddr, RemoteSockaddrLength)
 
-  proc connect*(socket: TAsyncFD, address: string, port: Port,
-    af = AF_INET): Future[void] =
+  proc connect*(socket: AsyncFD, address: string, port: Port,
+    domain = nativesockets.AF_INET): Future[void] =
     ## Connects ``socket`` to server at ``address:port``.
     ##
     ## Returns a ``Future`` which will complete when the connection succeeds
@@ -490,14 +537,14 @@ when defined(windows) or defined(nimdoc):
     var retFuture = newFuture[void]("connect")
     # Apparently ``ConnectEx`` expects the socket to be initially bound:
     var saddr: Sockaddr_in
-    saddr.sin_family = int16(toInt(af))
+    saddr.sin_family = int16(toInt(domain))
     saddr.sin_port = 0
     saddr.sin_addr.s_addr = INADDR_ANY
     if bindAddr(socket.SocketHandle, cast[ptr SockAddr](addr(saddr)),
                   sizeof(saddr).SockLen) < 0'i32:
       raiseOSError(osLastError())
 
-    var aiList = getAddrInfo(address, port, af)
+    var aiList = getAddrInfo(address, port, domain)
     var success = false
     var lastError: OSErrorCode
     var it = aiList
@@ -506,8 +553,8 @@ when defined(windows) or defined(nimdoc):
       # http://blogs.msdn.com/b/oldnewthing/archive/2011/02/02/10123392.aspx
       var ol = PCustomOverlapped()
       GC_ref(ol)
-      ol.data = TCompletionData(fd: socket, cb:
-        proc (fd: TAsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
+      ol.data = CompletionData(fd: socket, cb:
+        proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
           if not retFuture.finished:
             if errcode == OSErrorCode(-1):
               retFuture.complete()
@@ -542,7 +589,7 @@ when defined(windows) or defined(nimdoc):
       retFuture.fail(newException(OSError, osErrorMsg(lastError)))
     return retFuture
 
-  proc recv*(socket: TAsyncFD, size: int,
+  proc recv*(socket: AsyncFD, size: int,
              flags = {SocketFlag.SafeDisconn}): Future[string] =
     ## Reads **up to** ``size`` bytes from ``socket``. Returned future will
     ## complete once all the data requested is read, a part of the data has been
@@ -564,14 +611,14 @@ when defined(windows) or defined(nimdoc):
     var retFuture = newFuture[string]("recv")
     var dataBuf: TWSABuf
     dataBuf.buf = cast[cstring](alloc0(size))
-    dataBuf.len = size
+    dataBuf.len = size.ULONG
 
     var bytesReceived: Dword
     var flagsio = flags.toOSFlags().Dword
     var ol = PCustomOverlapped()
     GC_ref(ol)
-    ol.data = TCompletionData(fd: socket, cb:
-      proc (fd: TAsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
+    ol.data = CompletionData(fd: socket, cb:
+      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
         if not retFuture.finished:
           if errcode == OSErrorCode(-1):
             if bytesCount == 0 and dataBuf.buf[0] == '\0':
@@ -634,7 +681,94 @@ when defined(windows) or defined(nimdoc):
       # free ``ol``.
     return retFuture
 
-  proc send*(socket: TAsyncFD, data: string,
+  proc recvInto*(socket: AsyncFD, buf: cstring, size: int,
+                flags = {SocketFlag.SafeDisconn}): Future[int] =
+    ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``, which must
+    ## at least be of that size. Returned future will complete once all the
+    ## data requested is read, a part of the data has been read, or the socket
+    ## has disconnected in which case the future will complete with a value of
+    ## ``0``.
+    ##
+    ## **Warning**: The ``Peek`` socket flag is not supported on Windows.
+
+
+    # Things to note:
+    #   * When WSARecv completes immediately then ``bytesReceived`` is very
+    #     unreliable.
+    #   * Still need to implement message-oriented socket disconnection,
+    #     '\0' in the message currently signifies a socket disconnect. Who
+    #     knows what will happen when someone sends that to our socket.
+    verifyPresence(socket)
+    assert SocketFlag.Peek notin flags, "Peek not supported on Windows."
+
+    var retFuture = newFuture[int]("recvInto")
+
+    #buf[] = '\0'
+    var dataBuf: TWSABuf
+    dataBuf.buf = buf
+    dataBuf.len = size.ULONG
+
+    var bytesReceived: Dword
+    var flagsio = flags.toOSFlags().Dword
+    var ol = PCustomOverlapped()
+    GC_ref(ol)
+    ol.data = CompletionData(fd: socket, cb:
+      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
+        if not retFuture.finished:
+          if errcode == OSErrorCode(-1):
+            if bytesCount == 0 and dataBuf.buf[0] == '\0':
+              retFuture.complete(0)
+            else:
+              retFuture.complete(bytesCount)
+          else:
+            if flags.isDisconnectionError(errcode):
+              retFuture.complete(0)
+            else:
+              retFuture.fail(newException(OSError, osErrorMsg(errcode)))
+        if dataBuf.buf != nil:
+          dataBuf.buf = nil
+    )
+
+    let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
+                      addr flagsio, cast[POVERLAPPED](ol), nil)
+    if ret == -1:
+      let err = osLastError()
+      if err.int32 != ERROR_IO_PENDING:
+        if dataBuf.buf != nil:
+          dataBuf.buf = nil
+        GC_unref(ol)
+        if flags.isDisconnectionError(err):
+          retFuture.complete(0)
+        else:
+          retFuture.fail(newException(OSError, osErrorMsg(err)))
+    elif ret == 0 and bytesReceived == 0 and dataBuf.buf[0] == '\0':
+      # We have to ensure that the buffer is empty because WSARecv will tell
+      # us immediately when it was disconnected, even when there is still
+      # data in the buffer.
+      # We want to give the user as much data as we can. So we only return
+      # the empty string (which signals a disconnection) when there is
+      # nothing left to read.
+      retFuture.complete(0)
+      # TODO: "For message-oriented sockets, where a zero byte message is often
+      # allowable, a failure with an error code of WSAEDISCON is used to
+      # indicate graceful closure."
+      # ~ http://msdn.microsoft.com/en-us/library/ms741688%28v=vs.85%29.aspx
+    else:
+      # Request to read completed immediately.
+      # From my tests bytesReceived isn't reliable.
+      let realSize =
+        if bytesReceived == 0:
+          size
+        else:
+          bytesReceived
+      assert realSize <= size
+      retFuture.complete(realSize)
+      # We don't deallocate ``ol`` here because even though this completed
+      # immediately poll will still be notified about its completion and it will
+      # free ``ol``.
+    return retFuture
+
+  proc send*(socket: AsyncFD, data: string,
              flags = {SocketFlag.SafeDisconn}): Future[void] =
     ## Sends ``data`` to ``socket``. The returned future will complete once all
     ## data has been sent.
@@ -643,13 +777,13 @@ when defined(windows) or defined(nimdoc):
 
     var dataBuf: TWSABuf
     dataBuf.buf = data # since this is not used in a callback, this is fine
-    dataBuf.len = data.len
+    dataBuf.len = data.len.ULONG
 
     var bytesReceived, lowFlags: Dword
     var ol = PCustomOverlapped()
     GC_ref(ol)
-    ol.data = TCompletionData(fd: socket, cb:
-      proc (fd: TAsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
+    ol.data = CompletionData(fd: socket, cb:
+      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
         if not retFuture.finished:
           if errcode == OSErrorCode(-1):
             retFuture.complete()
@@ -677,8 +811,8 @@ when defined(windows) or defined(nimdoc):
       # free ``ol``.
     return retFuture
 
-  proc acceptAddr*(socket: TAsyncFD, flags = {SocketFlag.SafeDisconn}):
-      Future[tuple[address: string, client: TAsyncFD]] =
+  proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}):
+      Future[tuple[address: string, client: AsyncFD]] =
     ## Accepts a new connection. Returns a future containing the client socket
     ## corresponding to that connection and the remote address of the client.
     ## The future will complete when the connection is successfully accepted.
@@ -691,9 +825,9 @@ when defined(windows) or defined(nimdoc):
     ## flag is specified then this error will not be raised and instead
     ## accept will be called again.
     verifyPresence(socket)
-    var retFuture = newFuture[tuple[address: string, client: TAsyncFD]]("acceptAddr")
+    var retFuture = newFuture[tuple[address: string, client: AsyncFD]]("acceptAddr")
 
-    var clientSock = newRawSocket()
+    var clientSock = newNativeSocket()
     if clientSock == osInvalidSocket: raiseOSError(osLastError())
 
     const lpOutputLen = 1024
@@ -716,11 +850,11 @@ when defined(windows) or defined(nimdoc):
                            dwLocalAddressLength, dwRemoteAddressLength,
                            addr localSockaddr, addr localLen,
                            addr remoteSockaddr, addr remoteLen)
-      register(clientSock.TAsyncFD)
+      register(clientSock.AsyncFD)
       # TODO: IPv6. Check ``sa_family``. http://stackoverflow.com/a/9212542/492186
       retFuture.complete(
         (address: $inet_ntoa(cast[ptr Sockaddr_in](remoteSockAddr).sin_addr),
-         client: clientSock.TAsyncFD)
+         client: clientSock.AsyncFD)
       )
 
     template failAccept(errcode): stmt =
@@ -737,8 +871,8 @@ when defined(windows) or defined(nimdoc):
 
     var ol = PCustomOverlapped()
     GC_ref(ol)
-    ol.data = TCompletionData(fd: socket, cb:
-      proc (fd: TAsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
+    ol.data = CompletionData(fd: socket, cb:
+      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
         if not retFuture.finished:
           if errcode == OSErrorCode(-1):
             completeAccept()
@@ -766,26 +900,26 @@ when defined(windows) or defined(nimdoc):
 
     return retFuture
 
-  proc newAsyncRawSocket*(domain, typ, protocol: cint): TAsyncFD =
+  proc newAsyncNativeSocket*(domain, sockType, protocol: cint): AsyncFD =
     ## Creates a new socket and registers it with the dispatcher implicitly.
-    result = newRawSocket(domain, typ, protocol).TAsyncFD
+    result = newNativeSocket(domain, sockType, protocol).AsyncFD
     result.SocketHandle.setBlocking(false)
     register(result)
 
-  proc newAsyncRawSocket*(domain: Domain = AF_INET,
-               typ: SockType = SOCK_STREAM,
-               protocol: Protocol = IPPROTO_TCP): TAsyncFD =
+  proc newAsyncNativeSocket*(domain: Domain = nativesockets.AF_INET,
+                             sockType: SockType = SOCK_STREAM,
+                             protocol: Protocol = IPPROTO_TCP): AsyncFD =
     ## Creates a new socket and registers it with the dispatcher implicitly.
-    result = newRawSocket(domain, typ, protocol).TAsyncFD
+    result = newNativeSocket(domain, sockType, protocol).AsyncFD
     result.SocketHandle.setBlocking(false)
     register(result)
 
-  proc closeSocket*(socket: TAsyncFD) =
+  proc closeSocket*(socket: AsyncFD) =
     ## Closes a socket and ensures that it is unregistered.
     socket.SocketHandle.close()
     getGlobalDispatcher().handles.excl(socket)
 
-  proc unregister*(fd: TAsyncFD) =
+  proc unregister*(fd: AsyncFD) =
     ## Unregisters ``fd``.
     getGlobalDispatcher().handles.excl(fd)
 
@@ -805,18 +939,19 @@ else:
                       MSG_NOSIGNAL
 
   type
-    TAsyncFD* = distinct cint
-    TCallback = proc (fd: TAsyncFD): bool {.closure,gcsafe.}
+    AsyncFD* = distinct cint
+    Callback = proc (fd: AsyncFD): bool {.closure,gcsafe.}
 
     PData* = ref object of RootRef
-      fd: TAsyncFD
-      readCBs: seq[TCallback]
-      writeCBs: seq[TCallback]
+      fd: AsyncFD
+      readCBs: seq[Callback]
+      writeCBs: seq[Callback]
 
     PDispatcher* = ref object of PDispatcherBase
       selector: Selector
+  {.deprecated: [TAsyncFD: AsyncFD, TCallback: Callback].}
 
-  proc `==`*(x, y: TAsyncFD): bool {.borrow.}
+  proc `==`*(x, y: AsyncFD): bool {.borrow.}
 
   proc newDispatcher*(): PDispatcher =
     new result
@@ -828,44 +963,49 @@ else:
     if gDisp.isNil: gDisp = newDispatcher()
     result = gDisp
 
-  proc update(fd: TAsyncFD, events: set[Event]) =
+  proc update(fd: AsyncFD, events: set[Event]) =
     let p = getGlobalDispatcher()
     assert fd.SocketHandle in p.selector
-    discard p.selector.update(fd.SocketHandle, events)
+    p.selector.update(fd.SocketHandle, events)
 
-  proc register*(fd: TAsyncFD) =
+  proc register*(fd: AsyncFD) =
     let p = getGlobalDispatcher()
     var data = PData(fd: fd, readCBs: @[], writeCBs: @[])
     p.selector.register(fd.SocketHandle, {}, data.RootRef)
 
-  proc newAsyncRawSocket*(domain: cint, typ: cint, protocol: cint): TAsyncFD =
-    result = newRawSocket(domain, typ, protocol).TAsyncFD
+  proc newAsyncNativeSocket*(domain: cint, sockType: cint,
+                             protocol: cint): AsyncFD =
+    result = newNativeSocket(domain, sockType, protocol).AsyncFD
     result.SocketHandle.setBlocking(false)
+    when defined(macosx):
+      result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
     register(result)
 
-  proc newAsyncRawSocket*(domain: Domain = AF_INET,
-               typ: SockType = SOCK_STREAM,
-               protocol: Protocol = IPPROTO_TCP): TAsyncFD =
-    result = newRawSocket(domain, typ, protocol).TAsyncFD
+  proc newAsyncNativeSocket*(domain: Domain = AF_INET,
+                             sockType: SockType = SOCK_STREAM,
+                             protocol: Protocol = IPPROTO_TCP): AsyncFD =
+    result = newNativeSocket(domain, sockType, protocol).AsyncFD
     result.SocketHandle.setBlocking(false)
+    when defined(macosx):
+      result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
     register(result)
 
-  proc closeSocket*(sock: TAsyncFD) =
+  proc closeSocket*(sock: AsyncFD) =
     let disp = getGlobalDispatcher()
-    sock.SocketHandle.close()
     disp.selector.unregister(sock.SocketHandle)
+    sock.SocketHandle.close()
 
-  proc unregister*(fd: TAsyncFD) =
+  proc unregister*(fd: AsyncFD) =
     getGlobalDispatcher().selector.unregister(fd.SocketHandle)
 
-  proc addRead*(fd: TAsyncFD, cb: TCallback) =
+  proc addRead*(fd: AsyncFD, cb: Callback) =
     let p = getGlobalDispatcher()
     if fd.SocketHandle notin p.selector:
       raise newException(ValueError, "File descriptor not registered.")
     p.selector[fd.SocketHandle].data.PData.readCBs.add(cb)
     update(fd, p.selector[fd.SocketHandle].events + {EvRead})
 
-  proc addWrite*(fd: TAsyncFD, cb: TCallback) =
+  proc addWrite*(fd: AsyncFD, cb: Callback) =
     let p = getGlobalDispatcher()
     if fd.SocketHandle notin p.selector:
       raise newException(ValueError, "File descriptor not registered.")
@@ -876,11 +1016,11 @@ else:
     let p = getGlobalDispatcher()
     for info in p.selector.select(timeout):
       let data = PData(info.key.data)
-      assert data.fd == info.key.fd.TAsyncFD
+      assert data.fd == info.key.fd.AsyncFD
       #echo("In poll ", data.fd.cint)
-      if EvError in info.events:
-        closeSocket(data.fd)
-        continue
+      # There may be EvError here, but we handle them in callbacks,
+      # so that exceptions can be raised from `send(...)` and
+      # `recv(...)` routines.
 
       if EvRead in info.events:
         # Callback may add items to ``data.readCBs`` which causes issues if
@@ -914,16 +1054,25 @@ else:
 
     processTimers(p)
 
-  proc connect*(socket: TAsyncFD, address: string, port: Port,
-    af = AF_INET): Future[void] =
+  proc connect*(socket: AsyncFD, address: string, port: Port,
+    domain = AF_INET): Future[void] =
     var retFuture = newFuture[void]("connect")
 
-    proc cb(fd: TAsyncFD): bool =
-      # We have connected.
-      retFuture.complete()
-      return true
+    proc cb(fd: AsyncFD): bool =
+      var ret = SocketHandle(fd).getSockOptInt(cint(SOL_SOCKET), cint(SO_ERROR))
+      if ret == 0:
+          # We have connected.
+          retFuture.complete()
+          return true
+      elif ret == EINTR:
+          # interrupted, keep waiting
+          return false
+      else:
+          retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret))))
+          return true
 
-    var aiList = getAddrInfo(address, port, af)
+    assert getSockDomain(socket.SocketHandle) == domain
+    var aiList = getAddrInfo(address, port, domain)
     var success = false
     var lastError: OSErrorCode
     var it = aiList
@@ -949,17 +1098,16 @@ else:
       retFuture.fail(newException(OSError, osErrorMsg(lastError)))
     return retFuture
 
-  proc recv*(socket: TAsyncFD, size: int,
+  proc recv*(socket: AsyncFD, size: int,
              flags = {SocketFlag.SafeDisconn}): Future[string] =
     var retFuture = newFuture[string]("recv")
 
     var readBuffer = newString(size)
 
-    proc cb(sock: TAsyncFD): bool =
+    proc cb(sock: AsyncFD): bool =
       result = true
       let res = recv(sock.SocketHandle, addr readBuffer[0], size.cint,
                      flags.toOSFlags())
-      #echo("recv cb res: ", res)
       if res < 0:
         let lastError = osLastError()
         if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
@@ -980,13 +1128,37 @@ else:
     addRead(socket, cb)
     return retFuture
 
-  proc send*(socket: TAsyncFD, data: string,
+  proc recvInto*(socket: AsyncFD, buf: cstring, size: int,
+                  flags = {SocketFlag.SafeDisconn}): Future[int] =
+    var retFuture = newFuture[int]("recvInto")
+
+    proc cb(sock: AsyncFD): bool =
+      result = true
+      let res = recv(sock.SocketHandle, buf, size.cint,
+                     flags.toOSFlags())
+      if res < 0:
+        let lastError = osLastError()
+        if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
+          if flags.isDisconnectionError(lastError):
+            retFuture.complete(0)
+          else:
+            retFuture.fail(newException(OSError, osErrorMsg(lastError)))
+        else:
+          result = false # We still want this callback to be called.
+      else:
+        retFuture.complete(res)
+    # TODO: The following causes a massive slowdown.
+    #if not cb(socket):
+    addRead(socket, cb)
+    return retFuture
+
+  proc send*(socket: AsyncFD, data: string,
              flags = {SocketFlag.SafeDisconn}): Future[void] =
     var retFuture = newFuture[void]("send")
 
     var written = 0
 
-    proc cb(sock: TAsyncFD): bool =
+    proc cb(sock: AsyncFD): bool =
       result = true
       let netSize = data.len-written
       var d = data.cstring
@@ -1012,13 +1184,13 @@ else:
     addWrite(socket, cb)
     return retFuture
 
-  proc acceptAddr*(socket: TAsyncFD, flags = {SocketFlag.SafeDisconn}):
-      Future[tuple[address: string, client: TAsyncFD]] =
+  proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}):
+      Future[tuple[address: string, client: AsyncFD]] =
     var retFuture = newFuture[tuple[address: string,
-        client: TAsyncFD]]("acceptAddr")
-    proc cb(sock: TAsyncFD): bool =
+        client: AsyncFD]]("acceptAddr")
+    proc cb(sock: AsyncFD): bool =
       result = true
-      var sockAddress: SockAddr_in
+      var sockAddress: Sockaddr_storage
       var addrLen = sizeof(sockAddress).Socklen
       var client = accept(sock.SocketHandle,
                           cast[ptr SockAddr](addr(sockAddress)), addr(addrLen))
@@ -1033,28 +1205,28 @@ else:
           else:
             retFuture.fail(newException(OSError, osErrorMsg(lastError)))
       else:
-        register(client.TAsyncFD)
-        retFuture.complete(($inet_ntoa(sockAddress.sin_addr), client.TAsyncFD))
+        register(client.AsyncFD)
+        retFuture.complete((getAddrString(cast[ptr SockAddr](addr sockAddress)), client.AsyncFD))
     addRead(socket, cb)
     return retFuture
 
 proc sleepAsync*(ms: int): Future[void] =
   ## Suspends the execution of the current async procedure for the next
-  ## ``ms`` miliseconds.
+  ## ``ms`` milliseconds.
   var retFuture = newFuture[void]("sleepAsync")
   let p = getGlobalDispatcher()
   p.timers.add((epochTime() + (ms / 1000), retFuture))
   return retFuture
 
-proc accept*(socket: TAsyncFD,
-    flags = {SocketFlag.SafeDisconn}): Future[TAsyncFD] =
+proc accept*(socket: AsyncFD,
+    flags = {SocketFlag.SafeDisconn}): Future[AsyncFD] =
   ## Accepts a new connection. Returns a future containing the client socket
   ## corresponding to that connection.
   ## The future will complete when the connection is successfully accepted.
-  var retFut = newFuture[TAsyncFD]("accept")
+  var retFut = newFuture[AsyncFD]("accept")
   var fut = acceptAddr(socket, flags)
   fut.callback =
-    proc (future: Future[tuple[address: string, client: TAsyncFD]]) =
+    proc (future: Future[tuple[address: string, client: AsyncFD]]) =
       assert future.finished
       if future.failed:
         retFut.fail(future.error)
@@ -1214,7 +1386,7 @@ proc processBody(node, retFutureSym: NimNode,
     else: discard
   of nnkDiscardStmt:
     # discard await x
-    if node[0].kind != nnkEmpty and node[0][0].kind == nnkIdent and
+    if node[0].kind == nnkCommand and node[0][0].kind == nnkIdent and
           node[0][0].ident == !"await":
       var newDiscard = node
       result.createVar("futureDiscard_" & $toStrLit(node[0][1]), node[0][1],
@@ -1286,26 +1458,35 @@ proc getName(node: NimNode): string {.compileTime.} =
   else:
     error("Unknown name.")
 
-macro async*(prc: stmt): stmt {.immediate.} =
-  ## Macro which processes async procedures into the appropriate
-  ## iterators and yield statements.
+proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} =
+  ## This macro transforms a single procedure into a closure iterator.
+  ## The ``async`` macro supports a stmtList holding multiple async procedures.
   if prc.kind notin {nnkProcDef, nnkLambda}:
-    error("Cannot transform this node kind into an async proc." &
-          " Proc definition or lambda node expected.")
+      error("Cannot transform this node kind into an async proc." &
+            " Proc definition or lambda node expected.")
 
   hint("Processing " & prc[0].getName & " as an async proc.")
 
   let returnType = prc[3][0]
+  var baseType: NimNode
   # Verify that the return type is a Future[T]
-  if returnType.kind == nnkIdent:
-    error("Expected return type of 'Future' got '" & $returnType & "'")
-  elif returnType.kind == nnkBracketExpr:
-    if $returnType[0] != "Future":
-      error("Expected return type of 'Future' got '" & $returnType[0] & "'")
+  if returnType.kind == nnkBracketExpr:
+    let fut = repr(returnType[0])
+    if fut != "Future":
+      error("Expected return type of 'Future' got '" & fut & "'")
+    baseType = returnType[1]
+  elif returnType.kind in nnkCallKinds and $returnType[0] == "[]":
+    let fut = repr(returnType[1])
+    if fut != "Future":
+      error("Expected return type of 'Future' got '" & fut & "'")
+    baseType = returnType[2]
+  elif returnType.kind == nnkEmpty:
+    baseType = returnType
+  else:
+    error("Expected return type of 'Future' got '" & repr(returnType) & "'")
 
   let subtypeIsVoid = returnType.kind == nnkEmpty or
-        (returnType.kind == nnkBracketExpr and
-         returnType[1].kind == nnkIdent and returnType[1].ident == !"void")
+        (baseType.kind == nnkIdent and returnType[1].ident == !"void")
 
   var outerProcBody = newNimNode(nnkStmtList, prc[6])
 
@@ -1313,7 +1494,7 @@ macro async*(prc: stmt): stmt {.immediate.} =
   var retFutureSym = genSym(nskVar, "retFuture")
   var subRetType =
     if returnType.kind == nnkEmpty: newIdentNode("void")
-    else: returnType[1]
+    else: baseType
   outerProcBody.add(
     newVarStmt(retFutureSym,
       newCall(
@@ -1337,7 +1518,7 @@ macro async*(prc: stmt): stmt {.immediate.} =
       newIdentNode("off")))) # -> {.push warning[resultshadowed]: off.}
 
     procBody.insert(1, newNimNode(nnkVarSection, prc[6]).add(
-      newIdentDefs(newIdentNode("result"), returnType[1]))) # -> var result: T
+      newIdentDefs(newIdentNode("result"), baseType))) # -> var result: T
 
     procBody.insert(2, newNimNode(nnkPragma).add(
       newIdentNode("pop"))) # -> {.pop.})
@@ -1378,10 +1559,20 @@ macro async*(prc: stmt): stmt {.immediate.} =
   result[6] = outerProcBody
 
   #echo(treeRepr(result))
-  #if prc[0].getName == "test":
+  #if prc[0].getName == "hubConnectionLoop":
   #  echo(toStrLit(result))
 
-proc recvLine*(socket: TAsyncFD): Future[string] {.async.} =
+macro async*(prc: stmt): stmt {.immediate.} =
+  ## Macro which processes async procedures into the appropriate
+  ## iterators and yield statements.
+  if prc.kind == nnkStmtList:
+    for oneProc in prc:
+      result = newStmtList()
+      result.add asyncSingleProc(oneProc)
+  else:
+    result = asyncSingleProc(prc)
+
+proc recvLine*(socket: AsyncFD): Future[string] {.async.} =
   ## Reads a line of data from ``socket``. Returned future will complete once
   ## a full line is read or an error occurs.
   ##