diff options
author | cheatfate <ka@hardcore.kiev.ua> | 2016-09-06 12:29:53 +0300 |
---|---|---|
committer | cheatfate <ka@hardcore.kiev.ua> | 2016-09-06 12:29:53 +0300 |
commit | ec7aec3d58e39e59454cb7a59cb499e1e86b7fa1 (patch) | |
tree | f456d2e2fea8cc791bdf3f46b5e556544938849e | |
parent | 147c2577200306317b5ba52335293a07b6111444 (diff) | |
download | Nim-ec7aec3d58e39e59454cb7a59cb499e1e86b7fa1.tar.gz |
Fix windows issues.
Fix semantic of AsyncEvent close/unregister #4694. Fix #4697. Added first test.
-rw-r--r-- | lib/upcoming/asyncdispatch.nim | 59 | ||||
-rw-r--r-- | tests/async/tupcoming_async.nim | 125 |
2 files changed, 159 insertions, 25 deletions
diff --git a/lib/upcoming/asyncdispatch.nim b/lib/upcoming/asyncdispatch.nim index ce44e8a6a..72ba4efd6 100644 --- a/lib/upcoming/asyncdispatch.nim +++ b/lib/upcoming/asyncdispatch.nim @@ -1163,7 +1163,7 @@ when defined(windows) or defined(nimdoc): ## receiving notifies. registerWaitableEvent(FD_WRITE or FD_CONNECT or FD_CLOSE) - template registerWaitableHandle(p, hEvent, flags, pcd, handleCallback) = + template registerWaitableHandle(p, hEvent, flags, pcd, timeout, handleCallback) = let handleFD = AsyncFD(hEvent) pcd.ioPort = p.ioPort pcd.handleFd = handleFD @@ -1177,10 +1177,10 @@ when defined(windows) or defined(nimdoc): pcd.ovl = ol if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent, cast[WAITORTIMERCALLBACK](waitableCallback), - cast[pointer](pcd), INFINITE, flags): + cast[pointer](pcd), timeout.Dword, flags): GC_unref(ol) deallocShared(cast[pointer](pcd)) - discard wsaCloseEvent(hEvent) + discard closeHandle(hEvent) raiseOSError(osLastError()) p.handles.incl(handleFD) @@ -1212,7 +1212,7 @@ when defined(windows) or defined(nimdoc): deallocShared(cast[pointer](pcd)) p.handles.excl(fd) - registerWaitableHandle(p, hEvent, flags, pcd, timercb) + registerWaitableHandle(p, hEvent, flags, pcd, timeout, timercb) proc addProcess*(pid: int, cb: Callback) = ## Registers callback ``cb`` to be called when process with pid ``pid`` @@ -1236,10 +1236,12 @@ when defined(windows) or defined(nimdoc): p.handles.excl(fd) discard cb(fd) - registerWaitableHandle(p, hProcess, flags, pcd, proccb) + registerWaitableHandle(p, hProcess, flags, pcd, INFINITE, proccb) proc newAsyncEvent*(): AsyncEvent = ## Creates new ``AsyncEvent`` object. + ## New ``AsyncEvent`` object is not automatically registered with + ## dispatcher like ``AsyncSocket``. var sa = SECURITY_ATTRIBUTES( nLength: sizeof(SECURITY_ATTRIBUTES).cint, bInheritHandle: 1 @@ -1248,14 +1250,15 @@ when defined(windows) or defined(nimdoc): if event == INVALID_HANDLE_VALUE: raiseOSError(osLastError()) result = cast[AsyncEvent](allocShared0(sizeof(AsyncEventImpl))) + result.hEvent = event proc setEvent*(ev: AsyncEvent) = ## Set event ``ev`` to signaled state. if setEvent(ev.hEvent) == 0: raiseOSError(osLastError()) - proc close*(ev: AsyncEvent) = - ## Closes event ``ev``. + proc unregister*(ev: AsyncEvent) = + ## Unregisters event ``ev``. if ev.hWaiter != 0: let p = getGlobalDispatcher() if unregisterWait(ev.hWaiter) == 0: @@ -1263,7 +1266,12 @@ when defined(windows) or defined(nimdoc): if err.int32 != ERROR_IO_PENDING: raiseOSError(osLastError()) p.handles.excl(AsyncFD(ev.hEvent)) + ev.hWaiter = 0 + else: + raise newException(ValueError, "Event is not registered!") + proc close*(ev: AsyncEvent) = + ## Closes event ``ev``. if closeHandle(ev.hEvent) == 0: raiseOSError(osLastError()) deallocShared(cast[pointer](ev)) @@ -1281,15 +1289,12 @@ when defined(windows) or defined(nimdoc): proc eventcb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = if cb(fd): - if unregisterWait(pcd.waitFd) == 0: - let err = osLastError() - if err.int32 != ERROR_IO_PENDING: - raiseOSError(osLastError()) - ev.hWaiter = 0 + # we need this check to avoid exception, if `unregister(event)` was + # called in callback. + if ev.hWaiter != 0: unregister(ev) deallocShared(cast[pointer](pcd)) - p.handles.excl(fd) - registerWaitableHandle(p, hEvent, flags, pcd, eventcb) + registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb) ev.hWaiter = pcd.waitFd initAll() @@ -1319,7 +1324,7 @@ else: readCB: Callback writeCB: Callback - AsyncEvent* = SelectEvent + AsyncEvent* = distinct SelectEvent PDispatcher* = ref object of PDispatcherBase selector: Selector[AsyncData] @@ -1368,8 +1373,8 @@ else: proc unregister*(fd: AsyncFD) = getGlobalDispatcher().selector.unregister(fd.SocketHandle) - # proc unregister*(ev: AsyncEvent) = - # getGlobalDispatcher().selector.unregister(SelectEvent(ev)) + proc unregister*(ev: AsyncEvent) = + getGlobalDispatcher().selector.unregister(SelectEvent(ev)) proc addRead*(fd: AsyncFD, cb: Callback) = let p = getGlobalDispatcher() @@ -1409,7 +1414,7 @@ else: var count = p.selector.selectInto(p.adjustedTimeout(timeout), keys) var i = 0 while i < count: - var update = false + var custom = false var fd = keys[i].fd.SocketHandle let events = keys[i].events @@ -1420,7 +1425,6 @@ else: p.selector.withData(fd, adata) do: if adata.readCB == cb: adata.readCB = nil - update = true if Event.Write in events: let cb = keys[i].data.writeCB @@ -1429,24 +1433,29 @@ else: p.selector.withData(fd, adata) do: if adata.writeCB == cb: adata.writeCB = nil - update = true when supportedPlatform: if (customSet * events) != {}: let cb = keys[i].data.readCB doAssert(cb != nil) + custom = true if cb(fd.AsyncFD): p.selector.withData(fd, adata) do: if adata.readCB == cb: adata.readCB = nil p.selector.unregister(fd) - if update: + # because state `data` can be modified in callback we need to update + # descriptor events with currently registered callbacks. + if not custom: + var update = false var newEvents: set[Event] = {} p.selector.withData(fd, adata) do: if adata.readCB != nil: incl(newEvents, Event.Read) if adata.writeCB != nil: incl(newEvents, Event.Write) - p.selector.updateHandle(fd, newEvents) + update = true + if update: + p.selector.updateHandle(fd, newEvents) inc(i) # Timer processing. @@ -1693,15 +1702,15 @@ else: proc newAsyncEvent*(): AsyncEvent = ## Creates new ``AsyncEvent``. - result = AsyncEvent(ioselectors.newSelectEvent()) + result = AsyncEvent(newSelectEvent()) proc setEvent*(ev: AsyncEvent) = ## Sets new ``AsyncEvent`` to signaled state. - ioselectors.setEvent(SelectEvent(ev)) + setEvent(SelectEvent(ev)) proc close*(ev: AsyncEvent) = ## Closes ``AsyncEvent`` - ioselectors.close(SelectEvent(ev)) + close(SelectEvent(ev)) proc addEvent*(ev: AsyncEvent, cb: Callback) = ## Start watching for event ``ev``, and call callback ``cb``, when diff --git a/tests/async/tupcoming_async.nim b/tests/async/tupcoming_async.nim new file mode 100644 index 000000000..e8bdcfdcb --- /dev/null +++ b/tests/async/tupcoming_async.nim @@ -0,0 +1,125 @@ +discard """ + output: ''' +OK +OK +OK +OK +''' +""" + +when defined(upcoming): + import asyncdispatch, times, osproc, streams + + const supportedPlatform = defined(linux) or defined(freebsd) or + defined(netbsd) or defined(openbsd) or + defined(macosx) + + proc waitEvent(ev: AsyncEvent, closeEvent = false): Future[void] = + var retFuture = newFuture[void]("waitEvent") + proc cb(fd: AsyncFD): bool = + retFuture.complete() + if closeEvent: + return true + else: + return false + addEvent(ev, cb) + return retFuture + + proc waitTimer(timeout: int): Future[void] = + var retFuture = newFuture[void]("waitTimer") + proc cb(fd: AsyncFD): bool = + retFuture.complete() + addTimer(timeout, true, cb) + return retFuture + + proc waitProcess(p: Process): Future[void] = + var retFuture = newFuture[void]("waitProcess") + proc cb(fd: AsyncFD): bool = + retFuture.complete() + addProcess(p.processID(), cb) + return retFuture + + proc delayedSet(ev: AsyncEvent, timeout: int): Future[void] {.async.} = + await waitTimer(timeout) + ev.setEvent() + + proc timerTest() = + var timeout = 200 + var errorRate = 10.0 + var start = epochTime() + waitFor(waitTimer(200)) + var finish = epochTime() + var lowlimit = float(timeout) - float(timeout) * errorRate / 100.0 + var highlimit = float(timeout) + float(timeout) * errorRate / 100.0 + var elapsed = (finish - start) * 1_000 # convert to milliseconds + if elapsed >= lowlimit and elapsed < highlimit: + echo "OK" + else: + echo "timerTest: Timeout = " & $(elapsed) & ", but must be inside of [" & + $lowlimit & ", " & $highlimit & ")" + + proc eventTest() = + var event = newAsyncEvent() + var fut = waitEvent(event) + asyncCheck(delayedSet(event, 500)) + waitFor(fut or waitTimer(1000)) + if fut.finished: + echo "OK" + else: + echo "eventTest: Timeout expired before event received!" + + proc processTest() = + when defined(windows): + var process = startProcess("ping.exe", "", + ["127.0.0.1", "-n", "2", "-w", "100"], nil, + {poStdErrToStdOut, poUsePath, poInteractive, + poDemon}) + else: + var process = startProcess("/bin/sleep", "", ["1"], nil, + {poStdErrToStdOut, poUsePath}) + var fut = waitProcess(process) + waitFor(fut or waitTimer(2000)) + if fut.finished and process.peekExitCode() == 0: + echo "OK" + else: + echo "processTest: Timeout expired before process exited!" + + when supportedPlatform: + import posix + + proc waitSignal(signal: int): Future[void] = + var retFuture = newFuture[void]("waitSignal") + proc cb(fd: AsyncFD): bool = + retFuture.complete() + addSignal(signal, cb) + return retFuture + + proc delayedSignal(signal: int, timeout: int): Future[void] {.async.} = + await waitTimer(timeout) + var pid = posix.getpid() + discard posix.kill(pid, signal.cint) + + proc signalTest() = + var fut = waitSignal(posix.SIGINT) + asyncCheck(delayedSignal(posix.SIGINT, 500)) + waitFor(fut or waitTimer(1000)) + if fut.finished: + echo "OK" + else: + echo "signalTest: Timeout expired before signal received!" + + when supportedPlatform: + timerTest() + eventTest() + processTest() + signalTest() + elif defined(windows): + timerTest() + eventTest() + processTest() + echo "OK" + else: + eventTest() + echo "OK\nOK\nOK" +else: + echo "OK\nOK\nOK\nOK" |