diff options
-rw-r--r-- | lib/pure/asyncio2.nim | 40 | ||||
-rw-r--r-- | lib/windows/winlean.nim | 1 | ||||
-rw-r--r-- | tests/async/tasyncawait.nim | 4 |
3 files changed, 34 insertions, 11 deletions
diff --git a/lib/pure/asyncio2.nim b/lib/pure/asyncio2.nim index cdb2b3a89..c15415130 100644 --- a/lib/pure/asyncio2.nim +++ b/lib/pure/asyncio2.nim @@ -94,7 +94,8 @@ proc failed*[T](future: PFuture[T]): bool = # TODO: Get rid of register. Do it implicitly. when defined(windows) or defined(nimdoc): - import winlean + import winlean, sets, hashes + #from hashes import THash type TCompletionKey = dword @@ -105,7 +106,7 @@ when defined(windows) or defined(nimdoc): PDispatcher* = ref object ioPort: THandle - hasHandles: bool + handles: TSet[TSocketHandle] TCustomOverlapped = object Internal*: DWORD @@ -117,21 +118,31 @@ when defined(windows) or defined(nimdoc): PCustomOverlapped = ptr TCustomOverlapped + proc hash(x: TSocketHandle): THash {.borrow.} + proc newDispatcher*(): PDispatcher = ## Creates a new Dispatcher instance. new result result.ioPort = CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) + result.handles = initSet[TSocketHandle]() proc register*(p: PDispatcher, sock: TSocketHandle) = ## Registers ``sock`` with the dispatcher ``p``. if CreateIOCompletionPort(sock.THandle, p.ioPort, cast[TCompletionKey](sock), 1) == 0: OSError(OSLastError()) - p.hasHandles = true + p.handles.incl(sock) + + proc verifyPresence(p: PDispatcher, sock: TSocketHandle) = + ## Ensures that socket has been registered with the dispatcher. + if sock notin p.handles: + raise newException(EInvalidValue, + "Operation performed on a socket which has not been registered with" & + " the dispatcher yet.") proc poll*(p: PDispatcher, timeout = 500) = ## Waits for completion events and processes them. - if not p.hasHandles: + if p.handles.len == 0: raise newException(EInvalidValue, "No handles registered in dispatcher.") let llTimeout = @@ -237,7 +248,7 @@ when defined(windows) or defined(nimdoc): ## ## Returns a ``PFuture`` which will complete when the connection succeeds ## or an error occurs. - + verifyPresence(p, socket) var retFuture = newFuture[int]()# TODO: Change to void when that regression is fixed. # Apparently ``ConnectEx`` expects the socket to be initially bound: var saddr: Tsockaddr_in @@ -298,7 +309,7 @@ when defined(windows) or defined(nimdoc): ## recv operation then the future may complete with only a part of the ## requested data read. If socket is disconnected and no data is available ## to be read then the future will complete with a value of ``""``. - + verifyPresence(p, socket) var retFuture = newFuture[string]() var dataBuf: TWSABuf @@ -354,6 +365,7 @@ when defined(windows) or defined(nimdoc): proc send*(p: PDispatcher, socket: TSocketHandle, data: string): PFuture[int] = ## Sends ``data`` to ``socket``. The returned future will complete once all ## data has been sent. + verifyPresence(p, socket) var retFuture = newFuture[int]() var dataBuf: TWSABuf @@ -390,7 +402,9 @@ when defined(windows) or defined(nimdoc): ## Accepts a new connection. Returns a future containing the client socket ## corresponding to that connection and the remote address of the client. ## The future will complete when the connection is successfully accepted. - + ## + ## The resulting client socket is automatically registered to dispatcher. + verifyPresence(p, socket) var retFuture = newFuture[tuple[address: string, client: TSocketHandle]]() var clientSock = socket() @@ -416,6 +430,7 @@ when defined(windows) or defined(nimdoc): dwLocalAddressLength, dwRemoteAddressLength, addr LocalSockaddr, addr localLen, addr RemoteSockaddr, addr remoteLen) + p.register(clientSock) # TODO: IPv6. Check ``sa_family``. http://stackoverflow.com/a/9212542/492186 retFuture.complete( (address: $inet_ntoa(cast[ptr Tsockaddr_in](remoteSockAddr).sin_addr), @@ -452,6 +467,13 @@ when defined(windows) or defined(nimdoc): return retFuture + proc socket*(disp: PDispatcher, domain: TDomain = AF_INET, + typ: TType = SOCK_STREAM, + protocol: TProtocol = IPPROTO_TCP): TSocketHandle = + ## Creates a new socket and registers it with the dispatcher implicitly. + result = socket() + disp.register(result) + initAll() else: import selectors @@ -871,7 +893,7 @@ when isMainModule: sock.setBlocking false - when true: + when false: # Await tests proc main(p: PDispatcher): PFuture[int] {.async.} = discard await p.connect(sock, "irc.freenode.net", TPort(6667)) @@ -898,7 +920,7 @@ when isMainModule: else: - when true: + when false: var f = p.connect(sock, "irc.poop.nl", TPort(6667)) f.callback = diff --git a/lib/windows/winlean.nim b/lib/windows/winlean.nim index 74ef9c9ec..4d87cf4b2 100644 --- a/lib/windows/winlean.nim +++ b/lib/windows/winlean.nim @@ -456,6 +456,7 @@ var SO_DONTLINGER* {.importc, header: "Winsock2.h".}: cint SO_EXCLUSIVEADDRUSE* {.importc, header: "Winsock2.h".}: cint # disallow local address reuse + SO_ERROR* {.importc, header: "Winsock2.h".}: cint proc `==`*(x, y: TSocketHandle): bool {.borrow.} diff --git a/tests/async/tasyncawait.nim b/tests/async/tasyncawait.nim index 458efea56..fea0783a0 100644 --- a/tests/async/tasyncawait.nim +++ b/tests/async/tasyncawait.nim @@ -20,7 +20,7 @@ proc sendMessages(disp: PDispatcher, client: TSocketHandle): PFuture[int] {.asyn proc launchSwarm(disp: PDispatcher, port: TPort): PFuture[int] {.async.} = for i in 0 .. <swarmSize: - var sock = socket() + var sock = disp.socket() #disp.register(sock) discard await disp.connect(sock, "localhost", port) @@ -48,7 +48,7 @@ proc readMessages(disp: PDispatcher, client: TSocketHandle): PFuture[int] {.asyn doAssert false proc createServer(disp: PDispatcher, port: TPort): PFuture[int] {.async.} = - var server = socket() + var server = disp.socket() #disp.register(server) server.bindAddr(port) server.listen() |