summary refs log tree commit diff stats
path: root/lib/pure/ioselects/ioselectors_kqueue.nim
diff options
context:
space:
mode:
authorcheatfate <ka@hardcore.kiev.ua>2017-01-26 18:28:34 +0200
committercheatfate <ka@hardcore.kiev.ua>2017-01-26 18:28:34 +0200
commit78e3bd392c532ff3f6dad9204ec8d0c78bfcc3ad (patch)
tree51fe018d5bd0f444c993a1d4a48e7c2d11f0cafc /lib/pure/ioselects/ioselectors_kqueue.nim
parentca0b16fd337d15abd2ea873a06dbca85777d886e (diff)
downloadNim-78e3bd392c532ff3f6dad9204ec8d0c78bfcc3ad.tar.gz
Fix changes table must be SharedArray when hasThreadSupport.
Diffstat (limited to 'lib/pure/ioselects/ioselectors_kqueue.nim')
-rw-r--r--lib/pure/ioselects/ioselectors_kqueue.nim63
1 files changed, 50 insertions, 13 deletions
diff --git a/lib/pure/ioselects/ioselectors_kqueue.nim b/lib/pure/ioselects/ioselectors_kqueue.nim
index 55790c200..01d6eada5 100644
--- a/lib/pure/ioselects/ioselectors_kqueue.nim
+++ b/lib/pure/ioselects/ioselectors_kqueue.nim
@@ -46,10 +46,12 @@ when hasThreadSupport:
     SelectorImpl[T] = object
       kqFD : cint
       maxFD : int
-      changes: seq[KEvent]
+      changes: ptr SharedArray[KEvent]
       fds: ptr SharedArray[SelectorKey[T]]
       count: int
       changesLock: Lock
+      changesSize: int
+      changesLength: int
       sock: cint
     Selector*[T] = ptr SelectorImpl[T]
 else:
@@ -94,6 +96,8 @@ proc newSelector*[T](): Selector[T] =
   when hasThreadSupport:
     result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T])))
     result.fds = allocSharedArray[SelectorKey[T]](maxFD)
+    result.changes = allocSharedArray[KEvent](MAX_KQUEUE_EVENTS)
+    result.changesSize = MAX_KQUEUE_EVENTS
     initLock(result.changesLock)
   else:
     result = Selector[T]()
@@ -101,9 +105,9 @@ proc newSelector*[T](): Selector[T] =
 
   result.kqFD = kqFD
   result.maxFD = maxFD.int
-  result.changes = newSeqOfCap[KEvent](MAX_KQUEUE_EVENTS)
+
   # we allocating empty socket to duplicate it handle in future, to get unique
-  # indexes for `fds` array. This is needed to properly identify 
+  # indexes for `fds` array. This is needed to properly identify
   # {Event.Timer, Event.Signal, Event.Process} events.
   result.sock = posix.socket(posix.AF_INET, posix.SOCK_STREAM,
                              posix.IPPROTO_TCP).cint
@@ -162,20 +166,44 @@ else:
   template withChangeLock(s, body: untyped) =
     body
 
-template modifyKQueue[T](s: Selector[T], nident: uint, nfilter: cshort,
-                         nflags: cushort, nfflags: cuint, ndata: int,
-                         nudata: pointer) =
-  mixin withChangeLock
-  s.withChangeLock():
+when hasThreadSupport:
+  template modifyKQueue[T](s: Selector[T], nident: uint, nfilter: cshort,
+                           nflags: cushort, nfflags: cuint, ndata: int,
+                           nudata: pointer) =
+    mixin withChangeLock
+    s.withChangeLock():
+      if s.changesLength == s.changesSize:
+        # if cache array is full, we allocating new with size * 2
+        let newSize = s.changesSize shl 1
+        let rdata = allocSharedArray[KEvent](newSize)
+        copyMem(rdata, s.changes, s.changesSize * sizeof(KEvent))
+        s.changesSize = newSize
+      s.changes[s.changesLength] = KEvent(ident: nident,
+                                          filter: nfilter, flags: nflags,
+                                          fflags: nfflags, data: ndata,
+                                          udata: nudata)
+      inc(s.changesLength)
+
+  when not declared(CACHE_EVENTS):
+    template flushKQueue[T](s: Selector[T]) =
+      mixin withChangeLock
+      s.withChangeLock():
+        if s.changesLength > 0:
+          if kevent(s.kqFD, addr(s.changes[0]), cint(s.changesLength),
+                    nil, 0, nil) == -1:
+            raiseIOSelectorsError(osLastError())
+          s.changesLength = 0
+else:
+  template modifyKQueue[T](s: Selector[T], nident: uint, nfilter: cshort,
+                           nflags: cushort, nfflags: cuint, ndata: int,
+                           nudata: pointer) =
     s.changes.add(KEvent(ident: nident,
                          filter: nfilter, flags: nflags,
                          fflags: nfflags, data: ndata,
                          udata: nudata))
 
-when not declared(CACHE_EVENTS):
-  template flushKQueue[T](s: Selector[T]) =
-    mixin withChangeLock
-    s.withChangeLock():
+  when not declared(CACHE_EVENTS):
+    template flushKQueue[T](s: Selector[T]) =
       let length = cint(len(s.changes))
       if length > 0:
         if kevent(s.kqFD, addr(s.changes[0]), length,
@@ -432,7 +460,16 @@ proc selectInto*[T](s: Selector[T], timeout: int,
   when not declared(CACHE_EVENTS):
     count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres), ptv)
   else:
-    s.withChangeLock():
+    when hasThreadSupport:
+      s.withChangeLock():
+        if s.changesLength > 0:
+          count = kevent(s.kqFD, addr(s.changes[0]), cint(s.changesLength),
+                         addr(resTable[0]), cint(maxres), ptv)
+          s.changesLength = 0
+        else:
+          count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres),
+                         ptv)
+    else:
       let length = cint(len(s.changes))
       if length > 0:
         count = kevent(s.kqFD, addr(s.changes[0]), length,