diff options
-rw-r--r-- | lib/pure/ioselects/ioselectors_epoll.nim | 13 | ||||
-rw-r--r-- | lib/pure/ioselects/ioselectors_kqueue.nim | 21 | ||||
-rw-r--r-- | lib/pure/ioselects/ioselectors_poll.nim | 17 | ||||
-rw-r--r-- | lib/pure/ioselects/ioselectors_select.nim | 13 | ||||
-rw-r--r-- | lib/upcoming/asyncdispatch.nim | 64 | ||||
-rw-r--r-- | tests/async/tupcoming_async.nim | 55 |
6 files changed, 113 insertions, 70 deletions
diff --git a/lib/pure/ioselects/ioselectors_epoll.nim b/lib/pure/ioselects/ioselectors_epoll.nim index f8feb7361..3a5cbc87a 100644 --- a/lib/pure/ioselects/ioselectors_epoll.nim +++ b/lib/pure/ioselects/ioselectors_epoll.nim @@ -165,7 +165,7 @@ proc close*(ev: SelectEvent) = template checkFd(s, f) = if f >= s.maxFD: - raiseIOSelectorsError("Maximum file descriptors exceeded") + raiseIOSelectorsError("Maximum number of descriptors is exhausted!") proc registerHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event], data: T) = @@ -188,7 +188,8 @@ proc updateHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event]) = let fdi = int(fd) s.checkFd(fdi) var pkey = addr(s.fds[fdi]) - doAssert(pkey.ident != 0) + doAssert(pkey.ident != 0, + "Descriptor [" & $fdi & "] is not registered in the queue!") doAssert(pkey.events * maskEvents == {}) if pkey.events != events: var epv = epoll_event(events: EPOLLRDHUP) @@ -215,8 +216,8 @@ proc unregister*[T](s: Selector[T], fd: int|SocketHandle) = let fdi = int(fd) s.checkFd(fdi) var pkey = addr(s.fds[fdi]) - doAssert(pkey.ident != 0) - + doAssert(pkey.ident != 0, + "Descriptor [" & $fdi & "] is not registered in the queue!") if pkey.events != {}: when not defined(android): if pkey.events * {Event.Read, Event.Write} != {}: @@ -277,7 +278,7 @@ proc unregister*[T](s: Selector[T], ev: SelectEvent) = let fdi = int(ev.efd) s.checkFd(fdi) var pkey = addr(s.fds[fdi]) - doAssert(pkey.ident != 0) + doAssert(pkey.ident != 0, "Event is not registered in the queue!") doAssert(Event.User in pkey.events) var epv = epoll_event() if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: @@ -380,7 +381,7 @@ when not defined(android): proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = let fdi = int(ev.efd) - doAssert(s.fds[fdi].ident == 0) + doAssert(s.fds[fdi].ident == 0, "Event is already registered in the queue!") s.setKey(fdi, {Event.User}, 0, data) var epv = epoll_event(events: EPOLLIN or EPOLLRDHUP) epv.data.u64 = ev.efd.uint diff --git a/lib/pure/ioselects/ioselectors_kqueue.nim b/lib/pure/ioselects/ioselectors_kqueue.nim index 3d2aae180..01b1b9586 100644 --- a/lib/pure/ioselects/ioselectors_kqueue.nim +++ b/lib/pure/ioselects/ioselectors_kqueue.nim @@ -119,12 +119,13 @@ proc newSelector*[T](): Selector[T] = result.maxFD = maxFD.int proc close*[T](s: Selector[T]) = - let res = posix.close(s.kqFD) + let res1 = posix.close(s.kqFD) + let res2 = posix.close(s.sock) when hasThreadSupport: deinitLock(s.changesLock) deallocSharedArray(s.fds) deallocShared(cast[pointer](s)) - if res != 0: + if res1 != 0 or res2 != 0: raiseIOSelectorsError(osLastError()) template clearKey[T](key: ptr SelectorKey[T]) = @@ -157,7 +158,7 @@ proc close*(ev: SelectEvent) = template checkFd(s, f) = if f >= s.maxFD: - raiseIOSelectorsError("Maximum file descriptors exceeded!") + raiseIOSelectorsError("Maximum number of descriptors is exhausted!") when hasThreadSupport: template withChangeLock[T](s: Selector[T], body: untyped) = @@ -241,7 +242,8 @@ proc updateHandle*[T](s: Selector[T], fd: SocketHandle, let fdi = int(fd) s.checkFd(fdi) var pkey = addr(s.fds[fdi]) - doAssert(pkey.ident != 0) + doAssert(pkey.ident != 0, + "Descriptor [" & $fdi & "] is not registered in the queue!") doAssert(pkey.events * maskEvents == {}) if pkey.events != events: @@ -329,7 +331,7 @@ proc registerProcess*[T](s: Selector[T], pid: int, proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = let fdi = ev.rfd.int - doAssert(s.fds[fdi].ident == 0) + doAssert(s.fds[fdi].ident == 0, "Event is already registered in the queue!") setKey(s, fdi, {Event.User}, 0, data) modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil) @@ -372,7 +374,8 @@ proc unregister*[T](s: Selector[T], fd: int|SocketHandle) = let fdi = int(fd) s.checkFd(fdi) var pkey = addr(s.fds[fdi]) - doAssert(pkey.ident != 0) + doAssert(pkey.ident != 0, + "Descriptor [" & $fdi & "] is not registered in the queue!") if pkey.events != {}: if pkey.events * {Event.Read, Event.Write} != {}: @@ -431,9 +434,8 @@ proc unregister*[T](s: Selector[T], ev: SelectEvent) = let fdi = int(ev.rfd) s.checkFd(fdi) var pkey = addr(s.fds[fdi]) - doAssert(pkey.ident != 0) + doAssert(pkey.ident != 0, "Event is not registered in the queue!") doAssert(Event.User in pkey.events) - modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil) when not declared(CACHE_EVENTS): flushKQueue(s) @@ -564,8 +566,7 @@ proc selectInto*[T](s: Selector[T], timeout: int, pkey.events.incl(Event.Finished) rkey.events.incl(Event.Process) else: - pkey = addr(s.fds[cast[int](kevent.udata)]) - raiseIOSelectorsError("Unsupported kqueue filter in queue!") + doAssert(true, "Unsupported kqueue filter in the queue!") if (kevent.flags and EV_EOF) != 0: rkey.events.incl(Event.Error) diff --git a/lib/pure/ioselects/ioselectors_poll.nim b/lib/pure/ioselects/ioselectors_poll.nim index 9c6f9796f..1b90e0806 100644 --- a/lib/pure/ioselects/ioselectors_poll.nim +++ b/lib/pure/ioselects/ioselectors_poll.nim @@ -115,9 +115,8 @@ template pollUpdate[T](s: Selector[T], sock: cint, events: set[Event]) = s.pollfds[i].events = pollev break inc(i) - - if i == s.pollcnt: - raiseIOSelectorsError("Descriptor is not registered in queue") + doAssert(i < s.pollcnt, + "Descriptor [" & $sock & "] is not registered in the queue!") template pollRemove[T](s: Selector[T], sock: cint) = withPollLock(s): @@ -140,7 +139,7 @@ template pollRemove[T](s: Selector[T], sock: cint) = template checkFd(s, f) = if f >= s.maxFD: - raiseIOSelectorsError("Descriptor is not registered in queue") + raiseIOSelectorsError("Maximum number of descriptors is exhausted!") proc registerHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event], data: T) = @@ -157,7 +156,8 @@ proc updateHandle*[T](s: Selector[T], fd: SocketHandle, let fdi = int(fd) s.checkFd(fdi) var pkey = addr(s.fds[fdi]) - doAssert(pkey.ident != 0) + doAssert(pkey.ident != 0, + "Descriptor [" & $fdi & "] is not registered in the queue!") doAssert(pkey.events * maskEvents == {}) if pkey.events != events: @@ -172,7 +172,7 @@ proc updateHandle*[T](s: Selector[T], fd: SocketHandle, proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = var fdi = int(ev.rfd) - doAssert(s.fds[fdi].ident == 0) + doAssert(s.fds[fdi].ident == 0, "Event is already registered in the queue!") var events = {Event.User} setKey(s, fdi, events, 0, data) events.incl(Event.Read) @@ -182,7 +182,8 @@ proc unregister*[T](s: Selector[T], fd: int|SocketHandle) = let fdi = int(fd) s.checkFd(fdi) var pkey = addr(s.fds[fdi]) - doAssert(pkey.ident != 0) + doAssert(pkey.ident != 0, + "Descriptor [" & $fdi & "] is not registered in the queue!") pkey.ident = 0 pkey.events = {} s.pollRemove(fdi.cint) @@ -191,7 +192,7 @@ proc unregister*[T](s: Selector[T], ev: SelectEvent) = let fdi = int(ev.rfd) s.checkFd(fdi) var pkey = addr(s.fds[fdi]) - doAssert(pkey.ident != 0) + doAssert(pkey.ident != 0, "Event is not registered in the queue!") doAssert(Event.User in pkey.events) pkey.ident = 0 pkey.events = {} diff --git a/lib/pure/ioselects/ioselectors_select.nim b/lib/pure/ioselects/ioselectors_select.nim index 7a7d23982..dc3451d52 100644 --- a/lib/pure/ioselects/ioselectors_select.nim +++ b/lib/pure/ioselects/ioselectors_select.nim @@ -202,8 +202,8 @@ proc setSelectKey[T](s: Selector[T], fd: SocketHandle, events: set[Event], pkey.data = data break inc(i) - if i == FD_SETSIZE: - raiseIOSelectorsError("Maximum numbers of fds exceeded") + if i >= FD_SETSIZE: + raiseIOSelectorsError("Maximum number of descriptors is exhausted!") proc getKey[T](s: Selector[T], fd: SocketHandle): ptr SelectorKey[T] = var i = 0 @@ -213,8 +213,8 @@ proc getKey[T](s: Selector[T], fd: SocketHandle): ptr SelectorKey[T] = result = addr(s.fds[i]) break inc(i) - if i == FD_SETSIZE: - raiseIOSelectorsError("Descriptor not registered in queue") + doAssert(i < FD_SETSIZE, + "Descriptor [" & $int(fd) & "] is not registered in the queue!") proc delKey[T](s: Selector[T], fd: SocketHandle) = var empty: T @@ -226,8 +226,8 @@ proc delKey[T](s: Selector[T], fd: SocketHandle) = s.fds[i].data = empty break inc(i) - if i == FD_SETSIZE: - raiseIOSelectorsError("Descriptor not registered in queue") + doAssert(i < FD_SETSIZE, + "Descriptor [" & $int(fd) & "] is not registered in the queue!") proc registerHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event], data: T) = @@ -294,6 +294,7 @@ proc unregister*[T](s: Selector[T], fd: SocketHandle) = proc unregister*[T](s: Selector[T], ev: SelectEvent) = let fd = ev.rsock s.withSelectLock(): + var pkey = s.getKey(fd) IOFD_CLR(fd, addr s.rSet) dec(s.count) s.delKey(fd) diff --git a/lib/upcoming/asyncdispatch.nim b/lib/upcoming/asyncdispatch.nim index 31aa6c9cb..1dfd0122a 100644 --- a/lib/upcoming/asyncdispatch.nim +++ b/lib/upcoming/asyncdispatch.nim @@ -1056,16 +1056,14 @@ when defined(windows) or defined(nimdoc): proc unregister*(ev: AsyncEvent) = ## Unregisters event ``ev``. - if ev.hWaiter != 0: - let p = getGlobalDispatcher() - p.handles.excl(AsyncFD(ev.hEvent)) - if unregisterWait(ev.hWaiter) == 0: - let err = osLastError() - if err.int32 != ERROR_IO_PENDING: - raiseOSError(err) - ev.hWaiter = 0 - else: - raise newException(ValueError, "Event is not registered!") + doAssert(ev.hWaiter != 0, "Event is not registered in the queue!") + let p = getGlobalDispatcher() + p.handles.excl(AsyncFD(ev.hEvent)) + if unregisterWait(ev.hWaiter) == 0: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + raiseOSError(err) + ev.hWaiter = 0 proc close*(ev: AsyncEvent) = ## Closes event ``ev``. @@ -1076,8 +1074,7 @@ when defined(windows) or defined(nimdoc): proc addEvent*(ev: AsyncEvent, cb: Callback) = ## Registers callback ``cb`` to be called when ``ev`` will be signaled - if ev.hWaiter != 0: - raise newException(ValueError, "Event is already registered!") + doAssert(ev.hWaiter == 0, "Event is already registered in the queue!") let p = getGlobalDispatcher() let hEvent = ev.hEvent @@ -1086,17 +1083,22 @@ when defined(windows) or defined(nimdoc): var flags = WT_EXECUTEINWAITTHREAD.Dword proc eventcb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = - if cb(fd): - # we need this check to avoid exception, if `unregister(event)` was - # called in callback. - deallocShared(cast[pointer](pcd)) - if ev.hWaiter != 0: unregister(ev) + if ev.hWaiter != 0: + if cb(fd): + # we need this check to avoid exception, if `unregister(event)` was + # called in callback. + deallocShared(cast[pointer](pcd)) + if ev.hWaiter != 0: + unregister(ev) + else: + # if callback returned `false`, then it wants to be called again, so + # we need to ref and protect `pcd.ovl` again, because it will be + # unrefed and disposed in `poll()`. + GC_ref(pcd.ovl) + pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb)) else: - # if callback returned `false`, then it wants to be called again, so - # we need to ref and protect `pcd.ovl` again, because it will be - # unrefed and disposed in `poll()`. - GC_ref(pcd.ovl) - pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb)) + # if ev.hWaiter == 0, then event was unregistered before `poll()` call. + deallocShared(cast[pointer](pcd)) registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb) ev.hWaiter = pcd.waitFd @@ -1205,7 +1207,7 @@ else: not p.selector.isEmpty() or p.timers.len != 0 or p.callbacks.len != 0 template processBasicCallbacks(ident, rwlist: untyped) = - # Process pending descriptor's callbacks. + # Process pending descriptor's and AsyncEvent callbacks. # Invoke every callback stored in `rwlist`, until first one # returned `false`, which means callback wants to stay # alive. In such case all remaining callbacks will be added @@ -1232,6 +1234,8 @@ else: withData(p.selector, ident, adata) do: adata.rwlist = newList & adata.rwlist + rLength = len(adata.readList) + wLength = len(adata.writeList) template processCustomCallbacks(ident: untyped) = # Process pending custom event callbacks. Custom events are @@ -1275,6 +1279,8 @@ else: var custom = false let fd = keys[i].fd let events = keys[i].events + var rLength = 0 # len(data.readList) after callback + var wLength = 0 # len(data.writeList) after callback if Event.Read in events or events == {Event.Error}: processBasicCallbacks(fd, readList) @@ -1283,8 +1289,10 @@ else: processBasicCallbacks(fd, writeList) if Event.User in events or events == {Event.Error}: - custom = true processBasicCallbacks(fd, readList) + custom = true + if rLength == 0: + p.selector.unregister(fd) when ioselSupportedPlatform: if (customSet * events) != {}: @@ -1296,10 +1304,12 @@ else: if not custom: var update = false var newEvents: set[Event] = {} - p.selector.withData(fd, adata) do: - if len(adata.readList) > 0: incl(newEvents, Event.Read) - if len(adata.writeList) > 0: incl(newEvents, Event.Write) + if rLength > 0: + update = true + incl(newEvents, Event.Read) + if wLength > 0: update = true + incl(newEvents, Event.Write) if update: p.selector.updateHandle(SocketHandle(fd), newEvents) inc(i) diff --git a/tests/async/tupcoming_async.nim b/tests/async/tupcoming_async.nim index 7d255f213..0fe9f08a5 100644 --- a/tests/async/tupcoming_async.nim +++ b/tests/async/tupcoming_async.nim @@ -1,9 +1,6 @@ discard """ output: ''' OK -OK -OK -OK ''' """ @@ -31,11 +28,39 @@ when defined(upcoming): var fut = waitEvent(event) asyncCheck(delayedSet(event, 500)) waitFor(fut or sleepAsync(1000)) - if fut.finished: - echo "OK" - else: + if not fut.finished: echo "eventTest: Timeout expired before event received!" + proc eventTest5304() = + # Event should not be signaled if it was uregistered, + # even in case, when poll() was not called yet. + # Issue #5304. + var unregistered = false + let e = newAsyncEvent() + addEvent(e) do (fd: AsyncFD) -> bool: + assert(not unregistered) + e.setEvent() + e.unregister() + unregistered = true + poll() + + proc eventTest5298() = + # Event must raise `AssertionError` if event was unregistered twice. + # Issue #5298. + let e = newAsyncEvent() + var eventReceived = false + addEvent(e) do (fd: AsyncFD) -> bool: + eventReceived = true + return true + e.setEvent() + while not eventReceived: + poll() + try: + e.unregister() + except AssertionError: + discard + e.close() + when ioselSupportedPlatform or defined(windows): import osproc @@ -56,7 +81,6 @@ when defined(upcoming): proc timerTest() = waitFor(waitTimer(200)) - echo "OK" proc processTest() = when defined(windows): @@ -70,7 +94,7 @@ when defined(upcoming): var fut = waitProcess(process) waitFor(fut or waitTimer(2000)) if fut.finished and process.peekExitCode() == 0: - echo "OK" + discard else: echo "processTest: Timeout expired before process exited!" @@ -92,23 +116,28 @@ when defined(upcoming): var fut = waitSignal(posix.SIGINT) asyncCheck(delayedSignal(posix.SIGINT, 500)) waitFor(fut or waitTimer(1000)) - if fut.finished: - echo "OK" - else: + if not fut.finished: echo "signalTest: Timeout expired before signal received!" when ioselSupportedPlatform: timerTest() eventTest() + eventTest5304() + eventTest5298() processTest() signalTest() + echo "OK" elif defined(windows): timerTest() eventTest() + eventTest5304() + eventTest5298() processTest() echo "OK" else: eventTest() - echo "OK\nOK\nOK" + eventTest5304() + eventTest5298() + echo "OK" else: - echo "OK\nOK\nOK\nOK" + echo "OK" |