summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorDominik Picheta <dominikpicheta@googlemail.com>2014-03-23 18:24:11 +0000
committerDominik Picheta <dominikpicheta@googlemail.com>2014-03-23 18:24:11 +0000
commitd310b01db1c6f2c8e63a561c40352b17978e1bdb (patch)
tree1ebb9cce9a04d51dfb11629b2248d364651d81de
parente855f6c0735d4dec8b34084e439c6c215f12b155 (diff)
downloadNim-d310b01db1c6f2c8e63a561c40352b17978e1bdb.tar.gz
Moved the global dispatcher to asyncdispatch.
-rw-r--r--lib/pure/asyncdispatch.nim235
-rw-r--r--lib/pure/asyncnet.nim30
-rw-r--r--lib/pure/net.nim8
-rw-r--r--lib/pure/rawsockets.nim2
-rw-r--r--tests/async/tasyncawait.nim45
5 files changed, 175 insertions, 145 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim
index 67361e46c..4a4ef8bdd 100644
--- a/lib/pure/asyncdispatch.nim
+++ b/lib/pure/asyncdispatch.nim
@@ -111,13 +111,13 @@ when defined(windows) or defined(nimdoc):
     TCompletionKey = dword
 
     TCompletionData* = object
-      sock: TSocketHandle
-      cb: proc (sock: TSocketHandle, bytesTransferred: DWORD,
+      sock: TAsyncFD
+      cb: proc (sock: TAsyncFD, bytesTransferred: DWORD,
                 errcode: TOSErrorCode) {.closure.}
 
     PDispatcher* = ref object
       ioPort: THandle
-      handles: TSet[TSocketHandle]
+      handles: TSet[TAsyncFD]
 
     TCustomOverlapped = object
       Internal*: DWORD
@@ -129,30 +129,42 @@ when defined(windows) or defined(nimdoc):
 
     PCustomOverlapped = ptr TCustomOverlapped
 
-  proc hash(x: TSocketHandle): THash {.borrow.}
+    TAsyncFD* = distinct int
+
+  proc hash(x: TAsyncFD): THash {.borrow.}
+  proc `==`*(x: TAsyncFD, y: TAsyncFD): bool {.borrow.}
 
   proc newDispatcher*(): PDispatcher =
     ## Creates a new Dispatcher instance.
     new result
     result.ioPort = CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
-    result.handles = initSet[TSocketHandle]()
+    result.handles = initSet[TAsyncFD]()
+
+  var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
+  proc getGlobalDispatcher*(): PDispatcher =
+    ## Retrieves the global thread-local dispatcher.
+    if gDisp.isNil: gDisp = newDispatcher()
+    result = gDisp
 
-  proc register*(p: PDispatcher, sock: TSocketHandle) =
-    ## Registers ``sock`` with the dispatcher ``p``.
+  proc register*(sock: TAsyncFD) =
+    ## Registers ``sock`` with the dispatcher.
+    let p = getGlobalDispatcher()
     if CreateIOCompletionPort(sock.THandle, p.ioPort,
                               cast[TCompletionKey](sock), 1) == 0:
       OSError(OSLastError())
     p.handles.incl(sock)
 
-  proc verifyPresence(p: PDispatcher, sock: TSocketHandle) =
+  proc verifyPresence(sock: TAsyncFD) =
     ## Ensures that socket has been registered with the dispatcher.
+    let p = getGlobalDispatcher()
     if sock notin p.handles:
       raise newException(EInvalidValue,
         "Operation performed on a socket which has not been registered with" &
         " the dispatcher yet.")
 
-  proc poll*(p: PDispatcher, timeout = 500) =
+  proc poll*(timeout = 500) =
     ## Waits for completion events and processes them.
+    let p = getGlobalDispatcher()
     if p.handles.len == 0:
       raise newException(EInvalidValue, "No handles registered in dispatcher.")
     
@@ -170,7 +182,7 @@ when defined(windows) or defined(nimdoc):
     var customOverlapped = cast[PCustomOverlapped](lpOverlapped)
     if res:
       # This is useful for ensuring the reliability of the overlapped struct.
-      assert customOverlapped.data.sock == lpCompletionKey.TSocketHandle
+      assert customOverlapped.data.sock == lpCompletionKey.TAsyncFD
 
       customOverlapped.data.cb(customOverlapped.data.sock,
           lpNumberOfBytesTransferred, TOSErrorCode(-1))
@@ -178,7 +190,7 @@ when defined(windows) or defined(nimdoc):
     else:
       let errCode = OSLastError()
       if lpOverlapped != nil:
-        assert customOverlapped.data.sock == lpCompletionKey.TSocketHandle
+        assert customOverlapped.data.sock == lpCompletionKey.TAsyncFD
         customOverlapped.data.cb(customOverlapped.data.sock,
             lpNumberOfBytesTransferred, errCode)
         dealloc(customOverlapped)
@@ -201,7 +213,7 @@ when defined(windows) or defined(nimdoc):
                       addr bytesRet, nil, nil) == 0
 
   proc initAll() =
-    let dummySock = socket()
+    let dummySock = newRawSocket()
     if not initPointer(dummySock, connectExPtr, WSAID_CONNECTEX):
       OSError(OSLastError())
     if not initPointer(dummySock, acceptExPtr, WSAID_ACCEPTEX):
@@ -253,20 +265,20 @@ when defined(windows) or defined(nimdoc):
                   dwRemoteAddressLength, LocalSockaddr, LocalSockaddrLength,
                   RemoteSockaddr, RemoteSockaddrLength)
 
-  proc connect*(p: PDispatcher, socket: TSocketHandle, address: string, port: TPort,
+  proc connect*(socket: TAsyncFD, address: string, port: TPort,
     af = AF_INET): PFuture[void] =
     ## Connects ``socket`` to server at ``address:port``.
     ##
     ## Returns a ``PFuture`` which will complete when the connection succeeds
     ## or an error occurs.
-    verifyPresence(p, socket)
+    verifyPresence(socket)
     var retFuture = newFuture[void]()
     # Apparently ``ConnectEx`` expects the socket to be initially bound:
     var saddr: Tsockaddr_in
     saddr.sin_family = int16(toInt(af))
     saddr.sin_port = 0
     saddr.sin_addr.s_addr = INADDR_ANY
-    if bindAddr(socket, cast[ptr TSockAddr](addr(saddr)),
+    if bindAddr(socket.TSocketHandle, cast[ptr TSockAddr](addr(saddr)),
                   sizeof(saddr).TSockLen) < 0'i32:
       OSError(OSLastError())
 
@@ -279,7 +291,7 @@ when defined(windows) or defined(nimdoc):
       # http://blogs.msdn.com/b/oldnewthing/archive/2011/02/02/10123392.aspx
       var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
       ol.data = TCompletionData(sock: socket, cb:
-        proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) =
+        proc (sock: TAsyncFD, bytesCount: DWord, errcode: TOSErrorCode) =
           if not retFuture.finished:
             if errcode == TOSErrorCode(-1):
               retFuture.complete()
@@ -287,8 +299,9 @@ when defined(windows) or defined(nimdoc):
               retFuture.fail(newException(EOS, osErrorMsg(errcode)))
       )
       
-      var ret = connectEx(socket, it.ai_addr, sizeof(TSockAddrIn).cint,
-                          nil, 0, nil, cast[POverlapped](ol))
+      var ret = connectEx(socket.TSocketHandle, it.ai_addr,
+                          sizeof(TSockAddrIn).cint, nil, 0, nil,
+                          cast[POverlapped](ol))
       if ret:
         # Request to connect completed immediately.
         success = true
@@ -313,14 +326,14 @@ when defined(windows) or defined(nimdoc):
       retFuture.fail(newException(EOS, osErrorMsg(lastError)))
     return retFuture
 
-  proc recv*(p: PDispatcher, socket: TSocketHandle, size: int,
+  proc recv*(socket: TAsyncFD, size: int,
              flags: int = 0): PFuture[string] =
     ## Reads ``size`` bytes from ``socket``. Returned future will complete once
     ## all of the requested data is read. If socket is disconnected during the
     ## recv operation then the future may complete with only a part of the
     ## requested data read. If socket is disconnected and no data is available
     ## to be read then the future will complete with a value of ``""``.
-    verifyPresence(p, socket)
+    verifyPresence(socket)
     var retFuture = newFuture[string]()
     
     var dataBuf: TWSABuf
@@ -331,7 +344,7 @@ when defined(windows) or defined(nimdoc):
     var flagsio = flags.dword
     var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
     ol.data = TCompletionData(sock: socket, cb:
-      proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) =
+      proc (sock: TAsyncFD, bytesCount: DWord, errcode: TOSErrorCode) =
         if not retFuture.finished:
           if errcode == TOSErrorCode(-1):
             if bytesCount == 0 and dataBuf.buf[0] == '\0':
@@ -344,7 +357,7 @@ when defined(windows) or defined(nimdoc):
             retFuture.fail(newException(EOS, osErrorMsg(errcode)))
     )
 
-    let ret = WSARecv(socket, addr dataBuf, 1, addr bytesReceived,
+    let ret = WSARecv(socket.TSocketHandle, addr dataBuf, 1, addr bytesReceived,
                       addr flagsio, cast[POverlapped](ol), nil)
     if ret == -1:
       let err = OSLastError()
@@ -373,10 +386,10 @@ when defined(windows) or defined(nimdoc):
       # free ``ol``.
     return retFuture
 
-  proc send*(p: PDispatcher, socket: TSocketHandle, data: string): PFuture[void] =
+  proc send*(socket: TAsyncFD, data: string): PFuture[void] =
     ## Sends ``data`` to ``socket``. The returned future will complete once all
     ## data has been sent.
-    verifyPresence(p, socket)
+    verifyPresence(socket)
     var retFuture = newFuture[void]()
 
     var dataBuf: TWSABuf
@@ -386,7 +399,7 @@ when defined(windows) or defined(nimdoc):
     var bytesReceived, flags: DWord
     var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
     ol.data = TCompletionData(sock: socket, cb:
-      proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) =
+      proc (sock: TAsyncFD, bytesCount: DWord, errcode: TOSErrorCode) =
         if not retFuture.finished:
           if errcode == TOSErrorCode(-1):
             retFuture.complete()
@@ -394,7 +407,7 @@ when defined(windows) or defined(nimdoc):
             retFuture.fail(newException(EOS, osErrorMsg(errcode)))
     )
 
-    let ret = WSASend(socket, addr dataBuf, 1, addr bytesReceived,
+    let ret = WSASend(socket.TSocketHandle, addr dataBuf, 1, addr bytesReceived,
                       flags, cast[POverlapped](ol), nil)
     if ret == -1:
       let err = osLastError()
@@ -408,17 +421,17 @@ when defined(windows) or defined(nimdoc):
       # free ``ol``.
     return retFuture
 
-  proc acceptAddr*(p: PDispatcher, socket: TSocketHandle): 
-      PFuture[tuple[address: string, client: TSocketHandle]] =
+  proc acceptAddr*(socket: TAsyncFD): 
+      PFuture[tuple[address: string, client: TAsyncFD]] =
     ## Accepts a new connection. Returns a future containing the client socket
     ## corresponding to that connection and the remote address of the client.
     ## The future will complete when the connection is successfully accepted.
     ##
     ## The resulting client socket is automatically registered to dispatcher.
-    verifyPresence(p, socket)
-    var retFuture = newFuture[tuple[address: string, client: TSocketHandle]]()
+    verifyPresence(socket)
+    var retFuture = newFuture[tuple[address: string, client: TAsyncFD]]()
 
-    var clientSock = socket()
+    var clientSock = newRawSocket()
     if clientSock == OSInvalidSocket: osError(osLastError())
 
     const lpOutputLen = 1024
@@ -441,16 +454,16 @@ when defined(windows) or defined(nimdoc):
                            dwLocalAddressLength, dwRemoteAddressLength,
                            addr LocalSockaddr, addr localLen,
                            addr RemoteSockaddr, addr remoteLen)
-      p.register(clientSock)
+      register(clientSock.TAsyncFD)
       # TODO: IPv6. Check ``sa_family``. http://stackoverflow.com/a/9212542/492186
       retFuture.complete(
         (address: $inet_ntoa(cast[ptr Tsockaddr_in](remoteSockAddr).sin_addr),
-         client: clientSock)
+         client: clientSock.TAsyncFD)
       )
 
     var ol = cast[PCustomOverlapped](alloc0(sizeof(TCustomOverlapped)))
     ol.data = TCompletionData(sock: socket, cb:
-      proc (sock: TSocketHandle, bytesCount: DWord, errcode: TOSErrorCode) =
+      proc (sock: TAsyncFD, bytesCount: DWord, errcode: TOSErrorCode) =
         if not retFuture.finished:
           if errcode == TOSErrorCode(-1):
             completeAccept()
@@ -459,7 +472,7 @@ when defined(windows) or defined(nimdoc):
     )
 
     # http://msdn.microsoft.com/en-us/library/windows/desktop/ms737524%28v=vs.85%29.aspx
-    let ret = acceptEx(socket, clientSock, addr lpOutputBuf[0],
+    let ret = acceptEx(socket.TSocketHandle, clientSock, addr lpOutputBuf[0],
                        dwReceiveDataLength, 
                        dwLocalAddressLength,
                        dwRemoteAddressLength,
@@ -478,73 +491,87 @@ when defined(windows) or defined(nimdoc):
 
     return retFuture
 
-  proc socket*(disp: PDispatcher, domain: TDomain = AF_INET,
+  proc newAsyncRawSocket*(domain: TDomain = AF_INET,
                typ: TType = SOCK_STREAM,
-               protocol: TProtocol = IPPROTO_TCP): TSocketHandle =
+               protocol: TProtocol = IPPROTO_TCP): TAsyncFD =
     ## Creates a new socket and registers it with the dispatcher implicitly.
-    result = socket(domain, typ, protocol)
-    result.setBlocking(false)
-    disp.register(result)
+    result = newRawSocket(domain, typ, protocol).TAsyncFD
+    result.TSocketHandle.setBlocking(false)
+    register(result)
 
-  proc close*(disp: PDispatcher, socket: TSocketHandle) =
+  proc close*(socket: TAsyncFD) =
     ## Closes a socket and ensures that it is unregistered.
-    socket.close()
-    disp.handles.excl(socket)
+    socket.TSocketHandle.close()
+    getGlobalDispatcher().handles.excl(socket)
 
   initAll()
 else:
   import selectors
   from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK
   type
-    TCallback = proc (sock: TSocketHandle): bool {.closure.}
+    TAsyncFD* = distinct cint
+    TCallback = proc (sock: TAsyncFD): bool {.closure.}
 
     PData* = ref object of PObject
-      sock: TSocketHandle
+      sock: TAsyncFD
       readCBs: seq[TCallback]
       writeCBs: seq[TCallback]
 
     PDispatcher* = ref object
       selector: PSelector
 
+  proc `==`*(x, y: TAsyncFD): bool {.borrow.}
+
   proc newDispatcher*(): PDispatcher =
     new result
     result.selector = newSelector()
 
-  proc update(p: PDispatcher, sock: TSocketHandle, events: set[TEvent]) =
-    assert sock in p.selector
-    discard p.selector.update(sock, events)
+  var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
+  proc getGlobalDispatcher*(): PDispatcher =
+    if gDisp.isNil: gDisp = newDispatcher()
+    result = gDisp
+
+  proc update(sock: TAsyncFD, events: set[TEvent]) =
+    let p = getGlobalDispatcher()
+    assert sock.TSocketHandle in p.selector
+    discard p.selector.update(sock.TSocketHandle, events)
 
-  proc register(p: PDispatcher, sock: TSocketHandle) =
+  proc register(sock: TAsyncFD) =
+    let p = getGlobalDispatcher()
     var data = PData(sock: sock, readCBs: @[], writeCBs: @[])
-    p.selector.register(sock, {}, data.PObject)
+    p.selector.register(sock.TSocketHandle, {}, data.PObject)
 
-  proc socket*(disp: PDispatcher, domain: TDomain = AF_INET,
+  proc newAsyncRawSocket*(domain: TDomain = AF_INET,
                typ: TType = SOCK_STREAM,
-               protocol: TProtocol = IPPROTO_TCP): TSocketHandle =
-    result = socket(domain, typ, protocol)
-    result.setBlocking(false)
-    disp.register(result)
+               protocol: TProtocol = IPPROTO_TCP): TAsyncFD =
+    result = newRawSocket(domain, typ, protocol).TAsyncFD
+    result.TSocketHandle.setBlocking(false)
+    register(result)
   
-  proc close*(disp: PDispatcher, sock: TSocketHandle) =
-    sock.close()
-    disp.selector.unregister(sock)
-
-  proc addRead(p: PDispatcher, sock: TSocketHandle, cb: TCallback) =
-    if sock notin p.selector:
+  proc close*(sock: TAsyncFD) =
+    let disp = getGlobalDispatcher()
+    sock.TSocketHandle.close()
+    disp.selector.unregister(sock.TSocketHandle)
+
+  proc addRead(sock: TAsyncFD, cb: TCallback) =
+    let p = getGlobalDispatcher()
+    if sock.TSocketHandle notin p.selector:
       raise newException(EInvalidValue, "File descriptor not registered.")
-    p.selector[sock].data.PData.readCBs.add(cb)
-    p.update(sock, p.selector[sock].events + {EvRead})
+    p.selector[sock.TSocketHandle].data.PData.readCBs.add(cb)
+    update(sock, p.selector[sock.TSocketHandle].events + {EvRead})
   
-  proc addWrite(p: PDispatcher, sock: TSocketHandle, cb: TCallback) =
-    if sock notin p.selector:
+  proc addWrite(sock: TAsyncFD, cb: TCallback) =
+    let p = getGlobalDispatcher()
+    if sock.TSocketHandle notin p.selector:
       raise newException(EInvalidValue, "File descriptor not registered.")
-    p.selector[sock].data.PData.writeCBs.add(cb)
-    p.update(sock, p.selector[sock].events + {EvWrite})
+    p.selector[sock.TSocketHandle].data.PData.writeCBs.add(cb)
+    update(sock, p.selector[sock.TSocketHandle].events + {EvWrite})
   
-  proc poll*(p: PDispatcher, timeout = 500) =
+  proc poll*(timeout = 500) =
+    let p = getGlobalDispatcher()
     for info in p.selector.select(timeout):
       let data = PData(info.key.data)
-      assert data.sock == info.key.fd
+      assert data.sock == info.key.fd.TAsyncFD
       #echo("In poll ", data.sock.cint)
       if EvRead in info.events:
         # Callback may add items to ``data.readCBs`` which causes issues if
@@ -570,17 +597,16 @@ else:
         if data.readCBs.len != 0: newEvents = {EvRead}
         if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite}
         if newEvents != info.key.events:
-          echo(info.key.events, " -> ", newEvents)
-          p.update(data.sock, newEvents)
+          update(data.sock, newEvents)
       else:
         # FD no longer a part of the selector. Likely been closed
         # (e.g. socket disconnected).
   
-  proc connect*(p: PDispatcher, socket: TSocketHandle, address: string, port: TPort,
+  proc connect*(socket: TAsyncFD, address: string, port: TPort,
     af = AF_INET): PFuture[void] =
     var retFuture = newFuture[void]()
     
-    proc cb(sock: TSocketHandle): bool =
+    proc cb(sock: TAsyncFD): bool =
       # We have connected.
       retFuture.complete()
       return true
@@ -590,7 +616,7 @@ else:
     var lastError: TOSErrorCode
     var it = aiList
     while it != nil:
-      var ret = connect(socket, it.ai_addr, it.ai_addrlen.TSocklen)
+      var ret = connect(socket.TSocketHandle, it.ai_addr, it.ai_addrlen.TSocklen)
       if ret == 0:
         # Request to connect completed immediately.
         success = true
@@ -600,7 +626,7 @@ else:
         lastError = osLastError()
         if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS:
           success = true
-          addWrite(p, socket, cb)
+          addWrite(socket, cb)
           break
         else:
           success = false
@@ -611,17 +637,18 @@ else:
       retFuture.fail(newException(EOS, osErrorMsg(lastError)))
     return retFuture
 
-  proc recv*(p: PDispatcher, socket: TSocketHandle, size: int,
+  proc recv*(socket: TAsyncFD, size: int,
              flags: int = 0): PFuture[string] =
     var retFuture = newFuture[string]()
     
     var readBuffer = newString(size)
     var sizeRead = 0
     
-    proc cb(sock: TSocketHandle): bool =
+    proc cb(sock: TAsyncFD): bool =
       result = true
       let netSize = size - sizeRead
-      let res = recv(sock, addr readBuffer[sizeRead], netSize, flags.cint)
+      let res = recv(sock.TSocketHandle, addr readBuffer[sizeRead], netSize,
+                     flags.cint)
       #echo("recv cb res: ", res)
       if res < 0:
         let lastError = osLastError()
@@ -645,19 +672,19 @@ else:
           retFuture.complete(readBuffer)
       #echo("Recv cb result: ", result)
   
-    addRead(p, socket, cb)
+    addRead(socket, cb)
     return retFuture
 
-  proc send*(p: PDispatcher, socket: TSocketHandle, data: string): PFuture[void] =
+  proc send*(socket: TAsyncFD, data: string): PFuture[void] =
     var retFuture = newFuture[void]()
     
     var written = 0
     
-    proc cb(sock: TSocketHandle): bool =
+    proc cb(sock: TAsyncFD): bool =
       result = true
       let netSize = data.len-written
       var d = data.cstring
-      let res = send(sock, addr d[written], netSize, 0.cint)
+      let res = send(sock.TSocketHandle, addr d[written], netSize, 0.cint)
       if res < 0:
         let lastError = osLastError()
         if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}:
@@ -670,18 +697,18 @@ else:
           result = false # We still have data to send.
         else:
           retFuture.complete()
-    addWrite(p, socket, cb)
+    addWrite(socket, cb)
     return retFuture
 
-  proc acceptAddr*(p: PDispatcher, socket: TSocketHandle): 
-      PFuture[tuple[address: string, client: TSocketHandle]] =
-    var retFuture = newFuture[tuple[address: string, client: TSocketHandle]]()
-    proc cb(sock: TSocketHandle): bool =
+  proc acceptAddr*(socket: TAsyncFD): 
+      PFuture[tuple[address: string, client: TAsyncFD]] =
+    var retFuture = newFuture[tuple[address: string, client: TAsyncFD]]()
+    proc cb(sock: TAsyncFD): bool =
       result = true
       var sockAddress: Tsockaddr_in
       var addrLen = sizeof(sockAddress).TSocklen
-      var client = accept(sock, cast[ptr TSockAddr](addr(sockAddress)),
-                          addr(addrLen))
+      var client = accept(sock.TSocketHandle,
+                          cast[ptr TSockAddr](addr(sockAddress)), addr(addrLen))
       if client == osInvalidSocket:
         let lastError = osLastError()
         assert lastError.int32 notin {EWOULDBLOCK, EAGAIN}
@@ -690,19 +717,19 @@ else:
         else:
           retFuture.fail(newException(EOS, osErrorMsg(lastError)))
       else:
-        p.register(client)
-        retFuture.complete(($inet_ntoa(sockAddress.sin_addr), client))
-    addRead(p, socket, cb)
+        register(client.TAsyncFD)
+        retFuture.complete(($inet_ntoa(sockAddress.sin_addr), client.TAsyncFD))
+    addRead(socket, cb)
     return retFuture
 
-proc accept*(p: PDispatcher, socket: TSocketHandle): PFuture[TSocketHandle] =
+proc accept*(socket: TAsyncFD): PFuture[TAsyncFD] =
   ## Accepts a new connection. Returns a future containing the client socket
   ## corresponding to that connection.
   ## The future will complete when the connection is successfully accepted.
-  var retFut = newFuture[TSocketHandle]()
-  var fut = p.acceptAddr(socket)
+  var retFut = newFuture[TAsyncFD]()
+  var fut = acceptAddr(socket)
   fut.callback =
-    proc (future: PFuture[tuple[address: string, client: TSocketHandle]]) =
+    proc (future: PFuture[tuple[address: string, client: TAsyncFD]]) =
       assert future.finished
       if future.failed:
         retFut.fail(future.error)
@@ -891,7 +918,7 @@ macro async*(prc: stmt): stmt {.immediate.} =
 
   echo(toStrLit(result))
 
-proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.} =
+proc recvLine*(socket: TAsyncFD): PFuture[string] {.async.} =
   ## Reads a line of data from ``socket``. Returned future will complete once
   ## a full line is read or an error occurs.
   ##
@@ -912,28 +939,24 @@ proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.}
   result = ""
   var c = ""
   while true:
-    c = await p.recv(socket, 1)
+    c = await recv(socket, 1)
     if c.len == 0:
       return ""
     if c == "\r":
-      c = await p.recv(socket, 1, MSG_PEEK)
+      c = await recv(socket, 1, MSG_PEEK)
       if c.len > 0 and c == "\L":
-        discard await p.recv(socket, 1)
+        discard await recv(socket, 1)
       addNLIfEmpty()
       return
     elif c == "\L":
       addNLIfEmpty()
       return
-    add(result.string, c)
-
-var gDisp*{.threadvar.}: PDispatcher ## Global dispatcher
-gDisp = newDispatcher()
+    add(result, c)
 
 proc runForever*() =
   ## Begins a never ending global dispatcher poll loop.
   while true:
-    gDisp.poll()
-
+    poll()
 
 when isMainModule:
   
diff --git a/lib/pure/asyncnet.nim b/lib/pure/asyncnet.nim
index ed8a2fb82..451ef25ac 100644
--- a/lib/pure/asyncnet.nim
+++ b/lib/pure/asyncnet.nim
@@ -1,3 +1,11 @@
+#
+#
+#            Nimrod's Runtime Library
+#        (c) Copyright 2014 Dominik Picheta
+#
+#    See the file "copying.txt", included in this
+#    distribution, for details about the copyright.
+#
 import asyncdispatch
 import rawsockets
 import net
@@ -7,7 +15,7 @@ when defined(ssl):
 
 type
   TAsyncSocket = object ## socket type
-    fd: TSocketHandle
+    fd: TAsyncFD
     case isBuffered: bool # determines whether this socket is buffered.
     of true:
       buffer: array[0..BufferSize, char]
@@ -28,18 +36,18 @@ type
 
 # TODO: Save AF, domain etc info and reuse it in procs which need it like connect.
 
-proc newSocket(fd: TSocketHandle, isBuff: bool): PAsyncSocket =
-  assert fd != osInvalidSocket
+proc newSocket(fd: TAsyncFD, isBuff: bool): PAsyncSocket =
+  assert fd != osInvalidSocket.TAsyncFD
   new(result)
   result.fd = fd
   result.isBuffered = isBuff
   if isBuff:
     result.currPos = 0
 
-proc AsyncSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
+proc newAsyncSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
     protocol: TProtocol = IPPROTO_TCP, buffered = true): PAsyncSocket =
   ## Creates a new asynchronous socket.
-  result = newSocket(gDisp.socket(domain, typ, protocol), buffered)
+  result = newSocket(newAsyncRawSocket(domain, typ, protocol), buffered)
 
 proc connect*(socket: PAsyncSocket, address: string, port: TPort,
     af = AF_INET): PFuture[void] =
@@ -47,7 +55,7 @@ proc connect*(socket: PAsyncSocket, address: string, port: TPort,
   ##
   ## Returns a ``PFuture`` which will complete when the connection succeeds
   ## or an error occurs.
-  result = gDisp.connect(socket.fd, address, port, af)
+  result = connect(socket.fd, address, port, af)
 
 proc recv*(socket: PAsyncSocket, size: int,
            flags: int = 0): PFuture[string] =
@@ -56,12 +64,12 @@ proc recv*(socket: PAsyncSocket, size: int,
   ## recv operation then the future may complete with only a part of the
   ## requested data read. If socket is disconnected and no data is available
   ## to be read then the future will complete with a value of ``""``.
-  result = gDisp.recv(socket.fd, size, flags)
+  result = recv(socket.fd, size, flags)
 
 proc send*(socket: PAsyncSocket, data: string): PFuture[void] =
   ## Sends ``data`` to ``socket``. The returned future will complete once all
   ## data has been sent.
-  result = gDisp.send(socket.fd, data)
+  result = send(socket.fd, data)
 
 proc acceptAddr*(socket: PAsyncSocket): 
       PFuture[tuple[address: string, client: PAsyncSocket]] =
@@ -69,9 +77,9 @@ proc acceptAddr*(socket: PAsyncSocket):
   ## corresponding to that connection and the remote address of the client.
   ## The future will complete when the connection is successfully accepted.
   var retFuture = newFuture[tuple[address: string, client: PAsyncSocket]]()
-  var fut = gDisp.acceptAddr(socket.fd)
+  var fut = acceptAddr(socket.fd)
   fut.callback =
-    proc (future: PFuture[tuple[address: string, client: TSocketHandle]]) =
+    proc (future: PFuture[tuple[address: string, client: TAsyncFD]]) =
       assert future.finished
       if future.failed:
         retFuture.fail(future.readError)
@@ -133,7 +141,7 @@ proc recvLine*(socket: PAsyncSocket): PFuture[string] {.async.} =
 
 when isMainModule:
   proc main() {.async.} =
-    var sock = AsyncSocket()
+    var sock = newAsyncSocket()
     await sock.connect("irc.freenode.net", TPort(6667))
     while true:
       let line = await sock.recvLine()
diff --git a/lib/pure/net.nim b/lib/pure/net.nim
index d40f0949b..2461ece1b 100644
--- a/lib/pure/net.nim
+++ b/lib/pure/net.nim
@@ -347,7 +347,7 @@ type
 
   ETimeout* = object of ESynch
 
-proc newSocket(fd: TSocketHandle, isBuff: bool): PSocket =
+proc createSocket(fd: TSocketHandle, isBuff: bool): PSocket =
   assert fd != osInvalidSocket
   new(result)
   result.fd = fd
@@ -355,15 +355,15 @@ proc newSocket(fd: TSocketHandle, isBuff: bool): PSocket =
   if isBuff:
     result.currPos = 0
 
-proc socket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
+proc newSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
              protocol: TProtocol = IPPROTO_TCP, buffered = true): PSocket =
   ## Creates a new socket.
   ##
   ## If an error occurs EOS will be raised.
-  let fd = rawsockets.socket(domain, typ, protocol)
+  let fd = newRawSocket(domain, typ, protocol)
   if fd == osInvalidSocket:
     osError(osLastError())
-  result = newSocket(fd, buffered)
+  result = createSocket(fd, buffered)
 
 when defined(ssl):
   CRYPTO_malloc_init()
diff --git a/lib/pure/rawsockets.nim b/lib/pure/rawsockets.nim
index de7437420..aeaa7f3b5 100644
--- a/lib/pure/rawsockets.nim
+++ b/lib/pure/rawsockets.nim
@@ -143,7 +143,7 @@ else:
     result = cint(ord(p))
 
 
-proc socket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
+proc newRawSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM,
              protocol: TProtocol = IPPROTO_TCP): TSocketHandle =
   ## Creates a new socket; returns `InvalidSocket` if an error occurs.
   socket(toInt(domain), toInt(typ), toInt(protocol))
diff --git a/tests/async/tasyncawait.nim b/tests/async/tasyncawait.nim
index 7f2c1e6cc..7e6270247 100644
--- a/tests/async/tasyncawait.nim
+++ b/tests/async/tasyncawait.nim
@@ -5,7 +5,6 @@ discard """
 """
 import asyncdispatch, rawsockets, net, strutils, os
 
-var disp = newDispatcher()
 var msgCount = 0
 
 const
@@ -14,31 +13,31 @@ const
 
 var clientCount = 0
 
-proc sendMessages(disp: PDispatcher, client: TSocketHandle) {.async.} =
+proc sendMessages(client: TAsyncFD) {.async.} =
   for i in 0 .. <messagesToSend:
-    await disp.send(client, "Message " & $i & "\c\L")
+    await send(client, "Message " & $i & "\c\L")
 
-proc launchSwarm(disp: PDispatcher, port: TPort) {.async.} =
+proc launchSwarm(port: TPort) {.async.} =
   for i in 0 .. <swarmSize:
-    var sock = disp.socket()
+    var sock = newAsyncRawSocket()
 
     #disp.register(sock)
-    await disp.connect(sock, "localhost", port)
+    await connect(sock, "localhost", port)
     when true:
-      await sendMessages(disp, sock)
-      disp.close(sock)
+      await sendMessages(sock)
+      close(sock)
     else:
       # Issue #932: https://github.com/Araq/Nimrod/issues/932
-      var msgFut = sendMessages(disp, sock)
+      var msgFut = sendMessages(sock)
       msgFut.callback =
         proc () =
-          disp.close(sock)
+          close(sock)
 
-proc readMessages(disp: PDispatcher, client: TSocketHandle) {.async.} =
+proc readMessages(client: TAsyncFD) {.async.} =
   while true:
-    var line = await disp.recvLine(client)
+    var line = await recvLine(client)
     if line == "":
-      disp.close(client)
+      close(client)
       clientCount.inc
       break
     else:
@@ -47,8 +46,8 @@ proc readMessages(disp: PDispatcher, client: TSocketHandle) {.async.} =
       else:
         doAssert false
 
-proc createServer(disp: PDispatcher, port: TPort) {.async.} =
-  var server = disp.socket()
+proc createServer(port: TPort) {.async.} =
+  var server = newAsyncRawSocket()
   #disp.register(server)
   block:
     var name: TSockaddr_in
@@ -58,20 +57,20 @@ proc createServer(disp: PDispatcher, port: TPort) {.async.} =
       name.sin_family = toInt(AF_INET)
     name.sin_port = htons(int16(port))
     name.sin_addr.s_addr = htonl(INADDR_ANY)
-    if bindAddr(server, cast[ptr TSockAddr](addr(name)),
-                  sizeof(name).TSocklen) < 0'i32:
+    if bindAddr(server.TSocketHandle, cast[ptr TSockAddr](addr(name)),
+                sizeof(name).TSocklen) < 0'i32:
       osError(osLastError())
   
-  discard server.listen()
+  discard server.TSocketHandle.listen()
   while true:
-    var client = await disp.accept(server)
-    readMessages(disp, client)
+    var client = await accept(server)
+    readMessages(client)
     # TODO: Test: readMessages(disp, await disp.accept(server))
 
-disp.createServer(TPort(10335))
-disp.launchSwarm(TPort(10335))
+createServer(TPort(10335))
+launchSwarm(TPort(10335))
 while true:
-  disp.poll()
+  poll()
   if clientCount == swarmSize: break
 
 assert msgCount == swarmSize * messagesToSend