diff options
-rw-r--r-- | compiler/ast.nim | 5 | ||||
-rw-r--r-- | compiler/msgs.nim | 8 | ||||
-rw-r--r-- | compiler/sem.nim | 18 | ||||
-rw-r--r-- | compiler/semdata.nim | 3 | ||||
-rw-r--r-- | lib/pure/asyncio2.nim | 111 | ||||
-rw-r--r-- | lib/pure/os.nim | 11 | ||||
-rw-r--r-- | lib/pure/selectors.nim | 65 | ||||
-rw-r--r-- | lib/system.nim | 1 | ||||
-rw-r--r-- | lib/windows/winlean.nim | 1 | ||||
-rw-r--r-- | tests/async/tasyncawait.nim | 13 |
10 files changed, 150 insertions, 86 deletions
diff --git a/compiler/ast.nim b/compiler/ast.nim index f9cab0d88..ee0d55920 100644 --- a/compiler/ast.nim +++ b/compiler/ast.nim @@ -1291,7 +1291,7 @@ proc skipTypes*(t: PType, kinds: TTypeKinds): PType = proc propagateToOwner*(owner, elem: PType) = const HaveTheirOwnEmpty = {tySequence, tySet} owner.flags = owner.flags + (elem.flags * {tfHasShared, tfHasMeta, - tfHasStatic, tfHasGCedMem}) + tfHasGCedMem}) if tfNotNil in elem.flags: if owner.kind in {tyGenericInst, tyGenericBody, tyGenericInvokation}: owner.flags.incl tfNotNil @@ -1308,9 +1308,6 @@ proc propagateToOwner*(owner, elem: PType) = if elem.kind in tyMetaTypes: owner.flags.incl tfHasMeta - if elem.kind == tyStatic: - owner.flags.incl tfHasStatic - if elem.kind in {tyString, tyRef, tySequence} or elem.kind == tyProc and elem.callConv == ccClosure: owner.flags.incl tfHasGCedMem diff --git a/compiler/msgs.nim b/compiler/msgs.nim index 5bc490d14..c75876843 100644 --- a/compiler/msgs.nim +++ b/compiler/msgs.nim @@ -719,19 +719,19 @@ proc handleError(msg: TMsgKind, eh: TErrorHandling, s: string) = writeStackTrace() quit 1 - if msg >= fatalMin and msg <= fatalMax: + if msg >= fatalMin and msg <= fatalMax: quit() - if msg >= errMin and msg <= errMax: + if msg >= errMin and msg <= errMax: inc(gErrorCounter) options.gExitcode = 1'i8 - if gErrorCounter >= gErrorMax: + if gErrorCounter >= gErrorMax: quit() elif eh == doAbort and gCmd != cmdIdeTools: quit() elif eh == doRaise: raiseRecoverableError(s) -proc `==`*(a, b: TLineInfo): bool = +proc `==`*(a, b: TLineInfo): bool = result = a.line == b.line and a.fileIndex == b.fileIndex proc writeContext(lastinfo: TLineInfo) = diff --git a/compiler/sem.nim b/compiler/sem.nim index c8228618b..093fc9452 100644 --- a/compiler/sem.nim +++ b/compiler/sem.nim @@ -219,14 +219,26 @@ proc tryConstExpr(c: PContext, n: PNode): PNode = result = getConstExpr(c.module, e) if result != nil: return + let oldErrorCount = msgs.gErrorCounter + let oldErrorMax = msgs.gErrorMax + let oldErrorOutputs = errorOutputs + + errorOutputs = {} + msgs.gErrorMax = high(int) + try: result = evalConstExpr(c.module, e) if result == nil or result.kind == nkEmpty: - return nil + result = nil + else: + result = fixupTypeAfterEval(c, result, e) - result = fixupTypeAfterEval(c, result, e) except ERecoverableError: - return nil + result = nil + + msgs.gErrorCounter = oldErrorCount + msgs.gErrorMax = oldErrorMax + errorOutputs = oldErrorOutputs proc semConstExpr(c: PContext, n: PNode): PNode = var e = semExprWithType(c, n) diff --git a/compiler/semdata.nim b/compiler/semdata.nim index d942aa41e..84e017050 100644 --- a/compiler/semdata.nim +++ b/compiler/semdata.nim @@ -236,17 +236,20 @@ proc makeAndType*(c: PContext, t1, t2: PType): PType = result.sons = @[t1, t2] propagateToOwner(result, t1) propagateToOwner(result, t2) + result.flags.incl((t1.flags + t2.flags) * {tfHasStatic}) proc makeOrType*(c: PContext, t1, t2: PType): PType = result = newTypeS(tyOr, c) result.sons = @[t1, t2] propagateToOwner(result, t1) propagateToOwner(result, t2) + result.flags.incl((t1.flags + t2.flags) * {tfHasStatic}) proc makeNotType*(c: PContext, t1: PType): PType = result = newTypeS(tyNot, c) result.sons = @[t1] propagateToOwner(result, t1) + result.flags.incl(t1.flags * {tfHasStatic}) proc newTypeS(kind: TTypeKind, c: PContext): PType = result = newType(kind, getCurrOwner()) diff --git a/lib/pure/asyncio2.nim b/lib/pure/asyncio2.nim index 60d489dda..c37370b7b 100644 --- a/lib/pure/asyncio2.nim +++ b/lib/pure/asyncio2.nim @@ -94,7 +94,8 @@ proc failed*[T](future: PFuture[T]): bool = # TODO: Get rid of register. Do it implicitly. when defined(windows) or defined(nimdoc): - import winlean + import winlean, sets, hashes + #from hashes import THash type TCompletionKey = dword @@ -105,7 +106,7 @@ when defined(windows) or defined(nimdoc): PDispatcher* = ref object ioPort: THandle - hasHandles: bool + handles: TSet[TSocketHandle] TCustomOverlapped = object Internal*: DWORD @@ -117,21 +118,32 @@ when defined(windows) or defined(nimdoc): PCustomOverlapped = ptr TCustomOverlapped + proc hash(x: TSocketHandle): THash {.borrow.} + proc newDispatcher*(): PDispatcher = ## Creates a new Dispatcher instance. new result result.ioPort = CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) + result.handles = initSet[TSocketHandle]() proc register*(p: PDispatcher, sock: TSocketHandle) = ## Registers ``sock`` with the dispatcher ``p``. if CreateIOCompletionPort(sock.THandle, p.ioPort, cast[TCompletionKey](sock), 1) == 0: OSError(OSLastError()) - p.hasHandles = true + p.handles.incl(sock) + # TODO: fd closure detection, we need to remove the fd from handles set + + proc verifyPresence(p: PDispatcher, sock: TSocketHandle) = + ## Ensures that socket has been registered with the dispatcher. + if sock notin p.handles: + raise newException(EInvalidValue, + "Operation performed on a socket which has not been registered with" & + " the dispatcher yet.") proc poll*(p: PDispatcher, timeout = 500) = ## Waits for completion events and processes them. - if not p.hasHandles: + if p.handles.len == 0: raise newException(EInvalidValue, "No handles registered in dispatcher.") let llTimeout = @@ -237,7 +249,7 @@ when defined(windows) or defined(nimdoc): ## ## Returns a ``PFuture`` which will complete when the connection succeeds ## or an error occurs. - + verifyPresence(p, socket) var retFuture = newFuture[int]()# TODO: Change to void when that regression is fixed. # Apparently ``ConnectEx`` expects the socket to be initially bound: var saddr: Tsockaddr_in @@ -298,7 +310,7 @@ when defined(windows) or defined(nimdoc): ## recv operation then the future may complete with only a part of the ## requested data read. If socket is disconnected and no data is available ## to be read then the future will complete with a value of ``""``. - + verifyPresence(p, socket) var retFuture = newFuture[string]() var dataBuf: TWSABuf @@ -354,6 +366,7 @@ when defined(windows) or defined(nimdoc): proc send*(p: PDispatcher, socket: TSocketHandle, data: string): PFuture[int] = ## Sends ``data`` to ``socket``. The returned future will complete once all ## data has been sent. + verifyPresence(p, socket) var retFuture = newFuture[int]() var dataBuf: TWSABuf @@ -390,7 +403,9 @@ when defined(windows) or defined(nimdoc): ## Accepts a new connection. Returns a future containing the client socket ## corresponding to that connection and the remote address of the client. ## The future will complete when the connection is successfully accepted. - + ## + ## The resulting client socket is automatically registered to dispatcher. + verifyPresence(p, socket) var retFuture = newFuture[tuple[address: string, client: TSocketHandle]]() var clientSock = socket() @@ -416,6 +431,7 @@ when defined(windows) or defined(nimdoc): dwLocalAddressLength, dwRemoteAddressLength, addr LocalSockaddr, addr localLen, addr RemoteSockaddr, addr remoteLen) + p.register(clientSock) # TODO: IPv6. Check ``sa_family``. http://stackoverflow.com/a/9212542/492186 retFuture.complete( (address: $inet_ntoa(cast[ptr Tsockaddr_in](remoteSockAddr).sin_addr), @@ -452,6 +468,13 @@ when defined(windows) or defined(nimdoc): return retFuture + proc socket*(disp: PDispatcher, domain: TDomain = AF_INET, + typ: TType = SOCK_STREAM, + protocol: TProtocol = IPPROTO_TCP): TSocketHandle = + ## Creates a new socket and registers it with the dispatcher implicitly. + result = socket(domain, typ, protocol) + disp.register(result) + initAll() else: import selectors @@ -473,32 +496,35 @@ else: proc update(p: PDispatcher, sock: TSocketHandle, events: set[TEvent]) = assert sock in p.selector - if events == {}: - discard p.selector.unregister(sock) - else: - discard p.selector.update(sock, events) + discard p.selector.update(sock, events) + + proc register(p: PDispatcher, sock: TSocketHandle) = + var data = PData(sock: sock, readCBs: @[], writeCBs: @[]) + p.selector.register(sock, {}, data.PObject) + + proc socket*(disp: PDispatcher, domain: TDomain = AF_INET, + typ: TType = SOCK_STREAM, + protocol: TProtocol = IPPROTO_TCP): TSocketHandle = + result = socket(domain, typ, protocol) + disp.register(result) proc addRead(p: PDispatcher, sock: TSocketHandle, cb: TCallback) = if sock notin p.selector: - var data = PData(sock: sock, readCBs: @[cb], writeCBs: @[]) - p.selector.register(sock, {EvRead}, data.PObject) - else: - p.selector[sock].data.PData.readCBs.add(cb) - p.update(sock, p.selector[sock].events + {EvRead}) + raise newException(EInvalidValue, "File descriptor not registered.") + p.selector[sock].data.PData.readCBs.add(cb) + p.update(sock, p.selector[sock].events + {EvRead}) proc addWrite(p: PDispatcher, sock: TSocketHandle, cb: TCallback) = if sock notin p.selector: - var data = PData(sock: sock, readCBs: @[], writeCBs: @[cb]) - p.selector.register(sock, {EvWrite}, data.PObject) - else: - p.selector[sock].data.PData.writeCBs.add(cb) - p.update(sock, p.selector[sock].events + {EvWrite}) + raise newException(EInvalidValue, "File descriptor not registered.") + p.selector[sock].data.PData.writeCBs.add(cb) + p.update(sock, p.selector[sock].events + {EvWrite}) proc poll*(p: PDispatcher, timeout = 500) = for info in p.selector.select(timeout): let data = PData(info.key.data) assert data.sock == info.key.fd - + #echo("In poll ", data.sock.cint) if EvRead in info.events: # Callback may add items to ``data.readCBs`` which causes issues if # we are iterating over ``data.readCBs`` at the same time. We therefore @@ -517,11 +543,17 @@ else: if not cb(data.sock): # Callback wants to be called again. data.writeCBs.add(cb) - - var newEvents: set[TEvent] - if data.readCBs.len != 0: newEvents = {EvRead} - if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite} - p.update(data.sock, newEvents) + + if info.key in p.selector: + var newEvents: set[TEvent] + if data.readCBs.len != 0: newEvents = {EvRead} + if data.writeCBs.len != 0: newEvents = newEvents + {EvWrite} + if newEvents != info.key.events: + echo(info.key.events, " -> ", newEvents) + p.update(data.sock, newEvents) + else: + # FD no longer a part of the selector. Likely been closed + # (e.g. socket disconnected). proc connect*(p: PDispatcher, socket: TSocketHandle, address: string, port: TPort, af = AF_INET): PFuture[int] = @@ -569,6 +601,7 @@ else: result = true let netSize = size - sizeRead let res = recv(sock, addr readBuffer[sizeRead], netSize, flags.cint) + #echo("recv cb res: ", res) if res < 0: let lastError = osLastError() if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: @@ -576,6 +609,7 @@ else: else: result = false # We still want this callback to be called. elif res == 0: + #echo("Disconnected recv: ", sizeRead) # Disconnected if sizeRead == 0: retFuture.complete("") @@ -588,6 +622,7 @@ else: result = false # We want to read all the data requested. else: retFuture.complete(readBuffer) + #echo("Recv cb result: ", result) addRead(p, socket, cb) return retFuture @@ -634,6 +669,7 @@ else: else: retFuture.fail(newException(EOS, osErrorMsg(lastError))) else: + p.register(client) retFuture.complete(($inet_ntoa(sockAddress.sin_addr), client)) addRead(p, socket, cb) return retFuture @@ -833,9 +869,13 @@ proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.} result = "" var c = "" while true: + #echo("1") c = await p.recv(socket, 1) + #echo("Received ", c.len) if c.len == 0: + #echo("returning") return + #echo("2") if c == "\r": c = await p.recv(socket, 1, MSG_PEEK) if c.len > 0 and c == "\L": @@ -845,7 +885,9 @@ proc recvLine*(p: PDispatcher, socket: TSocketHandle): PFuture[string] {.async.} elif c == "\L": addNLIfEmpty() return + #echo("3") add(result.string, c) + #echo("4") when isMainModule: @@ -854,11 +896,12 @@ when isMainModule: sock.setBlocking false - when true: + when false: # Await tests proc main(p: PDispatcher): PFuture[int] {.async.} = discard await p.connect(sock, "irc.freenode.net", TPort(6667)) while true: + echo("recvLine") var line = await p.recvLine(sock) echo("Line is: ", line.repr) if line == "": @@ -880,9 +923,9 @@ when isMainModule: else: - when true: + when false: - var f = p.connect(sock, "irc.freenode.org", TPort(6667)) + var f = p.connect(sock, "irc.poop.nl", TPort(6667)) f.callback = proc (future: PFuture[int]) = echo("Connected in future!") @@ -898,11 +941,13 @@ when isMainModule: sock.bindAddr(TPort(6667)) sock.listen() proc onAccept(future: PFuture[TSocketHandle]) = - echo "Accepted" - var t = p.send(future.read, "test\c\L") + let client = future.read + echo "Accepted ", client.cint + var t = p.send(client, "test\c\L") t.callback = proc (future: PFuture[int]) = - echo(future.read) + echo("Send: ", future.read) + client.close() var f = p.accept(sock) f.callback = onAccept diff --git a/lib/pure/os.nim b/lib/pure/os.nim index bfecc569a..faca17e98 100644 --- a/lib/pure/os.nim +++ b/lib/pure/os.nim @@ -260,11 +260,12 @@ proc osError*(errorCode: TOSErrorCode) = ## ## If the error code is ``0`` or an error message could not be retrieved, ## the message ``unknown OS error`` will be used. - let msg = osErrorMsg(errorCode) - if msg == "": - raise newException(EOS, "unknown OS error") - else: - raise newException(EOS, msg) + var e: ref EOS; new(e) + e.errorCode = errorCode.int32 + e.msg = osErrorMsg(errorCode) + if e.msg == "": + e.msg = "unknown OS error" + raise e {.push stackTrace:off.} proc osLastError*(): TOSErrorCode = diff --git a/lib/pure/selectors.nim b/lib/pure/selectors.nim index e086ee3ab..a113e3362 100644 --- a/lib/pure/selectors.nim +++ b/lib/pure/selectors.nim @@ -10,7 +10,6 @@ # TODO: Docs. import tables, os, unsigned, hashes -import sockets2 when defined(linux): import posix, epoll elif defined(windows): import winlean @@ -41,17 +40,14 @@ when defined(linux) or defined(nimdoc): result.events = EPOLLIN if EvWrite in events: result.events = result.events or EPOLLOUT + result.events = result.events or EPOLLRDHUP result.data.fd = fd.cint proc register*(s: PSelector, fd: TSocketHandle, events: set[TEvent], data: PObject): PSelectorKey {.discardable.} = ## Registers file descriptor ``fd`` to selector ``s`` with a set of TEvent ## ``events``. - if s.fds.hasKey(fd): - raise newException(EInvalidValue, "File descriptor already exists.") - var event = createEventStruct(events, fd) - if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fd, addr(event)) != 0: OSError(OSLastError()) @@ -63,30 +59,18 @@ when defined(linux) or defined(nimdoc): proc update*(s: PSelector, fd: TSocketHandle, events: set[TEvent]): PSelectorKey {.discardable.} = ## Updates the events which ``fd`` wants notifications for. - if not s.fds.hasKey(fd): - raise newException(EInvalidValue, "File descriptor not found.") - var event = createEventStruct(events, fd) - - s.fds[fd].events = events - if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0: - if OSLastError().cint == ENOENT: - # Socket has been closed. Epoll automatically removes disconnected - # sockets. - s.fds.del(fd) - osError("Socket has been disconnected") - - OSError(OSLastError()) - result = s.fds[fd] + if s.fds[fd].events != events: + echo("Update ", fd.cint, " to ", events) + var event = createEventStruct(events, fd) + + s.fds[fd].events = events + if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0: + OSError(OSLastError()) + result = s.fds[fd] proc unregister*(s: PSelector, fd: TSocketHandle): PSelectorKey {.discardable.} = - if not s.fds.hasKey(fd): - raise newException(EInvalidValue, "File descriptor not found.") if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fd, nil) != 0: - if osLastError().cint == ENOENT: - # Socket has been closed. Epoll automatically removes disconnected - # sockets so its already been removed. - else: - OSError(OSLastError()) + OSError(OSLastError()) result = s.fds[fd] s.fds.del(fd) @@ -113,6 +97,14 @@ when defined(linux) or defined(nimdoc): assert selectorKey != nil result.add((selectorKey, evSet)) + if (s.events[i].events and EPOLLHUP) != 0 or + (s.events[i].events and EPOLLRDHUP) != 0: + # fd closed + #echo("fd closed ", s.events[i].data.fd) + s.unregister(s.events[i].data.fd.TSocketHandle) + + #echo("Epoll: ", result[i].key.fd, " ", result[i].events, " ", result[i].key.events) + proc newSelector*(): PSelector = new result result.epollFD = epoll_create(64) @@ -123,7 +115,26 @@ when defined(linux) or defined(nimdoc): proc contains*(s: PSelector, fd: TSocketHandle): bool = ## Determines whether selector contains a file descriptor. - return s.fds.hasKey(fd) + if s.fds.hasKey(fd): + result = true + + # Ensure the underlying epoll instance still contains this fd. + var event = createEventStruct(s.fds[fd].events, fd) + if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0: + let err = osLastError() + if err.cint in {ENOENT, EBADF}: + return false + OSError(OSLastError()) + else: + return false + + proc contains*(s: PSelector, key: PSelectorKey): bool = + ## Determines whether selector contains this selector key. More accurate + ## than checking if the file descriptor is in the selector because it + ## ensures that the keys are equal. File descriptors may not always be + ## unique especially when an fd is closed and then a new one is opened, + ## the new one may have the same value. + return key.fd in s and s.fds[key.fd] == key proc `[]`*(s: PSelector, fd: TSocketHandle): PSelectorKey = ## Retrieves the selector key for ``fd``. diff --git a/lib/system.nim b/lib/system.nim index 24ad50f97..41624bb05 100644 --- a/lib/system.nim +++ b/lib/system.nim @@ -260,6 +260,7 @@ type ## system raises. EIO* = object of ESystem ## raised if an IO error occured. EOS* = object of ESystem ## raised if an operating system service failed. + errorCode*: int32 ## OS-defined error code describing this error. EInvalidLibrary* = object of EOS ## raised if a dynamic library ## could not be loaded. EResourceExhausted* = object of ESystem ## raised if a resource request diff --git a/lib/windows/winlean.nim b/lib/windows/winlean.nim index 74ef9c9ec..4d87cf4b2 100644 --- a/lib/windows/winlean.nim +++ b/lib/windows/winlean.nim @@ -456,6 +456,7 @@ var SO_DONTLINGER* {.importc, header: "Winsock2.h".}: cint SO_EXCLUSIVEADDRUSE* {.importc, header: "Winsock2.h".}: cint # disallow local address reuse + SO_ERROR* {.importc, header: "Winsock2.h".}: cint proc `==`*(x, y: TSocketHandle): bool {.borrow.} diff --git a/tests/async/tasyncawait.nim b/tests/async/tasyncawait.nim index 9e5d270c3..fea0783a0 100644 --- a/tests/async/tasyncawait.nim +++ b/tests/async/tasyncawait.nim @@ -15,24 +15,17 @@ const var clientCount = 0 proc sendMessages(disp: PDispatcher, client: TSocketHandle): PFuture[int] {.async.} = - echo("entering sendMessages") for i in 0 .. <messagesToSend: discard await disp.send(client, "Message " & $i & "\c\L") - echo("returning sendMessages") proc launchSwarm(disp: PDispatcher, port: TPort): PFuture[int] {.async.} = for i in 0 .. <swarmSize: - var sock = socket() - # TODO: We may need to explicitly register and unregister the fd. - # This is because when the socket is closed, selectors is not aware - # that it has been closed. While epoll is. Perhaps we should just unregister - # in close()? - echo(sock.cint) + var sock = disp.socket() + #disp.register(sock) discard await disp.connect(sock, "localhost", port) when true: discard await sendMessages(disp, sock) - echo("Calling close") sock.close() else: # Issue #932: https://github.com/Araq/Nimrod/issues/932 @@ -55,7 +48,7 @@ proc readMessages(disp: PDispatcher, client: TSocketHandle): PFuture[int] {.asyn doAssert false proc createServer(disp: PDispatcher, port: TPort): PFuture[int] {.async.} = - var server = socket() + var server = disp.socket() #disp.register(server) server.bindAddr(port) server.listen() |