diff options
author | Araq <rumpf_a@web.de> | 2014-03-13 02:52:51 +0100 |
---|---|---|
committer | Araq <rumpf_a@web.de> | 2014-03-13 02:52:51 +0100 |
commit | 1c35fb3c89bbac393b50c4bc6fe8205af2b7fb9d (patch) | |
tree | 6f72beee37e3873a1434e10e8bb0fd156acdef78 /lib/pure | |
parent | 2fc84325c7cfba82c54c652a5f1a795845b169a9 (diff) | |
parent | d2f130c3fc1ffa786e8d323c3cd3c51fd45124da (diff) | |
download | Nim-1c35fb3c89bbac393b50c4bc6fe8205af2b7fb9d.tar.gz |
Merge branch 'devel' of https://github.com/Araq/Nimrod into devel
Diffstat (limited to 'lib/pure')
-rw-r--r-- | lib/pure/asyncio2.nim | 111 | ||||
-rw-r--r-- | lib/pure/os.nim | 11 | ||||
-rw-r--r-- | lib/pure/selectors.nim | 65 |
3 files changed, 122 insertions, 65 deletions
diff --git a/lib/pure/asyncio2.nim b/lib/pure/asyncio2.nim index 60d489dda..c37370b7b 100644 --- a/lib/pure/asyncio2.nim +++ b/lib/pure/asyncio2.nim @@ -94,7 +94,8 @@ proc failed*[T](future: PFuture[T]): bool = # TODO: Get rid of register. Do it implicitly. when defined(windows) or defined(nimdoc): - import winlean + import winlean, sets, hashes + #from hashes import THash type TCompletionKey = dword @@ -105,7 +106,7 @@ when defined(windows) or defined(nimdoc): PDispatcher* = ref object ioPort: THandle - hasHandles: bool + handles: TSet[TSocketHandle] TCustomOverlapped = object Internal*: DWORD @@ -117,21 +118,32 @@ when defined(windows) or defined(nimdoc): PCustomOverlapped = ptr TCustomOverlapped + proc hash(x: TSocketHandle): THash {.borrow.} + proc newDispatcher*(): PDispatcher = ## Creates a new Dispatcher instance. new result result.ioPort = CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) + result.handles = initSet[TSocketHandle]() proc register*(p: PDispatcher, sock: TSocketHandle) = ## Registers ``sock`` with the dispatcher ``p``. if CreateIOCompletionPort(sock.THandle, p.ioPort, cast[TCompletionKey](sock), 1) == 0: OSError(OSLastError()) - p.hasHandles = true + p.handles.incl(sock) + # TODO: fd closure detection, we need to remove the fd from handles set + + proc verifyPresence(p: PDispatcher, sock: TSocketHandle) = + ## Ensures that socket has been registered with the dispatcher. + if sock notin p.handles: + raise newException(EInvalidValue, + "Operation performed on a socket which has not been registered with" & + " the dispatcher yet.") proc poll*(p: PDispatcher, timeout = 500) = ## Waits for completion events and processes them. - if not p.hasHandles: + if p.handles.len == 0: raise newException(EInvalidValue, "No handles registered in dispatcher.") let llTimeout = @@ -237,7 +249,7 @@ when defined(windows) or defined(nimdoc): ## ## Returns a ``PFuture`` which will complete when the connection succeeds ## or an error occurs. - + verifyPresence(p, socket) var retFuture = newFuture[int]()# TODO: Change to void when that regression is fixed. # Apparently ``ConnectEx`` expects the socket to be initially bound: var saddr: Tsockaddr_in @@ -298,7 +310,7 @@ when defined(windows) or defined(nimdoc): ## recv operation then the future may complete with only a part of the ## requested data read. If socket is disconnected and no data is available ## to be read then the future will complete with a value of ``""``. - + verifyPresence(p, socket) var retFuture = newFuture[string]() var dataBuf: TWSABuf @@ -354,6 +366,7 @@ when defined(windows) or defined(nimdoc): proc send*(p: PDispatcher, socket: TSocketHandle, data: string): PFuture[int] = ## Sends ``data`` to ``socket``. The returned future will complete once all ## data has been sent. + verifyPresence(p, socket) var retFuture = newFuture[int]() var dataBuf: TWSABuf @@ -390,7 +403,9 @@ when defined(windows) or defined(nimdoc): ## Accepts a new connection. Returns a future containing the client socket ## corresponding to that connection and the remote address of the client. ## The future will complete when the connection is successfully accepted. - + ## + ## The resulting client socket is automatically registered to dispatcher. + verifyPresence(p, socket) var retFuture = newFuture[tuple[address: string, client: TSocketHandle]]() var clientSock = socket() @@ -416,6 +431,7 @@ when defined(windows) or defined(nimdoc): dwLocalAddressLength, dwRemoteAddressLength, addr LocalSockaddr, addr localLen, addr RemoteSockaddr, addr remoteLen) + p.register(clientSock) # TODO: IPv6. Check ``sa_family``. http://stackoverflow.com/a/9212542/492186 retFuture.complete( (address: $inet_ntoa(cast[ptr Tsockaddr_in](remoteSockAddr).sin_addr), @@ -452,6 +468,13 @@ when defined(windows) or defined(nimdoc): return retFuture + proc socket*(disp: PDispatcher, domain: TDomain = AF_INET, + typ: TType = SOCK_STREAM, + protocol: TProtocol = IPPROTO_TCP): TSocketHandle = + ## Creates a new socket and registers it with the dispatcher implicitly. + result = socket(domain, typ, protocol) + disp.register(result) + initAll() else: import selectors @@ -473,32 +496,35 @@ else: proc update(p: PDispatcher, sock: TSocketHandle, events: set[TEvent]) = assert sock in p.selector - if events == {}: - discard p.selector.unregister(sock) - else: - discard p.selector.update(sock, events) + discard p.selector.update(sock, events) + + proc register(p: PDispatcher, sock: TSocketHandle) = + var data = PData(sock: sock, readCBs: @[], writeCBs: @[]) + p.selector.register(sock, {}, data.PObject) + + proc socket*(disp: PDispatcher, domain: TDomain = AF_INET, + typ: TType = SOCK_STREAM, + protocol: TProtocol = IPPROTO_TCP): TSocketHandle = + result = socket(domain, typ, protocol) + disp.register(result) proc addRead(p: PDispatcher, sock: TSocketHandle, cb: TCallback) = if sock notin p.selector: - var data = PData(sock: sock, readCBs: @[cb], writeCBs: @[]) - p.selector.register(sock, {EvRead}, data.PObject) - else: - p.selector[sock].data.PData.readCBs.add(cb) - p.update(sock, p.selector[sock].events + {EvRead}) + raise newException(EInvalidValue, "File descriptor not registered.") + p.selector[sock].data.PData.readCBs.add(cb) + p.update(sock, p.selector[sock].events + {EvRead}) proc addWrite(p: PDispatcher, sock: TSocketHandle, cb: TCallback) = if sock notin p.selector: - var data = PData(sock: sock, readCBs: @[], writeCBs: @[cb]) - p.selector.register(sock, {EvWrite}, data.PObject) - else: - p.selector[sock].data.PData.writeCBs.add(cb) - p.update(sock, p.selector[sock].events + {EvWrite}) + raise newException(EInvalidValue, "File descriptor not registered.") + p.selector[sock].data.PData.writeCBs.add(cb) + p.update(sock, p.selector[sock].events + {EvWrite}) proc poll*(p: PDispatcher, timeout = 500) = for info in p.selector.select(timeout): let data = PData(info.key.data) assert data.sock == info.key.fd - + #echo("In poll ", data.sock.cint) if EvRead in info.events: # Callback may add items to ``data.readCBs`` which causes issues if # we are iterating over ``data.readCBs`` at the same time. We therefore @@ -517,11 +543,17 @@ else: if not cb(data.sock): # Callback wants to be called again. data.writeCBs.add(cb) - - var newEvents: set[TEvent] - if data.readCBs.len != 0: newEvents = {EvRead} - if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite} - p.update(data.sock, newEvents) + + if info.key in p.selector: + var newEvents: set[TEvent] + if data.readCBs.len != 0: newEvents = {EvRead} + if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite} + if newEvents != info.key.events: + echo(info.key.events, " -> ", newEvents) + p.update(data.sock, newEvents) + else: + # FD no longer a part of the selector. Likely been closed + # (e.g. socket disconnected). proc connect*(p: PDispatcher, socket: TSocketHandle, address: string, port: TPort, af = AF_INET): PFuture[int] = @@ -569,6 +601,7 @@ else: result = true let netSize = size - sizeRead let res = recv(sock, addr readBuffer[sizeRead], netSize, flags.cint) + #echo("recv cb res: ", res) if res < 0: let lastError = osLastError() if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: @@ -576,6 +609,7 @@ else: else: result = false # We still want this callback to be called. elif res == 0: + #echo("Disconnected recv: ", sizeRead) # Disconnected if sizeRead == 0: retFuture.complete("") @@ -588,6 +622,7 @@ else: result = false # We want to read all the data requested. else: retFuture.complete(readBuffer) + #echo("Recv cb result: ", result) addRead(p, socket, cb) return retFuture @@ -634,6 +669,7 @@ else: else: retFuture.fail(newException(EOS, osErrorMsg(lastError))) else: + p.register(client) retFuture.complete(($inet_ntoa(sockAddress.sin_addr), client)) addRead(p, socket, cb) return retFuture @@ -833,9 +869,13 @@ proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.} result = "" var c = "" while true: + #echo("1") c = await p.recv(socket, 1) + #echo("Received ", c.len) if c.len == 0: + #echo("returning") return + #echo("2") if c == "\r": c = await p.recv(socket, 1, MSG_PEEK) if c.len > 0 and c == "\L": @@ -845,7 +885,9 @@ proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.} elif c == "\L": addNLIfEmpty() return + #echo("3") add(result.string, c) + #echo("4") when isMainModule: @@ -854,11 +896,12 @@ when isMainModule: sock.setBlocking false - when true: + when false: # Await tests proc main(p: PDispatcher): PFuture[int] {.async.} = discard await p.connect(sock, "irc.freenode.net", TPort(6667)) while true: + echo("recvLine") var line = await p.recvLine(sock) echo("Line is: ", line.repr) if line == "": @@ -880,9 +923,9 @@ when isMainModule: else: - when true: + when false: - var f = p.connect(sock, "irc.freenode.org", TPort(6667)) + var f = p.connect(sock, "irc.poop.nl", TPort(6667)) f.callback = proc (future: PFuture[int]) = echo("Connected in future!") @@ -898,11 +941,13 @@ when isMainModule: sock.bindAddr(TPort(6667)) sock.listen() proc onAccept(future: PFuture[TSocketHandle]) = - echo "Accepted" - var t = p.send(future.read, "test\c\L") + let client = future.read + echo "Accepted ", client.cint + var t = p.send(client, "test\c\L") t.callback = proc (future: PFuture[int]) = - echo(future.read) + echo("Send: ", future.read) + client.close() var f = p.accept(sock) f.callback = onAccept diff --git a/lib/pure/os.nim b/lib/pure/os.nim index bfecc569a..faca17e98 100644 --- a/lib/pure/os.nim +++ b/lib/pure/os.nim @@ -260,11 +260,12 @@ proc osError*(errorCode: TOSErrorCode) = ## ## If the error code is ``0`` or an error message could not be retrieved, ## the message ``unknown OS error`` will be used. - let msg = osErrorMsg(errorCode) - if msg == "": - raise newException(EOS, "unknown OS error") - else: - raise newException(EOS, msg) + var e: ref EOS; new(e) + e.errorCode = errorCode.int32 + e.msg = osErrorMsg(errorCode) + if e.msg == "": + e.msg = "unknown OS error" + raise e {.push stackTrace:off.} proc osLastError*(): TOSErrorCode = diff --git a/lib/pure/selectors.nim b/lib/pure/selectors.nim index e086ee3ab..a113e3362 100644 --- a/lib/pure/selectors.nim +++ b/lib/pure/selectors.nim @@ -10,7 +10,6 @@ # TODO: Docs. import tables, os, unsigned, hashes -import sockets2 when defined(linux): import posix, epoll elif defined(windows): import winlean @@ -41,17 +40,14 @@ when defined(linux) or defined(nimdoc): result.events = EPOLLIN if EvWrite in events: result.events = result.events or EPOLLOUT + result.events = result.events or EPOLLRDHUP result.data.fd = fd.cint proc register*(s: PSelector, fd: TSocketHandle, events: set[TEvent], data: PObject): PSelectorKey {.discardable.} = ## Registers file descriptor ``fd`` to selector ``s`` with a set of TEvent ## ``events``. - if s.fds.hasKey(fd): - raise newException(EInvalidValue, "File descriptor already exists.") - var event = createEventStruct(events, fd) - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fd, addr(event)) != 0: OSError(OSLastError()) @@ -63,30 +59,18 @@ when defined(linux) or defined(nimdoc): proc update*(s: PSelector, fd: TSocketHandle, events: set[TEvent]): PSelectorKey {.discardable.} = ## Updates the events which ``fd`` wants notifications for. - if not s.fds.hasKey(fd): - raise newException(EInvalidValue, "File descriptor not found.") - var event = createEventStruct(events, fd) - - s.fds[fd].events = events - if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0: - if OSLastError().cint == ENOENT: - # Socket has been closed. Epoll automatically removes disconnected - # sockets. - s.fds.del(fd) - osError("Socket has been disconnected") - - OSError(OSLastError()) - result = s.fds[fd] + if s.fds[fd].events != events: + echo("Update ", fd.cint, " to ", events) + var event = createEventStruct(events, fd) + + s.fds[fd].events = events + if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0: + OSError(OSLastError()) + result = s.fds[fd] proc unregister*(s: PSelector, fd: TSocketHandle): PSelectorKey {.discardable.} = - if not s.fds.hasKey(fd): - raise newException(EInvalidValue, "File descriptor not found.") if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fd, nil) != 0: - if osLastError().cint == ENOENT: - # Socket has been closed. Epoll automatically removes disconnected - # sockets so its already been removed. - else: - OSError(OSLastError()) + OSError(OSLastError()) result = s.fds[fd] s.fds.del(fd) @@ -113,6 +97,14 @@ when defined(linux) or defined(nimdoc): assert selectorKey != nil result.add((selectorKey, evSet)) + if (s.events[i].events and EPOLLHUP) != 0 or + (s.events[i].events and EPOLLRDHUP) != 0: + # fd closed + #echo("fd closed ", s.events[i].data.fd) + s.unregister(s.events[i].data.fd.TSocketHandle) + + #echo("Epoll: ", result[i].key.fd, " ", result[i].events, " ", result[i].key.events) + proc newSelector*(): PSelector = new result result.epollFD = epoll_create(64) @@ -123,7 +115,26 @@ when defined(linux) or defined(nimdoc): proc contains*(s: PSelector, fd: TSocketHandle): bool = ## Determines whether selector contains a file descriptor. - return s.fds.hasKey(fd) + if s.fds.hasKey(fd): + result = true + + # Ensure the underlying epoll instance still contains this fd. + var event = createEventStruct(s.fds[fd].events, fd) + if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0: + let err = osLastError() + if err.cint in {ENOENT, EBADF}: + return false + OSError(OSLastError()) + else: + return false + + proc contains*(s: PSelector, key: PSelectorKey): bool = + ## Determines whether selector contains this selector key. More accurate + ## than checking if the file descriptor is in the selector because it + ## ensures that the keys are equal. File descriptors may not always be + ## unique especially when an fd is closed and then a new one is opened, + ## the new one may have the same value. + return key.fd in s and s.fds[key.fd] == key proc `[]`*(s: PSelector, fd: TSocketHandle): PSelectorKey = ## Retrieves the selector key for ``fd``. |