diff options
Diffstat (limited to 'lib/pure/asyncio2.nim')
-rw-r--r-- | lib/pure/asyncio2.nim | 111 |
1 files changed, 78 insertions, 33 deletions
diff --git a/lib/pure/asyncio2.nim b/lib/pure/asyncio2.nim index 60d489dda..c37370b7b 100644 --- a/lib/pure/asyncio2.nim +++ b/lib/pure/asyncio2.nim @@ -94,7 +94,8 @@ proc failed*[T](future: PFuture[T]): bool = # TODO: Get rid of register. Do it implicitly. when defined(windows) or defined(nimdoc): - import winlean + import winlean, sets, hashes + #from hashes import THash type TCompletionKey = dword @@ -105,7 +106,7 @@ when defined(windows) or defined(nimdoc): PDispatcher* = ref object ioPort: THandle - hasHandles: bool + handles: TSet[TSocketHandle] TCustomOverlapped = object Internal*: DWORD @@ -117,21 +118,32 @@ when defined(windows) or defined(nimdoc): PCustomOverlapped = ptr TCustomOverlapped + proc hash(x: TSocketHandle): THash {.borrow.} + proc newDispatcher*(): PDispatcher = ## Creates a new Dispatcher instance. new result result.ioPort = CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) + result.handles = initSet[TSocketHandle]() proc register*(p: PDispatcher, sock: TSocketHandle) = ## Registers ``sock`` with the dispatcher ``p``. if CreateIOCompletionPort(sock.THandle, p.ioPort, cast[TCompletionKey](sock), 1) == 0: OSError(OSLastError()) - p.hasHandles = true + p.handles.incl(sock) + # TODO: fd closure detection, we need to remove the fd from handles set + + proc verifyPresence(p: PDispatcher, sock: TSocketHandle) = + ## Ensures that socket has been registered with the dispatcher. + if sock notin p.handles: + raise newException(EInvalidValue, + "Operation performed on a socket which has not been registered with" & + " the dispatcher yet.") proc poll*(p: PDispatcher, timeout = 500) = ## Waits for completion events and processes them. - if not p.hasHandles: + if p.handles.len == 0: raise newException(EInvalidValue, "No handles registered in dispatcher.") let llTimeout = @@ -237,7 +249,7 @@ when defined(windows) or defined(nimdoc): ## ## Returns a ``PFuture`` which will complete when the connection succeeds ## or an error occurs. - + verifyPresence(p, socket) var retFuture = newFuture[int]()# TODO: Change to void when that regression is fixed. # Apparently ``ConnectEx`` expects the socket to be initially bound: var saddr: Tsockaddr_in @@ -298,7 +310,7 @@ when defined(windows) or defined(nimdoc): ## recv operation then the future may complete with only a part of the ## requested data read. If socket is disconnected and no data is available ## to be read then the future will complete with a value of ``""``. - + verifyPresence(p, socket) var retFuture = newFuture[string]() var dataBuf: TWSABuf @@ -354,6 +366,7 @@ when defined(windows) or defined(nimdoc): proc send*(p: PDispatcher, socket: TSocketHandle, data: string): PFuture[int] = ## Sends ``data`` to ``socket``. The returned future will complete once all ## data has been sent. + verifyPresence(p, socket) var retFuture = newFuture[int]() var dataBuf: TWSABuf @@ -390,7 +403,9 @@ when defined(windows) or defined(nimdoc): ## Accepts a new connection. Returns a future containing the client socket ## corresponding to that connection and the remote address of the client. ## The future will complete when the connection is successfully accepted. - + ## + ## The resulting client socket is automatically registered to dispatcher. + verifyPresence(p, socket) var retFuture = newFuture[tuple[address: string, client: TSocketHandle]]() var clientSock = socket() @@ -416,6 +431,7 @@ when defined(windows) or defined(nimdoc): dwLocalAddressLength, dwRemoteAddressLength, addr LocalSockaddr, addr localLen, addr RemoteSockaddr, addr remoteLen) + p.register(clientSock) # TODO: IPv6. Check ``sa_family``. http://stackoverflow.com/a/9212542/492186 retFuture.complete( (address: $inet_ntoa(cast[ptr Tsockaddr_in](remoteSockAddr).sin_addr), @@ -452,6 +468,13 @@ when defined(windows) or defined(nimdoc): return retFuture + proc socket*(disp: PDispatcher, domain: TDomain = AF_INET, + typ: TType = SOCK_STREAM, + protocol: TProtocol = IPPROTO_TCP): TSocketHandle = + ## Creates a new socket and registers it with the dispatcher implicitly. + result = socket(domain, typ, protocol) + disp.register(result) + initAll() else: import selectors @@ -473,32 +496,35 @@ else: proc update(p: PDispatcher, sock: TSocketHandle, events: set[TEvent]) = assert sock in p.selector - if events == {}: - discard p.selector.unregister(sock) - else: - discard p.selector.update(sock, events) + discard p.selector.update(sock, events) + + proc register(p: PDispatcher, sock: TSocketHandle) = + var data = PData(sock: sock, readCBs: @[], writeCBs: @[]) + p.selector.register(sock, {}, data.PObject) + + proc socket*(disp: PDispatcher, domain: TDomain = AF_INET, + typ: TType = SOCK_STREAM, + protocol: TProtocol = IPPROTO_TCP): TSocketHandle = + result = socket(domain, typ, protocol) + disp.register(result) proc addRead(p: PDispatcher, sock: TSocketHandle, cb: TCallback) = if sock notin p.selector: - var data = PData(sock: sock, readCBs: @[cb], writeCBs: @[]) - p.selector.register(sock, {EvRead}, data.PObject) - else: - p.selector[sock].data.PData.readCBs.add(cb) - p.update(sock, p.selector[sock].events + {EvRead}) + raise newException(EInvalidValue, "File descriptor not registered.") + p.selector[sock].data.PData.readCBs.add(cb) + p.update(sock, p.selector[sock].events + {EvRead}) proc addWrite(p: PDispatcher, sock: TSocketHandle, cb: TCallback) = if sock notin p.selector: - var data = PData(sock: sock, readCBs: @[], writeCBs: @[cb]) - p.selector.register(sock, {EvWrite}, data.PObject) - else: - p.selector[sock].data.PData.writeCBs.add(cb) - p.update(sock, p.selector[sock].events + {EvWrite}) + raise newException(EInvalidValue, "File descriptor not registered.") + p.selector[sock].data.PData.writeCBs.add(cb) + p.update(sock, p.selector[sock].events + {EvWrite}) proc poll*(p: PDispatcher, timeout = 500) = for info in p.selector.select(timeout): let data = PData(info.key.data) assert data.sock == info.key.fd - + #echo("In poll ", data.sock.cint) if EvRead in info.events: # Callback may add items to ``data.readCBs`` which causes issues if # we are iterating over ``data.readCBs`` at the same time. We therefore @@ -517,11 +543,17 @@ else: if not cb(data.sock): # Callback wants to be called again. data.writeCBs.add(cb) - - var newEvents: set[TEvent] - if data.readCBs.len != 0: newEvents = {EvRead} - if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite} - p.update(data.sock, newEvents) + + if info.key in p.selector: + var newEvents: set[TEvent] + if data.readCBs.len != 0: newEvents = {EvRead} + if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite} + if newEvents != info.key.events: + echo(info.key.events, " -> ", newEvents) + p.update(data.sock, newEvents) + else: + # FD no longer a part of the selector. Likely been closed + # (e.g. socket disconnected). proc connect*(p: PDispatcher, socket: TSocketHandle, address: string, port: TPort, af = AF_INET): PFuture[int] = @@ -569,6 +601,7 @@ else: result = true let netSize = size - sizeRead let res = recv(sock, addr readBuffer[sizeRead], netSize, flags.cint) + #echo("recv cb res: ", res) if res < 0: let lastError = osLastError() if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: @@ -576,6 +609,7 @@ else: else: result = false # We still want this callback to be called. elif res == 0: + #echo("Disconnected recv: ", sizeRead) # Disconnected if sizeRead == 0: retFuture.complete("") @@ -588,6 +622,7 @@ else: result = false # We want to read all the data requested. else: retFuture.complete(readBuffer) + #echo("Recv cb result: ", result) addRead(p, socket, cb) return retFuture @@ -634,6 +669,7 @@ else: else: retFuture.fail(newException(EOS, osErrorMsg(lastError))) else: + p.register(client) retFuture.complete(($inet_ntoa(sockAddress.sin_addr), client)) addRead(p, socket, cb) return retFuture @@ -833,9 +869,13 @@ proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.} result = "" var c = "" while true: + #echo("1") c = await p.recv(socket, 1) + #echo("Received ", c.len) if c.len == 0: + #echo("returning") return + #echo("2") if c == "\r": c = await p.recv(socket, 1, MSG_PEEK) if c.len > 0 and c == "\L": @@ -845,7 +885,9 @@ proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.} elif c == "\L": addNLIfEmpty() return + #echo("3") add(result.string, c) + #echo("4") when isMainModule: @@ -854,11 +896,12 @@ when isMainModule: sock.setBlocking false - when true: + when false: # Await tests proc main(p: PDispatcher): PFuture[int] {.async.} = discard await p.connect(sock, "irc.freenode.net", TPort(6667)) while true: + echo("recvLine") var line = await p.recvLine(sock) echo("Line is: ", line.repr) if line == "": @@ -880,9 +923,9 @@ when isMainModule: else: - when true: + when false: - var f = p.connect(sock, "irc.freenode.org", TPort(6667)) + var f = p.connect(sock, "irc.poop.nl", TPort(6667)) f.callback = proc (future: PFuture[int]) = echo("Connected in future!") @@ -898,11 +941,13 @@ when isMainModule: sock.bindAddr(TPort(6667)) sock.listen() proc onAccept(future: PFuture[TSocketHandle]) = - echo "Accepted" - var t = p.send(future.read, "test\c\L") + let client = future.read + echo "Accepted ", client.cint + var t = p.send(client, "test\c\L") t.callback = proc (future: PFuture[int]) = - echo(future.read) + echo("Send: ", future.read) + client.close() var f = p.accept(sock) f.callback = onAccept |