diff options
author | Dominik Picheta <dominikpicheta@googlemail.com> | 2014-03-09 13:49:38 +0000 |
---|---|---|
committer | Dominik Picheta <dominikpicheta@googlemail.com> | 2014-03-09 13:49:38 +0000 |
commit | a0975269564f2474953d2b12aaaab4ba71cec664 (patch) | |
tree | aa2e55c68a8a53633ac99d15cd96321b37422ef3 | |
parent | 7704cdc90ec187ef22f259f0e0f89ceeb5c13431 (diff) | |
download | Nim-a0975269564f2474953d2b12aaaab4ba71cec664.tar.gz |
Fixes to asyncio2 on Linux.
-rw-r--r-- | lib/posix/epoll.nim | 2 | ||||
-rw-r--r-- | lib/pure/asyncio2.nim | 28 | ||||
-rw-r--r-- | lib/pure/selectors.nim | 26 | ||||
-rw-r--r-- | lib/pure/sockets2.nim | 22 | ||||
-rw-r--r-- | tests/async/tasyncawait.nim | 10 |
5 files changed, 64 insertions, 24 deletions
diff --git a/lib/posix/epoll.nim b/lib/posix/epoll.nim index 366521551..57a2f001f 100644 --- a/lib/posix/epoll.nim +++ b/lib/posix/epoll.nim @@ -36,7 +36,7 @@ type epoll_data* {.importc: "union epoll_data", header: "<sys/epoll.h>", pure, final.} = object # TODO: This is actually a union. #thePtr* {.importc: "ptr".}: pointer - fd*: cint # \ + fd* {.importc: "fd".}: cint # \ #u32*: uint32 #u64*: uint64 diff --git a/lib/pure/asyncio2.nim b/lib/pure/asyncio2.nim index 12d4cb5a3..60d489dda 100644 --- a/lib/pure/asyncio2.nim +++ b/lib/pure/asyncio2.nim @@ -473,7 +473,6 @@ else: proc update(p: PDispatcher, sock: TSocketHandle, events: set[TEvent]) = assert sock in p.selector - echo("Update: ", events) if events == {}: discard p.selector.unregister(sock) else: @@ -499,23 +498,25 @@ else: for info in p.selector.select(timeout): let data = PData(info.key.data) assert data.sock == info.key.fd - echo("R: ", data.readCBs.len, " W: ", data.writeCBs.len, ". ", info.events) if EvRead in info.events: - var newReadCBs: seq[TCallback] = @[] - for cb in data.readCBs: + # Callback may add items to ``data.readCBs`` which causes issues if + # we are iterating over ``data.readCBs`` at the same time. We therefore + # make a copy to iterate over. + let currentCBs = data.readCBs + data.readCBs = @[] + for cb in currentCBs: if not cb(data.sock): # Callback wants to be called again. - newReadCBs.add(cb) - data.readCBs = newReadCBs + data.readCBs.add(cb) if EvWrite in info.events: - var newWriteCBs: seq[TCallback] = @[] - for cb in data.writeCBs: + let currentCBs = data.writeCBs + data.writeCBs = @[] + for cb in currentCBs: if not cb(data.sock): # Callback wants to be called again. - newWriteCBs.add(cb) - data.writeCBs = newWriteCBs + data.writeCBs.add(cb) var newEvents: set[TEvent] if data.readCBs.len != 0: newEvents = {EvRead} @@ -615,7 +616,6 @@ else: retFuture.complete(0) addWrite(p, socket, cb) return retFuture - proc acceptAddr*(p: PDispatcher, socket: TSocketHandle): PFuture[tuple[address: string, client: TSocketHandle]] = @@ -854,7 +854,7 @@ when isMainModule: sock.setBlocking false - when false: + when true: # Await tests proc main(p: PDispatcher): PFuture[int] {.async.} = discard await p.connect(sock, "irc.freenode.net", TPort(6667)) @@ -880,7 +880,7 @@ when isMainModule: else: - when false: + when true: var f = p.connect(sock, "irc.freenode.org", TPort(6667)) f.callback = @@ -919,4 +919,4 @@ when isMainModule: - \ No newline at end of file + diff --git a/lib/pure/selectors.nim b/lib/pure/selectors.nim index 6482a01a6..e086ee3ab 100644 --- a/lib/pure/selectors.nim +++ b/lib/pure/selectors.nim @@ -10,11 +10,13 @@ # TODO: Docs. import tables, os, unsigned, hashes +import sockets2 when defined(linux): import posix, epoll elif defined(windows): import winlean proc hash*(x: TSocketHandle): THash {.borrow.} +proc `$`*(x: TSocketHandle): string {.borrow.} type TEvent* = enum @@ -31,7 +33,7 @@ when defined(linux) or defined(nimdoc): type PSelector* = ref object epollFD: cint - events: array[64, ptr epoll_event] + events: array[64, epoll_event] fds: TTable[TSocketHandle, PSelectorKey] proc createEventStruct(events: set[TEvent], fd: TSocketHandle): epoll_event = @@ -66,17 +68,25 @@ when defined(linux) or defined(nimdoc): var event = createEventStruct(events, fd) s.fds[fd].events = events - echo("About to update") if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0: + if OSLastError().cint == ENOENT: + # Socket has been closed. Epoll automatically removes disconnected + # sockets. + s.fds.del(fd) + osError("Socket has been disconnected") + OSError(OSLastError()) - echo("finished updating") result = s.fds[fd] proc unregister*(s: PSelector, fd: TSocketHandle): PSelectorKey {.discardable.} = if not s.fds.hasKey(fd): raise newException(EInvalidValue, "File descriptor not found.") if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fd, nil) != 0: - OSError(OSLastError()) + if osLastError().cint == ENOENT: + # Socket has been closed. Epoll automatically removes disconnected + # sockets so its already been removed. + else: + OSError(OSLastError()) result = s.fds[fd] s.fds.del(fd) @@ -92,21 +102,21 @@ when defined(linux) or defined(nimdoc): ## on the ``fd``. result = @[] - let evNum = epoll_wait(s.epollFD, s.events[0], 64.cint, timeout.cint) + let evNum = epoll_wait(s.epollFD, addr s.events[0], 64.cint, timeout.cint) if evNum < 0: OSError(OSLastError()) if evNum == 0: return @[] for i in 0 .. <evNum: var evSet: set[TEvent] = {} if (s.events[i].events and EPOLLIN) != 0: evSet = evSet + {EvRead} if (s.events[i].events and EPOLLOUT) != 0: evSet = evSet + {EvWrite} - let selectorKey = s.fds[s.events[i].data.fd.TSocketHandle] + assert selectorKey != nil result.add((selectorKey, evSet)) proc newSelector*(): PSelector = new result result.epollFD = epoll_create(64) - result.events = cast[array[64, ptr epoll_event]](alloc0(sizeof(epoll_event)*64)) + result.events = cast[array[64, epoll_event]](alloc0(sizeof(epoll_event)*64)) result.fds = initTable[TSocketHandle, PSelectorKey]() if result.epollFD < 0: OSError(OSLastError()) @@ -247,4 +257,4 @@ when isMainModule: - \ No newline at end of file + diff --git a/lib/pure/sockets2.nim b/lib/pure/sockets2.nim index 3542a0694..290f414b4 100644 --- a/lib/pure/sockets2.nim +++ b/lib/pure/sockets2.nim @@ -24,6 +24,10 @@ else: export TSocketHandle, TSockaddr_in, TAddrinfo, INADDR_ANY, TSockAddr, TSockLen, inet_ntoa, recv, `==`, connect, send, accept +export + SO_ERROR, + SOL_SOCKET + type TPort* = distinct uint16 ## port type @@ -208,6 +212,24 @@ proc htons*(x: int16): int16 = ## order, this is a no-op; otherwise, it performs a 2-byte swap operation. result = sockets2.ntohs(x) +proc getSockOptInt*(socket: TSocketHandle, level, optname: int): int {. + tags: [FReadIO].} = + ## getsockopt for integer options. + var res: cint + var size = sizeof(res).TSocklen + if getsockopt(socket, cint(level), cint(optname), + addr(res), addr(size)) < 0'i32: + osError(osLastError()) + result = int(res) + +proc setSockOptInt*(socket: TSocketHandle, level, optname, optval: int) {. + tags: [FWriteIO].} = + ## setsockopt for integer options. + var value = cint(optval) + if setsockopt(socket, cint(level), cint(optname), addr(value), + sizeof(value).TSocklen) < 0'i32: + osError(osLastError()) + when defined(Windows): var wsa: TWSADATA if WSAStartup(0x0101'i16, addr wsa) != 0: OSError(OSLastError()) diff --git a/tests/async/tasyncawait.nim b/tests/async/tasyncawait.nim index bde5bf8c8..9e5d270c3 100644 --- a/tests/async/tasyncawait.nim +++ b/tests/async/tasyncawait.nim @@ -15,16 +15,24 @@ const var clientCount = 0 proc sendMessages(disp: PDispatcher, client: TSocketHandle): PFuture[int] {.async.} = + echo("entering sendMessages") for i in 0 .. <messagesToSend: - discard await disp.send(client, "Message " & $i & "\c\L") + discard await disp.send(client, "Message " & $i & "\c\L") + echo("returning sendMessages") proc launchSwarm(disp: PDispatcher, port: TPort): PFuture[int] {.async.} = for i in 0 .. <swarmSize: var sock = socket() + # TODO: We may need to explicitly register and unregister the fd. + # This is because when the socket is closed, selectors is not aware + # that it has been closed. While epoll is. Perhaps we should just unregister + # in close()? + echo(sock.cint) #disp.register(sock) discard await disp.connect(sock, "localhost", port) when true: discard await sendMessages(disp, sock) + echo("Calling close") sock.close() else: # Issue #932: https://github.com/Araq/Nimrod/issues/932 |