summary refs log tree commit diff stats
path: root/lib/upcoming/asyncdispatch.nim
diff options
context:
space:
mode:
authorAndreas Rumpf <rumpf_a@web.de>2017-05-04 16:02:50 +0200
committerAndreas Rumpf <rumpf_a@web.de>2017-05-04 16:02:50 +0200
commit764cc0217735454c166cb7dd490db58f5fa6fe26 (patch)
tree63633b39ae24815e8b677ccbc9bac2cb01c39d3b /lib/upcoming/asyncdispatch.nim
parentafa80092d378a6dbc116c0aa3ed3964fd8c599d6 (diff)
parentc1aa973758a60d7ef0e698c94861b74132612de5 (diff)
downloadNim-764cc0217735454c166cb7dd490db58f5fa6fe26.tar.gz
Merge branch 'devel' into araq
Diffstat (limited to 'lib/upcoming/asyncdispatch.nim')
-rw-r--r--lib/upcoming/asyncdispatch.nim171
1 files changed, 22 insertions, 149 deletions
diff --git a/lib/upcoming/asyncdispatch.nim b/lib/upcoming/asyncdispatch.nim
index feee87bae..7c0497cc6 100644
--- a/lib/upcoming/asyncdispatch.nim
+++ b/lib/upcoming/asyncdispatch.nim
@@ -9,7 +9,7 @@
 
 include "system/inclrtl"
 
-import os, oids, tables, strutils, times, heapqueue, lists
+import os, oids, tables, strutils, times, heapqueue, lists, options
 
 import nativesockets, net, deques
 
@@ -325,68 +325,6 @@ when defined(windows) or defined(nimdoc):
     getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](fun)
     close(dummySock)
 
-  proc connect*(socket: AsyncFD, address: string, port: Port,
-    domain = nativesockets.AF_INET): Future[void] =
-    ## Connects ``socket`` to server at ``address:port``.
-    ##
-    ## Returns a ``Future`` which will complete when the connection succeeds
-    ## or an error occurs.
-    verifyPresence(socket)
-    var retFuture = newFuture[void]("connect")
-    # Apparently ``ConnectEx`` expects the socket to be initially bound:
-    var saddr: Sockaddr_in
-    saddr.sin_family = int16(toInt(domain))
-    saddr.sin_port = 0
-    saddr.sin_addr.s_addr = INADDR_ANY
-    if bindAddr(socket.SocketHandle, cast[ptr SockAddr](addr(saddr)),
-                  sizeof(saddr).SockLen) < 0'i32:
-      raiseOSError(osLastError())
-
-    var aiList = getAddrInfo(address, port, domain)
-    var success = false
-    var lastError: OSErrorCode
-    var it = aiList
-    while it != nil:
-      # "the OVERLAPPED structure must remain valid until the I/O completes"
-      # http://blogs.msdn.com/b/oldnewthing/archive/2011/02/02/10123392.aspx
-      var ol = PCustomOverlapped()
-      GC_ref(ol)
-      ol.data = CompletionData(fd: socket, cb:
-        proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
-          if not retFuture.finished:
-            if errcode == OSErrorCode(-1):
-              retFuture.complete()
-            else:
-              retFuture.fail(newException(OSError, osErrorMsg(errcode)))
-      )
-
-      var ret = connectEx(socket.SocketHandle, it.ai_addr,
-                          sizeof(Sockaddr_in).cint, nil, 0, nil,
-                          cast[POVERLAPPED](ol))
-      if ret:
-        # Request to connect completed immediately.
-        success = true
-        retFuture.complete()
-        # We don't deallocate ``ol`` here because even though this completed
-        # immediately poll will still be notified about its completion and it will
-        # free ``ol``.
-        break
-      else:
-        lastError = osLastError()
-        if lastError.int32 == ERROR_IO_PENDING:
-          # In this case ``ol`` will be deallocated in ``poll``.
-          success = true
-          break
-        else:
-          GC_unref(ol)
-          success = false
-      it = it.ai_next
-
-    freeAddrInfo(aiList)
-    if not success:
-      retFuture.fail(newException(OSError, osErrorMsg(lastError)))
-    return retFuture
-
   proc recv*(socket: AsyncFD, size: int,
              flags = {SocketFlag.SafeDisconn}): Future[string] =
     ## Reads **up to** ``size`` bytes from ``socket``. Returned future will
@@ -739,8 +677,8 @@ when defined(windows) or defined(nimdoc):
     var lpOutputBuf = newString(lpOutputLen)
     var dwBytesReceived: Dword
     let dwReceiveDataLength = 0.Dword # We don't want any data to be read.
-    let dwLocalAddressLength = Dword(sizeof(Sockaddr_in) + 16)
-    let dwRemoteAddressLength = Dword(sizeof(Sockaddr_in) + 16)
+    let dwLocalAddressLength = Dword(sizeof(Sockaddr_in6) + 16)
+    let dwRemoteAddressLength = Dword(sizeof(Sockaddr_in6) + 16)
 
     template failAccept(errcode) =
       if flags.isDisconnectionError(errcode):
@@ -770,12 +708,14 @@ when defined(windows) or defined(nimdoc):
                              dwLocalAddressLength, dwRemoteAddressLength,
                              addr localSockaddr, addr localLen,
                              addr remoteSockaddr, addr remoteLen)
-        register(clientSock.AsyncFD)
-        # TODO: IPv6. Check ``sa_family``. http://stackoverflow.com/a/9212542/492186
-        retFuture.complete(
-          (address: $inet_ntoa(cast[ptr Sockaddr_in](remoteSockAddr).sin_addr),
-          client: clientSock.AsyncFD)
-        )
+        try:
+          let address = getAddrString(remoteSockAddr)
+          register(clientSock.AsyncFD)
+          retFuture.complete((address: address, client: clientSock.AsyncFD))
+        except:
+          # getAddrString may raise
+          clientSock.close()
+          retFuture.fail(getCurrentException())
 
     var ol = PCustomOverlapped()
     GC_ref(ol)
@@ -808,20 +748,6 @@ when defined(windows) or defined(nimdoc):
 
     return retFuture
 
-  proc newAsyncNativeSocket*(domain, sockType, protocol: cint): AsyncFD =
-    ## Creates a new socket and registers it with the dispatcher implicitly.
-    result = newNativeSocket(domain, sockType, protocol).AsyncFD
-    result.SocketHandle.setBlocking(false)
-    register(result)
-
-  proc newAsyncNativeSocket*(domain: Domain = nativesockets.AF_INET,
-                             sockType: SockType = SOCK_STREAM,
-                             protocol: Protocol = IPPROTO_TCP): AsyncFD =
-    ## Creates a new socket and registers it with the dispatcher implicitly.
-    result = newNativeSocket(domain, sockType, protocol).AsyncFD
-    result.SocketHandle.setBlocking(false)
-    register(result)
-
   proc closeSocket*(socket: AsyncFD) =
     ## Closes a socket and ensures that it is unregistered.
     socket.SocketHandle.close()
@@ -1159,23 +1085,6 @@ else:
     var data = newAsyncData()
     p.selector.registerHandle(fd.SocketHandle, {}, data)
 
-  proc newAsyncNativeSocket*(domain: cint, sockType: cint,
-                             protocol: cint): AsyncFD =
-    result = newNativeSocket(domain, sockType, protocol).AsyncFD
-    result.SocketHandle.setBlocking(false)
-    when defined(macosx):
-      result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
-    register(result)
-
-  proc newAsyncNativeSocket*(domain: Domain = AF_INET,
-                             sockType: SockType = SOCK_STREAM,
-                             protocol: Protocol = IPPROTO_TCP): AsyncFD =
-    result = newNativeSocket(domain, sockType, protocol).AsyncFD
-    result.SocketHandle.setBlocking(false)
-    when defined(macosx):
-      result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
-    register(result)
-
   proc closeSocket*(sock: AsyncFD) =
     let disp = getGlobalDispatcher()
     disp.selector.unregister(sock.SocketHandle)
@@ -1331,50 +1240,6 @@ else:
     # Callback queue processing
     processPendingCallbacks(p)
 
-  proc connect*(socket: AsyncFD, address: string, port: Port,
-    domain = AF_INET): Future[void] =
-    var retFuture = newFuture[void]("connect")
-
-    proc cb(fd: AsyncFD): bool =
-      var ret = SocketHandle(fd).getSockOptInt(cint(SOL_SOCKET), cint(SO_ERROR))
-      if ret == 0:
-          # We have connected.
-          retFuture.complete()
-          return true
-      elif ret == EINTR:
-          # interrupted, keep waiting
-          return false
-      else:
-          retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret))))
-          return true
-
-    assert getSockDomain(socket.SocketHandle) == domain
-    var aiList = getAddrInfo(address, port, domain)
-    var success = false
-    var lastError: OSErrorCode
-    var it = aiList
-    while it != nil:
-      var ret = connect(socket.SocketHandle, it.ai_addr, it.ai_addrlen.Socklen)
-      if ret == 0:
-        # Request to connect completed immediately.
-        success = true
-        retFuture.complete()
-        break
-      else:
-        lastError = osLastError()
-        if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS:
-          success = true
-          addWrite(socket, cb)
-          break
-        else:
-          success = false
-      it = it.ai_next
-
-    freeAddrInfo(aiList)
-    if not success:
-      retFuture.fail(newException(OSError, osErrorMsg(lastError)))
-    return retFuture
-
   proc recv*(socket: AsyncFD, size: int,
              flags = {SocketFlag.SafeDisconn}): Future[string] =
     var retFuture = newFuture[string]("recv")
@@ -1568,9 +1433,14 @@ else:
           else:
             retFuture.fail(newException(OSError, osErrorMsg(lastError)))
       else:
-        register(client.AsyncFD)
-        retFuture.complete((getAddrString(cast[ptr SockAddr](addr sockAddress)),
-                            client.AsyncFD))
+        try:
+          let address = getAddrString(cast[ptr SockAddr](addr sockAddress))
+          register(client.AsyncFD)
+          retFuture.complete((address, client.AsyncFD))
+        except:
+          # getAddrString may raise
+          client.close()
+          retFuture.fail(getCurrentException())
     addRead(socket, cb)
     return retFuture
 
@@ -1623,6 +1493,9 @@ else:
     data.readList.add(cb)
     p.selector.registerEvent(SelectEvent(ev), data)
 
+# Common procedures between current and upcoming asyncdispatch
+include includes.asynccommon
+
 proc sleepAsync*(ms: int): Future[void] =
   ## Suspends the execution of the current async procedure for the next
   ## ``ms`` milliseconds.
82'>582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701