diff options
-rw-r--r-- | lib/pure/ioselects/ioselectors_kqueue.nim | 63 |
1 files changed, 50 insertions, 13 deletions
diff --git a/lib/pure/ioselects/ioselectors_kqueue.nim b/lib/pure/ioselects/ioselectors_kqueue.nim index 55790c200..01d6eada5 100644 --- a/lib/pure/ioselects/ioselectors_kqueue.nim +++ b/lib/pure/ioselects/ioselectors_kqueue.nim @@ -46,10 +46,12 @@ when hasThreadSupport: SelectorImpl[T] = object kqFD : cint maxFD : int - changes: seq[KEvent] + changes: ptr SharedArray[KEvent] fds: ptr SharedArray[SelectorKey[T]] count: int changesLock: Lock + changesSize: int + changesLength: int sock: cint Selector*[T] = ptr SelectorImpl[T] else: @@ -94,6 +96,8 @@ proc newSelector*[T](): Selector[T] = when hasThreadSupport: result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T]))) result.fds = allocSharedArray[SelectorKey[T]](maxFD) + result.changes = allocSharedArray[KEvent](MAX_KQUEUE_EVENTS) + result.changesSize = MAX_KQUEUE_EVENTS initLock(result.changesLock) else: result = Selector[T]() @@ -101,9 +105,9 @@ proc newSelector*[T](): Selector[T] = result.kqFD = kqFD result.maxFD = maxFD.int - result.changes = newSeqOfCap[KEvent](MAX_KQUEUE_EVENTS) + # we allocating empty socket to duplicate it handle in future, to get unique - # indexes for `fds` array. This is needed to properly identify + # indexes for `fds` array. This is needed to properly identify # {Event.Timer, Event.Signal, Event.Process} events. result.sock = posix.socket(posix.AF_INET, posix.SOCK_STREAM, posix.IPPROTO_TCP).cint @@ -162,20 +166,44 @@ else: template withChangeLock(s, body: untyped) = body -template modifyKQueue[T](s: Selector[T], nident: uint, nfilter: cshort, - nflags: cushort, nfflags: cuint, ndata: int, - nudata: pointer) = - mixin withChangeLock - s.withChangeLock(): +when hasThreadSupport: + template modifyKQueue[T](s: Selector[T], nident: uint, nfilter: cshort, + nflags: cushort, nfflags: cuint, ndata: int, + nudata: pointer) = + mixin withChangeLock + s.withChangeLock(): + if s.changesLength == s.changesSize: + # if cache array is full, we allocating new with size * 2 + let newSize = s.changesSize shl 1 + let rdata = allocSharedArray[KEvent](newSize) + copyMem(rdata, s.changes, s.changesSize * sizeof(KEvent)) + s.changesSize = newSize + s.changes[s.changesLength] = KEvent(ident: nident, + filter: nfilter, flags: nflags, + fflags: nfflags, data: ndata, + udata: nudata) + inc(s.changesLength) + + when not declared(CACHE_EVENTS): + template flushKQueue[T](s: Selector[T]) = + mixin withChangeLock + s.withChangeLock(): + if s.changesLength > 0: + if kevent(s.kqFD, addr(s.changes[0]), cint(s.changesLength), + nil, 0, nil) == -1: + raiseIOSelectorsError(osLastError()) + s.changesLength = 0 +else: + template modifyKQueue[T](s: Selector[T], nident: uint, nfilter: cshort, + nflags: cushort, nfflags: cuint, ndata: int, + nudata: pointer) = s.changes.add(KEvent(ident: nident, filter: nfilter, flags: nflags, fflags: nfflags, data: ndata, udata: nudata)) -when not declared(CACHE_EVENTS): - template flushKQueue[T](s: Selector[T]) = - mixin withChangeLock - s.withChangeLock(): + when not declared(CACHE_EVENTS): + template flushKQueue[T](s: Selector[T]) = let length = cint(len(s.changes)) if length > 0: if kevent(s.kqFD, addr(s.changes[0]), length, @@ -432,7 +460,16 @@ proc selectInto*[T](s: Selector[T], timeout: int, when not declared(CACHE_EVENTS): count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres), ptv) else: - s.withChangeLock(): + when hasThreadSupport: + s.withChangeLock(): + if s.changesLength > 0: + count = kevent(s.kqFD, addr(s.changes[0]), cint(s.changesLength), + addr(resTable[0]), cint(maxres), ptv) + s.changesLength = 0 + else: + count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres), + ptv) + else: let length = cint(len(s.changes)) if length > 0: count = kevent(s.kqFD, addr(s.changes[0]), length, |