summary refs log tree commit diff stats
path: root/lib/upcoming/asyncdispatch.nim
diff options
context:
space:
mode:
Diffstat (limited to 'lib/upcoming/asyncdispatch.nim')
-rw-r--r--lib/upcoming/asyncdispatch.nim59
1 files changed, 34 insertions, 25 deletions
diff --git a/lib/upcoming/asyncdispatch.nim b/lib/upcoming/asyncdispatch.nim
index ce44e8a6a..72ba4efd6 100644
--- a/lib/upcoming/asyncdispatch.nim
+++ b/lib/upcoming/asyncdispatch.nim
@@ -1163,7 +1163,7 @@ when defined(windows) or defined(nimdoc):
     ## receiving notifies.
     registerWaitableEvent(FD_WRITE or FD_CONNECT or FD_CLOSE)
 
-  template registerWaitableHandle(p, hEvent, flags, pcd, handleCallback) =
+  template registerWaitableHandle(p, hEvent, flags, pcd, timeout, handleCallback) =
     let handleFD = AsyncFD(hEvent)
     pcd.ioPort = p.ioPort
     pcd.handleFd = handleFD
@@ -1177,10 +1177,10 @@ when defined(windows) or defined(nimdoc):
     pcd.ovl = ol
     if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
                                     cast[WAITORTIMERCALLBACK](waitableCallback),
-                                       cast[pointer](pcd), INFINITE, flags):
+                                       cast[pointer](pcd), timeout.Dword, flags):
       GC_unref(ol)
       deallocShared(cast[pointer](pcd))
-      discard wsaCloseEvent(hEvent)
+      discard closeHandle(hEvent)
       raiseOSError(osLastError())
     p.handles.incl(handleFD)
 
@@ -1212,7 +1212,7 @@ when defined(windows) or defined(nimdoc):
         deallocShared(cast[pointer](pcd))
         p.handles.excl(fd)
 
-    registerWaitableHandle(p, hEvent, flags, pcd, timercb)
+    registerWaitableHandle(p, hEvent, flags, pcd, timeout, timercb)
 
   proc addProcess*(pid: int, cb: Callback) =
     ## Registers callback ``cb`` to be called when process with pid ``pid``
@@ -1236,10 +1236,12 @@ when defined(windows) or defined(nimdoc):
       p.handles.excl(fd)
       discard cb(fd)
 
-    registerWaitableHandle(p, hProcess, flags, pcd, proccb)
+    registerWaitableHandle(p, hProcess, flags, pcd, INFINITE, proccb)
 
   proc newAsyncEvent*(): AsyncEvent =
     ## Creates new ``AsyncEvent`` object.
+    ## New ``AsyncEvent`` object is not automatically registered with
+    ## dispatcher like ``AsyncSocket``.
     var sa = SECURITY_ATTRIBUTES(
       nLength: sizeof(SECURITY_ATTRIBUTES).cint,
       bInheritHandle: 1
@@ -1248,14 +1250,15 @@ when defined(windows) or defined(nimdoc):
     if event == INVALID_HANDLE_VALUE:
       raiseOSError(osLastError())
     result = cast[AsyncEvent](allocShared0(sizeof(AsyncEventImpl)))
+    result.hEvent = event
 
   proc setEvent*(ev: AsyncEvent) =
     ## Set event ``ev`` to signaled state.
     if setEvent(ev.hEvent) == 0:
       raiseOSError(osLastError())
 
-  proc close*(ev: AsyncEvent) =
-    ## Closes event ``ev``.
+  proc unregister*(ev: AsyncEvent) =
+    ## Unregisters event ``ev``.
     if ev.hWaiter != 0:
       let p = getGlobalDispatcher()
       if unregisterWait(ev.hWaiter) == 0:
@@ -1263,7 +1266,12 @@ when defined(windows) or defined(nimdoc):
         if err.int32 != ERROR_IO_PENDING:
           raiseOSError(osLastError())
       p.handles.excl(AsyncFD(ev.hEvent))
+      ev.hWaiter = 0
+    else:
+      raise newException(ValueError, "Event is not registered!")
 
+  proc close*(ev: AsyncEvent) =
+    ## Closes event ``ev``.
     if closeHandle(ev.hEvent) == 0:
       raiseOSError(osLastError())
     deallocShared(cast[pointer](ev))
@@ -1281,15 +1289,12 @@ when defined(windows) or defined(nimdoc):
 
     proc eventcb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
       if cb(fd):
-        if unregisterWait(pcd.waitFd) == 0:
-          let err = osLastError()
-          if err.int32 != ERROR_IO_PENDING:
-            raiseOSError(osLastError())
-        ev.hWaiter = 0
+        # we need this check to avoid exception, if `unregister(event)` was
+        # called in callback.
+        if ev.hWaiter != 0: unregister(ev)
         deallocShared(cast[pointer](pcd))
-        p.handles.excl(fd)
 
-    registerWaitableHandle(p, hEvent, flags, pcd, eventcb)
+    registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb)
     ev.hWaiter = pcd.waitFd
 
   initAll()
@@ -1319,7 +1324,7 @@ else:
       readCB: Callback
       writeCB: Callback
 
-    AsyncEvent* = SelectEvent
+    AsyncEvent* = distinct SelectEvent
 
     PDispatcher* = ref object of PDispatcherBase
       selector: Selector[AsyncData]
@@ -1368,8 +1373,8 @@ else:
   proc unregister*(fd: AsyncFD) =
     getGlobalDispatcher().selector.unregister(fd.SocketHandle)
 
-  # proc unregister*(ev: AsyncEvent) =
-  #   getGlobalDispatcher().selector.unregister(SelectEvent(ev))
+  proc unregister*(ev: AsyncEvent) =
+    getGlobalDispatcher().selector.unregister(SelectEvent(ev))
 
   proc addRead*(fd: AsyncFD, cb: Callback) =
     let p = getGlobalDispatcher()
@@ -1409,7 +1414,7 @@ else:
       var count = p.selector.selectInto(p.adjustedTimeout(timeout), keys)
       var i = 0
       while i < count:
-        var update = false
+        var custom = false
         var fd = keys[i].fd.SocketHandle
         let events = keys[i].events
 
@@ -1420,7 +1425,6 @@ else:
             p.selector.withData(fd, adata) do:
               if adata.readCB == cb:
                 adata.readCB = nil
-                update = true
 
         if Event.Write in events:
           let cb = keys[i].data.writeCB
@@ -1429,24 +1433,29 @@ else:
             p.selector.withData(fd, adata) do:
               if adata.writeCB == cb:
                 adata.writeCB = nil
-                update = true
 
         when supportedPlatform:
           if (customSet * events) != {}:
             let cb = keys[i].data.readCB
             doAssert(cb != nil)
+            custom = true
             if cb(fd.AsyncFD):
               p.selector.withData(fd, adata) do:
                 if adata.readCB == cb:
                   adata.readCB = nil
                   p.selector.unregister(fd)
 
-        if update:
+        # because state `data` can be modified in callback we need to update
+        # descriptor events with currently registered callbacks.
+        if not custom:
+          var update = false
           var newEvents: set[Event] = {}
           p.selector.withData(fd, adata) do:
             if adata.readCB != nil: incl(newEvents, Event.Read)
             if adata.writeCB != nil: incl(newEvents, Event.Write)
-          p.selector.updateHandle(fd, newEvents)
+            update = true
+          if update:
+            p.selector.updateHandle(fd, newEvents)
         inc(i)
 
     # Timer processing.
@@ -1693,15 +1702,15 @@ else:
 
   proc newAsyncEvent*(): AsyncEvent =
     ## Creates new ``AsyncEvent``.
-    result = AsyncEvent(ioselectors.newSelectEvent())
+    result = AsyncEvent(newSelectEvent())
 
   proc setEvent*(ev: AsyncEvent) =
     ## Sets new ``AsyncEvent`` to signaled state.
-    ioselectors.setEvent(SelectEvent(ev))
+    setEvent(SelectEvent(ev))
 
   proc close*(ev: AsyncEvent) =
     ## Closes ``AsyncEvent``
-    ioselectors.close(SelectEvent(ev))
+    close(SelectEvent(ev))
 
   proc addEvent*(ev: AsyncEvent, cb: Callback) =
     ## Start watching for event ``ev``, and call callback ``cb``, when