diff options
author | Dominik Picheta <dominikpicheta@googlemail.com> | 2014-03-11 21:53:35 +0000 |
---|---|---|
committer | Dominik Picheta <dominikpicheta@googlemail.com> | 2014-03-11 21:53:35 +0000 |
commit | 2ce07042fdbe7104694245b56006204ade9df34a (patch) | |
tree | 3e4085cdf114885f25608aea604975f88e9a9f6d | |
parent | 9b5357da5aea54bf9036b98d48ef52da16f7e1a7 (diff) | |
download | Nim-2ce07042fdbe7104694245b56006204ade9df34a.tar.gz |
tasyncawait now works on Linux.
Reworked detection of a file descriptor being closed with epoll (in the case of sockets it is when the remote host disconnects). Ensured that events are only updated when they change.
-rw-r--r-- | lib/pure/asyncio2.nim | 40 | ||||
-rw-r--r-- | lib/pure/selectors.nim | 65 | ||||
-rw-r--r-- | tests/async/tasyncawait.nim | 9 |
3 files changed, 69 insertions, 45 deletions
diff --git a/lib/pure/asyncio2.nim b/lib/pure/asyncio2.nim index 60d489dda..cdb2b3a89 100644 --- a/lib/pure/asyncio2.nim +++ b/lib/pure/asyncio2.nim @@ -479,9 +479,11 @@ else: discard p.selector.update(sock, events) proc addRead(p: PDispatcher, sock: TSocketHandle, cb: TCallback) = + #echo("addRead") if sock notin p.selector: var data = PData(sock: sock, readCBs: @[cb], writeCBs: @[]) p.selector.register(sock, {EvRead}, data.PObject) + #echo("registered") else: p.selector[sock].data.PData.readCBs.add(cb) p.update(sock, p.selector[sock].events + {EvRead}) @@ -498,7 +500,7 @@ else: for info in p.selector.select(timeout): let data = PData(info.key.data) assert data.sock == info.key.fd - + #echo("In poll ", data.sock.cint) if EvRead in info.events: # Callback may add items to ``data.readCBs`` which causes issues if # we are iterating over ``data.readCBs`` at the same time. We therefore @@ -517,11 +519,17 @@ else: if not cb(data.sock): # Callback wants to be called again. data.writeCBs.add(cb) - - var newEvents: set[TEvent] - if data.readCBs.len != 0: newEvents = {EvRead} - if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite} - p.update(data.sock, newEvents) + + if info.key in p.selector: + var newEvents: set[TEvent] + if data.readCBs.len != 0: newEvents = {EvRead} + if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite} + if newEvents != info.key.events: + echo(info.key.events, " -> ", newEvents) + p.update(data.sock, newEvents) + else: + # FD no longer a part of the selector. Likely been closed + # (e.g. socket disconnected). proc connect*(p: PDispatcher, socket: TSocketHandle, address: string, port: TPort, af = AF_INET): PFuture[int] = @@ -569,6 +577,7 @@ else: result = true let netSize = size - sizeRead let res = recv(sock, addr readBuffer[sizeRead], netSize, flags.cint) + #echo("recv cb res: ", res) if res < 0: let lastError = osLastError() if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: @@ -576,6 +585,7 @@ else: else: result = false # We still want this callback to be called. elif res == 0: + #echo("Disconnected recv: ", sizeRead) # Disconnected if sizeRead == 0: retFuture.complete("") @@ -588,6 +598,7 @@ else: result = false # We want to read all the data requested. else: retFuture.complete(readBuffer) + #echo("Recv cb result: ", result) addRead(p, socket, cb) return retFuture @@ -833,9 +844,13 @@ proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.} result = "" var c = "" while true: + #echo("1") c = await p.recv(socket, 1) + #echo("Received ", c.len) if c.len == 0: + #echo("returning") return + #echo("2") if c == "\r": c = await p.recv(socket, 1, MSG_PEEK) if c.len > 0 and c == "\L": @@ -845,7 +860,9 @@ proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.} elif c == "\L": addNLIfEmpty() return + #echo("3") add(result.string, c) + #echo("4") when isMainModule: @@ -859,6 +876,7 @@ when isMainModule: proc main(p: PDispatcher): PFuture[int] {.async.} = discard await p.connect(sock, "irc.freenode.net", TPort(6667)) while true: + echo("recvLine") var line = await p.recvLine(sock) echo("Line is: ", line.repr) if line == "": @@ -882,7 +900,7 @@ when isMainModule: else: when true: - var f = p.connect(sock, "irc.freenode.org", TPort(6667)) + var f = p.connect(sock, "irc.poop.nl", TPort(6667)) f.callback = proc (future: PFuture[int]) = echo("Connected in future!") @@ -898,11 +916,13 @@ when isMainModule: sock.bindAddr(TPort(6667)) sock.listen() proc onAccept(future: PFuture[TSocketHandle]) = - echo "Accepted" - var t = p.send(future.read, "test\c\L") + let client = future.read + echo "Accepted ", client.cint + var t = p.send(client, "test\c\L") t.callback = proc (future: PFuture[int]) = - echo(future.read) + echo("Send: ", future.read) + client.close() var f = p.accept(sock) f.callback = onAccept diff --git a/lib/pure/selectors.nim b/lib/pure/selectors.nim index e086ee3ab..a113e3362 100644 --- a/lib/pure/selectors.nim +++ b/lib/pure/selectors.nim @@ -10,7 +10,6 @@ # TODO: Docs. import tables, os, unsigned, hashes -import sockets2 when defined(linux): import posix, epoll elif defined(windows): import winlean @@ -41,17 +40,14 @@ when defined(linux) or defined(nimdoc): result.events = EPOLLIN if EvWrite in events: result.events = result.events or EPOLLOUT + result.events = result.events or EPOLLRDHUP result.data.fd = fd.cint proc register*(s: PSelector, fd: TSocketHandle, events: set[TEvent], data: PObject): PSelectorKey {.discardable.} = ## Registers file descriptor ``fd`` to selector ``s`` with a set of TEvent ## ``events``. - if s.fds.hasKey(fd): - raise newException(EInvalidValue, "File descriptor already exists.") - var event = createEventStruct(events, fd) - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fd, addr(event)) != 0: OSError(OSLastError()) @@ -63,30 +59,18 @@ when defined(linux) or defined(nimdoc): proc update*(s: PSelector, fd: TSocketHandle, events: set[TEvent]): PSelectorKey {.discardable.} = ## Updates the events which ``fd`` wants notifications for. - if not s.fds.hasKey(fd): - raise newException(EInvalidValue, "File descriptor not found.") - var event = createEventStruct(events, fd) - - s.fds[fd].events = events - if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0: - if OSLastError().cint == ENOENT: - # Socket has been closed. Epoll automatically removes disconnected - # sockets. - s.fds.del(fd) - osError("Socket has been disconnected") - - OSError(OSLastError()) - result = s.fds[fd] + if s.fds[fd].events != events: + echo("Update ", fd.cint, " to ", events) + var event = createEventStruct(events, fd) + + s.fds[fd].events = events + if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0: + OSError(OSLastError()) + result = s.fds[fd] proc unregister*(s: PSelector, fd: TSocketHandle): PSelectorKey {.discardable.} = - if not s.fds.hasKey(fd): - raise newException(EInvalidValue, "File descriptor not found.") if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fd, nil) != 0: - if osLastError().cint == ENOENT: - # Socket has been closed. Epoll automatically removes disconnected - # sockets so its already been removed. - else: - OSError(OSLastError()) + OSError(OSLastError()) result = s.fds[fd] s.fds.del(fd) @@ -113,6 +97,14 @@ when defined(linux) or defined(nimdoc): assert selectorKey != nil result.add((selectorKey, evSet)) + if (s.events[i].events and EPOLLHUP) != 0 or + (s.events[i].events and EPOLLRDHUP) != 0: + # fd closed + #echo("fd closed ", s.events[i].data.fd) + s.unregister(s.events[i].data.fd.TSocketHandle) + + #echo("Epoll: ", result[i].key.fd, " ", result[i].events, " ", result[i].key.events) + proc newSelector*(): PSelector = new result result.epollFD = epoll_create(64) @@ -123,7 +115,26 @@ when defined(linux) or defined(nimdoc): proc contains*(s: PSelector, fd: TSocketHandle): bool = ## Determines whether selector contains a file descriptor. - return s.fds.hasKey(fd) + if s.fds.hasKey(fd): + result = true + + # Ensure the underlying epoll instance still contains this fd. + var event = createEventStruct(s.fds[fd].events, fd) + if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0: + let err = osLastError() + if err.cint in {ENOENT, EBADF}: + return false + OSError(OSLastError()) + else: + return false + + proc contains*(s: PSelector, key: PSelectorKey): bool = + ## Determines whether selector contains this selector key. More accurate + ## than checking if the file descriptor is in the selector because it + ## ensures that the keys are equal. File descriptors may not always be + ## unique especially when an fd is closed and then a new one is opened, + ## the new one may have the same value. + return key.fd in s and s.fds[key.fd] == key proc `[]`*(s: PSelector, fd: TSocketHandle): PSelectorKey = ## Retrieves the selector key for ``fd``. diff --git a/tests/async/tasyncawait.nim b/tests/async/tasyncawait.nim index 9e5d270c3..458efea56 100644 --- a/tests/async/tasyncawait.nim +++ b/tests/async/tasyncawait.nim @@ -15,24 +15,17 @@ const var clientCount = 0 proc sendMessages(disp: PDispatcher, client: TSocketHandle): PFuture[int] {.async.} = - echo("entering sendMessages") for i in 0 .. <messagesToSend: discard await disp.send(client, "Message " & $i & "\c\L") - echo("returning sendMessages") proc launchSwarm(disp: PDispatcher, port: TPort): PFuture[int] {.async.} = for i in 0 .. <swarmSize: var sock = socket() - # TODO: We may need to explicitly register and unregister the fd. - # This is because when the socket is closed, selectors is not aware - # that it has been closed. While epoll is. Perhaps we should just unregister - # in close()? - echo(sock.cint) + #disp.register(sock) discard await disp.connect(sock, "localhost", port) when true: discard await sendMessages(disp, sock) - echo("Calling close") sock.close() else: # Issue #932: https://github.com/Araq/Nimrod/issues/932 |