summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorEugene Kabanov <ka@hardcore.kiev.ua>2017-02-01 13:12:26 +0200
committerAndreas Rumpf <rumpf_a@web.de>2017-02-01 12:12:26 +0100
commitd90f3f59aca668d3000d6fd64199bfe720240911 (patch)
tree78a764c48eef8a9e10e11e690cfa95e13cba2288
parent3c773c189fc4ba4a639a1ca2d910d5a5c6e13b21 (diff)
downloadNim-d90f3f59aca668d3000d6fd64199bfe720240911.tar.gz
Fixes for upcoming asyncdispatch and ioselectors. (#5309)
-rw-r--r--lib/pure/ioselects/ioselectors_epoll.nim13
-rw-r--r--lib/pure/ioselects/ioselectors_kqueue.nim21
-rw-r--r--lib/pure/ioselects/ioselectors_poll.nim17
-rw-r--r--lib/pure/ioselects/ioselectors_select.nim13
-rw-r--r--lib/upcoming/asyncdispatch.nim64
-rw-r--r--tests/async/tupcoming_async.nim55
6 files changed, 113 insertions, 70 deletions
diff --git a/lib/pure/ioselects/ioselectors_epoll.nim b/lib/pure/ioselects/ioselectors_epoll.nim
index f8feb7361..3a5cbc87a 100644
--- a/lib/pure/ioselects/ioselectors_epoll.nim
+++ b/lib/pure/ioselects/ioselectors_epoll.nim
@@ -165,7 +165,7 @@ proc close*(ev: SelectEvent) =
 
 template checkFd(s, f) =
   if f >= s.maxFD:
-    raiseIOSelectorsError("Maximum file descriptors exceeded")
+    raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
 
 proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
                         events: set[Event], data: T) =
@@ -188,7 +188,8 @@ proc updateHandle*[T](s: Selector[T], fd: SocketHandle, events: set[Event]) =
   let fdi = int(fd)
   s.checkFd(fdi)
   var pkey = addr(s.fds[fdi])
-  doAssert(pkey.ident != 0)
+  doAssert(pkey.ident != 0,
+           "Descriptor [" & $fdi & "] is not registered in the queue!")
   doAssert(pkey.events * maskEvents == {})
   if pkey.events != events:
     var epv = epoll_event(events: EPOLLRDHUP)
@@ -215,8 +216,8 @@ proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
   let fdi = int(fd)
   s.checkFd(fdi)
   var pkey = addr(s.fds[fdi])
-  doAssert(pkey.ident != 0)
-
+  doAssert(pkey.ident != 0,
+           "Descriptor [" & $fdi & "] is not registered in the queue!")
   if pkey.events != {}:
     when not defined(android):
       if pkey.events * {Event.Read, Event.Write} != {}:
@@ -277,7 +278,7 @@ proc unregister*[T](s: Selector[T], ev: SelectEvent) =
   let fdi = int(ev.efd)
   s.checkFd(fdi)
   var pkey = addr(s.fds[fdi])
-  doAssert(pkey.ident != 0)
+  doAssert(pkey.ident != 0, "Event is not registered in the queue!")
   doAssert(Event.User in pkey.events)
   var epv = epoll_event()
   if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
@@ -380,7 +381,7 @@ when not defined(android):
 
 proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
   let fdi = int(ev.efd)
-  doAssert(s.fds[fdi].ident == 0)
+  doAssert(s.fds[fdi].ident == 0, "Event is already registered in the queue!")
   s.setKey(fdi, {Event.User}, 0, data)
   var epv = epoll_event(events: EPOLLIN or EPOLLRDHUP)
   epv.data.u64 = ev.efd.uint
diff --git a/lib/pure/ioselects/ioselectors_kqueue.nim b/lib/pure/ioselects/ioselectors_kqueue.nim
index 3d2aae180..01b1b9586 100644
--- a/lib/pure/ioselects/ioselectors_kqueue.nim
+++ b/lib/pure/ioselects/ioselectors_kqueue.nim
@@ -119,12 +119,13 @@ proc newSelector*[T](): Selector[T] =
   result.maxFD = maxFD.int
 
 proc close*[T](s: Selector[T]) =
-  let res = posix.close(s.kqFD)
+  let res1 = posix.close(s.kqFD)
+  let res2 = posix.close(s.sock)
   when hasThreadSupport:
     deinitLock(s.changesLock)
     deallocSharedArray(s.fds)
     deallocShared(cast[pointer](s))
-  if res != 0:
+  if res1 != 0 or res2 != 0:
     raiseIOSelectorsError(osLastError())
 
 template clearKey[T](key: ptr SelectorKey[T]) =
@@ -157,7 +158,7 @@ proc close*(ev: SelectEvent) =
 
 template checkFd(s, f) =
   if f >= s.maxFD:
-    raiseIOSelectorsError("Maximum file descriptors exceeded!")
+    raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
 
 when hasThreadSupport:
   template withChangeLock[T](s: Selector[T], body: untyped) =
@@ -241,7 +242,8 @@ proc updateHandle*[T](s: Selector[T], fd: SocketHandle,
   let fdi = int(fd)
   s.checkFd(fdi)
   var pkey = addr(s.fds[fdi])
-  doAssert(pkey.ident != 0)
+  doAssert(pkey.ident != 0,
+           "Descriptor [" & $fdi & "] is not registered in the queue!")
   doAssert(pkey.events * maskEvents == {})
 
   if pkey.events != events:
@@ -329,7 +331,7 @@ proc registerProcess*[T](s: Selector[T], pid: int,
 
 proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
   let fdi = ev.rfd.int
-  doAssert(s.fds[fdi].ident == 0)
+  doAssert(s.fds[fdi].ident == 0, "Event is already registered in the queue!")
   setKey(s, fdi, {Event.User}, 0, data)
 
   modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil)
@@ -372,7 +374,8 @@ proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
   let fdi = int(fd)
   s.checkFd(fdi)
   var pkey = addr(s.fds[fdi])
-  doAssert(pkey.ident != 0)
+  doAssert(pkey.ident != 0,
+           "Descriptor [" & $fdi & "] is not registered in the queue!")
 
   if pkey.events != {}:
     if pkey.events * {Event.Read, Event.Write} != {}:
@@ -431,9 +434,8 @@ proc unregister*[T](s: Selector[T], ev: SelectEvent) =
   let fdi = int(ev.rfd)
   s.checkFd(fdi)
   var pkey = addr(s.fds[fdi])
-  doAssert(pkey.ident != 0)
+  doAssert(pkey.ident != 0, "Event is not registered in the queue!")
   doAssert(Event.User in pkey.events)
-
   modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil)
   when not declared(CACHE_EVENTS):
     flushKQueue(s)
@@ -564,8 +566,7 @@ proc selectInto*[T](s: Selector[T], timeout: int,
         pkey.events.incl(Event.Finished)
         rkey.events.incl(Event.Process)
       else:
-        pkey = addr(s.fds[cast[int](kevent.udata)])
-        raiseIOSelectorsError("Unsupported kqueue filter in queue!")
+        doAssert(true, "Unsupported kqueue filter in the queue!")
 
       if (kevent.flags and EV_EOF) != 0:
         rkey.events.incl(Event.Error)
diff --git a/lib/pure/ioselects/ioselectors_poll.nim b/lib/pure/ioselects/ioselectors_poll.nim
index 9c6f9796f..1b90e0806 100644
--- a/lib/pure/ioselects/ioselectors_poll.nim
+++ b/lib/pure/ioselects/ioselectors_poll.nim
@@ -115,9 +115,8 @@ template pollUpdate[T](s: Selector[T], sock: cint, events: set[Event]) =
         s.pollfds[i].events = pollev
         break
       inc(i)
-
-    if i == s.pollcnt:
-      raiseIOSelectorsError("Descriptor is not registered in queue")
+    doAssert(i < s.pollcnt,
+             "Descriptor [" & $sock & "] is not registered in the queue!")
 
 template pollRemove[T](s: Selector[T], sock: cint) =
   withPollLock(s):
@@ -140,7 +139,7 @@ template pollRemove[T](s: Selector[T], sock: cint) =
 
 template checkFd(s, f) =
   if f >= s.maxFD:
-    raiseIOSelectorsError("Descriptor is not registered in queue")
+    raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
 
 proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
                         events: set[Event], data: T) =
@@ -157,7 +156,8 @@ proc updateHandle*[T](s: Selector[T], fd: SocketHandle,
   let fdi = int(fd)
   s.checkFd(fdi)
   var pkey = addr(s.fds[fdi])
-  doAssert(pkey.ident != 0)
+  doAssert(pkey.ident != 0,
+           "Descriptor [" & $fdi & "] is not registered in the queue!")
   doAssert(pkey.events * maskEvents == {})
 
   if pkey.events != events:
@@ -172,7 +172,7 @@ proc updateHandle*[T](s: Selector[T], fd: SocketHandle,
 
 proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
   var fdi = int(ev.rfd)
-  doAssert(s.fds[fdi].ident == 0)
+  doAssert(s.fds[fdi].ident == 0, "Event is already registered in the queue!")
   var events = {Event.User}
   setKey(s, fdi, events, 0, data)
   events.incl(Event.Read)
@@ -182,7 +182,8 @@ proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
   let fdi = int(fd)
   s.checkFd(fdi)
   var pkey = addr(s.fds[fdi])
-  doAssert(pkey.ident != 0)
+  doAssert(pkey.ident != 0,
+           "Descriptor [" & $fdi & "] is not registered in the queue!")
   pkey.ident = 0
   pkey.events = {}
   s.pollRemove(fdi.cint)
@@ -191,7 +192,7 @@ proc unregister*[T](s: Selector[T], ev: SelectEvent) =
   let fdi = int(ev.rfd)
   s.checkFd(fdi)
   var pkey = addr(s.fds[fdi])
-  doAssert(pkey.ident != 0)
+  doAssert(pkey.ident != 0, "Event is not registered in the queue!")
   doAssert(Event.User in pkey.events)
   pkey.ident = 0
   pkey.events = {}
diff --git a/lib/pure/ioselects/ioselectors_select.nim b/lib/pure/ioselects/ioselectors_select.nim
index 7a7d23982..dc3451d52 100644
--- a/lib/pure/ioselects/ioselectors_select.nim
+++ b/lib/pure/ioselects/ioselectors_select.nim
@@ -202,8 +202,8 @@ proc setSelectKey[T](s: Selector[T], fd: SocketHandle, events: set[Event],
       pkey.data = data
       break
     inc(i)
-  if i == FD_SETSIZE:
-    raiseIOSelectorsError("Maximum numbers of fds exceeded")
+  if i >= FD_SETSIZE:
+    raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
 
 proc getKey[T](s: Selector[T], fd: SocketHandle): ptr SelectorKey[T] =
   var i = 0
@@ -213,8 +213,8 @@ proc getKey[T](s: Selector[T], fd: SocketHandle): ptr SelectorKey[T] =
       result = addr(s.fds[i])
       break
     inc(i)
-  if i == FD_SETSIZE:
-    raiseIOSelectorsError("Descriptor not registered in queue")
+  doAssert(i < FD_SETSIZE,
+           "Descriptor [" & $int(fd) & "] is not registered in the queue!")
 
 proc delKey[T](s: Selector[T], fd: SocketHandle) =
   var empty: T
@@ -226,8 +226,8 @@ proc delKey[T](s: Selector[T], fd: SocketHandle) =
       s.fds[i].data = empty
       break
     inc(i)
-  if i == FD_SETSIZE:
-    raiseIOSelectorsError("Descriptor not registered in queue")
+  doAssert(i < FD_SETSIZE,
+           "Descriptor [" & $int(fd) & "] is not registered in the queue!")
 
 proc registerHandle*[T](s: Selector[T], fd: SocketHandle,
                         events: set[Event], data: T) =
@@ -294,6 +294,7 @@ proc unregister*[T](s: Selector[T], fd: SocketHandle) =
 proc unregister*[T](s: Selector[T], ev: SelectEvent) =
   let fd = ev.rsock
   s.withSelectLock():
+    var pkey = s.getKey(fd)
     IOFD_CLR(fd, addr s.rSet)
     dec(s.count)
     s.delKey(fd)
diff --git a/lib/upcoming/asyncdispatch.nim b/lib/upcoming/asyncdispatch.nim
index 31aa6c9cb..1dfd0122a 100644
--- a/lib/upcoming/asyncdispatch.nim
+++ b/lib/upcoming/asyncdispatch.nim
@@ -1056,16 +1056,14 @@ when defined(windows) or defined(nimdoc):
 
   proc unregister*(ev: AsyncEvent) =
     ## Unregisters event ``ev``.
-    if ev.hWaiter != 0:
-      let p = getGlobalDispatcher()
-      p.handles.excl(AsyncFD(ev.hEvent))
-      if unregisterWait(ev.hWaiter) == 0:
-        let err = osLastError()
-        if err.int32 != ERROR_IO_PENDING:
-          raiseOSError(err)
-      ev.hWaiter = 0
-    else:
-      raise newException(ValueError, "Event is not registered!")
+    doAssert(ev.hWaiter != 0, "Event is not registered in the queue!")
+    let p = getGlobalDispatcher()
+    p.handles.excl(AsyncFD(ev.hEvent))
+    if unregisterWait(ev.hWaiter) == 0:
+      let err = osLastError()
+      if err.int32 != ERROR_IO_PENDING:
+        raiseOSError(err)
+    ev.hWaiter = 0
 
   proc close*(ev: AsyncEvent) =
     ## Closes event ``ev``.
@@ -1076,8 +1074,7 @@ when defined(windows) or defined(nimdoc):
 
   proc addEvent*(ev: AsyncEvent, cb: Callback) =
     ## Registers callback ``cb`` to be called when ``ev`` will be signaled
-    if ev.hWaiter != 0:
-      raise newException(ValueError, "Event is already registered!")
+    doAssert(ev.hWaiter == 0, "Event is already registered in the queue!")
 
     let p = getGlobalDispatcher()
     let hEvent = ev.hEvent
@@ -1086,17 +1083,22 @@ when defined(windows) or defined(nimdoc):
     var flags = WT_EXECUTEINWAITTHREAD.Dword
 
     proc eventcb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
-      if cb(fd):
-        # we need this check to avoid exception, if `unregister(event)` was
-        # called in callback.
-        deallocShared(cast[pointer](pcd))
-        if ev.hWaiter != 0: unregister(ev)
+      if ev.hWaiter != 0:
+        if cb(fd):
+          # we need this check to avoid exception, if `unregister(event)` was
+          # called in callback.
+          deallocShared(cast[pointer](pcd))
+          if ev.hWaiter != 0:
+            unregister(ev)
+        else:
+          # if callback returned `false`, then it wants to be called again, so
+          # we need to ref and protect `pcd.ovl` again, because it will be
+          # unrefed and disposed in `poll()`.
+          GC_ref(pcd.ovl)
+          pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
       else:
-        # if callback returned `false`, then it wants to be called again, so
-        # we need to ref and protect `pcd.ovl` again, because it will be
-        # unrefed and disposed in `poll()`.
-        GC_ref(pcd.ovl)
-        pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
+        # if ev.hWaiter == 0, then event was unregistered before `poll()` call.
+        deallocShared(cast[pointer](pcd))
 
     registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb)
     ev.hWaiter = pcd.waitFd
@@ -1205,7 +1207,7 @@ else:
     not p.selector.isEmpty() or p.timers.len != 0 or p.callbacks.len != 0
 
   template processBasicCallbacks(ident, rwlist: untyped) =
-    # Process pending descriptor's callbacks.
+    # Process pending descriptor's and AsyncEvent callbacks.
     # Invoke every callback stored in `rwlist`, until first one
     # returned `false`, which means callback wants to stay
     # alive. In such case all remaining callbacks will be added
@@ -1232,6 +1234,8 @@ else:
 
     withData(p.selector, ident, adata) do:
       adata.rwlist = newList & adata.rwlist
+      rLength = len(adata.readList)
+      wLength = len(adata.writeList)
 
   template processCustomCallbacks(ident: untyped) =
     # Process pending custom event callbacks. Custom events are
@@ -1275,6 +1279,8 @@ else:
         var custom = false
         let fd = keys[i].fd
         let events = keys[i].events
+        var rLength = 0 # len(data.readList) after callback
+        var wLength = 0 # len(data.writeList) after callback
 
         if Event.Read in events or events == {Event.Error}:
           processBasicCallbacks(fd, readList)
@@ -1283,8 +1289,10 @@ else:
           processBasicCallbacks(fd, writeList)
 
         if Event.User in events or events == {Event.Error}:
-          custom = true
           processBasicCallbacks(fd, readList)
+          custom = true
+          if rLength == 0:
+            p.selector.unregister(fd)
 
         when ioselSupportedPlatform:
           if (customSet * events) != {}:
@@ -1296,10 +1304,12 @@ else:
         if not custom:
           var update = false
           var newEvents: set[Event] = {}
-          p.selector.withData(fd, adata) do:
-            if len(adata.readList) > 0: incl(newEvents, Event.Read)
-            if len(adata.writeList) > 0: incl(newEvents, Event.Write)
+          if rLength > 0:
+            update = true
+            incl(newEvents, Event.Read)
+          if wLength > 0:
             update = true
+            incl(newEvents, Event.Write)
           if update:
             p.selector.updateHandle(SocketHandle(fd), newEvents)
         inc(i)
diff --git a/tests/async/tupcoming_async.nim b/tests/async/tupcoming_async.nim
index 7d255f213..0fe9f08a5 100644
--- a/tests/async/tupcoming_async.nim
+++ b/tests/async/tupcoming_async.nim
@@ -1,9 +1,6 @@
 discard """
   output: '''
 OK
-OK
-OK
-OK
 '''
 """
 
@@ -31,11 +28,39 @@ when defined(upcoming):
     var fut = waitEvent(event)
     asyncCheck(delayedSet(event, 500))
     waitFor(fut or sleepAsync(1000))
-    if fut.finished:
-      echo "OK"
-    else:
+    if not fut.finished:
       echo "eventTest: Timeout expired before event received!"
 
+  proc eventTest5304() =
+    # Event should not be signaled if it was uregistered,
+    # even in case, when poll() was not called yet.
+    # Issue #5304.
+    var unregistered = false
+    let e = newAsyncEvent()
+    addEvent(e) do (fd: AsyncFD) -> bool:
+      assert(not unregistered)
+    e.setEvent()
+    e.unregister()
+    unregistered = true
+    poll()
+
+  proc eventTest5298() =
+    # Event must raise `AssertionError` if event was unregistered twice.
+    # Issue #5298.
+    let e = newAsyncEvent()
+    var eventReceived = false
+    addEvent(e) do (fd: AsyncFD) -> bool:
+      eventReceived = true
+      return true
+    e.setEvent()
+    while not eventReceived:
+      poll()
+    try:
+      e.unregister()
+    except AssertionError:
+      discard
+    e.close()
+
   when ioselSupportedPlatform or defined(windows):
 
     import osproc
@@ -56,7 +81,6 @@ when defined(upcoming):
 
     proc timerTest() =
       waitFor(waitTimer(200))
-      echo "OK"
 
     proc processTest() =
       when defined(windows):
@@ -70,7 +94,7 @@ when defined(upcoming):
       var fut = waitProcess(process)
       waitFor(fut or waitTimer(2000))
       if fut.finished and process.peekExitCode() == 0:
-        echo "OK"
+        discard
       else:
         echo "processTest: Timeout expired before process exited!"
 
@@ -92,23 +116,28 @@ when defined(upcoming):
       var fut = waitSignal(posix.SIGINT)
       asyncCheck(delayedSignal(posix.SIGINT, 500))
       waitFor(fut or waitTimer(1000))
-      if fut.finished:
-        echo "OK"
-      else:
+      if not fut.finished:
         echo "signalTest: Timeout expired before signal received!"
 
   when ioselSupportedPlatform:
     timerTest()
     eventTest()
+    eventTest5304()
+    eventTest5298()
     processTest()
     signalTest()
+    echo "OK"
   elif defined(windows):
     timerTest()
     eventTest()
+    eventTest5304()
+    eventTest5298()
     processTest()
     echo "OK"
   else:
     eventTest()
-    echo "OK\nOK\nOK"
+    eventTest5304()
+    eventTest5298()
+    echo "OK"
 else:
-  echo "OK\nOK\nOK\nOK"
+  echo "OK"