diff options
author | def <dennis@felsin9.de> | 2015-03-02 02:52:55 +0100 |
---|---|---|
committer | def <dennis@felsin9.de> | 2015-03-17 19:39:02 +0100 |
commit | 07a50caf64d1ed2891349cff9a22b53c4ef61c2d (patch) | |
tree | 928e66d02c456e44d0628cab6b5af70d553c083d | |
parent | 5aab532c9262bdd062d3f225a9402ede189b7a9b (diff) | |
download | Nim-07a50caf64d1ed2891349cff9a22b53c4ef61c2d.tar.gz |
Make asyncnet usable when avoiding allocations.
- readInto, readIntoBuf, are templates instead of procs now - New recvLineInto template that reads directly into a string instead of creating a new one. Used by recvLine proc now - Need fd and bufLen fields of AsyncSocketDesc exported because of the templates - recv returns a shallow string to prevent copying - This gives significant speedups, mostly by using templates instead of creating new Futures and waiting for them all the time.
-rw-r--r-- | lib/pure/asyncnet.nim | 159 |
1 files changed, 98 insertions, 61 deletions
diff --git a/lib/pure/asyncnet.nim b/lib/pure/asyncnet.nim index e7325e0d7..7e1ff5db4 100644 --- a/lib/pure/asyncnet.nim +++ b/lib/pure/asyncnet.nim @@ -69,13 +69,13 @@ type # TODO: I would prefer to just do: # AsyncSocket* {.borrow: `.`.} = distinct Socket. But that doesn't work. AsyncSocketDesc = object - fd: SocketHandle + fd*: SocketHandle closed: bool ## determines whether this socket has been closed case isBuffered: bool ## determines whether this socket is buffered. of true: buffer: array[0..BufferSize, char] currPos: int # current index in buffer - bufLen: int # current length of buffer + bufLen*: int # current length of buffer of false: nil case isSsl: bool of true: @@ -182,26 +182,30 @@ proc connect*(socket: AsyncSocket, address: string, port: Port, sslSetConnectState(socket.sslHandle) sslLoop(socket, flags, sslDoHandshake(socket.sslHandle)) -proc readInto(buf: cstring, size: int, socket: AsyncSocket, - flags: set[SocketFlag]): Future[int] {.async.} = +template readInto*(buf: cstring, size: int, socket: AsyncSocket, + flags: set[SocketFlag]): int = + ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``. Note that + ## this is a template and not a proc. + var res = 0 if socket.isSsl: when defined(ssl): # SSL mode. sslLoop(socket, flags, sslRead(socket.sslHandle, buf, size.cint)) - result = opResult + res = opResult else: - var data = await recv(socket.fd.TAsyncFD, size, flags) - if data.len != 0: - copyMem(buf, addr data[0], data.len) + var recvIntoFut = recvInto(socket.fd.TAsyncFD, buf, size, flags) + yield recvIntoFut # Not in SSL mode. - result = data.len + res = recvIntoFut.read() + res -proc readIntoBuf(socket: AsyncSocket, - flags: set[SocketFlag]): Future[int] {.async.} = - result = await readInto(addr socket.buffer[0], BufferSize, socket, flags) +template readIntoBuf(socket: AsyncSocket, + flags: set[SocketFlag]): int = + var size = readInto(addr socket.buffer[0], BufferSize, socket, flags) socket.currPos = 0 - socket.bufLen = result + socket.bufLen = size + size proc recv*(socket: AsyncSocket, size: int, flags = {SocketFlag.SafeDisconn}): Future[string] {.async.} = @@ -222,10 +226,11 @@ proc recv*(socket: AsyncSocket, size: int, ## to be read then the future will complete with a value of ``""``. if socket.isBuffered: result = newString(size) + shallow(result) let originalBufPos = socket.currPos if socket.bufLen == 0: - let res = await socket.readIntoBuf(flags - {SocketFlag.Peek}) + let res = socket.readIntoBuf(flags - {SocketFlag.Peek}) if res == 0: result.setLen(0) return @@ -236,7 +241,7 @@ proc recv*(socket: AsyncSocket, size: int, if SocketFlag.Peek in flags: # We don't want to get another buffer if we're peeking. break - let res = await socket.readIntoBuf(flags - {SocketFlag.Peek}) + let res = socket.readIntoBuf(flags - {SocketFlag.Peek}) if res == 0: break @@ -251,7 +256,7 @@ proc recv*(socket: AsyncSocket, size: int, result.setLen(read) else: result = newString(size) - let read = await readInto(addr result[0], size, socket, flags) + let read = readInto(addr result[0], size, socket, flags) result.setLen(read) proc send*(socket: AsyncSocket, data: string, @@ -302,6 +307,81 @@ proc accept*(socket: AsyncSocket, retFut.complete(future.read.client) return retFut +template recvLineInto*(socket: AsyncSocket, resString: var string, + flags = {SocketFlag.SafeDisconn}) = + ## Reads a line of data from ``socket`` into ``resString``. + ## + ## If a full line is read ``\r\L`` is not + ## added to ``line``, however if solely ``\r\L`` is read then ``line`` + ## will be set to it. + ## + ## If the socket is disconnected, ``line`` will be set to ``""``. + ## + ## If the socket is disconnected in the middle of a line (before ``\r\L`` + ## is read) then line will be set to ``""``. + ## The partial line **will be lost**. + ## + ## **Warning**: The ``Peek`` flag is not yet implemented. + ## + ## **Warning**: ``recvLineInto`` on unbuffered sockets assumes that the + ## protocol uses ``\r\L`` to delimit a new line. + assert SocketFlag.Peek notin flags ## TODO: + + template addNLIfEmpty(): stmt = + if resString.len == 0: + resString.add("\c\L") + + block recvLineInto: + if socket.isBuffered: + if socket.bufLen == 0: + let res = socket.readIntoBuf(flags) + if res == 0: + break recvLineInto + + var lastR = false + while true: + if socket.currPos >= socket.bufLen: + let res = socket.readIntoBuf(flags) + if res == 0: + resString.setLen(0) + break recvLineInto + + case socket.buffer[socket.currPos] + of '\r': + lastR = true + addNLIfEmpty() + of '\L': + addNLIfEmpty() + socket.currPos.inc() + break recvLineInto + else: + if lastR: + socket.currPos.inc() + break recvLineInto + else: + resString.add socket.buffer[socket.currPos] + socket.currPos.inc() + else: + var c = "" + while true: + let recvFut = recv(socket, 1, flags) + yield recvFut + c = recvFut.read() + if c.len == 0: + resString.setLen(0) + break recvLineInto + if c == "\r": + let recvFut = recv(socket, 1, flags) # Skip \L + yield recvFut + c = recvFut.read() + assert c == "\L" + addNLIfEmpty() + break recvLineInto + elif c == "\L": + addNLIfEmpty() + break recvLineInto + add(resString, c) + proc recvLine*(socket: AsyncSocket, flags = {SocketFlag.SafeDisconn}): Future[string] {.async.} = ## Reads a line of data from ``socket``. Returned future will complete once @@ -325,52 +405,9 @@ proc recvLine*(socket: AsyncSocket, if result.len == 0: result.add("\c\L") assert SocketFlag.Peek notin flags ## TODO: - if socket.isBuffered: - result = "" - if socket.bufLen == 0: - let res = await socket.readIntoBuf(flags) - if res == 0: - return - var lastR = false - while true: - if socket.currPos >= socket.bufLen: - let res = await socket.readIntoBuf(flags) - if res == 0: - result = "" - break - - case socket.buffer[socket.currPos] - of '\r': - lastR = true - addNLIfEmpty() - of '\L': - addNLIfEmpty() - socket.currPos.inc() - return - else: - if lastR: - socket.currPos.inc() - return - else: - result.add socket.buffer[socket.currPos] - socket.currPos.inc() - else: - result = "" - var c = "" - while true: - c = await recv(socket, 1, flags) - if c.len == 0: - return "" - if c == "\r": - c = await recv(socket, 1, flags) # Skip \L - assert c == "\L" - addNLIfEmpty() - return - elif c == "\L": - addNLIfEmpty() - return - add(result.string, c) + result = "" + socket.recvLineInto(result, flags) proc listen*(socket: AsyncSocket, backlog = SOMAXCONN) {.tags: [ReadIOEffect].} = ## Marks ``socket`` as accepting connections. |