diff options
author | Eugene Kabanov <ka@hardcore.kiev.ua> | 2017-01-16 15:01:40 +0200 |
---|---|---|
committer | Andreas Rumpf <rumpf_a@web.de> | 2017-01-16 14:01:40 +0100 |
commit | 108f5e688ed85768e2477b47cc2e7743331ad5cd (patch) | |
tree | 8b481e0f12089c00f3be816c006ae25ea825164d /lib | |
parent | f8736dcfb7dae1d97aa09ee787ce58a340c1099e (diff) | |
download | Nim-108f5e688ed85768e2477b47cc2e7743331ad5cd.tar.gz |
Fix #5128, #5184. (#5214)
* Fix #5128, #5184. Removed flush() procedure from ioselectors.nim Changed methods of work with application-driven data * Make cache switch for kqueue, update test for it. * Fix registerProcess bug returns wrong id. Fix tupcoming_async test to compile with upcoming again. Change socket() as unique identifier to dup(socket) as unique identifier.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/pure/ioselectors.nim | 72 | ||||
-rw-r--r-- | lib/pure/ioselects/ioselectors_epoll.nim | 249 | ||||
-rw-r--r-- | lib/pure/ioselects/ioselectors_kqueue.nim | 419 | ||||
-rw-r--r-- | lib/pure/ioselects/ioselectors_poll.nim | 83 | ||||
-rw-r--r-- | lib/pure/ioselects/ioselectors_select.nim | 103 | ||||
-rw-r--r-- | lib/upcoming/asyncdispatch.nim | 177 |
6 files changed, 650 insertions, 453 deletions
diff --git a/lib/pure/ioselectors.nim b/lib/pure/ioselectors.nim index 5745d0b72..d61e9c3c7 100644 --- a/lib/pure/ioselectors.nim +++ b/lib/pure/ioselectors.nim @@ -40,7 +40,6 @@ const ioselSupportedPlatform* = defined(macosx) or defined(freebsd) or const bsdPlatform = defined(macosx) or defined(freebsd) or defined(netbsd) or defined(openbsd) - when defined(nimdoc): type Selector*[T] = ref object @@ -64,11 +63,10 @@ when defined(nimdoc): VnodeRename, ## NOTE_RENAME (BSD specific, file renamed) VnodeRevoke ## NOTE_REVOKE (BSD specific, file revoke occurred) - ReadyKey*[T] = object + ReadyKey* = object ## An object which holds result for descriptor fd* : int ## file/socket descriptor events*: set[Event] ## set of events - data*: T ## application-defined data SelectEvent* = object ## An object which holds user defined event @@ -142,15 +140,8 @@ when defined(nimdoc): proc unregister*[T](s: Selector[T], fd: int|SocketHandle|cint) = ## Unregisters file/socket descriptor ``fd`` from selector ``s``. - proc flush*[T](s: Selector[T]) = - ## Flushes all changes was made to kernel pool/queue. - ## This function is useful only for BSD and MacOS, because - ## kqueue supports bulk changes to be made. - ## On Linux/Windows and other Posix compatible operation systems, - ## ``flush`` is alias for `discard`. - proc selectInto*[T](s: Selector[T], timeout: int, - results: var openarray[ReadyKey[T]]): int = + results: var openarray[ReadyKey]): int = ## Process call waiting for events registered in selector ``s``. ## The ``timeout`` argument specifies the minimum number of milliseconds ## the function will be blocked, if no events are not ready. Specifying a @@ -159,7 +150,7 @@ when defined(nimdoc): ## ## Function returns number of triggered events. - proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey[T]] = + proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] = ## Process call waiting for events registered in selector ``s``. ## The ``timeout`` argument specifies the minimum number of milliseconds ## the function will be blocked, if no events are not ready. Specifying a @@ -167,13 +158,23 @@ when defined(nimdoc): ## ## Function returns sequence of triggered events. + proc getData*[T](s: Selector[T], fd: SocketHandle|int): T = + ## Retrieves application-defined ``data`` associated with descriptor ``fd``. + ## If specified descriptor ``fd`` is not registered, empty/default value + ## will be returned. + + proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: var T): bool = + ## Associate application-defined ``data`` with descriptor ``fd``. + ## + ## Returns ``true``, if data was succesfully updated, ``false`` otherwise. + template isEmpty*[T](s: Selector[T]): bool = ## Returns ``true``, if there no registered events or descriptors ## in selector. - template withData*[T](s: Selector[T], fd: SocketHandle, value, + template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body: untyped) = - ## retrieves the application-data assigned with descriptor ``fd`` + ## Retrieves the application-data assigned with descriptor ``fd`` ## to ``value``. This ``value`` can be modified in the scope of ## the ``withData`` call. ## @@ -184,9 +185,9 @@ when defined(nimdoc): ## value.uid = 1000 ## - template withData*[T](s: Selector[T], fd: SocketHandle, value, + template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1, body2: untyped) = - ## retrieves the application-data assigned with descriptor ``fd`` + ## Retrieves the application-data assigned with descriptor ``fd`` ## to ``value``. This ``value`` can be modified in the scope of ## the ``withData`` call. ## @@ -215,55 +216,68 @@ else: type Event* {.pure.} = enum Read, Write, Timer, Signal, Process, Vnode, User, Error, Oneshot, - VnodeWrite, VnodeDelete, VnodeExtend, VnodeAttrib, VnodeLink, + Finished, VnodeWrite, VnodeDelete, VnodeExtend, VnodeAttrib, VnodeLink, VnodeRename, VnodeRevoke - ReadyKey*[T] = object + type + IOSelectorsException* = object of Exception + + ReadyKey* = object fd* : int events*: set[Event] - data*: T SelectorKey[T] = object ident: int events: set[Event] param: int - key: ReadyKey[T] + data: T + + proc raiseIOSelectorsError[T](message: T) = + var msg = "" + when T is string: + msg.add(message) + elif T is OSErrorCode: + msg.add(osErrorMsg(message) & " (code: " & $int(message) & ")") + else: + msg.add("Internal Error\n") + var err = newException(IOSelectorsException, msg) + raise err when not defined(windows): import posix + proc setNonBlocking(fd: cint) {.inline.} = var x = fcntl(fd, F_GETFL, 0) if x == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) else: var mode = x or O_NONBLOCK if fcntl(fd, F_SETFL, mode) == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) - template setKey(s, pident, pkeyfd, pevents, pparam, pdata) = + template setKey(s, pident, pevents, pparam, pdata: untyped) = var skey = addr(s.fds[pident]) skey.ident = pident skey.events = pevents skey.param = pparam - skey.key.fd = pkeyfd - skey.key.data = pdata + skey.data = data when ioselSupportedPlatform: template blockSignals(newmask: var Sigset, oldmask: var Sigset) = when hasThreadSupport: if posix.pthread_sigmask(SIG_BLOCK, newmask, oldmask) == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) else: if posix.sigprocmask(SIG_BLOCK, newmask, oldmask) == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) template unblockSignals(newmask: var Sigset, oldmask: var Sigset) = when hasThreadSupport: if posix.pthread_sigmask(SIG_UNBLOCK, newmask, oldmask) == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) else: if posix.sigprocmask(SIG_UNBLOCK, newmask, oldmask) == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) when defined(linux): include ioselects/ioselectors_epoll diff --git a/lib/pure/ioselects/ioselectors_epoll.nim b/lib/pure/ioselects/ioselectors_epoll.nim index ceba670fb..cb555f001 100644 --- a/lib/pure/ioselects/ioselectors_epoll.nim +++ b/lib/pure/ioselects/ioselectors_epoll.nim @@ -12,7 +12,7 @@ import posix, times # Maximum number of events that can be returned -const MAX_EPOLL_RESULT_EVENTS = 64 +const MAX_EPOLL_EVENTS = 64 when not defined(android): type @@ -115,7 +115,7 @@ proc newSelector*[T](): Selector[T] = var maxFD = int(a.rlim_max) doAssert(maxFD > 0) - var epollFD = epoll_create(MAX_EPOLL_RESULT_EVENTS) + var epollFD = epoll_create(MAX_EPOLL_EVENTS) if epollFD < 0: raiseOsError(osLastError()) @@ -132,15 +132,21 @@ proc newSelector*[T](): Selector[T] = proc close*[T](s: Selector[T]) = if posix.close(s.epollFD) != 0: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) when hasThreadSupport: deallocSharedArray(s.fds) deallocShared(cast[pointer](s)) +template clearKey[T](key: ptr SelectorKey[T]) = + var empty: T + key.ident = 0 + key.events = {} + key.data = empty + proc newSelectEvent*(): SelectEvent = let fdci = eventfd(0, 0) if fdci == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) setNonBlocking(fdci) result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) result.efd = fdci @@ -148,29 +154,30 @@ proc newSelectEvent*(): SelectEvent = proc setEvent*(ev: SelectEvent) = var data : uint64 = 1 if posix.write(ev.efd, addr data, sizeof(uint64)) == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) proc close*(ev: SelectEvent) = - discard posix.close(ev.efd) + if posix.close(ev.efd) == -1: + raiseIOSelectorsError(osLastError()) deallocShared(cast[pointer](ev)) template checkFd(s, f) = if f >= s.maxFD: - raise newException(ValueError, "Maximum file descriptors exceeded") + raiseIOSelectorsError("Maximum file descriptors exceeded") proc registerHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event], data: T) = let fdi = int(fd) s.checkFd(fdi) doAssert(s.fds[fdi].ident == 0) - s.setKey(fdi, fdi, events, 0, data) + s.setKey(fdi, events, 0, data) if events != {}: var epv = epoll_event(events: EPOLLRDHUP) epv.data.u64 = fdi.uint if Event.Read in events: epv.events = epv.events or EPOLLIN if Event.Write in events: epv.events = epv.events or EPOLLOUT if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) inc(s.count) proc updateHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event]) = @@ -190,15 +197,15 @@ proc updateHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event]) = if pkey.events == {}: if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) inc(s.count) else: if events != {}: if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fdi.cint, addr epv) == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) else: if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) dec(s.count) pkey.events = events @@ -213,51 +220,56 @@ proc unregister*[T](s: Selector[T], fd: int|SocketHandle) = if pkey.events * {Event.Read, Event.Write} != {}: var epv = epoll_event() if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) dec(s.count) elif Event.Timer in pkey.events: - var epv = epoll_event() - if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1: - raiseOSError(osLastError()) - discard posix.close(fdi.cint) - dec(s.count) + if Event.Finished notin pkey.events: + var epv = epoll_event() + if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1: + raiseIOSelectorsError(osLastError()) + dec(s.count) + if posix.close(cint(fdi)) == -1: + raiseIOSelectorsError(osLastError()) elif Event.Signal in pkey.events: var epv = epoll_event() if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) var nmask, omask: Sigset discard sigemptyset(nmask) discard sigemptyset(omask) discard sigaddset(nmask, cint(s.fds[fdi].param)) unblockSignals(nmask, omask) - discard posix.close(fdi.cint) dec(s.count) + if posix.close(cint(fdi)) == -1: + raiseIOSelectorsError(osLastError()) elif Event.Process in pkey.events: - var epv = epoll_event() - if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1: - raiseOSError(osLastError()) - var nmask, omask: Sigset - discard sigemptyset(nmask) - discard sigemptyset(omask) - discard sigaddset(nmask, SIGCHLD) - unblockSignals(nmask, omask) - discard posix.close(fdi.cint) - dec(s.count) + if Event.Finished notin pkey.events: + var epv = epoll_event() + if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1: + raiseIOSelectorsError(osLastError()) + var nmask, omask: Sigset + discard sigemptyset(nmask) + discard sigemptyset(omask) + discard sigaddset(nmask, SIGCHLD) + unblockSignals(nmask, omask) + dec(s.count) + if posix.close(cint(fdi)) == -1: + raiseIOSelectorsError(osLastError()) else: if pkey.events * {Event.Read, Event.Write} != {}: var epv = epoll_event() if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) dec(s.count) elif Event.Timer in pkey.events: - var epv = epoll_event() - if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1: - raiseOSError(osLastError()) - discard posix.close(fdi.cint) - dec(s.count) - - pkey.ident = 0 - pkey.events = {} + if Event.Finished notin pkey.events: + var epv = epoll_event() + if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1: + raiseIOSelectorsError(osLastError()) + dec(s.count) + if posix.close(cint(fdi)) == -1: + raiseIOSelectorsError(osLastError()) + clearKey(pkey) proc unregister*[T](s: Selector[T], ev: SelectEvent) = let fdi = int(ev.efd) @@ -265,12 +277,11 @@ proc unregister*[T](s: Selector[T], ev: SelectEvent) = var pkey = addr(s.fds[fdi]) doAssert(pkey.ident != 0) doAssert(Event.User in pkey.events) - pkey.ident = 0 - pkey.events = {} var epv = epoll_event() if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) dec(s.count) + clearKey(pkey) proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool, data: T): int {.discardable.} = @@ -279,7 +290,7 @@ proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool, old_ts: Itimerspec let fdi = timerfd_create(CLOCK_MONOTONIC, 0).int if fdi == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) setNonBlocking(fdi.cint) s.checkFd(fdi) @@ -302,10 +313,10 @@ proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool, new_ts.it_value.tv_nsec = new_ts.it_interval.tv_nsec if timerfd_settime(fdi.cint, cint(0), new_ts, old_ts) == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: - raiseOSError(osLastError()) - s.setKey(fdi, fdi, events, 0, data) + raiseIOSelectorsError(osLastError()) + s.setKey(fdi, events, 0, data) inc(s.count) result = fdi @@ -323,7 +334,7 @@ when not defined(android): let fdi = signalfd(-1, nmask, 0).int if fdi == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) setNonBlocking(fdi.cint) s.checkFd(fdi) @@ -332,8 +343,8 @@ when not defined(android): var epv = epoll_event(events: EPOLLIN or EPOLLRDHUP) epv.data.u64 = fdi.uint if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: - raiseOSError(osLastError()) - s.setKey(fdi, signal, {Event.Signal}, signal, data) + raiseIOSelectorsError(osLastError()) + s.setKey(fdi, {Event.Signal}, signal, data) inc(s.count) result = fdi @@ -350,7 +361,7 @@ when not defined(android): let fdi = signalfd(-1, nmask, 0).int if fdi == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) setNonBlocking(fdi.cint) s.checkFd(fdi) @@ -360,30 +371,26 @@ when not defined(android): epv.data.u64 = fdi.uint epv.events = EPOLLIN or EPOLLRDHUP if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: - raiseOSError(osLastError()) - s.setKey(fdi, pid, {Event.Process, Event.Oneshot}, pid, data) + raiseIOSelectorsError(osLastError()) + s.setKey(fdi, {Event.Process, Event.Oneshot}, pid, data) inc(s.count) result = fdi proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = let fdi = int(ev.efd) doAssert(s.fds[fdi].ident == 0) - s.setKey(fdi, fdi, {Event.User}, 0, data) + s.setKey(fdi, {Event.User}, 0, data) var epv = epoll_event(events: EPOLLIN or EPOLLRDHUP) epv.data.u64 = ev.efd.uint if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, ev.efd, addr epv) == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) inc(s.count) -proc flush*[T](s: Selector[T]) = - discard - proc selectInto*[T](s: Selector[T], timeout: int, - results: var openarray[ReadyKey[T]]): int = + results: var openarray[ReadyKey]): int = var - resTable: array[MAX_EPOLL_RESULT_EVENTS, epoll_event] - maxres = MAX_EPOLL_RESULT_EVENTS - events: set[Event] = {} + resTable: array[MAX_EPOLL_EVENTS, epoll_event] + maxres = MAX_EPOLL_EVENTS i, k: int if maxres > len(results): @@ -395,7 +402,7 @@ proc selectInto*[T](s: Selector[T], timeout: int, result = 0 let err = osLastError() if cint(err) != EINTR: - raiseOSError(err) + raiseIOSelectorsError(err) elif count == 0: result = 0 else: @@ -404,108 +411,126 @@ proc selectInto*[T](s: Selector[T], timeout: int, while i < count: let fdi = int(resTable[i].data.u64) let pevents = resTable[i].events - var skey = addr(s.fds[fdi]) - doAssert(skey.ident != 0) - events = {} + var pkey = addr(s.fds[fdi]) + doAssert(pkey.ident != 0) + var rkey = ReadyKey(fd: int(fdi), events: {}) if (pevents and EPOLLERR) != 0 or (pevents and EPOLLHUP) != 0: - events.incl(Event.Error) + rkey.events.incl(Event.Error) if (pevents and EPOLLOUT) != 0: - events.incl(Event.Write) + rkey.events.incl(Event.Write) when not defined(android): if (pevents and EPOLLIN) != 0: - if Event.Read in skey.events: - events.incl(Event.Read) - elif Event.Timer in skey.events: + if Event.Read in pkey.events: + rkey.events.incl(Event.Read) + elif Event.Timer in pkey.events: var data: uint64 = 0 - if posix.read(fdi.cint, addr data, sizeof(uint64)) != sizeof(uint64): - raiseOSError(osLastError()) - events = {Event.Timer} - elif Event.Signal in skey.events: + if posix.read(cint(fdi), addr data, + sizeof(uint64)) != sizeof(uint64): + raiseIOSelectorsError(osLastError()) + rkey.events.incl(Event.Timer) + elif Event.Signal in pkey.events: var data = SignalFdInfo() - if posix.read(fdi.cint, addr data, + if posix.read(cint(fdi), addr data, sizeof(SignalFdInfo)) != sizeof(SignalFdInfo): - raiseOsError(osLastError()) - events = {Event.Signal} - elif Event.Process in skey.events: + raiseIOSelectorsError(osLastError()) + rkey.events.incl(Event.Signal) + elif Event.Process in pkey.events: var data = SignalFdInfo() - if posix.read(fdi.cint, addr data, + if posix.read(cint(fdi), addr data, sizeof(SignalFdInfo)) != sizeof(SignalFdInfo): - raiseOsError(osLastError()) - if cast[int](data.ssi_pid) == skey.param: - events = {Event.Process} + raiseIOSelectorsError(osLastError()) + if cast[int](data.ssi_pid) == pkey.param: + rkey.events.incl(Event.Process) else: inc(i) continue - elif Event.User in skey.events: + elif Event.User in pkey.events: var data: uint64 = 0 - if posix.read(fdi.cint, addr data, sizeof(uint64)) != sizeof(uint64): + if posix.read(cint(fdi), addr data, + sizeof(uint64)) != sizeof(uint64): let err = osLastError() if err == OSErrorCode(EAGAIN): inc(i) continue else: - raiseOSError(err) - events = {Event.User} + raiseIOSelectorsError(err) + rkey.events.incl(Event.User) else: if (pevents and EPOLLIN) != 0: - if Event.Read in skey.events: - events.incl(Event.Read) - elif Event.Timer in skey.events: + if Event.Read in pkey.events: + rkey.events.incl(Event.Read) + elif Event.Timer in pkey.events: var data: uint64 = 0 - if posix.read(fdi.cint, addr data, sizeof(uint64)) != sizeof(uint64): - raiseOSError(osLastError()) - events = {Event.Timer} - elif Event.User in skey.events: + if posix.read(cint(fdi), addr data, + sizeof(uint64)) != sizeof(uint64): + raiseIOSelectorsError(osLastError()) + rkey.events.incl(Event.Timer) + elif Event.User in pkey.events: var data: uint64 = 0 - if posix.read(fdi.cint, addr data, sizeof(uint64)) != sizeof(uint64): + if posix.read(cint(fdi), addr data, + sizeof(uint64)) != sizeof(uint64): let err = osLastError() if err == OSErrorCode(EAGAIN): inc(i) continue else: - raiseOSError(err) - events = {Event.User} - - skey.key.events = events - results[k] = skey.key - inc(k) + raiseIOSelectorsError(err) + rkey.events.incl(Event.User) - if Event.Oneshot in skey.events: + if Event.Oneshot in pkey.events: var epv = epoll_event() - if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) == -1: - raiseOSError(osLastError()) - discard posix.close(fdi.cint) - skey.ident = 0 - skey.events = {} + if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, cint(fdi), addr epv) == -1: + raiseIOSelectorsError(osLastError()) + # we will not clear key until it will be unregistered, so + # application can obtain data, but we will decrease counter, + # because epoll is empty. dec(s.count) + # we are marking key with `Finished` event, to avoid double decrease. + pkey.events.incl(Event.Finished) + + results[k] = rkey + inc(k) inc(i) result = k -proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey[T]] = - result = newSeq[ReadyKey[T]](MAX_EPOLL_RESULT_EVENTS) +proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] = + result = newSeq[ReadyKey](MAX_EPOLL_EVENTS) let count = selectInto(s, timeout, result) result.setLen(count) template isEmpty*[T](s: Selector[T]): bool = (s.count == 0) -template withData*[T](s: Selector[T], fd: SocketHandle, value, +proc getData*[T](s: Selector[T], fd: SocketHandle|int): T = + let fdi = int(fd) + s.checkFd(fdi) + if s.fds[fdi].ident != 0: + result = s.fds[fdi].data + +proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool = + let fdi = int(fd) + s.checkFd(fdi) + if s.fds[fdi].ident != 0: + s.fds[fdi].data = data + result = true + +template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body: untyped) = mixin checkFd let fdi = int(fd) s.checkFd(fdi) if s.fds[fdi].ident != 0: - var value = addr(s.fds[fdi].key.data) + var value = addr(s.fds[fdi].data) body -template withData*[T](s: Selector[T], fd: SocketHandle, value, body1, +template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1, body2: untyped) = mixin checkFd let fdi = int(fd) s.checkFd(fdi) if s.fds[fdi].ident != 0: - var value = addr(s.fds[fdi].key.data) + var value = addr(s.fds[fdi].data) body1 else: body2 diff --git a/lib/pure/ioselects/ioselectors_kqueue.nim b/lib/pure/ioselects/ioselectors_kqueue.nim index 3c0cf4e90..55790c200 100644 --- a/lib/pure/ioselects/ioselectors_kqueue.nim +++ b/lib/pure/ioselects/ioselectors_kqueue.nim @@ -12,15 +12,16 @@ import posix, times, kqueue const - # Maximum number of cached changes. - MAX_KQUEUE_CHANGE_EVENTS = 64 # Maximum number of events that can be returned. - MAX_KQUEUE_RESULT_EVENTS = 64 + MAX_KQUEUE_EVENTS = 64 # SIG_IGN and SIG_DFL declared in posix.nim as variables, but we need them # to be constants and GC-safe. SIG_DFL = cast[proc(x: cint) {.noconv,gcsafe.}](0) SIG_IGN = cast[proc(x: cint) {.noconv,gcsafe.}](1) +when defined(kqcache): + const CACHE_EVENTS = true + when defined(macosx) or defined(freebsd): when defined(macosx): const MAX_DESCRIPTORS_ID = 29 # KERN_MAXFILESPERPROC (MacOS) @@ -45,68 +46,88 @@ when hasThreadSupport: SelectorImpl[T] = object kqFD : cint maxFD : int - changesTable: array[MAX_KQUEUE_CHANGE_EVENTS, KEvent] - changesCount: int + changes: seq[KEvent] fds: ptr SharedArray[SelectorKey[T]] count: int changesLock: Lock + sock: cint Selector*[T] = ptr SelectorImpl[T] else: type SelectorImpl[T] = object kqFD : cint maxFD : int - changesTable: array[MAX_KQUEUE_CHANGE_EVENTS, KEvent] - changesCount: int + changes: seq[KEvent] fds: seq[SelectorKey[T]] count: int + sock: cint Selector*[T] = ref SelectorImpl[T] type SelectEventImpl = object rfd: cint wfd: cint -# SelectEvent is declared as `ptr` to be placed in `shared memory`, -# so you can share one SelectEvent handle between threads. -type SelectEvent* = ptr SelectEventImpl + + SelectEvent* = ptr SelectEventImpl + # SelectEvent is declared as `ptr` to be placed in `shared memory`, + # so you can share one SelectEvent handle between threads. + +proc getUnique[T](s: Selector[T]): int {.inline.} = + # we create duplicated handles to get unique indexes for our `fds` array. + result = posix.fcntl(s.sock, F_DUPFD, s.sock) + if result == -1: + raiseIOSelectorsError(osLastError()) proc newSelector*[T](): Selector[T] = var maxFD = 0.cint var size = csize(sizeof(cint)) var namearr = [1.cint, MAX_DESCRIPTORS_ID.cint] - # Obtain maximum number of file descriptors for process + # Obtain maximum number of opened file descriptors for process if sysctl(addr(namearr[0]), 2, cast[pointer](addr maxFD), addr size, nil, 0) != 0: - raiseOsError(osLastError()) + raiseIOSelectorsError(osLastError()) var kqFD = kqueue() if kqFD < 0: - raiseOsError(osLastError()) + raiseIOSelectorsError(osLastError()) when hasThreadSupport: result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T]))) - result.kqFD = kqFD - result.maxFD = maxFD.int result.fds = allocSharedArray[SelectorKey[T]](maxFD) initLock(result.changesLock) else: result = Selector[T]() - result.kqFD = kqFD - result.maxFD = maxFD.int result.fds = newSeq[SelectorKey[T]](maxFD) + result.kqFD = kqFD + result.maxFD = maxFD.int + result.changes = newSeqOfCap[KEvent](MAX_KQUEUE_EVENTS) + # we allocating empty socket to duplicate it handle in future, to get unique + # indexes for `fds` array. This is needed to properly identify + # {Event.Timer, Event.Signal, Event.Process} events. + result.sock = posix.socket(posix.AF_INET, posix.SOCK_STREAM, + posix.IPPROTO_TCP).cint + if result.sock == -1: + raiseIOSelectorsError(osLastError()) + proc close*[T](s: Selector[T]) = if posix.close(s.kqFD) != 0: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) when hasThreadSupport: deinitLock(s.changesLock) deallocSharedArray(s.fds) deallocShared(cast[pointer](s)) +template clearKey[T](key: ptr SelectorKey[T]) = + var empty: T + key.ident = 0 + key.events = {} + key.data = empty + proc newSelectEvent*(): SelectEvent = var fds: array[2, cint] if posix.pipe(fds) == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) setNonBlocking(fds[0]) setNonBlocking(fds[1]) result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) @@ -116,16 +137,18 @@ proc newSelectEvent*(): SelectEvent = proc setEvent*(ev: SelectEvent) = var data: uint64 = 1 if posix.write(ev.wfd, addr data, sizeof(uint64)) != sizeof(uint64): - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) proc close*(ev: SelectEvent) = - discard posix.close(cint(ev.rfd)) - discard posix.close(cint(ev.wfd)) + if posix.close(cint(ev.rfd)) == -1: + raiseIOSelectorsError(osLastError()) + if posix.close(cint(ev.wfd)) == -1: + raiseIOSelectorsError(osLastError()) deallocShared(cast[pointer](ev)) template checkFd(s, f) = if f >= s.maxFD: - raise newException(ValueError, "Maximum file descriptors exceeded") + raiseIOSelectorsError("Maximum file descriptors exceeded!") when hasThreadSupport: template withChangeLock[T](s: Selector[T], body: untyped) = @@ -144,31 +167,40 @@ template modifyKQueue[T](s: Selector[T], nident: uint, nfilter: cshort, nudata: pointer) = mixin withChangeLock s.withChangeLock(): - s.changesTable[s.changesCount] = KEvent(ident: nident, - filter: nfilter, flags: nflags, - fflags: nfflags, data: ndata, - udata: nudata) - inc(s.changesCount) - if s.changesCount == MAX_KQUEUE_CHANGE_EVENTS: - if kevent(s.kqFD, addr(s.changesTable[0]), cint(s.changesCount), - nil, 0, nil) == -1: - raiseOSError(osLastError()) - s.changesCount = 0 + s.changes.add(KEvent(ident: nident, + filter: nfilter, flags: nflags, + fflags: nfflags, data: ndata, + udata: nudata)) + +when not declared(CACHE_EVENTS): + template flushKQueue[T](s: Selector[T]) = + mixin withChangeLock + s.withChangeLock(): + let length = cint(len(s.changes)) + if length > 0: + if kevent(s.kqFD, addr(s.changes[0]), length, + nil, 0, nil) == -1: + raiseIOSelectorsError(osLastError()) + s.changes.setLen(0) proc registerHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event], data: T) = let fdi = int(fd) s.checkFd(fdi) doAssert(s.fds[fdi].ident == 0) - s.setKey(fdi, fdi, events, 0, data) + s.setKey(fdi, events, 0, data) + if events != {}: if Event.Read in events: - modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil) + modifyKQueue(s, uint(fdi), EVFILT_READ, EV_ADD, 0, 0, nil) inc(s.count) if Event.Write in events: - modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_ADD, 0, 0, nil) + modifyKQueue(s, uint(fdi), EVFILT_WRITE, EV_ADD, 0, 0, nil) inc(s.count) + when not declared(CACHE_EVENTS): + flushKQueue(s) + proc updateHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event]) = let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode, @@ -192,40 +224,41 @@ proc updateHandle*[T](s: Selector[T], fd: SocketHandle, if (Event.Write notin pkey.events) and (Event.Write in events): modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_ADD, 0, 0, nil) inc(s.count) + + when not declared(CACHE_EVENTS): + flushKQueue(s) + pkey.events = events proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool, data: T): int {.discardable.} = - var fdi = posix.socket(posix.AF_INET, posix.SOCK_STREAM, - posix.IPPROTO_TCP).int - if fdi == -1: - raiseOsError(osLastError()) - + let fdi = getUnique(s) s.checkFd(fdi) doAssert(s.fds[fdi].ident == 0) let events = if oneshot: {Event.Timer, Event.Oneshot} else: {Event.Timer} let flags: cushort = if oneshot: EV_ONESHOT or EV_ADD else: EV_ADD - s.setKey(fdi, fdi, events, 0, data) + s.setKey(fdi, events, 0, data) + # EVFILT_TIMER on Open/Net(BSD) has granularity of only milliseconds, # but MacOS and FreeBSD allow use `0` as `fflags` to use milliseconds # too modifyKQueue(s, fdi.uint, EVFILT_TIMER, flags, 0, cint(timeout), nil) + + when not declared(CACHE_EVENTS): + flushKQueue(s) + inc(s.count) result = fdi proc registerSignal*[T](s: Selector[T], signal: int, data: T): int {.discardable.} = - var fdi = posix.socket(posix.AF_INET, posix.SOCK_STREAM, - posix.IPPROTO_TCP).int - if fdi == -1: - raiseOsError(osLastError()) - + let fdi = getUnique(s) s.checkFd(fdi) doAssert(s.fds[fdi].ident == 0) - s.setKey(fdi, signal, {Event.Signal}, signal, data) + s.setKey(fdi, {Event.Signal}, signal, data) var nmask, omask: Sigset discard sigemptyset(nmask) discard sigemptyset(omask) @@ -233,33 +266,44 @@ proc registerSignal*[T](s: Selector[T], signal: int, blockSignals(nmask, omask) # to be compatible with linux semantic we need to "eat" signals posix.signal(cint(signal), SIG_IGN) + modifyKQueue(s, signal.uint, EVFILT_SIGNAL, EV_ADD, 0, 0, cast[pointer](fdi)) + + when not declared(CACHE_EVENTS): + flushKQueue(s) + inc(s.count) result = fdi proc registerProcess*[T](s: Selector[T], pid: int, - data: T): int {.discardable.} = - var fdi = posix.socket(posix.AF_INET, posix.SOCK_STREAM, - posix.IPPROTO_TCP).int - if fdi == -1: - raiseOsError(osLastError()) - + data: T): int {.discardable.} = + let fdi = getUnique(s) s.checkFd(fdi) doAssert(s.fds[fdi].ident == 0) var kflags: cushort = EV_ONESHOT or EV_ADD - setKey(s, fdi, pid, {Event.Process, Event.Oneshot}, pid, data) + setKey(s, fdi, {Event.Process, Event.Oneshot}, pid, data) + modifyKQueue(s, pid.uint, EVFILT_PROC, kflags, NOTE_EXIT, 0, cast[pointer](fdi)) + + when not declared(CACHE_EVENTS): + flushKQueue(s) + inc(s.count) result = fdi proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = let fdi = ev.rfd.int doAssert(s.fds[fdi].ident == 0) - setKey(s, fdi, fdi, {Event.User}, 0, data) + setKey(s, fdi, {Event.User}, 0, data) + modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil) + + when not declared(CACHE_EVENTS): + flushKQueue(s) + inc(s.count) template processVnodeEvents(events: set[Event]): cuint = @@ -281,9 +325,14 @@ template processVnodeEvents(events: set[Event]): cuint = proc registerVnode*[T](s: Selector[T], fd: cint, events: set[Event], data: T) = let fdi = fd.int - setKey(s, fdi, fdi, {Event.Vnode} + events, 0, data) + setKey(s, fdi, {Event.Vnode} + events, 0, data) var fflags = processVnodeEvents(events) + modifyKQueue(s, fdi.uint, EVFILT_VNODE, EV_ADD or EV_CLEAR, fflags, 0, nil) + + when not declared(CACHE_EVENTS): + flushKQueue(s) + inc(s.count) proc unregister*[T](s: Selector[T], fd: int|SocketHandle) = @@ -295,38 +344,55 @@ proc unregister*[T](s: Selector[T], fd: int|SocketHandle) = if pkey.events != {}: if pkey.events * {Event.Read, Event.Write} != {}: if Event.Read in pkey.events: - modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil) + modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil) dec(s.count) if Event.Write in pkey.events: - modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_DELETE, 0, 0, nil) + modifyKQueue(s, uint(fdi), EVFILT_WRITE, EV_DELETE, 0, 0, nil) dec(s.count) + when not declared(CACHE_EVENTS): + flushKQueue(s) elif Event.Timer in pkey.events: - discard posix.close(cint(pkey.key.fd)) - modifyKQueue(s, fdi.uint, EVFILT_TIMER, EV_DELETE, 0, 0, nil) - dec(s.count) + if Event.Finished notin pkey.events: + modifyKQueue(s, uint(fdi), EVFILT_TIMER, EV_DELETE, 0, 0, nil) + when not declared(CACHE_EVENTS): + flushKQueue(s) + dec(s.count) + if posix.close(cint(pkey.ident)) == -1: + raiseIOSelectorsError(osLastError()) elif Event.Signal in pkey.events: var nmask, omask: Sigset - var signal = cint(pkey.param) + let signal = cint(pkey.param) discard sigemptyset(nmask) discard sigemptyset(omask) discard sigaddset(nmask, signal) unblockSignals(nmask, omask) posix.signal(signal, SIG_DFL) - discard posix.close(cint(pkey.key.fd)) - modifyKQueue(s, fdi.uint, EVFILT_SIGNAL, EV_DELETE, 0, 0, nil) + modifyKQueue(s, uint(pkey.param), EVFILT_SIGNAL, EV_DELETE, 0, 0, nil) + when not declared(CACHE_EVENTS): + flushKQueue(s) dec(s.count) + if posix.close(cint(pkey.ident)) == -1: + raiseIOSelectorsError(osLastError()) elif Event.Process in pkey.events: - discard posix.close(cint(pkey.key.fd)) - modifyKQueue(s, fdi.uint, EVFILT_PROC, EV_DELETE, 0, 0, nil) - dec(s.count) + if Event.Finished notin pkey.events: + modifyKQueue(s, uint(pkey.param), EVFILT_PROC, EV_DELETE, 0, 0, nil) + when not declared(CACHE_EVENTS): + flushKQueue(s) + dec(s.count) + if posix.close(cint(pkey.ident)) == -1: + raiseIOSelectorsError(osLastError()) elif Event.Vnode in pkey.events: - modifyKQueue(s, fdi.uint, EVFILT_VNODE, EV_DELETE, 0, 0, nil) + modifyKQueue(s, uint(fdi), EVFILT_VNODE, EV_DELETE, 0, 0, nil) + when not declared(CACHE_EVENTS): + flushKQueue(s) dec(s.count) elif Event.User in pkey.events: - modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil) + modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil) + when not declared(CACHE_EVENTS): + flushKQueue(s) dec(s.count) - pkey.ident = 0 - pkey.events = {} + + clearKey(pkey) proc unregister*[T](s: Selector[T], ev: SelectEvent) = let fdi = int(ev.rfd) @@ -334,26 +400,20 @@ proc unregister*[T](s: Selector[T], ev: SelectEvent) = var pkey = addr(s.fds[fdi]) doAssert(pkey.ident != 0) doAssert(Event.User in pkey.events) - pkey.ident = 0 - pkey.events = {} - modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil) - dec(s.count) -proc flush*[T](s: Selector[T]) = - s.withChangeLock(): - var tv = Timespec() - if kevent(s.kqFD, addr(s.changesTable[0]), cint(s.changesCount), - nil, 0, addr tv) == -1: - raiseOSError(osLastError()) - s.changesCount = 0 + modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil) + when not declared(CACHE_EVENTS): + flushKQueue(s) + clearKey(pkey) + dec(s.count) proc selectInto*[T](s: Selector[T], timeout: int, - results: var openarray[ReadyKey[T]]): int = + results: var openarray[ReadyKey]): int = var tv: Timespec - resTable: array[MAX_KQUEUE_RESULT_EVENTS, KEvent] + resTable: array[MAX_KQUEUE_EVENTS, KEvent] ptv = addr tv - maxres = MAX_KQUEUE_RESULT_EVENTS + maxres = MAX_KQUEUE_EVENTS if timeout != -1: if timeout >= 1000: @@ -369,116 +429,147 @@ proc selectInto*[T](s: Selector[T], timeout: int, maxres = len(results) var count = 0 - s.withChangeLock(): - count = kevent(s.kqFD, addr(s.changesTable[0]), cint(s.changesCount), - addr(resTable[0]), cint(maxres), ptv) - s.changesCount = 0 + when not declared(CACHE_EVENTS): + count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres), ptv) + else: + s.withChangeLock(): + let length = cint(len(s.changes)) + if length > 0: + count = kevent(s.kqFD, addr(s.changes[0]), length, + addr(resTable[0]), cint(maxres), ptv) + s.changes.setLen(0) + else: + count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres), + ptv) if count < 0: result = 0 let err = osLastError() if cint(err) != EINTR: - raiseOSError(err) + raiseIOSelectorsError(err) elif count == 0: result = 0 else: var i = 0 - var k = 0 + var k = 0 # do not delete this, because `continue` used in cycle. var pkey: ptr SelectorKey[T] while i < count: let kevent = addr(resTable[i]) - if (kevent.flags and EV_ERROR) == 0: - case kevent.filter: - of EVFILT_READ: - pkey = addr(s.fds[kevent.ident.int]) - pkey.key.events = {Event.Read} - if Event.User in pkey.events: - var data: uint64 = 0 - if posix.read(kevent.ident.cint, addr data, - sizeof(uint64)) != sizeof(uint64): - let err = osLastError() - if err == OSErrorCode(EAGAIN): - # someone already consumed event data - inc(i) - continue - else: - raiseOSError(osLastError()) - pkey.key.events = {Event.User} - of EVFILT_WRITE: - pkey = addr(s.fds[kevent.ident.int]) - pkey.key.events = {Event.Write} - of EVFILT_TIMER: - pkey = addr(s.fds[kevent.ident.int]) - if Event.Oneshot in pkey.events: - if posix.close(cint(pkey.ident)) == -1: - raiseOSError(osLastError()) - pkey.ident = 0 - pkey.events = {} - dec(s.count) - pkey.key.events = {Event.Timer} - of EVFILT_VNODE: - pkey = addr(s.fds[kevent.ident.int]) - pkey.key.events = {Event.Vnode} - if (kevent.fflags and NOTE_DELETE) != 0: - pkey.key.events.incl(Event.VnodeDelete) - if (kevent.fflags and NOTE_WRITE) != 0: - pkey.key.events.incl(Event.VnodeWrite) - if (kevent.fflags and NOTE_EXTEND) != 0: - pkey.key.events.incl(Event.VnodeExtend) - if (kevent.fflags and NOTE_ATTRIB) != 0: - pkey.key.events.incl(Event.VnodeAttrib) - if (kevent.fflags and NOTE_LINK) != 0: - pkey.key.events.incl(Event.VnodeLink) - if (kevent.fflags and NOTE_RENAME) != 0: - pkey.key.events.incl(Event.VnodeRename) - if (kevent.fflags and NOTE_REVOKE) != 0: - pkey.key.events.incl(Event.VnodeRevoke) - of EVFILT_SIGNAL: - pkey = addr(s.fds[cast[int](kevent.udata)]) - pkey.key.events = {Event.Signal} - of EVFILT_PROC: - pkey = addr(s.fds[cast[int](kevent.udata)]) - if posix.close(cint(pkey.ident)) == -1: - raiseOSError(osLastError()) - pkey.ident = 0 - pkey.events = {} + var rkey = ReadyKey(fd: int(kevent.ident), events: {}) + + if (kevent.flags and EV_ERROR) != 0: + rkey.events = {Event.Error} + + case kevent.filter: + of EVFILT_READ: + pkey = addr(s.fds[int(kevent.ident)]) + rkey.events.incl(Event.Read) + if Event.User in pkey.events: + var data: uint64 = 0 + if posix.read(cint(kevent.ident), addr data, + sizeof(uint64)) != sizeof(uint64): + let err = osLastError() + if err == OSErrorCode(EAGAIN): + # someone already consumed event data + inc(i) + continue + else: + raiseIOSelectorsError(err) + rkey.events = {Event.User} + of EVFILT_WRITE: + pkey = addr(s.fds[int(kevent.ident)]) + rkey.events.incl(Event.Write) + rkey.events = {Event.Write} + of EVFILT_TIMER: + pkey = addr(s.fds[int(kevent.ident)]) + if Event.Oneshot in pkey.events: + # we will not clear key until it will be unregistered, so + # application can obtain data, but we will decrease counter, + # because kqueue is empty. dec(s.count) - pkey.key.events = {Event.Process} - else: - raise newException(ValueError, "Unsupported kqueue filter in queue") - - if (kevent.flags and EV_EOF) != 0: - pkey.key.events.incl(Event.Error) - - results[k] = pkey.key - inc(k) + # we are marking key with `Finished` event, to avoid double decrease. + pkey.events.incl(Event.Finished) + rkey.events.incl(Event.Timer) + of EVFILT_VNODE: + pkey = addr(s.fds[int(kevent.ident)]) + rkey.events.incl(Event.Vnode) + if (kevent.fflags and NOTE_DELETE) != 0: + rkey.events.incl(Event.VnodeDelete) + if (kevent.fflags and NOTE_WRITE) != 0: + rkey.events.incl(Event.VnodeWrite) + if (kevent.fflags and NOTE_EXTEND) != 0: + rkey.events.incl(Event.VnodeExtend) + if (kevent.fflags and NOTE_ATTRIB) != 0: + rkey.events.incl(Event.VnodeAttrib) + if (kevent.fflags and NOTE_LINK) != 0: + rkey.events.incl(Event.VnodeLink) + if (kevent.fflags and NOTE_RENAME) != 0: + rkey.events.incl(Event.VnodeRename) + if (kevent.fflags and NOTE_REVOKE) != 0: + rkey.events.incl(Event.VnodeRevoke) + of EVFILT_SIGNAL: + pkey = addr(s.fds[cast[int](kevent.udata)]) + rkey.fd = cast[int](kevent.udata) + rkey.events.incl(Event.Signal) + of EVFILT_PROC: + rkey.fd = cast[int](kevent.udata) + pkey = addr(s.fds[cast[int](kevent.udata)]) + # we will not clear key, until it will be unregistered, so + # application can obtain data, but we will decrease counter, + # because kqueue is empty. + dec(s.count) + # we are marking key with `Finished` event, to avoid double decrease. + pkey.events.incl(Event.Finished) + rkey.events.incl(Event.Process) + else: + pkey = addr(s.fds[cast[int](kevent.udata)]) + raiseIOSelectorsError("Unsupported kqueue filter in queue!") + + if (kevent.flags and EV_EOF) != 0: + rkey.events.incl(Event.Error) + + results[k] = rkey + inc(k) inc(i) result = k -proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey[T]] = - result = newSeq[ReadyKey[T]](MAX_KQUEUE_RESULT_EVENTS) +proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] = + result = newSeq[ReadyKey](MAX_KQUEUE_EVENTS) let count = selectInto(s, timeout, result) result.setLen(count) template isEmpty*[T](s: Selector[T]): bool = (s.count == 0) -template withData*[T](s: Selector[T], fd: SocketHandle, value, - body: untyped) = +proc getData*[T](s: Selector[T], fd: SocketHandle|int): T = + let fdi = int(fd) + s.checkFd(fdi) + if s.fds[fdi].ident != 0: + result = s.fds[fdi].data + +proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool = + let fdi = int(fd) + s.checkFd(fdi) + if s.fds[fdi].ident != 0: + s.fds[fdi].data = data + result = true + +template withData*[T](s: Selector[T], fd: SocketHandle|int, value, + body: untyped) = mixin checkFd let fdi = int(fd) s.checkFd(fdi) if s.fds[fdi].ident != 0: - var value = addr(s.fds[fdi].key.data) + var value = addr(s.fds[fdi].data) body -template withData*[T](s: Selector[T], fd: SocketHandle, value, body1, - body2: untyped) = +template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1, + body2: untyped) = mixin checkFd let fdi = int(fd) s.checkFd(fdi) if s.fds[fdi].ident != 0: - var value = addr(s.fds[fdi].key.data) + var value = addr(s.fds[fdi].data) body1 else: body2 diff --git a/lib/pure/ioselects/ioselectors_poll.nim b/lib/pure/ioselects/ioselectors_poll.nim index 56be35c70..26fe60b54 100644 --- a/lib/pure/ioselects/ioselectors_poll.nim +++ b/lib/pure/ioselects/ioselectors_poll.nim @@ -12,7 +12,7 @@ import posix, times # Maximum number of events that can be returned -const MAX_POLL_RESULT_EVENTS = 64 +const MAX_POLL_EVENTS = 64 when hasThreadSupport: type @@ -65,7 +65,7 @@ else: proc newSelector*[T](): Selector[T] = var a = rlimit() if getrlimit(RLIMIT_NOFILE, a) != 0: - raiseOsError(osLastError()) + raiseIOSelectorsError(osLastError()) var maxFD = int(a.rlim_max) when hasThreadSupport: @@ -87,6 +87,12 @@ proc close*[T](s: Selector[T]) = deallocSharedArray(s.pollfds) deallocShared(cast[pointer](s)) +template clearKey[T](key: ptr SelectorKey[T]) = + var empty: T + key.ident = 0 + key.events = {} + key.data = empty + template pollAdd[T](s: Selector[T], sock: cint, events: set[Event]) = withPollLock(s): var pollev: cshort = 0 @@ -111,7 +117,7 @@ template pollUpdate[T](s: Selector[T], sock: cint, events: set[Event]) = inc(i) if i == s.pollcnt: - raise newException(ValueError, "Descriptor is not registered in queue") + raiseIOSelectorsError("Descriptor is not registered in queue") template pollRemove[T](s: Selector[T], sock: cint) = withPollLock(s): @@ -134,14 +140,14 @@ template pollRemove[T](s: Selector[T], sock: cint) = template checkFd(s, f) = if f >= s.maxFD: - raise newException(ValueError, "Maximum file descriptors exceeded") + raiseIOSelectorsError("Descriptor is not registered in queue") proc registerHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event], data: T) = var fdi = int(fd) s.checkFd(fdi) doAssert(s.fds[fdi].ident == 0) - s.setKey(fdi, fdi, events, 0, data) + setKey(s, fdi, events, 0, data) if events != {}: s.pollAdd(fdi.cint, events) proc updateHandle*[T](s: Selector[T], fd: SocketHandle, @@ -168,12 +174,10 @@ proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = var fdi = int(ev.rfd) doAssert(s.fds[fdi].ident == 0) var events = {Event.User} - setKey(s, fdi, fdi, events, 0, data) + setKey(s, fdi, events, 0, data) events.incl(Event.Read) s.pollAdd(fdi.cint, events) -proc flush*[T](s: Selector[T]) = discard - proc unregister*[T](s: Selector[T], fd: int|SocketHandle) = let fdi = int(fd) s.checkFd(fdi) @@ -196,7 +200,7 @@ proc unregister*[T](s: Selector[T], ev: SelectEvent) = proc newSelectEvent*(): SelectEvent = var fds: array[2, cint] if posix.pipe(fds) == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) setNonBlocking(fds[0]) setNonBlocking(fds[1]) result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) @@ -206,16 +210,18 @@ proc newSelectEvent*(): SelectEvent = proc setEvent*(ev: SelectEvent) = var data: uint64 = 1 if posix.write(ev.wfd, addr data, sizeof(uint64)) != sizeof(uint64): - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) proc close*(ev: SelectEvent) = - discard posix.close(cint(ev.rfd)) - discard posix.close(cint(ev.wfd)) + if posix.close(cint(ev.rfd)) == -1: + raiseIOSelectorsError(osLastError()) + if posix.close(cint(ev.wfd)) == -1: + raiseIOSelectorsError(osLastError()) deallocShared(cast[pointer](ev)) proc selectInto*[T](s: Selector[T], timeout: int, - results: var openarray[ReadyKey[T]]): int = - var maxres = MAX_POLL_RESULT_EVENTS + results: var openarray[ReadyKey]): int = + var maxres = MAX_POLL_EVENTS if maxres > len(results): maxres = len(results) @@ -224,10 +230,8 @@ proc selectInto*[T](s: Selector[T], timeout: int, if count < 0: result = 0 let err = osLastError() - if err.cint == EINTR: - discard - else: - raiseOSError(osLastError()) + if cint(err) != EINTR: + raiseIOSelectorsError(err) elif count == 0: result = 0 else: @@ -238,58 +242,71 @@ proc selectInto*[T](s: Selector[T], timeout: int, let revents = s.pollfds[i].revents if revents != 0: let fd = s.pollfds[i].fd - var skey = addr(s.fds[fd]) - skey.key.events = {} + var pkey = addr(s.fds[fd]) + var rkey = ReadyKey(fd: int(fd), events: {}) if (revents and POLLIN) != 0: - skey.key.events.incl(Event.Read) - if Event.User in skey.events: + rkey.events.incl(Event.Read) + if Event.User in pkey.events: var data: uint64 = 0 if posix.read(fd, addr data, sizeof(uint64)) != sizeof(uint64): let err = osLastError() if err != OSErrorCode(EAGAIN): - raiseOSError(osLastError()) + raiseIOSelectorsError(err) else: # someone already consumed event data inc(i) continue - skey.key.events = {Event.User} + rkey.events = {Event.User} if (revents and POLLOUT) != 0: - skey.key.events.incl(Event.Write) + rkey.events.incl(Event.Write) if (revents and POLLERR) != 0 or (revents and POLLHUP) != 0 or (revents and POLLNVAL) != 0: - skey.key.events.incl(Event.Error) - results[rindex] = skey.key + rkey.events.incl(Event.Error) + results[rindex] = rkey s.pollfds[i].revents = 0 inc(rindex) inc(k) inc(i) result = k -proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey[T]] = - result = newSeq[ReadyKey[T]](MAX_POLL_RESULT_EVENTS) +proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] = + result = newSeq[ReadyKey](MAX_POLL_EVENTS) let count = selectInto(s, timeout, result) result.setLen(count) template isEmpty*[T](s: Selector[T]): bool = (s.count == 0) -template withData*[T](s: Selector[T], fd: SocketHandle, value, +proc getData*[T](s: Selector[T], fd: SocketHandle|int): T = + let fdi = int(fd) + s.checkFd(fdi) + if s.fds[fdi].ident != 0: + result = s.fds[fdi].data + +proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool = + let fdi = int(fd) + s.checkFd(fdi) + if s.fds[fdi].ident != 0: + s.fds[fdi].data = data + result = true + +template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body: untyped) = mixin checkFd let fdi = int(fd) s.checkFd(fdi) if s.fds[fdi].ident != 0: - var value = addr(s.fds[fdi].key.data) + var value = addr(s.fds[fdi].data) body -template withData*[T](s: Selector[T], fd: SocketHandle, value, body1, +template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1, body2: untyped) = mixin checkFd let fdi = int(fd) s.checkFd(fdi) if s.fds[fdi].ident != 0: - var value = addr(s.fds[fdi].key.data) + var value = addr(s.fds[fdi].data) body1 else: body2 diff --git a/lib/pure/ioselects/ioselectors_select.nim b/lib/pure/ioselects/ioselectors_select.nim index ddb70b507..19d68f0fe 100644 --- a/lib/pure/ioselects/ioselectors_select.nim +++ b/lib/pure/ioselects/ioselectors_select.nim @@ -120,35 +120,35 @@ when defined(windows): saddr.sin_addr.s_addr = INADDR_ANY if bindAddr(ssock, cast[ptr SockAddr](addr(saddr)), sizeof(saddr).SockLen) < 0'i32: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) if winlean.listen(ssock, 1) == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) var namelen = sizeof(saddr).SockLen if getsockname(ssock, cast[ptr SockAddr](addr(saddr)), addr(namelen)) == -1'i32: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) saddr.sin_addr.s_addr = 0x0100007F if winlean.connect(wsock, cast[ptr SockAddr](addr(saddr)), sizeof(saddr).SockLen) == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) namelen = sizeof(saddr).SockLen rsock = winlean.accept(ssock, cast[ptr SockAddr](addr(saddr)), cast[ptr SockLen](addr(namelen))) if rsock == SocketHandle(-1): - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) if winlean.closesocket(ssock) == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) var mode = clong(1) if ioctlsocket(rsock, FIONBIO, addr(mode)) == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) mode = clong(1) if ioctlsocket(wsock, FIONBIO, addr(mode)) == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) result.rsock = rsock @@ -158,7 +158,7 @@ when defined(windows): var data: uint64 = 1 if winlean.send(ev.wsock, cast[pointer](addr data), cint(sizeof(uint64)), 0) != sizeof(uint64): - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) proc close*(ev: SelectEvent) = discard winlean.closesocket(ev.rsock) @@ -169,7 +169,7 @@ else: proc newSelectEvent*(): SelectEvent = var fds: array[2, cint] if posix.pipe(fds) == -1: - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) setNonBlocking(fds[0]) setNonBlocking(fds[1]) result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) @@ -179,14 +179,17 @@ else: proc setEvent*(ev: SelectEvent) = var data: uint64 = 1 if posix.write(cint(ev.wsock), addr data, sizeof(uint64)) != sizeof(uint64): - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) proc close*(ev: SelectEvent) = - discard posix.close(cint(ev.rsock)) - discard posix.close(cint(ev.wsock)) + if posix.close(cint(ev.rsock)) == -1: + raiseIOSelectorsError(osLastError()) + if posix.close(cint(ev.wsock)) == -1: + raiseIOSelectorsError(osLastError()) deallocShared(cast[pointer](ev)) -proc setKey[T](s: Selector[T], fd: SocketHandle, events: set[Event], data: T) = +proc setSelectKey[T](s: Selector[T], fd: SocketHandle, events: set[Event], + data: T) = var i = 0 let fdi = int(fd) while i < FD_SETSIZE: @@ -194,13 +197,11 @@ proc setKey[T](s: Selector[T], fd: SocketHandle, events: set[Event], data: T) = var pkey = addr(s.fds[i]) pkey.ident = fdi pkey.events = events - pkey.key.fd = fd.int - pkey.key.events = {} - pkey.key.data = data + pkey.data = data break inc(i) if i == FD_SETSIZE: - raise newException(ValueError, "Maximum numbers of fds exceeded") + raiseIOSelectorsError("Maximum numbers of fds exceeded") proc getKey[T](s: Selector[T], fd: SocketHandle): ptr SelectorKey[T] = var i = 0 @@ -210,24 +211,28 @@ proc getKey[T](s: Selector[T], fd: SocketHandle): ptr SelectorKey[T] = result = addr(s.fds[i]) break inc(i) - doAssert(i < FD_SETSIZE, "Descriptor not registered in queue") + if i == FD_SETSIZE: + raiseIOSelectorsError("Descriptor not registered in queue") proc delKey[T](s: Selector[T], fd: SocketHandle) = + var empty: T var i = 0 while i < FD_SETSIZE: if s.fds[i].ident == fd.int: s.fds[i].ident = 0 s.fds[i].events = {} + s.fds[i].data = empty break inc(i) - doAssert(i < FD_SETSIZE, "Descriptor not registered in queue") + if i == FD_SETSIZE: + raiseIOSelectorsError("Descriptor not registered in queue") proc registerHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event], data: T) = when not defined(windows): let fdi = int(fd) s.withSelectLock(): - s.setKey(fd, events, data) + s.setSelectKey(fd, events, data) when not defined(windows): if fdi > s.maxFD: s.maxFD = fdi if Event.Read in events: @@ -242,7 +247,7 @@ proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = when not defined(windows): let fdi = int(ev.rsock) s.withSelectLock(): - s.setKey(ev.rsock, {Event.User}, data) + s.setSelectKey(ev.rsock, {Event.User}, data) when not defined(windows): if fdi > s.maxFD: s.maxFD = fdi IOFD_SET(ev.rsock, addr s.rSet) @@ -292,7 +297,7 @@ proc unregister*[T](s: Selector[T], ev: SelectEvent) = s.delKey(fd) proc selectInto*[T](s: Selector[T], timeout: int, - results: var openarray[ReadyKey[T]]): int = + results: var openarray[ReadyKey]): int = var tv = Timeval() var ptv = addr tv var rset, wset, eset: FdSet @@ -313,11 +318,11 @@ proc selectInto*[T](s: Selector[T], timeout: int, if count < 0: result = 0 when defined(windows): - raiseOSError(osLastError()) + raiseIOSelectorsError(osLastError()) else: let err = osLastError() if cint(err) != EINTR: - raiseOSError(err) + raiseIOSelectorsError(err) elif count == 0: result = 0 else: @@ -329,7 +334,7 @@ proc selectInto*[T](s: Selector[T], timeout: int, if s.fds[i].ident != 0: var flag = false var pkey = addr(s.fds[i]) - pkey.key.events = {} + var rkey = ReadyKey(fd: int(pkey.ident), events: {}) let fd = SocketHandle(pkey.ident) if IOFD_ISSET(fd, addr rset) != 0: if Event.User in pkey.events: @@ -338,31 +343,31 @@ proc selectInto*[T](s: Selector[T], timeout: int, sizeof(uint64).cint, 0) != sizeof(uint64): let err = osLastError() if cint(err) != EAGAIN: - raiseOSError(err) + raiseIOSelectorsError(err) else: inc(i) inc(k) continue else: flag = true - pkey.key.events = {Event.User} + rkey.events = {Event.User} else: flag = true - pkey.key.events = {Event.Read} + rkey.events = {Event.Read} if IOFD_ISSET(fd, addr wset) != 0: - pkey.key.events.incl(Event.Write) + rkey.events.incl(Event.Write) if IOFD_ISSET(fd, addr eset) != 0: - pkey.key.events.incl(Event.Error) + rkey.events.incl(Event.Error) flag = true if flag: - results[rindex] = pkey.key + results[rindex] = rkey inc(rindex) inc(k) inc(i) result = rindex -proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey[T]] = - result = newSeq[ReadyKey[T]](FD_SETSIZE) +proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] = + result = newSeq[ReadyKey](FD_SETSIZE) var count = selectInto(s, timeout, result) result.setLen(count) @@ -383,7 +388,28 @@ else: template withSelectLock[T](s: Selector[T], body: untyped) = body -template withData*[T](s: Selector[T], fd: SocketHandle, value, +proc getData*[T](s: Selector[T], fd: SocketHandle|int): T = + s.withSelectLock(): + let fdi = int(fd) + var i = 0 + while i < FD_SETSIZE: + if s.fds[i].ident == fdi: + result = s.fds[i].data + break + inc(i) + +proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool = + s.withSelectLock(): + let fdi = int(fd) + var i = 0 + while i < FD_SETSIZE: + if s.fds[i].ident == fdi: + var pkey = addr(s.fds[i]) + pkey.data = data + result = true + break + +template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body: untyped) = mixin withSelectLock s.withSelectLock(): @@ -392,13 +418,13 @@ template withData*[T](s: Selector[T], fd: SocketHandle, value, var i = 0 while i < FD_SETSIZE: if s.fds[i].ident == fdi: - value = addr(s.fds[i].key.data) + value = addr(s.fds[i].data) break inc(i) if i != FD_SETSIZE: body -template withData*[T](s: Selector[T], fd: SocketHandle, value, +template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1, body2: untyped) = mixin withSelectLock s.withSelectLock(): @@ -407,10 +433,11 @@ template withData*[T](s: Selector[T], fd: SocketHandle, value, var i = 0 while i < FD_SETSIZE: if s.fds[i].ident == fdi: - value = addr(s.fds[i].key.data) + value = addr(s.fds[i].data) break inc(i) if i != FD_SETSIZE: body1 else: body2 + diff --git a/lib/upcoming/asyncdispatch.nim b/lib/upcoming/asyncdispatch.nim index 78c7afffc..80a4f0e4f 100644 --- a/lib/upcoming/asyncdispatch.nim +++ b/lib/upcoming/asyncdispatch.nim @@ -526,10 +526,10 @@ when defined(windows) or defined(nimdoc): proc send*(socket: AsyncFD, buf: pointer, size: int, flags = {SocketFlag.SafeDisconn}): Future[void] = - ## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future will complete once all - ## data has been sent. - ## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object, you must use GC_ref/GC_unref calls - ## to avoid early freeing of the buffer + ## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future + ## will complete once all data has been sent. + ## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object, + ## you must use GC_ref/GC_unref calls to avoid early freeing of the buffer. verifyPresence(socket) var retFuture = newFuture[void]("send") @@ -946,7 +946,8 @@ when defined(windows) or defined(nimdoc): ## receiving notifies. registerWaitableEvent(fd, cb, FD_WRITE or FD_CONNECT or FD_CLOSE) - template registerWaitableHandle(p, hEvent, flags, pcd, timeout, handleCallback) = + template registerWaitableHandle(p, hEvent, flags, pcd, timeout, + handleCallback) = let handleFD = AsyncFD(hEvent) pcd.ioPort = p.ioPort pcd.handleFd = handleFD @@ -961,7 +962,7 @@ when defined(windows) or defined(nimdoc): pcd.ovl = ol if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent, cast[WAITORTIMERCALLBACK](waitableCallback), - cast[pointer](pcd), timeout.Dword, flags): + cast[pointer](pcd), timeout.Dword, flags): GC_unref(ol) deallocShared(cast[pointer](pcd)) discard closeHandle(hEvent) @@ -1098,15 +1099,18 @@ else: import ioselectors from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK, MSG_NOSIGNAL + const + InitCallbackListSize = 4 # initial size of callbacks sequence, + # associated with file/socket descriptor. + InitDelayedCallbackListSize = 64 # initial size of delayed callbacks + # queue. type AsyncFD* = distinct cint Callback = proc (fd: AsyncFD): bool {.closure,gcsafe.} - DoublyLinkedListRef = ref DoublyLinkedList[Callback] - AsyncData = object - readCBs: DoublyLinkedListRef - writeCBs: DoublyLinkedListRef + readList: seq[Callback] + writeList: seq[Callback] AsyncEvent* = distinct SelectEvent @@ -1117,11 +1121,17 @@ else: proc `==`*(x, y: AsyncFD): bool {.borrow.} proc `==`*(x, y: AsyncEvent): bool {.borrow.} + template newAsyncData(): AsyncData = + AsyncData( + readList: newSeqOfCap[Callback](InitCallbackListSize), + writeList: newSeqOfCap[Callback](InitCallbackListSize) + ) + proc newDispatcher*(): PDispatcher = new result result.selector = newSelector[AsyncData]() result.timers.newHeapQueue() - result.callbacks = initDeque[proc ()](64) + result.callbacks = initDeque[proc ()](InitDelayedCallbackListSize) var gDisp{.threadvar.}: PDispatcher ## Global dispatcher proc getGlobalDispatcher*(): PDispatcher = @@ -1130,10 +1140,7 @@ else: proc register*(fd: AsyncFD) = let p = getGlobalDispatcher() - var data = AsyncData( - readCBs: DoublyLinkedListRef(), - writeCBs: DoublyLinkedListRef() - ) + var data = newAsyncData() p.selector.registerHandle(fd.SocketHandle, {}, data) proc newAsyncNativeSocket*(domain: cint, sockType: cint, @@ -1168,10 +1175,9 @@ else: let p = getGlobalDispatcher() var newEvents = {Event.Read} withData(p.selector, fd.SocketHandle, adata) do: - adata.readCBs[].append(cb) + adata.readList.add(cb) newEvents.incl(Event.Read) - if not isNil(adata.writeCBs.head): - newEvents.incl(Event.Write) + if len(adata.writeList) != 0: newEvents.incl(Event.Write) do: raise newException(ValueError, "File descriptor not registered.") p.selector.updateHandle(fd.SocketHandle, newEvents) @@ -1180,10 +1186,9 @@ else: let p = getGlobalDispatcher() var newEvents = {Event.Write} withData(p.selector, fd.SocketHandle, adata) do: - adata.writeCBs[].append(cb) + adata.writeList.add(cb) newEvents.incl(Event.Write) - if not isNil(adata.readCBs.head): - newEvents.incl(Event.Read) + if len(adata.readList) != 0: newEvents.incl(Event.Read) do: raise newException(ValueError, "File descriptor not registered.") p.selector.updateHandle(fd.SocketHandle, newEvents) @@ -1192,13 +1197,65 @@ else: let p = getGlobalDispatcher() not p.selector.isEmpty() or p.timers.len != 0 or p.callbacks.len != 0 + template processBasicCallbacks(ident, rwlist: untyped) = + # Process pending descriptor's callbacks. + # Invoke every callback stored in `rwlist`, until first one + # returned `false`, which means callback wants to stay + # alive. In such case all remaining callbacks will be added + # to `rwlist` again, in the order they have been inserted. + # + # `rwlist` associated with file descriptor MUST BE emptied before + # dispatching callback (See https://github.com/nim-lang/Nim/issues/5128), + # or it can be possible to fall into endless cycle. + var curList: seq[Callback] + + withData(p.selector, ident, adata) do: + shallowCopy(curList, adata.rwlist) + adata.rwlist = newSeqOfCap[Callback](InitCallbackListSize) + + let newLength = max(len(curList), InitCallbackListSize) + var newList = newSeqOfCap[Callback](newLength) + + for cb in curList: + if len(newList) > 0: + newList.add(cb) + else: + if not cb(fd.AsyncFD): + newList.add(cb) + + withData(p.selector, ident, adata) do: + adata.rwlist = newList & adata.rwlist + + template processCustomCallbacks(ident: untyped) = + # Process pending custom event callbacks. Custom events are + # {Event.Timer, Event.Signal, Event.Process, Event.Vnode}. + # There can be only one callback registered with one descriptor, + # so there no need to iterate over list. + var curList: seq[Callback] + + withData(p.selector, ident, adata) do: + shallowCopy(curList, adata.readList) + adata.readList = newSeqOfCap[Callback](InitCallbackListSize) + + let newLength = len(curList) + var newList = newSeqOfCap[Callback](newLength) + + var cb = curList[0] + if not cb(fd.AsyncFD): + newList.add(cb) + else: + p.selector.unregister(fd) + + withData(p.selector, ident, adata) do: + adata.readList = newList & adata.readList + proc poll*(timeout = 500) = - var keys: array[64, ReadyKey[AsyncData]] + var keys: array[64, ReadyKey] let p = getGlobalDispatcher() when ioselSupportedPlatform: let customSet = {Event.Timer, Event.Signal, Event.Process, - Event.Vnode, Event.User} + Event.Vnode} if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0: raise newException(ValueError, @@ -1209,45 +1266,23 @@ else: var i = 0 while i < count: var custom = false - var fd = keys[i].fd.SocketHandle + let fd = keys[i].fd let events = keys[i].events if Event.Read in events or events == {Event.Error}: - for node in keys[i].data.readCBs[].nodes(): - let cb = node.value - if cb != nil: - if cb(fd.AsyncFD): - keys[i].data.readCBs[].remove(node) - else: - break + processBasicCallbacks(fd, readList) if Event.Write in events or events == {Event.Error}: - for node in keys[i].data.writeCBs[].nodes(): - let cb = node.value - if cb != nil: - if cb(fd.AsyncFD): - keys[i].data.writeCBs[].remove(node) - else: - break + processBasicCallbacks(fd, writeList) + + if Event.User in events or events == {Event.Error}: + custom = true + processBasicCallbacks(fd, readList) when ioselSupportedPlatform: if (customSet * events) != {}: - for node in keys[i].data.readCBs[].nodes(): - let cb = node.value - doAssert(cb != nil) - custom = true - if cb(fd.AsyncFD): - keys[i].data.readCBs[].remove(node) - p.selector.unregister(fd) - else: - if Event.User in events or events == {Event.Error}: - for node in keys[i].data.readCBs[].nodes(): - let cb = node.value - custom = true - if cb != nil: - if cb(fd.AsyncFD): - keys[i].data.readCBs[].remove(node) - p.selector.unregister(fd) + custom = true + processCustomCallbacks(fd) # because state `data` can be modified in callback we need to update # descriptor events with currently registered callbacks. @@ -1255,11 +1290,11 @@ else: var update = false var newEvents: set[Event] = {} p.selector.withData(fd, adata) do: - if not isNil(adata.readCBs.head): incl(newEvents, Event.Read) - if not isNil(adata.writeCBs.head): incl(newEvents, Event.Write) + if len(adata.readList) > 0: incl(newEvents, Event.Read) + if len(adata.writeList) > 0: incl(newEvents, Event.Write) update = true if update: - p.selector.updateHandle(fd, newEvents) + p.selector.updateHandle(SocketHandle(fd), newEvents) inc(i) # Timer processing. @@ -1519,33 +1554,24 @@ else: ## ``oneshot`` - if ``true`` only one event will be dispatched, ## if ``false`` continuous events every ``timeout`` milliseconds. let p = getGlobalDispatcher() - var data = AsyncData( - readCBs: DoublyLinkedListRef(), - writeCBs: DoublyLinkedListRef() - ) - data.readCBs[].append(cb) + var data = newAsyncData() + data.readList.add(cb) p.selector.registerTimer(timeout, oneshot, data) proc addSignal*(signal: int, cb: Callback) = ## Start watching signal ``signal``, and when signal appears, call the ## callback ``cb``. let p = getGlobalDispatcher() - var data = AsyncData( - readCBs: DoublyLinkedListRef(), - writeCBs: DoublyLinkedListRef() - ) - data.readCBs[].append(cb) + var data = newAsyncData() + data.readList.add(cb) p.selector.registerSignal(signal, data) proc addProcess*(pid: int, cb: Callback) = ## Start watching for process exit with pid ``pid``, and then call ## the callback ``cb``. let p = getGlobalDispatcher() - var data = AsyncData( - readCBs: DoublyLinkedListRef(), - writeCBs: DoublyLinkedListRef() - ) - data.readCBs[].append(cb) + var data = newAsyncData() + data.readList.add(cb) p.selector.registerProcess(pid, data) proc newAsyncEvent*(): AsyncEvent = @@ -1564,11 +1590,8 @@ else: ## Start watching for event ``ev``, and call callback ``cb``, when ## ev will be set to signaled state. let p = getGlobalDispatcher() - var data = AsyncData( - readCBs: DoublyLinkedListRef(), - writeCBs: DoublyLinkedListRef() - ) - data.readCBs[].append(cb) + var data = newAsyncData() + data.readList.add(cb) p.selector.registerEvent(SelectEvent(ev), data) proc sleepAsync*(ms: int): Future[void] = |