diff options
Diffstat (limited to 'lib/pure')
-rw-r--r-- | lib/pure/asyncdispatch.nim | 2 | ||||
-rw-r--r-- | lib/pure/asyncfile.nim | 12 | ||||
-rw-r--r-- | lib/pure/asyncmacro.nim | 63 | ||||
-rw-r--r-- | lib/pure/collections/queues.nim | 2 | ||||
-rw-r--r-- | lib/pure/httpclient.nim | 198 | ||||
-rw-r--r-- | lib/pure/includes/asyncfutures.nim | 107 |
6 files changed, 300 insertions, 84 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index d97214d15..58113ae69 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -9,7 +9,7 @@ include "system/inclrtl" -import os, oids, tables, strutils, times, heapqueue +import os, oids, tables, strutils, times, heapqueue, queues import nativesockets, net, deques diff --git a/lib/pure/asyncfile.nim b/lib/pure/asyncfile.nim index 0241e4796..5a23f3ba2 100644 --- a/lib/pure/asyncfile.nim +++ b/lib/pure/asyncfile.nim @@ -476,3 +476,15 @@ proc close*(f: AsyncFile) = if close(f.fd.cint) == -1: raiseOSError(osLastError()) +proc writeFromStream(f: AsyncFile, fut: FutureStream[string]) {.async.} = + while true: + let (hasValue, value) = await fut.take() + if hasValue: + await f.write(value) + else: + break + +proc getWriteStream*(f: AsyncFile): FutureStream[string] = + ## Returns a new stream that can be used for writing to the file. + result = newFutureStream[string]() + asyncCheck writeFromStream(f, result) diff --git a/lib/pure/asyncmacro.nim b/lib/pure/asyncmacro.nim index f74881c6d..f0837d67d 100644 --- a/lib/pure/asyncmacro.nim +++ b/lib/pure/asyncmacro.nim @@ -33,8 +33,10 @@ template createCb(retFutureSym, iteratorNameSym, if not nameIterVar.finished: var next = nameIterVar() if next == nil: - assert retFutureSym.finished, "Async procedure's (" & - name & ") return Future was not finished." + if not retFutureSym.finished: + let msg = "Async procedure ($1) yielded `nil`, are you await'ing a " & + "`nil` Future?" + raise newException(AssertionError, msg % name) else: next.callback = cb except: @@ -281,6 +283,14 @@ proc getFutureVarIdents(params: NimNode): seq[NimNode] {.compileTime.} = ($params[i][1][0].ident).normalize == "futurevar": result.add(params[i][0]) +proc isInvalidReturnType(typeName: string): bool = + return typeName notin ["Future"] #, "FutureStream"] + +proc verifyReturnType(typeName: string) {.compileTime.} = + if typeName.isInvalidReturnType: + error("Expected return type of 'Future' got '$1'" % + typeName) + proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} = ## This macro transforms a single procedure into a closure iterator. ## The ``async`` macro supports a stmtList holding multiple async procedures. @@ -295,18 +305,16 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} = # Verify that the return type is a Future[T] if returnType.kind == nnkBracketExpr: let fut = repr(returnType[0]) - if fut != "Future": - error("Expected return type of 'Future' got '" & fut & "'") + verifyReturnType(fut) baseType = returnType[1] elif returnType.kind in nnkCallKinds and $returnType[0] == "[]": let fut = repr(returnType[1]) - if fut != "Future": - error("Expected return type of 'Future' got '" & fut & "'") + verifyReturnType(fut) baseType = returnType[2] elif returnType.kind == nnkEmpty: baseType = returnType else: - error("Expected return type of 'Future' got '" & repr(returnType) & "'") + verifyReturnType(repr(returnType)) let subtypeIsVoid = returnType.kind == nnkEmpty or (baseType.kind == nnkIdent and returnType[1].ident == !"void") @@ -390,7 +398,7 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} = if procBody.kind != nnkEmpty: result[6] = outerProcBody #echo(treeRepr(result)) - #if prc[0].getName == "testInfix": + #if prc[0].getName == "beta": # echo(toStrLit(result)) macro async*(prc: untyped): untyped = @@ -451,13 +459,12 @@ proc stripAwait(node: NimNode): NimNode = for i in 0 .. <result.len: result[i] = stripAwait(result[i]) -proc splitParams(param: NimNode, async: bool): NimNode = - expectKind(param, nnkIdentDefs) - result = param - if param[1].kind == nnkInfix and $param[1][0].ident in ["|", "or"]: - let firstType = param[1][1] +proc splitParamType(paramType: NimNode, async: bool): NimNode = + result = paramType + if paramType.kind == nnkInfix and $paramType[0].ident in ["|", "or"]: + let firstType = paramType[1] let firstTypeName = $firstType.ident - let secondType = param[1][2] + let secondType = paramType[2] let secondTypeName = $secondType.ident # Make sure that at least one has the name `async`, otherwise we shouldn't @@ -468,22 +475,21 @@ proc splitParams(param: NimNode, async: bool): NimNode = if async: if firstTypeName.normalize.startsWith("async"): - result = newIdentDefs(param[0], param[1][1]) + result = paramType[1] elif secondTypeName.normalize.startsWith("async"): - result = newIdentDefs(param[0], param[1][2]) + result = paramType[2] else: if not firstTypeName.normalize.startsWith("async"): - result = newIdentDefs(param[0], param[1][1]) + result = paramType[1] elif not secondTypeName.normalize.startsWith("async"): - result = newIdentDefs(param[0], param[1][2]) + result = paramType[2] proc stripReturnType(returnType: NimNode): NimNode = # Strip out the 'Future' from 'Future[T]'. result = returnType if returnType.kind == nnkBracketExpr: let fut = repr(returnType[0]) - if fut != "Future": - error("Expected return type of 'Future' got '" & fut & "'") + verifyReturnType(fut) result = returnType[1] proc splitProc(prc: NimNode): (NimNode, NimNode) = @@ -491,15 +497,24 @@ proc splitProc(prc: NimNode): (NimNode, NimNode) = ## for example: proc (socket: Socket | AsyncSocket). ## It transforms them so that ``proc (socket: Socket)`` and ## ``proc (socket: AsyncSocket)`` are returned. + result[0] = prc.copyNimTree() - result[0][3][0] = stripReturnType(result[0][3][0]) + # Retrieve the `T` inside `Future[T]`. + let returnType = stripReturnType(result[0][3][0]) + result[0][3][0] = splitParamType(returnType, async=false) for i in 1 .. <result[0][3].len: - result[0][3][i] = splitParams(result[0][3][i], false) + # Sync proc (0) -> FormalParams (3) -> IdentDefs, the parameter (i) -> + # parameter type (1). + result[0][3][i][1] = splitParamType(result[0][3][i][1], async=false) result[0][6] = stripAwait(result[0][6]) result[1] = prc.copyNimTree() + if result[1][3][0].kind == nnkBracketExpr: + result[1][3][0][1] = splitParamType(result[1][3][0][1], async=true) for i in 1 .. <result[1][3].len: - result[1][3][i] = splitParams(result[1][3][i], true) + # Async proc (1) -> FormalParams (3) -> IdentDefs, the parameter (i) -> + # parameter type (1). + result[1][3][i][1] = splitParamType(result[1][3][i][1], async=true) macro multisync*(prc: untyped): untyped = ## Macro which processes async procedures into both asynchronous and @@ -512,4 +527,4 @@ macro multisync*(prc: untyped): untyped = let (sync, asyncPrc) = splitProc(prc) result = newStmtList() result.add(asyncSingleProc(asyncPrc)) - result.add(sync) + result.add(sync) \ No newline at end of file diff --git a/lib/pure/collections/queues.nim b/lib/pure/collections/queues.nim index 0490ae494..401422162 100644 --- a/lib/pure/collections/queues.nim +++ b/lib/pure/collections/queues.nim @@ -144,7 +144,7 @@ proc add*[T](q: var Queue[T], item: T) = var cap = q.mask+1 if unlikely(q.count >= cap): var n = newSeq[T](cap*2) - for i, x in q: # don't use copyMem because the GC and because it's slower. + for i, x in pairs(q): # don't use copyMem because the GC and because it's slower. shallowCopy(n[i], x) shallowCopy(q.data, n) q.mask = cap*2 - 1 diff --git a/lib/pure/httpclient.nim b/lib/pure/httpclient.nim index 1ded540ec..4f26c078a 100644 --- a/lib/pure/httpclient.nim +++ b/lib/pure/httpclient.nim @@ -117,20 +117,28 @@ ## only basic authentication is supported at the moment. import net, strutils, uri, parseutils, strtabs, base64, os, mimetypes, - math, random, httpcore, times, tables -import asyncnet, asyncdispatch + math, random, httpcore, times, tables, streams +import asyncnet, asyncdispatch, asyncfile import nativesockets export httpcore except parseHeader # TODO: The ``except`` doesn't work type - Response* = object + Response* = ref object version*: string status*: string headers*: HttpHeaders - body*: string + body: string # TODO: here for compatibility with old httpclient procs. + bodyStream*: Stream -proc code*(response: Response): HttpCode + AsyncResponse* = ref object + version*: string + status*: string + headers*: HttpHeaders + body: string + bodyStream*: FutureStream[string] + +proc code*(response: Response | AsyncResponse): HttpCode {.raises: [ValueError, OverflowError].} = ## Retrieves the specified response's ``HttpCode``. ## @@ -138,6 +146,40 @@ proc code*(response: Response): HttpCode ## corresponding ``HttpCode``. return response.status[0 .. 2].parseInt.HttpCode +proc body*(response: Response): string = + ## Retrieves the specified response's body. + ## + ## The response's body stream is read synchronously. + if response.body.isNil(): + response.body = response.bodyStream.readAll() + return response.body + +proc `body=`*(response: Response, value: string) {.deprecated.} = + ## Setter for backward compatibility. + ## + ## **This is deprecated and should not be used**. + response.body = value + +proc readAll*(future: FutureStream[string]): Future[string] {.async.} = + ## Returns a future that will complete when all the string data from the + ## specified future stream is retrieved. + + # TODO: Move this to asyncfutures. + result = "" + while true: + let (hasValue, value) = await future.take() + if hasValue: + result.add(value) + else: + break + +proc body*(response: AsyncResponse): Future[string] {.async.} = + ## Reads the response's body and caches it. The read is performed only + ## once. + if response.body.isNil: + response.body = await readAll(response.bodyStream) + return response.body + type Proxy* = ref object url*: Uri @@ -249,6 +291,7 @@ proc parseBody(s: Socket, headers: HttpHeaders, httpVersion: string, timeout: in result.add(buf) proc parseResponse(s: Socket, getBody: bool, timeout: int): Response = + new result var parsedStatus = false var linei = 0 var fullyRead = false @@ -653,10 +696,13 @@ proc postContent*(url: string, extraHeaders = "", body = "", proc downloadFile*(url: string, outputFilename: string, sslContext: SSLContext = defaultSSLContext, timeout = -1, userAgent = defUserAgent, - proxy: Proxy = nil) = + proxy: Proxy = nil) {.deprecated.} = ## | Downloads ``url`` and saves it to ``outputFilename`` ## | An optional timeout can be specified in milliseconds, if reading from the ## server takes longer than specified an ETimeout exception will be raised. + ## + ## **Deprecated since version 0.16.2**: use ``HttpClient.downloadFile`` + ## instead. var f: File if open(f, outputFilename, fmWrite): f.write(getContent(url, sslContext = sslContext, timeout = timeout, @@ -735,6 +781,11 @@ type contentProgress: BiggestInt oneSecondProgress: BiggestInt lastProgressReport: float + when SocketType is AsyncSocket: + bodyStream: FutureStream[string] + else: + bodyStream: Stream + getBody: bool ## When `false`, the body is never read in requestAux. type HttpClient* = HttpClientBase[Socket] @@ -764,6 +815,8 @@ proc newHttpClient*(userAgent = defUserAgent, result.proxy = proxy result.timeout = timeout result.onProgressChanged = nil + result.bodyStream = newStringStream() + result.getBody = true when defined(ssl): result.sslContext = sslContext @@ -794,6 +847,8 @@ proc newAsyncHttpClient*(userAgent = defUserAgent, result.proxy = proxy result.timeout = -1 # TODO result.onProgressChanged = nil + result.bodyStream = newFutureStream[string]("newAsyncHttpClient") + result.getBody = true when defined(ssl): result.sslContext = sslContext @@ -815,14 +870,14 @@ proc reportProgress(client: HttpClient | AsyncHttpClient, client.oneSecondProgress = 0 client.lastProgressReport = epochTime() -proc recvFull(client: HttpClient | AsyncHttpClient, - size: int, timeout: int): Future[string] {.multisync.} = +proc recvFull(client: HttpClient | AsyncHttpClient, size: int, timeout: int, + keep: bool): Future[int] {.multisync.} = ## Ensures that all the data requested is read and returned. - result = "" + var readLen = 0 while true: - if size == result.len: break + if size == readLen: break - let remainingSize = size - result.len + let remainingSize = size - readLen let sizeToRecv = min(remainingSize, net.BufferSize) when client.socket is Socket: @@ -830,13 +885,20 @@ proc recvFull(client: HttpClient | AsyncHttpClient, else: let data = await client.socket.recv(sizeToRecv) if data == "": break # We've been disconnected. - result.add data + + readLen.inc(data.len) + if keep: + when client.socket is Socket: + client.bodyStream.write(data) + else: + await client.bodyStream.put(data) await reportProgress(client, data.len) -proc parseChunks(client: HttpClient | AsyncHttpClient): Future[string] + return readLen + +proc parseChunks(client: HttpClient | AsyncHttpClient): Future[void] {.multisync.} = - result = "" while true: var chunkSize = 0 var chunkSizeStr = await client.socket.recvLine() @@ -861,25 +923,27 @@ proc parseChunks(client: HttpClient | AsyncHttpClient): Future[string] httpError("Invalid chunk size: " & chunkSizeStr) inc(i) if chunkSize <= 0: - discard await recvFull(client, 2, client.timeout) # Skip \c\L + discard await recvFull(client, 2, client.timeout, false) # Skip \c\L break - result.add await recvFull(client, chunkSize, client.timeout) - discard await recvFull(client, 2, client.timeout) # Skip \c\L + discard await recvFull(client, chunkSize, client.timeout, true) + discard await recvFull(client, 2, client.timeout, false) # Skip \c\L # Trailer headers will only be sent if the request specifies that we want # them: http://tools.ietf.org/html/rfc2616#section-3.6.1 proc parseBody(client: HttpClient | AsyncHttpClient, headers: HttpHeaders, - httpVersion: string): Future[string] {.multisync.} = - result = "" + httpVersion: string): Future[void] {.multisync.} = # Reset progress from previous requests. client.contentTotal = 0 client.contentProgress = 0 client.oneSecondProgress = 0 client.lastProgressReport = 0 + when client is AsyncHttpClient: + assert(not client.bodyStream.finished) + if headers.getOrDefault"Transfer-Encoding" == "chunked": - result = await parseChunks(client) + await parseChunks(client) else: # -REGION- Content-Length # (http://tools.ietf.org/html/rfc2616#section-4.4) NR.3 @@ -888,26 +952,31 @@ proc parseBody(client: HttpClient | AsyncHttpClient, var length = contentLengthHeader.parseint() client.contentTotal = length if length > 0: - result = await client.recvFull(length, client.timeout) - if result == "": + let recvLen = await client.recvFull(length, client.timeout, true) + if recvLen == 0: httpError("Got disconnected while trying to read body.") - if result.len != length: + if recvLen != length: httpError("Received length doesn't match expected length. Wanted " & - $length & " got " & $result.len) + $length & " got " & $recvLen) else: # (http://tools.ietf.org/html/rfc2616#section-4.4) NR.4 TODO # -REGION- Connection: Close # (http://tools.ietf.org/html/rfc2616#section-4.4) NR.5 if headers.getOrDefault"Connection" == "close" or httpVersion == "1.0": - var buf = "" while true: - buf = await client.recvFull(4000, client.timeout) - if buf == "": break - result.add(buf) + let recvLen = await client.recvFull(4000, client.timeout, true) + if recvLen == 0: break + + when client is AsyncHttpClient: + client.bodyStream.complete() + else: + client.bodyStream.setPosition(0) proc parseResponse(client: HttpClient | AsyncHttpClient, - getBody: bool): Future[Response] {.multisync.} = + getBody: bool): Future[Response | AsyncResponse] + {.multisync.} = + new result var parsedStatus = false var linei = 0 var fullyRead = false @@ -955,10 +1024,14 @@ proc parseResponse(client: HttpClient | AsyncHttpClient, if not fullyRead: httpError("Connection was closed before full request has been made") + if getBody: - result.body = await parseBody(client, result.headers, result.version) - else: - result.body = "" + when client is HttpClient: + client.bodyStream = newStringStream() + else: + client.bodyStream = newFutureStream[string]("parseResponse") + await parseBody(client, result.headers, result.version) + result.bodyStream = client.bodyStream proc newConnection(client: HttpClient | AsyncHttpClient, url: Uri) {.multisync.} = @@ -1006,8 +1079,9 @@ proc override(fallback, override: HttpHeaders): HttpHeaders = result[k] = vs proc requestAux(client: HttpClient | AsyncHttpClient, url: string, - httpMethod: string, body = "", - headers: HttpHeaders = nil): Future[Response] {.multisync.} = + httpMethod: string, body = "", + headers: HttpHeaders = nil): Future[Response | AsyncResponse] + {.multisync.} = # Helper that actually makes the request. Does not handle redirects. let connectionUrl = if client.proxy.isNil: parseUri(url) else: client.proxy.url @@ -1047,16 +1121,17 @@ proc requestAux(client: HttpClient | AsyncHttpClient, url: string, if body != "": await client.socket.send(body) - result = await parseResponse(client, - httpMethod.toLower() notin ["head", "connect"]) + let getBody = httpMethod.toLowerAscii() notin ["head", "connect"] and + client.getBody + result = await parseResponse(client, getBody) # Restore the clients proxy in case it was overwritten. client.proxy = savedProxy - proc request*(client: HttpClient | AsyncHttpClient, url: string, httpMethod: string, body = "", - headers: HttpHeaders = nil): Future[Response] {.multisync.} = + headers: HttpHeaders = nil): Future[Response | AsyncResponse] + {.multisync.} = ## Connects to the hostname specified by the URL and performs a request ## using the custom method string specified by ``httpMethod``. ## @@ -1078,7 +1153,8 @@ proc request*(client: HttpClient | AsyncHttpClient, url: string, proc request*(client: HttpClient | AsyncHttpClient, url: string, httpMethod = HttpGET, body = "", - headers: HttpHeaders = nil): Future[Response] {.multisync.} = + headers: HttpHeaders = nil): Future[Response | AsyncResponse] + {.multisync.} = ## Connects to the hostname specified by the URL and performs a request ## using the method specified. ## @@ -1088,11 +1164,10 @@ proc request*(client: HttpClient | AsyncHttpClient, url: string, ## ## When a request is made to a different hostname, the current connection will ## be closed. - result = await request(client, url, $httpMethod, body, - headers = headers) + result = await request(client, url, $httpMethod, body, headers) proc get*(client: HttpClient | AsyncHttpClient, - url: string): Future[Response] {.multisync.} = + url: string): Future[Response | AsyncResponse] {.multisync.} = ## Connects to the hostname specified by the URL and performs a GET request. ## ## This procedure will follow redirects up to a maximum number of redirects @@ -1112,16 +1187,17 @@ proc getContent*(client: HttpClient | AsyncHttpClient, if resp.code.is4xx or resp.code.is5xx: raise newException(HttpRequestError, resp.status) else: - return resp.body + return await resp.bodyStream.readAll() proc post*(client: HttpClient | AsyncHttpClient, url: string, body = "", - multipart: MultipartData = nil): Future[Response] {.multisync.} = + multipart: MultipartData = nil): Future[Response | AsyncResponse] + {.multisync.} = ## Connects to the hostname specified by the URL and performs a POST request. ## ## This procedure will follow redirects up to a maximum number of redirects ## specified in ``client.maxRedirects``. let (mpHeader, mpBody) = format(multipart) - + # TODO: Support FutureStream for `body` parameter. template withNewLine(x): expr = if x.len > 0 and not x.endsWith("\c\L"): x & "\c\L" @@ -1134,16 +1210,14 @@ proc post*(client: HttpClient | AsyncHttpClient, url: string, body = "", headers["Content-Type"] = mpHeader.split(": ")[1] headers["Content-Length"] = $len(xb) - result = await client.requestAux(url, $HttpPOST, xb, - headers = headers) + result = await client.requestAux(url, $HttpPOST, xb, headers) # Handle redirects. var lastURL = url for i in 1..client.maxRedirects: if result.status.redirection(): let redirectTo = getNewLocation(lastURL, result.headers) var meth = if result.status != "307": HttpGet else: HttpPost - result = await client.requestAux(redirectTo, $meth, xb, - headers = headers) + result = await client.requestAux(redirectTo, $meth, xb, headers) lastURL = redirectTo proc postContent*(client: HttpClient | AsyncHttpClient, url: string, @@ -1161,4 +1235,28 @@ proc postContent*(client: HttpClient | AsyncHttpClient, url: string, if resp.code.is4xx or resp.code.is5xx: raise newException(HttpRequestError, resp.status) else: - return resp.body + return await resp.bodyStream.readAll() + +proc downloadFile*(client: HttpClient | AsyncHttpClient, + url: string, filename: string): Future[void] {.multisync.} = + ## Downloads ``url`` and saves it to ``filename``. + client.getBody = false + let resp = await client.get(url) + + when client is HttpClient: + client.bodyStream = newFileStream(filename, fmWrite) + if client.bodyStream.isNil: + fileError("Unable to open file") + else: + var f = openAsync(filename, fmWrite) + client.bodyStream = f.getWriteStream() + + await parseBody(client, resp.headers, resp.version) + + when client is HttpClient: + client.bodyStream.close() + else: + f.close() + + if resp.code.is4xx or resp.code.is5xx: + raise newException(HttpRequestError, resp.status) \ No newline at end of file diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim index c83228014..6f6693605 100644 --- a/lib/pure/includes/asyncfutures.nim +++ b/lib/pure/includes/asyncfutures.nim @@ -16,6 +16,10 @@ type FutureVar*[T] = distinct Future[T] + FutureStream*[T] = ref object of FutureBase ## Special future that acts as + ## a queue. + queue: Queue[T] + FutureError* = object of Exception cause*: FutureBase @@ -26,11 +30,7 @@ when not defined(release): proc callSoon*(cbproc: proc ()) {.gcsafe.} -proc newFuture*[T](fromProc: string = "unspecified"): Future[T] = - ## Creates a new future. - ## - ## Specifying ``fromProc``, which is a string specifying the name of the proc - ## that this future belongs to, is a good habit as it helps with debugging. +template setupFutureBase(fromProc: string): stmt = new(result) result.finished = false when not defined(release): @@ -39,6 +39,13 @@ proc newFuture*[T](fromProc: string = "unspecified"): Future[T] = result.fromProc = fromProc currentID.inc() +proc newFuture*[T](fromProc: string = "unspecified"): Future[T] = + ## Creates a new future. + ## + ## Specifying ``fromProc``, which is a string specifying the name of the proc + ## that this future belongs to, is a good habit as it helps with debugging. + setupFutureBase(fromProc) + proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] = ## Create a new ``FutureVar``. This Future type is ideally suited for ## situations where you want to avoid unnecessary allocations of Futures. @@ -47,6 +54,15 @@ proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] = ## that this future belongs to, is a good habit as it helps with debugging. result = FutureVar[T](newFuture[T](fromProc)) +proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] = + ## Create a new ``FutureStream``. This Future type's callback can be activated + ## multiple times when new data is written to it. + ## + ## Specifying ``fromProc``, which is a string specifying the name of the proc + ## that this future belongs to, is a good habit as it helps with debugging. + setupFutureBase(fromProc) + result.queue = initQueue[T]() + proc clean*[T](future: FutureVar[T]) = ## Resets the ``finished`` status of ``future``. Future[T](future).finished = false @@ -107,12 +123,18 @@ proc complete*[T](future: FutureVar[T], val: T) = ## Any previously stored value will be overwritten. template fut: untyped = Future[T](future) checkFinished(fut) - assert(fut.error == nil) + assert(fut.error.isNil()) fut.finished = true fut.value = val - if fut.cb != nil: + if not fut.cb.isNil(): fut.cb() +proc complete*[T](future: FutureStream[T]) = + ## Completes a ``FutureStream`` signifying the end of data. + future.finished = true + if not future.cb.isNil(): + future.cb() + proc fail*[T](future: Future[T], error: ref Exception) = ## Completes ``future`` with ``error``. #assert(not future.finished, "Future already finished, cannot finish twice.") @@ -149,6 +171,20 @@ proc `callback=`*[T](future: Future[T], ## If future has already completed then ``cb`` will be called immediately. future.callback = proc () = cb(future) +proc `callback=`*[T](future: FutureStream[T], + cb: proc (future: FutureStream[T]) {.closure,gcsafe.}) = + ## Sets the callback proc to be called when data was placed inside the + ## future stream. + ## + ## The callback is also called when the future is completed. So you should + ## use ``finished`` to check whether data is available. + ## + ## If the future stream already has data then ``cb`` will be called + ## immediately. + future.cb = proc () = cb(future) + if future.queue.len > 0 or future.finished: + callSoon(future.cb) + proc injectStacktrace[T](future: Future[T]) = # TODO: Come up with something better. when not defined(release): @@ -195,12 +231,17 @@ proc mget*[T](future: FutureVar[T]): var T = ## Future has not been finished. result = Future[T](future).value -proc finished*[T](future: Future[T] | FutureVar[T]): bool = +proc finished*[T](future: Future[T] | FutureVar[T] | FutureStream[T]): bool = ## Determines whether ``future`` has completed. ## ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish. + ## + ## For a ``FutureStream`` this signifies that no more data will be placed + ## inside it and that there is no data waiting to be retrieved. when future is FutureVar[T]: result = (Future[T](future)).finished + elif future is FutureStream[T]: + result = future.finished and future.queue.len == 0 else: result = future.finished @@ -208,6 +249,56 @@ proc failed*(future: FutureBase): bool = ## Determines whether ``future`` completed with an error. return future.error != nil +proc put*[T](future: FutureStream[T], value: T): Future[void] = + ## Writes the specified value inside the specified future stream. + ## + ## This will raise ``ValueError`` if ``future`` is finished. + result = newFuture[void]("FutureStream.put") + if future.finished: + let msg = "FutureStream is finished and so no longer accepts new data." + result.fail(newException(ValueError, msg)) + return + # TODO: Buffering. + future.queue.enqueue(value) + if not future.cb.isNil: future.cb() + result.complete() + +proc take*[T](future: FutureStream[T]): Future[(bool, T)] = + ## Returns a future that will complete when the ``FutureStream`` has data + ## placed into it. The future will be completed with the oldest + ## value stored inside the stream. The return value will also determine + ## whether data was retrieved, ``false`` means that the future stream was + ## completed and no data was retrieved. + ## + ## This function will remove the data that was returned from the underlying + ## ``FutureStream``. + var resFut = newFuture[(bool, T)]("FutureStream.take") + let savedCb = future.cb + future.callback = + proc (fs: FutureStream[T]) = + # We don't want this callback called again. + future.cb = nil + + # The return value depends on whether the FutureStream has finished. + var res: (bool, T) + if finished(fs): + # Remember, this callback is called when the FutureStream is completed. + res[0] = false + else: + res[0] = true + res[1] = fs.queue.dequeue() + + if not resFut.finished: + resFut.complete(res) + + # If the saved callback isn't nil then let's call it. + if not savedCb.isNil: savedCb() + return resFut + +proc len*[T](future: FutureStream[T]): int = + ## Returns the amount of data pieces inside the stream. + future.queue.len + proc asyncCheck*[T](future: Future[T]) = ## Sets a callback on ``future`` which raises an exception if the future ## finished with an error. |