summary refs log tree commit diff stats
path: root/lib/pure/asyncnet.nim
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure/asyncnet.nim')
-rw-r--r--lib/pure/asyncnet.nim135
1 files changed, 95 insertions, 40 deletions
diff --git a/lib/pure/asyncnet.nim b/lib/pure/asyncnet.nim
index 39d05d36b..62e85042f 100644
--- a/lib/pure/asyncnet.nim
+++ b/lib/pure/asyncnet.nim
@@ -24,7 +24,7 @@
 ##
 ## Chat server
 ## ^^^^^^^^^^^
-## 
+##
 ## The following example demonstrates a simple chat server.
 ##
 ## .. code-block::nim
@@ -182,26 +182,30 @@ proc connect*(socket: AsyncSocket, address: string, port: Port,
       sslSetConnectState(socket.sslHandle)
       sslLoop(socket, flags, sslDoHandshake(socket.sslHandle))
 
-proc readInto(buf: cstring, size: int, socket: AsyncSocket,
-              flags: set[SocketFlag]): Future[int] {.async.} =
+template readInto(buf: cstring, size: int, socket: AsyncSocket,
+                  flags: set[SocketFlag]): int =
+  ## Reads **up to** ``size`` bytes from ``socket`` into ``buf``. Note that
+  ## this is a template and not a proc.
+  var res = 0
   if socket.isSsl:
     when defined(ssl):
       # SSL mode.
       sslLoop(socket, flags,
         sslRead(socket.sslHandle, buf, size.cint))
-      result = opResult
+      res = opResult
   else:
-    var data = await recv(socket.fd.TAsyncFD, size, flags)
-    if data.len != 0:
-      copyMem(buf, addr data[0], data.len)
+    var recvIntoFut = recvInto(socket.fd.TAsyncFD, buf, size, flags)
+    yield recvIntoFut
     # Not in SSL mode.
-    result = data.len
+    res = recvIntoFut.read()
+  res
 
-proc readIntoBuf(socket: AsyncSocket,
-    flags: set[SocketFlag]): Future[int] {.async.} =
-  result = await readInto(addr socket.buffer[0], BufferSize, socket, flags)
+template readIntoBuf(socket: AsyncSocket,
+    flags: set[SocketFlag]): int =
+  var size = readInto(addr socket.buffer[0], BufferSize, socket, flags)
   socket.currPos = 0
-  socket.bufLen = result
+  socket.bufLen = size
+  size
 
 proc recv*(socket: AsyncSocket, size: int,
            flags = {SocketFlag.SafeDisconn}): Future[string] {.async.} =
@@ -222,10 +226,11 @@ proc recv*(socket: AsyncSocket, size: int,
   ## to be read then the future will complete with a value of ``""``.
   if socket.isBuffered:
     result = newString(size)
+    shallow(result)
     let originalBufPos = socket.currPos
 
     if socket.bufLen == 0:
-      let res = await socket.readIntoBuf(flags - {SocketFlag.Peek})
+      let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
       if res == 0:
         result.setLen(0)
         return
@@ -236,7 +241,7 @@ proc recv*(socket: AsyncSocket, size: int,
         if SocketFlag.Peek in flags:
           # We don't want to get another buffer if we're peeking.
           break
-        let res = await socket.readIntoBuf(flags - {SocketFlag.Peek})
+        let res = socket.readIntoBuf(flags - {SocketFlag.Peek})
         if res == 0:
           break
 
@@ -251,7 +256,7 @@ proc recv*(socket: AsyncSocket, size: int,
     result.setLen(read)
   else:
     result = newString(size)
-    let read = await readInto(addr result[0], size, socket, flags)
+    let read = readInto(addr result[0], size, socket, flags)
     result.setLen(read)
 
 proc send*(socket: AsyncSocket, data: string,
@@ -302,15 +307,17 @@ proc accept*(socket: AsyncSocket,
         retFut.complete(future.read.client)
   return retFut
 
-proc recvLine*(socket: AsyncSocket,
-    flags = {SocketFlag.SafeDisconn}): Future[string] {.async.} =
-  ## Reads a line of data from ``socket``. Returned future will complete once
-  ## a full line is read or an error occurs.
+proc recvLineInto*(socket: AsyncSocket, resString: FutureVar[string],
+    flags = {SocketFlag.SafeDisconn}) {.async.} =
+  ## Reads a line of data from ``socket`` into ``resString``.
+  ##
+  ## The ``resString`` future and the string value contained within must both
+  ## be initialised.
   ##
   ## If a full line is read ``\r\L`` is not
   ## added to ``line``, however if solely ``\r\L`` is read then ``line``
   ## will be set to it.
-  ## 
+  ##
   ## If the socket is disconnected, ``line`` will be set to ``""``.
   ##
   ## If the socket is disconnected in the middle of a line (before ``\r\L``
@@ -318,27 +325,37 @@ proc recvLine*(socket: AsyncSocket,
   ## The partial line **will be lost**.
   ##
   ## **Warning**: The ``Peek`` flag is not yet implemented.
-  ## 
-  ## **Warning**: ``recvLine`` on unbuffered sockets assumes that the protocol
-  ## uses ``\r\L`` to delimit a new line.
-  template addNLIfEmpty(): stmt =
-    if result.len == 0:
-      result.add("\c\L")
+  ##
+  ## **Warning**: ``recvLineInto`` on unbuffered sockets assumes that the
+  ## protocol uses ``\r\L`` to delimit a new line.
   assert SocketFlag.Peek notin flags ## TODO:
+  assert(not resString.mget.isNil(),
+      "String inside resString future needs to be initialised")
+  result = newFuture[void]("asyncnet.recvLineInto")
+
+  # TODO: Make the async transformation check for FutureVar params and complete
+  # them when the result future is completed.
+  # Can we replace the result future with the FutureVar?
+
+  template addNLIfEmpty(): stmt =
+    if resString.mget.len == 0:
+      resString.mget.add("\c\L")
+
   if socket.isBuffered:
-    result = ""
     if socket.bufLen == 0:
-      let res = await socket.readIntoBuf(flags)
+      let res = socket.readIntoBuf(flags)
       if res == 0:
+        resString.complete()
         return
 
     var lastR = false
     while true:
       if socket.currPos >= socket.bufLen:
-        let res = await socket.readIntoBuf(flags)
+        let res = socket.readIntoBuf(flags)
         if res == 0:
-          result = ""
-          break
+          resString.mget().setLen(0)
+          resString.complete()
+          return
 
       case socket.buffer[socket.currPos]
       of '\r':
@@ -347,30 +364,68 @@ proc recvLine*(socket: AsyncSocket,
       of '\L':
         addNLIfEmpty()
         socket.currPos.inc()
+        resString.complete()
         return
       else:
         if lastR:
           socket.currPos.inc()
+          resString.complete()
           return
         else:
-          result.add socket.buffer[socket.currPos]
+          resString.mget.add socket.buffer[socket.currPos]
       socket.currPos.inc()
   else:
-    result = ""
     var c = ""
     while true:
-      c = await recv(socket, 1, flags)
+      let recvFut = recv(socket, 1, flags)
+      c = recvFut.read()
       if c.len == 0:
-        return ""
+        resString.mget.setLen(0)
+        resString.complete()
+        return
       if c == "\r":
-        c = await recv(socket, 1, flags) # Skip \L
+        let recvFut = recv(socket, 1, flags) # Skip \L
+        c = recvFut.read()
         assert c == "\L"
         addNLIfEmpty()
+        resString.complete()
         return
       elif c == "\L":
         addNLIfEmpty()
+        resString.complete()
         return
-      add(result.string, c)
+      resString.mget.add c
+
+  resString.complete()
+
+proc recvLine*(socket: AsyncSocket,
+    flags = {SocketFlag.SafeDisconn}): Future[string] {.async.} =
+  ## Reads a line of data from ``socket``. Returned future will complete once
+  ## a full line is read or an error occurs.
+  ##
+  ## If a full line is read ``\r\L`` is not
+  ## added to ``line``, however if solely ``\r\L`` is read then ``line``
+  ## will be set to it.
+  ##
+  ## If the socket is disconnected, ``line`` will be set to ``""``.
+  ##
+  ## If the socket is disconnected in the middle of a line (before ``\r\L``
+  ## is read) then line will be set to ``""``.
+  ## The partial line **will be lost**.
+  ##
+  ## **Warning**: The ``Peek`` flag is not yet implemented.
+  ##
+  ## **Warning**: ``recvLine`` on unbuffered sockets assumes that the protocol
+  ## uses ``\r\L`` to delimit a new line.
+  template addNLIfEmpty(): stmt =
+    if result.len == 0:
+      result.add("\c\L")
+  assert SocketFlag.Peek notin flags ## TODO:
+
+  # TODO: Optimise this.
+  var resString = newFutureVar[string]("asyncnet.recvLine")
+  await socket.recvLineInto(resString, flags)
+  result = resString.mget()
 
 proc listen*(socket: AsyncSocket, backlog = SOMAXCONN) {.tags: [ReadIOEffect].} =
   ## Marks ``socket`` as accepting connections.
@@ -500,11 +555,11 @@ when not defined(testing) and isMainModule:
         proc (future: Future[void]) =
           echo("Send")
           client.close()
-      
+
       var f = accept(sock)
       f.callback = onAccept
-      
+
     var f = accept(sock)
     f.callback = onAccept
   runForever()
-    
+