summary refs log tree commit diff stats
path: root/lib/pure/ioselects/ioselectors_epoll.nim
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/ioselects/ioselectors_epoll.nim')
-rw-r--r--lib/pure/ioselects/ioselectors_epoll.nim534
1 files changed, 534 insertions, 0 deletions
diff --git a/lib/pure/ioselects/ioselectors_epoll.nim b/lib/pure/ioselects/ioselectors_epoll.nim
new file mode 100644
index 000000000..10658b78e
--- /dev/null
+++ b/lib/pure/ioselects/ioselectors_epoll.nim
@@ -0,0 +1,534 @@
+#
+#
+#            Nim's Runtime Library
+#        (c) Copyright 2016 Eugene Kabanov
+#
+#    See the file "copying.txt", included in this
+#    distribution, for details about the copyright.
+#
+
+# This module implements Linux epoll().
+
+import std/[posix, times, epoll]
+
+# Maximum number of events that can be returned
+const MAX_EPOLL_EVENTS = 64
+
+when not defined(android):
+  type
+    SignalFdInfo* {.importc: "struct signalfd_siginfo",
+                    header: "<sys/signalfd.h>", pure, final.} = object
+      ssi_signo*: uint32
+      ssi_errno*: int32
+      ssi_code*: int32
+      ssi_pid*: uint32
+      ssi_uid*: uint32
+      ssi_fd*: int32
+      ssi_tid*: uint32
+      ssi_band*: uint32
+      ssi_overrun*: uint32
+      ssi_trapno*: uint32
+      ssi_status*: int32
+      ssi_int*: int32
+      ssi_ptr*: uint64
+      ssi_utime*: uint64
+      ssi_stime*: uint64
+      ssi_addr*: uint64
+      pad* {.importc: "__pad".}: array[0..47, uint8]
+
+proc timerfd_create(clock_id: ClockId, flags: cint): cint
+     {.cdecl, importc: "timerfd_create", header: "<sys/timerfd.h>".}
+proc timerfd_settime(ufd: cint, flags: cint,
+                      utmr: var Itimerspec, otmr: var Itimerspec): cint
+     {.cdecl, importc: "timerfd_settime", header: "<sys/timerfd.h>".}
+proc eventfd(count: cuint, flags: cint): cint
+     {.cdecl, importc: "eventfd", header: "<sys/eventfd.h>".}
+
+when not defined(android):
+  proc signalfd(fd: cint, mask: var Sigset, flags: cint): cint
+       {.cdecl, importc: "signalfd", header: "<sys/signalfd.h>".}
+
+when hasThreadSupport:
+  type
+    SelectorImpl[T] = object
+      epollFD: cint
+      maxFD: int
+      numFD: int
+      fds: ptr SharedArray[SelectorKey[T]]
+      count*: int
+    Selector*[T] = ptr SelectorImpl[T]
+else:
+  type
+    SelectorImpl[T] = object
+      epollFD: cint
+      maxFD: int
+      numFD: int
+      fds: seq[SelectorKey[T]]
+      count*: int
+    Selector*[T] = ref SelectorImpl[T]
+type
+  SelectEventImpl = object
+    efd: cint
+  SelectEvent* = ptr SelectEventImpl
+
+proc newSelector*[T](): Selector[T] =
+  proc initialNumFD(): int {.inline.} =
+    when defined(nuttx):
+      result = NEPOLL_MAX
+    else:
+      result = 1024
+  # Retrieve the maximum fd count (for current OS) via getrlimit()
+  var maxFD = maxDescriptors()
+  doAssert(maxFD > 0)
+  # Start with a reasonable size, checkFd() will grow this on demand
+  let numFD = initialNumFD()
+
+  var epollFD = epoll_create1(O_CLOEXEC)
+  if epollFD < 0:
+    raiseOSError(osLastError())
+
+  when hasThreadSupport:
+    result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T])))
+    result.epollFD = epollFD
+    result.maxFD = maxFD
+    result.numFD = numFD
+    result.fds = allocSharedArray[SelectorKey[T]](numFD)
+  else:
+    result = Selector[T]()
+    result.epollFD = epollFD
+    result.maxFD = maxFD
+    result.numFD = numFD
+    result.fds = newSeq[SelectorKey[T]](numFD)
+
+  for i in 0 ..< numFD:
+    result.fds[i].ident = InvalidIdent
+
+proc close*[T](s: Selector[T]) =
+  let res = posix.close(s.epollFD)
+  when hasThreadSupport:
+    deallocSharedArray(s.fds)
+    deallocShared(cast[pointer](s))
+  if res != 0:
+    raiseIOSelectorsError(osLastError())
+
+proc newSelectEvent*(): SelectEvent =
+  let fdci = eventfd(0, O_CLOEXEC or O_NONBLOCK)
+  if fdci == -1:
+    raiseIOSelectorsError(osLastError())
+  result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl)))
+  result.efd = fdci
+
+proc trigger*(ev: SelectEvent) =
+  var data: uint64 = 1
+  if posix.write(ev.efd, addr data, sizeof(uint64)) == -1:
+    raiseIOSelectorsError(osLastError())
+
+proc close*(ev: SelectEvent) =
+  let res = posix.close(ev.efd)
+  deallocShared(cast[pointer](ev))
+  if res != 0:
+    raiseIOSelectorsError(osLastError())
+
+template checkFd(s, f) =
+  # TODO: I don't see how this can ever happen. You won't be able to create an
+  # FD if there is too many. -- DP
+  if f >= s.maxFD:
+    raiseIOSelectorsError("Maximum number of descriptors is exhausted!")
+  if f >= s.numFD:
+    var numFD = s.numFD
+    while numFD <= f: numFD *= 2
+    when hasThreadSupport:
+      s.fds = reallocSharedArray(s.fds, s.numFD, numFD)
+    else:
+      s.fds.setLen(numFD)
+    for i in s.numFD ..< numFD:
+      s.fds[i].ident = InvalidIdent
+    s.numFD = numFD
+
+proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle,
+                        events: set[Event], data: T) =
+  let fdi = int(fd)
+  s.checkFd(fdi)
+  doAssert(s.fds[fdi].ident == InvalidIdent, "Descriptor $# already registered" % $fdi)
+  s.setKey(fdi, events, 0, data)
+  if events != {}:
+    var epv = EpollEvent(events: EPOLLRDHUP)
+    epv.data.u64 = fdi.uint
+    if Event.Read in events: epv.events = epv.events or EPOLLIN
+    if Event.Write in events: epv.events = epv.events or EPOLLOUT
+    if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0:
+      raiseIOSelectorsError(osLastError())
+    inc(s.count)
+
+proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle, events: set[Event]) =
+  let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode,
+                    Event.User, Event.Oneshot, Event.Error}
+  let fdi = int(fd)
+  s.checkFd(fdi)
+  var pkey = addr(s.fds[fdi])
+  doAssert(pkey.ident != InvalidIdent,
+           "Descriptor $# is not registered in the selector!" % $fdi)
+  doAssert(pkey.events * maskEvents == {})
+  if pkey.events != events:
+    var epv = EpollEvent(events: EPOLLRDHUP)
+    epv.data.u64 = fdi.uint
+
+    if Event.Read in events: epv.events = epv.events or EPOLLIN
+    if Event.Write in events: epv.events = epv.events or EPOLLOUT
+
+    if pkey.events == {}:
+      if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0:
+        raiseIOSelectorsError(osLastError())
+      inc(s.count)
+    else:
+      if events != {}:
+        if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fdi.cint, addr epv) != 0:
+          raiseIOSelectorsError(osLastError())
+      else:
+        if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
+          raiseIOSelectorsError(osLastError())
+        dec(s.count)
+    pkey.events = events
+
+proc unregister*[T](s: Selector[T], fd: int|SocketHandle) =
+  let fdi = int(fd)
+  s.checkFd(fdi)
+  var pkey = addr(s.fds[fdi])
+  doAssert(pkey.ident != InvalidIdent,
+           "Descriptor $# is not registered in the selector!" % $fdi)
+  if pkey.events != {}:
+    when not defined(android):
+      if Event.Read in pkey.events or Event.Write in pkey.events or Event.User in pkey.events:
+        var epv = EpollEvent()
+        # TODO: Refactor all these EPOLL_CTL_DEL + dec(s.count) into a proc.
+        if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
+          raiseIOSelectorsError(osLastError())
+        dec(s.count)
+      elif Event.Timer in pkey.events:
+        if Event.Finished notin pkey.events:
+          var epv = EpollEvent()
+          if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
+            raiseIOSelectorsError(osLastError())
+          dec(s.count)
+        if posix.close(cint(fdi)) != 0:
+          raiseIOSelectorsError(osLastError())
+      elif Event.Signal in pkey.events:
+        var epv = EpollEvent()
+        if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
+          raiseIOSelectorsError(osLastError())
+        var nmask, omask: Sigset
+        discard sigemptyset(nmask)
+        discard sigemptyset(omask)
+        discard sigaddset(nmask, cint(s.fds[fdi].param))
+        unblockSignals(nmask, omask)
+        dec(s.count)
+        if posix.close(cint(fdi)) != 0:
+          raiseIOSelectorsError(osLastError())
+      elif Event.Process in pkey.events:
+        if Event.Finished notin pkey.events:
+          var epv = EpollEvent()
+          if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
+            raiseIOSelectorsError(osLastError())
+          var nmask, omask: Sigset
+          discard sigemptyset(nmask)
+          discard sigemptyset(omask)
+          discard sigaddset(nmask, SIGCHLD)
+          unblockSignals(nmask, omask)
+          dec(s.count)
+        if posix.close(cint(fdi)) != 0:
+          raiseIOSelectorsError(osLastError())
+    else:
+      if Event.Read in pkey.events or Event.Write in pkey.events or Event.User in pkey.events:
+        var epv = EpollEvent()
+        if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
+          raiseIOSelectorsError(osLastError())
+        dec(s.count)
+      elif Event.Timer in pkey.events:
+        if Event.Finished notin pkey.events:
+          var epv = EpollEvent()
+          if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
+            raiseIOSelectorsError(osLastError())
+          dec(s.count)
+        if posix.close(cint(fdi)) != 0:
+          raiseIOSelectorsError(osLastError())
+  clearKey(pkey)
+
+proc unregister*[T](s: Selector[T], ev: SelectEvent) =
+  let fdi = int(ev.efd)
+  s.checkFd(fdi)
+  var pkey = addr(s.fds[fdi])
+  doAssert(pkey.ident != InvalidIdent, "Event is not registered in the queue!")
+  doAssert(Event.User in pkey.events)
+  var epv = EpollEvent()
+  if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0:
+    raiseIOSelectorsError(osLastError())
+  dec(s.count)
+  clearKey(pkey)
+
+proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool,
+                       data: T): int {.discardable.} =
+  var
+    newTs: Itimerspec
+    oldTs: Itimerspec
+  let fdi = timerfd_create(CLOCK_MONOTONIC, O_CLOEXEC or O_NONBLOCK).int
+  if fdi == -1:
+    raiseIOSelectorsError(osLastError())
+
+  s.checkFd(fdi)
+  doAssert(s.fds[fdi].ident == InvalidIdent)
+
+  var events = {Event.Timer}
+  var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
+  epv.data.u64 = fdi.uint
+
+  if oneshot:
+    newTs.it_interval.tv_sec = posix.Time(0)
+    newTs.it_interval.tv_nsec = 0
+    newTs.it_value.tv_sec = posix.Time(timeout div 1_000)
+    newTs.it_value.tv_nsec = (timeout %% 1_000) * 1_000_000
+    incl(events, Event.Oneshot)
+    epv.events = epv.events or EPOLLONESHOT
+  else:
+    newTs.it_interval.tv_sec = posix.Time(timeout div 1000)
+    newTs.it_interval.tv_nsec = (timeout %% 1_000) * 1_000_000
+    newTs.it_value.tv_sec = newTs.it_interval.tv_sec
+    newTs.it_value.tv_nsec = newTs.it_interval.tv_nsec
+
+  if timerfd_settime(fdi.cint, cint(0), newTs, oldTs) != 0:
+    raiseIOSelectorsError(osLastError())
+  if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0:
+    raiseIOSelectorsError(osLastError())
+  s.setKey(fdi, events, 0, data)
+  inc(s.count)
+  result = fdi
+
+when not defined(android):
+  proc registerSignal*[T](s: Selector[T], signal: int,
+                          data: T): int {.discardable.} =
+    var
+      nmask: Sigset
+      omask: Sigset
+
+    discard sigemptyset(nmask)
+    discard sigemptyset(omask)
+    discard sigaddset(nmask, cint(signal))
+    blockSignals(nmask, omask)
+
+    let fdi = signalfd(-1, nmask, O_CLOEXEC or O_NONBLOCK).int
+    if fdi == -1:
+      raiseIOSelectorsError(osLastError())
+
+    s.checkFd(fdi)
+    doAssert(s.fds[fdi].ident == InvalidIdent)
+
+    var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
+    epv.data.u64 = fdi.uint
+    if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0:
+      raiseIOSelectorsError(osLastError())
+    s.setKey(fdi, {Event.Signal}, signal, data)
+    inc(s.count)
+    result = fdi
+
+  proc registerProcess*[T](s: Selector, pid: int,
+                           data: T): int {.discardable.} =
+    var
+      nmask: Sigset
+      omask: Sigset
+
+    discard sigemptyset(nmask)
+    discard sigemptyset(omask)
+    discard sigaddset(nmask, posix.SIGCHLD)
+    blockSignals(nmask, omask)
+
+    let fdi = signalfd(-1, nmask, O_CLOEXEC or O_NONBLOCK).int
+    if fdi == -1:
+      raiseIOSelectorsError(osLastError())
+
+    s.checkFd(fdi)
+    doAssert(s.fds[fdi].ident == InvalidIdent)
+
+    var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
+    epv.data.u64 = fdi.uint
+    epv.events = EPOLLIN or EPOLLRDHUP
+    if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0:
+      raiseIOSelectorsError(osLastError())
+    s.setKey(fdi, {Event.Process, Event.Oneshot}, pid, data)
+    inc(s.count)
+    result = fdi
+
+proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) =
+  let fdi = int(ev.efd)
+  doAssert(s.fds[fdi].ident == InvalidIdent, "Event is already registered in the queue!")
+  s.setKey(fdi, {Event.User}, 0, data)
+  var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP)
+  epv.data.u64 = ev.efd.uint
+  if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, ev.efd, addr epv) != 0:
+    raiseIOSelectorsError(osLastError())
+  inc(s.count)
+
+proc selectInto*[T](s: Selector[T], timeout: int,
+                    results: var openArray[ReadyKey]): int =
+  var
+    resTable: array[MAX_EPOLL_EVENTS, EpollEvent]
+    maxres = MAX_EPOLL_EVENTS
+    i, k: int
+
+  if maxres > len(results):
+    maxres = len(results)
+
+  verifySelectParams(timeout)
+
+  let count = epoll_wait(s.epollFD, addr(resTable[0]), maxres.cint,
+                         timeout.cint)
+  if count < 0:
+    result = 0
+    let err = osLastError()
+    if cint(err) != EINTR:
+      raiseIOSelectorsError(err)
+  elif count == 0:
+    result = 0
+  else:
+    i = 0
+    k = 0
+    while i < count:
+      let fdi = int(resTable[i].data.u64)
+      let pevents = resTable[i].events
+      var pkey = addr(s.fds[fdi])
+      doAssert(pkey.ident != InvalidIdent)
+      var rkey = ReadyKey(fd: fdi, events: {})
+
+      if (pevents and EPOLLERR) != 0 or (pevents and EPOLLHUP) != 0:
+        if (pevents and EPOLLHUP) != 0:
+          rkey.errorCode = OSErrorCode ECONNRESET
+        else:
+          # Try reading SO_ERROR from fd.
+          var error: cint
+          var size = SockLen sizeof(error)
+          if getsockopt(SocketHandle fdi, SOL_SOCKET, SO_ERROR, addr(error),
+                        addr(size)) == 0'i32:
+            rkey.errorCode = OSErrorCode error
+
+        rkey.events.incl(Event.Error)
+      if (pevents and EPOLLOUT) != 0:
+        rkey.events.incl(Event.Write)
+      when not defined(android):
+        if (pevents and EPOLLIN) != 0:
+          if Event.Read in pkey.events:
+            rkey.events.incl(Event.Read)
+          elif Event.Timer in pkey.events:
+            var data: uint64 = 0
+            if posix.read(cint(fdi), addr data,
+                          sizeof(uint64)) != sizeof(uint64):
+              raiseIOSelectorsError(osLastError())
+            rkey.events.incl(Event.Timer)
+          elif Event.Signal in pkey.events:
+            var data = SignalFdInfo()
+            if posix.read(cint(fdi), addr data,
+                          sizeof(SignalFdInfo)) != sizeof(SignalFdInfo):
+              raiseIOSelectorsError(osLastError())
+            rkey.events.incl(Event.Signal)
+          elif Event.Process in pkey.events:
+            var data = SignalFdInfo()
+            if posix.read(cint(fdi), addr data,
+                          sizeof(SignalFdInfo)) != sizeof(SignalFdInfo):
+              raiseIOSelectorsError(osLastError())
+            if cast[int](data.ssi_pid) == pkey.param:
+              rkey.events.incl(Event.Process)
+            else:
+              inc(i)
+              continue
+          elif Event.User in pkey.events:
+            var data: uint64 = 0
+            if posix.read(cint(fdi), addr data,
+                          sizeof(uint64)) != sizeof(uint64):
+              let err = osLastError()
+              if err == OSErrorCode(EAGAIN):
+                inc(i)
+                continue
+              else:
+                raiseIOSelectorsError(err)
+            rkey.events.incl(Event.User)
+      else:
+        if (pevents and EPOLLIN) != 0:
+          if Event.Read in pkey.events:
+            rkey.events.incl(Event.Read)
+          elif Event.Timer in pkey.events:
+            var data: uint64 = 0
+            if posix.read(cint(fdi), addr data,
+                          sizeof(uint64)) != sizeof(uint64):
+              raiseIOSelectorsError(osLastError())
+            rkey.events.incl(Event.Timer)
+          elif Event.User in pkey.events:
+            var data: uint64 = 0
+            if posix.read(cint(fdi), addr data,
+                          sizeof(uint64)) != sizeof(uint64):
+              let err = osLastError()
+              if err == OSErrorCode(EAGAIN):
+                inc(i)
+                continue
+              else:
+                raiseIOSelectorsError(err)
+            rkey.events.incl(Event.User)
+
+      if Event.Oneshot in pkey.events:
+        var epv = EpollEvent()
+        if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, cint(fdi), addr epv) != 0:
+          raiseIOSelectorsError(osLastError())
+        # we will not clear key until it will be unregistered, so
+        # application can obtain data, but we will decrease counter,
+        # because epoll is empty.
+        dec(s.count)
+        # we are marking key with `Finished` event, to avoid double decrease.
+        pkey.events.incl(Event.Finished)
+
+      results[k] = rkey
+      inc(k)
+      inc(i)
+    result = k
+
+proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] =
+  result = newSeq[ReadyKey](MAX_EPOLL_EVENTS)
+  let count = selectInto(s, timeout, result)
+  result.setLen(count)
+
+template isEmpty*[T](s: Selector[T]): bool =
+  (s.count == 0)
+
+proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} =
+  return s.fds[fd.int].ident != InvalidIdent
+
+proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T =
+  let fdi = int(fd)
+  s.checkFd(fdi)
+  if fdi in s:
+    result = s.fds[fdi].data
+
+proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool =
+  let fdi = int(fd)
+  s.checkFd(fdi)
+  if fdi in s:
+    s.fds[fdi].data = data
+    result = true
+
+template withData*[T](s: Selector[T], fd: SocketHandle|int, value,
+                        body: untyped) =
+  mixin checkFd
+  let fdi = int(fd)
+  s.checkFd(fdi)
+  if fdi in s:
+    var value = addr(s.fds[fdi].data)
+    body
+
+template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1,
+                        body2: untyped) =
+  mixin checkFd
+  let fdi = int(fd)
+  s.checkFd(fdi)
+  if fdi in s:
+    var value = addr(s.fds[fdi].data)
+    body1
+  else:
+    body2
+
+proc getFd*[T](s: Selector[T]): int =
+  return s.epollFD.int