diff options
author | Dominik Picheta <dominikpicheta@googlemail.com> | 2014-03-22 22:33:02 +0000 |
---|---|---|
committer | Dominik Picheta <dominikpicheta@googlemail.com> | 2014-03-22 22:33:53 +0000 |
commit | 192e11e7b72470e27bc6bccf1fedbfefc9c4ebd8 (patch) | |
tree | 0891ff99456d2af5f1a74198aa3a5b3b9f02191d | |
parent | 2ce9f1c77f9aa4504c55c75c57e74a5ad840916d (diff) | |
download | Nim-192e11e7b72470e27bc6bccf1fedbfefc9c4ebd8.tar.gz |
Many renames. Created high level asyncnet module.
-rw-r--r-- | lib/pure/asyncdispatch.nim (renamed from lib/pure/asyncio2.nim) | 28 | ||||
-rw-r--r-- | lib/pure/asyncnet.nim | 147 | ||||
-rw-r--r-- | lib/pure/net.nim | 4 | ||||
-rw-r--r-- | lib/pure/rawsockets.nim (renamed from lib/pure/sockets2.nim) | 8 |
4 files changed, 175 insertions, 12 deletions
diff --git a/lib/pure/asyncio2.nim b/lib/pure/asyncdispatch.nim index fbb02c37c..67361e46c 100644 --- a/lib/pure/asyncio2.nim +++ b/lib/pure/asyncdispatch.nim @@ -9,14 +9,14 @@ import os, oids, tables, strutils, macros -import sockets2 +import rawsockets -## Asyncio2 +## AsyncDispatch ## -------- ## -## This module implements a brand new asyncio module based on Futures. -## IOCP is used under the hood on Windows and the selectors module is used for -## other operating systems. +## This module implements a brand new dispatcher based on Futures. +## On Windows IOCP is used and on other operating systems the selectors module +## is used instead. # -- Futures @@ -27,7 +27,7 @@ type PFuture*[T] = ref object of PFutureBase value: T - error: ref EBase + error*: ref EBase # TODO: This shouldn't be necessary, generics bug? proc newFuture*[T](): PFuture[T] = ## Creates a new future. @@ -90,6 +90,11 @@ proc read*[T](future: PFuture[T]): T = # TODO: Make a custom exception type for this? raise newException(EInvalidValue, "Future still in progress.") +proc readError*[T](future: PFuture[T]): ref EBase = + if future.error != nil: return future.error + else: + raise newException(EInvalidValue, "No error in future.") + proc finished*[T](future: PFuture[T]): bool = ## Determines whether ``future`` has completed. ## @@ -478,6 +483,7 @@ when defined(windows) or defined(nimdoc): protocol: TProtocol = IPPROTO_TCP): TSocketHandle = ## Creates a new socket and registers it with the dispatcher implicitly. result = socket(domain, typ, protocol) + result.setBlocking(false) disp.register(result) proc close*(disp: PDispatcher, socket: TSocketHandle) = @@ -516,6 +522,7 @@ else: typ: TType = SOCK_STREAM, protocol: TProtocol = IPPROTO_TCP): TSocketHandle = result = socket(domain, typ, protocol) + result.setBlocking(false) disp.register(result) proc close*(disp: PDispatcher, sock: TSocketHandle) = @@ -919,6 +926,15 @@ proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.} return add(result.string, c) +var gDisp*{.threadvar.}: PDispatcher ## Global dispatcher +gDisp = newDispatcher() + +proc runForever*() = + ## Begins a never ending global dispatcher poll loop. + while true: + gDisp.poll() + + when isMainModule: var p = newDispatcher() diff --git a/lib/pure/asyncnet.nim b/lib/pure/asyncnet.nim new file mode 100644 index 000000000..ed8a2fb82 --- /dev/null +++ b/lib/pure/asyncnet.nim @@ -0,0 +1,147 @@ +import asyncdispatch +import rawsockets +import net + +when defined(ssl): + import openssl + +type + TAsyncSocket = object ## socket type + fd: TSocketHandle + case isBuffered: bool # determines whether this socket is buffered. + of true: + buffer: array[0..BufferSize, char] + currPos: int # current index in buffer + bufLen: int # current length of buffer + of false: nil + when defined(ssl): + case isSsl: bool + of true: + sslHandle: PSSL + sslContext: PSSLContext + sslNoHandshake: bool # True if needs handshake. + sslHasPeekChar: bool + sslPeekChar: char + of false: nil + + PAsyncSocket* = ref TAsyncSocket + +# TODO: Save AF, domain etc info and reuse it in procs which need it like connect. + +proc newSocket(fd: TSocketHandle, isBuff: bool): PAsyncSocket = + assert fd != osInvalidSocket + new(result) + result.fd = fd + result.isBuffered = isBuff + if isBuff: + result.currPos = 0 + +proc AsyncSocket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM, + protocol: TProtocol = IPPROTO_TCP, buffered = true): PAsyncSocket = + ## Creates a new asynchronous socket. + result = newSocket(gDisp.socket(domain, typ, protocol), buffered) + +proc connect*(socket: PAsyncSocket, address: string, port: TPort, + af = AF_INET): PFuture[void] = + ## Connects ``socket`` to server at ``address:port``. + ## + ## Returns a ``PFuture`` which will complete when the connection succeeds + ## or an error occurs. + result = gDisp.connect(socket.fd, address, port, af) + +proc recv*(socket: PAsyncSocket, size: int, + flags: int = 0): PFuture[string] = + ## Reads ``size`` bytes from ``socket``. Returned future will complete once + ## all of the requested data is read. If socket is disconnected during the + ## recv operation then the future may complete with only a part of the + ## requested data read. If socket is disconnected and no data is available + ## to be read then the future will complete with a value of ``""``. + result = gDisp.recv(socket.fd, size, flags) + +proc send*(socket: PAsyncSocket, data: string): PFuture[void] = + ## Sends ``data`` to ``socket``. The returned future will complete once all + ## data has been sent. + result = gDisp.send(socket.fd, data) + +proc acceptAddr*(socket: PAsyncSocket): + PFuture[tuple[address: string, client: PAsyncSocket]] = + ## Accepts a new connection. Returns a future containing the client socket + ## corresponding to that connection and the remote address of the client. + ## The future will complete when the connection is successfully accepted. + var retFuture = newFuture[tuple[address: string, client: PAsyncSocket]]() + var fut = gDisp.acceptAddr(socket.fd) + fut.callback = + proc (future: PFuture[tuple[address: string, client: TSocketHandle]]) = + assert future.finished + if future.failed: + retFuture.fail(future.readError) + else: + let resultTup = (future.read.address, + newSocket(future.read.client, socket.isBuffered)) + retFuture.complete(resultTup) + return retFuture + +proc accept*(socket: PAsyncSocket): PFuture[PAsyncSocket] = + ## Accepts a new connection. Returns a future containing the client socket + ## corresponding to that connection. + ## The future will complete when the connection is successfully accepted. + var retFut = newFuture[PAsyncSocket]() + var fut = acceptAddr(socket) + fut.callback = + proc (future: PFuture[tuple[address: string, client: PAsyncSocket]]) = + assert future.finished + if future.failed: + retFut.fail(future.readError) + else: + retFut.complete(future.read.client) + return retFut + +proc recvLine*(socket: PAsyncSocket): PFuture[string] {.async.} = + ## Reads a line of data from ``socket``. Returned future will complete once + ## a full line is read or an error occurs. + ## + ## If a full line is read ``\r\L`` is not + ## added to ``line``, however if solely ``\r\L`` is read then ``line`` + ## will be set to it. + ## + ## If the socket is disconnected, ``line`` will be set to ``""``. + ## + ## If the socket is disconnected in the middle of a line (before ``\r\L`` + ## is read) then line will be set to ``""``. + ## The partial line **will be lost**. + + template addNLIfEmpty(): stmt = + if result.len == 0: + result.add("\c\L") + + result = "" + var c = "" + while true: + c = await recv(socket, 1) + if c.len == 0: + return "" + if c == "\r": + c = await recv(socket, 1, MSG_PEEK) + if c.len > 0 and c == "\L": + discard await recv(socket, 1) + addNLIfEmpty() + return + elif c == "\L": + addNLIfEmpty() + return + add(result.string, c) + +when isMainModule: + proc main() {.async.} = + var sock = AsyncSocket() + await sock.connect("irc.freenode.net", TPort(6667)) + while true: + let line = await sock.recvLine() + if line == "": + echo("Disconnected") + break + else: + echo("Got line: ", line) + main() + runForever() + diff --git a/lib/pure/net.nim b/lib/pure/net.nim index 45883166b..d40f0949b 100644 --- a/lib/pure/net.nim +++ b/lib/pure/net.nim @@ -10,7 +10,7 @@ ## This module implements a high-level cross-platform sockets interface. {.deadCodeElim: on.} -import sockets2, os, strutils, unsigned, parseutils, times +import rawsockets, os, strutils, unsigned, parseutils, times type IpAddressFamily* {.pure.} = enum ## Describes the type of an IP address @@ -360,7 +360,7 @@ proc socket*(domain: TDomain = AF_INET, typ: TType = SOCK_STREAM, ## Creates a new socket. ## ## If an error occurs EOS will be raised. - let fd = sockets2.socket(domain, typ, protocol) + let fd = rawsockets.socket(domain, typ, protocol) if fd == osInvalidSocket: osError(osLastError()) result = newSocket(fd, buffered) diff --git a/lib/pure/sockets2.nim b/lib/pure/rawsockets.nim index 975cc685a..db04f6097 100644 --- a/lib/pure/sockets2.nim +++ b/lib/pure/rawsockets.nim @@ -209,13 +209,13 @@ proc htonl*(x: int32): int32 = ## Converts 32-bit integers from host to network byte order. On machines ## where the host byte order is the same as network byte order, this is ## a no-op; otherwise, it performs a 4-byte swap operation. - result = sockets2.ntohl(x) + result = rawsockets.ntohl(x) proc htons*(x: int16): int16 = ## Converts 16-bit positive integers from host to network byte order. ## On machines where the host byte order is the same as network byte ## order, this is a no-op; otherwise, it performs a 2-byte swap operation. - result = sockets2.ntohs(x) + result = rawsockets.ntohs(x) proc getServByName*(name, proto: string): TServent {.tags: [FReadIO].} = ## Searches the database from the beginning and finds the first entry for @@ -256,7 +256,7 @@ proc getHostByAddr*(ip: string): Thostent {.tags: [FReadIO].} = when defined(windows): var s = winlean.gethostbyaddr(addr(myaddr), sizeof(myaddr).cuint, - cint(sockets2.AF_INET)) + cint(rawsockets.AF_INET)) if s == nil: osError(osLastError()) else: var s = posix.gethostbyaddr(addr(myaddr), sizeof(myaddr).TSocklen, @@ -312,7 +312,7 @@ proc getSockName*(socket: TSocketHandle): TPort = if getsockname(socket, cast[ptr TSockAddr](addr(name)), addr(namelen)) == -1'i32: osError(osLastError()) - result = TPort(sockets2.ntohs(name.sin_port)) + result = TPort(rawsockets.ntohs(name.sin_port)) proc getSockOptInt*(socket: TSocketHandle, level, optname: int): int {. tags: [FReadIO].} = |