diff options
Diffstat (limited to 'lib/upcoming/asyncdispatch.nim')
-rw-r--r-- | lib/upcoming/asyncdispatch.nim | 59 |
1 files changed, 34 insertions, 25 deletions
diff --git a/lib/upcoming/asyncdispatch.nim b/lib/upcoming/asyncdispatch.nim index ce44e8a6a..72ba4efd6 100644 --- a/lib/upcoming/asyncdispatch.nim +++ b/lib/upcoming/asyncdispatch.nim @@ -1163,7 +1163,7 @@ when defined(windows) or defined(nimdoc): ## receiving notifies. registerWaitableEvent(FD_WRITE or FD_CONNECT or FD_CLOSE) - template registerWaitableHandle(p, hEvent, flags, pcd, handleCallback) = + template registerWaitableHandle(p, hEvent, flags, pcd, timeout, handleCallback) = let handleFD = AsyncFD(hEvent) pcd.ioPort = p.ioPort pcd.handleFd = handleFD @@ -1177,10 +1177,10 @@ when defined(windows) or defined(nimdoc): pcd.ovl = ol if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent, cast[WAITORTIMERCALLBACK](waitableCallback), - cast[pointer](pcd), INFINITE, flags): + cast[pointer](pcd), timeout.Dword, flags): GC_unref(ol) deallocShared(cast[pointer](pcd)) - discard wsaCloseEvent(hEvent) + discard closeHandle(hEvent) raiseOSError(osLastError()) p.handles.incl(handleFD) @@ -1212,7 +1212,7 @@ when defined(windows) or defined(nimdoc): deallocShared(cast[pointer](pcd)) p.handles.excl(fd) - registerWaitableHandle(p, hEvent, flags, pcd, timercb) + registerWaitableHandle(p, hEvent, flags, pcd, timeout, timercb) proc addProcess*(pid: int, cb: Callback) = ## Registers callback ``cb`` to be called when process with pid ``pid`` @@ -1236,10 +1236,12 @@ when defined(windows) or defined(nimdoc): p.handles.excl(fd) discard cb(fd) - registerWaitableHandle(p, hProcess, flags, pcd, proccb) + registerWaitableHandle(p, hProcess, flags, pcd, INFINITE, proccb) proc newAsyncEvent*(): AsyncEvent = ## Creates new ``AsyncEvent`` object. + ## New ``AsyncEvent`` object is not automatically registered with + ## dispatcher like ``AsyncSocket``. var sa = SECURITY_ATTRIBUTES( nLength: sizeof(SECURITY_ATTRIBUTES).cint, bInheritHandle: 1 @@ -1248,14 +1250,15 @@ when defined(windows) or defined(nimdoc): if event == INVALID_HANDLE_VALUE: raiseOSError(osLastError()) result = cast[AsyncEvent](allocShared0(sizeof(AsyncEventImpl))) + result.hEvent = event proc setEvent*(ev: AsyncEvent) = ## Set event ``ev`` to signaled state. if setEvent(ev.hEvent) == 0: raiseOSError(osLastError()) - proc close*(ev: AsyncEvent) = - ## Closes event ``ev``. + proc unregister*(ev: AsyncEvent) = + ## Unregisters event ``ev``. if ev.hWaiter != 0: let p = getGlobalDispatcher() if unregisterWait(ev.hWaiter) == 0: @@ -1263,7 +1266,12 @@ when defined(windows) or defined(nimdoc): if err.int32 != ERROR_IO_PENDING: raiseOSError(osLastError()) p.handles.excl(AsyncFD(ev.hEvent)) + ev.hWaiter = 0 + else: + raise newException(ValueError, "Event is not registered!") + proc close*(ev: AsyncEvent) = + ## Closes event ``ev``. if closeHandle(ev.hEvent) == 0: raiseOSError(osLastError()) deallocShared(cast[pointer](ev)) @@ -1281,15 +1289,12 @@ when defined(windows) or defined(nimdoc): proc eventcb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = if cb(fd): - if unregisterWait(pcd.waitFd) == 0: - let err = osLastError() - if err.int32 != ERROR_IO_PENDING: - raiseOSError(osLastError()) - ev.hWaiter = 0 + # we need this check to avoid exception, if `unregister(event)` was + # called in callback. + if ev.hWaiter != 0: unregister(ev) deallocShared(cast[pointer](pcd)) - p.handles.excl(fd) - registerWaitableHandle(p, hEvent, flags, pcd, eventcb) + registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb) ev.hWaiter = pcd.waitFd initAll() @@ -1319,7 +1324,7 @@ else: readCB: Callback writeCB: Callback - AsyncEvent* = SelectEvent + AsyncEvent* = distinct SelectEvent PDispatcher* = ref object of PDispatcherBase selector: Selector[AsyncData] @@ -1368,8 +1373,8 @@ else: proc unregister*(fd: AsyncFD) = getGlobalDispatcher().selector.unregister(fd.SocketHandle) - # proc unregister*(ev: AsyncEvent) = - # getGlobalDispatcher().selector.unregister(SelectEvent(ev)) + proc unregister*(ev: AsyncEvent) = + getGlobalDispatcher().selector.unregister(SelectEvent(ev)) proc addRead*(fd: AsyncFD, cb: Callback) = let p = getGlobalDispatcher() @@ -1409,7 +1414,7 @@ else: var count = p.selector.selectInto(p.adjustedTimeout(timeout), keys) var i = 0 while i < count: - var update = false + var custom = false var fd = keys[i].fd.SocketHandle let events = keys[i].events @@ -1420,7 +1425,6 @@ else: p.selector.withData(fd, adata) do: if adata.readCB == cb: adata.readCB = nil - update = true if Event.Write in events: let cb = keys[i].data.writeCB @@ -1429,24 +1433,29 @@ else: p.selector.withData(fd, adata) do: if adata.writeCB == cb: adata.writeCB = nil - update = true when supportedPlatform: if (customSet * events) != {}: let cb = keys[i].data.readCB doAssert(cb != nil) + custom = true if cb(fd.AsyncFD): p.selector.withData(fd, adata) do: if adata.readCB == cb: adata.readCB = nil p.selector.unregister(fd) - if update: + # because state `data` can be modified in callback we need to update + # descriptor events with currently registered callbacks. + if not custom: + var update = false var newEvents: set[Event] = {} p.selector.withData(fd, adata) do: if adata.readCB != nil: incl(newEvents, Event.Read) if adata.writeCB != nil: incl(newEvents, Event.Write) - p.selector.updateHandle(fd, newEvents) + update = true + if update: + p.selector.updateHandle(fd, newEvents) inc(i) # Timer processing. @@ -1693,15 +1702,15 @@ else: proc newAsyncEvent*(): AsyncEvent = ## Creates new ``AsyncEvent``. - result = AsyncEvent(ioselectors.newSelectEvent()) + result = AsyncEvent(newSelectEvent()) proc setEvent*(ev: AsyncEvent) = ## Sets new ``AsyncEvent`` to signaled state. - ioselectors.setEvent(SelectEvent(ev)) + setEvent(SelectEvent(ev)) proc close*(ev: AsyncEvent) = ## Closes ``AsyncEvent`` - ioselectors.close(SelectEvent(ev)) + close(SelectEvent(ev)) proc addEvent*(ev: AsyncEvent, cb: Callback) = ## Start watching for event ``ev``, and call callback ``cb``, when |