summary refs log tree commit diff stats
path: root/lib/pure/asyncdispatch.nim
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r--lib/pure/asyncdispatch.nim303
1 files changed, 209 insertions, 94 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim
index ca5b13c78..7523b29d5 100644
--- a/lib/pure/asyncdispatch.nim
+++ b/lib/pure/asyncdispatch.nim
@@ -122,7 +122,6 @@ export Port, SocketFlag
 ## Limitations/Bugs
 ## ----------------
 ##
-## * ``except`` statement (without `try`) does not work inside async procedures.
 ## * The effect system (``raises: []``) does not work with async procedures.
 ## * Can't await in a ``except`` body
 
@@ -323,32 +322,34 @@ proc processTimers(p: PDispatcherBase) =
 when defined(windows) or defined(nimdoc):
   import winlean, sets, hashes
   type
-    TCompletionKey = Dword
+    CompletionKey = Dword
 
-    TCompletionData* = object
-      fd*: TAsyncFD # TODO: Rename this.
-      cb*: proc (fd: TAsyncFD, bytesTransferred: Dword,
+    CompletionData* = object
+      fd*: AsyncFD # TODO: Rename this.
+      cb*: proc (fd: AsyncFD, bytesTransferred: Dword,
                 errcode: OSErrorCode) {.closure,gcsafe.}
 
     PDispatcher* = ref object of PDispatcherBase
-      ioPort: THandle
-      handles: HashSet[TAsyncFD]
+      ioPort: Handle
+      handles: HashSet[AsyncFD]
 
-    TCustomOverlapped = object of TOVERLAPPED
-      data*: TCompletionData
+    CustomOverlapped = object of OVERLAPPED
+      data*: CompletionData
 
-    PCustomOverlapped* = ref TCustomOverlapped
+    PCustomOverlapped* = ref CustomOverlapped
 
-    TAsyncFD* = distinct int
+    AsyncFD* = distinct int
+  {.deprecated: [TCompletionKey: CompletionKey, TAsyncFD: AsyncFD,
+                TCustomOverlapped: CustomOverlapped, TCompletionData: CompletionData].}
 
-  proc hash(x: TAsyncFD): THash {.borrow.}
-  proc `==`*(x: TAsyncFD, y: TAsyncFD): bool {.borrow.}
+  proc hash(x: AsyncFD): Hash {.borrow.}
+  proc `==`*(x: AsyncFD, y: AsyncFD): bool {.borrow.}
 
   proc newDispatcher*(): PDispatcher =
     ## Creates a new Dispatcher instance.
     new result
     result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
-    result.handles = initSet[TAsyncFD]()
+    result.handles = initSet[AsyncFD]()
     result.timers = @[]
 
   var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
@@ -357,15 +358,15 @@ when defined(windows) or defined(nimdoc):
     if gDisp.isNil: gDisp = newDispatcher()
     result = gDisp
 
-  proc register*(fd: TAsyncFD) =
+  proc register*(fd: AsyncFD) =
     ## Registers ``fd`` with the dispatcher.
     let p = getGlobalDispatcher()
-    if createIoCompletionPort(fd.THandle, p.ioPort,
-                              cast[TCompletionKey](fd), 1) == 0:
+    if createIoCompletionPort(fd.Handle, p.ioPort,
+                              cast[CompletionKey](fd), 1) == 0:
       raiseOSError(osLastError())
     p.handles.incl(fd)
 
-  proc verifyPresence(fd: TAsyncFD) =
+  proc verifyPresence(fd: AsyncFD) =
     ## Ensures that file descriptor has been registered with the dispatcher.
     let p = getGlobalDispatcher()
     if fd notin p.handles:
@@ -394,7 +395,7 @@ when defined(windows) or defined(nimdoc):
     # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html
     if res:
       # This is useful for ensuring the reliability of the overlapped struct.
-      assert customOverlapped.data.fd == lpCompletionKey.TAsyncFD
+      assert customOverlapped.data.fd == lpCompletionKey.AsyncFD
 
       customOverlapped.data.cb(customOverlapped.data.fd,
           lpNumberOfBytesTransferred, OSErrorCode(-1))
@@ -402,7 +403,7 @@ when defined(windows) or defined(nimdoc):
     else:
       let errCode = osLastError()
       if customOverlapped != nil:
-        assert customOverlapped.data.fd == lpCompletionKey.TAsyncFD
+        assert customOverlapped.data.fd == lpCompletionKey.AsyncFD
         customOverlapped.data.cb(customOverlapped.data.fd,
             lpNumberOfBytesTransferred, errCode)
         GC_unref(customOverlapped)
@@ -419,12 +420,12 @@ when defined(windows) or defined(nimdoc):
   var acceptExPtr: pointer = nil
   var getAcceptExSockAddrsPtr: pointer = nil
 
-  proc initPointer(s: SocketHandle, fun: var pointer, guid: var TGUID): bool =
+  proc initPointer(s: SocketHandle, fun: var pointer, guid: var GUID): bool =
     # Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c
     var bytesRet: Dword
     fun = nil
     result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid,
-                      sizeof(TGUID).Dword, addr fun, sizeof(pointer).Dword,
+                      sizeof(GUID).Dword, addr fun, sizeof(pointer).Dword,
                       addr bytesRet, nil, nil) == 0
 
   proc initAll() =
@@ -480,8 +481,8 @@ when defined(windows) or defined(nimdoc):
                   dwRemoteAddressLength, LocalSockaddr, LocalSockaddrLength,
                   RemoteSockaddr, RemoteSockaddrLength)
 
-  proc connect*(socket: TAsyncFD, address: string, port: Port,
-    af = AF_INET): Future[void] =
+  proc connect*(socket: AsyncFD, address: string, port: Port,
+    domain = rawsockets.AF_INET): Future[void] =
     ## Connects ``socket`` to server at ``address:port``.
     ##
     ## Returns a ``Future`` which will complete when the connection succeeds
@@ -490,14 +491,14 @@ when defined(windows) or defined(nimdoc):
     var retFuture = newFuture[void]("connect")
     # Apparently ``ConnectEx`` expects the socket to be initially bound:
     var saddr: Sockaddr_in
-    saddr.sin_family = int16(toInt(af))
+    saddr.sin_family = int16(toInt(domain))
     saddr.sin_port = 0
     saddr.sin_addr.s_addr = INADDR_ANY
     if bindAddr(socket.SocketHandle, cast[ptr SockAddr](addr(saddr)),
                   sizeof(saddr).SockLen) < 0'i32:
       raiseOSError(osLastError())
 
-    var aiList = getAddrInfo(address, port, af)
+    var aiList = getAddrInfo(address, port, domain)
     var success = false
     var lastError: OSErrorCode
     var it = aiList
@@ -506,8 +507,8 @@ when defined(windows) or defined(nimdoc):
       # http://blogs.msdn.com/b/oldnewthing/archive/2011/02/02/10123392.aspx
       var ol = PCustomOverlapped()
       GC_ref(ol)
-      ol.data = TCompletionData(fd: socket, cb:
-        proc (fd: TAsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
+      ol.data = CompletionData(fd: socket, cb:
+        proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
           if not retFuture.finished:
             if errcode == OSErrorCode(-1):
               retFuture.complete()
@@ -542,7 +543,7 @@ when defined(windows) or defined(nimdoc):
       retFuture.fail(newException(OSError, osErrorMsg(lastError)))
     return retFuture
 
-  proc recv*(socket: TAsyncFD, size: int,
+  proc recv*(socket: AsyncFD, size: int,
              flags = {SocketFlag.SafeDisconn}): Future[string] =
     ## Reads **up to** ``size`` bytes from ``socket``. Returned future will
     ## complete once all the data requested is read, a part of the data has been
@@ -570,8 +571,8 @@ when defined(windows) or defined(nimdoc):
     var flagsio = flags.toOSFlags().Dword
     var ol = PCustomOverlapped()
     GC_ref(ol)
-    ol.data = TCompletionData(fd: socket, cb:
-      proc (fd: TAsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
+    ol.data = CompletionData(fd: socket, cb:
+      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
         if not retFuture.finished:
           if errcode == OSErrorCode(-1):
             if bytesCount == 0 and dataBuf.buf[0] == '\0':
@@ -634,7 +635,94 @@ when defined(windows) or defined(nimdoc):
       # free ``ol``.
     return retFuture
 
-  proc send*(socket: TAsyncFD, data: string,
+  proc recvInto*(socket: AsyncFD, buf: cstring, size: int,
+                flags = {SocketFlag.SafeDisconn}): Future[int] =
+    ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``, which must
+    ## at least be of that size. Returned future will complete once all the
+    ## data requested is read, a part of the data has been read, or the socket
+    ## has disconnected in which case the future will complete with a value of
+    ## ``0``.
+    ##
+    ## **Warning**: The ``Peek`` socket flag is not supported on Windows.
+
+
+    # Things to note:
+    #   * When WSARecv completes immediately then ``bytesReceived`` is very
+    #     unreliable.
+    #   * Still need to implement message-oriented socket disconnection,
+    #     '\0' in the message currently signifies a socket disconnect. Who
+    #     knows what will happen when someone sends that to our socket.
+    verifyPresence(socket)
+    assert SocketFlag.Peek notin flags, "Peek not supported on Windows."
+
+    var retFuture = newFuture[int]("recvInto")
+
+    #buf[] = '\0'
+    var dataBuf: TWSABuf
+    dataBuf.buf = buf
+    dataBuf.len = size
+
+    var bytesReceived: Dword
+    var flagsio = flags.toOSFlags().Dword
+    var ol = PCustomOverlapped()
+    GC_ref(ol)
+    ol.data = CompletionData(fd: socket, cb:
+      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
+        if not retFuture.finished:
+          if errcode == OSErrorCode(-1):
+            if bytesCount == 0 and dataBuf.buf[0] == '\0':
+              retFuture.complete(0)
+            else:
+              retFuture.complete(bytesCount)
+          else:
+            if flags.isDisconnectionError(errcode):
+              retFuture.complete(0)
+            else:
+              retFuture.fail(newException(OSError, osErrorMsg(errcode)))
+        if dataBuf.buf != nil:
+          dataBuf.buf = nil
+    )
+
+    let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived,
+                      addr flagsio, cast[POVERLAPPED](ol), nil)
+    if ret == -1:
+      let err = osLastError()
+      if err.int32 != ERROR_IO_PENDING:
+        if dataBuf.buf != nil:
+          dataBuf.buf = nil
+        GC_unref(ol)
+        if flags.isDisconnectionError(err):
+          retFuture.complete(0)
+        else:
+          retFuture.fail(newException(OSError, osErrorMsg(err)))
+    elif ret == 0 and bytesReceived == 0 and dataBuf.buf[0] == '\0':
+      # We have to ensure that the buffer is empty because WSARecv will tell
+      # us immediately when it was disconnected, even when there is still
+      # data in the buffer.
+      # We want to give the user as much data as we can. So we only return
+      # the empty string (which signals a disconnection) when there is
+      # nothing left to read.
+      retFuture.complete(0)
+      # TODO: "For message-oriented sockets, where a zero byte message is often
+      # allowable, a failure with an error code of WSAEDISCON is used to
+      # indicate graceful closure."
+      # ~ http://msdn.microsoft.com/en-us/library/ms741688%28v=vs.85%29.aspx
+    else:
+      # Request to read completed immediately.
+      # From my tests bytesReceived isn't reliable.
+      let realSize =
+        if bytesReceived == 0:
+          size
+        else:
+          bytesReceived
+      assert realSize <= size
+      retFuture.complete(realSize)
+      # We don't deallocate ``ol`` here because even though this completed
+      # immediately poll will still be notified about its completion and it will
+      # free ``ol``.
+    return retFuture
+
+  proc send*(socket: AsyncFD, data: string,
              flags = {SocketFlag.SafeDisconn}): Future[void] =
     ## Sends ``data`` to ``socket``. The returned future will complete once all
     ## data has been sent.
@@ -648,8 +736,8 @@ when defined(windows) or defined(nimdoc):
     var bytesReceived, lowFlags: Dword
     var ol = PCustomOverlapped()
     GC_ref(ol)
-    ol.data = TCompletionData(fd: socket, cb:
-      proc (fd: TAsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
+    ol.data = CompletionData(fd: socket, cb:
+      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
         if not retFuture.finished:
           if errcode == OSErrorCode(-1):
             retFuture.complete()
@@ -677,8 +765,8 @@ when defined(windows) or defined(nimdoc):
       # free ``ol``.
     return retFuture
 
-  proc acceptAddr*(socket: TAsyncFD, flags = {SocketFlag.SafeDisconn}):
-      Future[tuple[address: string, client: TAsyncFD]] =
+  proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}):
+      Future[tuple[address: string, client: AsyncFD]] =
     ## Accepts a new connection. Returns a future containing the client socket
     ## corresponding to that connection and the remote address of the client.
     ## The future will complete when the connection is successfully accepted.
@@ -691,7 +779,7 @@ when defined(windows) or defined(nimdoc):
     ## flag is specified then this error will not be raised and instead
     ## accept will be called again.
     verifyPresence(socket)
-    var retFuture = newFuture[tuple[address: string, client: TAsyncFD]]("acceptAddr")
+    var retFuture = newFuture[tuple[address: string, client: AsyncFD]]("acceptAddr")
 
     var clientSock = newRawSocket()
     if clientSock == osInvalidSocket: raiseOSError(osLastError())
@@ -716,11 +804,11 @@ when defined(windows) or defined(nimdoc):
                            dwLocalAddressLength, dwRemoteAddressLength,
                            addr localSockaddr, addr localLen,
                            addr remoteSockaddr, addr remoteLen)
-      register(clientSock.TAsyncFD)
+      register(clientSock.AsyncFD)
       # TODO: IPv6. Check ``sa_family``. http://stackoverflow.com/a/9212542/492186
       retFuture.complete(
         (address: $inet_ntoa(cast[ptr Sockaddr_in](remoteSockAddr).sin_addr),
-         client: clientSock.TAsyncFD)
+         client: clientSock.AsyncFD)
       )
 
     template failAccept(errcode): stmt =
@@ -737,8 +825,8 @@ when defined(windows) or defined(nimdoc):
 
     var ol = PCustomOverlapped()
     GC_ref(ol)
-    ol.data = TCompletionData(fd: socket, cb:
-      proc (fd: TAsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
+    ol.data = CompletionData(fd: socket, cb:
+      proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
         if not retFuture.finished:
           if errcode == OSErrorCode(-1):
             completeAccept()
@@ -766,26 +854,26 @@ when defined(windows) or defined(nimdoc):
 
     return retFuture
 
-  proc newAsyncRawSocket*(domain, typ, protocol: cint): TAsyncFD =
+  proc newAsyncRawSocket*(domain, sockType, protocol: cint): AsyncFD =
     ## Creates a new socket and registers it with the dispatcher implicitly.
-    result = newRawSocket(domain, typ, protocol).TAsyncFD
+    result = newRawSocket(domain, sockType, protocol).AsyncFD
     result.SocketHandle.setBlocking(false)
     register(result)
 
-  proc newAsyncRawSocket*(domain: Domain = AF_INET,
-               typ: SockType = SOCK_STREAM,
-               protocol: Protocol = IPPROTO_TCP): TAsyncFD =
+  proc newAsyncRawSocket*(domain: Domain = rawsockets.AF_INET,
+               sockType: SockType = SOCK_STREAM,
+               protocol: Protocol = IPPROTO_TCP): AsyncFD =
     ## Creates a new socket and registers it with the dispatcher implicitly.
-    result = newRawSocket(domain, typ, protocol).TAsyncFD
+    result = newRawSocket(domain, sockType, protocol).AsyncFD
     result.SocketHandle.setBlocking(false)
     register(result)
 
-  proc closeSocket*(socket: TAsyncFD) =
+  proc closeSocket*(socket: AsyncFD) =
     ## Closes a socket and ensures that it is unregistered.
     socket.SocketHandle.close()
     getGlobalDispatcher().handles.excl(socket)
 
-  proc unregister*(fd: TAsyncFD) =
+  proc unregister*(fd: AsyncFD) =
     ## Unregisters ``fd``.
     getGlobalDispatcher().handles.excl(fd)
 
@@ -805,18 +893,19 @@ else:
                       MSG_NOSIGNAL
 
   type
-    TAsyncFD* = distinct cint
-    TCallback = proc (fd: TAsyncFD): bool {.closure,gcsafe.}
+    AsyncFD* = distinct cint
+    Callback = proc (fd: AsyncFD): bool {.closure,gcsafe.}
 
     PData* = ref object of RootRef
-      fd: TAsyncFD
-      readCBs: seq[TCallback]
-      writeCBs: seq[TCallback]
+      fd: AsyncFD
+      readCBs: seq[Callback]
+      writeCBs: seq[Callback]
 
     PDispatcher* = ref object of PDispatcherBase
       selector: Selector
+  {.deprecated: [TAsyncFD: AsyncFD, TCallback: Callback].}
 
-  proc `==`*(x, y: TAsyncFD): bool {.borrow.}
+  proc `==`*(x, y: AsyncFD): bool {.borrow.}
 
   proc newDispatcher*(): PDispatcher =
     new result
@@ -828,48 +917,49 @@ else:
     if gDisp.isNil: gDisp = newDispatcher()
     result = gDisp
 
-  proc update(fd: TAsyncFD, events: set[Event]) =
+  proc update(fd: AsyncFD, events: set[Event]) =
     let p = getGlobalDispatcher()
     assert fd.SocketHandle in p.selector
-    discard p.selector.update(fd.SocketHandle, events)
+    p.selector.update(fd.SocketHandle, events)
 
-  proc register*(fd: TAsyncFD) =
+  proc register*(fd: AsyncFD) =
     let p = getGlobalDispatcher()
     var data = PData(fd: fd, readCBs: @[], writeCBs: @[])
     p.selector.register(fd.SocketHandle, {}, data.RootRef)
 
-  proc newAsyncRawSocket*(domain: cint, typ: cint, protocol: cint): TAsyncFD =
-    result = newRawSocket(domain, typ, protocol).TAsyncFD
+  proc newAsyncRawSocket*(domain: cint, sockType: cint,
+      protocol: cint): AsyncFD =
+    result = newRawSocket(domain, sockType, protocol).AsyncFD
     result.SocketHandle.setBlocking(false)
     when defined(macosx):
-        result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
+      result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
     register(result)
 
   proc newAsyncRawSocket*(domain: Domain = AF_INET,
-               typ: SockType = SOCK_STREAM,
-               protocol: Protocol = IPPROTO_TCP): TAsyncFD =
-    result = newRawSocket(domain, typ, protocol).TAsyncFD
+               sockType: SockType = SOCK_STREAM,
+               protocol: Protocol = IPPROTO_TCP): AsyncFD =
+    result = newRawSocket(domain, sockType, protocol).AsyncFD
     result.SocketHandle.setBlocking(false)
     when defined(macosx):
-        result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
+      result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
     register(result)
 
-  proc closeSocket*(sock: TAsyncFD) =
+  proc closeSocket*(sock: AsyncFD) =
     let disp = getGlobalDispatcher()
     sock.SocketHandle.close()
     disp.selector.unregister(sock.SocketHandle)
 
-  proc unregister*(fd: TAsyncFD) =
+  proc unregister*(fd: AsyncFD) =
     getGlobalDispatcher().selector.unregister(fd.SocketHandle)
 
-  proc addRead*(fd: TAsyncFD, cb: TCallback) =
+  proc addRead*(fd: AsyncFD, cb: Callback) =
     let p = getGlobalDispatcher()
     if fd.SocketHandle notin p.selector:
       raise newException(ValueError, "File descriptor not registered.")
     p.selector[fd.SocketHandle].data.PData.readCBs.add(cb)
     update(fd, p.selector[fd.SocketHandle].events + {EvRead})
 
-  proc addWrite*(fd: TAsyncFD, cb: TCallback) =
+  proc addWrite*(fd: AsyncFD, cb: Callback) =
     let p = getGlobalDispatcher()
     if fd.SocketHandle notin p.selector:
       raise newException(ValueError, "File descriptor not registered.")
@@ -880,7 +970,7 @@ else:
     let p = getGlobalDispatcher()
     for info in p.selector.select(timeout):
       let data = PData(info.key.data)
-      assert data.fd == info.key.fd.TAsyncFD
+      assert data.fd == info.key.fd.AsyncFD
       #echo("In poll ", data.fd.cint)
       # There may be EvError here, but we handle them in callbacks,
       # so that exceptions can be raised from `send(...)` and
@@ -918,11 +1008,11 @@ else:
 
     processTimers(p)
 
-  proc connect*(socket: TAsyncFD, address: string, port: Port,
-    af = AF_INET): Future[void] =
+  proc connect*(socket: AsyncFD, address: string, port: Port,
+    domain = AF_INET): Future[void] =
     var retFuture = newFuture[void]("connect")
 
-    proc cb(fd: TAsyncFD): bool =
+    proc cb(fd: AsyncFD): bool =
       var ret = SocketHandle(fd).getSockOptInt(cint(SOL_SOCKET), cint(SO_ERROR))
       if ret == 0:
           # We have connected.
@@ -935,7 +1025,8 @@ else:
           retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret))))
           return true
 
-    var aiList = getAddrInfo(address, port, af)
+    assert getSockDomain(socket.SocketHandle) == domain
+    var aiList = getAddrInfo(address, port, domain)
     var success = false
     var lastError: OSErrorCode
     var it = aiList
@@ -961,13 +1052,13 @@ else:
       retFuture.fail(newException(OSError, osErrorMsg(lastError)))
     return retFuture
 
-  proc recv*(socket: TAsyncFD, size: int,
+  proc recv*(socket: AsyncFD, size: int,
              flags = {SocketFlag.SafeDisconn}): Future[string] =
     var retFuture = newFuture[string]("recv")
 
     var readBuffer = newString(size)
 
-    proc cb(sock: TAsyncFD): bool =
+    proc cb(sock: AsyncFD): bool =
       result = true
       let res = recv(sock.SocketHandle, addr readBuffer[0], size.cint,
                      flags.toOSFlags())
@@ -991,13 +1082,37 @@ else:
     addRead(socket, cb)
     return retFuture
 
-  proc send*(socket: TAsyncFD, data: string,
+  proc recvInto*(socket: AsyncFD, buf: cstring, size: int,
+                  flags = {SocketFlag.SafeDisconn}): Future[int] =
+    var retFuture = newFuture[int]("recvInto")
+
+    proc cb(sock: AsyncFD): bool =
+      result = true
+      let res = recv(sock.SocketHandle, buf, size.cint,
+                     flags.toOSFlags())
+      if res < 0:
+        let lastError = osLastError()
+        if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
+          if flags.isDisconnectionError(lastError):
+            retFuture.complete(0)
+          else:
+            retFuture.fail(newException(OSError, osErrorMsg(lastError)))
+        else:
+          result = false # We still want this callback to be called.
+      else:
+        retFuture.complete(res)
+    # TODO: The following causes a massive slowdown.
+    #if not cb(socket):
+    addRead(socket, cb)
+    return retFuture
+
+  proc send*(socket: AsyncFD, data: string,
              flags = {SocketFlag.SafeDisconn}): Future[void] =
     var retFuture = newFuture[void]("send")
 
     var written = 0
 
-    proc cb(sock: TAsyncFD): bool =
+    proc cb(sock: AsyncFD): bool =
       result = true
       let netSize = data.len-written
       var d = data.cstring
@@ -1023,13 +1138,13 @@ else:
     addWrite(socket, cb)
     return retFuture
 
-  proc acceptAddr*(socket: TAsyncFD, flags = {SocketFlag.SafeDisconn}):
-      Future[tuple[address: string, client: TAsyncFD]] =
+  proc acceptAddr*(socket: AsyncFD, flags = {SocketFlag.SafeDisconn}):
+      Future[tuple[address: string, client: AsyncFD]] =
     var retFuture = newFuture[tuple[address: string,
-        client: TAsyncFD]]("acceptAddr")
-    proc cb(sock: TAsyncFD): bool =
+        client: AsyncFD]]("acceptAddr")
+    proc cb(sock: AsyncFD): bool =
       result = true
-      var sockAddress: SockAddr_in
+      var sockAddress: Sockaddr_storage
       var addrLen = sizeof(sockAddress).Socklen
       var client = accept(sock.SocketHandle,
                           cast[ptr SockAddr](addr(sockAddress)), addr(addrLen))
@@ -1044,28 +1159,28 @@ else:
           else:
             retFuture.fail(newException(OSError, osErrorMsg(lastError)))
       else:
-        register(client.TAsyncFD)
-        retFuture.complete(($inet_ntoa(sockAddress.sin_addr), client.TAsyncFD))
+        register(client.AsyncFD)
+        retFuture.complete((getAddrString(cast[ptr SockAddr](addr sockAddress)), client.AsyncFD))
     addRead(socket, cb)
     return retFuture
 
 proc sleepAsync*(ms: int): Future[void] =
   ## Suspends the execution of the current async procedure for the next
-  ## ``ms`` miliseconds.
+  ## ``ms`` milliseconds.
   var retFuture = newFuture[void]("sleepAsync")
   let p = getGlobalDispatcher()
   p.timers.add((epochTime() + (ms / 1000), retFuture))
   return retFuture
 
-proc accept*(socket: TAsyncFD,
-    flags = {SocketFlag.SafeDisconn}): Future[TAsyncFD] =
+proc accept*(socket: AsyncFD,
+    flags = {SocketFlag.SafeDisconn}): Future[AsyncFD] =
   ## Accepts a new connection. Returns a future containing the client socket
   ## corresponding to that connection.
   ## The future will complete when the connection is successfully accepted.
-  var retFut = newFuture[TAsyncFD]("accept")
+  var retFut = newFuture[AsyncFD]("accept")
   var fut = acceptAddr(socket, flags)
   fut.callback =
-    proc (future: Future[tuple[address: string, client: TAsyncFD]]) =
+    proc (future: Future[tuple[address: string, client: AsyncFD]]) =
       assert future.finished
       if future.failed:
         retFut.fail(future.error)
@@ -1225,7 +1340,7 @@ proc processBody(node, retFutureSym: NimNode,
     else: discard
   of nnkDiscardStmt:
     # discard await x
-    if node[0].kind != nnkEmpty and node[0][0].kind == nnkIdent and
+    if node[0].kind == nnkCommand and node[0][0].kind == nnkIdent and
           node[0][0].ident == !"await":
       var newDiscard = node
       result.createVar("futureDiscard_" & $toStrLit(node[0][1]), node[0][1],
@@ -1389,10 +1504,10 @@ macro async*(prc: stmt): stmt {.immediate.} =
   result[6] = outerProcBody
 
   #echo(treeRepr(result))
-  #if prc[0].getName == "test":
-  #  echo(toStrLit(result))
+  if prc[0].getName == "getAsync":
+    echo(toStrLit(result))
 
-proc recvLine*(socket: TAsyncFD): Future[string] {.async.} =
+proc recvLine*(socket: AsyncFD): Future[string] {.async.} =
   ## Reads a line of data from ``socket``. Returned future will complete once
   ## a full line is read or an error occurs.
   ##