diff options
-rw-r--r-- | lib/pure/asyncdispatch.nim | 170 | ||||
-rw-r--r-- | lib/pure/asyncnet.nim | 11 | ||||
-rw-r--r-- | lib/pure/httpclient.nim | 34 | ||||
-rw-r--r-- | lib/pure/includes/asynccommon.nim | 201 | ||||
-rw-r--r-- | lib/pure/nativesockets.nim | 63 | ||||
-rw-r--r-- | lib/pure/net.nim | 61 | ||||
-rw-r--r-- | lib/pure/uri.nim | 21 | ||||
-rw-r--r-- | lib/upcoming/asyncdispatch.nim | 171 | ||||
-rw-r--r-- | lib/windows/winlean.nim | 3 | ||||
-rw-r--r-- | tests/async/tasyncdial.nim | 53 | ||||
-rw-r--r-- | tests/stdlib/thttpclient.nim | 40 | ||||
-rw-r--r-- | tests/stdlib/tnetdial.nim | 60 | ||||
-rw-r--r-- | web/news/e031_version_0_16_2.rst | 5 |
13 files changed, 559 insertions, 334 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index 1696c4ed9..6a877be30 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -9,7 +9,7 @@ include "system/inclrtl" -import os, oids, tables, strutils, times, heapqueue +import os, oids, tables, strutils, times, heapqueue, options import nativesockets, net, deques @@ -385,68 +385,6 @@ when defined(windows) or defined(nimdoc): dwRemoteAddressLength, LocalSockaddr, LocalSockaddrLength, RemoteSockaddr, RemoteSockaddrLength) - proc connect*(socket: AsyncFD, address: string, port: Port, - domain = nativesockets.AF_INET): Future[void] = - ## Connects ``socket`` to server at ``address:port``. - ## - ## Returns a ``Future`` which will complete when the connection succeeds - ## or an error occurs. - verifyPresence(socket) - var retFuture = newFuture[void]("connect") - # Apparently ``ConnectEx`` expects the socket to be initially bound: - var saddr: Sockaddr_in - saddr.sin_family = int16(toInt(domain)) - saddr.sin_port = 0 - saddr.sin_addr.s_addr = INADDR_ANY - if bindAddr(socket.SocketHandle, cast[ptr SockAddr](addr(saddr)), - sizeof(saddr).SockLen) < 0'i32: - raiseOSError(osLastError()) - - var aiList = getAddrInfo(address, port, domain) - var success = false - var lastError: OSErrorCode - var it = aiList - while it != nil: - # "the OVERLAPPED structure must remain valid until the I/O completes" - # http://blogs.msdn.com/b/oldnewthing/archive/2011/02/02/10123392.aspx - var ol = PCustomOverlapped() - GC_ref(ol) - ol.data = CompletionData(fd: socket, cb: - proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = - if not retFuture.finished: - if errcode == OSErrorCode(-1): - retFuture.complete() - else: - retFuture.fail(newException(OSError, osErrorMsg(errcode))) - ) - - var ret = connectEx(socket.SocketHandle, it.ai_addr, - sizeof(Sockaddr_in).cint, nil, 0, nil, - cast[POVERLAPPED](ol)) - if ret: - # Request to connect completed immediately. - success = true - retFuture.complete() - # We don't deallocate ``ol`` here because even though this completed - # immediately poll will still be notified about its completion and it will - # free ``ol``. - break - else: - lastError = osLastError() - if lastError.int32 == ERROR_IO_PENDING: - # In this case ``ol`` will be deallocated in ``poll``. - success = true - break - else: - GC_unref(ol) - success = false - it = it.ai_next - - freeAddrInfo(aiList) - if not success: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) - return retFuture - proc recv*(socket: AsyncFD, size: int, flags = {SocketFlag.SafeDisconn}): Future[string] = ## Reads **up to** ``size`` bytes from ``socket``. Returned future will @@ -754,8 +692,8 @@ when defined(windows) or defined(nimdoc): var lpOutputBuf = newString(lpOutputLen) var dwBytesReceived: Dword let dwReceiveDataLength = 0.Dword # We don't want any data to be read. - let dwLocalAddressLength = Dword(sizeof(Sockaddr_in) + 16) - let dwRemoteAddressLength = Dword(sizeof(Sockaddr_in) + 16) + let dwLocalAddressLength = Dword(sizeof(Sockaddr_in6) + 16) + let dwRemoteAddressLength = Dword(sizeof(Sockaddr_in6) + 16) template failAccept(errcode) = if flags.isDisconnectionError(errcode): @@ -785,12 +723,14 @@ when defined(windows) or defined(nimdoc): dwLocalAddressLength, dwRemoteAddressLength, addr localSockaddr, addr localLen, addr remoteSockaddr, addr remoteLen) - register(clientSock.AsyncFD) - # TODO: IPv6. Check ``sa_family``. http://stackoverflow.com/a/9212542/492186 - retFuture.complete( - (address: $inet_ntoa(cast[ptr Sockaddr_in](remoteSockAddr).sin_addr), - client: clientSock.AsyncFD) - ) + try: + let address = getAddrString(remoteSockAddr) + register(clientSock.AsyncFD) + retFuture.complete((address: address, client: clientSock.AsyncFD)) + except: + # getAddrString may raise + clientSock.close() + retFuture.fail(getCurrentException()) var ol = PCustomOverlapped() GC_ref(ol) @@ -823,20 +763,6 @@ when defined(windows) or defined(nimdoc): return retFuture - proc newAsyncNativeSocket*(domain, sockType, protocol: cint): AsyncFD = - ## Creates a new socket and registers it with the dispatcher implicitly. - result = newNativeSocket(domain, sockType, protocol).AsyncFD - result.SocketHandle.setBlocking(false) - register(result) - - proc newAsyncNativeSocket*(domain: Domain = nativesockets.AF_INET, - sockType: SockType = SOCK_STREAM, - protocol: Protocol = IPPROTO_TCP): AsyncFD = - ## Creates a new socket and registers it with the dispatcher implicitly. - result = newNativeSocket(domain, sockType, protocol).AsyncFD - result.SocketHandle.setBlocking(false) - register(result) - proc closeSocket*(socket: AsyncFD) = ## Closes a socket and ensures that it is unregistered. socket.SocketHandle.close() @@ -1015,23 +941,6 @@ else: var data = PData(fd: fd, readCBs: @[], writeCBs: @[]) p.selector.register(fd.SocketHandle, {}, data.RootRef) - proc newAsyncNativeSocket*(domain: cint, sockType: cint, - protocol: cint): AsyncFD = - result = newNativeSocket(domain, sockType, protocol).AsyncFD - result.SocketHandle.setBlocking(false) - when defined(macosx): - result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1) - register(result) - - proc newAsyncNativeSocket*(domain: Domain = AF_INET, - sockType: SockType = SOCK_STREAM, - protocol: Protocol = IPPROTO_TCP): AsyncFD = - result = newNativeSocket(domain, sockType, protocol).AsyncFD - result.SocketHandle.setBlocking(false) - when defined(macosx): - result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1) - register(result) - proc closeSocket*(sock: AsyncFD) = let disp = getGlobalDispatcher() disp.selector.unregister(sock.SocketHandle) @@ -1115,50 +1024,6 @@ else: # Callback queue processing processPendingCallbacks(p) - proc connect*(socket: AsyncFD, address: string, port: Port, - domain = AF_INET): Future[void] = - var retFuture = newFuture[void]("connect") - - proc cb(fd: AsyncFD): bool = - var ret = SocketHandle(fd).getSockOptInt(cint(SOL_SOCKET), cint(SO_ERROR)) - if ret == 0: - # We have connected. - retFuture.complete() - return true - elif ret == EINTR: - # interrupted, keep waiting - return false - else: - retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret)))) - return true - - assert getSockDomain(socket.SocketHandle) == domain - var aiList = getAddrInfo(address, port, domain) - var success = false - var lastError: OSErrorCode - var it = aiList - while it != nil: - var ret = connect(socket.SocketHandle, it.ai_addr, it.ai_addrlen.Socklen) - if ret == 0: - # Request to connect completed immediately. - success = true - retFuture.complete() - break - else: - lastError = osLastError() - if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS: - success = true - addWrite(socket, cb) - break - else: - success = false - it = it.ai_next - - freeAddrInfo(aiList) - if not success: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) - return retFuture - proc recv*(socket: AsyncFD, size: int, flags = {SocketFlag.SafeDisconn}): Future[string] = var retFuture = newFuture[string]("recv") @@ -1320,11 +1185,20 @@ else: else: retFuture.fail(newException(OSError, osErrorMsg(lastError))) else: - register(client.AsyncFD) - retFuture.complete((getAddrString(cast[ptr SockAddr](addr sockAddress)), client.AsyncFD)) + try: + let address = getAddrString(cast[ptr SockAddr](addr sockAddress)) + register(client.AsyncFD) + retFuture.complete((address, client.AsyncFD)) + except: + # getAddrString may raise + client.close() + retFuture.fail(getCurrentException()) addRead(socket, cb) return retFuture +# Common procedures between current and upcoming asyncdispatch +include includes.asynccommon + proc sleepAsync*(ms: int): Future[void] = ## Suspends the execution of the current async procedure for the next ## ``ms`` milliseconds. diff --git a/lib/pure/asyncnet.nim b/lib/pure/asyncnet.nim index 1ec751a64..11b7998b2 100644 --- a/lib/pure/asyncnet.nim +++ b/lib/pure/asyncnet.nim @@ -244,6 +244,17 @@ when defineSsl: else: raiseSSLError("Socket has been disconnected") +proc dial*(address: string, port: Port, protocol = IPPROTO_TCP, + buffered = true): Future[AsyncSocket] {.async.} = + ## Establishes connection to the specified ``address``:``port`` pair via the + ## specified protocol. The procedure iterates through possible + ## resolutions of the ``address`` until it succeeds, meaning that it + ## seamlessly works with both IPv4 and IPv6. + ## Returns AsyncSocket ready to send or receive data. + let asyncFd = await asyncdispatch.dial(address, port, protocol) + let sockType = protocol.toSockType() + let domain = getSockDomain(asyncFd.SocketHandle) + result = newAsyncSocket(asyncFd, domain, sockType, protocol, buffered) proc connect*(socket: AsyncSocket, address: string, port: Port) {.async.} = ## Connects ``socket`` to server at ``address:port``. diff --git a/lib/pure/httpclient.nim b/lib/pure/httpclient.nim index 20b76503e..1b8a20b65 100644 --- a/lib/pure/httpclient.nim +++ b/lib/pure/httpclient.nim @@ -1033,32 +1033,38 @@ proc newConnection(client: HttpClient | AsyncHttpClient, if client.currentURL.hostname != url.hostname or client.currentURL.scheme != url.scheme or client.currentURL.port != url.port: + let isSsl = url.scheme.toLowerAscii() == "https" + + if isSsl and not defined(ssl): + raise newException(HttpRequestError, + "SSL support is not available. Cannot connect over SSL.") + if client.connected: client.close() - when client is HttpClient: - client.socket = newSocket() - elif client is AsyncHttpClient: - client.socket = newAsyncSocket() - else: {.fatal: "Unsupported client type".} - # TODO: I should be able to write 'net.Port' here... let port = if url.port == "": - if url.scheme.toLower() == "https": + if isSsl: nativesockets.Port(443) else: nativesockets.Port(80) else: nativesockets.Port(url.port.parseInt) - if url.scheme.toLower() == "https": - when defined(ssl): - client.sslContext.wrapSocket(client.socket) - else: - raise newException(HttpRequestError, - "SSL support is not available. Cannot connect over SSL.") + when client is HttpClient: + client.socket = await net.dial(url.hostname, port) + elif client is AsyncHttpClient: + client.socket = await asyncnet.dial(url.hostname, port) + else: {.fatal: "Unsupported client type".} + + when defined(ssl): + if isSsl: + try: + client.sslContext.wrapConnectedSocket(client.socket, handshakeAsClient) + except: + client.socket.close() + raise getCurrentException() - await client.socket.connect(url.hostname, port) client.currentURL = url client.connected = true diff --git a/lib/pure/includes/asynccommon.nim b/lib/pure/includes/asynccommon.nim new file mode 100644 index 000000000..a7d2f803f --- /dev/null +++ b/lib/pure/includes/asynccommon.nim @@ -0,0 +1,201 @@ +template newAsyncNativeSocketImpl(domain, sockType, protocol) = + let handle = newNativeSocket(domain, sockType, protocol) + if handle == osInvalidSocket: + raiseOSError(osLastError()) + handle.setBlocking(false) + when defined(macosx): + handle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1) + result = handle.AsyncFD + register(result) + +proc newAsyncNativeSocket*(domain: cint, sockType: cint, + protocol: cint): AsyncFD = + newAsyncNativeSocketImpl(domain, sockType, protocol) + +proc newAsyncNativeSocket*(domain: Domain = Domain.AF_INET, + sockType: SockType = SOCK_STREAM, + protocol: Protocol = IPPROTO_TCP): AsyncFD = + newAsyncNativeSocketImpl(domain, sockType, protocol) + +when defined(windows) or defined(nimdoc): + proc bindToDomain(handle: SocketHandle, domain: Domain) = + # Extracted into a separate proc, because connect() on Windows requires + # the socket to be initially bound. + template doBind(saddr) = + if bindAddr(handle, cast[ptr SockAddr](addr(saddr)), + sizeof(saddr).SockLen) < 0'i32: + raiseOSError(osLastError()) + + if domain == Domain.AF_INET6: + var saddr: Sockaddr_in6 + saddr.sin6_family = int16(toInt(domain)) + doBind(saddr) + else: + var saddr: Sockaddr_in + saddr.sin_family = int16(toInt(domain)) + doBind(saddr) + + proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): Future[void] = + let retFuture = newFuture[void]("doConnect") + result = retFuture + + var ol = PCustomOverlapped() + GC_ref(ol) + ol.data = CompletionData(fd: socket, cb: + proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + retFuture.complete() + else: + retFuture.fail(newException(OSError, osErrorMsg(errcode))) + ) + + let ret = connectEx(socket.SocketHandle, addrInfo.ai_addr, + cint(addrInfo.ai_addrlen), nil, 0, nil, + cast[POVERLAPPED](ol)) + if ret: + # Request to connect completed immediately. + retFuture.complete() + # We don't deallocate ``ol`` here because even though this completed + # immediately poll will still be notified about its completion and it + # will free ``ol``. + else: + let lastError = osLastError() + if lastError.int32 != ERROR_IO_PENDING: + # With ERROR_IO_PENDING ``ol`` will be deallocated in ``poll``, + # and the future will be completed/failed there, too. + GC_unref(ol) + retFuture.fail(newException(OSError, osErrorMsg(lastError))) +else: + proc doConnect(socket: AsyncFD, addrInfo: ptr AddrInfo): Future[void] = + let retFuture = newFuture[void]("doConnect") + result = retFuture + + proc cb(fd: AsyncFD): bool = + let ret = SocketHandle(fd).getSockOptInt( + cint(SOL_SOCKET), cint(SO_ERROR)) + if ret == 0: + # We have connected. + retFuture.complete() + return true + elif ret == EINTR: + # interrupted, keep waiting + return false + else: + retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret)))) + return true + + let ret = connect(socket.SocketHandle, + addrInfo.ai_addr, + addrInfo.ai_addrlen.Socklen) + if ret == 0: + # Request to connect completed immediately. + retFuture.complete() + else: + let lastError = osLastError() + if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS: + addWrite(socket, cb) + else: + retFuture.fail(newException(OSError, osErrorMsg(lastError))) + +template asyncAddrInfoLoop(addrInfo: ptr AddrInfo, fd: untyped, + protocol: Protocol = IPPROTO_RAW) = + ## Iterates through the AddrInfo linked list asynchronously + ## until the connection can be established. + const shouldCreateFd = not declared(fd) + + when shouldCreateFd: + let sockType = protocol.toSockType() + + var fdPerDomain: array[low(Domain).ord..high(Domain).ord, AsyncFD] + for i in low(fdPerDomain)..high(fdPerDomain): + fdPerDomain[i] = osInvalidSocket.AsyncFD + template closeUnusedFds(domainToKeep = -1) {.dirty.} = + for i, fd in fdPerDomain: + if fd != osInvalidSocket.AsyncFD and i != domainToKeep: + fd.closeSocket() + + var lastException: ref Exception + var curAddrInfo = addrInfo + var domain: Domain + when shouldCreateFd: + var curFd: AsyncFD + else: + var curFd = fd + proc tryNextAddrInfo(fut: Future[void]) {.gcsafe.} = + if fut == nil or fut.failed: + if fut != nil: + lastException = fut.readError() + + while curAddrInfo != nil: + let domainOpt = curAddrInfo.ai_family.toKnownDomain() + if domainOpt.isSome: + domain = domainOpt.unsafeGet() + break + curAddrInfo = curAddrInfo.ai_next + + if curAddrInfo == nil: + freeAddrInfo(addrInfo) + when shouldCreateFd: + closeUnusedFds() + if lastException != nil: + retFuture.fail(lastException) + else: + retFuture.fail(newException( + IOError, "Couldn't resolve address: " & address)) + return + + when shouldCreateFd: + curFd = fdPerDomain[ord(domain)] + if curFd == osInvalidSocket.AsyncFD: + try: + curFd = newAsyncNativeSocket(domain, sockType, protocol) + except: + freeAddrInfo(addrInfo) + closeUnusedFds() + raise getCurrentException() + when defined(windows): + curFd.SocketHandle.bindToDomain(domain) + fdPerDomain[ord(domain)] = curFd + + doConnect(curFd, curAddrInfo).callback = tryNextAddrInfo + curAddrInfo = curAddrInfo.ai_next + else: + freeAddrInfo(addrInfo) + when shouldCreateFd: + closeUnusedFds(ord(domain)) + retFuture.complete(curFd) + else: + retFuture.complete() + + tryNextAddrInfo(nil) + +proc dial*(address: string, port: Port, + protocol: Protocol = IPPROTO_TCP): Future[AsyncFD] = + ## Establishes connection to the specified ``address``:``port`` pair via the + ## specified protocol. The procedure iterates through possible + ## resolutions of the ``address`` until it succeeds, meaning that it + ## seamlessly works with both IPv4 and IPv6. + ## Returns the async file descriptor, registered in the dispatcher of + ## the current thread, ready to send or receive data. + let retFuture = newFuture[AsyncFD]("dial") + result = retFuture + let sockType = protocol.toSockType() + + let aiList = getAddrInfo(address, port, Domain.AF_UNSPEC, sockType, protocol) + asyncAddrInfoLoop(aiList, noFD, protocol) + +proc connect*(socket: AsyncFD, address: string, port: Port, + domain = Domain.AF_INET): Future[void] = + let retFuture = newFuture[void]("connect") + result = retFuture + + when defined(windows): + verifyPresence(socket) + else: + assert getSockDomain(socket.SocketHandle) == domain + + let aiList = getAddrInfo(address, port, domain) + when defined(windows): + socket.SocketHandle.bindToDomain(domain) + asyncAddrInfoLoop(aiList, socket) diff --git a/lib/pure/nativesockets.nim b/lib/pure/nativesockets.nim index fa3fa4aea..7568408a6 100644 --- a/lib/pure/nativesockets.nim +++ b/lib/pure/nativesockets.nim @@ -12,7 +12,7 @@ # TODO: Clean up the exports a bit and everything else in general. -import os +import os, options when hostOS == "solaris": {.passl: "-lsocket -lnsl".} @@ -52,9 +52,11 @@ type Domain* = enum ## domain, which specifies the protocol family of the ## created socket. Other domains than those that are listed ## here are unsupported. - AF_UNIX, ## for local socket (using a file). Unsupported on Windows. + AF_UNSPEC = 0, ## unspecified domain (can be detected automatically by + ## some procedures, such as getaddrinfo) + AF_UNIX = 1, ## for local socket (using a file). Unsupported on Windows. AF_INET = 2, ## for network protocol IPv4 or - AF_INET6 = 23 ## for network protocol IPv6. + AF_INET6 = when defined(macosx): 30 else: 23 ## for network protocol IPv6. SockType* = enum ## second argument to `socket` proc SOCK_STREAM = 1, ## reliable stream-oriented service or Stream Sockets @@ -125,10 +127,19 @@ proc toInt*(p: Protocol): cint when not useWinVersion: proc toInt(domain: Domain): cshort = case domain + of AF_UNSPEC: result = posix.AF_UNSPEC.cshort of AF_UNIX: result = posix.AF_UNIX.cshort of AF_INET: result = posix.AF_INET.cshort of AF_INET6: result = posix.AF_INET6.cshort - else: discard + + proc toKnownDomain*(family: cint): Option[Domain] = + ## Converts the platform-dependent ``cint`` to the Domain or none(), + ## if the ``cint`` is not known. + result = if family == posix.AF_UNSPEC: some(Domain.AF_UNSPEC) + elif family == posix.AF_UNIX: some(Domain.AF_UNIX) + elif family == posix.AF_INET: some(Domain.AF_INET) + elif family == posix.AF_INET6: some(Domain.AF_INET6) + else: none(Domain) proc toInt(typ: SockType): cint = case typ @@ -136,7 +147,6 @@ when not useWinVersion: of SOCK_DGRAM: result = posix.SOCK_DGRAM of SOCK_SEQPACKET: result = posix.SOCK_SEQPACKET of SOCK_RAW: result = posix.SOCK_RAW - else: discard proc toInt(p: Protocol): cint = case p @@ -146,18 +156,33 @@ when not useWinVersion: of IPPROTO_IPV6: result = posix.IPPROTO_IPV6 of IPPROTO_RAW: result = posix.IPPROTO_RAW of IPPROTO_ICMP: result = posix.IPPROTO_ICMP - else: discard else: proc toInt(domain: Domain): cshort = result = toU16(ord(domain)) + proc toKnownDomain*(family: cint): Option[Domain] = + ## Converts the platform-dependent ``cint`` to the Domain or none(), + ## if the ``cint`` is not known. + result = if family == winlean.AF_UNSPEC: some(Domain.AF_UNSPEC) + elif family == winlean.AF_INET: some(Domain.AF_INET) + elif family == winlean.AF_INET6: some(Domain.AF_INET6) + else: none(Domain) + proc toInt(typ: SockType): cint = result = cint(ord(typ)) proc toInt(p: Protocol): cint = result = cint(ord(p)) +proc toSockType*(protocol: Protocol): SockType = + result = case protocol + of IPPROTO_TCP: + SOCK_STREAM + of IPPROTO_UDP: + SOCK_DGRAM + of IPPROTO_IP, IPPROTO_IPV6, IPPROTO_RAW, IPPROTO_ICMP: + SOCK_RAW proc newNativeSocket*(domain: Domain = AF_INET, sockType: SockType = SOCK_STREAM, @@ -392,14 +417,14 @@ proc getHostname*(): string {.tags: [ReadIOEffect].} = proc getSockDomain*(socket: SocketHandle): Domain = ## returns the socket's domain (AF_INET or AF_INET6). - var name: SockAddr + var name: Sockaddr_in6 var namelen = sizeof(name).SockLen if getsockname(socket, cast[ptr SockAddr](addr(name)), addr(namelen)) == -1'i32: raiseOSError(osLastError()) - if name.sa_family == nativeAfInet: + if name.sin6_family == nativeAfInet: result = AF_INET - elif name.sa_family == nativeAfInet6: + elif name.sin6_family == nativeAfInet6: result = AF_INET6 else: raiseOSError(osLastError(), "unknown socket family in getSockFamily") @@ -410,17 +435,23 @@ proc getAddrString*(sockAddr: ptr SockAddr): string = if sockAddr.sa_family == nativeAfInet: result = $inet_ntoa(cast[ptr Sockaddr_in](sockAddr).sin_addr) elif sockAddr.sa_family == nativeAfInet6: + let addrLen = when not useWinVersion: posix.INET6_ADDRSTRLEN + else: 46 # it's actually 46 in both cases + result = newString(addrLen) + let addr6 = addr cast[ptr Sockaddr_in6](sockAddr).sin6_addr when not useWinVersion: - # TODO: Windows - result = newString(posix.INET6_ADDRSTRLEN) - let addr6 = addr cast[ptr Sockaddr_in6](sockAddr).sin6_addr - discard posix.inet_ntop(posix.AF_INET6, addr6, result.cstring, - result.len.int32) + if posix.inet_ntop(posix.AF_INET6, addr6, addr result[0], + result.len.int32) == nil: + raiseOSError(osLastError()) if posix.IN6_IS_ADDR_V4MAPPED(addr6) != 0: result = result.substr("::ffff:".len) + else: + if winlean.inet_ntop(winlean.AF_INET6, addr6, addr result[0], + result.len.int32) == nil: + raiseOSError(osLastError()) + setLen(result, len(cstring(result))) else: - raiseOSError(osLastError(), "unknown socket family in getAddrString") - + raise newException(IOError, "Unknown socket family in getAddrString") proc getSockName*(socket: SocketHandle): Port = ## returns the socket's associated port number. diff --git a/lib/pure/net.nim b/lib/pure/net.nim index 56f8b9399..d175bd537 100644 --- a/lib/pure/net.nim +++ b/lib/pure/net.nim @@ -66,7 +66,7 @@ ## {.deadCodeElim: on.} -import nativesockets, os, strutils, parseutils, times, sets +import nativesockets, os, strutils, parseutils, times, sets, options export Port, `$`, `==` export Domain, SockType, Protocol @@ -669,7 +669,7 @@ proc close*(socket: Socket) = ## Closes a socket. try: when defineSsl: - if socket.isSSL: + if socket.isSSL and socket.sslHandle != nil: ErrClearError() # As we are closing the underlying socket immediately afterwards, # it is valid, under the TLS standard, to perform a unidirectional @@ -1477,6 +1477,63 @@ proc isIpAddress*(address_str: string): bool {.tags: [].} = return false return true +proc dial*(address: string, port: Port, + protocol = IPPROTO_TCP, buffered = true): Socket + {.tags: [ReadIOEffect, WriteIOEffect].} = + ## Establishes connection to the specified ``address``:``port`` pair via the + ## specified protocol. The procedure iterates through possible + ## resolutions of the ``address`` until it succeeds, meaning that it + ## seamlessly works with both IPv4 and IPv6. + ## Returns Socket ready to send or receive data. + let sockType = protocol.toSockType() + + let aiList = getAddrInfo(address, port, AF_UNSPEC, sockType, protocol) + + var fdPerDomain: array[low(Domain).ord..high(Domain).ord, SocketHandle] + for i in low(fdPerDomain)..high(fdPerDomain): + fdPerDomain[i] = osInvalidSocket + template closeUnusedFds(domainToKeep = -1) {.dirty.} = + for i, fd in fdPerDomain: + if fd != osInvalidSocket and i != domainToKeep: + fd.close() + + var success = false + var lastError: OSErrorCode + var it = aiList + var domain: Domain + var lastFd: SocketHandle + while it != nil: + let domainOpt = it.ai_family.toKnownDomain() + if domainOpt.isNone: + it = it.ai_next + continue + domain = domainOpt.unsafeGet() + lastFd = fdPerDomain[ord(domain)] + if lastFd == osInvalidSocket: + lastFd = newNativeSocket(domain, sockType, protocol) + if lastFd == osInvalidSocket: + # we always raise if socket creation failed, because it means a + # network system problem (e.g. not enough FDs), and not an unreachable + # address. + let err = osLastError() + freeAddrInfo(aiList) + closeUnusedFds() + raiseOSError(err) + fdPerDomain[ord(domain)] = lastFd + if connect(lastFd, it.ai_addr, it.ai_addrlen.SockLen) == 0'i32: + success = true + break + lastError = osLastError() + it = it.ai_next + freeAddrInfo(aiList) + closeUnusedFds(ord(domain)) + + if success: + result = newSocket(lastFd, domain, sockType, protocol) + elif lastError != 0.OSErrorCode: + raiseOSError(lastError) + else: + raise newException(IOError, "Couldn't resolve address: " & address) proc connect*(socket: Socket, address: string, port = Port(0)) {.tags: [ReadIOEffect].} = diff --git a/lib/pure/uri.nim b/lib/pure/uri.nim index ba745cfd3..c7e0ed1da 100644 --- a/lib/pure/uri.nim +++ b/lib/pure/uri.nim @@ -50,6 +50,7 @@ proc add*(url: var Url, a: Url) {.deprecated.} = proc parseAuthority(authority: string, result: var Uri) = var i = 0 var inPort = false + var inIPv6 = false while true: case authority[i] of '@': @@ -59,7 +60,14 @@ proc parseAuthority(authority: string, result: var Uri) = result.hostname.setLen(0) inPort = false of ':': - inPort = true + if inIPv6: + result.hostname.add(authority[i]) + else: + inPort = true + of '[': + inIPv6 = true + of ']': + inIPv6 = false of '\0': break else: if inPort: @@ -346,6 +354,17 @@ when isMainModule: doAssert($test == str) block: + # IPv6 address + let str = "foo://[::1]:1234/bar?baz=true&qux#quux" + let uri = parseUri(str) + doAssert uri.scheme == "foo" + doAssert uri.hostname == "::1" + doAssert uri.port == "1234" + doAssert uri.path == "/bar" + doAssert uri.query == "baz=true&qux" + doAssert uri.anchor == "quux" + + block: let str = "urn:example:animal:ferret:nose" let test = parseUri(str) doAssert test.scheme == "urn" diff --git a/lib/upcoming/asyncdispatch.nim b/lib/upcoming/asyncdispatch.nim index feee87bae..7c0497cc6 100644 --- a/lib/upcoming/asyncdispatch.nim +++ b/lib/upcoming/asyncdispatch.nim @@ -9,7 +9,7 @@ include "system/inclrtl" -import os, oids, tables, strutils, times, heapqueue, lists +import os, oids, tables, strutils, times, heapqueue, lists, options import nativesockets, net, deques @@ -325,68 +325,6 @@ when defined(windows) or defined(nimdoc): getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](fun) close(dummySock) - proc connect*(socket: AsyncFD, address: string, port: Port, - domain = nativesockets.AF_INET): Future[void] = - ## Connects ``socket`` to server at ``address:port``. - ## - ## Returns a ``Future`` which will complete when the connection succeeds - ## or an error occurs. - verifyPresence(socket) - var retFuture = newFuture[void]("connect") - # Apparently ``ConnectEx`` expects the socket to be initially bound: - var saddr: Sockaddr_in - saddr.sin_family = int16(toInt(domain)) - saddr.sin_port = 0 - saddr.sin_addr.s_addr = INADDR_ANY - if bindAddr(socket.SocketHandle, cast[ptr SockAddr](addr(saddr)), - sizeof(saddr).SockLen) < 0'i32: - raiseOSError(osLastError()) - - var aiList = getAddrInfo(address, port, domain) - var success = false - var lastError: OSErrorCode - var it = aiList - while it != nil: - # "the OVERLAPPED structure must remain valid until the I/O completes" - # http://blogs.msdn.com/b/oldnewthing/archive/2011/02/02/10123392.aspx - var ol = PCustomOverlapped() - GC_ref(ol) - ol.data = CompletionData(fd: socket, cb: - proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = - if not retFuture.finished: - if errcode == OSErrorCode(-1): - retFuture.complete() - else: - retFuture.fail(newException(OSError, osErrorMsg(errcode))) - ) - - var ret = connectEx(socket.SocketHandle, it.ai_addr, - sizeof(Sockaddr_in).cint, nil, 0, nil, - cast[POVERLAPPED](ol)) - if ret: - # Request to connect completed immediately. - success = true - retFuture.complete() - # We don't deallocate ``ol`` here because even though this completed - # immediately poll will still be notified about its completion and it will - # free ``ol``. - break - else: - lastError = osLastError() - if lastError.int32 == ERROR_IO_PENDING: - # In this case ``ol`` will be deallocated in ``poll``. - success = true - break - else: - GC_unref(ol) - success = false - it = it.ai_next - - freeAddrInfo(aiList) - if not success: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) - return retFuture - proc recv*(socket: AsyncFD, size: int, flags = {SocketFlag.SafeDisconn}): Future[string] = ## Reads **up to** ``size`` bytes from ``socket``. Returned future will @@ -739,8 +677,8 @@ when defined(windows) or defined(nimdoc): var lpOutputBuf = newString(lpOutputLen) var dwBytesReceived: Dword let dwReceiveDataLength = 0.Dword # We don't want any data to be read. - let dwLocalAddressLength = Dword(sizeof(Sockaddr_in) + 16) - let dwRemoteAddressLength = Dword(sizeof(Sockaddr_in) + 16) + let dwLocalAddressLength = Dword(sizeof(Sockaddr_in6) + 16) + let dwRemoteAddressLength = Dword(sizeof(Sockaddr_in6) + 16) template failAccept(errcode) = if flags.isDisconnectionError(errcode): @@ -770,12 +708,14 @@ when defined(windows) or defined(nimdoc): dwLocalAddressLength, dwRemoteAddressLength, addr localSockaddr, addr localLen, addr remoteSockaddr, addr remoteLen) - register(clientSock.AsyncFD) - # TODO: IPv6. Check ``sa_family``. http://stackoverflow.com/a/9212542/492186 - retFuture.complete( - (address: $inet_ntoa(cast[ptr Sockaddr_in](remoteSockAddr).sin_addr), - client: clientSock.AsyncFD) - ) + try: + let address = getAddrString(remoteSockAddr) + register(clientSock.AsyncFD) + retFuture.complete((address: address, client: clientSock.AsyncFD)) + except: + # getAddrString may raise + clientSock.close() + retFuture.fail(getCurrentException()) var ol = PCustomOverlapped() GC_ref(ol) @@ -808,20 +748,6 @@ when defined(windows) or defined(nimdoc): return retFuture - proc newAsyncNativeSocket*(domain, sockType, protocol: cint): AsyncFD = - ## Creates a new socket and registers it with the dispatcher implicitly. - result = newNativeSocket(domain, sockType, protocol).AsyncFD - result.SocketHandle.setBlocking(false) - register(result) - - proc newAsyncNativeSocket*(domain: Domain = nativesockets.AF_INET, - sockType: SockType = SOCK_STREAM, - protocol: Protocol = IPPROTO_TCP): AsyncFD = - ## Creates a new socket and registers it with the dispatcher implicitly. - result = newNativeSocket(domain, sockType, protocol).AsyncFD - result.SocketHandle.setBlocking(false) - register(result) - proc closeSocket*(socket: AsyncFD) = ## Closes a socket and ensures that it is unregistered. socket.SocketHandle.close() @@ -1159,23 +1085,6 @@ else: var data = newAsyncData() p.selector.registerHandle(fd.SocketHandle, {}, data) - proc newAsyncNativeSocket*(domain: cint, sockType: cint, - protocol: cint): AsyncFD = - result = newNativeSocket(domain, sockType, protocol).AsyncFD - result.SocketHandle.setBlocking(false) - when defined(macosx): - result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1) - register(result) - - proc newAsyncNativeSocket*(domain: Domain = AF_INET, - sockType: SockType = SOCK_STREAM, - protocol: Protocol = IPPROTO_TCP): AsyncFD = - result = newNativeSocket(domain, sockType, protocol).AsyncFD - result.SocketHandle.setBlocking(false) - when defined(macosx): - result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1) - register(result) - proc closeSocket*(sock: AsyncFD) = let disp = getGlobalDispatcher() disp.selector.unregister(sock.SocketHandle) @@ -1331,50 +1240,6 @@ else: # Callback queue processing processPendingCallbacks(p) - proc connect*(socket: AsyncFD, address: string, port: Port, - domain = AF_INET): Future[void] = - var retFuture = newFuture[void]("connect") - - proc cb(fd: AsyncFD): bool = - var ret = SocketHandle(fd).getSockOptInt(cint(SOL_SOCKET), cint(SO_ERROR)) - if ret == 0: - # We have connected. - retFuture.complete() - return true - elif ret == EINTR: - # interrupted, keep waiting - return false - else: - retFuture.fail(newException(OSError, osErrorMsg(OSErrorCode(ret)))) - return true - - assert getSockDomain(socket.SocketHandle) == domain - var aiList = getAddrInfo(address, port, domain) - var success = false - var lastError: OSErrorCode - var it = aiList - while it != nil: - var ret = connect(socket.SocketHandle, it.ai_addr, it.ai_addrlen.Socklen) - if ret == 0: - # Request to connect completed immediately. - success = true - retFuture.complete() - break - else: - lastError = osLastError() - if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS: - success = true - addWrite(socket, cb) - break - else: - success = false - it = it.ai_next - - freeAddrInfo(aiList) - if not success: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) - return retFuture - proc recv*(socket: AsyncFD, size: int, flags = {SocketFlag.SafeDisconn}): Future[string] = var retFuture = newFuture[string]("recv") @@ -1568,9 +1433,14 @@ else: else: retFuture.fail(newException(OSError, osErrorMsg(lastError))) else: - register(client.AsyncFD) - retFuture.complete((getAddrString(cast[ptr SockAddr](addr sockAddress)), - client.AsyncFD)) + try: + let address = getAddrString(cast[ptr SockAddr](addr sockAddress)) + register(client.AsyncFD) + retFuture.complete((address, client.AsyncFD)) + except: + # getAddrString may raise + client.close() + retFuture.fail(getCurrentException()) addRead(socket, cb) return retFuture @@ -1623,6 +1493,9 @@ else: data.readList.add(cb) p.selector.registerEvent(SelectEvent(ev), data) +# Common procedures between current and upcoming asyncdispatch +include includes.asynccommon + proc sleepAsync*(ms: int): Future[void] = ## Suspends the execution of the current async procedure for the next ## ``ms`` milliseconds. diff --git a/lib/windows/winlean.nim b/lib/windows/winlean.nim index 1f8dc9ad6..164499543 100644 --- a/lib/windows/winlean.nim +++ b/lib/windows/winlean.nim @@ -495,7 +495,7 @@ type ai_family*: cint ## Address family of socket. ai_socktype*: cint ## Socket type. ai_protocol*: cint ## Protocol of socket. - ai_addrlen*: int ## Length of socket address. + ai_addrlen*: csize ## Length of socket address. ai_canonname*: cstring ## Canonical name of service location. ai_addr*: ptr SockAddr ## Socket address of socket. ai_next*: ptr AddrInfo ## Pointer to next in list. @@ -803,6 +803,7 @@ const SIO_GET_EXTENSION_FUNCTION_POINTER* = WSAIORW(IOC_WS2,6).DWORD SO_UPDATE_ACCEPT_CONTEXT* = 0x700B AI_V4MAPPED* = 0x0008 + AF_UNSPEC* = 0 AF_INET* = 2 AF_INET6* = 23 diff --git a/tests/async/tasyncdial.nim b/tests/async/tasyncdial.nim new file mode 100644 index 000000000..d70e14020 --- /dev/null +++ b/tests/async/tasyncdial.nim @@ -0,0 +1,53 @@ +discard """ + file: "tasyncdial.nim" + output: ''' +OK AF_INET +OK AF_INET6 +''' +""" + +import + nativesockets, os, asyncdispatch + +proc setupServerSocket(hostname: string, port: Port, domain: Domain): AsyncFD = + ## Creates a socket, binds it to the specified address, and starts listening for connecitons. + ## Registers the descriptor with the dispatcher of the current thread + ## Raises OSError in case of an error. + let fd = newNativeSocket(domain) + setSockOptInt(fd, SOL_SOCKET, SO_REUSEADDR, 1) + var aiList = getAddrInfo(hostname, port, domain) + if bindAddr(fd, aiList.ai_addr, aiList.ai_addrlen.Socklen) < 0'i32: + freeAddrInfo(aiList) + raiseOSError(osLastError()) + freeAddrInfo(aiList) + if listen(fd) != 0: + raiseOSError(osLastError()) + setBlocking(fd, false) + result = fd.AsyncFD + register(result) + +proc doTest(domain: static[Domain]) {.async.} = + const + testHost = when domain == Domain.AF_INET6: "::1" else: "127.0.0.1" + testPort = Port(17384) + let serverFd = setupServerSocket(testHost, testPort, domain) + let acceptFut = serverFd.accept() + let clientFdFut = dial(testHost, testPort) + + let serverClientFd = await acceptFut + serverFd.closeSocket() + + let clientFd = await clientFdFut + + let recvFut = serverClientFd.recv(2) + await clientFd.send("Hi") + let msg = await recvFut + + serverClientFd.closeSocket() + clientFd.closeSocket() + + if msg == "Hi": + echo "OK ", domain + +waitFor(doTest(Domain.AF_INET)) +waitFor(doTest(Domain.AF_INET6)) diff --git a/tests/stdlib/thttpclient.nim b/tests/stdlib/thttpclient.nim index 62c1ebee7..40324d92a 100644 --- a/tests/stdlib/thttpclient.nim +++ b/tests/stdlib/thttpclient.nim @@ -1,11 +1,13 @@ discard """ cmd: "nim c --threads:on -d:ssl $file" + exitcode: 0 + output: "OK" """ import strutils from net import TimeoutError -import httpclient, asyncdispatch +import nativesockets, os, httpclient, asyncdispatch const manualTests = false @@ -112,6 +114,40 @@ proc syncTest() = except: doAssert false, "TimeoutError should have been raised." -syncTest() +proc makeIPv6HttpServer(hostname: string, port: Port): AsyncFD = + let fd = newNativeSocket(AF_INET6) + setSockOptInt(fd, SOL_SOCKET, SO_REUSEADDR, 1) + var aiList = getAddrInfo(hostname, port, AF_INET6) + if bindAddr(fd, aiList.ai_addr, aiList.ai_addrlen.Socklen) < 0'i32: + freeAddrInfo(aiList) + raiseOSError(osLastError()) + freeAddrInfo(aiList) + if listen(fd) != 0: + raiseOSError(osLastError()) + setBlocking(fd, false) + + var serverFd = fd.AsyncFD + register(serverFd) + result = serverFd + + proc onAccept(fut: Future[AsyncFD]) {.gcsafe.} = + if not fut.failed: + let clientFd = fut.read() + clientFd.send("HTTP/1.1 200 OK\r\LContent-Length: 0\r\LConnection: Closed\r\L\r\L").callback = proc() = + clientFd.closeSocket() + serverFd.accept().callback = onAccept + serverFd.accept().callback = onAccept + +proc ipv6Test() = + var client = newAsyncHttpClient() + let serverFd = makeIPv6HttpServer("::1", Port(18473)) + var resp = waitFor client.request("http://[::1]:18473/") + doAssert(resp.status == "200 OK") + serverFd.closeSocket() + client.close() +syncTest() waitFor(asyncTest()) +ipv6Test() + +echo "OK" diff --git a/tests/stdlib/tnetdial.nim b/tests/stdlib/tnetdial.nim new file mode 100644 index 000000000..da6088d70 --- /dev/null +++ b/tests/stdlib/tnetdial.nim @@ -0,0 +1,60 @@ +discard """ + cmd: "nim c --threads:on $file" + exitcode: 0 + output: "OK" +""" + +import os, net, nativesockets, asyncdispatch + +## Test for net.dial + +const port = Port(28431) + +proc initIPv6Server(hostname: string, port: Port): AsyncFD = + let fd = newNativeSocket(AF_INET6) + setSockOptInt(fd, SOL_SOCKET, SO_REUSEADDR, 1) + var aiList = getAddrInfo(hostname, port, AF_INET6) + if bindAddr(fd, aiList.ai_addr, aiList.ai_addrlen.Socklen) < 0'i32: + freeAddrInfo(aiList) + raiseOSError(osLastError()) + freeAddrInfo(aiList) + if listen(fd) != 0: + raiseOSError(osLastError()) + setBlocking(fd, false) + + var serverFd = fd.AsyncFD + register(serverFd) + result = serverFd + +# Since net.dial is synchronous, we use main thread to setup server, +# and dial to it from another thread. + +proc testThread() {.thread.} = + let fd = net.dial("::1", port) + var s = newString(5) + doAssert fd.recv(addr s[0], 5) == 5 + if s == "Hello": + echo "OK" + fd.close() + +proc test() = + var t: Thread[void] + createThread(t, testThread) + + let serverFd = initIPv6Server("::1", port) + var done = false + + serverFd.accept().callback = proc(fut: Future[AsyncFD]) = + serverFd.closeSocket() + if not fut.failed: + let fd = fut.read() + fd.send("Hello").callback = proc() = + fd.closeSocket() + done = true + + while not done: + poll() + + joinThread(t) + +test() diff --git a/web/news/e031_version_0_16_2.rst b/web/news/e031_version_0_16_2.rst index b38f0c159..63884700f 100644 --- a/web/news/e031_version_0_16_2.rst +++ b/web/news/e031_version_0_16_2.rst @@ -66,7 +66,10 @@ Library Additions ----------------- - Added ``system.onThreadDestruction``. - +- Added ``dial`` procedure to networking modules: ``net``, ``asyncdispatch``, + ``asyncnet``. It merges socket creation, address resolution, and connection + into single step. When using ``dial``, you don't have to worry about + IPv4 vs IPv6 problem. ``httpclient`` now supports IPv6. Tool Additions -------------- |