diff options
author | Eugene Kabanov <ka@hardcore.kiev.ua> | 2017-02-01 13:12:26 +0200 |
---|---|---|
committer | Andreas Rumpf <rumpf_a@web.de> | 2017-02-01 12:12:26 +0100 |
commit | d90f3f59aca668d3000d6fd64199bfe720240911 (patch) | |
tree | 78a764c48eef8a9e10e11e690cfa95e13cba2288 /lib/upcoming | |
parent | 3c773c189fc4ba4a639a1ca2d910d5a5c6e13b21 (diff) | |
download | Nim-d90f3f59aca668d3000d6fd64199bfe720240911.tar.gz |
Fixes for upcoming asyncdispatch and ioselectors. (#5309)
Diffstat (limited to 'lib/upcoming')
-rw-r--r-- | lib/upcoming/asyncdispatch.nim | 64 |
1 files changed, 37 insertions, 27 deletions
diff --git a/lib/upcoming/asyncdispatch.nim b/lib/upcoming/asyncdispatch.nim index 31aa6c9cb..1dfd0122a 100644 --- a/lib/upcoming/asyncdispatch.nim +++ b/lib/upcoming/asyncdispatch.nim @@ -1056,16 +1056,14 @@ when defined(windows) or defined(nimdoc): proc unregister*(ev: AsyncEvent) = ## Unregisters event ``ev``. - if ev.hWaiter != 0: - let p = getGlobalDispatcher() - p.handles.excl(AsyncFD(ev.hEvent)) - if unregisterWait(ev.hWaiter) == 0: - let err = osLastError() - if err.int32 != ERROR_IO_PENDING: - raiseOSError(err) - ev.hWaiter = 0 - else: - raise newException(ValueError, "Event is not registered!") + doAssert(ev.hWaiter != 0, "Event is not registered in the queue!") + let p = getGlobalDispatcher() + p.handles.excl(AsyncFD(ev.hEvent)) + if unregisterWait(ev.hWaiter) == 0: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + raiseOSError(err) + ev.hWaiter = 0 proc close*(ev: AsyncEvent) = ## Closes event ``ev``. @@ -1076,8 +1074,7 @@ when defined(windows) or defined(nimdoc): proc addEvent*(ev: AsyncEvent, cb: Callback) = ## Registers callback ``cb`` to be called when ``ev`` will be signaled - if ev.hWaiter != 0: - raise newException(ValueError, "Event is already registered!") + doAssert(ev.hWaiter == 0, "Event is already registered in the queue!") let p = getGlobalDispatcher() let hEvent = ev.hEvent @@ -1086,17 +1083,22 @@ when defined(windows) or defined(nimdoc): var flags = WT_EXECUTEINWAITTHREAD.Dword proc eventcb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = - if cb(fd): - # we need this check to avoid exception, if `unregister(event)` was - # called in callback. - deallocShared(cast[pointer](pcd)) - if ev.hWaiter != 0: unregister(ev) + if ev.hWaiter != 0: + if cb(fd): + # we need this check to avoid exception, if `unregister(event)` was + # called in callback. + deallocShared(cast[pointer](pcd)) + if ev.hWaiter != 0: + unregister(ev) + else: + # if callback returned `false`, then it wants to be called again, so + # we need to ref and protect `pcd.ovl` again, because it will be + # unrefed and disposed in `poll()`. + GC_ref(pcd.ovl) + pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb)) else: - # if callback returned `false`, then it wants to be called again, so - # we need to ref and protect `pcd.ovl` again, because it will be - # unrefed and disposed in `poll()`. - GC_ref(pcd.ovl) - pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb)) + # if ev.hWaiter == 0, then event was unregistered before `poll()` call. + deallocShared(cast[pointer](pcd)) registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb) ev.hWaiter = pcd.waitFd @@ -1205,7 +1207,7 @@ else: not p.selector.isEmpty() or p.timers.len != 0 or p.callbacks.len != 0 template processBasicCallbacks(ident, rwlist: untyped) = - # Process pending descriptor's callbacks. + # Process pending descriptor's and AsyncEvent callbacks. # Invoke every callback stored in `rwlist`, until first one # returned `false`, which means callback wants to stay # alive. In such case all remaining callbacks will be added @@ -1232,6 +1234,8 @@ else: withData(p.selector, ident, adata) do: adata.rwlist = newList & adata.rwlist + rLength = len(adata.readList) + wLength = len(adata.writeList) template processCustomCallbacks(ident: untyped) = # Process pending custom event callbacks. Custom events are @@ -1275,6 +1279,8 @@ else: var custom = false let fd = keys[i].fd let events = keys[i].events + var rLength = 0 # len(data.readList) after callback + var wLength = 0 # len(data.writeList) after callback if Event.Read in events or events == {Event.Error}: processBasicCallbacks(fd, readList) @@ -1283,8 +1289,10 @@ else: processBasicCallbacks(fd, writeList) if Event.User in events or events == {Event.Error}: - custom = true processBasicCallbacks(fd, readList) + custom = true + if rLength == 0: + p.selector.unregister(fd) when ioselSupportedPlatform: if (customSet * events) != {}: @@ -1296,10 +1304,12 @@ else: if not custom: var update = false var newEvents: set[Event] = {} - p.selector.withData(fd, adata) do: - if len(adata.readList) > 0: incl(newEvents, Event.Read) - if len(adata.writeList) > 0: incl(newEvents, Event.Write) + if rLength > 0: + update = true + incl(newEvents, Event.Read) + if wLength > 0: update = true + incl(newEvents, Event.Write) if update: p.selector.updateHandle(SocketHandle(fd), newEvents) inc(i) |