diff options
-rw-r--r-- | lib/pure/ioselectors.nim | 408 |
1 files changed, 198 insertions, 210 deletions
diff --git a/lib/pure/ioselectors.nim b/lib/pure/ioselectors.nim index 65e549b19..4ea7ee539 100644 --- a/lib/pure/ioselectors.nim +++ b/lib/pure/ioselectors.nim @@ -50,7 +50,7 @@ when defined(nimdoc): Selector*[T] = ref object ## An object which holds descriptors to be checked for read/write status - Event* {.pure.} = enum + Event* {.pure.} = enum ## An enum which hold event types Read, ## Descriptor is available for read Write, ## Descriptor is available for write @@ -249,7 +249,7 @@ else: import locks type - Event* {.pure.} = enum + Event* {.pure.} = enum Read, Write, Timer, Signal, Process, Vnode, User, Error, flagHandle, flagTimer, flagSignal, flagProcess, flagVnode, flagUser, flagOneshot @@ -299,6 +299,10 @@ else: s.fds[f].ident = 0 s.fds[f].flags = {} + template checkMaxFd(s, fd) = + if fd.uint >= s.maxFD: + raise newException(ValueError, "Maximum file descriptors exceeded") + when supportedPlatform: template blockSignals(newmask: var Sigset, oldmask: var Sigset) = when hasThreadSupport: @@ -326,7 +330,9 @@ else: # when bsdPlatform: const + # Maximum number of cached changes MAX_KQUEUE_CHANGE_EVENTS = 64 + # Maximum number of events that can be returned MAX_KQUEUE_RESULT_EVENTS = 64 type @@ -401,43 +407,39 @@ else: proc registerHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event], data: T) = var fdi = int(fd) - if fdi.uint < s.maxFD: - doAssert(s.fds[fdi].ident == 0) - setKey(s, fdi, fdi, {Event.flagHandle} + events, 0, data) - if events != {}: - if Event.Read in events: - modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil) - inc(s.count) - if Event.Write in events: - modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_ADD, 0, 0, nil) - inc(s.count) - else: - raise newException(ValueError, "Maximum file descriptors exceeded") + s.checkMaxFd(fdi) + doAssert(s.fds[fdi].ident == 0) + setKey(s, fdi, fdi, {Event.flagHandle} + events, 0, data) + if events != {}: + if Event.Read in events: + modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil) + inc(s.count) + if Event.Write in events: + modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_ADD, 0, 0, nil) + inc(s.count) proc updateHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event]) = var fdi = int(fd) - if fdi.uint < s.maxFD: - doAssert(s.fds[fdi].ident != 0) - doAssert(Event.flagHandle in s.fds[fdi].flags) - var ne = events + {Event.flagHandle} - var oe = s.fds[fdi].flags - if oe != ne: - if (Event.Read in oe) and (Event.Read notin ne): - modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil) - dec(s.count) - if (Event.Write in oe) and (Event.Write notin ne): - modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_DELETE, 0, 0, nil) - dec(s.count) - if (Event.Read notin oe) and (Event.Read in ne): - modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil) - inc(s.count) - if (Event.Write notin oe) and (Event.Write in ne): - modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_ADD, 0, 0, nil) - inc(s.count) - s.fds[fdi].flags = ne - else: - raise newException(ValueError, "Maximum file descriptors exceeded") + s.checkMaxFd(fdi) + doAssert(s.fds[fdi].ident != 0) + doAssert(Event.flagHandle in s.fds[fdi].flags) + var ne = events + {Event.flagHandle} + var oe = s.fds[fdi].flags + if oe != ne: + if (Event.Read in oe) and (Event.Read notin ne): + modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil) + dec(s.count) + if (Event.Write in oe) and (Event.Write notin ne): + modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_DELETE, 0, 0, nil) + dec(s.count) + if (Event.Read notin oe) and (Event.Read in ne): + modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil) + inc(s.count) + if (Event.Write notin oe) and (Event.Write in ne): + modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_ADD, 0, 0, nil) + inc(s.count) + s.fds[fdi].flags = ne proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool, data: T): int {.discardable.} = @@ -445,21 +447,19 @@ else: posix.IPPROTO_TCP).int if fdi == -1: raiseOsError(osLastError()) - if fdi.uint < s.maxFD: - doAssert(s.fds[fdi].ident == 0) - var mflags = if oneshot: {Event.flagTimer, Event.flagOneshot} - else: {Event.flagTimer} - var kflags: cushort = if oneshot: EV_ONESHOT or EV_ADD - else: EV_ADD - setKey(s, fdi, fdi, mflags, 0, data) - # EVFILT_TIMER on Open/Net(BSD) has granularity of only milliseconds, - # but MacOS and FreeBSD allow use `0` as `fflags` to use milliseconds - # too - modifyKQueue(s, fdi.uint, EVFILT_TIMER, kflags, 0, cint(timeout), nil) - inc(s.count) - result = fdi - else: - raise newException(ValueError, "Maximum file descriptors exceeded") + s.checkMaxFd(fdi) + doAssert(s.fds[fdi].ident == 0) + var mflags = if oneshot: {Event.flagTimer, Event.flagOneshot} + else: {Event.flagTimer} + var kflags: cushort = if oneshot: EV_ONESHOT or EV_ADD + else: EV_ADD + setKey(s, fdi, fdi, mflags, 0, data) + # EVFILT_TIMER on Open/Net(BSD) has granularity of only milliseconds, + # but MacOS and FreeBSD allow use `0` as `fflags` to use milliseconds + # too + modifyKQueue(s, fdi.uint, EVFILT_TIMER, kflags, 0, cint(timeout), nil) + inc(s.count) + result = fdi proc registerSignal*[T](s: Selector[T], signal: int, data: T): int {.discardable.} = @@ -468,24 +468,22 @@ else: if fdi == -1: raiseOsError(osLastError()) - if fdi.uint < s.maxFD: - doAssert(s.fds[fdi].ident == 0) - setKey(s, fdi, signal, {Event.flagSignal}, signal, data) - # block signal `signal` - var nmask: Sigset - var omask: Sigset - discard sigemptyset(nmask) - discard sigemptyset(omask) - discard sigaddset(nmask, cint(signal)) - blockSignals(nmask, omask) - # to be compatible with linux semantic we need to "eat" signals - posix.signal(cint(signal), SIG_IGN) - modifyKQueue(s, signal.uint, EVFILT_SIGNAL, EV_ADD, 0, 0, - cast[pointer](fdi)) - inc(s.count) - result = fdi - else: - raise newException(ValueError, "Maximum file descriptors exceeded") + s.checkMaxFd(fdi) + doAssert(s.fds[fdi].ident == 0) + setKey(s, fdi, signal, {Event.flagSignal}, signal, data) + # block signal `signal` + var nmask: Sigset + var omask: Sigset + discard sigemptyset(nmask) + discard sigemptyset(omask) + discard sigaddset(nmask, cint(signal)) + blockSignals(nmask, omask) + # to be compatible with linux semantic we need to "eat" signals + posix.signal(cint(signal), SIG_IGN) + modifyKQueue(s, signal.uint, EVFILT_SIGNAL, EV_ADD, 0, 0, + cast[pointer](fdi)) + inc(s.count) + result = fdi proc registerProcess*[T](s: Selector[T], pid: int, data: T): int {.discardable.} = @@ -494,16 +492,14 @@ else: if fdi == -1: raiseOsError(osLastError()) - if fdi.uint < s.maxFD: - doAssert(s.fds[fdi].ident == 0) - var kflags: cushort = EV_ONESHOT or EV_ADD - setKey(s, fdi, pid, {Event.flagProcess, Event.flagOneshot}, pid, data) - modifyKQueue(s, pid.uint, EVFILT_PROC, kflags, NOTE_EXIT, 0, - cast[pointer](fdi)) - inc(s.count) - result = fdi - else: - raise newException(ValueError, "Maximum file descriptors exceeded") + s.checkMaxFd(fdi) + doAssert(s.fds[fdi].ident == 0) + var kflags: cushort = EV_ONESHOT or EV_ADD + setKey(s, fdi, pid, {Event.flagProcess, Event.flagOneshot}, pid, data) + modifyKQueue(s, pid.uint, EVFILT_PROC, kflags, NOTE_EXIT, 0, + cast[pointer](fdi)) + inc(s.count) + result = fdi proc unregister*[T](s: Selector[T], fd: int|SocketHandle|cint) = var fdi = int(fd) @@ -592,12 +588,11 @@ else: proc unregister*[T](s: Selector[T], ev: SelectEvent) = let fdi = ev.rfd.int - if fdi.uint < s.maxFD: - var flags = s.fds[fdi].flags - if s.fds[fdi].ident != 0 and flags != {}: - modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil) - dec(s.count) - clearKey(s, fdi) + var flags = s.fds[fdi].flags + if s.fds[fdi].ident != 0 and flags != {}: + modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil) + dec(s.count) + clearKey(s, fdi) proc selectInto*[T](s: Selector[T], timeout: int, results: var openarray[ReadyKey[T]]): int = @@ -710,6 +705,7 @@ else: elif defined(linux): const + # Maximum number of events that can be returned MAX_EPOLL_RESULT_EVENTS = 64 type SignalFdInfo* {.importc: "struct signalfd_siginfo", @@ -772,7 +768,6 @@ else: proc eventfd(count: cuint, flags: cint): cint {.cdecl, importc: "eventfd", header: "<sys/eventfd.h>".} - type SelectorImpl[T] = object epollFD : cint @@ -807,60 +802,56 @@ else: proc registerHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event], data: T) = var fdi = int(fd) - if fdi.uint < s.maxFD: - doAssert(s.fds[fdi].ident == 0) - setKey(s, fdi, fdi, events + {Event.flagHandle}, 0, data) - if events != {}: - var epv: epoll_event - epv.events = EPOLLRDHUP - epv.data.u64 = fdi.uint - if Event.Read in events: - epv.events = epv.events or EPOLLIN - if Event.Write in events: - epv.events = epv.events or EPOLLOUT - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: - raiseOSError(osLastError()) - inc(s.count) - else: - raise newException(ValueError, "Maximum file descriptors exceeded") + s.checkMaxFd(fdi) + doAssert(s.fds[fdi].ident == 0) + setKey(s, fdi, fdi, events + {Event.flagHandle}, 0, data) + if events != {}: + var epv: epoll_event + epv.events = EPOLLRDHUP + epv.data.u64 = fdi.uint + if Event.Read in events: + epv.events = epv.events or EPOLLIN + if Event.Write in events: + epv.events = epv.events or EPOLLOUT + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: + raiseOSError(osLastError()) + inc(s.count) proc updateHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event]) = var fdi = int(fd) - if fdi.uint < s.maxFD: - var oe = s.fds[fdi].flags - doAssert(s.fds[fdi].ident != 0) - doAssert(Event.flagHandle in oe) - var ne = events + {Event.flagHandle} - if oe != ne: - var epv: epoll_event - epv.data.u64 = fdi.uint - epv.events = EPOLLRDHUP + s.checkMaxFd(fdi) + var oe = s.fds[fdi].flags + doAssert(s.fds[fdi].ident != 0) + doAssert(Event.flagHandle in oe) + var ne = events + {Event.flagHandle} + if oe != ne: + var epv: epoll_event + epv.data.u64 = fdi.uint + epv.events = EPOLLRDHUP - if Event.Read in events: - epv.events = epv.events or EPOLLIN - if Event.Write in events: - epv.events = epv.events or EPOLLOUT + if Event.Read in events: + epv.events = epv.events or EPOLLIN + if Event.Write in events: + epv.events = epv.events or EPOLLOUT - if oe == {Event.flagHandle}: - if ne != {Event.flagHandle}: - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, - addr epv) == -1: - raiseOSError(osLastError()) - inc(s.count) + if oe == {Event.flagHandle}: + if ne != {Event.flagHandle}: + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, + addr epv) == -1: + raiseOSError(osLastError()) + inc(s.count) + else: + if ne != {Event.flagHandle}: + if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fdi.cint, + addr epv) == -1: + raiseOSError(osLastError()) else: - if events != {}: - if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fdi.cint, - addr epv) == -1: - raiseOSError(osLastError()) - else: - if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, - addr epv) == -1: - raiseOSError(osLastError()) - dec(s.count) - s.fds[fdi].flags = ne - else: - raise newException(ValueError, "Maximum file descriptors exceeded") + if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, + addr epv) == -1: + raiseOSError(osLastError()) + dec(s.count) + s.fds[fdi].flags = ne proc unregister*[T](s: Selector[T], fd: int|SocketHandle|cint) = var epv: epoll_event @@ -924,88 +915,88 @@ else: var fdi = timerfd_create(CLOCK_MONOTONIC, 0) if fdi == -1: raiseOSError(osLastError()) - if fdi.uint < s.maxFD: - doAssert(s.fds[fdi].ident == 0) - var flags = {Event.flagTimer} - var epv: epoll_event - epv.data.u64 = fdi.uint - epv.events = EPOLLIN or EPOLLRDHUP - setNonBlocking(fdi.cint) - if oneshot: - new_ts.it_interval.tv_sec = 0.Time - new_ts.it_interval.tv_nsec = 0 - new_ts.it_value.tv_sec = (timeout div 1_000).Time - new_ts.it_value.tv_nsec = (timeout %% 1_000) * 1_000_000 - flags = flags + {Event.flagOneshot} - epv.events = epv.events or EPOLLONESHOT - else: - new_ts.it_interval.tv_sec = (timeout div 1000).Time - new_ts.it_interval.tv_nsec = (timeout %% 1_000) * 1_000_000 - new_ts.it_value.tv_sec = new_ts.it_interval.tv_sec - new_ts.it_value.tv_nsec = new_ts.it_interval.tv_nsec - if timerfd_settime(fdi.cint, cint(0), new_ts, old_ts) == -1: - raiseOSError(osLastError()) - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: - raiseOSError(osLastError()) - setKey(s, fdi, fdi, flags, 0, data) - inc(s.count) - result = fdi + s.checkMaxFd(fdi) + doAssert(s.fds[fdi].ident == 0) + var flags = {Event.flagTimer} + var epv: epoll_event + epv.data.u64 = fdi.uint + epv.events = EPOLLIN or EPOLLRDHUP + setNonBlocking(fdi.cint) + if oneshot: + new_ts.it_interval.tv_sec = 0.Time + new_ts.it_interval.tv_nsec = 0 + new_ts.it_value.tv_sec = (timeout div 1_000).Time + new_ts.it_value.tv_nsec = (timeout %% 1_000) * 1_000_000 + flags = flags + {Event.flagOneshot} + epv.events = epv.events or EPOLLONESHOT else: - raise newException(ValueError, "Maximum file descriptors exceeded") + new_ts.it_interval.tv_sec = (timeout div 1000).Time + new_ts.it_interval.tv_nsec = (timeout %% 1_000) * 1_000_000 + new_ts.it_value.tv_sec = new_ts.it_interval.tv_sec + new_ts.it_value.tv_nsec = new_ts.it_interval.tv_nsec + if timerfd_settime(fdi.cint, cint(0), new_ts, old_ts) == -1: + raiseOSError(osLastError()) + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: + raiseOSError(osLastError()) + setKey(s, fdi, fdi, flags, 0, data) + inc(s.count) + result = fdi proc registerSignal*[T](s: Selector[T], signal: int, data: T): int {.discardable.} = var nmask: Sigset omask: Sigset - fdi: cint + discard sigemptyset(nmask) discard sigemptyset(omask) - discard sigaddset(nmask, signal.cint) + discard sigaddset(nmask, cint(signal)) blockSignals(nmask, omask) - fdi = signalfd(-1, nmask, 0) + + var fdi = signalfd(-1, nmask, 0).int if fdi == -1: raiseOSError(osLastError()) - if fdi.uint < s.maxFD: - doAssert(s.fds[fdi].ident == 0) - setNonBlocking(fdi.cint) - var epv: epoll_event - epv.data.u64 = fdi.uint - epv.events = EPOLLIN or EPOLLRDHUP - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: - raiseOSError(osLastError()) - setKey(s, fdi, signal, {Event.flagSignal}, signal, data) - inc(s.count) - result = fdi - else: - raise newException(ValueError, "Maximum file descriptors exceeded") + + s.checkMaxFd(fdi) + doAssert(s.fds[fdi].ident == 0) + setNonBlocking(fdi.cint) + + var epv: epoll_event + epv.data.u64 = fdi.uint + epv.events = EPOLLIN or EPOLLRDHUP + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: + raiseOSError(osLastError()) + setKey(s, fdi, signal, {Event.flagSignal}, signal, data) + inc(s.count) + result = fdi proc registerProcess*[T](s: Selector, pid: int, data: T): int {.discardable.} = var nmask: Sigset omask: Sigset - fd: int + discard sigemptyset(nmask) discard sigemptyset(omask) discard sigaddset(nmask, posix.SIGCHLD) blockSignals(nmask, omask) - var fdi = signalfd(-1, nmask, 0) - if fd == -1: + + var fdi = signalfd(-1, nmask, 0).int + if fdi == -1: raiseOSError(osLastError()) - if fdi.uint < s.maxFD: - doAssert(s.fds[fdi].ident == 0) - setNonBlocking(fdi.cint) - var epv: epoll_event - epv.data.u64 = fdi.uint - epv.events = EPOLLIN or EPOLLRDHUP - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: - raiseOSError(osLastError()) - setKey(s, fdi, pid, {Event.flagProcess}, pid, data) - inc(s.count) - result = fdi - else: - raise newException(ValueError, "Maximum file descriptors exceeded") + + s.checkMaxFd(fdi) + doAssert(s.fds[fdi].ident == 0) + setNonBlocking(fdi.cint) + + var epv: epoll_event + epv.data.u64 = fdi.uint + epv.events = EPOLLIN or EPOLLRDHUP + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) == -1: + raiseOSError(osLastError()) + setKey(s, fdi, pid, {Event.flagProcess}, pid, data) + inc(s.count) + result = fdi proc flush*[T](s: Selector[T]) = discard @@ -1502,6 +1493,7 @@ else: # else: + # Maximum number of events that can be returned const MAX_POLL_RESULT_EVENTS = 64 type @@ -1600,29 +1592,25 @@ else: proc registerHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event], data: T) = var fdi = int(fd) - if fdi.uint < s.maxFD: - doAssert(s.fds[fdi].ident == 0) - setKey(s, fdi, fdi, {Event.flagHandle} + events, 0, data) - s.pollAdd(fdi.cint, events) - else: - raise newException(ValueError, "Maximum file descriptors exceeded") + s.checkMaxFd(fdi) + doAssert(s.fds[fdi].ident == 0) + setKey(s, fdi, fdi, {Event.flagHandle} + events, 0, data) + s.pollAdd(fdi.cint, events) proc updateHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event]) = var fdi = int(fd) - if fdi.uint < s.maxFD: - var oe = s.fds[fdi].flags - doAssert(s.fds[fdi].ident != 0) - doAssert(Event.flagHandle in oe) - var ne = events + {Event.flagHandle} - if ne != oe: - if events != {}: - s.pollUpdate(fd.cint, events) - else: - s.pollRemove(fd.cint) - s.fds[fdi].flags = ne - else: - raise newException(ValueError, "Maximum file descriptors exceeded") + s.checkMaxFd(fdi) + var oe = s.fds[fdi].flags + doAssert(s.fds[fdi].ident != 0) + doAssert(Event.flagHandle in oe) + var ne = events + {Event.flagHandle} + if ne != oe: + if events != {}: + s.pollUpdate(fd.cint, events) + else: + s.pollRemove(fd.cint) + s.fds[fdi].flags = ne proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool, data: T): int {.discardable.} = |