diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/pure/asyncdispatch.nim | 13 | ||||
-rw-r--r-- | lib/pure/asyncfile.nim | 13 | ||||
-rw-r--r-- | lib/pure/collections/deques.nim | 2 | ||||
-rw-r--r-- | lib/pure/httpclient.nim | 34 | ||||
-rw-r--r-- | lib/pure/includes/asyncfutures.nim | 43 | ||||
-rw-r--r-- | lib/upcoming/asyncdispatch.nim | 11 |
6 files changed, 70 insertions, 46 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index 58113ae69..7fa686f00 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -9,7 +9,7 @@ include "system/inclrtl" -import os, oids, tables, strutils, times, heapqueue, queues +import os, oids, tables, strutils, times, heapqueue import nativesockets, net, deques @@ -1387,6 +1387,17 @@ proc send*(socket: AsyncFD, data: string, # -- Await Macro include asyncmacro +proc readAll*(future: FutureStream[string]): Future[string] {.async.} = + ## Returns a future that will complete when all the string data from the + ## specified future stream is retrieved. + result = "" + while true: + let (hasValue, value) = await future.read() + if hasValue: + result.add(value) + else: + break + proc recvLine*(socket: AsyncFD): Future[string] {.async, deprecated.} = ## Reads a line of data from ``socket``. Returned future will complete once ## a full line is read or an error occurs. diff --git a/lib/pure/asyncfile.nim b/lib/pure/asyncfile.nim index adfe6edba..488b8276e 100644 --- a/lib/pure/asyncfile.nim +++ b/lib/pure/asyncfile.nim @@ -476,13 +476,16 @@ proc close*(f: AsyncFile) = if close(f.fd.cint) == -1: raiseOSError(osLastError()) -proc writeFromStream(f: AsyncFile, fut: FutureStream[string]) {.async.} = +proc writeFromStream*(f: AsyncFile, fs: FutureStream[string]) {.async.} = + ## Reads data from the specified future stream until it is completed. + ## The data which is read is written to the file immediately and + ## freed from memory. + ## + ## This procedure is perfect for saving streamed data to a file without + ## wasting memory. while true: - let (hasValue, value) = await fut.take() + let (hasValue, value) = await fs.read() if hasValue: await f.write(value) else: break - -proc setWriteStream*(f: AsyncFile; fut: FutureStream[string]) {.async.} = - await writeFromStream(f, fut) diff --git a/lib/pure/collections/deques.nim b/lib/pure/collections/deques.nim index 495d7896c..d42679f06 100644 --- a/lib/pure/collections/deques.nim +++ b/lib/pure/collections/deques.nim @@ -129,7 +129,7 @@ proc expandIfNeeded[T](deq: var Deque[T]) = var cap = deq.mask + 1 if unlikely(deq.count >= cap): var n = newSeq[T](cap * 2) - for i, x in deq: # don't use copyMem because the GC and because it's slower. + for i, x in pairs(deq): # don't use copyMem because the GC and because it's slower. shallowCopy(n[i], x) shallowCopy(deq.data, n) deq.mask = cap * 2 - 1 diff --git a/lib/pure/httpclient.nim b/lib/pure/httpclient.nim index 84f66b6dc..4d8400af6 100644 --- a/lib/pure/httpclient.nim +++ b/lib/pure/httpclient.nim @@ -131,7 +131,7 @@ type version*: string status*: string headers*: HttpHeaders - body: string # TODO: here for compatibility with old httpclient procs. + body: string bodyStream*: Stream AsyncResponse* = ref object @@ -163,19 +163,6 @@ proc `body=`*(response: Response, value: string) {.deprecated.} = ## **This is deprecated and should not be used**. response.body = value -proc readAll*(future: FutureStream[string]): Future[string] {.async.} = - ## Returns a future that will complete when all the string data from the - ## specified future stream is retrieved. - - # TODO: Move this to asyncfutures. - result = "" - while true: - let (hasValue, value) = await future.take() - if hasValue: - result.add(value) - else: - break - proc body*(response: AsyncResponse): Future[string] {.async.} = ## Reads the response's body and caches it. The read is performed only ## once. @@ -650,7 +637,7 @@ proc post*(url: string, extraHeaders = "", body = "", ## **Deprecated since version 0.15.0**: use ``HttpClient.post`` instead. let (mpHeaders, mpBody) = format(multipart) - template withNewLine(x): expr = + template withNewLine(x): untyped = if x.len > 0 and not x.endsWith("\c\L"): x & "\c\L" else: @@ -891,10 +878,7 @@ proc recvFull(client: HttpClient | AsyncHttpClient, size: int, timeout: int, readLen.inc(data.len) if keep: - when client.socket is Socket: - client.bodyStream.write(data) - else: - await client.bodyStream.put(data) + await client.bodyStream.write(data) await reportProgress(client, data.len) @@ -1253,11 +1237,15 @@ proc downloadFile*(client: HttpClient | AsyncHttpClient, parseBody(client, resp.headers, resp.version) client.bodyStream.close() else: - client.bodyStream = newFutureStream[string]() - var f = openAsync(filename, fmWrite) + client.bodyStream = newFutureStream[string]("downloadFile") + var file = openAsync(filename, fmWrite) + # Let `parseBody` write response data into client.bodyStream in the + # background. asyncCheck parseBody(client, resp.headers, resp.version) - await f.setWriteStream(client.bodyStream) - f.close() + # The `writeFromStream` proc will complete once all the data in the + # `bodyStream` has been written to the file. + await file.writeFromStream(client.bodyStream) + file.close() if resp.code.is4xx or resp.code.is5xx: raise newException(HttpRequestError, resp.status) diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/includes/asyncfutures.nim index 6f6693605..a597de5cf 100644 --- a/lib/pure/includes/asyncfutures.nim +++ b/lib/pure/includes/asyncfutures.nim @@ -17,8 +17,10 @@ type FutureVar*[T] = distinct Future[T] FutureStream*[T] = ref object of FutureBase ## Special future that acts as - ## a queue. - queue: Queue[T] + ## a queue. Its API is still + ## experimental and so is + ## subject to change. + queue: Deque[T] FutureError* = object of Exception cause*: FutureBase @@ -30,7 +32,7 @@ when not defined(release): proc callSoon*(cbproc: proc ()) {.gcsafe.} -template setupFutureBase(fromProc: string): stmt = +template setupFutureBase(fromProc: string) = new(result) result.finished = false when not defined(release): @@ -55,13 +57,20 @@ proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] = result = FutureVar[T](newFuture[T](fromProc)) proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] = - ## Create a new ``FutureStream``. This Future type's callback can be activated - ## multiple times when new data is written to it. + ## Create a new ``FutureStream``. This future's callback is activated when + ## two events occur: + ## + ## * New data is written into the future stream. + ## * The future stream is completed (this means that no more data will be + ## written). ## ## Specifying ``fromProc``, which is a string specifying the name of the proc ## that this future belongs to, is a good habit as it helps with debugging. + ## + ## **Note:** The API of FutureStream is still new and so has a higher + ## likelihood of changing in the future. setupFutureBase(fromProc) - result.queue = initQueue[T]() + result.queue = initDeque[T]() proc clean*[T](future: FutureVar[T]) = ## Resets the ``finished`` status of ``future``. @@ -130,7 +139,7 @@ proc complete*[T](future: FutureVar[T], val: T) = fut.cb() proc complete*[T](future: FutureStream[T]) = - ## Completes a ``FutureStream`` signifying the end of data. + ## Completes a ``FutureStream`` signalling the end of data. future.finished = true if not future.cb.isNil(): future.cb() @@ -179,8 +188,8 @@ proc `callback=`*[T](future: FutureStream[T], ## The callback is also called when the future is completed. So you should ## use ``finished`` to check whether data is available. ## - ## If the future stream already has data then ``cb`` will be called - ## immediately. + ## If the future stream already has data or is finished then ``cb`` will be + ## called immediately. future.cb = proc () = cb(future) if future.queue.len > 0 or future.finished: callSoon(future.cb) @@ -236,8 +245,9 @@ proc finished*[T](future: Future[T] | FutureVar[T] | FutureStream[T]): bool = ## ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish. ## - ## For a ``FutureStream`` this signifies that no more data will be placed - ## inside it and that there is no data waiting to be retrieved. + ## For a ``FutureStream`` a ``true`` value means that no more data will be + ## placed inside the stream _and_ that there is no data waiting to be + ## retrieved. when future is FutureVar[T]: result = (Future[T](future)).finished elif future is FutureStream[T]: @@ -249,7 +259,7 @@ proc failed*(future: FutureBase): bool = ## Determines whether ``future`` completed with an error. return future.error != nil -proc put*[T](future: FutureStream[T], value: T): Future[void] = +proc write*[T](future: FutureStream[T], value: T): Future[void] = ## Writes the specified value inside the specified future stream. ## ## This will raise ``ValueError`` if ``future`` is finished. @@ -258,12 +268,13 @@ proc put*[T](future: FutureStream[T], value: T): Future[void] = let msg = "FutureStream is finished and so no longer accepts new data." result.fail(newException(ValueError, msg)) return - # TODO: Buffering. - future.queue.enqueue(value) + # TODO: Implement limiting of the streams storage to prevent it growing + # infinitely when no reads are occuring. + future.queue.addLast(value) if not future.cb.isNil: future.cb() result.complete() -proc take*[T](future: FutureStream[T]): Future[(bool, T)] = +proc read*[T](future: FutureStream[T]): Future[(bool, T)] = ## Returns a future that will complete when the ``FutureStream`` has data ## placed into it. The future will be completed with the oldest ## value stored inside the stream. The return value will also determine @@ -286,7 +297,7 @@ proc take*[T](future: FutureStream[T]): Future[(bool, T)] = res[0] = false else: res[0] = true - res[1] = fs.queue.dequeue() + res[1] = fs.queue.popLast() if not resFut.finished: resFut.complete(res) diff --git a/lib/upcoming/asyncdispatch.nim b/lib/upcoming/asyncdispatch.nim index bdc04fdeb..823b19138 100644 --- a/lib/upcoming/asyncdispatch.nim +++ b/lib/upcoming/asyncdispatch.nim @@ -1664,6 +1664,17 @@ proc accept*(socket: AsyncFD, # -- Await Macro include asyncmacro +proc readAll*(future: FutureStream[string]): Future[string] {.async.} = + ## Returns a future that will complete when all the string data from the + ## specified future stream is retrieved. + result = "" + while true: + let (hasValue, value) = await future.take() + if hasValue: + result.add(value) + else: + break + proc recvLine*(socket: AsyncFD): Future[string] {.async.} = ## Reads a line of data from ``socket``. Returned future will complete once ## a full line is read or an error occurs. |