From 72b4912c84b16644657f94e54105739cba4b2457 Mon Sep 17 00:00:00 2001 From: Dominik Picheta Date: Fri, 24 Apr 2015 17:56:04 +0100 Subject: Introduce FutureVar[T] to make recvLineInto safer. FutureVar[T] is a new distinct Future type which is designed to be used for situations where the highest performance is needed. It reduces the number of Future allocations needed. It acts as a replacement for 'var' params in async procs. This commit modifies @def-'s PR in order to make it safer. The recvLineInto procedure has been modified to take a ``FutureVar[string]`` param instead of a ``ptr string`` param. --- lib/pure/asyncdispatch.nim | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) (limited to 'lib/pure/asyncdispatch.nim') diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index 27f77cef2..bedbb9510 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -145,6 +145,8 @@ type Future*[T] = ref object of FutureBase ## Typed future. value: T ## Stored value + FutureVar*[T] = distinct Future[T] + {.deprecated: [PFutureBase: FutureBase, PFuture: Future].} @@ -162,6 +164,19 @@ proc newFuture*[T](fromProc: string = "unspecified"): Future[T] = result.fromProc = fromProc currentID.inc() +proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] = + ## Create a new ``FutureVar``. This Future type is ideally suited for + ## situations where you want to avoid unnecessary allocations of Futures. + ## + ## Specifying ``fromProc``, which is a string specifying the name of the proc + ## that this future belongs to, is a good habit as it helps with debugging. + result = FutureVar[T](newFuture[T](fromProc)) + +proc clean*[T](future: FutureVar[T]) = + ## Resets the ``finished`` status of ``future``. + Future[T](future).finished = false + Future[T](future).error = nil + proc checkFinished[T](future: Future[T]) = when not defined(release): if future.finished: @@ -194,6 +209,15 @@ proc complete*(future: Future[void]) = if future.cb != nil: future.cb() +proc complete*[T](future: FutureVar[T]) = + ## Completes a ``FutureVar``. + template fut: expr = Future[T](future) + checkFinished(fut) + assert(fut.error == nil) + fut.finished = true + if fut.cb != nil: + fut.cb() + proc fail*[T](future: Future[T], error: ref Exception) = ## Completes ``future`` with ``error``. #assert(not future.finished, "Future already finished, cannot finish twice.") @@ -264,6 +288,13 @@ proc readError*[T](future: Future[T]): ref Exception = else: raise newException(ValueError, "No error in future.") +proc mget*[T](future: FutureVar[T]): var T = + ## Returns a mutable value stored in ``future``. + ## + ## Unlike ``read``, this function will not raise an exception if the + ## Future has not been finished. + result = Future[T](future).value + proc finished*[T](future: Future[T]): bool = ## Determines whether ``future`` has completed. ## -- cgit 1.4.1-2-gfad0 From f4c1c252a7b2d5233eda5abb81049d8af774cdc3 Mon Sep 17 00:00:00 2001 From: Dominik Picheta Date: Fri, 24 Apr 2015 18:12:13 +0100 Subject: Fix asyncdispatch on Windows. --- lib/pure/asyncdispatch.nim | 111 ++++++++++++++++++++++++++++++++++++++++++++ lib/pure/asyncnet.nim | 112 --------------------------------------------- 2 files changed, 111 insertions(+), 112 deletions(-) (limited to 'lib/pure/asyncdispatch.nim') diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index bedbb9510..bec2632d5 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -665,6 +665,93 @@ when defined(windows) or defined(nimdoc): # free ``ol``. return retFuture + proc recvInto*(socket: TAsyncFD, buf: cstring, size: int, + flags = {SocketFlag.SafeDisconn}): Future[int] = + ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``, which must + ## at least be of that size. Returned future will complete once all the + ## data requested is read, a part of the data has been read, or the socket + ## has disconnected in which case the future will complete with a value of + ## ``0``. + ## + ## **Warning**: The ``Peek`` socket flag is not supported on Windows. + + + # Things to note: + # * When WSARecv completes immediately then ``bytesReceived`` is very + # unreliable. + # * Still need to implement message-oriented socket disconnection, + # '\0' in the message currently signifies a socket disconnect. Who + # knows what will happen when someone sends that to our socket. + verifyPresence(socket) + assert SocketFlag.Peek notin flags, "Peek not supported on Windows." + + var retFuture = newFuture[int]("recvInto") + + #buf[] = '\0' + var dataBuf: TWSABuf + dataBuf.buf = buf + dataBuf.len = size + + var bytesReceived: Dword + var flagsio = flags.toOSFlags().Dword + var ol = PCustomOverlapped() + GC_ref(ol) + ol.data = TCompletionData(fd: socket, cb: + proc (fd: TAsyncFD, bytesCount: Dword, errcode: OSErrorCode) = + if not retFuture.finished: + if errcode == OSErrorCode(-1): + if bytesCount == 0 and dataBuf.buf[0] == '\0': + retFuture.complete(0) + else: + retFuture.complete(bytesCount) + else: + if flags.isDisconnectionError(errcode): + retFuture.complete(0) + else: + retFuture.fail(newException(OSError, osErrorMsg(errcode))) + if dataBuf.buf != nil: + dataBuf.buf = nil + ) + + let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived, + addr flagsio, cast[POVERLAPPED](ol), nil) + if ret == -1: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + if dataBuf.buf != nil: + dataBuf.buf = nil + GC_unref(ol) + if flags.isDisconnectionError(err): + retFuture.complete(0) + else: + retFuture.fail(newException(OSError, osErrorMsg(err))) + elif ret == 0 and bytesReceived == 0 and dataBuf.buf[0] == '\0': + # We have to ensure that the buffer is empty because WSARecv will tell + # us immediately when it was disconnected, even when there is still + # data in the buffer. + # We want to give the user as much data as we can. So we only return + # the empty string (which signals a disconnection) when there is + # nothing left to read. + retFuture.complete(0) + # TODO: "For message-oriented sockets, where a zero byte message is often + # allowable, a failure with an error code of WSAEDISCON is used to + # indicate graceful closure." + # ~ http://msdn.microsoft.com/en-us/library/ms741688%28v=vs.85%29.aspx + else: + # Request to read completed immediately. + # From my tests bytesReceived isn't reliable. + let realSize = + if bytesReceived == 0: + size + else: + bytesReceived + assert realSize <= size + retFuture.complete(realSize) + # We don't deallocate ``ol`` here because even though this completed + # immediately poll will still be notified about its completion and it will + # free ``ol``. + return retFuture + proc send*(socket: TAsyncFD, data: string, flags = {SocketFlag.SafeDisconn}): Future[void] = ## Sends ``data`` to ``socket``. The returned future will complete once all @@ -1014,6 +1101,30 @@ else: addRead(socket, cb) return retFuture + proc recvInto*(socket: TAsyncFD, buf: cstring, size: int, + flags = {SocketFlag.SafeDisconn}): Future[int] = + var retFuture = newFuture[int]("recvInto") + + proc cb(sock: TAsyncFD): bool = + result = true + let res = recv(sock.SocketHandle, buf, size.cint, + flags.toOSFlags()) + if res < 0: + let lastError = osLastError() + if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: + if flags.isDisconnectionError(lastError): + retFuture.complete(0) + else: + retFuture.fail(newException(OSError, osErrorMsg(lastError))) + else: + result = false # We still want this callback to be called. + else: + retFuture.complete(res) + # TODO: The following causes a massive slowdown. + #if not cb(socket): + addRead(socket, cb) + return retFuture + proc send*(socket: TAsyncFD, data: string, flags = {SocketFlag.SafeDisconn}): Future[void] = var retFuture = newFuture[void]("send") diff --git a/lib/pure/asyncnet.nim b/lib/pure/asyncnet.nim index a91bcf87a..62e85042f 100644 --- a/lib/pure/asyncnet.nim +++ b/lib/pure/asyncnet.nim @@ -182,118 +182,6 @@ proc connect*(socket: AsyncSocket, address: string, port: Port, sslSetConnectState(socket.sslHandle) sslLoop(socket, flags, sslDoHandshake(socket.sslHandle)) -when defined(windows): - proc recvInto(socket: TAsyncFD, buf: cstring, size: int, - flags = {SocketFlag.SafeDisconn}): Future[int] = - ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``, which must - ## at least be of that size. Returned future will complete once all the - ## data requested is read, a part of the data has been read, or the socket - ## has disconnected in which case the future will complete with a value of - ## ``0``. - ## - ## **Warning**: The ``Peek`` socket flag is not supported on Windows. - - - # Things to note: - # * When WSARecv completes immediately then ``bytesReceived`` is very - # unreliable. - # * Still need to implement message-oriented socket disconnection, - # '\0' in the message currently signifies a socket disconnect. Who - # knows what will happen when someone sends that to our socket. - verifyPresence(socket) - assert SocketFlag.Peek notin flags, "Peek not supported on Windows." - - var retFuture = newFuture[int]("recvInto") - - buf[0] = '\0' - var dataBuf: TWSABuf - dataBuf.buf = buf - dataBuf.len = size - - var bytesReceived: Dword - var flagsio = flags.toOSFlags().Dword - var ol = PCustomOverlapped() - GC_ref(ol) - ol.data = TCompletionData(fd: socket, cb: - proc (fd: TAsyncFD, bytesCount: Dword, errcode: OSErrorCode) = - if not retFuture.finished: - if errcode == OSErrorCode(-1): - if bytesCount == 0 and dataBuf.buf[0] == '\0': - retFuture.complete(0) - else: - retFuture.complete(bytesCount) - else: - if flags.isDisconnectionError(errcode): - retFuture.complete(0) - else: - retFuture.fail(newException(OSError, osErrorMsg(errcode))) - if dataBuf.buf != nil: - dataBuf.buf = nil - ) - - let ret = WSARecv(socket.SocketHandle, addr dataBuf, 1, addr bytesReceived, - addr flagsio, cast[POVERLAPPED](ol), nil) - if ret == -1: - let err = osLastError() - if err.int32 != ERROR_IO_PENDING: - if dataBuf.buf != nil: - dataBuf.buf = nil - GC_unref(ol) - if flags.isDisconnectionError(err): - retFuture.complete(0) - else: - retFuture.fail(newException(OSError, osErrorMsg(err))) - elif ret == 0 and bytesReceived == 0 and dataBuf.buf[0] == '\0': - # We have to ensure that the buffer is empty because WSARecv will tell - # us immediately when it was disconnected, even when there is still - # data in the buffer. - # We want to give the user as much data as we can. So we only return - # the empty string (which signals a disconnection) when there is - # nothing left to read. - retFuture.complete(0) - # TODO: "For message-oriented sockets, where a zero byte message is often - # allowable, a failure with an error code of WSAEDISCON is used to - # indicate graceful closure." - # ~ http://msdn.microsoft.com/en-us/library/ms741688%28v=vs.85%29.aspx - else: - # Request to read completed immediately. - # From my tests bytesReceived isn't reliable. - let realSize = - if bytesReceived == 0: - size - else: - bytesReceived - assert realSize <= size - retFuture.complete(realSize) - # We don't deallocate ``ol`` here because even though this completed - # immediately poll will still be notified about its completion and it will - # free ``ol``. - return retFuture -else: - proc recvInto(socket: TAsyncFD, buf: cstring, size: int, - flags = {SocketFlag.SafeDisconn}): Future[int] = - var retFuture = newFuture[int]("recvInto") - - proc cb(sock: TAsyncFD): bool = - result = true - let res = recv(sock.SocketHandle, buf, size.cint, - flags.toOSFlags()) - if res < 0: - let lastError = osLastError() - if lastError.int32 notin {EINTR, EWOULDBLOCK, EAGAIN}: - if flags.isDisconnectionError(lastError): - retFuture.complete(0) - else: - retFuture.fail(newException(OSError, osErrorMsg(lastError))) - else: - result = false # We still want this callback to be called. - else: - retFuture.complete(res) - # TODO: The following causes a massive slowdown. - #if not cb(socket): - addRead(socket, cb) - return retFuture - template readInto(buf: cstring, size: int, socket: AsyncSocket, flags: set[SocketFlag]): int = ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``. Note that -- cgit 1.4.1-2-gfad0 From 84315c6a9ce383620a49620cda98ad1158f92010 Mon Sep 17 00:00:00 2001 From: def Date: Sun, 26 Apr 2015 17:01:04 +0200 Subject: Revert "Introduce FutureVar[T] to make recvLineInto safer." This reverts commit 72b4912c84b16644657f94e54105739cba4b2457. --- lib/pure/asyncdispatch.nim | 31 ------------------------------- lib/pure/asynchttpserver.nim | 25 +++++++++++-------------- lib/pure/asyncnet.nim | 38 +++++++++----------------------------- 3 files changed, 20 insertions(+), 74 deletions(-) (limited to 'lib/pure/asyncdispatch.nim') diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index bec2632d5..a4d7a1632 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -145,8 +145,6 @@ type Future*[T] = ref object of FutureBase ## Typed future. value: T ## Stored value - FutureVar*[T] = distinct Future[T] - {.deprecated: [PFutureBase: FutureBase, PFuture: Future].} @@ -164,19 +162,6 @@ proc newFuture*[T](fromProc: string = "unspecified"): Future[T] = result.fromProc = fromProc currentID.inc() -proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] = - ## Create a new ``FutureVar``. This Future type is ideally suited for - ## situations where you want to avoid unnecessary allocations of Futures. - ## - ## Specifying ``fromProc``, which is a string specifying the name of the proc - ## that this future belongs to, is a good habit as it helps with debugging. - result = FutureVar[T](newFuture[T](fromProc)) - -proc clean*[T](future: FutureVar[T]) = - ## Resets the ``finished`` status of ``future``. - Future[T](future).finished = false - Future[T](future).error = nil - proc checkFinished[T](future: Future[T]) = when not defined(release): if future.finished: @@ -209,15 +194,6 @@ proc complete*(future: Future[void]) = if future.cb != nil: future.cb() -proc complete*[T](future: FutureVar[T]) = - ## Completes a ``FutureVar``. - template fut: expr = Future[T](future) - checkFinished(fut) - assert(fut.error == nil) - fut.finished = true - if fut.cb != nil: - fut.cb() - proc fail*[T](future: Future[T], error: ref Exception) = ## Completes ``future`` with ``error``. #assert(not future.finished, "Future already finished, cannot finish twice.") @@ -288,13 +264,6 @@ proc readError*[T](future: Future[T]): ref Exception = else: raise newException(ValueError, "No error in future.") -proc mget*[T](future: FutureVar[T]): var T = - ## Returns a mutable value stored in ``future``. - ## - ## Unlike ``read``, this function will not raise an exception if the - ## Future has not been finished. - result = Future[T](future).value - proc finished*[T](future: Future[T]): bool = ## Determines whether ``future`` has completed. ## diff --git a/lib/pure/asynchttpserver.nim b/lib/pure/asynchttpserver.nim index 74e9e9f36..279cedb5d 100644 --- a/lib/pure/asynchttpserver.nim +++ b/lib/pure/asynchttpserver.nim @@ -148,8 +148,7 @@ proc processClient(client: AsyncSocket, address: string, var request: Request request.url = initUri() request.headers = newStringTable(modeCaseInsensitive) - var lineFut = newFutureVar[string]("asynchttpserver.processClient") - lineFut.mget() = newStringOfCap(80) + var line = newStringOfCap(80) var key, value = "" while not client.isClosed: @@ -162,15 +161,14 @@ proc processClient(client: AsyncSocket, address: string, request.client = client # First line - GET /path HTTP/1.1 - lineFut.mget().setLen(0) - lineFut.clean() - await client.recvLineInto(lineFut) # TODO: Timeouts. - if lineFut.mget == "": + line.setLen(0) + await client.recvLineInto(addr line) # TODO: Timeouts. + if line == "": client.close() return var i = 0 - for linePart in lineFut.mget.split(' '): + for linePart in line.split(' '): case i of 0: request.reqMethod.shallowCopy(linePart.normalize) of 1: parseUri(linePart, request.url) @@ -182,21 +180,20 @@ proc processClient(client: AsyncSocket, address: string, "Invalid request protocol. Got: " & linePart) continue else: - await request.respond(Http400, "Invalid request. Got: " & lineFut.mget) + await request.respond(Http400, "Invalid request. Got: " & line) continue inc i # Headers while true: i = 0 - lineFut.mget.setLen(0) - lineFut.clean() - await client.recvLineInto(lineFut) + line.setLen(0) + await client.recvLineInto(addr line) - if lineFut.mget == "": + if line == "": client.close(); return - if lineFut.mget == "\c\L": break - let (key, value) = parseHeader(lineFut.mget) + if line == "\c\L": break + let (key, value) = parseHeader(line) request.headers[key] = value if request.reqMethod == "post": diff --git a/lib/pure/asyncnet.nim b/lib/pure/asyncnet.nim index 62e85042f..a79f30ab3 100644 --- a/lib/pure/asyncnet.nim +++ b/lib/pure/asyncnet.nim @@ -307,13 +307,10 @@ proc accept*(socket: AsyncSocket, retFut.complete(future.read.client) return retFut -proc recvLineInto*(socket: AsyncSocket, resString: FutureVar[string], +proc recvLineInto*(socket: AsyncSocket, resString: ptr string, flags = {SocketFlag.SafeDisconn}) {.async.} = ## Reads a line of data from ``socket`` into ``resString``. ## - ## The ``resString`` future and the string value contained within must both - ## be initialised. - ## ## If a full line is read ``\r\L`` is not ## added to ``line``, however if solely ``\r\L`` is read then ``line`` ## will be set to it. @@ -329,23 +326,16 @@ proc recvLineInto*(socket: AsyncSocket, resString: FutureVar[string], ## **Warning**: ``recvLineInto`` on unbuffered sockets assumes that the ## protocol uses ``\r\L`` to delimit a new line. assert SocketFlag.Peek notin flags ## TODO: - assert(not resString.mget.isNil(), - "String inside resString future needs to be initialised") result = newFuture[void]("asyncnet.recvLineInto") - # TODO: Make the async transformation check for FutureVar params and complete - # them when the result future is completed. - # Can we replace the result future with the FutureVar? - template addNLIfEmpty(): stmt = - if resString.mget.len == 0: - resString.mget.add("\c\L") + if resString[].len == 0: + resString[].add("\c\L") if socket.isBuffered: if socket.bufLen == 0: let res = socket.readIntoBuf(flags) if res == 0: - resString.complete() return var lastR = false @@ -353,8 +343,7 @@ proc recvLineInto*(socket: AsyncSocket, resString: FutureVar[string], if socket.currPos >= socket.bufLen: let res = socket.readIntoBuf(flags) if res == 0: - resString.mget().setLen(0) - resString.complete() + resString[].setLen(0) return case socket.buffer[socket.currPos] @@ -364,15 +353,13 @@ proc recvLineInto*(socket: AsyncSocket, resString: FutureVar[string], of '\L': addNLIfEmpty() socket.currPos.inc() - resString.complete() return else: if lastR: socket.currPos.inc() - resString.complete() return else: - resString.mget.add socket.buffer[socket.currPos] + resString[].add socket.buffer[socket.currPos] socket.currPos.inc() else: var c = "" @@ -380,23 +367,18 @@ proc recvLineInto*(socket: AsyncSocket, resString: FutureVar[string], let recvFut = recv(socket, 1, flags) c = recvFut.read() if c.len == 0: - resString.mget.setLen(0) - resString.complete() + resString[].setLen(0) return if c == "\r": let recvFut = recv(socket, 1, flags) # Skip \L c = recvFut.read() assert c == "\L" addNLIfEmpty() - resString.complete() return elif c == "\L": addNLIfEmpty() - resString.complete() return - resString.mget.add c - - resString.complete() + resString[].add c proc recvLine*(socket: AsyncSocket, flags = {SocketFlag.SafeDisconn}): Future[string] {.async.} = @@ -422,10 +404,8 @@ proc recvLine*(socket: AsyncSocket, result.add("\c\L") assert SocketFlag.Peek notin flags ## TODO: - # TODO: Optimise this. - var resString = newFutureVar[string]("asyncnet.recvLine") - await socket.recvLineInto(resString, flags) - result = resString.mget() + result = "" + await socket.recvLineInto(addr result, flags) proc listen*(socket: AsyncSocket, backlog = SOMAXCONN) {.tags: [ReadIOEffect].} = ## Marks ``socket`` as accepting connections. -- cgit 1.4.1-2-gfad0