diff options
author | Daniil Yarancev <21169548+Yardanico@users.noreply.github.com> | 2018-01-07 21:02:00 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-01-07 21:02:00 +0300 |
commit | fb44c522e6173528efa8035ecc459c84887d0167 (patch) | |
tree | a2f5e98606be265981a5f72748896967033e23d7 /lib/pure/asyncdispatch.nim | |
parent | ccf99fa5ce4fe992fb80dc89271faa51456c3fa5 (diff) | |
parent | e23ea64c41e101d4e1d933f0b015f51cc6c2f7de (diff) | |
download | Nim-fb44c522e6173528efa8035ecc459c84887d0167.tar.gz |
Merge pull request #1 from nim-lang/devel
upstream
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r-- | lib/pure/asyncdispatch.nim | 587 |
1 files changed, 434 insertions, 153 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index 281d5b848..42ffa236c 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -9,8 +9,9 @@ include "system/inclrtl" -import os, tables, strutils, times, heapqueue, options, asyncstreams +import os, tables, strutils, times, heapqueue, lists, options, asyncstreams import asyncfutures except callSoon + import nativesockets, net, deques export Port, SocketFlag @@ -58,9 +59,10 @@ export asyncfutures, asyncstreams ## ## .. code-block::nim ## var future = socket.recv(100) -## future.callback = +## future.addCallback( ## proc () = ## echo(future.read) +## ) ## ## All asynchronous functions returning a ``Future`` will not block. They ## will not however return immediately. An asynchronous function will have @@ -136,6 +138,7 @@ export asyncfutures, asyncstreams ## and occasionally the compilation may fail altogether. ## As such it is better to use the former style when possible. ## +## ## Discarding futures ## ------------------ ## @@ -165,18 +168,20 @@ type timers*: HeapQueue[tuple[finishAt: float, fut: Future[void]]] callbacks*: Deque[proc ()] -proc processTimers(p: PDispatcherBase) {.inline.} = +proc processTimers(p: PDispatcherBase; didSomeWork: var bool) {.inline.} = #Process just part if timers at a step var count = p.timers.len let t = epochTime() while count > 0 and t >= p.timers[0].finishAt: p.timers.pop().fut.complete() dec count + didSomeWork = true -proc processPendingCallbacks(p: PDispatcherBase) = +proc processPendingCallbacks(p: PDispatcherBase; didSomeWork: var bool) = while p.callbacks.len > 0: var cb = p.callbacks.popFirst() cb() + didSomeWork = true proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} = # If dispatcher has active timers this proc returns the timeout @@ -226,6 +231,12 @@ when defined(windows) or defined(nimdoc): ovl: PCustomOverlapped PostCallbackDataPtr = ptr PostCallbackData + AsyncEventImpl = object + hEvent: Handle + hWaiter: Handle + pcd: PostCallbackDataPtr + AsyncEvent* = ptr AsyncEventImpl + Callback = proc (fd: AsyncFD): bool {.closure,gcsafe.} {.deprecated: [TCompletionKey: CompletionKey, TAsyncFD: AsyncFD, TCustomOverlapped: CustomOverlapped, TCompletionData: CompletionData].} @@ -275,14 +286,13 @@ when defined(windows) or defined(nimdoc): let p = getGlobalDispatcher() p.handles.len != 0 or p.timers.len != 0 or p.callbacks.len != 0 - proc poll*(timeout = 500) = - ## Waits for completion events and processes them. Raises ``ValueError`` - ## if there are no pending operations. + proc runOnce(timeout = 500): bool = let p = getGlobalDispatcher() if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0: raise newException(ValueError, "No handles or timers registered in dispatcher.") + result = false if p.handles.len != 0: let at = p.adjustedTimeout(timeout) var llTimeout = @@ -295,6 +305,7 @@ when defined(windows) or defined(nimdoc): let res = getQueuedCompletionStatus(p.ioPort, addr lpNumberOfBytesTransferred, addr lpCompletionKey, cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool + result = true # http://stackoverflow.com/a/12277264/492186 # TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html @@ -324,17 +335,18 @@ when defined(windows) or defined(nimdoc): else: if errCode.int32 == WAIT_TIMEOUT: # Timed out - discard + result = false else: raiseOSError(errCode) # Timer processing. - processTimers(p) + processTimers(p, result) # Callback queue processing - processPendingCallbacks(p) + processPendingCallbacks(p, result) + - var connectExPtr: pointer = nil - var acceptExPtr: pointer = nil - var getAcceptExSockAddrsPtr: pointer = nil + var acceptEx: WSAPROC_ACCEPTEX + var connectEx: WSAPROC_CONNECTEX + var getAcceptExSockAddrs: WSAPROC_GETACCEPTEXSOCKADDRS proc initPointer(s: SocketHandle, fun: var pointer, guid: var GUID): bool = # Ref: https://github.com/powdahound/twisted/blob/master/twisted/internet/iocpreactor/iocpsupport/winsock_pointers.c @@ -346,56 +358,19 @@ when defined(windows) or defined(nimdoc): proc initAll() = let dummySock = newNativeSocket() - if not initPointer(dummySock, connectExPtr, WSAID_CONNECTEX): + if dummySock == INVALID_SOCKET: raiseOSError(osLastError()) - if not initPointer(dummySock, acceptExPtr, WSAID_ACCEPTEX): + var fun: pointer = nil + if not initPointer(dummySock, fun, WSAID_CONNECTEX): raiseOSError(osLastError()) - if not initPointer(dummySock, getAcceptExSockAddrsPtr, WSAID_GETACCEPTEXSOCKADDRS): + connectEx = cast[WSAPROC_CONNECTEX](fun) + if not initPointer(dummySock, fun, WSAID_ACCEPTEX): raiseOSError(osLastError()) - - proc connectEx(s: SocketHandle, name: ptr SockAddr, namelen: cint, - lpSendBuffer: pointer, dwSendDataLength: Dword, - lpdwBytesSent: PDword, lpOverlapped: POVERLAPPED): bool = - if connectExPtr.isNil: raise newException(ValueError, "Need to initialise ConnectEx().") - let fun = - cast[proc (s: SocketHandle, name: ptr SockAddr, namelen: cint, - lpSendBuffer: pointer, dwSendDataLength: Dword, - lpdwBytesSent: PDword, lpOverlapped: POVERLAPPED): bool {.stdcall,gcsafe.}](connectExPtr) - - result = fun(s, name, namelen, lpSendBuffer, dwSendDataLength, lpdwBytesSent, - lpOverlapped) - - proc acceptEx(listenSock, acceptSock: SocketHandle, lpOutputBuffer: pointer, - dwReceiveDataLength, dwLocalAddressLength, - dwRemoteAddressLength: Dword, lpdwBytesReceived: PDword, - lpOverlapped: POVERLAPPED): bool = - if acceptExPtr.isNil: raise newException(ValueError, "Need to initialise AcceptEx().") - let fun = - cast[proc (listenSock, acceptSock: SocketHandle, lpOutputBuffer: pointer, - dwReceiveDataLength, dwLocalAddressLength, - dwRemoteAddressLength: Dword, lpdwBytesReceived: PDword, - lpOverlapped: POVERLAPPED): bool {.stdcall,gcsafe.}](acceptExPtr) - result = fun(listenSock, acceptSock, lpOutputBuffer, dwReceiveDataLength, - dwLocalAddressLength, dwRemoteAddressLength, lpdwBytesReceived, - lpOverlapped) - - proc getAcceptExSockaddrs(lpOutputBuffer: pointer, - dwReceiveDataLength, dwLocalAddressLength, dwRemoteAddressLength: Dword, - LocalSockaddr: ptr ptr SockAddr, LocalSockaddrLength: LPInt, - RemoteSockaddr: ptr ptr SockAddr, RemoteSockaddrLength: LPInt) = - if getAcceptExSockAddrsPtr.isNil: - raise newException(ValueError, "Need to initialise getAcceptExSockAddrs().") - - let fun = - cast[proc (lpOutputBuffer: pointer, - dwReceiveDataLength, dwLocalAddressLength, - dwRemoteAddressLength: Dword, LocalSockaddr: ptr ptr SockAddr, - LocalSockaddrLength: LPInt, RemoteSockaddr: ptr ptr SockAddr, - RemoteSockaddrLength: LPInt) {.stdcall,gcsafe.}](getAcceptExSockAddrsPtr) - - fun(lpOutputBuffer, dwReceiveDataLength, dwLocalAddressLength, - dwRemoteAddressLength, LocalSockaddr, LocalSockaddrLength, - RemoteSockaddr, RemoteSockaddrLength) + acceptEx = cast[WSAPROC_ACCEPTEX](fun) + if not initPointer(dummySock, fun, WSAID_GETACCEPTEXSOCKADDRS): + raiseOSError(osLastError()) + getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](fun) + close(dummySock) proc recv*(socket: AsyncFD, size: int, flags = {SocketFlag.SafeDisconn}): Future[string] = @@ -506,10 +481,7 @@ when defined(windows) or defined(nimdoc): proc (fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = if not retFuture.finished: if errcode == OSErrorCode(-1): - if bytesCount == 0 and dataBuf.buf[0] == '\0': - retFuture.complete(0) - else: - retFuture.complete(bytesCount) + retFuture.complete(bytesCount) else: if flags.isDisconnectionError(errcode): retFuture.complete(0) @@ -543,10 +515,11 @@ when defined(windows) or defined(nimdoc): proc send*(socket: AsyncFD, buf: pointer, size: int, flags = {SocketFlag.SafeDisconn}): Future[void] = - ## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future will complete once all - ## data has been sent. - ## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object, you must use GC_ref/GC_unref calls - ## to avoid early freeing of the buffer + ## Sends ``size`` bytes from ``buf`` to ``socket``. The returned future + ## will complete once all data has been sent. + ## + ## **WARNING**: Use it with caution. If ``buf`` refers to GC'ed object, + ## you must use GC_ref/GC_unref calls to avoid early freeing of the buffer. verifyPresence(socket) var retFuture = newFuture[void]("send") @@ -793,7 +766,7 @@ when defined(windows) or defined(nimdoc): cast[pointer](p.ovl)) {.pop.} - template registerWaitableEvent(mask) = + proc registerWaitableEvent(fd: AsyncFD, cb: Callback; mask: Dword) = let p = getGlobalDispatcher() var flags = (WT_EXECUTEINWAITTHREAD or WT_EXECUTEONLYONCE).Dword var hEvent = wsaCreateEvent() @@ -843,8 +816,8 @@ when defined(windows) or defined(nimdoc): cast[pointer](pcd), INFINITE, flags): # pcd.ovl will be unrefed in poll() let err = osLastError() - discard wsaCloseEvent(hEvent) deallocShared(cast[pointer](pcd)) + discard wsaCloseEvent(hEvent) raiseOSError(err) else: # we incref `pcd.ovl` and `protect` callback one more time, @@ -883,16 +856,17 @@ when defined(windows) or defined(nimdoc): ## ## This is not ``pure`` mechanism for Windows Completion Ports (IOCP), ## so if you can avoid it, please do it. Use `addRead` only if really - ## need it (main usecase is adaptation of `unix like` libraries to be + ## need it (main usecase is adaptation of unix-like libraries to be ## asynchronous on Windows). - ## If you use this function, you dont need to use asyncdispatch.recv() + ## + ## If you use this function, you don't need to use asyncdispatch.recv() ## or asyncdispatch.accept(), because they are using IOCP, please use ## nativesockets.recv() and nativesockets.accept() instead. ## ## Be sure your callback ``cb`` returns ``true``, if you want to remove ## watch of `read` notifications, and ``false``, if you want to continue - ## receiving notifies. - registerWaitableEvent(FD_READ or FD_ACCEPT or FD_OOB or FD_CLOSE) + ## receiving notifications. + registerWaitableEvent(fd, cb, FD_READ or FD_ACCEPT or FD_OOB or FD_CLOSE) proc addWrite*(fd: AsyncFD, cb: Callback) = ## Start watching the file descriptor for write availability and then call @@ -900,43 +874,213 @@ when defined(windows) or defined(nimdoc): ## ## This is not ``pure`` mechanism for Windows Completion Ports (IOCP), ## so if you can avoid it, please do it. Use `addWrite` only if really - ## need it (main usecase is adaptation of `unix like` libraries to be + ## need it (main usecase is adaptation of unix-like libraries to be ## asynchronous on Windows). - ## If you use this function, you dont need to use asyncdispatch.send() + ## + ## If you use this function, you don't need to use asyncdispatch.send() ## or asyncdispatch.connect(), because they are using IOCP, please use ## nativesockets.send() and nativesockets.connect() instead. ## ## Be sure your callback ``cb`` returns ``true``, if you want to remove ## watch of `write` notifications, and ``false``, if you want to continue - ## receiving notifies. - registerWaitableEvent(FD_WRITE or FD_CONNECT or FD_CLOSE) + ## receiving notifications. + registerWaitableEvent(fd, cb, FD_WRITE or FD_CONNECT or FD_CLOSE) + + template registerWaitableHandle(p, hEvent, flags, pcd, timeout, + handleCallback) = + let handleFD = AsyncFD(hEvent) + pcd.ioPort = p.ioPort + pcd.handleFd = handleFD + var ol = PCustomOverlapped() + GC_ref(ol) + ol.data.fd = handleFD + ol.data.cb = handleCallback + # We need to protect our callback environment value, so GC will not free it + # accidentally. + ol.data.cell = system.protect(rawEnv(ol.data.cb)) + + pcd.ovl = ol + if not registerWaitForSingleObject(addr(pcd.waitFd), hEvent, + cast[WAITORTIMERCALLBACK](waitableCallback), + cast[pointer](pcd), timeout.Dword, flags): + let err = osLastError() + GC_unref(ol) + deallocShared(cast[pointer](pcd)) + discard closeHandle(hEvent) + raiseOSError(err) + p.handles.incl(handleFD) + + template closeWaitable(handle: untyped) = + let waitFd = pcd.waitFd + deallocShared(cast[pointer](pcd)) + p.handles.excl(fd) + if unregisterWait(waitFd) == 0: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + discard closeHandle(handle) + raiseOSError(err) + if closeHandle(handle) == 0: + raiseOSError(osLastError()) + + proc addTimer*(timeout: int, oneshot: bool, cb: Callback) = + ## Registers callback ``cb`` to be called when timer expired. + ## + ## Parameters: + ## + ## * ``timeout`` - timeout value in milliseconds. + ## * ``oneshot`` + ## * `true` - generate only one timeout event + ## * `false` - generate timeout events periodically + + doAssert(timeout > 0) + let p = getGlobalDispatcher() + + var hEvent = createEvent(nil, 1, 0, nil) + if hEvent == INVALID_HANDLE_VALUE: + raiseOSError(osLastError()) + + var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData))) + var flags = WT_EXECUTEINWAITTHREAD.Dword + if oneshot: flags = flags or WT_EXECUTEONLYONCE + + proc timercb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + let res = cb(fd) + if res or oneshot: + closeWaitable(hEvent) + else: + # if callback returned `false`, then it wants to be called again, so + # we need to ref and protect `pcd.ovl` again, because it will be + # unrefed and disposed in `poll()`. + GC_ref(pcd.ovl) + pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb)) + + registerWaitableHandle(p, hEvent, flags, pcd, timeout, timercb) + + proc addProcess*(pid: int, cb: Callback) = + ## Registers callback ``cb`` to be called when process with process ID + ## ``pid`` exited. + let p = getGlobalDispatcher() + let procFlags = SYNCHRONIZE + var hProcess = openProcess(procFlags, 0, pid.Dword) + if hProcess == INVALID_HANDLE_VALUE: + raiseOSError(osLastError()) + + var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData))) + var flags = WT_EXECUTEINWAITTHREAD.Dword + + proc proccb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + closeWaitable(hProcess) + discard cb(fd) + + registerWaitableHandle(p, hProcess, flags, pcd, INFINITE, proccb) + + proc newAsyncEvent*(): AsyncEvent = + ## Creates a new thread-safe ``AsyncEvent`` object. + ## + ## New ``AsyncEvent`` object is not automatically registered with # TODO: Why? -- DP + ## dispatcher like ``AsyncSocket``. + var sa = SECURITY_ATTRIBUTES( + nLength: sizeof(SECURITY_ATTRIBUTES).cint, + bInheritHandle: 1 + ) + var event = createEvent(addr(sa), 0'i32, 0'i32, nil) + if event == INVALID_HANDLE_VALUE: + raiseOSError(osLastError()) + result = cast[AsyncEvent](allocShared0(sizeof(AsyncEventImpl))) + result.hEvent = event + + proc trigger*(ev: AsyncEvent) = + ## Set event ``ev`` to signaled state. + if setEvent(ev.hEvent) == 0: + raiseOSError(osLastError()) + + proc unregister*(ev: AsyncEvent) = + ## Unregisters event ``ev``. + doAssert(ev.hWaiter != 0, "Event is not registered in the queue!") + let p = getGlobalDispatcher() + p.handles.excl(AsyncFD(ev.hEvent)) + if unregisterWait(ev.hWaiter) == 0: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + raiseOSError(err) + ev.hWaiter = 0 + + proc close*(ev: AsyncEvent) = + ## Closes event ``ev``. + let res = closeHandle(ev.hEvent) + deallocShared(cast[pointer](ev)) + if res == 0: + raiseOSError(osLastError()) + + proc addEvent*(ev: AsyncEvent, cb: Callback) = + ## Registers callback ``cb`` to be called when ``ev`` will be signaled + doAssert(ev.hWaiter == 0, "Event is already registered in the queue!") + + let p = getGlobalDispatcher() + let hEvent = ev.hEvent + + var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData))) + var flags = WT_EXECUTEINWAITTHREAD.Dword + + proc eventcb(fd: AsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + if ev.hWaiter != 0: + if cb(fd): + # we need this check to avoid exception, if `unregister(event)` was + # called in callback. + deallocShared(cast[pointer](pcd)) + if ev.hWaiter != 0: + unregister(ev) + else: + # if callback returned `false`, then it wants to be called again, so + # we need to ref and protect `pcd.ovl` again, because it will be + # unrefed and disposed in `poll()`. + GC_ref(pcd.ovl) + pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb)) + else: + # if ev.hWaiter == 0, then event was unregistered before `poll()` call. + deallocShared(cast[pointer](pcd)) + + registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb) + ev.hWaiter = pcd.waitFd initAll() else: import selectors from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK, MSG_NOSIGNAL - + const + InitCallbackListSize = 4 # initial size of callbacks sequence, + # associated with file/socket descriptor. + InitDelayedCallbackListSize = 64 # initial size of delayed callbacks + # queue. type AsyncFD* = distinct cint Callback = proc (fd: AsyncFD): bool {.closure,gcsafe.} - PData* = ref object of RootRef - fd: AsyncFD - readCBs: seq[Callback] - writeCBs: seq[Callback] + AsyncData = object + readList: seq[Callback] + writeList: seq[Callback] + + AsyncEvent* = distinct SelectEvent PDispatcher* = ref object of PDispatcherBase - selector: Selector + selector: Selector[AsyncData] {.deprecated: [TAsyncFD: AsyncFD, TCallback: Callback].} proc `==`*(x, y: AsyncFD): bool {.borrow.} + proc `==`*(x, y: AsyncEvent): bool {.borrow.} + + template newAsyncData(): AsyncData = + AsyncData( + readList: newSeqOfCap[Callback](InitCallbackListSize), + writeList: newSeqOfCap[Callback](InitCallbackListSize) + ) proc newDispatcher*(): PDispatcher = new result - result.selector = newSelector() + result.selector = newSelector[AsyncData]() result.timers.newHeapQueue() - result.callbacks = initDeque[proc ()](64) + result.callbacks = initDeque[proc ()](InitDelayedCallbackListSize) var gDisp{.threadvar.}: PDispatcher ## Global dispatcher @@ -951,15 +1095,10 @@ else: setGlobalDispatcher(newDispatcher()) result = gDisp - proc update(fd: AsyncFD, events: set[Event]) = - let p = getGlobalDispatcher() - assert fd.SocketHandle in p.selector - p.selector.update(fd.SocketHandle, events) - proc register*(fd: AsyncFD) = let p = getGlobalDispatcher() - var data = PData(fd: fd, readCBs: @[], writeCBs: @[]) - p.selector.register(fd.SocketHandle, {}, data.RootRef) + var data = newAsyncData() + p.selector.registerHandle(fd.SocketHandle, {}, data) proc closeSocket*(sock: AsyncFD) = let disp = getGlobalDispatcher() @@ -969,80 +1108,158 @@ else: proc unregister*(fd: AsyncFD) = getGlobalDispatcher().selector.unregister(fd.SocketHandle) + proc unregister*(ev: AsyncEvent) = + getGlobalDispatcher().selector.unregister(SelectEvent(ev)) + proc addRead*(fd: AsyncFD, cb: Callback) = let p = getGlobalDispatcher() - if fd.SocketHandle notin p.selector: + var newEvents = {Event.Read} + withData(p.selector, fd.SocketHandle, adata) do: + adata.readList.add(cb) + newEvents.incl(Event.Read) + if len(adata.writeList) != 0: newEvents.incl(Event.Write) + do: raise newException(ValueError, "File descriptor not registered.") - p.selector[fd.SocketHandle].data.PData.readCBs.add(cb) - update(fd, p.selector[fd.SocketHandle].events + {EvRead}) + p.selector.updateHandle(fd.SocketHandle, newEvents) proc addWrite*(fd: AsyncFD, cb: Callback) = let p = getGlobalDispatcher() - if fd.SocketHandle notin p.selector: + var newEvents = {Event.Write} + withData(p.selector, fd.SocketHandle, adata) do: + adata.writeList.add(cb) + newEvents.incl(Event.Write) + if len(adata.readList) != 0: newEvents.incl(Event.Read) + do: raise newException(ValueError, "File descriptor not registered.") - p.selector[fd.SocketHandle].data.PData.writeCBs.add(cb) - update(fd, p.selector[fd.SocketHandle].events + {EvWrite}) - - template processCallbacks(callbacks: untyped) = - # Callback may add items to ``callbacks`` which causes issues if - # we are iterating over it at the same time. We therefore - # make a copy to iterate over. - let currentCBs = callbacks - callbacks = @[] - # Using another sequence because callbacks themselves can add - # other callbacks. - var newCBs: seq[Callback] = @[] - for cb in currentCBs: - if newCBs.len > 0: - # A callback has already returned with EAGAIN, don't call any - # others until next `poll`. - newCBs.add(cb) - else: - if not cb(data.fd): - # Callback wants to be called again. - newCBs.add(cb) - callbacks = newCBs & callbacks + p.selector.updateHandle(fd.SocketHandle, newEvents) proc hasPendingOperations*(): bool = let p = getGlobalDispatcher() - p.selector.len != 0 or p.timers.len != 0 or p.callbacks.len != 0 - - proc poll*(timeout = 500) = + not p.selector.isEmpty() or p.timers.len != 0 or p.callbacks.len != 0 + + template processBasicCallbacks(ident, rwlist: untyped) = + # Process pending descriptor and AsyncEvent callbacks. + # + # Invoke every callback stored in `rwlist`, until one + # returns `false` (which means callback wants to stay + # alive). In such case all remaining callbacks will be added + # to `rwlist` again, in the order they have been inserted. + # + # `rwlist` associated with file descriptor MUST BE emptied before + # dispatching callback (See https://github.com/nim-lang/Nim/issues/5128), + # or it can be possible to fall into endless cycle. + var curList: seq[Callback] + + withData(p.selector, ident, adata) do: + shallowCopy(curList, adata.rwlist) + adata.rwlist = newSeqOfCap[Callback](InitCallbackListSize) + + let newLength = max(len(curList), InitCallbackListSize) + var newList = newSeqOfCap[Callback](newLength) + + for cb in curList: + if len(newList) > 0: + # A callback has already returned with EAGAIN, don't call any others + # until next `poll`. + newList.add(cb) + else: + if not cb(fd.AsyncFD): + # Callback wants to be called again. + newList.add(cb) + + withData(p.selector, ident, adata) do: + # descriptor still present in queue. + adata.rwlist = newList & adata.rwlist + rLength = len(adata.readList) + wLength = len(adata.writeList) + do: + # descriptor was unregistered in callback via `unregister()`. + rLength = -1 + wLength = -1 + + template processCustomCallbacks(ident: untyped) = + # Process pending custom event callbacks. Custom events are + # {Event.Timer, Event.Signal, Event.Process, Event.Vnode}. + # There can be only one callback registered with one descriptor, + # so there is no need to iterate over list. + var curList: seq[Callback] + + withData(p.selector, ident, adata) do: + shallowCopy(curList, adata.readList) + adata.readList = newSeqOfCap[Callback](InitCallbackListSize) + + let newLength = len(curList) + var newList = newSeqOfCap[Callback](newLength) + + var cb = curList[0] + if not cb(fd.AsyncFD): + newList.add(cb) + + withData(p.selector, ident, adata) do: + # descriptor still present in queue. + adata.readList = newList & adata.readList + if len(adata.readList) == 0: + # if no callbacks registered with descriptor, unregister it. + p.selector.unregister(fd) + do: + # descriptor was unregistered in callback via `unregister()`. + discard + + proc runOnce(timeout = 500): bool = let p = getGlobalDispatcher() - if p.selector.len == 0 and p.timers.len == 0 and p.callbacks.len == 0: + when ioselSupportedPlatform: + let customSet = {Event.Timer, Event.Signal, Event.Process, + Event.Vnode} + + if p.selector.isEmpty() and p.timers.len == 0 and p.callbacks.len == 0: raise newException(ValueError, "No handles or timers registered in dispatcher.") - if p.selector.len > 0: - for info in p.selector.select(p.adjustedTimeout(timeout)): - let data = PData(info.key.data) - assert data.fd == info.key.fd.AsyncFD - #echo("In poll ", data.fd.cint) - # There may be EvError here, but we handle them in callbacks, - # so that exceptions can be raised from `send(...)` and - # `recv(...)` routines. - - if EvRead in info.events or info.events == {EvError}: - processCallbacks(data.readCBs) - - if EvWrite in info.events or info.events == {EvError}: - processCallbacks(data.writeCBs) - - if info.key in p.selector: - var newEvents: set[Event] - if data.readCBs.len != 0: newEvents = {EvRead} - if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite} - if newEvents != info.key.events: - update(data.fd, newEvents) - else: - # FD no longer a part of the selector. Likely been closed - # (e.g. socket disconnected). - discard + result = false + if not p.selector.isEmpty(): + var keys: array[64, ReadyKey] + var count = p.selector.selectInto(p.adjustedTimeout(timeout), keys) + for i in 0..<count: + var custom = false + let fd = keys[i].fd + let events = keys[i].events + var rLength = 0 # len(data.readList) after callback + var wLength = 0 # len(data.writeList) after callback + + if Event.Read in events or events == {Event.Error}: + processBasicCallbacks(fd, readList) + result = true + + if Event.Write in events or events == {Event.Error}: + processBasicCallbacks(fd, writeList) + result = true + + if Event.User in events: + processBasicCallbacks(fd, readList) + custom = true + if rLength == 0: + p.selector.unregister(fd) + result = true + + when ioselSupportedPlatform: + if (customSet * events) != {}: + custom = true + processCustomCallbacks(fd) + result = true + + # because state `data` can be modified in callback we need to update + # descriptor events with currently registered callbacks. + if not custom: + var newEvents: set[Event] = {} + if rLength != -1 and wLength != -1: + if rLength > 0: incl(newEvents, Event.Read) + if wLength > 0: incl(newEvents, Event.Write) + p.selector.updateHandle(SocketHandle(fd), newEvents) # Timer processing. - processTimers(p) + processTimers(p, result) # Callback queue processing - processPendingCallbacks(p) + processPendingCallbacks(p, result) proc recv*(socket: AsyncFD, size: int, flags = {SocketFlag.SafeDisconn}): Future[string] = @@ -1075,7 +1292,7 @@ else: return retFuture proc recvInto*(socket: AsyncFD, buf: pointer, size: int, - flags = {SocketFlag.SafeDisconn}): Future[int] = + flags = {SocketFlag.SafeDisconn}): Future[int] = var retFuture = newFuture[int]("recvInto") proc cb(sock: AsyncFD): bool = @@ -1216,6 +1433,68 @@ else: addRead(socket, cb) return retFuture + when ioselSupportedPlatform: + + proc addTimer*(timeout: int, oneshot: bool, cb: Callback) = + ## Start watching for timeout expiration, and then call the + ## callback ``cb``. + ## ``timeout`` - time in milliseconds, + ## ``oneshot`` - if ``true`` only one event will be dispatched, + ## if ``false`` continuous events every ``timeout`` milliseconds. + let p = getGlobalDispatcher() + var data = newAsyncData() + data.readList.add(cb) + p.selector.registerTimer(timeout, oneshot, data) + + proc addSignal*(signal: int, cb: Callback) = + ## Start watching signal ``signal``, and when signal appears, call the + ## callback ``cb``. + let p = getGlobalDispatcher() + var data = newAsyncData() + data.readList.add(cb) + p.selector.registerSignal(signal, data) + + proc addProcess*(pid: int, cb: Callback) = + ## Start watching for process exit with pid ``pid``, and then call + ## the callback ``cb``. + let p = getGlobalDispatcher() + var data = newAsyncData() + data.readList.add(cb) + p.selector.registerProcess(pid, data) + + proc newAsyncEvent*(): AsyncEvent = + ## Creates new ``AsyncEvent``. + result = AsyncEvent(newSelectEvent()) + + proc trigger*(ev: AsyncEvent) = + ## Sets new ``AsyncEvent`` to signaled state. + trigger(SelectEvent(ev)) + + proc close*(ev: AsyncEvent) = + ## Closes ``AsyncEvent`` + close(SelectEvent(ev)) + + proc addEvent*(ev: AsyncEvent, cb: Callback) = + ## Start watching for event ``ev``, and call callback ``cb``, when + ## ev will be set to signaled state. + let p = getGlobalDispatcher() + var data = newAsyncData() + data.readList.add(cb) + p.selector.registerEvent(SelectEvent(ev), data) + +proc drain*(timeout = 500) = + ## Waits for completion events and processes them. Raises ``ValueError`` + ## if there are no pending operations. In contrast to ``poll`` this + ## processes as many events as are available. + if runOnce(timeout): + while hasPendingOperations() and runOnce(0): discard + +proc poll*(timeout = 500) = + ## Waits for completion events and processes them. Raises ``ValueError`` + ## if there are no pending operations. This runs the underlying OS + ## `epoll`:idx: or `kqueue`:idx: primitive only once. + discard runOnce(timeout) + # Common procedures between current and upcoming asyncdispatch include includes.asynccommon @@ -1269,7 +1548,7 @@ proc send*(socket: AsyncFD, data: string, var copiedData = data GC_ref(copiedData) # we need to protect data until send operation is completed - # or failed. + # or failed. let sendFut = socket.send(addr copiedData[0], data.len, flags) sendFut.callback = @@ -1317,7 +1596,7 @@ proc recvLine*(socket: AsyncFD): Future[string] {.async, deprecated.} = ## ## **Deprecated since version 0.15.0**: Use ``asyncnet.recvLine()`` instead. - template addNLIfEmpty(): untyped = + template addNLIfEmpty(): typed = if result.len == 0: result.add("\c\L") @@ -1353,3 +1632,5 @@ proc waitFor*[T](fut: Future[T]): T = poll() fut.read + +{.deprecated: [setEvent: trigger].} |