diff options
author | cheatfate <ka@hardcore.kiev.ua> | 2016-07-05 14:35:55 +0300 |
---|---|---|
committer | cheatfate <ka@hardcore.kiev.ua> | 2016-07-05 14:35:55 +0300 |
commit | 7724336d73ec580a442357df798771e524ece3a3 (patch) | |
tree | e8093960174546b52ad9a57caff55bf01c28d573 /lib/pure/ioselects/ioselectors_select.nim | |
parent | 025c6c0983d25a3a8662822373d9a187ea8c5c72 (diff) | |
download | Nim-7724336d73ec580a442357df798771e524ece3a3.tar.gz |
Patch one more path problem
Diffstat (limited to 'lib/pure/ioselects/ioselectors_select.nim')
-rw-r--r-- | lib/pure/ioselects/ioselectors_select.nim | 416 |
1 files changed, 416 insertions, 0 deletions
diff --git a/lib/pure/ioselects/ioselectors_select.nim b/lib/pure/ioselects/ioselectors_select.nim new file mode 100644 index 000000000..f8099f9a0 --- /dev/null +++ b/lib/pure/ioselects/ioselectors_select.nim @@ -0,0 +1,416 @@ +# +# +# Nim's Runtime Library +# (c) Copyright 2016 Eugene Kabanov +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +# This module implements Posix and Windows select(). + +import times, nativesockets + +when defined(windows): + import winlean + when defined(gcc): + {.passL: "-lws2_32".} + elif defined(vcc): + {.passL: "ws2_32.lib".} + const platformHeaders = """#include <winsock2.h> + #include <windows.h>""" + const EAGAIN = WSAEWOULDBLOCK +else: + const platformHeaders = """#include <sys/select.h> + #include <sys/time.h> + #include <sys/types.h> + #include <unistd.h>""" +type + Fdset {.importc: "fd_set", header: platformHeaders, pure, final.} = object +var + FD_SETSIZE {.importc: "FD_SETSIZE", header: platformHeaders.}: cint + +proc IOFD_SET(fd: SocketHandle, fdset: ptr Fdset) + {.cdecl, importc: "FD_SET", header: platformHeaders, inline.} +proc IOFD_CLR(fd: SocketHandle, fdset: ptr Fdset) + {.cdecl, importc: "FD_CLR", header: platformHeaders, inline.} +proc IOFD_ZERO(fdset: ptr Fdset) + {.cdecl, importc: "FD_ZERO", header: platformHeaders, inline.} + +when defined(windows): + proc IOFD_ISSET(fd: SocketHandle, fdset: ptr Fdset): cint + {.stdcall, importc: "FD_ISSET", header: platformHeaders, inline.} + proc ioselect(nfds: cint, readFds, writeFds, exceptFds: ptr Fdset, + timeout: ptr Timeval): cint + {.stdcall, importc: "select", header: platformHeaders.} +else: + proc IOFD_ISSET(fd: SocketHandle, fdset: ptr Fdset): cint + {.cdecl, importc: "FD_ISSET", header: platformHeaders, inline.} + proc ioselect(nfds: cint, readFds, writeFds, exceptFds: ptr Fdset, + timeout: ptr Timeval): cint + {.cdecl, importc: "select", header: platformHeaders.} + +when hasThreadSupport: + type + SelectorImpl[T] = object + rSet: FdSet + wSet: FdSet + eSet: FdSet + maxFD: int + fds: ptr SharedArray[SelectorKey[T]] + count: int + lock: Lock + Selector*[T] = ptr SelectorImpl[T] +else: + type + SelectorImpl[T] = object + rSet: FdSet + wSet: FdSet + eSet: FdSet + maxFD: int + fds: seq[SelectorKey[T]] + count: int + Selector*[T] = ref SelectorImpl[T] + +type + SelectEventImpl = object + rsock: SocketHandle + wsock: SocketHandle + SelectEvent* = ptr SelectEventImpl + +when hasThreadSupport: + template withSelectLock[T](s: Selector[T], body: untyped) = + acquire(s.lock) + {.locks: [s.lock].}: + try: + body + finally: + release(s.lock) +else: + template withSelectLock[T](s: Selector[T], body: untyped) = + body + +proc newSelector*[T](): Selector[T] = + when hasThreadSupport: + result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T]))) + result.fds = allocSharedArray[SelectorKey[T]](FD_SETSIZE) + initLock result.lock + else: + result = Selector[T]() + result.fds = newSeq[SelectorKey[T]](FD_SETSIZE) + + IOFD_ZERO(addr result.rSet) + IOFD_ZERO(addr result.wSet) + IOFD_ZERO(addr result.eSet) + +proc close*[T](s: Selector[T]) = + when hasThreadSupport: + deallocSharedArray(s.fds) + deallocShared(cast[pointer](s)) + +when defined(windows): + proc newSelectEvent*(): SelectEvent = + var ssock = newNativeSocket() + var wsock = newNativeSocket() + var rsock: SocketHandle = INVALID_SOCKET + var saddr = Sockaddr_in() + + saddr.sin_family = winlean.AF_INET + saddr.sin_port = 0 + saddr.sin_addr.s_addr = INADDR_ANY + if bindAddr(ssock, cast[ptr SockAddr](addr(saddr)), + sizeof(saddr).SockLen) < 0'i32: + raiseOSError(osLastError()) + + if winlean.listen(ssock, 1) == -1: + raiseOSError(osLastError()) + + var namelen = sizeof(saddr).SockLen + if getsockname(ssock, cast[ptr SockAddr](addr(saddr)), + addr(namelen)) == -1'i32: + raiseOSError(osLastError()) + + saddr.sin_addr.s_addr = 0x0100007F + if winlean.connect(wsock, cast[ptr SockAddr](addr(saddr)), + sizeof(saddr).SockLen) == -1: + raiseOSError(osLastError()) + namelen = sizeof(saddr).SockLen + rsock = winlean.accept(ssock, cast[ptr SockAddr](addr(saddr)), + cast[ptr SockLen](addr(namelen))) + if rsock == SocketHandle(-1): + raiseOSError(osLastError()) + + if winlean.closesocket(ssock) == -1: + raiseOSError(osLastError()) + + var mode = clong(1) + if ioctlsocket(rsock, FIONBIO, addr(mode)) == -1: + raiseOSError(osLastError()) + mode = clong(1) + if ioctlsocket(wsock, FIONBIO, addr(mode)) == -1: + raiseOSError(osLastError()) + + result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) + result.rsock = rsock + result.wsock = wsock + + proc setEvent*(ev: SelectEvent) = + var data: int = 1 + if winlean.send(ev.wsock, cast[pointer](addr data), + cint(sizeof(int)), 0) != sizeof(int): + raiseOSError(osLastError()) + + proc close*(ev: SelectEvent) = + discard winlean.closesocket(ev.rsock) + discard winlean.closesocket(ev.wsock) + deallocShared(cast[pointer](ev)) + +else: + proc newSelectEvent*(): SelectEvent = + var fds: array[2, cint] + if posix.pipe(fds) == -1: + raiseOSError(osLastError()) + setNonBlocking(fds[0]) + setNonBlocking(fds[1]) + result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) + result.rsock = SocketHandle(fds[0]) + result.wsock = SocketHandle(fds[1]) + + proc setEvent*(ev: SelectEvent) = + var data: uint64 = 1 + if posix.write(cint(ev.wsock), addr data, sizeof(uint64)) != sizeof(uint64): + raiseOSError(osLastError()) + + proc close*(ev: SelectEvent) = + discard posix.close(cint(ev.rsock)) + discard posix.close(cint(ev.wsock)) + deallocShared(cast[pointer](ev)) + +proc setKey[T](s: Selector[T], fd: SocketHandle, events: set[Event], data: T) = + var i = 0 + let fdi = int(fd) + while i < FD_SETSIZE: + if s.fds[i].ident == 0: + var pkey = addr(s.fds[i]) + pkey.ident = fdi + pkey.events = events + pkey.key.fd = fd.int + pkey.key.events = {} + pkey.key.data = data + break + inc(i) + if i == FD_SETSIZE: + raise newException(ValueError, "Maximum numbers of fds exceeded") + +proc getKey[T](s: Selector[T], fd: SocketHandle): ptr SelectorKey[T] = + var i = 0 + let fdi = int(fd) + while i < FD_SETSIZE: + if s.fds[i].ident == fdi: + result = addr(s.fds[i]) + break + inc(i) + doAssert(i < FD_SETSIZE, "Descriptor not registered in queue") + +proc delKey[T](s: Selector[T], fd: SocketHandle) = + var i = 0 + while i < FD_SETSIZE: + if s.fds[i].ident == fd.int: + s.fds[i].ident = 0 + s.fds[i].events = {} + break + inc(i) + doAssert(i < FD_SETSIZE, "Descriptor not registered in queue") + +proc registerHandle*[T](s: Selector[T], fd: SocketHandle, + events: set[Event], data: T) = + when not defined(windows): + let fdi = int(fd) + s.withSelectLock(): + s.setKey(fd, events, data) + when not defined(windows): + if fdi > s.maxFD: s.maxFD = fdi + if Event.Read in events: + IOFD_SET(fd, addr s.rSet) + inc(s.count) + if Event.Write in events: + IOFD_SET(fd, addr s.wSet) + IOFD_SET(fd, addr s.eSet) + inc(s.count) + +proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = + when not defined(windows): + let fdi = int(ev.rsock) + s.withSelectLock(): + s.setKey(ev.rsock, {Event.User}, data) + when not defined(windows): + if fdi > s.maxFD: s.maxFD = fdi + IOFD_SET(ev.rsock, addr s.rSet) + inc(s.count) + +proc updateHandle*[T](s: Selector[T], fd: SocketHandle, + events: set[Event]) = + let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode, + Event.User, Event.Oneshot, Event.Error} + s.withSelectLock(): + var pkey = s.getKey(fd) + doAssert(pkey.events * maskEvents == {}) + if pkey.events != events: + if (Event.Read in pkey.events) and (Event.Read notin events): + IOFD_CLR(fd, addr s.rSet) + dec(s.count) + if (Event.Write in pkey.events) and (Event.Write notin events): + IOFD_CLR(fd, addr s.wSet) + IOFD_CLR(fd, addr s.eSet) + dec(s.count) + if (Event.Read notin pkey.events) and (Event.Read in events): + IOFD_SET(fd, addr s.rSet) + inc(s.count) + if (Event.Write notin pkey.events) and (Event.Write in events): + IOFD_SET(fd, addr s.wSet) + IOFD_SET(fd, addr s.eSet) + inc(s.count) + pkey.events = events + +proc unregister*[T](s: Selector[T], fd: SocketHandle) = + s.withSelectLock(): + var pkey = s.getKey(fd) + if Event.Read in pkey.events: + IOFD_CLR(fd, addr s.rSet) + dec(s.count) + if Event.Write in pkey.events: + IOFD_CLR(fd, addr s.wSet) + IOFD_CLR(fd, addr s.eSet) + dec(s.count) + s.delKey(fd) + +proc unregister*[T](s: Selector[T], ev: SelectEvent) = + let fd = ev.rsock + s.withSelectLock(): + IOFD_CLR(fd, addr s.rSet) + dec(s.count) + s.delKey(fd) + +proc selectInto*[T](s: Selector[T], timeout: int, + results: var openarray[ReadyKey[T]]): int = + var tv = Timeval() + var ptv = addr tv + var rset, wset, eset: FdSet + + if timeout != -1: + tv.tv_sec = timeout.int32 div 1_000 + tv.tv_usec = (timeout.int32 %% 1_000) * 1_000 + else: + ptv = nil + + s.withSelectLock(): + rset = s.rSet + wset = s.wSet + eset = s.eSet + + var count = ioselect(cint(s.maxFD) + 1, addr(rset), addr(wset), + addr(eset), ptv) + if count < 0: + result = 0 + when defined(windows): + raiseOSError(osLastError()) + else: + let err = osLastError() + if cint(err) != EINTR: + raiseOSError(err) + elif count == 0: + result = 0 + else: + var rindex = 0 + var i = 0 + var k = 0 + + while (i < FD_SETSIZE) and (k < count): + if s.fds[i].ident != 0: + var flag = false + var pkey = addr(s.fds[i]) + pkey.key.events = {} + let fd = SocketHandle(pkey.ident) + if IOFD_ISSET(fd, addr rset) != 0: + if Event.User in pkey.events: + var data: uint64 = 0 + if recv(fd, cast[pointer](addr(data)), + sizeof(uint64).cint, 0) != sizeof(uint64): + let err = osLastError() + if cint(err) != EAGAIN: + raiseOSError(err) + else: + inc(i) + inc(k) + continue + else: + flag = true + pkey.key.events = {Event.User} + else: + flag = true + pkey.key.events = {Event.Read} + if IOFD_ISSET(fd, addr wset) != 0: + pkey.key.events.incl(Event.Write) + if IOFD_ISSET(fd, addr eset) != 0: + pkey.key.events.incl(Event.Error) + flag = true + if flag: + results[rindex] = pkey.key + inc(rindex) + inc(k) + inc(i) + result = rindex + +proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey[T]] = + result = newSeq[ReadyKey[T]](FD_SETSIZE) + var count = selectInto(s, timeout, result) + result.setLen(count) + +proc flush*[T](s: Selector[T]) = discard + +template isEmpty*[T](s: Selector[T]): bool = + (s.count == 0) + +when hasThreadSupport: + template withSelectLock[T](s: Selector[T], body: untyped) = + acquire(s.lock) + {.locks: [s.lock].}: + try: + body + finally: + release(s.lock) +else: + template withSelectLock[T](s: Selector[T], body: untyped) = + body + +template withData*[T](s: Selector[T], fd: SocketHandle, value, + body: untyped) = + mixin withSelectLock + s.withSelectLock(): + var value: ptr T + let fdi = int(fd) + var i = 0 + while i < FD_SETSIZE: + if s.fds[i].ident == fdi: + value = addr(s.fds[i].key.data) + break + inc(i) + if i != FD_SETSIZE: + body + +template withData*[T](s: Selector[T], fd: SocketHandle, value, + body1, body2: untyped) = + mixin withSelectLock + s.withSelectLock(): + var value: ptr T + let fdi = int(fd) + var i = 0 + while i < FD_SETSIZE: + if s.fds[i].ident == fdi: + value = addr(s.fds[i].key.data) + break + inc(i) + if i != FD_SETSIZE: + body1 + else: + body2 |