diff options
author | Dominik Picheta <dominikpicheta@gmail.com> | 2017-02-11 12:43:16 +0100 |
---|---|---|
committer | Dominik Picheta <dominikpicheta@gmail.com> | 2017-02-11 12:43:16 +0100 |
commit | 1b4067a81b627ee7f1aeec4d29cd70756be57a5f (patch) | |
tree | 536332230fbad520883defb411e94c7c9e426cf4 /lib | |
parent | 77071eb767dabc78ea23c0ea623331acac640694 (diff) | |
download | Nim-1b4067a81b627ee7f1aeec4d29cd70756be57a5f.tar.gz |
Implement streamed body reading in httpclient.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/pure/httpclient.nim | 150 |
1 files changed, 108 insertions, 42 deletions
diff --git a/lib/pure/httpclient.nim b/lib/pure/httpclient.nim index 1ded540ec..024643384 100644 --- a/lib/pure/httpclient.nim +++ b/lib/pure/httpclient.nim @@ -117,20 +117,28 @@ ## only basic authentication is supported at the moment. import net, strutils, uri, parseutils, strtabs, base64, os, mimetypes, - math, random, httpcore, times, tables + math, random, httpcore, times, tables, streams import asyncnet, asyncdispatch import nativesockets export httpcore except parseHeader # TODO: The ``except`` doesn't work type - Response* = object + Response* = ref object version*: string status*: string headers*: HttpHeaders - body*: string + body: string # TODO: here for compatibility with old httpclient procs. + bodyStream*: StringStream -proc code*(response: Response): HttpCode + AsyncResponse* = ref object + version*: string + status*: string + headers*: HttpHeaders + body: string + bodyStream*: FutureStream[string] + +proc code*(response: Response | AsyncResponse): HttpCode {.raises: [ValueError, OverflowError].} = ## Retrieves the specified response's ``HttpCode``. ## @@ -138,6 +146,40 @@ proc code*(response: Response): HttpCode ## corresponding ``HttpCode``. return response.status[0 .. 2].parseInt.HttpCode +proc body*(response: Response): string = + ## Retrieves the specified response's body. + ## + ## The response's body stream is read synchronously. + if response.body.isNil(): + response.body = response.bodyStream.readAll() + return response.body + +proc `body=`*(response: Response, value: string) {.deprecated.} = + ## Setter for backward compatibility. + ## + ## **This is deprecated and should not be used**. + response.body = value + +proc readAll*(future: FutureStream[string]): Future[string] {.async.} = + ## Returns a future that will complete when all the string data from the + ## specified future stream is retrieved. + + # TODO: Move this to asyncfutures. + result = "" + while true: + let (hasValue, value) = await future.take() + if hasValue: + result.add(value) + else: + break + +proc body*(response: AsyncResponse): Future[string] {.async.} = + ## Reads the response's body and caches it. The read is performed only + ## once. + if response.body.isNil: + response.body = await readAll(response.bodyStream) + return response.body + type Proxy* = ref object url*: Uri @@ -249,6 +291,7 @@ proc parseBody(s: Socket, headers: HttpHeaders, httpVersion: string, timeout: in result.add(buf) proc parseResponse(s: Socket, getBody: bool, timeout: int): Response = + new result var parsedStatus = false var linei = 0 var fullyRead = false @@ -735,6 +778,10 @@ type contentProgress: BiggestInt oneSecondProgress: BiggestInt lastProgressReport: float + when SocketType is AsyncSocket: + bodyStream: FutureStream[string] + else: + bodyStream: StringStream type HttpClient* = HttpClientBase[Socket] @@ -764,6 +811,7 @@ proc newHttpClient*(userAgent = defUserAgent, result.proxy = proxy result.timeout = timeout result.onProgressChanged = nil + result.bodyStream = newStringStream() when defined(ssl): result.sslContext = sslContext @@ -794,6 +842,7 @@ proc newAsyncHttpClient*(userAgent = defUserAgent, result.proxy = proxy result.timeout = -1 # TODO result.onProgressChanged = nil + result.bodyStream = newFutureStream[string]("newAsyncHttpClient") when defined(ssl): result.sslContext = sslContext @@ -815,14 +864,14 @@ proc reportProgress(client: HttpClient | AsyncHttpClient, client.oneSecondProgress = 0 client.lastProgressReport = epochTime() -proc recvFull(client: HttpClient | AsyncHttpClient, - size: int, timeout: int): Future[string] {.multisync.} = +proc recvFull(client: HttpClient | AsyncHttpClient, size: int, timeout: int, + keep: bool): Future[int] {.multisync.} = ## Ensures that all the data requested is read and returned. - result = "" + var readLen = 0 while true: - if size == result.len: break + if size == readLen: break - let remainingSize = size - result.len + let remainingSize = size - readLen let sizeToRecv = min(remainingSize, net.BufferSize) when client.socket is Socket: @@ -830,13 +879,20 @@ proc recvFull(client: HttpClient | AsyncHttpClient, else: let data = await client.socket.recv(sizeToRecv) if data == "": break # We've been disconnected. - result.add data + + readLen.inc(data.len) + if keep: + when client.socket is Socket: + client.bodyStream.write(data) + else: + await client.bodyStream.put(data) await reportProgress(client, data.len) -proc parseChunks(client: HttpClient | AsyncHttpClient): Future[string] + return readLen + +proc parseChunks(client: HttpClient | AsyncHttpClient): Future[void] {.multisync.} = - result = "" while true: var chunkSize = 0 var chunkSizeStr = await client.socket.recvLine() @@ -861,25 +917,29 @@ proc parseChunks(client: HttpClient | AsyncHttpClient): Future[string] httpError("Invalid chunk size: " & chunkSizeStr) inc(i) if chunkSize <= 0: - discard await recvFull(client, 2, client.timeout) # Skip \c\L + discard await recvFull(client, 2, client.timeout, false) # Skip \c\L break - result.add await recvFull(client, chunkSize, client.timeout) - discard await recvFull(client, 2, client.timeout) # Skip \c\L + discard await recvFull(client, chunkSize, client.timeout, true) + discard await recvFull(client, 2, client.timeout, false) # Skip \c\L # Trailer headers will only be sent if the request specifies that we want # them: http://tools.ietf.org/html/rfc2616#section-3.6.1 proc parseBody(client: HttpClient | AsyncHttpClient, headers: HttpHeaders, - httpVersion: string): Future[string] {.multisync.} = - result = "" + httpVersion: string): Future[void] {.multisync.} = # Reset progress from previous requests. client.contentTotal = 0 client.contentProgress = 0 client.oneSecondProgress = 0 client.lastProgressReport = 0 + when client is HttpClient: + client.bodyStream = newStringStream() + else: + client.bodyStream = newFutureStream[string]("parseResponse") + if headers.getOrDefault"Transfer-Encoding" == "chunked": - result = await parseChunks(client) + await parseChunks(client) else: # -REGION- Content-Length # (http://tools.ietf.org/html/rfc2616#section-4.4) NR.3 @@ -888,26 +948,31 @@ proc parseBody(client: HttpClient | AsyncHttpClient, var length = contentLengthHeader.parseint() client.contentTotal = length if length > 0: - result = await client.recvFull(length, client.timeout) - if result == "": + let recvLen = await client.recvFull(length, client.timeout, true) + if recvLen == 0: httpError("Got disconnected while trying to read body.") - if result.len != length: + if recvLen != length: httpError("Received length doesn't match expected length. Wanted " & - $length & " got " & $result.len) + $length & " got " & $recvLen) else: # (http://tools.ietf.org/html/rfc2616#section-4.4) NR.4 TODO # -REGION- Connection: Close # (http://tools.ietf.org/html/rfc2616#section-4.4) NR.5 if headers.getOrDefault"Connection" == "close" or httpVersion == "1.0": - var buf = "" while true: - buf = await client.recvFull(4000, client.timeout) - if buf == "": break - result.add(buf) + let recvLen = await client.recvFull(4000, client.timeout, true) + if recvLen == 0: break + + when client is AsyncHttpClient: + client.bodyStream.complete() + else: + client.bodyStream.setPosition(0) proc parseResponse(client: HttpClient | AsyncHttpClient, - getBody: bool): Future[Response] {.multisync.} = + getBody: bool): Future[Response | AsyncResponse] + {.multisync.} = + new result var parsedStatus = false var linei = 0 var fullyRead = false @@ -956,9 +1021,8 @@ proc parseResponse(client: HttpClient | AsyncHttpClient, if not fullyRead: httpError("Connection was closed before full request has been made") if getBody: - result.body = await parseBody(client, result.headers, result.version) - else: - result.body = "" + await parseBody(client, result.headers, result.version) + result.bodyStream = client.bodyStream proc newConnection(client: HttpClient | AsyncHttpClient, url: Uri) {.multisync.} = @@ -1006,8 +1070,9 @@ proc override(fallback, override: HttpHeaders): HttpHeaders = result[k] = vs proc requestAux(client: HttpClient | AsyncHttpClient, url: string, - httpMethod: string, body = "", - headers: HttpHeaders = nil): Future[Response] {.multisync.} = + httpMethod: string, body = "", + headers: HttpHeaders = nil): Future[Response | AsyncResponse] + {.multisync.} = # Helper that actually makes the request. Does not handle redirects. let connectionUrl = if client.proxy.isNil: parseUri(url) else: client.proxy.url @@ -1053,10 +1118,10 @@ proc requestAux(client: HttpClient | AsyncHttpClient, url: string, # Restore the clients proxy in case it was overwritten. client.proxy = savedProxy - proc request*(client: HttpClient | AsyncHttpClient, url: string, httpMethod: string, body = "", - headers: HttpHeaders = nil): Future[Response] {.multisync.} = + headers: HttpHeaders = nil): Future[Response | AsyncResponse] + {.multisync.} = ## Connects to the hostname specified by the URL and performs a request ## using the custom method string specified by ``httpMethod``. ## @@ -1078,7 +1143,8 @@ proc request*(client: HttpClient | AsyncHttpClient, url: string, proc request*(client: HttpClient | AsyncHttpClient, url: string, httpMethod = HttpGET, body = "", - headers: HttpHeaders = nil): Future[Response] {.multisync.} = + headers: HttpHeaders = nil): Future[Response | AsyncResponse] + {.multisync.} = ## Connects to the hostname specified by the URL and performs a request ## using the method specified. ## @@ -1088,11 +1154,10 @@ proc request*(client: HttpClient | AsyncHttpClient, url: string, ## ## When a request is made to a different hostname, the current connection will ## be closed. - result = await request(client, url, $httpMethod, body, - headers = headers) + result = await request(client, url, $httpMethod, body, headers) proc get*(client: HttpClient | AsyncHttpClient, - url: string): Future[Response] {.multisync.} = + url: string): Future[Response | AsyncResponse] {.multisync.} = ## Connects to the hostname specified by the URL and performs a GET request. ## ## This procedure will follow redirects up to a maximum number of redirects @@ -1112,16 +1177,17 @@ proc getContent*(client: HttpClient | AsyncHttpClient, if resp.code.is4xx or resp.code.is5xx: raise newException(HttpRequestError, resp.status) else: - return resp.body + return await resp.bodyStream.readAll() proc post*(client: HttpClient | AsyncHttpClient, url: string, body = "", - multipart: MultipartData = nil): Future[Response] {.multisync.} = + multipart: MultipartData = nil): Future[Response | AsyncResponse] + {.multisync.} = ## Connects to the hostname specified by the URL and performs a POST request. ## ## This procedure will follow redirects up to a maximum number of redirects ## specified in ``client.maxRedirects``. let (mpHeader, mpBody) = format(multipart) - + # TODO: Support FutureStream for `body` parameter. template withNewLine(x): expr = if x.len > 0 and not x.endsWith("\c\L"): x & "\c\L" @@ -1161,4 +1227,4 @@ proc postContent*(client: HttpClient | AsyncHttpClient, url: string, if resp.code.is4xx or resp.code.is5xx: raise newException(HttpRequestError, resp.status) else: - return resp.body + return await resp.bodyStream.readAll() |