summary refs log tree commit diff stats
path: root/lib/upcoming
diff options
context:
space:
mode:
authorEugene Kabanov <ka@hardcore.kiev.ua>2017-02-01 13:12:26 +0200
committerAndreas Rumpf <rumpf_a@web.de>2017-02-01 12:12:26 +0100
commitd90f3f59aca668d3000d6fd64199bfe720240911 (patch)
tree78a764c48eef8a9e10e11e690cfa95e13cba2288 /lib/upcoming
parent3c773c189fc4ba4a639a1ca2d910d5a5c6e13b21 (diff)
downloadNim-d90f3f59aca668d3000d6fd64199bfe720240911.tar.gz
Fixes for upcoming asyncdispatch and ioselectors. (#5309)
Diffstat (limited to 'lib/upcoming')
-rw-r--r--lib/upcoming/asyncdispatch.nim64
1 files changed, 37 insertions, 27 deletions
diff --git a/lib/upcoming/asyncdispatch.nim b/lib/upcoming/asyncdispatch.nim
index 31aa6c9cb..1dfd0122a 100644
--- a/lib/upcoming/asyncdispatch.nim
+++ b/lib/upcoming/asyncdispatch.nim
@@ -1056,16 +1056,14 @@ when defined(windows) or defined(nimdoc):
 
   proc unregister*(ev: AsyncEvent) =
     ## Unregisters event ``ev``.
-    if ev.hWaiter != 0:
-      let p = getGlobalDispatcher()
-      p.handles.excl(AsyncFD(ev.hEvent))
-      if unregisterWait(ev.hWaiter) == 0:
-        let err = osLastError()
-        if err.int32 != ERROR_IO_PENDING:
-          raiseOSError(err)
-      ev.hWaiter = 0
-    else:
-      raise newException(ValueError, "Event is not registered!")
+    doAssert(ev.hWaiter != 0, "Event is not registered in the queue!")
+    let p = getGlobalDispatcher()
+    p.handles.excl(AsyncFD(ev.hEvent))
+    if unregisterWait(ev.hWaiter) == 0:
+      let err = osLastError()
+      if err.int32 != ERROR_IO_PENDING:
+        raiseOSError(err)
+    ev.hWaiter = 0
 
   proc close*(ev: AsyncEvent) =
     ## Closes event ``ev``.
@@ -1076,8 +1074,7 @@ when defined(windows) or defined(nimdoc):
 
   proc addEvent*(ev: AsyncEvent, cb: Callback) =
     ## Registers callback ``cb`` to be called when ``ev`` will be signaled
-    if ev.hWaiter != 0:
-      raise newException(ValueError, "Event is already registered!")
+    doAssert(ev.hWaiter == 0, "Event is already registered in the queue!")
 
     let p = getGlobalDispatcher()
     let hEvent = ev.hEvent
@@ -1086,17 +1083,22 @@ when defined(windows) or defined(nimdoc):
     var flags = WT_EXECUTEINWAITTHREAD.Dword
 
     proc eventcb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
-      if cb(fd):
-        # we need this check to avoid exception, if `unregister(event)` was
-        # called in callback.
-        deallocShared(cast[pointer](pcd))
-        if ev.hWaiter != 0: unregister(ev)
+      if ev.hWaiter != 0:
+        if cb(fd):
+          # we need this check to avoid exception, if `unregister(event)` was
+          # called in callback.
+          deallocShared(cast[pointer](pcd))
+          if ev.hWaiter != 0:
+            unregister(ev)
+        else:
+          # if callback returned `false`, then it wants to be called again, so
+          # we need to ref and protect `pcd.ovl` again, because it will be
+          # unrefed and disposed in `poll()`.
+          GC_ref(pcd.ovl)
+          pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
       else:
-        # if callback returned `false`, then it wants to be called again, so
-        # we need to ref and protect `pcd.ovl` again, because it will be
-        # unrefed and disposed in `poll()`.
-        GC_ref(pcd.ovl)
-        pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
+        # if ev.hWaiter == 0, then event was unregistered before `poll()` call.
+        deallocShared(cast[pointer](pcd))
 
     registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb)
     ev.hWaiter = pcd.waitFd
@@ -1205,7 +1207,7 @@ else:
     not p.selector.isEmpty() or p.timers.len != 0 or p.callbacks.len != 0
 
   template processBasicCallbacks(ident, rwlist: untyped) =
-    # Process pending descriptor's callbacks.
+    # Process pending descriptor's and AsyncEvent callbacks.
     # Invoke every callback stored in `rwlist`, until first one
     # returned `false`, which means callback wants to stay
     # alive. In such case all remaining callbacks will be added
@@ -1232,6 +1234,8 @@ else:
 
     withData(p.selector, ident, adata) do:
       adata.rwlist = newList & adata.rwlist
+      rLength = len(adata.readList)
+      wLength = len(adata.writeList)
 
   template processCustomCallbacks(ident: untyped) =
     # Process pending custom event callbacks. Custom events are
@@ -1275,6 +1279,8 @@ else:
         var custom = false
         let fd = keys[i].fd
         let events = keys[i].events
+        var rLength = 0 # len(data.readList) after callback
+        var wLength = 0 # len(data.writeList) after callback
 
         if Event.Read in events or events == {Event.Error}:
           processBasicCallbacks(fd, readList)
@@ -1283,8 +1289,10 @@ else:
           processBasicCallbacks(fd, writeList)
 
         if Event.User in events or events == {Event.Error}:
-          custom = true
           processBasicCallbacks(fd, readList)
+          custom = true
+          if rLength == 0:
+            p.selector.unregister(fd)
 
         when ioselSupportedPlatform:
           if (customSet * events) != {}:
@@ -1296,10 +1304,12 @@ else:
         if not custom:
           var update = false
           var newEvents: set[Event] = {}
-          p.selector.withData(fd, adata) do:
-            if len(adata.readList) > 0: incl(newEvents, Event.Read)
-            if len(adata.writeList) > 0: incl(newEvents, Event.Write)
+          if rLength > 0:
+            update = true
+            incl(newEvents, Event.Read)
+          if wLength > 0:
             update = true
+            incl(newEvents, Event.Write)
           if update:
             p.selector.updateHandle(SocketHandle(fd), newEvents)
         inc(i)