summary refs log tree commit diff stats
path: root/lib
diff options
context:
space:
mode:
authorDominik Picheta <dominikpicheta@gmail.com>2017-02-11 12:43:16 +0100
committerDominik Picheta <dominikpicheta@gmail.com>2017-02-11 12:43:16 +0100
commit1b4067a81b627ee7f1aeec4d29cd70756be57a5f (patch)
tree536332230fbad520883defb411e94c7c9e426cf4 /lib
parent77071eb767dabc78ea23c0ea623331acac640694 (diff)
downloadNim-1b4067a81b627ee7f1aeec4d29cd70756be57a5f.tar.gz
Implement streamed body reading in httpclient.
Diffstat (limited to 'lib')
-rw-r--r--lib/pure/httpclient.nim150
1 files changed, 108 insertions, 42 deletions
diff --git a/lib/pure/httpclient.nim b/lib/pure/httpclient.nim
index 1ded540ec..024643384 100644
--- a/lib/pure/httpclient.nim
+++ b/lib/pure/httpclient.nim
@@ -117,20 +117,28 @@
 ## only basic authentication is supported at the moment.
 
 import net, strutils, uri, parseutils, strtabs, base64, os, mimetypes,
-  math, random, httpcore, times, tables
+  math, random, httpcore, times, tables, streams
 import asyncnet, asyncdispatch
 import nativesockets
 
 export httpcore except parseHeader # TODO: The ``except`` doesn't work
 
 type
-  Response* = object
+  Response* = ref object
     version*: string
     status*: string
     headers*: HttpHeaders
-    body*: string
+    body: string # TODO: here for compatibility with old httpclient procs.
+    bodyStream*: StringStream
 
-proc code*(response: Response): HttpCode
+  AsyncResponse* = ref object
+    version*: string
+    status*: string
+    headers*: HttpHeaders
+    body: string
+    bodyStream*: FutureStream[string]
+
+proc code*(response: Response | AsyncResponse): HttpCode
            {.raises: [ValueError, OverflowError].} =
   ## Retrieves the specified response's ``HttpCode``.
   ##
@@ -138,6 +146,40 @@ proc code*(response: Response): HttpCode
   ## corresponding ``HttpCode``.
   return response.status[0 .. 2].parseInt.HttpCode
 
+proc body*(response: Response): string =
+  ## Retrieves the specified response's body.
+  ##
+  ## The response's body stream is read synchronously.
+  if response.body.isNil():
+    response.body = response.bodyStream.readAll()
+  return response.body
+
+proc `body=`*(response: Response, value: string) {.deprecated.} =
+  ## Setter for backward compatibility.
+  ##
+  ## **This is deprecated and should not be used**.
+  response.body = value
+
+proc readAll*(future: FutureStream[string]): Future[string] {.async.} =
+  ## Returns a future that will complete when all the string data from the
+  ## specified future stream is retrieved.
+
+  # TODO: Move this to asyncfutures.
+  result = ""
+  while true:
+    let (hasValue, value) = await future.take()
+    if hasValue:
+      result.add(value)
+    else:
+      break
+
+proc body*(response: AsyncResponse): Future[string] {.async.} =
+  ## Reads the response's body and caches it. The read is performed only
+  ## once.
+  if response.body.isNil:
+    response.body = await readAll(response.bodyStream)
+  return response.body
+
 type
   Proxy* = ref object
     url*: Uri
@@ -249,6 +291,7 @@ proc parseBody(s: Socket, headers: HttpHeaders, httpVersion: string, timeout: in
           result.add(buf)
 
 proc parseResponse(s: Socket, getBody: bool, timeout: int): Response =
+  new result
   var parsedStatus = false
   var linei = 0
   var fullyRead = false
@@ -735,6 +778,10 @@ type
     contentProgress: BiggestInt
     oneSecondProgress: BiggestInt
     lastProgressReport: float
+    when SocketType is AsyncSocket:
+      bodyStream: FutureStream[string]
+    else:
+      bodyStream: StringStream
 
 type
   HttpClient* = HttpClientBase[Socket]
@@ -764,6 +811,7 @@ proc newHttpClient*(userAgent = defUserAgent,
   result.proxy = proxy
   result.timeout = timeout
   result.onProgressChanged = nil
+  result.bodyStream = newStringStream()
   when defined(ssl):
     result.sslContext = sslContext
 
@@ -794,6 +842,7 @@ proc newAsyncHttpClient*(userAgent = defUserAgent,
   result.proxy = proxy
   result.timeout = -1 # TODO
   result.onProgressChanged = nil
+  result.bodyStream = newFutureStream[string]("newAsyncHttpClient")
   when defined(ssl):
     result.sslContext = sslContext
 
@@ -815,14 +864,14 @@ proc reportProgress(client: HttpClient | AsyncHttpClient,
       client.oneSecondProgress = 0
       client.lastProgressReport = epochTime()
 
-proc recvFull(client: HttpClient | AsyncHttpClient,
-              size: int, timeout: int): Future[string] {.multisync.} =
+proc recvFull(client: HttpClient | AsyncHttpClient, size: int, timeout: int,
+              keep: bool): Future[int] {.multisync.} =
   ## Ensures that all the data requested is read and returned.
-  result = ""
+  var readLen = 0
   while true:
-    if size == result.len: break
+    if size == readLen: break
 
-    let remainingSize = size - result.len
+    let remainingSize = size - readLen
     let sizeToRecv = min(remainingSize, net.BufferSize)
 
     when client.socket is Socket:
@@ -830,13 +879,20 @@ proc recvFull(client: HttpClient | AsyncHttpClient,
     else:
       let data = await client.socket.recv(sizeToRecv)
     if data == "": break # We've been disconnected.
-    result.add data
+
+    readLen.inc(data.len)
+    if keep:
+      when client.socket is Socket:
+        client.bodyStream.write(data)
+      else:
+        await client.bodyStream.put(data)
 
     await reportProgress(client, data.len)
 
-proc parseChunks(client: HttpClient | AsyncHttpClient): Future[string]
+  return readLen
+
+proc parseChunks(client: HttpClient | AsyncHttpClient): Future[void]
                  {.multisync.} =
-  result = ""
   while true:
     var chunkSize = 0
     var chunkSizeStr = await client.socket.recvLine()
@@ -861,25 +917,29 @@ proc parseChunks(client: HttpClient | AsyncHttpClient): Future[string]
         httpError("Invalid chunk size: " & chunkSizeStr)
       inc(i)
     if chunkSize <= 0:
-      discard await recvFull(client, 2, client.timeout) # Skip \c\L
+      discard await recvFull(client, 2, client.timeout, false) # Skip \c\L
       break
-    result.add await recvFull(client, chunkSize, client.timeout)
-    discard await recvFull(client, 2, client.timeout) # Skip \c\L
+    discard await recvFull(client, chunkSize, client.timeout, true)
+    discard await recvFull(client, 2, client.timeout, false) # Skip \c\L
     # Trailer headers will only be sent if the request specifies that we want
     # them: http://tools.ietf.org/html/rfc2616#section-3.6.1
 
 proc parseBody(client: HttpClient | AsyncHttpClient,
                headers: HttpHeaders,
-               httpVersion: string): Future[string] {.multisync.} =
-  result = ""
+               httpVersion: string): Future[void] {.multisync.} =
   # Reset progress from previous requests.
   client.contentTotal = 0
   client.contentProgress = 0
   client.oneSecondProgress = 0
   client.lastProgressReport = 0
 
+  when client is HttpClient:
+    client.bodyStream = newStringStream()
+  else:
+    client.bodyStream = newFutureStream[string]("parseResponse")
+
   if headers.getOrDefault"Transfer-Encoding" == "chunked":
-    result = await parseChunks(client)
+    await parseChunks(client)
   else:
     # -REGION- Content-Length
     # (http://tools.ietf.org/html/rfc2616#section-4.4) NR.3
@@ -888,26 +948,31 @@ proc parseBody(client: HttpClient | AsyncHttpClient,
       var length = contentLengthHeader.parseint()
       client.contentTotal = length
       if length > 0:
-        result = await client.recvFull(length, client.timeout)
-        if result == "":
+        let recvLen = await client.recvFull(length, client.timeout, true)
+        if recvLen == 0:
           httpError("Got disconnected while trying to read body.")
-        if result.len != length:
+        if recvLen != length:
           httpError("Received length doesn't match expected length. Wanted " &
-                    $length & " got " & $result.len)
+                    $length & " got " & $recvLen)
     else:
       # (http://tools.ietf.org/html/rfc2616#section-4.4) NR.4 TODO
 
       # -REGION- Connection: Close
       # (http://tools.ietf.org/html/rfc2616#section-4.4) NR.5
       if headers.getOrDefault"Connection" == "close" or httpVersion == "1.0":
-        var buf = ""
         while true:
-          buf = await client.recvFull(4000, client.timeout)
-          if buf == "": break
-          result.add(buf)
+          let recvLen = await client.recvFull(4000, client.timeout, true)
+          if recvLen == 0: break
+
+  when client is AsyncHttpClient:
+    client.bodyStream.complete()
+  else:
+    client.bodyStream.setPosition(0)
 
 proc parseResponse(client: HttpClient | AsyncHttpClient,
-                   getBody: bool): Future[Response] {.multisync.} =
+                   getBody: bool): Future[Response | AsyncResponse]
+                   {.multisync.} =
+  new result
   var parsedStatus = false
   var linei = 0
   var fullyRead = false
@@ -956,9 +1021,8 @@ proc parseResponse(client: HttpClient | AsyncHttpClient,
   if not fullyRead:
     httpError("Connection was closed before full request has been made")
   if getBody:
-    result.body = await parseBody(client, result.headers, result.version)
-  else:
-    result.body = ""
+    await parseBody(client, result.headers, result.version)
+    result.bodyStream = client.bodyStream
 
 proc newConnection(client: HttpClient | AsyncHttpClient,
                    url: Uri) {.multisync.} =
@@ -1006,8 +1070,9 @@ proc override(fallback, override: HttpHeaders): HttpHeaders =
     result[k] = vs
 
 proc requestAux(client: HttpClient | AsyncHttpClient, url: string,
-              httpMethod: string, body = "",
-              headers: HttpHeaders = nil): Future[Response] {.multisync.} =
+                httpMethod: string, body = "",
+                headers: HttpHeaders = nil): Future[Response | AsyncResponse]
+                {.multisync.} =
   # Helper that actually makes the request. Does not handle redirects.
   let connectionUrl =
     if client.proxy.isNil: parseUri(url) else: client.proxy.url
@@ -1053,10 +1118,10 @@ proc requestAux(client: HttpClient | AsyncHttpClient, url: string,
   # Restore the clients proxy in case it was overwritten.
   client.proxy = savedProxy
 
-
 proc request*(client: HttpClient | AsyncHttpClient, url: string,
               httpMethod: string, body = "",
-              headers: HttpHeaders = nil): Future[Response] {.multisync.} =
+              headers: HttpHeaders = nil): Future[Response | AsyncResponse]
+              {.multisync.} =
   ## Connects to the hostname specified by the URL and performs a request
   ## using the custom method string specified by ``httpMethod``.
   ##
@@ -1078,7 +1143,8 @@ proc request*(client: HttpClient | AsyncHttpClient, url: string,
 
 proc request*(client: HttpClient | AsyncHttpClient, url: string,
               httpMethod = HttpGET, body = "",
-              headers: HttpHeaders = nil): Future[Response] {.multisync.} =
+              headers: HttpHeaders = nil): Future[Response | AsyncResponse]
+              {.multisync.} =
   ## Connects to the hostname specified by the URL and performs a request
   ## using the method specified.
   ##
@@ -1088,11 +1154,10 @@ proc request*(client: HttpClient | AsyncHttpClient, url: string,
   ##
   ## When a request is made to a different hostname, the current connection will
   ## be closed.
-  result = await request(client, url, $httpMethod, body,
-                         headers = headers)
+  result = await request(client, url, $httpMethod, body, headers)
 
 proc get*(client: HttpClient | AsyncHttpClient,
-          url: string): Future[Response] {.multisync.} =
+          url: string): Future[Response | AsyncResponse] {.multisync.} =
   ## Connects to the hostname specified by the URL and performs a GET request.
   ##
   ## This procedure will follow redirects up to a maximum number of redirects
@@ -1112,16 +1177,17 @@ proc getContent*(client: HttpClient | AsyncHttpClient,
   if resp.code.is4xx or resp.code.is5xx:
     raise newException(HttpRequestError, resp.status)
   else:
-    return resp.body
+    return await resp.bodyStream.readAll()
 
 proc post*(client: HttpClient | AsyncHttpClient, url: string, body = "",
-           multipart: MultipartData = nil): Future[Response] {.multisync.} =
+           multipart: MultipartData = nil): Future[Response | AsyncResponse]
+           {.multisync.} =
   ## Connects to the hostname specified by the URL and performs a POST request.
   ##
   ## This procedure will follow redirects up to a maximum number of redirects
   ## specified in ``client.maxRedirects``.
   let (mpHeader, mpBody) = format(multipart)
-
+  # TODO: Support FutureStream for `body` parameter.
   template withNewLine(x): expr =
     if x.len > 0 and not x.endsWith("\c\L"):
       x & "\c\L"
@@ -1161,4 +1227,4 @@ proc postContent*(client: HttpClient | AsyncHttpClient, url: string,
   if resp.code.is4xx or resp.code.is5xx:
     raise newException(HttpRequestError, resp.status)
   else:
-    return resp.body
+    return await resp.bodyStream.readAll()