diff options
Diffstat (limited to 'tests/async/tioselectors.nim')
-rw-r--r-- | tests/async/tioselectors.nim | 643 |
1 files changed, 643 insertions, 0 deletions
diff --git a/tests/async/tioselectors.nim b/tests/async/tioselectors.nim new file mode 100644 index 000000000..f53767408 --- /dev/null +++ b/tests/async/tioselectors.nim @@ -0,0 +1,643 @@ +discard """ + output: "All tests passed!" +""" +import selectors + +const hasThreadSupport = compileOption("threads") + +template processTest(t, x: untyped) = + #stdout.write(t) + #stdout.flushFile() + if not x: echo(t & " FAILED\r\n") + +when not defined(windows): + import os, posix, nativesockets + + when ioselSupportedPlatform: + import osproc + + proc socket_notification_test(): bool = + proc create_test_socket(): SocketHandle = + var sock = posix.socket(posix.AF_INET, posix.SOCK_STREAM, + posix.IPPROTO_TCP) + var x: int = fcntl(sock, F_GETFL, 0) + if x == -1: raiseOSError(osLastError()) + else: + var mode = x or O_NONBLOCK + if fcntl(sock, F_SETFL, mode) == -1: + raiseOSError(osLastError()) + result = sock + + var client_message = "SERVER HELLO =>" + var server_message = "CLIENT HELLO" + var buffer : array[128, char] + + var selector = newSelector[int]() + var client_socket = create_test_socket() + var server_socket = create_test_socket() + + var option : int32 = 1 + if setsockopt(server_socket, cint(SOL_SOCKET), cint(SO_REUSEADDR), + addr(option), sizeof(option).SockLen) < 0: + raiseOSError(osLastError()) + + var aiList = getAddrInfo("0.0.0.0", Port(13337)) + if bindAddr(server_socket, aiList.ai_addr, + aiList.ai_addrlen.Socklen) < 0'i32: + freeAddrInfo(aiList) + raiseOSError(osLastError()) + if server_socket.listen() == -1: + raiseOSError(osLastError()) + freeAddrInfo(aiList) + + aiList = getAddrInfo("127.0.0.1", Port(13337)) + discard posix.connect(client_socket, aiList.ai_addr, + aiList.ai_addrlen.Socklen) + + registerHandle(selector, server_socket, {Event.Read}, 0) + registerHandle(selector, client_socket, {Event.Write}, 0) + + freeAddrInfo(aiList) + + # make sure both sockets are selected + var nevs = 0 + while nevs < 2: + nevs += selector.select(100).len + + var sockAddress: SockAddr + var addrLen = sizeof(sockAddress).Socklen + var server2_socket = accept(server_socket, + cast[ptr SockAddr](addr(sockAddress)), + addr(addrLen)) + assert(server2_socket != osInvalidSocket) + selector.registerHandle(server2_socket, {Event.Read}, 0) + + if posix.send(client_socket, addr(client_message[0]), + len(client_message), 0) == -1: + raiseOSError(osLastError()) + + selector.updateHandle(client_socket, {Event.Read}) + + var rc2 = selector.select(100) + assert(len(rc2) == 1) + + var read_count = posix.recv(server2_socket, addr buffer[0], 128, 0) + if read_count == -1: + raiseOSError(osLastError()) + + assert(read_count == len(client_message)) + var test1 = true + for i in 0..<read_count: + if client_message[i] != buffer[i]: + test1 = false + break + assert(test1) + + selector.updateHandle(server2_socket, {Event.Write}) + var rc3 = selector.select(0) + assert(len(rc3) == 1) + if posix.send(server2_socket, addr(server_message[0]), + len(server_message), 0) == -1: + raiseOSError(osLastError()) + selector.updateHandle(server2_socket, {Event.Read}) + + var rc4 = selector.select(100) + assert(len(rc4) == 1) + read_count = posix.recv(client_socket, addr(buffer[0]), 128, 0) + if read_count == -1: + raiseOSError(osLastError()) + + assert(read_count == len(server_message)) + var test2 = true + for i in 0..<read_count: + if server_message[i] != buffer[i]: + test2 = false + break + assert(test2) + + selector.unregister(server_socket) + selector.unregister(server2_socket) + selector.unregister(client_socket) + discard posix.close(server_socket) + discard posix.close(server2_socket) + discard posix.close(client_socket) + assert(selector.isEmpty()) + close(selector) + result = true + + proc event_notification_test(): bool = + var selector = newSelector[int]() + var event = newSelectEvent() + selector.registerEvent(event, 1) + var rc0 = selector.select(0) + event.trigger() + var rc1 = selector.select(0) + event.trigger() + var rc2 = selector.select(0) + var rc3 = selector.select(0) + assert(len(rc0) == 0 and len(rc1) == 1 and len(rc2) == 1 and len(rc3) == 0) + var ev1 = selector.getData(rc1[0].fd) + var ev2 = selector.getData(rc2[0].fd) + assert(ev1 == 1 and ev2 == 1) + selector.unregister(event) + event.close() + assert(selector.isEmpty()) + selector.close() + result = true + + when ioselSupportedPlatform: + proc timer_notification_test(): bool = + var selector = newSelector[int]() + var timer = selector.registerTimer(100, false, 0) + var rc1 = selector.select(10000) + var rc2 = selector.select(10000) + # if this flakes, see tests/m14634.nim + assert len(rc1) == 1 and len(rc2) == 1, $(len(rc1), len(rc2)) + selector.unregister(timer) + discard selector.select(0) + selector.registerTimer(100, true, 0) + var rc4 = selector.select(10000) + var rc5 = selector.select(1000) # this will be an actual wait, keep it small + assert len(rc4) == 1 and len(rc5) == 0, $(len(rc4), len(rc5)) + assert(selector.isEmpty()) + selector.close() + result = true + + proc process_notification_test(): bool = + var selector = newSelector[int]() + var process2 = startProcess("sleep", "", ["2"], nil, + {poStdErrToStdOut, poUsePath}) + discard startProcess("sleep", "", ["1"], nil, + {poStdErrToStdOut, poUsePath}) + + selector.registerProcess(process2.processID, 0) + var rc1 = selector.select(1500) + var rc2 = selector.select(1500) + var r = len(rc1) + len(rc2) + assert(r == 1) + result = true + + proc signal_notification_test(): bool = + var sigset1n, sigset1o, sigset2n, sigset2o: Sigset + var pid = posix.getpid() + + discard sigemptyset(sigset1n) + discard sigemptyset(sigset1o) + discard sigemptyset(sigset2n) + discard sigemptyset(sigset2o) + + when hasThreadSupport: + if pthread_sigmask(SIG_BLOCK, sigset1n, sigset1o) == -1: + raiseOSError(osLastError()) + else: + if sigprocmask(SIG_BLOCK, sigset1n, sigset1o) == -1: + raiseOSError(osLastError()) + + var selector = newSelector[int]() + var s1 = selector.registerSignal(SIGUSR1, 1) + var s2 = selector.registerSignal(SIGUSR2, 2) + var s3 = selector.registerSignal(SIGTERM, 3) + discard selector.select(0) + discard posix.kill(pid, SIGUSR1) + discard posix.kill(pid, SIGUSR2) + discard posix.kill(pid, SIGTERM) + var rc = selector.select(0) + var cd0 = selector.getData(rc[0].fd) + var cd1 = selector.getData(rc[1].fd) + var cd2 = selector.getData(rc[2].fd) + selector.unregister(s1) + selector.unregister(s2) + selector.unregister(s3) + + when hasThreadSupport: + if pthread_sigmask(SIG_BLOCK, sigset2n, sigset2o) == -1: + raiseOSError(osLastError()) + else: + if sigprocmask(SIG_BLOCK, sigset2n, sigset2o) == -1: + raiseOSError(osLastError()) + + assert(len(rc) == 3) + assert(cd0 + cd1 + cd2 == 6, $(cd0 + cd1 + cd2)) # 1 + 2 + 3 + assert(equalMem(addr sigset1o, addr sigset2o, sizeof(Sigset))) + assert(selector.isEmpty()) + result = true + + when defined(macosx) or defined(freebsd) or defined(openbsd) or + defined(netbsd): + + proc rename(frompath: cstring, topath: cstring): cint + {.importc: "rename", header: "<stdio.h>".} + + proc createFile(name: string): cint = + result = posix.open(cstring(name), posix.O_CREAT or posix.O_RDWR) + if result == -1: + raiseOsError(osLastError()) + + proc writeFile(name: string, data: string) = + let fd = posix.open(cstring(name), posix.O_APPEND or posix.O_RDWR) + if fd == -1: + raiseOsError(osLastError()) + let length = len(data).cint + if posix.write(fd, cast[pointer](addr data[0]), + len(data).cint) != length: + raiseOsError(osLastError()) + if posix.close(fd) == -1: + raiseOsError(osLastError()) + + proc closeFile(fd: cint) = + if posix.close(fd) == -1: + raiseOsError(osLastError()) + + proc removeFile(name: string) = + let err = posix.unlink(cstring(name)) + if err == -1: + raiseOsError(osLastError()) + + proc createDir(name: string) = + let err = posix.mkdir(cstring(name), 0x1FF) + if err == -1: + raiseOsError(osLastError()) + + proc removeDir(name: string) = + let err = posix.rmdir(cstring(name)) + if err == -1: + raiseOsError(osLastError()) + + proc chmodPath(name: string, mode: cint) = + let err = posix.chmod(cstring(name), Mode(mode)) + if err == -1: + raiseOsError(osLastError()) + + proc renameFile(names: string, named: string) = + let err = rename(cstring(names), cstring(named)) + if err == -1: + raiseOsError(osLastError()) + + proc symlink(names: string, named: string) = + let err = posix.symlink(cstring(names), cstring(named)) + if err == -1: + raiseOsError(osLastError()) + + proc openWatch(name: string): cint = + result = posix.open(cstring(name), posix.O_RDONLY) + if result == -1: + raiseOsError(osLastError()) + + const + testDirectory = "/tmp/kqtest" + + type + valType = object + fd: cint + events: set[Event] + + proc vnode_test(): bool = + proc validate(test: openArray[ReadyKey], + check: openArray[valType]): bool = + result = false + if len(test) == len(check): + for checkItem in check: + result = false + for testItem in test: + if testItem.fd == checkItem.fd and + checkItem.events <= testItem.events: + result = true + break + if not result: + break + + var res: seq[ReadyKey] + var selector = newSelector[int]() + var events = {Event.VnodeWrite, Event.VnodeDelete, Event.VnodeExtend, + Event.VnodeAttrib, Event.VnodeLink, Event.VnodeRename, + Event.VnodeRevoke} + + result = true + discard posix.unlink(testDirectory) + + createDir(testDirectory) + var dirfd = posix.open(cstring(testDirectory), posix.O_RDONLY) + if dirfd == -1: + raiseOsError(osLastError()) + + selector.registerVnode(dirfd, events, 1) + discard selector.select(0) + + # chmod testDirectory to 0777 + chmodPath(testDirectory, 0x1FF) + res = selector.select(0) + doAssert(len(res) == 1) + doAssert(len(selector.select(0)) == 0) + doAssert(res[0].fd == dirfd and + {Event.Vnode, Event.VnodeAttrib} <= res[0].events) + + # create subdirectory + createDir(testDirectory & "/test") + res = selector.select(0) + doAssert(len(res) == 1) + doAssert(len(selector.select(0)) == 0) + doAssert(res[0].fd == dirfd and + {Event.Vnode, Event.VnodeWrite, + Event.VnodeLink} <= res[0].events) + + # open test directory for watching + var testfd = openWatch(testDirectory & "/test") + selector.registerVnode(testfd, events, 2) + doAssert(len(selector.select(0)) == 0) + + # rename test directory + renameFile(testDirectory & "/test", testDirectory & "/renamed") + res = selector.select(0) + doAssert(len(res) == 2) + doAssert(len(selector.select(0)) == 0) + doAssert(validate(res, + [valType(fd: dirfd, events: {Event.Vnode, Event.VnodeWrite}), + valType(fd: testfd, + events: {Event.Vnode, Event.VnodeRename})]) + ) + + # remove test directory + removeDir(testDirectory & "/renamed") + res = selector.select(0) + doAssert(len(res) == 2) + doAssert(len(selector.select(0)) == 0) + doAssert(validate(res, + [valType(fd: dirfd, events: {Event.Vnode, Event.VnodeWrite, + Event.VnodeLink}), + valType(fd: testfd, + events: {Event.Vnode, Event.VnodeDelete})]) + ) + # create file new test file + testfd = createFile(testDirectory & "/testfile") + res = selector.select(0) + doAssert(len(res) == 1) + doAssert(len(selector.select(0)) == 0) + doAssert(res[0].fd == dirfd and + {Event.Vnode, Event.VnodeWrite} <= res[0].events) + + # close new test file + closeFile(testfd) + doAssert(len(selector.select(0)) == 0) + doAssert(len(selector.select(0)) == 0) + + # chmod test file with 0666 + chmodPath(testDirectory & "/testfile", 0x1B6) + doAssert(len(selector.select(0)) == 0) + + testfd = openWatch(testDirectory & "/testfile") + selector.registerVnode(testfd, events, 1) + discard selector.select(0) + + # write data to test file + writeFile(testDirectory & "/testfile", "TESTDATA") + res = selector.select(0) + doAssert(len(res) == 1) + doAssert(len(selector.select(0)) == 0) + doAssert(res[0].fd == testfd and + {Event.Vnode, Event.VnodeWrite, + Event.VnodeExtend} <= res[0].events) + + # symlink test file + symlink(testDirectory & "/testfile", testDirectory & "/testlink") + res = selector.select(0) + doAssert(len(res) == 1) + doAssert(len(selector.select(0)) == 0) + doAssert(res[0].fd == dirfd and + {Event.Vnode, Event.VnodeWrite} <= res[0].events) + + # remove test file + removeFile(testDirectory & "/testfile") + res = selector.select(0) + doAssert(len(res) == 2) + doAssert(len(selector.select(0)) == 0) + doAssert(validate(res, + [valType(fd: testfd, events: {Event.Vnode, Event.VnodeDelete}), + valType(fd: dirfd, events: {Event.Vnode, Event.VnodeWrite})]) + ) + + # remove symlink + removeFile(testDirectory & "/testlink") + res = selector.select(0) + doAssert(len(res) == 1) + doAssert(len(selector.select(0)) == 0) + doAssert(res[0].fd == dirfd and + {Event.Vnode, Event.VnodeWrite} <= res[0].events) + + # remove testDirectory + removeDir(testDirectory) + res = selector.select(0) + doAssert(len(res) == 1) + doAssert(len(selector.select(0)) == 0) + doAssert(res[0].fd == dirfd and + {Event.Vnode, Event.VnodeDelete} <= res[0].events) + + proc pipe_test(): bool = + # closing the read end of a pipe will result in it automatically + # being removed from the kqueue; make sure no exception is raised + var s = newSelector[int]() + var fds: array[2, cint] + discard pipe(fds) + s.registerHandle(fds[1], {Write}, 0) + discard close(fds[0]) + let res = s.select(-1) + doAssert(res.len == 1) + s.unregister(fds[1]) + discard close(fds[1]) + return true + + when hasThreadSupport: + + var counter = 0 + + proc event_wait_thread(event: SelectEvent) {.thread.} = + var selector = newSelector[int]() + selector.registerEvent(event, 1) + var rc = selector.select(1000) + if len(rc) == 1: + inc(counter) + selector.unregister(event) + assert(selector.isEmpty()) + + proc mt_event_test(): bool = + var + thr: array[0..7, Thread[SelectEvent]] + var selector = newSelector[int]() + var sock = createNativeSocket() + var event = newSelectEvent() + for i in 0..high(thr): + createThread(thr[i], event_wait_thread, event) + selector.registerHandle(sock, {Event.Read}, 1) + discard selector.select(500) + selector.unregister(sock) + event.trigger() + joinThreads(thr) + assert(counter == 1) + result = true + + processTest("Socket notification test...", socket_notification_test()) + processTest("User event notification test...", event_notification_test()) + when hasThreadSupport: + processTest("Multithreaded user event notification test...", + mt_event_test()) + when ioselSupportedPlatform: + processTest("Timer notification test...", timer_notification_test()) + processTest("Process notification test...", process_notification_test()) + processTest("Signal notification test...", signal_notification_test()) + when defined(macosx) or defined(freebsd) or defined(openbsd) or + defined(netbsd): + processTest("File notification test...", vnode_test()) + processTest("Pipe test...", pipe_test()) + echo("All tests passed!") +else: + import nativesockets, winlean, os, osproc + + proc socket_notification_test(): bool = + proc create_test_socket(): SocketHandle = + var sock = createNativeSocket() + setBlocking(sock, false) + result = sock + + var client_message = "SERVER HELLO =>" + var server_message = "CLIENT HELLO" + var buffer : array[128, char] + + var selector = newSelector[int]() + var client_socket = create_test_socket() + var server_socket = create_test_socket() + + selector.registerHandle(server_socket, {Event.Read}, 0) + selector.registerHandle(client_socket, {Event.Write}, 0) + + var option : int32 = 1 + if setsockopt(server_socket, cint(SOL_SOCKET), cint(SO_REUSEADDR), + addr(option), sizeof(option).SockLen) < 0: + raiseOSError(osLastError()) + + var aiList = getAddrInfo("0.0.0.0", Port(13337)) + if bindAddr(server_socket, aiList.ai_addr, + aiList.ai_addrlen.Socklen) < 0'i32: + freeAddrInfo(aiList) + raiseOSError(osLastError()) + discard server_socket.listen() + freeAddrInfo(aiList) + + aiList = getAddrInfo("127.0.0.1", Port(13337)) + discard connect(client_socket, aiList.ai_addr, + aiList.ai_addrlen.Socklen) + freeAddrInfo(aiList) + # for some reason Windows select doesn't return both + # descriptors from first call, so we need to make 2 calls + var n = 0 + var rcm = selector.select(1000) + while n < 10 and len(rcm) < 2: + sleep(1000) + rcm = selector.select(1000) + inc(n) + + assert(len(rcm) == 2) + + var sockAddress = SockAddr() + var addrLen = sizeof(sockAddress).Socklen + var server2_socket = accept(server_socket, + cast[ptr SockAddr](addr(sockAddress)), + addr(addrLen)) + assert(server2_socket != osInvalidSocket) + selector.registerHandle(server2_socket, {Event.Read}, 0) + + if send(client_socket, cast[pointer](addr(client_message[0])), + cint(len(client_message)), 0) == -1: + raiseOSError(osLastError()) + + selector.updateHandle(client_socket, {Event.Read}) + + var rc2 = selector.select(1000) + assert(len(rc2) == 1) + + var read_count = recv(server2_socket, addr buffer[0], 128, 0) + if read_count == -1: + raiseOSError(osLastError()) + + assert(read_count == len(client_message)) + var test1 = true + for i in 0..<read_count: + if client_message[i] != buffer[i]: + test1 = false + break + assert(test1) + + if send(server2_socket, cast[pointer](addr(server_message[0])), + cint(len(server_message)), 0) == -1: + raiseOSError(osLastError()) + + var rc3 = selector.select(0) + assert(len(rc3) == 1) + read_count = recv(client_socket, addr(buffer[0]), 128, 0) + if read_count == -1: + raiseOSError(osLastError()) + + assert(read_count == len(server_message)) + var test2 = true + for i in 0..<read_count: + if server_message[i] != buffer[i]: + test2 = false + break + assert(test2) + + selector.unregister(server_socket) + selector.unregister(server2_socket) + selector.unregister(client_socket) + close(server_socket) + close(server2_socket) + close(client_socket) + assert(selector.isEmpty()) + close(selector) + result = true + + proc event_notification_test(): bool = + var selector = newSelector[int]() + var event = newSelectEvent() + selector.registerEvent(event, 1) + discard selector.select(0) + event.trigger() + var rc1 = selector.select(0) + event.trigger() + var rc2 = selector.select(0) + var rc3 = selector.select(0) + assert(len(rc1) == 1 and len(rc2) == 1 and len(rc3) == 0) + var ev1 = selector.getData(rc1[0].fd) + var ev2 = selector.getData(rc2[0].fd) + assert(ev1 == 1 and ev2 == 1) + selector.unregister(event) + event.close() + assert(selector.isEmpty()) + selector.close() + result = true + + when hasThreadSupport: + var counter = 0 + + proc event_wait_thread(event: SelectEvent) {.thread.} = + var selector = newSelector[int]() + selector.registerEvent(event, 1) + var rc = selector.select(1500) + if len(rc) == 1: + inc(counter) + selector.unregister(event) + assert(selector.isEmpty()) + + proc mt_event_test(): bool = + var thr: array[0..7, Thread[SelectEvent]] + var event = newSelectEvent() + for i in 0..high(thr): + createThread(thr[i], event_wait_thread, event) + event.trigger() + joinThreads(thr) + assert(counter == 1) + result = true + + processTest("Socket notification test...", socket_notification_test()) + processTest("User event notification test...", event_notification_test()) + when hasThreadSupport: + processTest("Multithreaded user event notification test...", + mt_event_test()) + echo("All tests passed!") |