diff options
author | Dominik Picheta <dominikpicheta@googlemail.com> | 2014-03-11 21:53:35 +0000 |
---|---|---|
committer | Dominik Picheta <dominikpicheta@googlemail.com> | 2014-03-11 21:53:35 +0000 |
commit | 2ce07042fdbe7104694245b56006204ade9df34a (patch) | |
tree | 3e4085cdf114885f25608aea604975f88e9a9f6d /lib/pure | |
parent | 9b5357da5aea54bf9036b98d48ef52da16f7e1a7 (diff) | |
download | Nim-2ce07042fdbe7104694245b56006204ade9df34a.tar.gz |
tasyncawait now works on Linux.
Reworked detection of a file descriptor being closed with epoll (in the case of sockets it is when the remote host disconnects). Ensured that events are only updated when they change.
Diffstat (limited to 'lib/pure')
-rw-r--r-- | lib/pure/asyncio2.nim | 40 | ||||
-rw-r--r-- | lib/pure/selectors.nim | 65 |
2 files changed, 68 insertions, 37 deletions
diff --git a/lib/pure/asyncio2.nim b/lib/pure/asyncio2.nim index 60d489dda..cdb2b3a89 100644 --- a/lib/pure/asyncio2.nim +++ b/lib/pure/asyncio2.nim @@ -479,9 +479,11 @@ else: discard p.selector.update(sock, events) proc addRead(p: PDispatcher, sock: TSocketHandle, cb: TCallback) = + #echo("addRead") if sock notin p.selector: var data = PData(sock: sock, readCBs: @[cb], writeCBs: @[]) p.selector.register(sock, {EvRead}, data.PObject) + #echo("registered") else: p.selector[sock].data.PData.readCBs.add(cb) p.update(sock, p.selector[sock].events + {EvRead}) @@ -498,7 +500,7 @@ else: for info in p.selector.select(timeout): let data = PData(info.key.data) assert data.sock == info.key.fd - + #echo("In poll ", data.sock.cint) if EvRead in info.events: # Callback may add items to ``data.readCBs`` which causes issues if # we are iterating over ``data.readCBs`` at the same time. We therefore @@ -517,11 +519,17 @@ else: if not cb(data.sock): # Callback wants to be called again. data.writeCBs.add(cb) - - var newEvents: set[TEvent] - if data.readCBs.len != 0: newEvents = {EvRead} - if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite} - p.update(data.sock, newEvents) + + if info.key in p.selector: + var newEvents: set[TEvent] + if data.readCBs.len != 0: newEvents = {EvRead} + if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite} + if newEvents != info.key.events: + echo(info.key.events, " -> ", newEvents) + p.update(data.sock, newEvents) + else: + # FD no longer a part of the selector. Likely been closed + # (e.g. socket disconnected). proc connect*(p: PDispatcher, socket: TSocketHandle, address: string, port: TPort, af = AF_INET): PFuture[int] = @@ -569,6 +577,7 @@ else: result = true let netSize = size - sizeRead let res = recv(sock, addr readBuffer[sizeRead], netSize, flags.cint) + #echo("recv cb res: ", res) if res < 0: let lastError = osLastError() if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: @@ -576,6 +585,7 @@ else: else: result = false # We still want this callback to be called. elif res == 0: + #echo("Disconnected recv: ", sizeRead) # Disconnected if sizeRead == 0: retFuture.complete("") @@ -588,6 +598,7 @@ else: result = false # We want to read all the data requested. else: retFuture.complete(readBuffer) + #echo("Recv cb result: ", result) addRead(p, socket, cb) return retFuture @@ -833,9 +844,13 @@ proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.} result = "" var c = "" while true: + #echo("1") c = await p.recv(socket, 1) + #echo("Received ", c.len) if c.len == 0: + #echo("returning") return + #echo("2") if c == "\r": c = await p.recv(socket, 1, MSG_PEEK) if c.len > 0 and c == "\L": @@ -845,7 +860,9 @@ proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.} elif c == "\L": addNLIfEmpty() return + #echo("3") add(result.string, c) + #echo("4") when isMainModule: @@ -859,6 +876,7 @@ when isMainModule: proc main(p: PDispatcher): PFuture[int] {.async.} = discard await p.connect(sock, "irc.freenode.net", TPort(6667)) while true: + echo("recvLine") var line = await p.recvLine(sock) echo("Line is: ", line.repr) if line == "": @@ -882,7 +900,7 @@ when isMainModule: else: when true: - var f = p.connect(sock, "irc.freenode.org", TPort(6667)) + var f = p.connect(sock, "irc.poop.nl", TPort(6667)) f.callback = proc (future: PFuture[int]) = echo("Connected in future!") @@ -898,11 +916,13 @@ when isMainModule: sock.bindAddr(TPort(6667)) sock.listen() proc onAccept(future: PFuture[TSocketHandle]) = - echo "Accepted" - var t = p.send(future.read, "test\c\L") + let client = future.read + echo "Accepted ", client.cint + var t = p.send(client, "test\c\L") t.callback = proc (future: PFuture[int]) = - echo(future.read) + echo("Send: ", future.read) + client.close() var f = p.accept(sock) f.callback = onAccept diff --git a/lib/pure/selectors.nim b/lib/pure/selectors.nim index e086ee3ab..a113e3362 100644 --- a/lib/pure/selectors.nim +++ b/lib/pure/selectors.nim @@ -10,7 +10,6 @@ # TODO: Docs. import tables, os, unsigned, hashes -import sockets2 when defined(linux): import posix, epoll elif defined(windows): import winlean @@ -41,17 +40,14 @@ when defined(linux) or defined(nimdoc): result.events = EPOLLIN if EvWrite in events: result.events = result.events or EPOLLOUT + result.events = result.events or EPOLLRDHUP result.data.fd = fd.cint proc register*(s: PSelector, fd: TSocketHandle, events: set[TEvent], data: PObject): PSelectorKey {.discardable.} = ## Registers file descriptor ``fd`` to selector ``s`` with a set of TEvent ## ``events``. - if s.fds.hasKey(fd): - raise newException(EInvalidValue, "File descriptor already exists.") - var event = createEventStruct(events, fd) - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fd, addr(event)) != 0: OSError(OSLastError()) @@ -63,30 +59,18 @@ when defined(linux) or defined(nimdoc): proc update*(s: PSelector, fd: TSocketHandle, events: set[TEvent]): PSelectorKey {.discardable.} = ## Updates the events which ``fd`` wants notifications for. - if not s.fds.hasKey(fd): - raise newException(EInvalidValue, "File descriptor not found.") - var event = createEventStruct(events, fd) - - s.fds[fd].events = events - if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0: - if OSLastError().cint == ENOENT: - # Socket has been closed. Epoll automatically removes disconnected - # sockets. - s.fds.del(fd) - osError("Socket has been disconnected") - - OSError(OSLastError()) - result = s.fds[fd] + if s.fds[fd].events != events: + echo("Update ", fd.cint, " to ", events) + var event = createEventStruct(events, fd) + + s.fds[fd].events = events + if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0: + OSError(OSLastError()) + result = s.fds[fd] proc unregister*(s: PSelector, fd: TSocketHandle): PSelectorKey {.discardable.} = - if not s.fds.hasKey(fd): - raise newException(EInvalidValue, "File descriptor not found.") if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fd, nil) != 0: - if osLastError().cint == ENOENT: - # Socket has been closed. Epoll automatically removes disconnected - # sockets so its already been removed. - else: - OSError(OSLastError()) + OSError(OSLastError()) result = s.fds[fd] s.fds.del(fd) @@ -113,6 +97,14 @@ when defined(linux) or defined(nimdoc): assert selectorKey != nil result.add((selectorKey, evSet)) + if (s.events[i].events and EPOLLHUP) != 0 or + (s.events[i].events and EPOLLRDHUP) != 0: + # fd closed + #echo("fd closed ", s.events[i].data.fd) + s.unregister(s.events[i].data.fd.TSocketHandle) + + #echo("Epoll: ", result[i].key.fd, " ", result[i].events, " ", result[i].key.events) + proc newSelector*(): PSelector = new result result.epollFD = epoll_create(64) @@ -123,7 +115,26 @@ when defined(linux) or defined(nimdoc): proc contains*(s: PSelector, fd: TSocketHandle): bool = ## Determines whether selector contains a file descriptor. - return s.fds.hasKey(fd) + if s.fds.hasKey(fd): + result = true + + # Ensure the underlying epoll instance still contains this fd. + var event = createEventStruct(s.fds[fd].events, fd) + if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0: + let err = osLastError() + if err.cint in {ENOENT, EBADF}: + return false + OSError(OSLastError()) + else: + return false + + proc contains*(s: PSelector, key: PSelectorKey): bool = + ## Determines whether selector contains this selector key. More accurate + ## than checking if the file descriptor is in the selector because it + ## ensures that the keys are equal. File descriptors may not always be + ## unique especially when an fd is closed and then a new one is opened, + ## the new one may have the same value. + return key.fd in s and s.fds[key.fd] == key proc `[]`*(s: PSelector, fd: TSocketHandle): PSelectorKey = ## Retrieves the selector key for ``fd``. |