diff options
Diffstat (limited to 'lib/pure/asyncnet.nim')
-rw-r--r-- | lib/pure/asyncnet.nim | 135 |
1 files changed, 95 insertions, 40 deletions
diff --git a/lib/pure/asyncnet.nim b/lib/pure/asyncnet.nim index 39d05d36b..62e85042f 100644 --- a/lib/pure/asyncnet.nim +++ b/lib/pure/asyncnet.nim @@ -24,7 +24,7 @@ ## ## Chat server ## ^^^^^^^^^^^ -## +## ## The following example demonstrates a simple chat server. ## ## .. code-block::nim @@ -182,26 +182,30 @@ proc connect*(socket: AsyncSocket, address: string, port: Port, sslSetConnectState(socket.sslHandle) sslLoop(socket, flags, sslDoHandshake(socket.sslHandle)) -proc readInto(buf: cstring, size: int, socket: AsyncSocket, - flags: set[SocketFlag]): Future[int] {.async.} = +template readInto(buf: cstring, size: int, socket: AsyncSocket, + flags: set[SocketFlag]): int = + ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``. Note that + ## this is a template and not a proc. + var res = 0 if socket.isSsl: when defined(ssl): # SSL mode. sslLoop(socket, flags, sslRead(socket.sslHandle, buf, size.cint)) - result = opResult + res = opResult else: - var data = await recv(socket.fd.TAsyncFD, size, flags) - if data.len != 0: - copyMem(buf, addr data[0], data.len) + var recvIntoFut = recvInto(socket.fd.TAsyncFD, buf, size, flags) + yield recvIntoFut # Not in SSL mode. - result = data.len + res = recvIntoFut.read() + res -proc readIntoBuf(socket: AsyncSocket, - flags: set[SocketFlag]): Future[int] {.async.} = - result = await readInto(addr socket.buffer[0], BufferSize, socket, flags) +template readIntoBuf(socket: AsyncSocket, + flags: set[SocketFlag]): int = + var size = readInto(addr socket.buffer[0], BufferSize, socket, flags) socket.currPos = 0 - socket.bufLen = result + socket.bufLen = size + size proc recv*(socket: AsyncSocket, size: int, flags = {SocketFlag.SafeDisconn}): Future[string] {.async.} = @@ -222,10 +226,11 @@ proc recv*(socket: AsyncSocket, size: int, ## to be read then the future will complete with a value of ``""``. if socket.isBuffered: result = newString(size) + shallow(result) let originalBufPos = socket.currPos if socket.bufLen == 0: - let res = await socket.readIntoBuf(flags - {SocketFlag.Peek}) + let res = socket.readIntoBuf(flags - {SocketFlag.Peek}) if res == 0: result.setLen(0) return @@ -236,7 +241,7 @@ proc recv*(socket: AsyncSocket, size: int, if SocketFlag.Peek in flags: # We don't want to get another buffer if we're peeking. break - let res = await socket.readIntoBuf(flags - {SocketFlag.Peek}) + let res = socket.readIntoBuf(flags - {SocketFlag.Peek}) if res == 0: break @@ -251,7 +256,7 @@ proc recv*(socket: AsyncSocket, size: int, result.setLen(read) else: result = newString(size) - let read = await readInto(addr result[0], size, socket, flags) + let read = readInto(addr result[0], size, socket, flags) result.setLen(read) proc send*(socket: AsyncSocket, data: string, @@ -302,15 +307,17 @@ proc accept*(socket: AsyncSocket, retFut.complete(future.read.client) return retFut -proc recvLine*(socket: AsyncSocket, - flags = {SocketFlag.SafeDisconn}): Future[string] {.async.} = - ## Reads a line of data from ``socket``. Returned future will complete once - ## a full line is read or an error occurs. +proc recvLineInto*(socket: AsyncSocket, resString: FutureVar[string], + flags = {SocketFlag.SafeDisconn}) {.async.} = + ## Reads a line of data from ``socket`` into ``resString``. + ## + ## The ``resString`` future and the string value contained within must both + ## be initialised. ## ## If a full line is read ``\r\L`` is not ## added to ``line``, however if solely ``\r\L`` is read then ``line`` ## will be set to it. - ## + ## ## If the socket is disconnected, ``line`` will be set to ``""``. ## ## If the socket is disconnected in the middle of a line (before ``\r\L`` @@ -318,27 +325,37 @@ proc recvLine*(socket: AsyncSocket, ## The partial line **will be lost**. ## ## **Warning**: The ``Peek`` flag is not yet implemented. - ## - ## **Warning**: ``recvLine`` on unbuffered sockets assumes that the protocol - ## uses ``\r\L`` to delimit a new line. - template addNLIfEmpty(): stmt = - if result.len == 0: - result.add("\c\L") + ## + ## **Warning**: ``recvLineInto`` on unbuffered sockets assumes that the + ## protocol uses ``\r\L`` to delimit a new line. assert SocketFlag.Peek notin flags ## TODO: + assert(not resString.mget.isNil(), + "String inside resString future needs to be initialised") + result = newFuture[void]("asyncnet.recvLineInto") + + # TODO: Make the async transformation check for FutureVar params and complete + # them when the result future is completed. + # Can we replace the result future with the FutureVar? + + template addNLIfEmpty(): stmt = + if resString.mget.len == 0: + resString.mget.add("\c\L") + if socket.isBuffered: - result = "" if socket.bufLen == 0: - let res = await socket.readIntoBuf(flags) + let res = socket.readIntoBuf(flags) if res == 0: + resString.complete() return var lastR = false while true: if socket.currPos >= socket.bufLen: - let res = await socket.readIntoBuf(flags) + let res = socket.readIntoBuf(flags) if res == 0: - result = "" - break + resString.mget().setLen(0) + resString.complete() + return case socket.buffer[socket.currPos] of '\r': @@ -347,30 +364,68 @@ proc recvLine*(socket: AsyncSocket, of '\L': addNLIfEmpty() socket.currPos.inc() + resString.complete() return else: if lastR: socket.currPos.inc() + resString.complete() return else: - result.add socket.buffer[socket.currPos] + resString.mget.add socket.buffer[socket.currPos] socket.currPos.inc() else: - result = "" var c = "" while true: - c = await recv(socket, 1, flags) + let recvFut = recv(socket, 1, flags) + c = recvFut.read() if c.len == 0: - return "" + resString.mget.setLen(0) + resString.complete() + return if c == "\r": - c = await recv(socket, 1, flags) # Skip \L + let recvFut = recv(socket, 1, flags) # Skip \L + c = recvFut.read() assert c == "\L" addNLIfEmpty() + resString.complete() return elif c == "\L": addNLIfEmpty() + resString.complete() return - add(result.string, c) + resString.mget.add c + + resString.complete() + +proc recvLine*(socket: AsyncSocket, + flags = {SocketFlag.SafeDisconn}): Future[string] {.async.} = + ## Reads a line of data from ``socket``. Returned future will complete once + ## a full line is read or an error occurs. + ## + ## If a full line is read ``\r\L`` is not + ## added to ``line``, however if solely ``\r\L`` is read then ``line`` + ## will be set to it. + ## + ## If the socket is disconnected, ``line`` will be set to ``""``. + ## + ## If the socket is disconnected in the middle of a line (before ``\r\L`` + ## is read) then line will be set to ``""``. + ## The partial line **will be lost**. + ## + ## **Warning**: The ``Peek`` flag is not yet implemented. + ## + ## **Warning**: ``recvLine`` on unbuffered sockets assumes that the protocol + ## uses ``\r\L`` to delimit a new line. + template addNLIfEmpty(): stmt = + if result.len == 0: + result.add("\c\L") + assert SocketFlag.Peek notin flags ## TODO: + + # TODO: Optimise this. + var resString = newFutureVar[string]("asyncnet.recvLine") + await socket.recvLineInto(resString, flags) + result = resString.mget() proc listen*(socket: AsyncSocket, backlog = SOMAXCONN) {.tags: [ReadIOEffect].} = ## Marks ``socket`` as accepting connections. @@ -500,11 +555,11 @@ when not defined(testing) and isMainModule: proc (future: Future[void]) = echo("Send") client.close() - + var f = accept(sock) f.callback = onAccept - + var f = accept(sock) f.callback = onAccept runForever() - + |