summary refs log tree commit diff stats
path: root/lib/pure/asyncdispatch.nim
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r--lib/pure/asyncdispatch.nim1013
1 files changed, 1013 insertions, 0 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim
new file mode 100644
index 000000000..67361e46c
--- /dev/null
+++ b/lib/pure/asyncdispatch.nim
@@ -0,0 +1,1013 @@
+#
+#
+#            Nimrod's Runtime Library
+#        (c) Copyright 2014 Dominik Picheta
+#
+#    See the file "copying.txt", included in this
+#    distribution, for details about the copyright.
+#
+
+import os, oids, tables, strutils, macros
+
+import rawsockets
+
+## AsyncDispatch
+## --------
+##
+## This module implements a brand new dispatcher based on Futures.
+## On Windows IOCP is used and on other operating systems the selectors module
+## is used instead.
+
+# -- Futures
+
+type
+  PFutureBase* = ref object of PObject
+    cb: proc () {.closure.}
+    finished: bool
+
+  PFuture*[T] = ref object of PFutureBase
+    value: T
+    error*: ref EBase # TODO: This shouldn't be necessary, generics bug?
+
+proc newFuture*[T](): PFuture[T] =
+  ## Creates a new future.
+  new(result)
+  result.finished = false
+
+proc complete*[T](future: PFuture[T], val: T) =
+  ## Completes ``future`` with value ``val``.
+  assert(not future.finished, "Future already finished, cannot finish twice.")
+  assert(future.error == nil)
+  future.value = val
+  future.finished = true
+  if future.cb != nil:
+    future.cb()
+
+proc complete*(future: PFuture[void]) =
+  ## Completes a void ``future``.
+  assert(not future.finished, "Future already finished, cannot finish twice.")
+  assert(future.error == nil)
+  future.finished = true
+  if future.cb != nil:
+    future.cb()
+
+proc fail*[T](future: PFuture[T], error: ref EBase) =
+  ## Completes ``future`` with ``error``.
+  assert(not future.finished, "Future already finished, cannot finish twice.")
+  future.finished = true
+  future.error = error
+  if future.cb != nil:
+    future.cb()
+
+proc `callback=`*(future: PFutureBase, cb: proc () {.closure.}) =
+  ## Sets the callback proc to be called when the future completes.
+  ##
+  ## If future has already completed then ``cb`` will be called immediately.
+  ##
+  ## **Note**: You most likely want the other ``callback`` setter which
+  ## passes ``future`` as a param to the callback.
+  future.cb = cb
+  if future.finished:
+    future.cb()
+
+proc `callback=`*[T](future: PFuture[T],
+    cb: proc (future: PFuture[T]) {.closure.}) =
+  ## Sets the callback proc to be called when the future completes.
+  ##
+  ## If future has already completed then ``cb`` will be called immediately.
+  future.callback = proc () = cb(future)
+
+proc read*[T](future: PFuture[T]): T =
+  ## Retrieves the value of ``future``. Future must be finished otherwise
+  ## this function will fail with a ``EInvalidValue`` exception.
+  ##
+  ## If the result of the future is an error then that error will be raised.
+  if future.finished:
+    if future.error != nil: raise future.error
+    when T isnot void:
+      return future.value
+  else:
+    # TODO: Make a custom exception type for this?
+    raise newException(EInvalidValue, "Future still in progress.")
+
+proc readError*[T](future: PFuture[T]): ref EBase =
+  if future.error != nil: return future.error
+  else:
+    raise newException(EInvalidValue, "No error in future.")
+
+proc finished*[T](future: PFuture[T]): bool =
+  ## Determines whether ``future`` has completed.
+  ##
+  ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish.
+  future.finished
+
+proc failed*[T](future: PFuture[T]): bool =
+  ## Determines whether ``future`` completed with an error.
+  future.error != nil
+
+when defined(windows) or defined(nimdoc):
+  import winlean, sets, hashes
+  type
+    TCompletionKey = dword
+
+    TCompletionData* = object
+      sock: TSocketHandle
+      cb: proc (sock: TSocketHandle, bytesTransferred: DWORD,
+                errcode: TOSErrorCode) {.closure.}
+
+    PDispatcher* = ref object
+      ioPort: THandle
+      handles: TSet[TSocketHandle]
+
+    TCustomOverlapped = object
+      Internal*: DWORD
+      InternalHigh*: DWORD
+      Offset*: DWORD
+      OffsetHigh*: DWORD
+      hEvent*: THANDLE
+      data*: TCompletionData
+
+    PCustomOverlapped = ptr TCustomOverlapped
+
+  proc hash(x: TSocketHandle): THash {.borrow.}
+
+  proc newDispatcher*(): PDispatcher =
+    ## Creates a new Dispatcher instance.
+    new result
+    result.ioPort = CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
+    result.handles = initSet[TSocketHandle]()
+
+  proc register*(p: PDispatcher, sock: TSocketHandle) =
+    ## Registers ``sock`` with the dispatcher ``p``.
+    if CreateIOCompletionPort(sock.THandle, p.ioPort,
+                              cast[TCompletionKey](sock), 1) == 0:
+      OSError(OSLastError())
+    p.handles.incl(sock)
+
+  proc verifyPresence(p: PDispatcher, sock: TSocketHandle) =
+    ## Ensures that socket has been registered with the dispatcher.
+    if sock notin p.handles:
+      raise newException(EInvalidValue,
+        "Operation performed on a socket which has not been registered with" &
+        " the dispatcher yet.")
+
+  proc poll*(p: PDispatcher, timeout = 500) =
+    ## Waits for completion events and processes them.
+    if p.handles.len == 0:
+      raise newException(EInvalidValue, "No handles registered in dispatcher.")
+    
+    let llTimeout =
+      if timeout ==  -1: winlean.INFINITE
+      else: timeout.int32
+    var lpNumberOfBytesTransferred: DWORD
+    var lpCompletionKey: ULONG
+    var lpOverlapped: POverlapped
+    let res = GetQueuedCompletionStatus(p.ioPort, addr lpNumberOfBytesTransferred,
+        addr lpCompletionKey, addr lpOverlapped, llTimeout).bool
+
+    # http://stackoverflow.com/a/12277264/492186
+    # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html
+    var customOverlapped = cast[PCustomOverlapped](lpOverlapped)
+    if res:
+      # This is useful for ensuring the reliability of the overlapped struct.
+      assert customOverlapped.data.sock == lpCompletionKey.TSocketHandle
+
+      customOverlapped.data.cb(customOverlapped.data.sock,
+          lpNumberOfBytesTransferred, TOSErrorCode(-1))
+      dealloc(customOverlapped)
+    else:
+      let errCode = OSLastError()
+      if lpOverlapped != nil:
+        assert customOverlapped.data.sock == lpCompletionKey.TSocketHandle
+        customOverlapped.data.cb(customOverlapped.data.sock,
+            lpNumberOfBytesTransferred, errCode)
+        dealloc(customOverlapped)
+      else:
+        if errCode.int32 == WAIT_TIMEOUT:
+          # Timed out
+          discard
+        else: OSError(errCode)
+
+  var connectExPtr: pointer = nil
+  var acceptExPtr: pointer = nil
+  var getAcceptExSockAddrsPtr: pointer = nil
+
+  proc initPointer(s: TSocketHandle, func: var pointer, guid: var TGUID): bool =
+    # Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c
+    var bytesRet: DWord
+    func = nil
+    result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid,
+                      sizeof(TGUID).dword, addr func, sizeof(pointer).DWORD,
+                      addr bytesRet, nil, nil) == 0
+
+  proc initAll() =
+    let dummySock = socket()
+    if not initPointer(dummySock, connectExPtr, WSAID_CONNECTEX):
+      OSError(OSLastError())
+    if not initPointer(dummySock, acceptExPtr, WSAID_ACCEPTEX):
+      OSError(OSLastError())
+    if not initPointer(dummySock, getAcceptExSockAddrsPtr, WSAID_GETACCEPTEXSOCKADDRS):
+      OSError(OSLastError())
+
+  proc connectEx(s: TSocketHandle, name: ptr TSockAddr, namelen: cint, 
+                  lpSendBuffer: pointer, dwSendDataLength: dword,
+                  lpdwBytesSent: PDWORD, lpOverlapped: POverlapped): bool =
+    if connectExPtr.isNil: raise newException(EInvalidValue, "Need to initialise ConnectEx().")
+    let func =
+      cast[proc (s: TSocketHandle, name: ptr TSockAddr, namelen: cint, 
+         lpSendBuffer: pointer, dwSendDataLength: dword,
+         lpdwBytesSent: PDWORD, lpOverlapped: POverlapped): bool {.stdcall.}](connectExPtr)
+
+    result = func(s, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent,
+         lpOverlapped)
+
+  proc acceptEx(listenSock, acceptSock: TSocketHandle, lpOutputBuffer: pointer,
+                 dwReceiveDataLength, dwLocalAddressLength,
+                 dwRemoteAddressLength: DWORD, lpdwBytesReceived: PDWORD,
+                 lpOverlapped: POverlapped): bool =
+    if acceptExPtr.isNil: raise newException(EInvalidValue, "Need to initialise AcceptEx().")
+    let func =
+      cast[proc (listenSock, acceptSock: TSocketHandle, lpOutputBuffer: pointer,
+                 dwReceiveDataLength, dwLocalAddressLength,
+                 dwRemoteAddressLength: DWORD, lpdwBytesReceived: PDWORD,
+                 lpOverlapped: POverlapped): bool {.stdcall.}](acceptExPtr)
+    result = func(listenSock, acceptSock, lpOutputBuffer, dwReceiveDataLength,
+        dwLocalAddressLength, dwRemoteAddressLength, lpdwBytesReceived,
+        lpOverlapped)
+
+  proc getAcceptExSockaddrs(lpOutputBuffer: pointer,
+      dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength: DWORD,
+      LocalSockaddr: ptr ptr TSockAddr, LocalSockaddrLength: lpint,
+      RemoteSockaddr: ptr ptr TSockAddr, RemoteSockaddrLength: lpint) =
+    if getAcceptExSockAddrsPtr.isNil:
+      raise newException(EInvalidValue, "Need to initialise getAcceptExSockAddrs().")
+
+    let func =
+      cast[proc (lpOutputBuffer: pointer,
+                 dwReceiveDataLength, dwLocalAddressLength,
+                 dwRemoteAddressLength: DWORD, LocalSockaddr: ptr ptr TSockAddr,
+                 LocalSockaddrLength: lpint, RemoteSockaddr: ptr ptr TSockAddr,
+                RemoteSockaddrLength: lpint) {.stdcall.}](getAcceptExSockAddrsPtr)
+    
+    func(lpOutputBuffer, dwReceiveDataLength, dwLocalAddressLength,
+                  dwRemoteAddressLength, LocalSockaddr, LocalSockaddrLength,
+                  RemoteSockaddr, RemoteSockaddrLength)
+
+  proc connect*(p: PDispatcher, socket: TSocketHandle, address: string, port: TPort,
+    af = AF_INET): PFuture[void] =
+    ## Connects ``socket`` to server at ``address:port``.
+    ##
+    ## Returns a ``PFuture`` which will complete when the connection succeeds
+    ## or an error occurs.
+    verifyPresence(p, socket)
+    var retFuture = newFuture[void]()
+    # Apparently ``ConnectEx`` expects the socket to be initially bound:
+    var saddr: Tsockaddr_in
+    saddr.sin_family = int16(toInt(af))
+    saddr.sin_port = 0
+    saddr.sin_addr.s_addr = INADDR_ANY
+    if bindAddr(socket, cast[ptr TSockAddr](addr(saddr)),
+                  sizeof(saddr).TSockLen) < 0'i32:
+      OSError(OSLastError())
+
+    var aiList = getAddrInfo(address, port, af)
+    var success = false
+    var lastError: TOSErrorCode
+    var it = aiList
+    while it != nil:
+      # "the OVERLAPPED structure must remain valid until the I/O completes"
+      # http://blogs.msdn.com/b/oldnewthing/archive/2011/02/02/10123392.aspx
+      var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
+      ol.data = TCompletionData(sock: socket, cb:
+        proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) =
+          if not retFuture.finished:
+            if errcode == TOSErrorCode(-1):
+              retFuture.complete()
+            else:
+              retFuture.fail(newException(EOS, osErrorMsg(errcode)))
+      )
+      
+      var ret = connectEx(socket, it.ai_addr, sizeof(TSockAddrIn).cint,
+                          nil, 0, nil, cast[POverlapped](ol))
+      if ret:
+        # Request to connect completed immediately.
+        success = true
+        retFuture.complete()
+        # We don't deallocate ``ol`` here because even though this completed
+        # immediately poll will still be notified about its completion and it will
+        # free ``ol``.
+        break
+      else:
+        lastError = OSLastError()
+        if lastError.int32 == ERROR_IO_PENDING:
+          # In this case ``ol`` will be deallocated in ``poll``.
+          success = true
+          break
+        else:
+          dealloc(ol)
+          success = false
+      it = it.ai_next
+
+    dealloc(aiList)
+    if not success:
+      retFuture.fail(newException(EOS, osErrorMsg(lastError)))
+    return retFuture
+
+  proc recv*(p: PDispatcher, socket: TSocketHandle, size: int,
+             flags: int = 0): PFuture[string] =
+    ## Reads ``size`` bytes from ``socket``. Returned future will complete once
+    ## all of the requested data is read. If socket is disconnected during the
+    ## recv operation then the future may complete with only a part of the
+    ## requested data read. If socket is disconnected and no data is available
+    ## to be read then the future will complete with a value of ``""``.
+    verifyPresence(p, socket)
+    var retFuture = newFuture[string]()
+    
+    var dataBuf: TWSABuf
+    dataBuf.buf = newString(size)
+    dataBuf.len = size
+    
+    var bytesReceived: DWord
+    var flagsio = flags.dword
+    var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
+    ol.data = TCompletionData(sock: socket, cb:
+      proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) =
+        if not retFuture.finished:
+          if errcode == TOSErrorCode(-1):
+            if bytesCount == 0 and dataBuf.buf[0] == '\0':
+              retFuture.complete("")
+            else:
+              var data = newString(size)
+              copyMem(addr data[0], addr dataBuf.buf[0], size)
+              retFuture.complete($data)
+          else:
+            retFuture.fail(newException(EOS, osErrorMsg(errcode)))
+    )
+
+    let ret = WSARecv(socket, addr dataBuf, 1, addr bytesReceived,
+                      addr flagsio, cast[POverlapped](ol), nil)
+    if ret == -1:
+      let err = OSLastError()
+      if err.int32 != ERROR_IO_PENDING:
+        retFuture.fail(newException(EOS, osErrorMsg(err)))
+        dealloc(ol)
+    elif ret == 0 and bytesReceived == 0 and dataBuf.buf[0] == '\0':
+      # We have to ensure that the buffer is empty because WSARecv will tell
+      # us immediatelly when it was disconnected, even when there is still
+      # data in the buffer.
+      # We want to give the user as much data as we can. So we only return
+      # the empty string (which signals a disconnection) when there is
+      # nothing left to read.
+      retFuture.complete("")
+      # TODO: "For message-oriented sockets, where a zero byte message is often 
+      # allowable, a failure with an error code of WSAEDISCON is used to 
+      # indicate graceful closure." 
+      # ~ http://msdn.microsoft.com/en-us/library/ms741688%28v=vs.85%29.aspx
+    else:
+      # Request to read completed immediately.
+      var data = newString(size)
+      copyMem(addr data[0], addr dataBuf.buf[0], size)
+      retFuture.complete($data)
+      # We don't deallocate ``ol`` here because even though this completed
+      # immediately poll will still be notified about its completion and it will
+      # free ``ol``.
+    return retFuture
+
+  proc send*(p: PDispatcher, socket: TSocketHandle, data: string): PFuture[void] =
+    ## Sends ``data`` to ``socket``. The returned future will complete once all
+    ## data has been sent.
+    verifyPresence(p, socket)
+    var retFuture = newFuture[void]()
+
+    var dataBuf: TWSABuf
+    dataBuf.buf = data
+    dataBuf.len = data.len
+
+    var bytesReceived, flags: DWord
+    var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
+    ol.data = TCompletionData(sock: socket, cb:
+      proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) =
+        if not retFuture.finished:
+          if errcode == TOSErrorCode(-1):
+            retFuture.complete()
+          else:
+            retFuture.fail(newException(EOS, osErrorMsg(errcode)))
+    )
+
+    let ret = WSASend(socket, addr dataBuf, 1, addr bytesReceived,
+                      flags, cast[POverlapped](ol), nil)
+    if ret == -1:
+      let err = osLastError()
+      if err.int32 != ERROR_IO_PENDING:
+        retFuture.fail(newException(EOS, osErrorMsg(err)))
+        dealloc(ol)
+    else:
+      retFuture.complete()
+      # We don't deallocate ``ol`` here because even though this completed
+      # immediately poll will still be notified about its completion and it will
+      # free ``ol``.
+    return retFuture
+
+  proc acceptAddr*(p: PDispatcher, socket: TSocketHandle): 
+      PFuture[tuple[address: string, client: TSocketHandle]] =
+    ## Accepts a new connection. Returns a future containing the client socket
+    ## corresponding to that connection and the remote address of the client.
+    ## The future will complete when the connection is successfully accepted.
+    ##
+    ## The resulting client socket is automatically registered to dispatcher.
+    verifyPresence(p, socket)
+    var retFuture = newFuture[tuple[address: string, client: TSocketHandle]]()
+
+    var clientSock = socket()
+    if clientSock == OSInvalidSocket: osError(osLastError())
+
+    const lpOutputLen = 1024
+    var lpOutputBuf = newString(lpOutputLen)
+    var dwBytesReceived: DWORD
+    let dwReceiveDataLength = 0.DWORD # We don't want any data to be read.
+    let dwLocalAddressLength = DWORD(sizeof (TSockaddr_in) + 16)
+    let dwRemoteAddressLength = DWORD(sizeof(TSockaddr_in) + 16)
+
+    template completeAccept(): stmt {.immediate, dirty.} =
+      var listenSock = socket
+      let setoptRet = setsockopt(clientSock, SOL_SOCKET,
+          SO_UPDATE_ACCEPT_CONTEXT, addr listenSock,
+          sizeof(listenSock).TSockLen)
+      if setoptRet != 0: osError(osLastError())
+
+      var LocalSockaddr, RemoteSockaddr: ptr TSockAddr
+      var localLen, remoteLen: int32
+      getAcceptExSockaddrs(addr lpOutputBuf[0], dwReceiveDataLength,
+                           dwLocalAddressLength, dwRemoteAddressLength,
+                           addr LocalSockaddr, addr localLen,
+                           addr RemoteSockaddr, addr remoteLen)
+      p.register(clientSock)
+      # TODO: IPv6. Check ``sa_family``. http://stackoverflow.com/a/9212542/492186
+      retFuture.complete(
+        (address: $inet_ntoa(cast[ptr Tsockaddr_in](remoteSockAddr).sin_addr),
+         client: clientSock)
+      )
+
+    var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
+    ol.data = TCompletionData(sock: socket, cb:
+      proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) =
+        if not retFuture.finished:
+          if errcode == TOSErrorCode(-1):
+            completeAccept()
+          else:
+            retFuture.fail(newException(EOS, osErrorMsg(errcode)))
+    )
+
+    # http://msdn.microsoft.com/en-us/library/windows/desktop/ms737524%28v=vs.85%29.aspx
+    let ret = acceptEx(socket, clientSock, addr lpOutputBuf[0],
+                       dwReceiveDataLength, 
+                       dwLocalAddressLength,
+                       dwRemoteAddressLength,
+                       addr dwBytesReceived, cast[POverlapped](ol))
+
+    if not ret:
+      let err = osLastError()
+      if err.int32 != ERROR_IO_PENDING:
+        retFuture.fail(newException(EOS, osErrorMsg(err)))
+        dealloc(ol)
+    else:
+      completeAccept()
+      # We don't deallocate ``ol`` here because even though this completed
+      # immediately poll will still be notified about its completion and it will
+      # free ``ol``.
+
+    return retFuture
+
+  proc socket*(disp: PDispatcher, domain: TDomain = AF_INET,
+               typ: TType = SOCK_STREAM,
+               protocol: TProtocol = IPPROTO_TCP): TSocketHandle =
+    ## Creates a new socket and registers it with the dispatcher implicitly.
+    result = socket(domain, typ, protocol)
+    result.setBlocking(false)
+    disp.register(result)
+
+  proc close*(disp: PDispatcher, socket: TSocketHandle) =
+    ## Closes a socket and ensures that it is unregistered.
+    socket.close()
+    disp.handles.excl(socket)
+
+  initAll()
+else:
+  import selectors
+  from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK
+  type
+    TCallback = proc (sock: TSocketHandle): bool {.closure.}
+
+    PData* = ref object of PObject
+      sock: TSocketHandle
+      readCBs: seq[TCallback]
+      writeCBs: seq[TCallback]
+
+    PDispatcher* = ref object
+      selector: PSelector
+
+  proc newDispatcher*(): PDispatcher =
+    new result
+    result.selector = newSelector()
+
+  proc update(p: PDispatcher, sock: TSocketHandle, events: set[TEvent]) =
+    assert sock in p.selector
+    discard p.selector.update(sock, events)
+
+  proc register(p: PDispatcher, sock: TSocketHandle) =
+    var data = PData(sock: sock, readCBs: @[], writeCBs: @[])
+    p.selector.register(sock, {}, data.PObject)
+
+  proc socket*(disp: PDispatcher, domain: TDomain = AF_INET,
+               typ: TType = SOCK_STREAM,
+               protocol: TProtocol = IPPROTO_TCP): TSocketHandle =
+    result = socket(domain, typ, protocol)
+    result.setBlocking(false)
+    disp.register(result)
+  
+  proc close*(disp: PDispatcher, sock: TSocketHandle) =
+    sock.close()
+    disp.selector.unregister(sock)
+
+  proc addRead(p: PDispatcher, sock: TSocketHandle, cb: TCallback) =
+    if sock notin p.selector:
+      raise newException(EInvalidValue, "File descriptor not registered.")
+    p.selector[sock].data.PData.readCBs.add(cb)
+    p.update(sock, p.selector[sock].events + {EvRead})
+  
+  proc addWrite(p: PDispatcher, sock: TSocketHandle, cb: TCallback) =
+    if sock notin p.selector:
+      raise newException(EInvalidValue, "File descriptor not registered.")
+    p.selector[sock].data.PData.writeCBs.add(cb)
+    p.update(sock, p.selector[sock].events + {EvWrite})
+  
+  proc poll*(p: PDispatcher, timeout = 500) =
+    for info in p.selector.select(timeout):
+      let data = PData(info.key.data)
+      assert data.sock == info.key.fd
+      #echo("In poll ", data.sock.cint)
+      if EvRead in info.events:
+        # Callback may add items to ``data.readCBs`` which causes issues if
+        # we are iterating over ``data.readCBs`` at the same time. We therefore
+        # make a copy to iterate over.
+        let currentCBs = data.readCBs
+        data.readCBs = @[]
+        for cb in currentCBs:
+          if not cb(data.sock):
+            # Callback wants to be called again.
+            data.readCBs.add(cb)
+      
+      if EvWrite in info.events:
+        let currentCBs = data.writeCBs
+        data.writeCBs = @[]
+        for cb in currentCBs:
+          if not cb(data.sock):
+            # Callback wants to be called again.
+            data.writeCBs.add(cb)
+      
+      if info.key in p.selector:
+        var newEvents: set[TEvent]
+        if data.readCBs.len != 0: newEvents = {EvRead}
+        if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite}
+        if newEvents != info.key.events:
+          echo(info.key.events, " -> ", newEvents)
+          p.update(data.sock, newEvents)
+      else:
+        # FD no longer a part of the selector. Likely been closed
+        # (e.g. socket disconnected).
+  
+  proc connect*(p: PDispatcher, socket: TSocketHandle, address: string, port: TPort,
+    af = AF_INET): PFuture[void] =
+    var retFuture = newFuture[void]()
+    
+    proc cb(sock: TSocketHandle): bool =
+      # We have connected.
+      retFuture.complete()
+      return true
+    
+    var aiList = getAddrInfo(address, port, af)
+    var success = false
+    var lastError: TOSErrorCode
+    var it = aiList
+    while it != nil:
+      var ret = connect(socket, it.ai_addr, it.ai_addrlen.TSocklen)
+      if ret == 0:
+        # Request to connect completed immediately.
+        success = true
+        retFuture.complete()
+        break
+      else:
+        lastError = osLastError()
+        if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS:
+          success = true
+          addWrite(p, socket, cb)
+          break
+        else:
+          success = false
+      it = it.ai_next
+
+    dealloc(aiList)
+    if not success:
+      retFuture.fail(newException(EOS, osErrorMsg(lastError)))
+    return retFuture
+
+  proc recv*(p: PDispatcher, socket: TSocketHandle, size: int,
+             flags: int = 0): PFuture[string] =
+    var retFuture = newFuture[string]()
+    
+    var readBuffer = newString(size)
+    var sizeRead = 0
+    
+    proc cb(sock: TSocketHandle): bool =
+      result = true
+      let netSize = size - sizeRead
+      let res = recv(sock, addr readBuffer[sizeRead], netSize, flags.cint)
+      #echo("recv cb res: ", res)
+      if res < 0:
+        let lastError = osLastError()
+        if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: 
+          retFuture.fail(newException(EOS, osErrorMsg(lastError)))
+        else:
+          result = false # We still want this callback to be called.
+      elif res == 0:
+        #echo("Disconnected recv: ", sizeRead)
+        # Disconnected
+        if sizeRead == 0:
+          retFuture.complete("")
+        else:
+          readBuffer.setLen(sizeRead)
+          retFuture.complete(readBuffer)
+      else:
+        sizeRead.inc(res)
+        if res != netSize:
+          result = false # We want to read all the data requested.
+        else:
+          retFuture.complete(readBuffer)
+      #echo("Recv cb result: ", result)
+  
+    addRead(p, socket, cb)
+    return retFuture
+
+  proc send*(p: PDispatcher, socket: TSocketHandle, data: string): PFuture[void] =
+    var retFuture = newFuture[void]()
+    
+    var written = 0
+    
+    proc cb(sock: TSocketHandle): bool =
+      result = true
+      let netSize = data.len-written
+      var d = data.cstring
+      let res = send(sock, addr d[written], netSize, 0.cint)
+      if res < 0:
+        let lastError = osLastError()
+        if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
+          retFuture.fail(newException(EOS, osErrorMsg(lastError)))
+        else:
+          result = false # We still want this callback to be called.
+      else:
+        written.inc(res)
+        if res != netSize:
+          result = false # We still have data to send.
+        else:
+          retFuture.complete()
+    addWrite(p, socket, cb)
+    return retFuture
+
+  proc acceptAddr*(p: PDispatcher, socket: TSocketHandle): 
+      PFuture[tuple[address: string, client: TSocketHandle]] =
+    var retFuture = newFuture[tuple[address: string, client: TSocketHandle]]()
+    proc cb(sock: TSocketHandle): bool =
+      result = true
+      var sockAddress: Tsockaddr_in
+      var addrLen = sizeof(sockAddress).TSocklen
+      var client = accept(sock, cast[ptr TSockAddr](addr(sockAddress)),
+                          addr(addrLen))
+      if client == osInvalidSocket:
+        let lastError = osLastError()
+        assert lastError.int32 notin {EWOULDBLOCK, EAGAIN}
+        if lastError.int32 == EINTR:
+          return false
+        else:
+          retFuture.fail(newException(EOS, osErrorMsg(lastError)))
+      else:
+        p.register(client)
+        retFuture.complete(($inet_ntoa(sockAddress.sin_addr), client))
+    addRead(p, socket, cb)
+    return retFuture
+
+proc accept*(p: PDispatcher, socket: TSocketHandle): PFuture[TSocketHandle] =
+  ## Accepts a new connection. Returns a future containing the client socket
+  ## corresponding to that connection.
+  ## The future will complete when the connection is successfully accepted.
+  var retFut = newFuture[TSocketHandle]()
+  var fut = p.acceptAddr(socket)
+  fut.callback =
+    proc (future: PFuture[tuple[address: string, client: TSocketHandle]]) =
+      assert future.finished
+      if future.failed:
+        retFut.fail(future.error)
+      else:
+        retFut.complete(future.read.client)
+  return retFut
+
+# -- Await Macro
+
+template createCb*(cbName, varNameIterSym, retFutureSym: expr): stmt {.immediate, dirty.} =
+  proc cbName {.closure.} =
+    if not varNameIterSym.finished:
+      var next = varNameIterSym()
+      if next == nil:
+        assert retFutureSym.finished, "Async procedure's return Future was not finished."
+      else:
+        next.callback = cbName
+
+template createVar(futSymName: string, asyncProc: PNimrodNode,
+                   valueReceiver: expr) {.immediate, dirty.} =
+  # TODO: Used template here due to bug #926
+  result = newNimNode(nnkStmtList)
+  var futSym = newIdentNode(futSymName) #genSym(nskVar, "future")
+  result.add newVarStmt(futSym, asyncProc) # -> var future<x> = y
+  result.add newNimNode(nnkYieldStmt).add(futSym) # -> yield future<x>
+  valueReceiver = newDotExpr(futSym, newIdentNode("read")) # -> future<x>.read
+
+proc processBody(node, retFutureSym: PNimrodNode): PNimrodNode {.compileTime.} =
+  result = node
+  case node.kind
+  of nnkReturnStmt:
+    result = newNimNode(nnkStmtList)
+    result.add newCall(newIdentNode("complete"), retFutureSym,
+      if node[0].kind == nnkEmpty: newIdentNode("result") else: node[0])
+    result.add newNimNode(nnkYieldStmt).add(newNilLit())
+  of nnkCommand:
+    if node[0].ident == !"await":
+      case node[1].kind
+      of nnkIdent:
+        # await x
+        result = newNimNode(nnkYieldStmt).add(node[1]) # -> yield x
+      of nnkCall:
+        # await foo(p, x)
+        var futureValue: PNimrodNode
+        createVar("future" & $node[1][0].toStrLit, node[1], futureValue)
+        result.add futureValue
+      else:
+        error("Invalid node kind in 'await', got: " & $node[1].kind)
+    elif node[1].kind == nnkCommand and node[1][0].kind == nnkIdent and
+         node[1][0].ident == !"await":
+      # foo await x
+      var newCommand = node
+      createVar("future" & $node[0].ident, node[1][0], newCommand[1])
+      result.add newCommand
+
+  of nnkVarSection, nnkLetSection:
+    case node[0][2].kind
+    of nnkCommand:
+      if node[0][2][0].ident == !"await":
+        # var x = await y
+        var newVarSection = node # TODO: Should this use copyNimNode?
+        createVar("future" & $node[0][0].ident, node[0][2][1],
+          newVarSection[0][2])
+        result.add newVarSection
+    else: discard
+  of nnkAsgn:
+    case node[1].kind
+    of nnkCommand:
+      if node[1][0].ident == !"await":
+        # x = await y
+        var newAsgn = node
+        createVar("future" & $node[0].ident, node[1][1], newAsgn[1])
+        result.add newAsgn
+    else: discard
+  of nnkDiscardStmt:
+    # discard await x
+    if node[0][0].kind == nnkIdent and node[0][0].ident == !"await":
+      var dummy = newNimNode(nnkStmtList)
+      createVar("futureDiscard_" & $toStrLit(node[0][1]), node[0][1], dummy)
+  else: discard
+  
+  for i in 0 .. <result.len:
+    result[i] = processBody(result[i], retFutureSym)
+  #echo(treeRepr(result))
+
+proc getName(node: PNimrodNode): string {.compileTime.} =
+  case node.kind
+  of nnkPostfix:
+    return $node[1].ident
+  of nnkIdent:
+    return $node.ident
+  else:
+    assert false
+
+macro async*(prc: stmt): stmt {.immediate.} =
+  ## Macro which processes async procedures into the appropriate
+  ## iterators and yield statements.
+
+  expectKind(prc, nnkProcDef)
+
+  hint("Processing " & prc[0].getName & " as an async proc.")
+
+  let returnType = prc[3][0]
+  var subtypeName = ""
+  # Verify that the return type is a PFuture[T]
+  if returnType.kind == nnkIdent:
+    error("Expected return type of 'PFuture' got '" & $returnType & "'")
+  elif returnType.kind == nnkBracketExpr:
+    if $returnType[0] != "PFuture":
+      error("Expected return type of 'PFuture' got '" & $returnType[0] & "'")
+    subtypeName = $returnType[1].ident
+  elif returnType.kind == nnkEmpty:
+    subtypeName = "void"
+  
+  # TODO: Why can't I use genSym? I get illegal capture errors for Syms.
+  # TODO: It seems genSym is broken. Change all usages back to genSym when fixed
+
+  var outerProcBody = newNimNode(nnkStmtList)
+
+  # -> var retFuture = newFuture[T]()
+  var retFutureSym = newIdentNode("retFuture") #genSym(nskVar, "retFuture")
+  outerProcBody.add(
+    newVarStmt(retFutureSym, 
+      newCall(
+        newNimNode(nnkBracketExpr).add(
+          newIdentNode(!"newFuture"), # TODO: Strange bug here? Remove the `!`.
+          newIdentNode(subtypeName))))) # Get type from return type of this proc
+  echo(treeRepr(outerProcBody))
+  # -> iterator nameIter(): PFutureBase {.closure.} = 
+  # ->   var result: T
+  # ->   <proc_body>
+  # ->   complete(retFuture, result)
+  var iteratorNameSym = newIdentNode($prc[0].getName & "Iter") #genSym(nskIterator, $prc[0].ident & "Iter")
+  var procBody = prc[6].processBody(retFutureSym)
+  if subtypeName != "void":
+    procBody.insert(0, newNimNode(nnkVarSection).add(
+      newIdentDefs(newIdentNode("result"), returnType[1]))) # -> var result: T
+    procBody.add(
+      newCall(newIdentNode("complete"),
+        retFutureSym, newIdentNode("result"))) # -> complete(retFuture, result)
+  else:
+    # -> complete(retFuture)
+    procBody.add(newCall(newIdentNode("complete"), retFutureSym))
+  
+  var closureIterator = newProc(iteratorNameSym, [newIdentNode("PFutureBase")],
+                                procBody, nnkIteratorDef)
+  closureIterator[4] = newNimNode(nnkPragma).add(newIdentNode("closure"))
+  outerProcBody.add(closureIterator)
+
+  # -> var nameIterVar = nameIter
+  # -> var first = nameIterVar()
+  var varNameIterSym = newIdentNode($prc[0].getName & "IterVar") #genSym(nskVar, $prc[0].ident & "IterVar")
+  var varNameIter = newVarStmt(varNameIterSym, iteratorNameSym)
+  outerProcBody.add varNameIter
+  var varFirstSym = genSym(nskVar, "first")
+  var varFirst = newVarStmt(varFirstSym, newCall(varNameIterSym))
+  outerProcBody.add varFirst
+
+  # -> createCb(cb, nameIter, retFuture)
+  var cbName = newIdentNode("cb")
+  var procCb = newCall("createCb", cbName, varNameIterSym, retFutureSym)
+  outerProcBody.add procCb
+
+  # -> first.callback = cb
+  outerProcBody.add newAssignment(
+    newDotExpr(varFirstSym, newIdentNode("callback")),
+    cbName)
+
+  # -> return retFuture
+  outerProcBody.add newNimNode(nnkReturnStmt).add(retFutureSym)
+  
+  result = prc
+
+  # Remove the 'async' pragma.
+  for i in 0 .. <result[4].len:
+    if result[4][i].ident == !"async":
+      result[4].del(i)
+  if subtypeName == "void":
+    # Add discardable pragma.
+    result[4].add(newIdentNode("discardable"))
+    if returnType.kind == nnkEmpty:
+      # Add PFuture[void]
+      result[3][0] = parseExpr("PFuture[void]")
+
+  result[6] = outerProcBody
+
+  echo(toStrLit(result))
+
+proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.} =
+  ## Reads a line of data from ``socket``. Returned future will complete once
+  ## a full line is read or an error occurs.
+  ##
+  ## If a full line is read ``\r\L`` is not
+  ## added to ``line``, however if solely ``\r\L`` is read then ``line``
+  ## will be set to it.
+  ## 
+  ## If the socket is disconnected, ``line`` will be set to ``""``.
+  ##
+  ## If the socket is disconnected in the middle of a line (before ``\r\L``
+  ## is read) then line will be set to ``""``.
+  ## The partial line **will be lost**.
+  
+  template addNLIfEmpty(): stmt =
+    if result.len == 0:
+      result.add("\c\L")
+
+  result = ""
+  var c = ""
+  while true:
+    c = await p.recv(socket, 1)
+    if c.len == 0:
+      return ""
+    if c == "\r":
+      c = await p.recv(socket, 1, MSG_PEEK)
+      if c.len > 0 and c == "\L":
+        discard await p.recv(socket, 1)
+      addNLIfEmpty()
+      return
+    elif c == "\L":
+      addNLIfEmpty()
+      return
+    add(result.string, c)
+
+var gDisp*{.threadvar.}: PDispatcher ## Global dispatcher
+gDisp = newDispatcher()
+
+proc runForever*() =
+  ## Begins a never ending global dispatcher poll loop.
+  while true:
+    gDisp.poll()
+
+
+when isMainModule:
+  
+  var p = newDispatcher()
+  var sock = p.socket()
+  sock.setBlocking false
+
+
+  when false:
+    # Await tests
+    proc main(p: PDispatcher): PFuture[int] {.async.} =
+      discard await p.connect(sock, "irc.freenode.net", TPort(6667))
+      while true:
+        echo("recvLine")
+        var line = await p.recvLine(sock)
+        echo("Line is: ", line.repr)
+        if line == "":
+          echo "Disconnected"
+          break
+
+    proc peekTest(p: PDispatcher): PFuture[int] {.async.} =
+      discard await p.connect(sock, "localhost", TPort(6667))
+      while true:
+        var line = await p.recv(sock, 1, MSG_PEEK)
+        var line2 = await p.recv(sock, 1)
+        echo(line.repr)
+        echo(line2.repr)
+        echo("---")
+        if line2 == "": break
+        sleep(500)
+
+    var f = main(p)
+    
+
+  else:
+    when false:
+
+      var f = p.connect(sock, "irc.poop.nl", TPort(6667))
+      f.callback =
+        proc (future: PFuture[int]) =
+          echo("Connected in future!")
+          echo(future.read)
+          for i in 0 .. 50:
+            var recvF = p.recv(sock, 10)
+            recvF.callback =
+              proc (future: PFuture[string]) =
+                echo("Read ", future.read.len, ": ", future.read.repr)
+
+    else:
+
+      sock.bindAddr(TPort(6667))
+      sock.listen()
+      proc onAccept(future: PFuture[TSocketHandle]) =
+        let client = future.read
+        echo "Accepted ", client.cint
+        var t = p.send(client, "test\c\L")
+        t.callback =
+          proc (future: PFuture[int]) =
+            echo("Send: ", future.read)
+            client.close()
+        
+        var f = p.accept(sock)
+        f.callback = onAccept
+        
+      var f = p.accept(sock)
+      f.callback = onAccept
+  
+  while true:
+    p.poll()
+
+
+
+
+
+  
+
+