summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorcheatfate <ka@hardcore.kiev.ua>2016-09-06 12:29:53 +0300
committercheatfate <ka@hardcore.kiev.ua>2016-09-06 12:29:53 +0300
commitec7aec3d58e39e59454cb7a59cb499e1e86b7fa1 (patch)
treef456d2e2fea8cc791bdf3f46b5e556544938849e
parent147c2577200306317b5ba52335293a07b6111444 (diff)
downloadNim-ec7aec3d58e39e59454cb7a59cb499e1e86b7fa1.tar.gz
Fix windows issues.
Fix semantic of AsyncEvent close/unregister #4694.
Fix #4697.
Added first test.
-rw-r--r--lib/upcoming/asyncdispatch.nim59
-rw-r--r--tests/async/tupcoming_async.nim125
2 files changed, 159 insertions, 25 deletions
diff --git a/lib/upcoming/asyncdispatch.nim b/lib/upcoming/asyncdispatch.nim
index ce44e8a6a..72ba4efd6 100644
--- a/lib/upcoming/asyncdispatch.nim
+++ b/lib/upcoming/asyncdispatch.nim
@@ -1163,7 +1163,7 @@ when defined(windows) or defined(nimdoc):
     ## receiving notifies.
     registerWaitableEvent(FD_WRITE or FD_CONNECT or FD_CLOSE)
 
-  template registerWaitableHandle(p, hEvent, flags, pcd, handleCallback) =
+  template registerWaitableHandle(p, hEvent, flags, pcd, timeout, handleCallback) =
     let handleFD = AsyncFD(hEvent)
     pcd.ioPort = p.ioPort
     pcd.handleFd = handleFD
@@ -1177,10 +1177,10 @@ when defined(windows) or defined(nimdoc):
     pcd.ovl = ol
     if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent,
                                     cast[WAITORTIMERCALLBACK](waitableCallback),
-                                       cast[pointer](pcd), INFINITE, flags):
+                                       cast[pointer](pcd), timeout.Dword, flags):
       GC_unref(ol)
       deallocShared(cast[pointer](pcd))
-      discard wsaCloseEvent(hEvent)
+      discard closeHandle(hEvent)
       raiseOSError(osLastError())
     p.handles.incl(handleFD)
 
@@ -1212,7 +1212,7 @@ when defined(windows) or defined(nimdoc):
         deallocShared(cast[pointer](pcd))
         p.handles.excl(fd)
 
-    registerWaitableHandle(p, hEvent, flags, pcd, timercb)
+    registerWaitableHandle(p, hEvent, flags, pcd, timeout, timercb)
 
   proc addProcess*(pid: int, cb: Callback) =
     ## Registers callback ``cb`` to be called when process with pid ``pid``
@@ -1236,10 +1236,12 @@ when defined(windows) or defined(nimdoc):
       p.handles.excl(fd)
       discard cb(fd)
 
-    registerWaitableHandle(p, hProcess, flags, pcd, proccb)
+    registerWaitableHandle(p, hProcess, flags, pcd, INFINITE, proccb)
 
   proc newAsyncEvent*(): AsyncEvent =
     ## Creates new ``AsyncEvent`` object.
+    ## New ``AsyncEvent`` object is not automatically registered with
+    ## dispatcher like ``AsyncSocket``.
     var sa = SECURITY_ATTRIBUTES(
       nLength: sizeof(SECURITY_ATTRIBUTES).cint,
       bInheritHandle: 1
@@ -1248,14 +1250,15 @@ when defined(windows) or defined(nimdoc):
     if event == INVALID_HANDLE_VALUE:
       raiseOSError(osLastError())
     result = cast[AsyncEvent](allocShared0(sizeof(AsyncEventImpl)))
+    result.hEvent = event
 
   proc setEvent*(ev: AsyncEvent) =
     ## Set event ``ev`` to signaled state.
     if setEvent(ev.hEvent) == 0:
       raiseOSError(osLastError())
 
-  proc close*(ev: AsyncEvent) =
-    ## Closes event ``ev``.
+  proc unregister*(ev: AsyncEvent) =
+    ## Unregisters event ``ev``.
     if ev.hWaiter != 0:
       let p = getGlobalDispatcher()
       if unregisterWait(ev.hWaiter) == 0:
@@ -1263,7 +1266,12 @@ when defined(windows) or defined(nimdoc):
         if err.int32 != ERROR_IO_PENDING:
           raiseOSError(osLastError())
       p.handles.excl(AsyncFD(ev.hEvent))
+      ev.hWaiter = 0
+    else:
+      raise newException(ValueError, "Event is not registered!")
 
+  proc close*(ev: AsyncEvent) =
+    ## Closes event ``ev``.
     if closeHandle(ev.hEvent) == 0:
       raiseOSError(osLastError())
     deallocShared(cast[pointer](ev))
@@ -1281,15 +1289,12 @@ when defined(windows) or defined(nimdoc):
 
     proc eventcb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) =
       if cb(fd):
-        if unregisterWait(pcd.waitFd) == 0:
-          let err = osLastError()
-          if err.int32 != ERROR_IO_PENDING:
-            raiseOSError(osLastError())
-        ev.hWaiter = 0
+        # we need this check to avoid exception, if `unregister(event)` was
+        # called in callback.
+        if ev.hWaiter != 0: unregister(ev)
         deallocShared(cast[pointer](pcd))
-        p.handles.excl(fd)
 
-    registerWaitableHandle(p, hEvent, flags, pcd, eventcb)
+    registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb)
     ev.hWaiter = pcd.waitFd
 
   initAll()
@@ -1319,7 +1324,7 @@ else:
       readCB: Callback
       writeCB: Callback
 
-    AsyncEvent* = SelectEvent
+    AsyncEvent* = distinct SelectEvent
 
     PDispatcher* = ref object of PDispatcherBase
       selector: Selector[AsyncData]
@@ -1368,8 +1373,8 @@ else:
   proc unregister*(fd: AsyncFD) =
     getGlobalDispatcher().selector.unregister(fd.SocketHandle)
 
-  # proc unregister*(ev: AsyncEvent) =
-  #   getGlobalDispatcher().selector.unregister(SelectEvent(ev))
+  proc unregister*(ev: AsyncEvent) =
+    getGlobalDispatcher().selector.unregister(SelectEvent(ev))
 
   proc addRead*(fd: AsyncFD, cb: Callback) =
     let p = getGlobalDispatcher()
@@ -1409,7 +1414,7 @@ else:
       var count = p.selector.selectInto(p.adjustedTimeout(timeout), keys)
       var i = 0
       while i < count:
-        var update = false
+        var custom = false
         var fd = keys[i].fd.SocketHandle
         let events = keys[i].events
 
@@ -1420,7 +1425,6 @@ else:
             p.selector.withData(fd, adata) do:
               if adata.readCB == cb:
                 adata.readCB = nil
-                update = true
 
         if Event.Write in events:
           let cb = keys[i].data.writeCB
@@ -1429,24 +1433,29 @@ else:
             p.selector.withData(fd, adata) do:
               if adata.writeCB == cb:
                 adata.writeCB = nil
-                update = true
 
         when supportedPlatform:
           if (customSet * events) != {}:
             let cb = keys[i].data.readCB
             doAssert(cb != nil)
+            custom = true
             if cb(fd.AsyncFD):
               p.selector.withData(fd, adata) do:
                 if adata.readCB == cb:
                   adata.readCB = nil
                   p.selector.unregister(fd)
 
-        if update:
+        # because state `data` can be modified in callback we need to update
+        # descriptor events with currently registered callbacks.
+        if not custom:
+          var update = false
           var newEvents: set[Event] = {}
           p.selector.withData(fd, adata) do:
             if adata.readCB != nil: incl(newEvents, Event.Read)
             if adata.writeCB != nil: incl(newEvents, Event.Write)
-          p.selector.updateHandle(fd, newEvents)
+            update = true
+          if update:
+            p.selector.updateHandle(fd, newEvents)
         inc(i)
 
     # Timer processing.
@@ -1693,15 +1702,15 @@ else:
 
   proc newAsyncEvent*(): AsyncEvent =
     ## Creates new ``AsyncEvent``.
-    result = AsyncEvent(ioselectors.newSelectEvent())
+    result = AsyncEvent(newSelectEvent())
 
   proc setEvent*(ev: AsyncEvent) =
     ## Sets new ``AsyncEvent`` to signaled state.
-    ioselectors.setEvent(SelectEvent(ev))
+    setEvent(SelectEvent(ev))
 
   proc close*(ev: AsyncEvent) =
     ## Closes ``AsyncEvent``
-    ioselectors.close(SelectEvent(ev))
+    close(SelectEvent(ev))
 
   proc addEvent*(ev: AsyncEvent, cb: Callback) =
     ## Start watching for event ``ev``, and call callback ``cb``, when
diff --git a/tests/async/tupcoming_async.nim b/tests/async/tupcoming_async.nim
new file mode 100644
index 000000000..e8bdcfdcb
--- /dev/null
+++ b/tests/async/tupcoming_async.nim
@@ -0,0 +1,125 @@
+discard """
+  output: '''
+OK
+OK
+OK
+OK
+'''
+"""
+
+when defined(upcoming):
+  import asyncdispatch, times, osproc, streams
+
+  const supportedPlatform = defined(linux) or defined(freebsd) or
+                            defined(netbsd) or defined(openbsd) or
+                            defined(macosx)
+
+  proc waitEvent(ev: AsyncEvent, closeEvent = false): Future[void] =
+    var retFuture = newFuture[void]("waitEvent")
+    proc cb(fd: AsyncFD): bool =
+      retFuture.complete()
+      if closeEvent:
+        return true
+      else:
+        return false
+    addEvent(ev, cb)
+    return retFuture
+
+  proc waitTimer(timeout: int): Future[void] =
+    var retFuture = newFuture[void]("waitTimer")
+    proc cb(fd: AsyncFD): bool =
+      retFuture.complete()
+    addTimer(timeout, true, cb)
+    return retFuture
+
+  proc waitProcess(p: Process): Future[void] =
+    var retFuture = newFuture[void]("waitProcess")
+    proc cb(fd: AsyncFD): bool =
+      retFuture.complete()
+    addProcess(p.processID(), cb)
+    return retFuture
+
+  proc delayedSet(ev: AsyncEvent, timeout: int): Future[void] {.async.} =
+    await waitTimer(timeout)
+    ev.setEvent()
+
+  proc timerTest() =
+    var timeout = 200
+    var errorRate = 10.0
+    var start = epochTime()
+    waitFor(waitTimer(200))
+    var finish = epochTime()
+    var lowlimit = float(timeout) - float(timeout) * errorRate / 100.0
+    var highlimit = float(timeout) + float(timeout) * errorRate / 100.0
+    var elapsed = (finish - start) * 1_000 # convert to milliseconds
+    if elapsed >= lowlimit and elapsed < highlimit:
+      echo "OK"
+    else:
+      echo "timerTest: Timeout = " & $(elapsed) & ", but must be inside of [" &
+                                   $lowlimit & ", " & $highlimit & ")"
+
+  proc eventTest() =
+    var event = newAsyncEvent()
+    var fut = waitEvent(event)
+    asyncCheck(delayedSet(event, 500))
+    waitFor(fut or waitTimer(1000))
+    if fut.finished:
+      echo "OK"
+    else:
+      echo "eventTest: Timeout expired before event received!"
+
+  proc processTest() =
+    when defined(windows):
+      var process = startProcess("ping.exe", "",
+                                 ["127.0.0.1", "-n", "2", "-w", "100"], nil,
+                                 {poStdErrToStdOut, poUsePath, poInteractive,
+                                 poDemon})
+    else:
+      var process = startProcess("/bin/sleep", "", ["1"], nil,
+                                 {poStdErrToStdOut, poUsePath})
+    var fut = waitProcess(process)
+    waitFor(fut or waitTimer(2000))
+    if fut.finished and process.peekExitCode() == 0:
+      echo "OK"
+    else:
+      echo "processTest: Timeout expired before process exited!"
+
+  when supportedPlatform:
+    import posix
+
+    proc waitSignal(signal: int): Future[void] =
+      var retFuture = newFuture[void]("waitSignal")
+      proc cb(fd: AsyncFD): bool =
+        retFuture.complete()
+      addSignal(signal, cb)
+      return retFuture
+
+    proc delayedSignal(signal: int, timeout: int): Future[void] {.async.} =
+      await waitTimer(timeout)
+      var pid = posix.getpid()
+      discard posix.kill(pid, signal.cint)
+
+    proc signalTest() =
+      var fut = waitSignal(posix.SIGINT)
+      asyncCheck(delayedSignal(posix.SIGINT, 500))
+      waitFor(fut or waitTimer(1000))
+      if fut.finished:
+        echo "OK"
+      else:
+        echo "signalTest: Timeout expired before signal received!"
+
+  when supportedPlatform:
+    timerTest()
+    eventTest()
+    processTest()
+    signalTest()
+  elif defined(windows):
+    timerTest()
+    eventTest()
+    processTest()
+    echo "OK"
+  else:
+    eventTest()
+    echo "OK\nOK\nOK"
+else:
+  echo "OK\nOK\nOK\nOK"