From 2ce07042fdbe7104694245b56006204ade9df34a Mon Sep 17 00:00:00 2001 From: Dominik Picheta Date: Tue, 11 Mar 2014 21:53:35 +0000 Subject: tasyncawait now works on Linux. Reworked detection of a file descriptor being closed with epoll (in the case of sockets it is when the remote host disconnects). Ensured that events are only updated when they change. --- lib/pure/asyncio2.nim | 40 +++++++++++++++++++++++-------- lib/pure/selectors.nim | 65 +++++++++++++++++++++++++++++--------------------- 2 files changed, 68 insertions(+), 37 deletions(-) (limited to 'lib/pure') diff --git a/lib/pure/asyncio2.nim b/lib/pure/asyncio2.nim index 60d489dda..cdb2b3a89 100644 --- a/lib/pure/asyncio2.nim +++ b/lib/pure/asyncio2.nim @@ -479,9 +479,11 @@ else: discard p.selector.update(sock, events) proc addRead(p: PDispatcher, sock: TSocketHandle, cb: TCallback) = + #echo("addRead") if sock notin p.selector: var data = PData(sock: sock, readCBs: @[cb], writeCBs: @[]) p.selector.register(sock, {EvRead}, data.PObject) + #echo("registered") else: p.selector[sock].data.PData.readCBs.add(cb) p.update(sock, p.selector[sock].events + {EvRead}) @@ -498,7 +500,7 @@ else: for info in p.selector.select(timeout): let data = PData(info.key.data) assert data.sock == info.key.fd - + #echo("In poll ", data.sock.cint) if EvRead in info.events: # Callback may add items to ``data.readCBs`` which causes issues if # we are iterating over ``data.readCBs`` at the same time. We therefore @@ -517,11 +519,17 @@ else: if not cb(data.sock): # Callback wants to be called again. data.writeCBs.add(cb) - - var newEvents: set[TEvent] - if data.readCBs.len != 0: newEvents = {EvRead} - if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite} - p.update(data.sock, newEvents) + + if info.key in p.selector: + var newEvents: set[TEvent] + if data.readCBs.len != 0: newEvents = {EvRead} + if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite} + if newEvents != info.key.events: + echo(info.key.events, " -> ", newEvents) + p.update(data.sock, newEvents) + else: + # FD no longer a part of the selector. Likely been closed + # (e.g. socket disconnected). proc connect*(p: PDispatcher, socket: TSocketHandle, address: string, port: TPort, af = AF_INET): PFuture[int] = @@ -569,6 +577,7 @@ else: result = true let netSize = size - sizeRead let res = recv(sock, addr readBuffer[sizeRead], netSize, flags.cint) + #echo("recv cb res: ", res) if res < 0: let lastError = osLastError() if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: @@ -576,6 +585,7 @@ else: else: result = false # We still want this callback to be called. elif res == 0: + #echo("Disconnected recv: ", sizeRead) # Disconnected if sizeRead == 0: retFuture.complete("") @@ -588,6 +598,7 @@ else: result = false # We want to read all the data requested. else: retFuture.complete(readBuffer) + #echo("Recv cb result: ", result) addRead(p, socket, cb) return retFuture @@ -833,9 +844,13 @@ proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.} result = "" var c = "" while true: + #echo("1") c = await p.recv(socket, 1) + #echo("Received ", c.len) if c.len == 0: + #echo("returning") return + #echo("2") if c == "\r": c = await p.recv(socket, 1, MSG_PEEK) if c.len > 0 and c == "\L": @@ -845,7 +860,9 @@ proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.} elif c == "\L": addNLIfEmpty() return + #echo("3") add(result.string, c) + #echo("4") when isMainModule: @@ -859,6 +876,7 @@ when isMainModule: proc main(p: PDispatcher): PFuture[int] {.async.} = discard await p.connect(sock, "irc.freenode.net", TPort(6667)) while true: + echo("recvLine") var line = await p.recvLine(sock) echo("Line is: ", line.repr) if line == "": @@ -882,7 +900,7 @@ when isMainModule: else: when true: - var f = p.connect(sock, "irc.freenode.org", TPort(6667)) + var f = p.connect(sock, "irc.poop.nl", TPort(6667)) f.callback = proc (future: PFuture[int]) = echo("Connected in future!") @@ -898,11 +916,13 @@ when isMainModule: sock.bindAddr(TPort(6667)) sock.listen() proc onAccept(future: PFuture[TSocketHandle]) = - echo "Accepted" - var t = p.send(future.read, "test\c\L") + let client = future.read + echo "Accepted ", client.cint + var t = p.send(client, "test\c\L") t.callback = proc (future: PFuture[int]) = - echo(future.read) + echo("Send: ", future.read) + client.close() var f = p.accept(sock) f.callback = onAccept diff --git a/lib/pure/selectors.nim b/lib/pure/selectors.nim index e086ee3ab..a113e3362 100644 --- a/lib/pure/selectors.nim +++ b/lib/pure/selectors.nim @@ -10,7 +10,6 @@ # TODO: Docs. import tables, os, unsigned, hashes -import sockets2 when defined(linux): import posix, epoll elif defined(windows): import winlean @@ -41,17 +40,14 @@ when defined(linux) or defined(nimdoc): result.events = EPOLLIN if EvWrite in events: result.events = result.events or EPOLLOUT + result.events = result.events or EPOLLRDHUP result.data.fd = fd.cint proc register*(s: PSelector, fd: TSocketHandle, events: set[TEvent], data: PObject): PSelectorKey {.discardable.} = ## Registers file descriptor ``fd`` to selector ``s`` with a set of TEvent ## ``events``. - if s.fds.hasKey(fd): - raise newException(EInvalidValue, "File descriptor already exists.") - var event = createEventStruct(events, fd) - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fd, addr(event)) != 0: OSError(OSLastError()) @@ -63,30 +59,18 @@ when defined(linux) or defined(nimdoc): proc update*(s: PSelector, fd: TSocketHandle, events: set[TEvent]): PSelectorKey {.discardable.} = ## Updates the events which ``fd`` wants notifications for. - if not s.fds.hasKey(fd): - raise newException(EInvalidValue, "File descriptor not found.") - var event = createEventStruct(events, fd) - - s.fds[fd].events = events - if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0: - if OSLastError().cint == ENOENT: - # Socket has been closed. Epoll automatically removes disconnected - # sockets. - s.fds.del(fd) - osError("Socket has been disconnected") - - OSError(OSLastError()) - result = s.fds[fd] + if s.fds[fd].events != events: + echo("Update ", fd.cint, " to ", events) + var event = createEventStruct(events, fd) + + s.fds[fd].events = events + if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0: + OSError(OSLastError()) + result = s.fds[fd] proc unregister*(s: PSelector, fd: TSocketHandle): PSelectorKey {.discardable.} = - if not s.fds.hasKey(fd): - raise newException(EInvalidValue, "File descriptor not found.") if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fd, nil) != 0: - if osLastError().cint == ENOENT: - # Socket has been closed. Epoll automatically removes disconnected - # sockets so its already been removed. - else: - OSError(OSLastError()) + OSError(OSLastError()) result = s.fds[fd] s.fds.del(fd) @@ -113,6 +97,14 @@ when defined(linux) or defined(nimdoc): assert selectorKey != nil result.add((selectorKey, evSet)) + if (s.events[i].events and EPOLLHUP) != 0 or + (s.events[i].events and EPOLLRDHUP) != 0: + # fd closed + #echo("fd closed ", s.events[i].data.fd) + s.unregister(s.events[i].data.fd.TSocketHandle) + + #echo("Epoll: ", result[i].key.fd, " ", result[i].events, " ", result[i].key.events) + proc newSelector*(): PSelector = new result result.epollFD = epoll_create(64) @@ -123,7 +115,26 @@ when defined(linux) or defined(nimdoc): proc contains*(s: PSelector, fd: TSocketHandle): bool = ## Determines whether selector contains a file descriptor. - return s.fds.hasKey(fd) + if s.fds.hasKey(fd): + result = true + + # Ensure the underlying epoll instance still contains this fd. + var event = createEventStruct(s.fds[fd].events, fd) + if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0: + let err = osLastError() + if err.cint in {ENOENT, EBADF}: + return false + OSError(OSLastError()) + else: + return false + + proc contains*(s: PSelector, key: PSelectorKey): bool = + ## Determines whether selector contains this selector key. More accurate + ## than checking if the file descriptor is in the selector because it + ## ensures that the keys are equal. File descriptors may not always be + ## unique especially when an fd is closed and then a new one is opened, + ## the new one may have the same value. + return key.fd in s and s.fds[key.fd] == key proc `[]`*(s: PSelector, fd: TSocketHandle): PSelectorKey = ## Retrieves the selector key for ``fd``. -- cgit 1.4.1-2-gfad0 From 15919b7c988af12708d89222bdebc6063d813dab Mon Sep 17 00:00:00 2001 From: Dominik Picheta Date: Tue, 11 Mar 2014 23:06:22 +0000 Subject: EOS exception now contains the error code. --- lib/pure/os.nim | 11 ++++++----- lib/system.nim | 1 + 2 files changed, 7 insertions(+), 5 deletions(-) (limited to 'lib/pure') diff --git a/lib/pure/os.nim b/lib/pure/os.nim index bfecc569a..faca17e98 100644 --- a/lib/pure/os.nim +++ b/lib/pure/os.nim @@ -260,11 +260,12 @@ proc osError*(errorCode: TOSErrorCode) = ## ## If the error code is ``0`` or an error message could not be retrieved, ## the message ``unknown OS error`` will be used. - let msg = osErrorMsg(errorCode) - if msg == "": - raise newException(EOS, "unknown OS error") - else: - raise newException(EOS, msg) + var e: ref EOS; new(e) + e.errorCode = errorCode.int32 + e.msg = osErrorMsg(errorCode) + if e.msg == "": + e.msg = "unknown OS error" + raise e {.push stackTrace:off.} proc osLastError*(): TOSErrorCode = diff --git a/lib/system.nim b/lib/system.nim index 24ad50f97..41624bb05 100644 --- a/lib/system.nim +++ b/lib/system.nim @@ -260,6 +260,7 @@ type ## system raises. EIO* = object of ESystem ## raised if an IO error occured. EOS* = object of ESystem ## raised if an operating system service failed. + errorCode*: int32 ## OS-defined error code describing this error. EInvalidLibrary* = object of EOS ## raised if a dynamic library ## could not be loaded. EResourceExhausted* = object of ESystem ## raised if a resource request -- cgit 1.4.1-2-gfad0 From d97a397139a6e7fdae6f3ec887ec5d76ef681b2c Mon Sep 17 00:00:00 2001 From: Dominik Picheta Date: Wed, 12 Mar 2014 20:42:36 +0000 Subject: Fixed tasyncawait on Windows. Implicit registration of an fd now only occurs when a new socket is created (in socket() or accept()). This makes the implementation much simpler, changes to the linux version will follow. --- lib/pure/asyncio2.nim | 40 +++++++++++++++++++++++++++++++--------- lib/windows/winlean.nim | 1 + tests/async/tasyncawait.nim | 4 ++-- 3 files changed, 34 insertions(+), 11 deletions(-) (limited to 'lib/pure') diff --git a/lib/pure/asyncio2.nim b/lib/pure/asyncio2.nim index cdb2b3a89..c15415130 100644 --- a/lib/pure/asyncio2.nim +++ b/lib/pure/asyncio2.nim @@ -94,7 +94,8 @@ proc failed*[T](future: PFuture[T]): bool = # TODO: Get rid of register. Do it implicitly. when defined(windows) or defined(nimdoc): - import winlean + import winlean, sets, hashes + #from hashes import THash type TCompletionKey = dword @@ -105,7 +106,7 @@ when defined(windows) or defined(nimdoc): PDispatcher* = ref object ioPort: THandle - hasHandles: bool + handles: TSet[TSocketHandle] TCustomOverlapped = object Internal*: DWORD @@ -117,21 +118,31 @@ when defined(windows) or defined(nimdoc): PCustomOverlapped = ptr TCustomOverlapped + proc hash(x: TSocketHandle): THash {.borrow.} + proc newDispatcher*(): PDispatcher = ## Creates a new Dispatcher instance. new result result.ioPort = CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) + result.handles = initSet[TSocketHandle]() proc register*(p: PDispatcher, sock: TSocketHandle) = ## Registers ``sock`` with the dispatcher ``p``. if CreateIOCompletionPort(sock.THandle, p.ioPort, cast[TCompletionKey](sock), 1) == 0: OSError(OSLastError()) - p.hasHandles = true + p.handles.incl(sock) + + proc verifyPresence(p: PDispatcher, sock: TSocketHandle) = + ## Ensures that socket has been registered with the dispatcher. + if sock notin p.handles: + raise newException(EInvalidValue, + "Operation performed on a socket which has not been registered with" & + " the dispatcher yet.") proc poll*(p: PDispatcher, timeout = 500) = ## Waits for completion events and processes them. - if not p.hasHandles: + if p.handles.len == 0: raise newException(EInvalidValue, "No handles registered in dispatcher.") let llTimeout = @@ -237,7 +248,7 @@ when defined(windows) or defined(nimdoc): ## ## Returns a ``PFuture`` which will complete when the connection succeeds ## or an error occurs. - + verifyPresence(p, socket) var retFuture = newFuture[int]()# TODO: Change to void when that regression is fixed. # Apparently ``ConnectEx`` expects the socket to be initially bound: var saddr: Tsockaddr_in @@ -298,7 +309,7 @@ when defined(windows) or defined(nimdoc): ## recv operation then the future may complete with only a part of the ## requested data read. If socket is disconnected and no data is available ## to be read then the future will complete with a value of ``""``. - + verifyPresence(p, socket) var retFuture = newFuture[string]() var dataBuf: TWSABuf @@ -354,6 +365,7 @@ when defined(windows) or defined(nimdoc): proc send*(p: PDispatcher, socket: TSocketHandle, data: string): PFuture[int] = ## Sends ``data`` to ``socket``. The returned future will complete once all ## data has been sent. + verifyPresence(p, socket) var retFuture = newFuture[int]() var dataBuf: TWSABuf @@ -390,7 +402,9 @@ when defined(windows) or defined(nimdoc): ## Accepts a new connection. Returns a future containing the client socket ## corresponding to that connection and the remote address of the client. ## The future will complete when the connection is successfully accepted. - + ## + ## The resulting client socket is automatically registered to dispatcher. + verifyPresence(p, socket) var retFuture = newFuture[tuple[address: string, client: TSocketHandle]]() var clientSock = socket() @@ -416,6 +430,7 @@ when defined(windows) or defined(nimdoc): dwLocalAddressLength, dwRemoteAddressLength, addr LocalSockaddr, addr localLen, addr RemoteSockaddr, addr remoteLen) + p.register(clientSock) # TODO: IPv6. Check ``sa_family``. http://stackoverflow.com/a/9212542/492186 retFuture.complete( (address: $inet_ntoa(cast[ptr Tsockaddr_in](remoteSockAddr).sin_addr), @@ -452,6 +467,13 @@ when defined(windows) or defined(nimdoc): return retFuture + proc socket*(disp: PDispatcher, domain: TDomain = AF_INET, + typ: TType = SOCK_STREAM, + protocol: TProtocol = IPPROTO_TCP): TSocketHandle = + ## Creates a new socket and registers it with the dispatcher implicitly. + result = socket() + disp.register(result) + initAll() else: import selectors @@ -871,7 +893,7 @@ when isMainModule: sock.setBlocking false - when true: + when false: # Await tests proc main(p: PDispatcher): PFuture[int] {.async.} = discard await p.connect(sock, "irc.freenode.net", TPort(6667)) @@ -898,7 +920,7 @@ when isMainModule: else: - when true: + when false: var f = p.connect(sock, "irc.poop.nl", TPort(6667)) f.callback = diff --git a/lib/windows/winlean.nim b/lib/windows/winlean.nim index 74ef9c9ec..4d87cf4b2 100644 --- a/lib/windows/winlean.nim +++ b/lib/windows/winlean.nim @@ -456,6 +456,7 @@ var SO_DONTLINGER* {.importc, header: "Winsock2.h".}: cint SO_EXCLUSIVEADDRUSE* {.importc, header: "Winsock2.h".}: cint # disallow local address reuse + SO_ERROR* {.importc, header: "Winsock2.h".}: cint proc `==`*(x, y: TSocketHandle): bool {.borrow.} diff --git a/tests/async/tasyncawait.nim b/tests/async/tasyncawait.nim index 458efea56..fea0783a0 100644 --- a/tests/async/tasyncawait.nim +++ b/tests/async/tasyncawait.nim @@ -20,7 +20,7 @@ proc sendMessages(disp: PDispatcher, client: TSocketHandle): PFuture[int] {.asyn proc launchSwarm(disp: PDispatcher, port: TPort): PFuture[int] {.async.} = for i in 0 .. Date: Wed, 12 Mar 2014 23:19:40 +0000 Subject: Fix compilation on linux. --- lib/pure/asyncio2.nim | 37 ++++++++++++++++++++----------------- 1 file changed, 20 insertions(+), 17 deletions(-) (limited to 'lib/pure') diff --git a/lib/pure/asyncio2.nim b/lib/pure/asyncio2.nim index c15415130..c37370b7b 100644 --- a/lib/pure/asyncio2.nim +++ b/lib/pure/asyncio2.nim @@ -132,6 +132,7 @@ when defined(windows) or defined(nimdoc): cast[TCompletionKey](sock), 1) == 0: OSError(OSLastError()) p.handles.incl(sock) + # TODO: fd closure detection, we need to remove the fd from handles set proc verifyPresence(p: PDispatcher, sock: TSocketHandle) = ## Ensures that socket has been registered with the dispatcher. @@ -471,7 +472,7 @@ when defined(windows) or defined(nimdoc): typ: TType = SOCK_STREAM, protocol: TProtocol = IPPROTO_TCP): TSocketHandle = ## Creates a new socket and registers it with the dispatcher implicitly. - result = socket() + result = socket(domain, typ, protocol) disp.register(result) initAll() @@ -495,28 +496,29 @@ else: proc update(p: PDispatcher, sock: TSocketHandle, events: set[TEvent]) = assert sock in p.selector - if events == {}: - discard p.selector.unregister(sock) - else: - discard p.selector.update(sock, events) + discard p.selector.update(sock, events) + + proc register(p: PDispatcher, sock: TSocketHandle) = + var data = PData(sock: sock, readCBs: @[], writeCBs: @[]) + p.selector.register(sock, {}, data.PObject) + + proc socket*(disp: PDispatcher, domain: TDomain = AF_INET, + typ: TType = SOCK_STREAM, + protocol: TProtocol = IPPROTO_TCP): TSocketHandle = + result = socket(domain, typ, protocol) + disp.register(result) proc addRead(p: PDispatcher, sock: TSocketHandle, cb: TCallback) = - #echo("addRead") if sock notin p.selector: - var data = PData(sock: sock, readCBs: @[cb], writeCBs: @[]) - p.selector.register(sock, {EvRead}, data.PObject) - #echo("registered") - else: - p.selector[sock].data.PData.readCBs.add(cb) - p.update(sock, p.selector[sock].events + {EvRead}) + raise newException(EInvalidValue, "File descriptor not registered.") + p.selector[sock].data.PData.readCBs.add(cb) + p.update(sock, p.selector[sock].events + {EvRead}) proc addWrite(p: PDispatcher, sock: TSocketHandle, cb: TCallback) = if sock notin p.selector: - var data = PData(sock: sock, readCBs: @[], writeCBs: @[cb]) - p.selector.register(sock, {EvWrite}, data.PObject) - else: - p.selector[sock].data.PData.writeCBs.add(cb) - p.update(sock, p.selector[sock].events + {EvWrite}) + raise newException(EInvalidValue, "File descriptor not registered.") + p.selector[sock].data.PData.writeCBs.add(cb) + p.update(sock, p.selector[sock].events + {EvWrite}) proc poll*(p: PDispatcher, timeout = 500) = for info in p.selector.select(timeout): @@ -667,6 +669,7 @@ else: else: retFuture.fail(newException(EOS, osErrorMsg(lastError))) else: + p.register(client) retFuture.complete(($inet_ntoa(sockAddress.sin_addr), client)) addRead(p, socket, cb) return retFuture -- cgit 1.4.1-2-gfad0