From cd2bd7fa7b1048956c72ad7665b70b2eabecd549 Mon Sep 17 00:00:00 2001 From: Michał Zieliński Date: Fri, 14 Feb 2014 15:08:02 +0100 Subject: osproc: use clone with CLONE_VM on Linux for faster process spawning --- lib/posix/linux.nim | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 lib/posix/linux.nim (limited to 'lib/posix') diff --git a/lib/posix/linux.nim b/lib/posix/linux.nim new file mode 100644 index 000000000..1ed1af3b6 --- /dev/null +++ b/lib/posix/linux.nim @@ -0,0 +1,25 @@ +import posix + +const + CSIGNAL* = 0x000000FF + CLONE_VM* = 0x00000100 + CLONE_FS* = 0x00000200 + CLONE_FILES* = 0x00000400 + CLONE_SIGHAND* = 0x00000800 + CLONE_PTRACE* = 0x00002000 + CLONE_VFORK* = 0x00004000 + CLONE_PARENT* = 0x00008000 + CLONE_THREAD* = 0x00010000 + CLONE_NEWNS* = 0x00020000 + CLONE_SYSVSEM* = 0x00040000 + CLONE_SETTLS* = 0x00080000 + CLONE_PARENT_SETTID* = 0x00100000 + CLONE_CHILD_CLEARTID* = 0x00200000 + CLONE_DETACHED* = 0x00400000 + CLONE_UNTRACED* = 0x00800000 + CLONE_CHILD_SETTID* = 0x01000000 + CLONE_STOPPED* = 0x02000000 + +# fn should be of type proc (a2: pointer): void {.cdecl.} +proc clone*(fn: pointer; child_stack: pointer; flags: cint; + arg: pointer; ptid: ptr TPid; tls: pointer; ctid: ptr TPid): cint {.importc, header: "".} -- cgit 1.4.1-2-gfad0 From 4c09fc110f3d269c34ccbfabb665bc34c768b63e Mon Sep 17 00:00:00 2001 From: Michał Zieliński Date: Fri, 14 Feb 2014 15:51:06 +0100 Subject: osproc: make failed execv an exception (when using fork or clone) startProcessAuxFork creates a pipe, which is used by a child to pass an error code if execv fails. --- lib/posix/posix.nim | 1 + lib/pure/osproc.nim | 86 +++++++++++++++++++++++++++++++++++++++-------------- 2 files changed, 64 insertions(+), 23 deletions(-) (limited to 'lib/posix') diff --git a/lib/posix/posix.nim b/lib/posix/posix.nim index 41260b36f..138df1aec 100644 --- a/lib/posix/posix.nim +++ b/lib/posix/posix.nim @@ -2066,6 +2066,7 @@ proc pthread_spin_unlock*(a1: ptr Tpthread_spinlock): cint {. proc pthread_testcancel*() {.importc, header: "".} +proc exitnow*(code: int): void {.importc: "_exit", header: "".} proc access*(a1: cstring, a2: cint): cint {.importc, header: "".} proc alarm*(a1: cint): cint {.importc, header: "".} proc chdir*(a1: cstring): cint {.importc, header: "".} diff --git a/lib/pure/osproc.nim b/lib/pure/osproc.nim index 40877f638..aa2f6f937 100644 --- a/lib/pure/osproc.nim +++ b/lib/pure/osproc.nim @@ -600,13 +600,16 @@ elif not defined(useNimRtl): sysArgs: cstringArray sysEnv: cstringArray workingDir: cstring - pStdin, pStdout, pStderr: array[0..1, cint] + pStdin, pStdout, pStderr, pErrorPipe: array[0..1, cint] optionPoUsePath: bool optionPoParentStreams: bool optionPoStdErrToStdOut: bool proc startProcessAuxSpawn(data: TStartProcessData): TPid {.tags: [FExecIO, FReadEnv].} - proc startProcessAfterFork(data: ptr TStartProcessData) {.tags: [FExecIO, FReadEnv], cdecl.} + + proc startProcessAuxFork(data: TStartProcessData): TPid {.tags: [FExecIO, FReadEnv].} + proc startProcessAfterFork(data: ptr TStartProcessData) {. + tags: [FExecIO, FReadEnv], noStackFrame, cdecl.} proc startProcess(command: string, workingDir: string = "", @@ -658,23 +661,11 @@ elif not defined(useNimRtl): data.optionPoStdErrToStdOut = poStdErrToStdOut in options data.workingDir = workingDir - when defined(useClone): - const stackSize = 8096 - let stackEnd = cast[clong](alloc(stackSize)) - let stack = cast[pointer](stackEnd + stackSize) - let fn: pointer = startProcessAfterFork - pid = clone(fn, stack, - cint(CLONE_VM or CLONE_VFORK or SIGCHLD), - pointer(addr data), nil, nil, nil) - if pid < 0: osError(osLastError()) - elif defined(posix_spawn) and not defined(useFork): + when defined(posix_spawn) and not defined(useFork) and not defined(useClone): pid = startProcessAuxSpawn(data) else: - pid = fork() - if pid < 0: osError(osLastError()) - if pid == 0: - startProcessAfterFork(addr(data)) + pid = startProcessAuxFork(data) # Parent process. Copy process information. if poEchoCmd in options: @@ -747,30 +738,79 @@ elif not defined(useNimRtl): chck res return pid + proc startProcessAuxFork(data: TStartProcessData): TPid = + if pipe(data.pErrorPipe) != 0: + osError(osLastError()) + + finally: + discard close(data.pErrorPipe[readIdx]) + + var pid: TPid + var dataCopy = data + + if defined(useClone): + const stackSize = 8096 + let stackEnd = cast[clong](alloc(stackSize)) + let stack = cast[pointer](stackEnd + stackSize) + let fn: pointer = startProcessAfterFork + pid = clone(fn, stack, + cint(CLONE_VM or CLONE_VFORK or SIGCHLD), + pointer(addr dataCopy), nil, nil, nil) + discard close(data.pErrorPipe[writeIdx]) + dealloc(stack) + else: + pid = fork() + if pid == 0: + startProcessAfterFork(addr(dataCopy)) + exitnow(1) + + discard close(data.pErrorPipe[writeIdx]) + if pid < 0: osError(osLastError()) + + var error: cint + let sizeRead = read(data.pErrorPipe[readIdx], addr error, sizeof(error)) + if sizeRead == sizeof(error): + osError($strerror(error)) + + return pid + + proc startProcessFail(data: ptr TStartProcessData) {.noStackFrame.} = + var error: cint = errno + discard write(data.pErrorPipe[writeIdx], addr error, sizeof(error)) + exitnow(1) + proc startProcessAfterFork(data: ptr TStartProcessData) = # Warning: no GC here! + # Or anythink that touches global structures - all called nimrod procs + # must be marked with noStackFrame. Inspect C code after making changes. if not data.optionPoParentStreams: discard close(data.pStdin[writeIdx]) - if dup2(data.pStdin[readIdx], readIdx) < 0: osError(osLastError()) + if dup2(data.pStdin[readIdx], readIdx) < 0: + startProcessFail(data) discard close(data.pStdout[readIdx]) - if dup2(data.pStdout[writeIdx], writeIdx) < 0: osError(osLastError()) + if dup2(data.pStdout[writeIdx], writeIdx) < 0: + startProcessFail(data) discard close(data.pStderr[readIdx]) if data.optionPoStdErrToStdOut: - if dup2(data.pStdout[writeIdx], 2) < 0: osError(osLastError()) + if dup2(data.pStdout[writeIdx], 2) < 0: + startProcessFail(data) else: - if dup2(data.pStderr[writeIdx], 2) < 0: osError(osLastError()) + if dup2(data.pStderr[writeIdx], 2) < 0: + startProcessFail(data) if data.workingDir.len > 0: if chdir(data.workingDir) < 0: - quit("chdir failed") + startProcessFail(data) + + discard close(data.pErrorPipe[readIdx]) + discard fcntl(data.pErrorPipe[writeIdx], F_SETFD, FD_CLOEXEC) if data.optionPoUsePath: discard execvpe(data.sysCommand, data.sysArgs, data.sysEnv) else: discard execve(data.sysCommand, data.sysArgs, data.sysEnv) - # too risky to raise an exception here: - quit("execve call failed: " & $strerror(errno)) + startProcessFail(data) proc close(p: PProcess) = if p.inStream != nil: close(p.inStream) -- cgit 1.4.1-2-gfad0 From c2b50c0e38d758f86ae8909649da545235a6b7c6 Mon Sep 17 00:00:00 2001 From: Grzegorz Adam Hankiewicz Date: Sat, 15 Feb 2014 17:50:40 +0100 Subject: Adds posix.timegm(), brother of posix.mktime(). --- lib/posix/posix.nim | 1 + 1 file changed, 1 insertion(+) (limited to 'lib/posix') diff --git a/lib/posix/posix.nim b/lib/posix/posix.nim index 41260b36f..e2c436749 100644 --- a/lib/posix/posix.nim +++ b/lib/posix/posix.nim @@ -2265,6 +2265,7 @@ proc gmtime_r*(a1: var TTime, a2: var Ttm): ptr Ttm {.importc, header: " proc localtime*(a1: var TTime): ptr Ttm {.importc, header: "".} proc localtime_r*(a1: var TTime, a2: var Ttm): ptr Ttm {.importc, header: "".} proc mktime*(a1: var Ttm): TTime {.importc, header: "".} +proc timegm*(a1: var Ttm): TTime {.importc, header: "".} proc nanosleep*(a1, a2: var Ttimespec): cint {.importc, header: "".} proc strftime*(a1: cstring, a2: int, a3: cstring, a4: var Ttm): int {.importc, header: "".} -- cgit 1.4.1-2-gfad0 From 3b5825e9bcf09bb6da98601d07af10c2640f4cd1 Mon Sep 17 00:00:00 2001 From: Dominik Picheta Date: Sat, 22 Feb 2014 17:22:19 +0000 Subject: Implemented selector support for asyncio2. --- lib/posix/epoll.nim | 8 +- lib/posix/posix.nim | 2 +- lib/pure/asyncio2.nim | 231 +++++++++++++++++++++++++--- lib/pure/net.nim | 17 ++- lib/pure/selectors.nim | 355 ++++++++++++++++++++++---------------------- lib/pure/sockets2.nim | 8 +- tests/async/tasyncawait.nim | 4 +- 7 files changed, 415 insertions(+), 210 deletions(-) (limited to 'lib/posix') diff --git a/lib/posix/epoll.nim b/lib/posix/epoll.nim index d50394f60..366521551 100644 --- a/lib/posix/epoll.nim +++ b/lib/posix/epoll.nim @@ -7,6 +7,8 @@ # distribution, for details about the copyright. # +from posix import TSocketHandle + const EPOLLIN* = 0x00000001 EPOLLPRI* = 0x00000002 @@ -33,8 +35,8 @@ const type epoll_data* {.importc: "union epoll_data", header: "", pure, final.} = object # TODO: This is actually a union. - thePtr* {.importc: "ptr".}: pointer # \ - #fd*: cint + #thePtr* {.importc: "ptr".}: pointer + fd*: cint # \ #u32*: uint32 #u64*: uint64 @@ -54,7 +56,7 @@ proc epoll_create1*(flags: cint): cint {.importc: "epoll_create1", ## Same as epoll_create but with an FLAGS parameter. The unused SIZE ## parameter has been dropped. -proc epoll_ctl*(epfd: cint; op: cint; fd: cint; event: ptr epoll_event): cint {. +proc epoll_ctl*(epfd: cint; op: cint; fd: cint | TSocketHandle; event: ptr epoll_event): cint {. importc: "epoll_ctl", header: "".} ## Manipulate an epoll instance "epfd". Returns 0 in case of success, ## -1 in case of error ( the "errno" variable will contain the diff --git a/lib/posix/posix.nim b/lib/posix/posix.nim index 41260b36f..bb4039c1b 100644 --- a/lib/posix/posix.nim +++ b/lib/posix/posix.nim @@ -2356,7 +2356,7 @@ proc FD_ZERO*(a1: var TFdSet) {.importc, header: "".} proc pselect*(a1: cint, a2, a3, a4: ptr TFdSet, a5: ptr Ttimespec, a6: var Tsigset): cint {.importc, header: "".} -proc select*(a1: cint, a2, a3, a4: ptr TFdSet, a5: ptr Ttimeval): cint {. +proc select*(a1: cint | TSocketHandle, a2, a3, a4: ptr TFdSet, a5: ptr Ttimeval): cint {. importc, header: "".} when hasSpawnH: diff --git a/lib/pure/asyncio2.nim b/lib/pure/asyncio2.nim index 8541b2ba7..12d4cb5a3 100644 --- a/lib/pure/asyncio2.nim +++ b/lib/pure/asyncio2.nim @@ -9,8 +9,6 @@ import os, oids, tables, strutils, macros -import winlean - import sockets2, net ## Asyncio2 @@ -93,7 +91,10 @@ proc failed*[T](future: PFuture[T]): bool = ## Determines whether ``future`` completed with an error. future.error != nil -when defined(windows): +# TODO: Get rid of register. Do it implicitly. + +when defined(windows) or defined(nimdoc): + import winlean type TCompletionKey = dword @@ -293,7 +294,10 @@ when defined(windows): proc recv*(p: PDispatcher, socket: TSocketHandle, size: int, flags: int = 0): PFuture[string] = ## Reads ``size`` bytes from ``socket``. Returned future will complete once - ## all of the requested data is read. + ## all of the requested data is read. If socket is disconnected during the + ## recv operation then the future may complete with only a part of the + ## requested data read. If socket is disconnected and no data is available + ## to be read then the future will complete with a value of ``""``. var retFuture = newFuture[string]() @@ -448,24 +452,206 @@ when defined(windows): return retFuture - proc accept*(p: PDispatcher, socket: TSocketHandle): PFuture[TSocketHandle] = - ## Accepts a new connection. Returns a future containing the client socket - ## corresponding to that connection. - ## The future will complete when the connection is successfully accepted. - var retFut = newFuture[TSocketHandle]() - var fut = p.acceptAddr(socket) - fut.callback = - proc (future: PFuture[tuple[address: string, client: TSocketHandle]]) = - assert future.finished - if future.failed: - retFut.fail(future.error) - else: - retFut.complete(future.read.client) - return retFut - initAll() else: - # TODO: Selectors. + import selectors + from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK + type + TCallback = proc (sock: TSocketHandle): bool {.closure.} + + PData* = ref object of PObject + sock: TSocketHandle + readCBs: seq[TCallback] + writeCBs: seq[TCallback] + + PDispatcher* = ref object + selector: PSelector + + proc newDispatcher*(): PDispatcher = + new result + result.selector = newSelector() + + proc update(p: PDispatcher, sock: TSocketHandle, events: set[TEvent]) = + assert sock in p.selector + echo("Update: ", events) + if events == {}: + discard p.selector.unregister(sock) + else: + discard p.selector.update(sock, events) + + proc addRead(p: PDispatcher, sock: TSocketHandle, cb: TCallback) = + if sock notin p.selector: + var data = PData(sock: sock, readCBs: @[cb], writeCBs: @[]) + p.selector.register(sock, {EvRead}, data.PObject) + else: + p.selector[sock].data.PData.readCBs.add(cb) + p.update(sock, p.selector[sock].events + {EvRead}) + + proc addWrite(p: PDispatcher, sock: TSocketHandle, cb: TCallback) = + if sock notin p.selector: + var data = PData(sock: sock, readCBs: @[], writeCBs: @[cb]) + p.selector.register(sock, {EvWrite}, data.PObject) + else: + p.selector[sock].data.PData.writeCBs.add(cb) + p.update(sock, p.selector[sock].events + {EvWrite}) + + proc poll*(p: PDispatcher, timeout = 500) = + for info in p.selector.select(timeout): + let data = PData(info.key.data) + assert data.sock == info.key.fd + echo("R: ", data.readCBs.len, " W: ", data.writeCBs.len, ". ", info.events) + + if EvRead in info.events: + var newReadCBs: seq[TCallback] = @[] + for cb in data.readCBs: + if not cb(data.sock): + # Callback wants to be called again. + newReadCBs.add(cb) + data.readCBs = newReadCBs + + if EvWrite in info.events: + var newWriteCBs: seq[TCallback] = @[] + for cb in data.writeCBs: + if not cb(data.sock): + # Callback wants to be called again. + newWriteCBs.add(cb) + data.writeCBs = newWriteCBs + + var newEvents: set[TEvent] + if data.readCBs.len != 0: newEvents = {EvRead} + if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite} + p.update(data.sock, newEvents) + + proc connect*(p: PDispatcher, socket: TSocketHandle, address: string, port: TPort, + af = AF_INET): PFuture[int] = + var retFuture = newFuture[int]() + + proc cb(sock: TSocketHandle): bool = + # We have connected. + retFuture.complete(0) + return true + + var aiList = getAddrInfo(address, port, af) + var success = false + var lastError: TOSErrorCode + var it = aiList + while it != nil: + var ret = connect(socket, it.ai_addr, it.ai_addrlen.TSocklen) + if ret == 0: + # Request to connect completed immediately. + success = true + retFuture.complete(0) + break + else: + lastError = osLastError() + if lastError.int32 == EINTR or lastError.int32 == EINPROGRESS: + success = true + addWrite(p, socket, cb) + break + else: + success = false + it = it.ai_next + + dealloc(aiList) + if not success: + retFuture.fail(newException(EOS, osErrorMsg(lastError))) + return retFuture + + proc recv*(p: PDispatcher, socket: TSocketHandle, size: int, + flags: int = 0): PFuture[string] = + var retFuture = newFuture[string]() + + var readBuffer = newString(size) + var sizeRead = 0 + + proc cb(sock: TSocketHandle): bool = + result = true + let netSize = size - sizeRead + let res = recv(sock, addr readBuffer[sizeRead], netSize, flags.cint) + if res < 0: + let lastError = osLastError() + if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: + retFuture.fail(newException(EOS, osErrorMsg(lastError))) + else: + result = false # We still want this callback to be called. + elif res == 0: + # Disconnected + if sizeRead == 0: + retFuture.complete("") + else: + readBuffer.setLen(sizeRead) + retFuture.complete(readBuffer) + else: + sizeRead.inc(res) + if res != netSize: + result = false # We want to read all the data requested. + else: + retFuture.complete(readBuffer) + + addRead(p, socket, cb) + return retFuture + + proc send*(p: PDispatcher, socket: TSocketHandle, data: string): PFuture[int] = + var retFuture = newFuture[int]() + + var written = 0 + + proc cb(sock: TSocketHandle): bool = + result = true + let netSize = data.len-written + var d = data.cstring + let res = send(sock, addr d[written], netSize, 0.cint) + if res < 0: + let lastError = osLastError() + if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: + retFuture.fail(newException(EOS, osErrorMsg(lastError))) + else: + result = false # We still want this callback to be called. + else: + written.inc(res) + if res != netSize: + result = false # We still have data to send. + else: + retFuture.complete(0) + addWrite(p, socket, cb) + return retFuture + + + proc acceptAddr*(p: PDispatcher, socket: TSocketHandle): + PFuture[tuple[address: string, client: TSocketHandle]] = + var retFuture = newFuture[tuple[address: string, client: TSocketHandle]]() + proc cb(sock: TSocketHandle): bool = + result = true + var sockAddress: Tsockaddr_in + var addrLen = sizeof(sockAddress).TSocklen + var client = accept(sock, cast[ptr TSockAddr](addr(sockAddress)), + addr(addrLen)) + if client == osInvalidSocket: + let lastError = osLastError() + assert lastError.int32 notin {EWOULDBLOCK, EAGAIN} + if lastError.int32 == EINTR: + return false + else: + retFuture.fail(newException(EOS, osErrorMsg(lastError))) + else: + retFuture.complete(($inet_ntoa(sockAddress.sin_addr), client)) + addRead(p, socket, cb) + return retFuture + +proc accept*(p: PDispatcher, socket: TSocketHandle): PFuture[TSocketHandle] = + ## Accepts a new connection. Returns a future containing the client socket + ## corresponding to that connection. + ## The future will complete when the connection is successfully accepted. + var retFut = newFuture[TSocketHandle]() + var fut = p.acceptAddr(socket) + fut.callback = + proc (future: PFuture[tuple[address: string, client: TSocketHandle]]) = + assert future.finished + if future.failed: + retFut.fail(future.error) + else: + retFut.complete(future.read.client) + return retFut # -- Await Macro @@ -665,8 +851,7 @@ when isMainModule: var p = newDispatcher() var sock = socket() - #sock.setBlocking false - p.register(sock) + sock.setBlocking false when false: @@ -706,7 +891,7 @@ when isMainModule: var recvF = p.recv(sock, 10) recvF.callback = proc (future: PFuture[string]) = - echo("Read: ", future.read) + echo("Read ", future.read.len, ": ", future.read.repr) else: diff --git a/lib/pure/net.nim b/lib/pure/net.nim index bdcae677e..0ec007009 100644 --- a/lib/pure/net.nim +++ b/lib/pure/net.nim @@ -37,4 +37,19 @@ proc bindAddr*(socket: TSocket, port = TPort(0), address = "") {. if bindAddr(socket, aiList.ai_addr, aiList.ai_addrlen.TSocklen) < 0'i32: dealloc(aiList) osError(osLastError()) - dealloc(aiList) \ No newline at end of file + dealloc(aiList) + +proc setBlocking*(s: TSocket, blocking: bool) {.tags: [].} = + ## Sets blocking mode on socket + when defined(Windows): + var mode = clong(ord(not blocking)) # 1 for non-blocking, 0 for blocking + if ioctlsocket(s, FIONBIO, addr(mode)) == -1: + osError(osLastError()) + else: # BSD sockets + var x: int = fcntl(s, F_GETFL, 0) + if x == -1: + osError(osLastError()) + else: + var mode = if blocking: x and not O_NONBLOCK else: x or O_NONBLOCK + if fcntl(s, F_SETFL, mode) == -1: + osError(osLastError()) \ No newline at end of file diff --git a/lib/pure/selectors.nim b/lib/pure/selectors.nim index 83c158da1..6482a01a6 100644 --- a/lib/pure/selectors.nim +++ b/lib/pure/selectors.nim @@ -1,7 +1,7 @@ # # # Nimrod's Runtime Library -# (c) Copyright 2013 Dominik Picheta +# (c) Copyright 2014 Dominik Picheta # # See the file "copying.txt", included in this # distribution, for details about the copyright. @@ -9,212 +9,211 @@ # TODO: Docs. -import tables, os, unsigned -when defined(windows): - import winlean -else: - import posix +import tables, os, unsigned, hashes + +when defined(linux): import posix, epoll +elif defined(windows): import winlean + +proc hash*(x: TSocketHandle): THash {.borrow.} type TEvent* = enum EvRead, EvWrite - TSelectorKey* = object - fd: cint - events: set[TEvent] - data: PObject - - TReadyInfo* = tuple[key: TSelectorKey, events: set[TEvent]] - - PSelector* = ref object of PObject ## Selector interface. - fds*: TTable[cint, TSelectorKey] - registerImpl*: proc (s: PSelector, fd: cint, events: set[TEvent], - data: PObject): TSelectorKey {.nimcall, tags: [FWriteIO].} - unregisterImpl*: proc (s: PSelector, fd: cint): TSelectorKey {.nimcall, tags: [FWriteIO].} - selectImpl*: proc (s: PSelector, timeout: int): seq[TReadyInfo] {.nimcall, tags: [FReadIO].} - closeImpl*: proc (s: PSelector) {.nimcall.} - -template initSelector(r: expr) = - new r - r.fds = initTable[cint, TSelectorKey]() - -proc register*(s: PSelector, fd: cint, events: set[TEvent], data: PObject): - TSelectorKey = - if not s.registerImpl.isNil: result = s.registerImpl(s, fd, events, data) - -proc unregister*(s: PSelector, fd: cint): TSelectorKey = - ## - ## **Note:** For the ``epoll`` implementation the resulting ``TSelectorKey`` - ## will only have the ``fd`` field set. This is an optimisation and may - ## change in the future if a viable use case is presented. - if not s.unregisterImpl.isNil: result = s.unregisterImpl(s, fd) - -proc select*(s: PSelector, timeout = 500): seq[TReadyInfo] = - ## - ## The ``events`` field of the returned ``key`` contains the original events - ## for which the ``fd`` was bound. This is contrary to the ``events`` field - ## of the ``TReadyInfo`` tuple which determines which events are ready - ## on the ``fd``. - - if not s.selectImpl.isNil: result = s.selectImpl(s, timeout) + PSelectorKey* = ref object + fd*: TSocketHandle + events*: set[TEvent] ## The events which ``fd`` listens for. + data*: PObject ## User object. -proc close*(s: PSelector) = - if not s.closeImpl.isNil: s.closeImpl(s) + TReadyInfo* = tuple[key: PSelectorKey, events: set[TEvent]] -# ---- Select() ---------------------------------------------------------------- - -type - PSelectSelector* = ref object of PSelector ## Implementation of select() - -proc ssRegister(s: PSelector, fd: cint, events: set[TEvent], - data: PObject): TSelectorKey = - if s.fds.hasKey(fd): - raise newException(EInvalidValue, "FD already exists in selector.") - var sk = TSelectorKey(fd: fd, events: events, data: data) - s.fds[fd] = sk - result = sk - -proc ssUnregister(s: PSelector, fd: cint): TSelectorKey = - result = s.fds[fd] - s.fds.del(fd) - -proc ssClose(s: PSelector) = nil - -proc timeValFromMilliseconds(timeout: int): TTimeVal = - if timeout != -1: - var seconds = timeout div 1000 - result.tv_sec = seconds.int32 - result.tv_usec = ((timeout - seconds * 1000) * 1000).int32 - -proc createFdSet(rd, wr: var TFdSet, fds: TTable[cint, TSelectorKey], - m: var int) = - FD_ZERO(rd); FD_ZERO(wr) - for k, v in pairs(fds): - if EvRead in v.events: - m = max(m, int(k)) - FD_SET(k, rd) - if EvWrite in v.events: - m = max(m, int(k)) - FD_SET(k, wr) - -proc getReadyFDs(rd, wr: var TFdSet, fds: TTable[cint, TSelectorKey]): - seq[TReadyInfo] = - result = @[] - for k, v in pairs(fds): - var events: set[TEvent] = {} - if FD_ISSET(k, rd) != 0'i32: - events = events + {EvRead} - if FD_ISSET(k, wr) != 0'i32: - events = events + {EvWrite} - result.add((v, events)) - -proc select(fds: TTable[cint, TSelectorKey], timeout = 500): - seq[TReadyInfo] = - var tv {.noInit.}: TTimeVal = timeValFromMilliseconds(timeout) - - var rd, wr: TFdSet - var m = 0 - createFdSet(rd, wr, fds, m) - - var retCode = 0 - if timeout != -1: - retCode = int(select(cint(m+1), addr(rd), addr(wr), nil, addr(tv))) - else: - retCode = int(select(cint(m+1), addr(rd), addr(wr), nil, nil)) - - if retCode < 0: - OSError(OSLastError()) - elif retCode == 0: - return @[] - else: - return getReadyFDs(rd, wr, fds) - -proc ssSelect(s: PSelector, timeout: int): seq[TReadyInfo] = - result = select(s.fds, timeout) - -proc newSelectSelector*(): PSelectSelector = - initSelector(result) - result.registerImpl = ssRegister - result.unregisterImpl = ssUnregister - result.selectImpl = ssSelect - result.closeImpl = ssClose - -# ---- Epoll ------------------------------------------------------------------- - -when defined(linux): - import epoll +when defined(linux) or defined(nimdoc): type - PEpollSelector* = ref object of PSelector + PSelector* = ref object epollFD: cint events: array[64, ptr epoll_event] + fds: TTable[TSocketHandle, PSelectorKey] - TDataWrapper = object - fd: cint - boundEvents: set[TEvent] ## The events which ``fd`` listens for. - data: PObject ## User object. - - proc esRegister(s: PSelector, fd: cint, events: set[TEvent], - data: PObject): TSelectorKey = - var es = PEpollSelector(s) - var event: epoll_event + proc createEventStruct(events: set[TEvent], fd: TSocketHandle): epoll_event = if EvRead in events: - event.events = EPOLLIN + result.events = EPOLLIN if EvWrite in events: - event.events = event.events or EPOLLOUT - - var dw = cast[ptr TDataWrapper](alloc0(sizeof(TDataWrapper))) # TODO: This needs to be dealloc'd - dw.fd = fd - dw.boundEvents = events - dw.data = data - event.data.thePtr = dw + result.events = result.events or EPOLLOUT + result.data.fd = fd.cint + + proc register*(s: PSelector, fd: TSocketHandle, events: set[TEvent], + data: PObject): PSelectorKey {.discardable.} = + ## Registers file descriptor ``fd`` to selector ``s`` with a set of TEvent + ## ``events``. + if s.fds.hasKey(fd): + raise newException(EInvalidValue, "File descriptor already exists.") - if epoll_ctl(es.epollFD, EPOLL_CTL_ADD, fd, addr(event)) != 0: + var event = createEventStruct(events, fd) + + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fd, addr(event)) != 0: OSError(OSLastError()) - - result = TSelectorKey(fd: fd, events: events, data: data) - proc esUnregister(s: PSelector, fd: cint): TSelectorKey = - # We cannot find out the information about this ``fd`` from the epoll - # context. As such I will simply return an almost empty TSelectorKey. - var es = PEpollSelector(s) - if epoll_ctl(es.epollFD, EPOLL_CTL_DEL, fd, nil) != 0: + var key = PSelectorKey(fd: fd, events: events, data: data) + + s.fds[fd] = key + result = key + + proc update*(s: PSelector, fd: TSocketHandle, + events: set[TEvent]): PSelectorKey {.discardable.} = + ## Updates the events which ``fd`` wants notifications for. + if not s.fds.hasKey(fd): + raise newException(EInvalidValue, "File descriptor not found.") + var event = createEventStruct(events, fd) + + s.fds[fd].events = events + echo("About to update") + if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0: OSError(OSLastError()) - # We could fill in the ``fds`` TTable to get the info, but that wouldn't - # be nice for our memory. - result = TSelectorKey(fd: fd, events: {}, data: nil) - - proc esClose(s: PSelector) = - var es = PEpollSelector(s) - if es.epollFD.close() != 0: OSError(OSLastError()) - dealloc(addr es.events) # TODO: Test this + echo("finished updating") + result = s.fds[fd] - proc esSelect(s: PSelector, timeout: int): seq[TReadyInfo] = + proc unregister*(s: PSelector, fd: TSocketHandle): PSelectorKey {.discardable.} = + if not s.fds.hasKey(fd): + raise newException(EInvalidValue, "File descriptor not found.") + if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fd, nil) != 0: + OSError(OSLastError()) + result = s.fds[fd] + s.fds.del(fd) + + proc close*(s: PSelector) = + if s.epollFD.close() != 0: OSError(OSLastError()) + dealloc(addr s.events) # TODO: Test this + + proc select*(s: PSelector, timeout: int): seq[TReadyInfo] = + ## + ## The ``events`` field of the returned ``key`` contains the original events + ## for which the ``fd`` was bound. This is contrary to the ``events`` field + ## of the ``TReadyInfo`` tuple which determines which events are ready + ## on the ``fd``. result = @[] - var es = PEpollSelector(s) - let evNum = epoll_wait(es.epollFD, es.events[0], 64.cint, timeout.cint) + let evNum = epoll_wait(s.epollFD, s.events[0], 64.cint, timeout.cint) if evNum < 0: OSError(OSLastError()) if evNum == 0: return @[] for i in 0 .. 0: echo ready[0].events i.inc if i == 6: + assert selector.unregister(sock.getFD).fd == sock.getFD selector.close() break diff --git a/lib/pure/sockets2.nim b/lib/pure/sockets2.nim index f8284b339..031217b90 100644 --- a/lib/pure/sockets2.nim +++ b/lib/pure/sockets2.nim @@ -17,11 +17,13 @@ when hostos == "solaris": when defined(Windows): import winlean + export ioctlsocket else: import posix + export fcntl, F_GETFL, O_NONBLOCK, F_SETFL export TSocketHandle, TSockaddr_in, TAddrinfo, INADDR_ANY, TSockAddr, TSockLen, - inet_ntoa + inet_ntoa, recv, `==`, connect, send, accept type @@ -63,10 +65,10 @@ type when defined(windows): let - OSInvalidSocket* = winlean.INVALID_SOCKET + osInvalidSocket* = winlean.INVALID_SOCKET else: let - OSInvalidSocket* = posix.INVALID_SOCKET + osInvalidSocket* = posix.INVALID_SOCKET proc `==`*(a, b: TPort): bool {.borrow.} ## ``==`` for ports. diff --git a/tests/async/tasyncawait.nim b/tests/async/tasyncawait.nim index bcaffc287..bde5bf8c8 100644 --- a/tests/async/tasyncawait.nim +++ b/tests/async/tasyncawait.nim @@ -21,7 +21,7 @@ proc sendMessages(disp: PDispatcher, client: TSocketHandle): PFuture[int] {.asyn proc launchSwarm(disp: PDispatcher, port: TPort): PFuture[int] {.async.} = for i in 0 .. Date: Sun, 9 Mar 2014 13:49:38 +0000 Subject: Fixes to asyncio2 on Linux. --- lib/posix/epoll.nim | 2 +- lib/pure/asyncio2.nim | 28 ++++++++++++++-------------- lib/pure/selectors.nim | 26 ++++++++++++++++++-------- lib/pure/sockets2.nim | 22 ++++++++++++++++++++++ tests/async/tasyncawait.nim | 10 +++++++++- 5 files changed, 64 insertions(+), 24 deletions(-) (limited to 'lib/posix') diff --git a/lib/posix/epoll.nim b/lib/posix/epoll.nim index 366521551..57a2f001f 100644 --- a/lib/posix/epoll.nim +++ b/lib/posix/epoll.nim @@ -36,7 +36,7 @@ type epoll_data* {.importc: "union epoll_data", header: "", pure, final.} = object # TODO: This is actually a union. #thePtr* {.importc: "ptr".}: pointer - fd*: cint # \ + fd* {.importc: "fd".}: cint # \ #u32*: uint32 #u64*: uint64 diff --git a/lib/pure/asyncio2.nim b/lib/pure/asyncio2.nim index 12d4cb5a3..60d489dda 100644 --- a/lib/pure/asyncio2.nim +++ b/lib/pure/asyncio2.nim @@ -473,7 +473,6 @@ else: proc update(p: PDispatcher, sock: TSocketHandle, events: set[TEvent]) = assert sock in p.selector - echo("Update: ", events) if events == {}: discard p.selector.unregister(sock) else: @@ -499,23 +498,25 @@ else: for info in p.selector.select(timeout): let data = PData(info.key.data) assert data.sock == info.key.fd - echo("R: ", data.readCBs.len, " W: ", data.writeCBs.len, ". ", info.events) if EvRead in info.events: - var newReadCBs: seq[TCallback] = @[] - for cb in data.readCBs: + # Callback may add items to ``data.readCBs`` which causes issues if + # we are iterating over ``data.readCBs`` at the same time. We therefore + # make a copy to iterate over. + let currentCBs = data.readCBs + data.readCBs = @[] + for cb in currentCBs: if not cb(data.sock): # Callback wants to be called again. - newReadCBs.add(cb) - data.readCBs = newReadCBs + data.readCBs.add(cb) if EvWrite in info.events: - var newWriteCBs: seq[TCallback] = @[] - for cb in data.writeCBs: + let currentCBs = data.writeCBs + data.writeCBs = @[] + for cb in currentCBs: if not cb(data.sock): # Callback wants to be called again. - newWriteCBs.add(cb) - data.writeCBs = newWriteCBs + data.writeCBs.add(cb) var newEvents: set[TEvent] if data.readCBs.len != 0: newEvents = {EvRead} @@ -615,7 +616,6 @@ else: retFuture.complete(0) addWrite(p, socket, cb) return retFuture - proc acceptAddr*(p: PDispatcher, socket: TSocketHandle): PFuture[tuple[address: string, client: TSocketHandle]] = @@ -854,7 +854,7 @@ when isMainModule: sock.setBlocking false - when false: + when true: # Await tests proc main(p: PDispatcher): PFuture[int] {.async.} = discard await p.connect(sock, "irc.freenode.net", TPort(6667)) @@ -880,7 +880,7 @@ when isMainModule: else: - when false: + when true: var f = p.connect(sock, "irc.freenode.org", TPort(6667)) f.callback = @@ -919,4 +919,4 @@ when isMainModule: - \ No newline at end of file + diff --git a/lib/pure/selectors.nim b/lib/pure/selectors.nim index 6482a01a6..e086ee3ab 100644 --- a/lib/pure/selectors.nim +++ b/lib/pure/selectors.nim @@ -10,11 +10,13 @@ # TODO: Docs. import tables, os, unsigned, hashes +import sockets2 when defined(linux): import posix, epoll elif defined(windows): import winlean proc hash*(x: TSocketHandle): THash {.borrow.} +proc `$`*(x: TSocketHandle): string {.borrow.} type TEvent* = enum @@ -31,7 +33,7 @@ when defined(linux) or defined(nimdoc): type PSelector* = ref object epollFD: cint - events: array[64, ptr epoll_event] + events: array[64, epoll_event] fds: TTable[TSocketHandle, PSelectorKey] proc createEventStruct(events: set[TEvent], fd: TSocketHandle): epoll_event = @@ -66,17 +68,25 @@ when defined(linux) or defined(nimdoc): var event = createEventStruct(events, fd) s.fds[fd].events = events - echo("About to update") if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0: + if OSLastError().cint == ENOENT: + # Socket has been closed. Epoll automatically removes disconnected + # sockets. + s.fds.del(fd) + osError("Socket has been disconnected") + OSError(OSLastError()) - echo("finished updating") result = s.fds[fd] proc unregister*(s: PSelector, fd: TSocketHandle): PSelectorKey {.discardable.} = if not s.fds.hasKey(fd): raise newException(EInvalidValue, "File descriptor not found.") if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fd, nil) != 0: - OSError(OSLastError()) + if osLastError().cint == ENOENT: + # Socket has been closed. Epoll automatically removes disconnected + # sockets so its already been removed. + else: + OSError(OSLastError()) result = s.fds[fd] s.fds.del(fd) @@ -92,21 +102,21 @@ when defined(linux) or defined(nimdoc): ## on the ``fd``. result = @[] - let evNum = epoll_wait(s.epollFD, s.events[0], 64.cint, timeout.cint) + let evNum = epoll_wait(s.epollFD, addr s.events[0], 64.cint, timeout.cint) if evNum < 0: OSError(OSLastError()) if evNum == 0: return @[] for i in 0 ..