diff options
Diffstat (limited to 'lib/upcoming/asyncdispatch.nim')
-rw-r--r-- | lib/upcoming/asyncdispatch.nim | 177 |
1 files changed, 100 insertions, 77 deletions
diff --git a/lib/upcoming/asyncdispatch.nim b/lib/upcoming/asyncdispatch.nim index 78c7afffc..80a4f0e4f 100644 --- a/lib/upcoming/asyncdispatch.nim +++ b/lib/upcoming/asyncdispatch.nim @@ -526,10 +526,10 @@ when defined(windows) or defined(nimdoc): proc send*(socket: AsyncFD, buf: pointer, size: int, flags = {SocketFlag.SafeDisconn}): Future[void] = - ## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future will complete once all - ## data has been sent. - ## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object, you must use GC_ref/GC_unref calls - ## to avoid early freeing of the buffer + ## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future + ## will complete once all data has been sent. + ## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object, + ## you must use GC_ref/GC_unref calls to avoid early freeing of the buffer. verifyPresence(socket) var retFuture = newFuture[void]("send") @@ -946,7 +946,8 @@ when defined(windows) or defined(nimdoc): ## receiving notifies. registerWaitableEvent(fd, cb, FD_WRITE or FD_CONNECT or FD_CLOSE) - template registerWaitableHandle(p, hEvent, flags, pcd, timeout, handleCallback) = + template registerWaitableHandle(p, hEvent, flags, pcd, timeout, + handleCallback) = let handleFD = AsyncFD(hEvent) pcd.ioPort = p.ioPort pcd.handleFd = handleFD @@ -961,7 +962,7 @@ when defined(windows) or defined(nimdoc): pcd.ovl = ol if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent, cast[WAITORTIMERCALLBACK](waitableCallback), - cast[pointer](pcd), timeout.Dword, flags): + cast[pointer](pcd), timeout.Dword, flags): GC_unref(ol) deallocShared(cast[pointer](pcd)) discard closeHandle(hEvent) @@ -1098,15 +1099,18 @@ else: import ioselectors from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK, MSG_NOSIGNAL + const + InitCallbackListSize = 4 # initial size of callbacks sequence, + # associated with file/socket descriptor. + InitDelayedCallbackListSize = 64 # initial size of delayed callbacks + # queue. type AsyncFD* = distinct cint Callback = proc (fd: AsyncFD): bool {.closure,gcsafe.} - DoublyLinkedListRef = ref DoublyLinkedList[Callback] - AsyncData = object - readCBs: DoublyLinkedListRef - writeCBs: DoublyLinkedListRef + readList: seq[Callback] + writeList: seq[Callback] AsyncEvent* = distinct SelectEvent @@ -1117,11 +1121,17 @@ else: proc `==`*(x, y: AsyncFD): bool {.borrow.} proc `==`*(x, y: AsyncEvent): bool {.borrow.} + template newAsyncData(): AsyncData = + AsyncData( + readList: newSeqOfCap[Callback](InitCallbackListSize), + writeList: newSeqOfCap[Callback](InitCallbackListSize) + ) + proc newDispatcher*(): PDispatcher = new result result.selector = newSelector[AsyncData]() result.timers.newHeapQueue() - result.callbacks = initDeque[proc ()](64) + result.callbacks = initDeque[proc ()](InitDelayedCallbackListSize) var gDisp{.threadvar.}: PDispatcher ## Global dispatcher proc getGlobalDispatcher*(): PDispatcher = @@ -1130,10 +1140,7 @@ else: proc register*(fd: AsyncFD) = let p = getGlobalDispatcher() - var data = AsyncData( - readCBs: DoublyLinkedListRef(), - writeCBs: DoublyLinkedListRef() - ) + var data = newAsyncData() p.selector.registerHandle(fd.SocketHandle, {}, data) proc newAsyncNativeSocket*(domain: cint, sockType: cint, @@ -1168,10 +1175,9 @@ else: let p = getGlobalDispatcher() var newEvents = {Event.Read} withData(p.selector, fd.SocketHandle, adata) do: - adata.readCBs[].append(cb) + adata.readList.add(cb) newEvents.incl(Event.Read) - if not isNil(adata.writeCBs.head): - newEvents.incl(Event.Write) + if len(adata.writeList) != 0: newEvents.incl(Event.Write) do: raise newException(ValueError, "File descriptor not registered.") p.selector.updateHandle(fd.SocketHandle, newEvents) @@ -1180,10 +1186,9 @@ else: let p = getGlobalDispatcher() var newEvents = {Event.Write} withData(p.selector, fd.SocketHandle, adata) do: - adata.writeCBs[].append(cb) + adata.writeList.add(cb) newEvents.incl(Event.Write) - if not isNil(adata.readCBs.head): - newEvents.incl(Event.Read) + if len(adata.readList) != 0: newEvents.incl(Event.Read) do: raise newException(ValueError, "File descriptor not registered.") p.selector.updateHandle(fd.SocketHandle, newEvents) @@ -1192,13 +1197,65 @@ else: let p = getGlobalDispatcher() not p.selector.isEmpty() or p.timers.len != 0 or p.callbacks.len != 0 + template processBasicCallbacks(ident, rwlist: untyped) = + # Process pending descriptor's callbacks. + # Invoke every callback stored in `rwlist`, until first one + # returned `false`, which means callback wants to stay + # alive. In such case all remaining callbacks will be added + # to `rwlist` again, in the order they have been inserted. + # + # `rwlist` associated with file descriptor MUST BE emptied before + # dispatching callback (See https://github.com/nim-lang/Nim/issues/5128), + # or it can be possible to fall into endless cycle. + var curList: seq[Callback] + + withData(p.selector, ident, adata) do: + shallowCopy(curList, adata.rwlist) + adata.rwlist = newSeqOfCap[Callback](InitCallbackListSize) + + let newLength = max(len(curList), InitCallbackListSize) + var newList = newSeqOfCap[Callback](newLength) + + for cb in curList: + if len(newList) > 0: + newList.add(cb) + else: + if not cb(fd.AsyncFD): + newList.add(cb) + + withData(p.selector, ident, adata) do: + adata.rwlist = newList & adata.rwlist + + template processCustomCallbacks(ident: untyped) = + # Process pending custom event callbacks. Custom events are + # {Event.Timer, Event.Signal, Event.Process, Event.Vnode}. + # There can be only one callback registered with one descriptor, + # so there no need to iterate over list. + var curList: seq[Callback] + + withData(p.selector, ident, adata) do: + shallowCopy(curList, adata.readList) + adata.readList = newSeqOfCap[Callback](InitCallbackListSize) + + let newLength = len(curList) + var newList = newSeqOfCap[Callback](newLength) + + var cb = curList[0] + if not cb(fd.AsyncFD): + newList.add(cb) + else: + p.selector.unregister(fd) + + withData(p.selector, ident, adata) do: + adata.readList = newList & adata.readList + proc poll*(timeout = 500) = - var keys: array[64, ReadyKey[AsyncData]] + var keys: array[64, ReadyKey] let p = getGlobalDispatcher() when ioselSupportedPlatform: let customSet = {Event.Timer, Event.Signal, Event.Process, - Event.Vnode, Event.User} + Event.Vnode} if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0: raise newException(ValueError, @@ -1209,45 +1266,23 @@ else: var i = 0 while i < count: var custom = false - var fd = keys[i].fd.SocketHandle + let fd = keys[i].fd let events = keys[i].events if Event.Read in events or events == {Event.Error}: - for node in keys[i].data.readCBs[].nodes(): - let cb = node.value - if cb != nil: - if cb(fd.AsyncFD): - keys[i].data.readCBs[].remove(node) - else: - break + processBasicCallbacks(fd, readList) if Event.Write in events or events == {Event.Error}: - for node in keys[i].data.writeCBs[].nodes(): - let cb = node.value - if cb != nil: - if cb(fd.AsyncFD): - keys[i].data.writeCBs[].remove(node) - else: - break + processBasicCallbacks(fd, writeList) + + if Event.User in events or events == {Event.Error}: + custom = true + processBasicCallbacks(fd, readList) when ioselSupportedPlatform: if (customSet * events) != {}: - for node in keys[i].data.readCBs[].nodes(): - let cb = node.value - doAssert(cb != nil) - custom = true - if cb(fd.AsyncFD): - keys[i].data.readCBs[].remove(node) - p.selector.unregister(fd) - else: - if Event.User in events or events == {Event.Error}: - for node in keys[i].data.readCBs[].nodes(): - let cb = node.value - custom = true - if cb != nil: - if cb(fd.AsyncFD): - keys[i].data.readCBs[].remove(node) - p.selector.unregister(fd) + custom = true + processCustomCallbacks(fd) # because state `data` can be modified in callback we need to update # descriptor events with currently registered callbacks. @@ -1255,11 +1290,11 @@ else: var update = false var newEvents: set[Event] = {} p.selector.withData(fd, adata) do: - if not isNil(adata.readCBs.head): incl(newEvents, Event.Read) - if not isNil(adata.writeCBs.head): incl(newEvents, Event.Write) + if len(adata.readList) > 0: incl(newEvents, Event.Read) + if len(adata.writeList) > 0: incl(newEvents, Event.Write) update = true if update: - p.selector.updateHandle(fd, newEvents) + p.selector.updateHandle(SocketHandle(fd), newEvents) inc(i) # Timer processing. @@ -1519,33 +1554,24 @@ else: ## ``oneshot`` - if ``true`` only one event will be dispatched, ## if ``false`` continuous events every ``timeout`` milliseconds. let p = getGlobalDispatcher() - var data = AsyncData( - readCBs: DoublyLinkedListRef(), - writeCBs: DoublyLinkedListRef() - ) - data.readCBs[].append(cb) + var data = newAsyncData() + data.readList.add(cb) p.selector.registerTimer(timeout, oneshot, data) proc addSignal*(signal: int, cb: Callback) = ## Start watching signal ``signal``, and when signal appears, call the ## callback ``cb``. let p = getGlobalDispatcher() - var data = AsyncData( - readCBs: DoublyLinkedListRef(), - writeCBs: DoublyLinkedListRef() - ) - data.readCBs[].append(cb) + var data = newAsyncData() + data.readList.add(cb) p.selector.registerSignal(signal, data) proc addProcess*(pid: int, cb: Callback) = ## Start watching for process exit with pid ``pid``, and then call ## the callback ``cb``. let p = getGlobalDispatcher() - var data = AsyncData( - readCBs: DoublyLinkedListRef(), - writeCBs: DoublyLinkedListRef() - ) - data.readCBs[].append(cb) + var data = newAsyncData() + data.readList.add(cb) p.selector.registerProcess(pid, data) proc newAsyncEvent*(): AsyncEvent = @@ -1564,11 +1590,8 @@ else: ## Start watching for event ``ev``, and call callback ``cb``, when ## ev will be set to signaled state. let p = getGlobalDispatcher() - var data = AsyncData( - readCBs: DoublyLinkedListRef(), - writeCBs: DoublyLinkedListRef() - ) - data.readCBs[].append(cb) + var data = newAsyncData() + data.readList.add(cb) p.selector.registerEvent(SelectEvent(ev), data) proc sleepAsync*(ms: int): Future[void] = |