summary refs log tree commit diff stats
path: root/lib/pure/asyncstreams.nim
blob: 7ffde9c1064785c949ad5c29f9e8ebcda482e88f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
pre { line-height: 125%; }
td.linenos .normal { color: inherit; background-color: transparent; padding-left: 5px; padding-right: 5px; }
span.linenos { color: inherit; background-color: transparent; padding-left: 5px; padding-right: 5px; }
td.linenos .special { color: #000000; background-color: #ffffc0; padding-left: 5px; padding-right: 5px; }
span.linenos.special { color: #000000; background-color: #ffffc0; padding-left: 5px; padding-right: 5px; }
.highlight .hll { background-color: #ffffcc }
.highlight .c { color: #888888 } /* Comment */
.highlight .err { color: #a61717; background-color: #e3d2d2 } /* Error */
.highlight .k { color: #008800; font-weight: bold } /* Keyword */
.highlight .ch { color: #888888 } /* Comment.Hashbang */
.highlight .cm { color: #888888 } /* Comment.Multiline */
.highlight .cp { color: #cc0000; font-weight: bold } /* Comment.Preproc */
.highlight .cpf { color: #888888 } /* Comment.PreprocFile */
.highlight .c1 { color: #888888 } /* Comment.Single */
.highlight .cs { color: #cc0000; font-weight: bold; background-color: #fff0f0 } /* Comment.Special */
.h
#
#
#            Nim's Runtime Library
#        (c) Copyright 2015 Dominik Picheta
#
#    See the file "copying.txt", included in this
#    distribution, for details about the copyright.
#

## Unstable API.

import asyncfutures

import deques

type
  FutureStream*[T] = ref object ## Special future that acts as
                                ## a queue. Its API is still
                                ## experimental and so is
                                ## subject to change.
    queue: Deque[T]
    finished: bool
    cb: proc () {.closure, gcsafe.}

proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] =
  ## Create a new ``FutureStream``. This future's callback is activated when
  ## two events occur:
  ##
  ## * New data is written into the future stream.
  ## * The future stream is completed (this means that no more data will be
  ##   written).
  ##
  ## Specifying ``fromProc``, which is a string specifying the name of the proc
  ## that this future belongs to, is a good habit as it helps with debugging.
  ##
  ## **Note:** The API of FutureStream is still new and so has a higher
  ## likelihood of changing in the future.
  result = FutureStream[T](finished: false, cb: nil)
  result.queue = initDeque[T]()

proc complete*[T](future: FutureStream[T]) =
  ## Completes a ``FutureStream`` signalling the end of data.
  future.finished = true
  if not future.cb.isNil:
    future.cb()

proc `callback=`*[T](future: FutureStream[T],
    cb: proc (future: FutureStream[T]) {.closure, gcsafe.}) =
  ## Sets the callback proc to be called when data was placed inside the
  ## future stream.
  ##
  ## The callback is also called when the future is completed. So you should
  ## use ``finished`` to check whether data is available.
  ##
  ## If the future stream already has data or is finished then ``cb`` will be
  ## called immediately.
  proc named() = cb(future)
  future.cb = named
  if future.queue.len > 0 or future.finished:
    callSoon(future.cb)

proc finished*[T](future: FutureStream[T]): bool =
  ## Check if a ``FutureStream`` is finished. ``true`` value means that
  ## no more data will be placed inside the stream *and* that there is
  ## no data waiting to be retrieved.
  result = future.finished and future.queue.len == 0

proc write*[T](future: FutureStream[T], value: T): Future[void] =
  ## Writes the specified value inside the specified future stream.
  ##
  ## This will raise ``ValueError`` if ``future`` is finished.
  result = newFuture[void]("FutureStream.put")
  if future.finished:
    let msg = "FutureStream is finished and so no longer accepts new data."
    result.fail(newException(ValueError, msg))
    return
  # TODO: Implement limiting of the streams storage to prevent it growing
  # infinitely when no reads are occurring.
  future.queue.addLast(value)
  if not future.cb.isNil: future.cb()
  result.complete()

proc read*[T](future: FutureStream[T]): owned(Future[(bool, T)]) =
  ## Returns a future that will complete when the ``FutureStream`` has data
  ## placed into it. The future will be completed with the oldest
  ## value stored inside the stream. The return value will also determine
  ## whether data was retrieved, ``false`` means that the future stream was
  ## completed and no data was retrieved.
  ##
  ## This function will remove the data that was returned from the underlying
  ## ``FutureStream``.
  var resFut = newFuture[(bool, T)]("FutureStream.take")
  let savedCb = future.cb
  proc newCb(fs: FutureStream[T]) =
    # Exit early if `resFut` is already complete. (See #8994).
    if resFut.finished: return

    # We don't want this callback called again.
    #future.cb = nil

    # The return value depends on whether the FutureStream has finished.
    var res: (bool, T)
    if finished(fs):
      # Remember, this callback is called when the FutureStream is completed.
      res[0] = false
    else:
      res[0] = true
      res[1] = fs.queue.popFirst()

    resFut.complete(res)

    # If the saved callback isn't nil then let's call it.
    if not savedCb.isNil: savedCb()

  if future.queue.len > 0 or future.finished:
    newCb(future)
  else:
    future.callback = newCb
  return resFut

proc len*[T](future: FutureStream[T]): int =
  ## Returns the amount of data pieces inside the stream.
  future.queue.len