summary refs log tree commit diff stats
path: root/lib/pure
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pure')
-rw-r--r--lib/pure/asyncdispatch.nim2
-rw-r--r--lib/pure/asyncfile.nim12
-rw-r--r--lib/pure/asyncmacro.nim63
-rw-r--r--lib/pure/collections/queues.nim2
-rw-r--r--lib/pure/httpclient.nim198
-rw-r--r--lib/pure/includes/asyncfutures.nim107
6 files changed, 300 insertions, 84 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim
index d97214d15..58113ae69 100644
--- a/lib/pure/asyncdispatch.nim
+++ b/lib/pure/asyncdispatch.nim
@@ -9,7 +9,7 @@
 
 include "system/inclrtl"
 
-import os, oids, tables, strutils, times, heapqueue
+import os, oids, tables, strutils, times, heapqueue, queues
 
 import nativesockets, net, deques
 
diff --git a/lib/pure/asyncfile.nim b/lib/pure/asyncfile.nim
index 0241e4796..5a23f3ba2 100644
--- a/lib/pure/asyncfile.nim
+++ b/lib/pure/asyncfile.nim
@@ -476,3 +476,15 @@ proc close*(f: AsyncFile) =
     if close(f.fd.cint) == -1:
       raiseOSError(osLastError())
 
+proc writeFromStream(f: AsyncFile, fut: FutureStream[string]) {.async.} =
+  while true:
+    let (hasValue, value) = await fut.take()
+    if hasValue:
+      await f.write(value)
+    else:
+      break
+
+proc getWriteStream*(f: AsyncFile): FutureStream[string] =
+  ## Returns a new stream that can be used for writing to the file.
+  result = newFutureStream[string]()
+  asyncCheck writeFromStream(f, result)
diff --git a/lib/pure/asyncmacro.nim b/lib/pure/asyncmacro.nim
index f74881c6d..f0837d67d 100644
--- a/lib/pure/asyncmacro.nim
+++ b/lib/pure/asyncmacro.nim
@@ -33,8 +33,10 @@ template createCb(retFutureSym, iteratorNameSym,
       if not nameIterVar.finished:
         var next = nameIterVar()
         if next == nil:
-          assert retFutureSym.finished, "Async procedure's (" &
-                 name & ") return Future was not finished."
+          if not retFutureSym.finished:
+            let msg = "Async procedure ($1) yielded `nil`, are you await'ing a " &
+                    "`nil` Future?"
+            raise newException(AssertionError, msg % name)
         else:
           next.callback = cb
     except:
@@ -281,6 +283,14 @@ proc getFutureVarIdents(params: NimNode): seq[NimNode] {.compileTime.} =
        ($params[i][1][0].ident).normalize == "futurevar":
       result.add(params[i][0])
 
+proc isInvalidReturnType(typeName: string): bool =
+  return typeName notin ["Future"] #, "FutureStream"]
+
+proc verifyReturnType(typeName: string) {.compileTime.} =
+  if typeName.isInvalidReturnType:
+    error("Expected return type of 'Future' got '$1'" %
+          typeName)
+
 proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} =
   ## This macro transforms a single procedure into a closure iterator.
   ## The ``async`` macro supports a stmtList holding multiple async procedures.
@@ -295,18 +305,16 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} =
   # Verify that the return type is a Future[T]
   if returnType.kind == nnkBracketExpr:
     let fut = repr(returnType[0])
-    if fut != "Future":
-      error("Expected return type of 'Future' got '" & fut & "'")
+    verifyReturnType(fut)
     baseType = returnType[1]
   elif returnType.kind in nnkCallKinds and $returnType[0] == "[]":
     let fut = repr(returnType[1])
-    if fut != "Future":
-      error("Expected return type of 'Future' got '" & fut & "'")
+    verifyReturnType(fut)
     baseType = returnType[2]
   elif returnType.kind == nnkEmpty:
     baseType = returnType
   else:
-    error("Expected return type of 'Future' got '" & repr(returnType) & "'")
+    verifyReturnType(repr(returnType))
 
   let subtypeIsVoid = returnType.kind == nnkEmpty or
         (baseType.kind == nnkIdent and returnType[1].ident == !"void")
@@ -390,7 +398,7 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} =
   if procBody.kind != nnkEmpty:
     result[6] = outerProcBody
   #echo(treeRepr(result))
-  #if prc[0].getName == "testInfix":
+  #if prc[0].getName == "beta":
   #  echo(toStrLit(result))
 
 macro async*(prc: untyped): untyped =
@@ -451,13 +459,12 @@ proc stripAwait(node: NimNode): NimNode =
   for i in 0 .. <result.len:
     result[i] = stripAwait(result[i])
 
-proc splitParams(param: NimNode, async: bool): NimNode =
-  expectKind(param, nnkIdentDefs)
-  result = param
-  if param[1].kind == nnkInfix and $param[1][0].ident in ["|", "or"]:
-    let firstType = param[1][1]
+proc splitParamType(paramType: NimNode, async: bool): NimNode =
+  result = paramType
+  if paramType.kind == nnkInfix and $paramType[0].ident in ["|", "or"]:
+    let firstType = paramType[1]
     let firstTypeName = $firstType.ident
-    let secondType = param[1][2]
+    let secondType = paramType[2]
     let secondTypeName = $secondType.ident
 
     # Make sure that at least one has the name `async`, otherwise we shouldn't
@@ -468,22 +475,21 @@ proc splitParams(param: NimNode, async: bool): NimNode =
 
     if async:
       if firstTypeName.normalize.startsWith("async"):
-        result = newIdentDefs(param[0], param[1][1])
+        result = paramType[1]
       elif secondTypeName.normalize.startsWith("async"):
-        result = newIdentDefs(param[0], param[1][2])
+        result = paramType[2]
     else:
       if not firstTypeName.normalize.startsWith("async"):
-        result = newIdentDefs(param[0], param[1][1])
+        result = paramType[1]
       elif not secondTypeName.normalize.startsWith("async"):
-        result = newIdentDefs(param[0], param[1][2])
+        result = paramType[2]
 
 proc stripReturnType(returnType: NimNode): NimNode =
   # Strip out the 'Future' from 'Future[T]'.
   result = returnType
   if returnType.kind == nnkBracketExpr:
     let fut = repr(returnType[0])
-    if fut != "Future":
-      error("Expected return type of 'Future' got '" & fut & "'")
+    verifyReturnType(fut)
     result = returnType[1]
 
 proc splitProc(prc: NimNode): (NimNode, NimNode) =
@@ -491,15 +497,24 @@ proc splitProc(prc: NimNode): (NimNode, NimNode) =
   ## for example: proc (socket: Socket | AsyncSocket).
   ## It transforms them so that ``proc (socket: Socket)`` and
   ## ``proc (socket: AsyncSocket)`` are returned.
+
   result[0] = prc.copyNimTree()
-  result[0][3][0] = stripReturnType(result[0][3][0])
+  # Retrieve the `T` inside `Future[T]`.
+  let returnType = stripReturnType(result[0][3][0])
+  result[0][3][0] = splitParamType(returnType, async=false)
   for i in 1 .. <result[0][3].len:
-    result[0][3][i] = splitParams(result[0][3][i], false)
+    # Sync proc (0) -> FormalParams (3) -> IdentDefs, the parameter (i) ->
+    # parameter type (1).
+    result[0][3][i][1] = splitParamType(result[0][3][i][1], async=false)
   result[0][6] = stripAwait(result[0][6])
 
   result[1] = prc.copyNimTree()
+  if result[1][3][0].kind == nnkBracketExpr:
+    result[1][3][0][1] = splitParamType(result[1][3][0][1], async=true)
   for i in 1 .. <result[1][3].len:
-    result[1][3][i] = splitParams(result[1][3][i], true)
+    # Async proc (1) -> FormalParams (3) -> IdentDefs, the parameter (i) ->
+    # parameter type (1).
+    result[1][3][i][1] = splitParamType(result[1][3][i][1], async=true)
 
 macro multisync*(prc: untyped): untyped =
   ## Macro which processes async procedures into both asynchronous and
@@ -512,4 +527,4 @@ macro multisync*(prc: untyped): untyped =
   let (sync, asyncPrc) = splitProc(prc)
   result = newStmtList()
   result.add(asyncSingleProc(asyncPrc))
-  result.add(sync)
+  result.add(sync)
\ No newline at end of file
diff --git a/lib/pure/collections/queues.nim b/lib/pure/collections/queues.nim
index 0490ae494..401422162 100644
--- a/lib/pure/collections/queues.nim
+++ b/lib/pure/collections/queues.nim
@@ -144,7 +144,7 @@ proc add*[T](q: var Queue[T], item: T) =
   var cap = q.mask+1
   if unlikely(q.count >= cap):
     var n = newSeq[T](cap*2)
-    for i, x in q:  # don't use copyMem because the GC and because it's slower.
+    for i, x in pairs(q):  # don't use copyMem because the GC and because it's slower.
       shallowCopy(n[i], x)
     shallowCopy(q.data, n)
     q.mask = cap*2 - 1
diff --git a/lib/pure/httpclient.nim b/lib/pure/httpclient.nim
index 1ded540ec..4f26c078a 100644
--- a/lib/pure/httpclient.nim
+++ b/lib/pure/httpclient.nim
@@ -117,20 +117,28 @@
 ## only basic authentication is supported at the moment.
 
 import net, strutils, uri, parseutils, strtabs, base64, os, mimetypes,
-  math, random, httpcore, times, tables
-import asyncnet, asyncdispatch
+  math, random, httpcore, times, tables, streams
+import asyncnet, asyncdispatch, asyncfile
 import nativesockets
 
 export httpcore except parseHeader # TODO: The ``except`` doesn't work
 
 type
-  Response* = object
+  Response* = ref object
     version*: string
     status*: string
     headers*: HttpHeaders
-    body*: string
+    body: string # TODO: here for compatibility with old httpclient procs.
+    bodyStream*: Stream
 
-proc code*(response: Response): HttpCode
+  AsyncResponse* = ref object
+    version*: string
+    status*: string
+    headers*: HttpHeaders
+    body: string
+    bodyStream*: FutureStream[string]
+
+proc code*(response: Response | AsyncResponse): HttpCode
            {.raises: [ValueError, OverflowError].} =
   ## Retrieves the specified response's ``HttpCode``.
   ##
@@ -138,6 +146,40 @@ proc code*(response: Response): HttpCode
   ## corresponding ``HttpCode``.
   return response.status[0 .. 2].parseInt.HttpCode
 
+proc body*(response: Response): string =
+  ## Retrieves the specified response's body.
+  ##
+  ## The response's body stream is read synchronously.
+  if response.body.isNil():
+    response.body = response.bodyStream.readAll()
+  return response.body
+
+proc `body=`*(response: Response, value: string) {.deprecated.} =
+  ## Setter for backward compatibility.
+  ##
+  ## **This is deprecated and should not be used**.
+  response.body = value
+
+proc readAll*(future: FutureStream[string]): Future[string] {.async.} =
+  ## Returns a future that will complete when all the string data from the
+  ## specified future stream is retrieved.
+
+  # TODO: Move this to asyncfutures.
+  result = ""
+  while true:
+    let (hasValue, value) = await future.take()
+    if hasValue:
+      result.add(value)
+    else:
+      break
+
+proc body*(response: AsyncResponse): Future[string] {.async.} =
+  ## Reads the response's body and caches it. The read is performed only
+  ## once.
+  if response.body.isNil:
+    response.body = await readAll(response.bodyStream)
+  return response.body
+
 type
   Proxy* = ref object
     url*: Uri
@@ -249,6 +291,7 @@ proc parseBody(s: Socket, headers: HttpHeaders, httpVersion: string, timeout: in
           result.add(buf)
 
 proc parseResponse(s: Socket, getBody: bool, timeout: int): Response =
+  new result
   var parsedStatus = false
   var linei = 0
   var fullyRead = false
@@ -653,10 +696,13 @@ proc postContent*(url: string, extraHeaders = "", body = "",
 proc downloadFile*(url: string, outputFilename: string,
                    sslContext: SSLContext = defaultSSLContext,
                    timeout = -1, userAgent = defUserAgent,
-                   proxy: Proxy = nil) =
+                   proxy: Proxy = nil) {.deprecated.} =
   ## | Downloads ``url`` and saves it to ``outputFilename``
   ## | An optional timeout can be specified in milliseconds, if reading from the
   ## server takes longer than specified an ETimeout exception will be raised.
+  ##
+  ## **Deprecated since version 0.16.2**: use ``HttpClient.downloadFile``
+  ## instead.
   var f: File
   if open(f, outputFilename, fmWrite):
     f.write(getContent(url, sslContext = sslContext, timeout = timeout,
@@ -735,6 +781,11 @@ type
     contentProgress: BiggestInt
     oneSecondProgress: BiggestInt
     lastProgressReport: float
+    when SocketType is AsyncSocket:
+      bodyStream: FutureStream[string]
+    else:
+      bodyStream: Stream
+    getBody: bool ## When `false`, the body is never read in requestAux.
 
 type
   HttpClient* = HttpClientBase[Socket]
@@ -764,6 +815,8 @@ proc newHttpClient*(userAgent = defUserAgent,
   result.proxy = proxy
   result.timeout = timeout
   result.onProgressChanged = nil
+  result.bodyStream = newStringStream()
+  result.getBody = true
   when defined(ssl):
     result.sslContext = sslContext
 
@@ -794,6 +847,8 @@ proc newAsyncHttpClient*(userAgent = defUserAgent,
   result.proxy = proxy
   result.timeout = -1 # TODO
   result.onProgressChanged = nil
+  result.bodyStream = newFutureStream[string]("newAsyncHttpClient")
+  result.getBody = true
   when defined(ssl):
     result.sslContext = sslContext
 
@@ -815,14 +870,14 @@ proc reportProgress(client: HttpClient | AsyncHttpClient,
       client.oneSecondProgress = 0
       client.lastProgressReport = epochTime()
 
-proc recvFull(client: HttpClient | AsyncHttpClient,
-              size: int, timeout: int): Future[string] {.multisync.} =
+proc recvFull(client: HttpClient | AsyncHttpClient, size: int, timeout: int,
+              keep: bool): Future[int] {.multisync.} =
   ## Ensures that all the data requested is read and returned.
-  result = ""
+  var readLen = 0
   while true:
-    if size == result.len: break
+    if size == readLen: break
 
-    let remainingSize = size - result.len
+    let remainingSize = size - readLen
     let sizeToRecv = min(remainingSize, net.BufferSize)
 
     when client.socket is Socket:
@@ -830,13 +885,20 @@ proc recvFull(client: HttpClient | AsyncHttpClient,
     else:
       let data = await client.socket.recv(sizeToRecv)
     if data == "": break # We've been disconnected.
-    result.add data
+
+    readLen.inc(data.len)
+    if keep:
+      when client.socket is Socket:
+        client.bodyStream.write(data)
+      else:
+        await client.bodyStream.put(data)
 
     await reportProgress(client, data.len)
 
-proc parseChunks(client: HttpClient | AsyncHttpClient): Future[string]
+  return readLen
+
+proc parseChunks(client: HttpClient | AsyncHttpClient): Future[void]
                  {.multisync.} =
-  result = ""
   while true:
     var chunkSize = 0
     var chunkSizeStr = await client.socket.recvLine()
@@ -861,25 +923,27 @@ proc parseChunks(client: HttpClient | AsyncHttpClient): Future[string]
         httpError("Invalid chunk size: " & chunkSizeStr)
       inc(i)
     if chunkSize <= 0:
-      discard await recvFull(client, 2, client.timeout) # Skip \c\L
+      discard await recvFull(client, 2, client.timeout, false) # Skip \c\L
       break
-    result.add await recvFull(client, chunkSize, client.timeout)
-    discard await recvFull(client, 2, client.timeout) # Skip \c\L
+    discard await recvFull(client, chunkSize, client.timeout, true)
+    discard await recvFull(client, 2, client.timeout, false) # Skip \c\L
     # Trailer headers will only be sent if the request specifies that we want
     # them: http://tools.ietf.org/html/rfc2616#section-3.6.1
 
 proc parseBody(client: HttpClient | AsyncHttpClient,
                headers: HttpHeaders,
-               httpVersion: string): Future[string] {.multisync.} =
-  result = ""
+               httpVersion: string): Future[void] {.multisync.} =
   # Reset progress from previous requests.
   client.contentTotal = 0
   client.contentProgress = 0
   client.oneSecondProgress = 0
   client.lastProgressReport = 0
 
+  when client is AsyncHttpClient:
+    assert(not client.bodyStream.finished)
+
   if headers.getOrDefault"Transfer-Encoding" == "chunked":
-    result = await parseChunks(client)
+    await parseChunks(client)
   else:
     # -REGION- Content-Length
     # (http://tools.ietf.org/html/rfc2616#section-4.4) NR.3
@@ -888,26 +952,31 @@ proc parseBody(client: HttpClient | AsyncHttpClient,
       var length = contentLengthHeader.parseint()
       client.contentTotal = length
       if length > 0:
-        result = await client.recvFull(length, client.timeout)
-        if result == "":
+        let recvLen = await client.recvFull(length, client.timeout, true)
+        if recvLen == 0:
           httpError("Got disconnected while trying to read body.")
-        if result.len != length:
+        if recvLen != length:
           httpError("Received length doesn't match expected length. Wanted " &
-                    $length & " got " & $result.len)
+                    $length & " got " & $recvLen)
     else:
       # (http://tools.ietf.org/html/rfc2616#section-4.4) NR.4 TODO
 
       # -REGION- Connection: Close
       # (http://tools.ietf.org/html/rfc2616#section-4.4) NR.5
       if headers.getOrDefault"Connection" == "close" or httpVersion == "1.0":
-        var buf = ""
         while true:
-          buf = await client.recvFull(4000, client.timeout)
-          if buf == "": break
-          result.add(buf)
+          let recvLen = await client.recvFull(4000, client.timeout, true)
+          if recvLen == 0: break
+
+  when client is AsyncHttpClient:
+    client.bodyStream.complete()
+  else:
+    client.bodyStream.setPosition(0)
 
 proc parseResponse(client: HttpClient | AsyncHttpClient,
-                   getBody: bool): Future[Response] {.multisync.} =
+                   getBody: bool): Future[Response | AsyncResponse]
+                   {.multisync.} =
+  new result
   var parsedStatus = false
   var linei = 0
   var fullyRead = false
@@ -955,10 +1024,14 @@ proc parseResponse(client: HttpClient | AsyncHttpClient,
 
   if not fullyRead:
     httpError("Connection was closed before full request has been made")
+
   if getBody:
-    result.body = await parseBody(client, result.headers, result.version)
-  else:
-    result.body = ""
+    when client is HttpClient:
+      client.bodyStream = newStringStream()
+    else:
+      client.bodyStream = newFutureStream[string]("parseResponse")
+    await parseBody(client, result.headers, result.version)
+    result.bodyStream = client.bodyStream
 
 proc newConnection(client: HttpClient | AsyncHttpClient,
                    url: Uri) {.multisync.} =
@@ -1006,8 +1079,9 @@ proc override(fallback, override: HttpHeaders): HttpHeaders =
     result[k] = vs
 
 proc requestAux(client: HttpClient | AsyncHttpClient, url: string,
-              httpMethod: string, body = "",
-              headers: HttpHeaders = nil): Future[Response] {.multisync.} =
+                httpMethod: string, body = "",
+                headers: HttpHeaders = nil): Future[Response | AsyncResponse]
+                {.multisync.} =
   # Helper that actually makes the request. Does not handle redirects.
   let connectionUrl =
     if client.proxy.isNil: parseUri(url) else: client.proxy.url
@@ -1047,16 +1121,17 @@ proc requestAux(client: HttpClient | AsyncHttpClient, url: string,
   if body != "":
     await client.socket.send(body)
 
-  result = await parseResponse(client,
-                               httpMethod.toLower() notin ["head", "connect"])
+  let getBody = httpMethod.toLowerAscii() notin ["head", "connect"] and
+                client.getBody
+  result = await parseResponse(client, getBody)
 
   # Restore the clients proxy in case it was overwritten.
   client.proxy = savedProxy
 
-
 proc request*(client: HttpClient | AsyncHttpClient, url: string,
               httpMethod: string, body = "",
-              headers: HttpHeaders = nil): Future[Response] {.multisync.} =
+              headers: HttpHeaders = nil): Future[Response | AsyncResponse]
+              {.multisync.} =
   ## Connects to the hostname specified by the URL and performs a request
   ## using the custom method string specified by ``httpMethod``.
   ##
@@ -1078,7 +1153,8 @@ proc request*(client: HttpClient | AsyncHttpClient, url: string,
 
 proc request*(client: HttpClient | AsyncHttpClient, url: string,
               httpMethod = HttpGET, body = "",
-              headers: HttpHeaders = nil): Future[Response] {.multisync.} =
+              headers: HttpHeaders = nil): Future[Response | AsyncResponse]
+              {.multisync.} =
   ## Connects to the hostname specified by the URL and performs a request
   ## using the method specified.
   ##
@@ -1088,11 +1164,10 @@ proc request*(client: HttpClient | AsyncHttpClient, url: string,
   ##
   ## When a request is made to a different hostname, the current connection will
   ## be closed.
-  result = await request(client, url, $httpMethod, body,
-                         headers = headers)
+  result = await request(client, url, $httpMethod, body, headers)
 
 proc get*(client: HttpClient | AsyncHttpClient,
-          url: string): Future[Response] {.multisync.} =
+          url: string): Future[Response | AsyncResponse] {.multisync.} =
   ## Connects to the hostname specified by the URL and performs a GET request.
   ##
   ## This procedure will follow redirects up to a maximum number of redirects
@@ -1112,16 +1187,17 @@ proc getContent*(client: HttpClient | AsyncHttpClient,
   if resp.code.is4xx or resp.code.is5xx:
     raise newException(HttpRequestError, resp.status)
   else:
-    return resp.body
+    return await resp.bodyStream.readAll()
 
 proc post*(client: HttpClient | AsyncHttpClient, url: string, body = "",
-           multipart: MultipartData = nil): Future[Response] {.multisync.} =
+           multipart: MultipartData = nil): Future[Response | AsyncResponse]
+           {.multisync.} =
   ## Connects to the hostname specified by the URL and performs a POST request.
   ##
   ## This procedure will follow redirects up to a maximum number of redirects
   ## specified in ``client.maxRedirects``.
   let (mpHeader, mpBody) = format(multipart)
-
+  # TODO: Support FutureStream for `body` parameter.
   template withNewLine(x): expr =
     if x.len > 0 and not x.endsWith("\c\L"):
       x & "\c\L"
@@ -1134,16 +1210,14 @@ proc post*(client: HttpClient | AsyncHttpClient, url: string, body = "",
     headers["Content-Type"] = mpHeader.split(": ")[1]
   headers["Content-Length"] = $len(xb)
 
-  result = await client.requestAux(url, $HttpPOST, xb,
-                                headers = headers)
+  result = await client.requestAux(url, $HttpPOST, xb, headers)
   # Handle redirects.
   var lastURL = url
   for i in 1..client.maxRedirects:
     if result.status.redirection():
       let redirectTo = getNewLocation(lastURL, result.headers)
       var meth = if result.status != "307": HttpGet else: HttpPost
-      result = await client.requestAux(redirectTo, $meth, xb,
-                                    headers = headers)
+      result = await client.requestAux(redirectTo, $meth, xb, headers)
       lastURL = redirectTo
 
 proc postContent*(client: HttpClient | AsyncHttpClient, url: string,
@@ -1161,4 +1235,28 @@ proc postContent*(client: HttpClient | AsyncHttpClient, url: string,
   if resp.code.is4xx or resp.code.is5xx:
     raise newException(HttpRequestError, resp.status)
   else:
-    return resp.body
+    return await resp.bodyStream.readAll()
+
+proc downloadFile*(client: HttpClient | AsyncHttpClient,
+                   url: string, filename: string): Future[void] {.multisync.} =
+  ## Downloads ``url`` and saves it to ``filename``.
+  client.getBody = false
+  let resp = await client.get(url)
+
+  when client is HttpClient:
+    client.bodyStream = newFileStream(filename, fmWrite)
+    if client.bodyStream.isNil:
+      fileError("Unable to open file")
+  else:
+    var f = openAsync(filename, fmWrite)
+    client.bodyStream = f.getWriteStream()
+
+  await parseBody(client, resp.headers, resp.version)
+
+  when client is HttpClient:
+    client.bodyStream.close()
+  else:
+    f.close()
+
+  if resp.code.is4xx or resp.code.is5xx:
+    raise newException(HttpRequestError, resp.status)
\ No newline at end of file
diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim
index c83228014..6f6693605 100644
--- a/lib/pure/includes/asyncfutures.nim
+++ b/lib/pure/includes/asyncfutures.nim
@@ -16,6 +16,10 @@ type
 
   FutureVar*[T] = distinct Future[T]
 
+  FutureStream*[T] = ref object of FutureBase   ## Special future that acts as
+                                                ## a queue.
+    queue: Queue[T]
+
   FutureError* = object of Exception
     cause*: FutureBase
 
@@ -26,11 +30,7 @@ when not defined(release):
 
 proc callSoon*(cbproc: proc ()) {.gcsafe.}
 
-proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
-  ## Creates a new future.
-  ##
-  ## Specifying ``fromProc``, which is a string specifying the name of the proc
-  ## that this future belongs to, is a good habit as it helps with debugging.
+template setupFutureBase(fromProc: string): stmt =
   new(result)
   result.finished = false
   when not defined(release):
@@ -39,6 +39,13 @@ proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
     result.fromProc = fromProc
     currentID.inc()
 
+proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
+  ## Creates a new future.
+  ##
+  ## Specifying ``fromProc``, which is a string specifying the name of the proc
+  ## that this future belongs to, is a good habit as it helps with debugging.
+  setupFutureBase(fromProc)
+
 proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] =
   ## Create a new ``FutureVar``. This Future type is ideally suited for
   ## situations where you want to avoid unnecessary allocations of Futures.
@@ -47,6 +54,15 @@ proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] =
   ## that this future belongs to, is a good habit as it helps with debugging.
   result = FutureVar[T](newFuture[T](fromProc))
 
+proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] =
+  ## Create a new ``FutureStream``. This Future type's callback can be activated
+  ## multiple times when new data is written to it.
+  ##
+  ## Specifying ``fromProc``, which is a string specifying the name of the proc
+  ## that this future belongs to, is a good habit as it helps with debugging.
+  setupFutureBase(fromProc)
+  result.queue = initQueue[T]()
+
 proc clean*[T](future: FutureVar[T]) =
   ## Resets the ``finished`` status of ``future``.
   Future[T](future).finished = false
@@ -107,12 +123,18 @@ proc complete*[T](future: FutureVar[T], val: T) =
   ## Any previously stored value will be overwritten.
   template fut: untyped = Future[T](future)
   checkFinished(fut)
-  assert(fut.error == nil)
+  assert(fut.error.isNil())
   fut.finished = true
   fut.value = val
-  if fut.cb != nil:
+  if not fut.cb.isNil():
     fut.cb()
 
+proc complete*[T](future: FutureStream[T]) =
+  ## Completes a ``FutureStream`` signifying the end of data.
+  future.finished = true
+  if not future.cb.isNil():
+    future.cb()
+
 proc fail*[T](future: Future[T], error: ref Exception) =
   ## Completes ``future`` with ``error``.
   #assert(not future.finished, "Future already finished, cannot finish twice.")
@@ -149,6 +171,20 @@ proc `callback=`*[T](future: Future[T],
   ## If future has already completed then ``cb`` will be called immediately.
   future.callback = proc () = cb(future)
 
+proc `callback=`*[T](future: FutureStream[T],
+    cb: proc (future: FutureStream[T]) {.closure,gcsafe.}) =
+  ## Sets the callback proc to be called when data was placed inside the
+  ## future stream.
+  ##
+  ## The callback is also called when the future is completed. So you should
+  ## use ``finished`` to check whether data is available.
+  ##
+  ## If the future stream already has data then ``cb`` will be called
+  ## immediately.
+  future.cb = proc () = cb(future)
+  if future.queue.len > 0 or future.finished:
+    callSoon(future.cb)
+
 proc injectStacktrace[T](future: Future[T]) =
   # TODO: Come up with something better.
   when not defined(release):
@@ -195,12 +231,17 @@ proc mget*[T](future: FutureVar[T]): var T =
   ## Future has not been finished.
   result = Future[T](future).value
 
-proc finished*[T](future: Future[T] | FutureVar[T]): bool =
+proc finished*[T](future: Future[T] | FutureVar[T] | FutureStream[T]): bool =
   ## Determines whether ``future`` has completed.
   ##
   ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish.
+  ##
+  ## For a ``FutureStream`` this signifies that no more data will be placed
+  ## inside it and that there is no data waiting to be retrieved.
   when future is FutureVar[T]:
     result = (Future[T](future)).finished
+  elif future is FutureStream[T]:
+    result = future.finished and future.queue.len == 0
   else:
     result = future.finished
 
@@ -208,6 +249,56 @@ proc failed*(future: FutureBase): bool =
   ## Determines whether ``future`` completed with an error.
   return future.error != nil
 
+proc put*[T](future: FutureStream[T], value: T): Future[void] =
+  ## Writes the specified value inside the specified future stream.
+  ##
+  ## This will raise ``ValueError`` if ``future`` is finished.
+  result = newFuture[void]("FutureStream.put")
+  if future.finished:
+    let msg = "FutureStream is finished and so no longer accepts new data."
+    result.fail(newException(ValueError, msg))
+    return
+  # TODO: Buffering.
+  future.queue.enqueue(value)
+  if not future.cb.isNil: future.cb()
+  result.complete()
+
+proc take*[T](future: FutureStream[T]): Future[(bool, T)] =
+  ## Returns a future that will complete when the ``FutureStream`` has data
+  ## placed into it. The future will be completed with the oldest
+  ## value stored inside the stream. The return value will also determine
+  ## whether data was retrieved, ``false`` means that the future stream was
+  ## completed and no data was retrieved.
+  ##
+  ## This function will remove the data that was returned from the underlying
+  ## ``FutureStream``.
+  var resFut = newFuture[(bool, T)]("FutureStream.take")
+  let savedCb = future.cb
+  future.callback =
+    proc (fs: FutureStream[T]) =
+      # We don't want this callback called again.
+      future.cb = nil
+
+      # The return value depends on whether the FutureStream has finished.
+      var res: (bool, T)
+      if finished(fs):
+        # Remember, this callback is called when the FutureStream is completed.
+        res[0] = false
+      else:
+        res[0] = true
+        res[1] = fs.queue.dequeue()
+
+      if not resFut.finished:
+        resFut.complete(res)
+
+      # If the saved callback isn't nil then let's call it.
+      if not savedCb.isNil: savedCb()
+  return resFut
+
+proc len*[T](future: FutureStream[T]): int =
+  ## Returns the amount of data pieces inside the stream.
+  future.queue.len
+
 proc asyncCheck*[T](future: Future[T]) =
   ## Sets a callback on ``future`` which raises an exception if the future
   ## finished with an error.