summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--lib/pure/asyncdispatch.nim34
-rw-r--r--lib/pure/asyncfutures.nim (renamed from lib/pure/includes/asyncfutures.nim)211
-rw-r--r--lib/pure/asyncstreams.nim105
-rw-r--r--lib/upcoming/asyncdispatch.nim33
-rw-r--r--tests/async/tasyncrecursion.nim1
-rw-r--r--tests/async/tcallbacks.nim20
6 files changed, 259 insertions, 145 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim
index 8c1cf6b18..28b20feaa 100644
--- a/lib/pure/asyncdispatch.nim
+++ b/lib/pure/asyncdispatch.nim
@@ -9,11 +9,12 @@
 
 include "system/inclrtl"
 
-import os, tables, strutils, times, heapqueue, options
-
+import os, tables, strutils, times, heapqueue, options, asyncstreams
+import asyncfutures except callSoon
 import nativesockets, net, deques
 
 export Port, SocketFlag
+export asyncfutures, asyncstreams
 
 #{.injectStmt: newGcInvariant().}
 
@@ -159,8 +160,6 @@ export Port, SocketFlag
 
 # TODO: Check if yielded future is nil and throw a more meaningful exception
 
-include includes/asyncfutures
-
 type
   PDispatcherBase = ref object of RootRef
     timers*: HeapQueue[tuple[finishAt: float, fut: Future[void]]]
@@ -190,6 +189,12 @@ proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} =
       result = int((timerTimeout - curTime) * 1000)
       if result < 0: result = 0
 
+proc callSoon(cbproc: proc ()) {.gcsafe.}
+
+proc initCallSoonProc =
+  if asyncfutures.getCallSoonProc().isNil:
+    asyncfutures.setCallSoonProc(callSoon)
+
 when defined(windows) or defined(nimdoc):
   import winlean, sets, hashes
   type
@@ -237,15 +242,17 @@ when defined(windows) or defined(nimdoc):
     result.callbacks = initDeque[proc ()](64)
 
   var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
-  proc getGlobalDispatcher*(): PDispatcher =
-    ## Retrieves the global thread-local dispatcher.
-    if gDisp.isNil: gDisp = newDispatcher()
-    result = gDisp
 
   proc setGlobalDispatcher*(disp: PDispatcher) =
     if not gDisp.isNil:
       assert gDisp.callbacks.len == 0
     gDisp = disp
+    initCallSoonProc()
+
+  proc getGlobalDispatcher*(): PDispatcher =
+    if gDisp.isNil:
+      setGlobalDispatcher(newDispatcher())
+    result = gDisp
 
   proc register*(fd: AsyncFD) =
     ## Registers ``fd`` with the dispatcher.
@@ -932,14 +939,17 @@ else:
     result.callbacks = initDeque[proc ()](64)
 
   var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
-  proc getGlobalDispatcher*(): PDispatcher =
-    if gDisp.isNil: gDisp = newDispatcher()
-    result = gDisp
 
   proc setGlobalDispatcher*(disp: PDispatcher) =
     if not gDisp.isNil:
       assert gDisp.callbacks.len == 0
     gDisp = disp
+    initCallSoonProc()
+
+  proc getGlobalDispatcher*(): PDispatcher =
+    if gDisp.isNil:
+      setGlobalDispatcher(newDispatcher())
+    result = gDisp
 
   proc update(fd: AsyncFD, events: set[Event]) =
     let p = getGlobalDispatcher()
@@ -1327,7 +1337,7 @@ proc recvLine*(socket: AsyncFD): Future[string] {.async, deprecated.} =
       return
     add(result, c)
 
-proc callSoon*(cbproc: proc ()) =
+proc callSoon(cbproc: proc ()) =
   ## Schedule `cbproc` to be called as soon as possible.
   ## The callback is called when control returns to the event loop.
   getGlobalDispatcher().callbacks.addLast(cbproc)
diff --git a/lib/pure/includes/asyncfutures.nim b/lib/pure/asyncfutures.nim
index 6af5bf3cf..a9e97c14c 100644
--- a/lib/pure/includes/asyncfutures.nim
+++ b/lib/pure/asyncfutures.nim
@@ -1,8 +1,16 @@
+import os, tables, strutils, times, heapqueue, options, deques
 
 # TODO: This shouldn't need to be included, but should ideally be exported.
 type
+  CallbackFunc = proc () {.closure, gcsafe.}
+
+  CallbackList = object
+    function: CallbackFunc
+    next: ref CallbackList
+
   FutureBase* = ref object of RootObj ## Untyped future.
-    cb: proc () {.closure,gcsafe.}
+    callbacks: CallbackList
+
     finished: bool
     error*: ref Exception ## Stored exception
     errorStackTrace*: string
@@ -16,12 +24,6 @@ type
 
   FutureVar*[T] = distinct Future[T]
 
-  FutureStream*[T] = ref object of FutureBase   ## Special future that acts as
-                                                ## a queue. Its API is still
-                                                ## experimental and so is
-                                                ## subject to change.
-    queue: Deque[T]
-
   FutureError* = object of Exception
     cause*: FutureBase
 
@@ -30,7 +32,27 @@ type
 when not defined(release):
   var currentID = 0
 
-proc callSoon*(cbproc: proc ()) {.gcsafe.}
+var callSoonProc {.threadvar.}: proc (cbproc: proc ()) {.gcsafe.}
+
+proc getCallSoonProc*(): (proc(cbproc: proc ()) {.gcsafe.}) =
+  ## Get current implementation of ``callSoon``.
+  return callSoonProc
+
+proc setCallSoonProc*(p: (proc(cbproc: proc ()) {.gcsafe.})) =
+  ## Change current implementation of ``callSoon``. This is normally called when dispatcher from ``asyncdispatcher`` is initialized.
+  callSoonProc = p
+
+proc callSoon*(cbproc: proc ()) =
+  ## Call ``cbproc`` "soon".
+  ##
+  ## If async dispatcher is running, ``cbproc`` will be executed during next dispatcher tick.
+  ##
+  ## If async dispatcher is not running, ``cbproc`` will be executed immediately.
+  if callSoonProc.isNil:
+    # Loop not initialized yet. Call the function directly to allow setup code to use futures.
+    cbproc()
+  else:
+    callSoonProc(cbproc)
 
 template setupFutureBase(fromProc: string) =
   new(result)
@@ -56,22 +78,6 @@ proc newFutureVar*[T](fromProc = "unspecified"): FutureVar[T] =
   ## that this future belongs to, is a good habit as it helps with debugging.
   result = FutureVar[T](newFuture[T](fromProc))
 
-proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] =
-  ## Create a new ``FutureStream``. This future's callback is activated when
-  ## two events occur:
-  ##
-  ## * New data is written into the future stream.
-  ## * The future stream is completed (this means that no more data will be
-  ##   written).
-  ##
-  ## Specifying ``fromProc``, which is a string specifying the name of the proc
-  ## that this future belongs to, is a good habit as it helps with debugging.
-  ##
-  ## **Note:** The API of FutureStream is still new and so has a higher
-  ## likelihood of changing in the future.
-  setupFutureBase(fromProc)
-  result.queue = initDeque[T]()
-
 proc clean*[T](future: FutureVar[T]) =
   ## Resets the ``finished`` status of ``future``.
   Future[T](future).finished = false
@@ -98,6 +104,33 @@ proc checkFinished[T](future: Future[T]) =
       err.cause = future
       raise err
 
+proc call(callbacks: var CallbackList) =
+  var current = callbacks
+
+  while true:
+    if not current.function.isNil:
+      callSoon(current.function)
+
+    if current.next.isNil:
+      break
+    else:
+      current = current.next[]
+
+  # callback will be called only once, let GC collect them now
+  callbacks.next = nil
+  callbacks.function = nil
+
+proc add(callbacks: var CallbackList, function: CallbackFunc) =
+  if callbacks.function.isNil:
+    callbacks.function = function
+    assert callbacks.next == nil
+  else:
+    let newNext = new(ref CallbackList)
+    newNext.function = callbacks.function
+    newNext.next = callbacks.next
+    callbacks.next = newNext
+    callbacks.function = function
+
 proc complete*[T](future: Future[T], val: T) =
   ## Completes ``future`` with value ``val``.
   #assert(not future.finished, "Future already finished, cannot finish twice.")
@@ -105,8 +138,7 @@ proc complete*[T](future: Future[T], val: T) =
   assert(future.error == nil)
   future.value = val
   future.finished = true
-  if future.cb != nil:
-    future.cb()
+  future.callbacks.call()
 
 proc complete*(future: Future[void]) =
   ## Completes a void ``future``.
@@ -114,8 +146,7 @@ proc complete*(future: Future[void]) =
   checkFinished(future)
   assert(future.error == nil)
   future.finished = true
-  if future.cb != nil:
-    future.cb()
+  future.callbacks.call()
 
 proc complete*[T](future: FutureVar[T]) =
   ## Completes a ``FutureVar``.
@@ -123,8 +154,7 @@ proc complete*[T](future: FutureVar[T]) =
   checkFinished(fut)
   assert(fut.error == nil)
   fut.finished = true
-  if fut.cb != nil:
-    fut.cb()
+  fut.callbacks.call()
 
 proc complete*[T](future: FutureVar[T], val: T) =
   ## Completes a ``FutureVar`` with value ``val``.
@@ -138,12 +168,6 @@ proc complete*[T](future: FutureVar[T], val: T) =
   if not fut.cb.isNil():
     fut.cb()
 
-proc complete*[T](future: FutureStream[T]) =
-  ## Completes a ``FutureStream`` signalling the end of data.
-  future.finished = true
-  if not future.cb.isNil():
-    future.cb()
-
 proc fail*[T](future: Future[T], error: ref Exception) =
   ## Completes ``future`` with ``error``.
   #assert(not future.finished, "Future already finished, cannot finish twice.")
@@ -152,26 +176,40 @@ proc fail*[T](future: Future[T], error: ref Exception) =
   future.error = error
   future.errorStackTrace =
     if getStackTrace(error) == "": getStackTrace() else: getStackTrace(error)
-  if future.cb != nil:
-    future.cb()
+  future.callbacks.call()
+
+proc clearCallbacks(future: FutureBase) =
+  future.callbacks.function = nil
+  future.callbacks.next = nil
+
+proc addCallback*(future: FutureBase, cb: proc() {.closure,gcsafe.}) =
+  ## Adds the callbacks proc to be called when the future completes.
+  ##
+  ## If future has already completed then ``cb`` will be called immediately.
+  assert cb != nil
+  if future.finished:
+    callSoon(cb)
   else:
-    # This is to prevent exceptions from being silently ignored when a future
-    # is discarded.
-    # TODO: This may turn out to be a bad idea.
-    # Turns out this is a bad idea.
-    #raise error
-    discard
+    future.callbacks.add cb
+
+proc addCallback*[T](future: Future[T],
+                     cb: proc (future: Future[T]) {.closure,gcsafe.}) =
+  ## Adds the callbacks proc to be called when the future completes.
+  ##
+  ## If future has already completed then ``cb`` will be called immediately.
+  future.addCallback(
+    proc() =
+      cb(future)
+  )
 
 proc `callback=`*(future: FutureBase, cb: proc () {.closure,gcsafe.}) =
-  ## Sets the callback proc to be called when the future completes.
+  ## Clears the list of callbacks and sets the callback proc to be called when the future completes.
   ##
   ## If future has already completed then ``cb`` will be called immediately.
   ##
-  ## **Note**: You most likely want the other ``callback`` setter which
-  ## passes ``future`` as a param to the callback.
-  future.cb = cb
-  if future.finished:
-    callSoon(future.cb)
+  ## It's recommended to use ``addCallback`` or ``then`` instead.
+  future.clearCallbacks
+  future.addCallback cb
 
 proc `callback=`*[T](future: Future[T],
     cb: proc (future: Future[T]) {.closure,gcsafe.}) =
@@ -180,20 +218,6 @@ proc `callback=`*[T](future: Future[T],
   ## If future has already completed then ``cb`` will be called immediately.
   future.callback = proc () = cb(future)
 
-proc `callback=`*[T](future: FutureStream[T],
-    cb: proc (future: FutureStream[T]) {.closure,gcsafe.}) =
-  ## Sets the callback proc to be called when data was placed inside the
-  ## future stream.
-  ##
-  ## The callback is also called when the future is completed. So you should
-  ## use ``finished`` to check whether data is available.
-  ##
-  ## If the future stream already has data or is finished then ``cb`` will be
-  ## called immediately.
-  future.cb = proc () = cb(future)
-  if future.queue.len > 0 or future.finished:
-    callSoon(future.cb)
-
 proc injectStacktrace[T](future: Future[T]) =
   # TODO: Come up with something better.
   when not defined(release):
@@ -240,18 +264,12 @@ proc mget*[T](future: FutureVar[T]): var T =
   ## Future has not been finished.
   result = Future[T](future).value
 
-proc finished*[T](future: Future[T] | FutureVar[T] | FutureStream[T]): bool =
+proc finished*[T](future: Future[T] | FutureVar[T]): bool =
   ## Determines whether ``future`` has completed.
   ##
   ## ``True`` may indicate an error or a value. Use ``failed`` to distinguish.
-  ##
-  ## For a ``FutureStream`` a ``true`` value means that no more data will be
-  ## placed inside the stream _and_ that there is no data waiting to be
-  ## retrieved.
   when future is FutureVar[T]:
     result = (Future[T](future)).finished
-  elif future is FutureStream[T]:
-    result = future.finished and future.queue.len == 0
   else:
     result = future.finished
 
@@ -259,57 +277,6 @@ proc failed*(future: FutureBase): bool =
   ## Determines whether ``future`` completed with an error.
   return future.error != nil
 
-proc write*[T](future: FutureStream[T], value: T): Future[void] =
-  ## Writes the specified value inside the specified future stream.
-  ##
-  ## This will raise ``ValueError`` if ``future`` is finished.
-  result = newFuture[void]("FutureStream.put")
-  if future.finished:
-    let msg = "FutureStream is finished and so no longer accepts new data."
-    result.fail(newException(ValueError, msg))
-    return
-  # TODO: Implement limiting of the streams storage to prevent it growing
-  # infinitely when no reads are occuring.
-  future.queue.addLast(value)
-  if not future.cb.isNil: future.cb()
-  result.complete()
-
-proc read*[T](future: FutureStream[T]): Future[(bool, T)] =
-  ## Returns a future that will complete when the ``FutureStream`` has data
-  ## placed into it. The future will be completed with the oldest
-  ## value stored inside the stream. The return value will also determine
-  ## whether data was retrieved, ``false`` means that the future stream was
-  ## completed and no data was retrieved.
-  ##
-  ## This function will remove the data that was returned from the underlying
-  ## ``FutureStream``.
-  var resFut = newFuture[(bool, T)]("FutureStream.take")
-  let savedCb = future.cb
-  future.callback =
-    proc (fs: FutureStream[T]) =
-      # We don't want this callback called again.
-      future.cb = nil
-
-      # The return value depends on whether the FutureStream has finished.
-      var res: (bool, T)
-      if finished(fs):
-        # Remember, this callback is called when the FutureStream is completed.
-        res[0] = false
-      else:
-        res[0] = true
-        res[1] = fs.queue.popFirst()
-
-      if not resFut.finished:
-        resFut.complete(res)
-
-      # If the saved callback isn't nil then let's call it.
-      if not savedCb.isNil: savedCb()
-  return resFut
-
-proc len*[T](future: FutureStream[T]): int =
-  ## Returns the amount of data pieces inside the stream.
-  future.queue.len
-
 proc asyncCheck*[T](future: Future[T]) =
   ## Sets a callback on ``future`` which raises an exception if the future
   ## finished with an error.
diff --git a/lib/pure/asyncstreams.nim b/lib/pure/asyncstreams.nim
new file mode 100644
index 000000000..d3ea143f3
--- /dev/null
+++ b/lib/pure/asyncstreams.nim
@@ -0,0 +1,105 @@
+import asyncfutures
+
+import deques
+
+type
+  FutureStream*[T] = ref object   ## Special future that acts as
+                                  ## a queue. Its API is still
+                                  ## experimental and so is
+                                  ## subject to change.
+    queue: Deque[T]
+    finished: bool
+    cb: proc () {.closure, gcsafe.}
+
+proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] =
+  ## Create a new ``FutureStream``. This future's callback is activated when
+  ## two events occur:
+  ##
+  ## * New data is written into the future stream.
+  ## * The future stream is completed (this means that no more data will be
+  ##   written).
+  ##
+  ## Specifying ``fromProc``, which is a string specifying the name of the proc
+  ## that this future belongs to, is a good habit as it helps with debugging.
+  ##
+  ## **Note:** The API of FutureStream is still new and so has a higher
+  ## likelihood of changing in the future.
+  result = FutureStream[T](finished: false, cb: nil)
+  result.queue = initDeque[T]()
+
+proc complete*[T](future: FutureStream[T]) =
+  ## Completes a ``FutureStream`` signalling the end of data.
+  future.finished = true
+  if not future.cb.isNil:
+    future.cb()
+
+proc `callback=`*[T](future: FutureStream[T],
+    cb: proc (future: FutureStream[T]) {.closure,gcsafe.}) =
+  ## Sets the callback proc to be called when data was placed inside the
+  ## future stream.
+  ##
+  ## The callback is also called when the future is completed. So you should
+  ## use ``finished`` to check whether data is available.
+  ##
+  ## If the future stream already has data or is finished then ``cb`` will be
+  ## called immediately.
+  future.cb = proc () = cb(future)
+  if future.queue.len > 0 or future.finished:
+    callSoon(future.cb)
+
+proc finished*[T](future: FutureStream[T]): bool =
+  ## Check if a ``FutureStream`` is finished. ``true`` value means that
+  ## no more data will be placed inside the stream _and_ that there is
+  ## no data waiting to be retrieved.
+  result = future.finished and future.queue.len == 0
+
+proc write*[T](future: FutureStream[T], value: T): Future[void] =
+  ## Writes the specified value inside the specified future stream.
+  ##
+  ## This will raise ``ValueError`` if ``future`` is finished.
+  result = newFuture[void]("FutureStream.put")
+  if future.finished:
+    let msg = "FutureStream is finished and so no longer accepts new data."
+    result.fail(newException(ValueError, msg))
+    return
+  # TODO: Implement limiting of the streams storage to prevent it growing
+  # infinitely when no reads are occuring.
+  future.queue.addLast(value)
+  if not future.cb.isNil: future.cb()
+  result.complete()
+
+proc read*[T](future: FutureStream[T]): Future[(bool, T)] =
+  ## Returns a future that will complete when the ``FutureStream`` has data
+  ## placed into it. The future will be completed with the oldest
+  ## value stored inside the stream. The return value will also determine
+  ## whether data was retrieved, ``false`` means that the future stream was
+  ## completed and no data was retrieved.
+  ##
+  ## This function will remove the data that was returned from the underlying
+  ## ``FutureStream``.
+  var resFut = newFuture[(bool, T)]("FutureStream.take")
+  let savedCb = future.cb
+  future.callback =
+    proc (fs: FutureStream[T]) =
+      # We don't want this callback called again.
+      future.cb = nil
+
+      # The return value depends on whether the FutureStream has finished.
+      var res: (bool, T)
+      if finished(fs):
+        # Remember, this callback is called when the FutureStream is completed.
+        res[0] = false
+      else:
+        res[0] = true
+        res[1] = fs.queue.popFirst()
+
+      if not resFut.finished:
+        resFut.complete(res)
+
+      # If the saved callback isn't nil then let's call it.
+      if not savedCb.isNil: savedCb()
+  return resFut
+
+proc len*[T](future: FutureStream[T]): int =
+  ## Returns the amount of data pieces inside the stream.
+  future.queue.len
diff --git a/lib/upcoming/asyncdispatch.nim b/lib/upcoming/asyncdispatch.nim
index 1623d8375..4e3b06173 100644
--- a/lib/upcoming/asyncdispatch.nim
+++ b/lib/upcoming/asyncdispatch.nim
@@ -9,11 +9,13 @@
 
 include "system/inclrtl"
 
-import os, tables, strutils, times, heapqueue, lists, options
+import os, tables, strutils, times, heapqueue, lists, options, asyncstreams
+import asyncfutures except callSoon
 
 import nativesockets, net, deques
 
 export Port, SocketFlag
+export asyncfutures, asyncstreams
 
 #{.injectStmt: newGcInvariant().}
 
@@ -130,8 +132,6 @@ export Port, SocketFlag
 
 # TODO: Check if yielded future is nil and throw a more meaningful exception
 
-include "../includes/asyncfutures"
-
 type
   PDispatcherBase = ref object of RootRef
     timers: HeapQueue[tuple[finishAt: float, fut: Future[void]]]
@@ -161,6 +161,12 @@ proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} =
       result = int((timerTimeout - curTime) * 1000)
       if result < 0: result = 0
 
+proc callSoon(cbproc: proc ()) {.gcsafe.}
+
+proc initCallSoonProc =
+  if asyncfutures.getCallSoonProc().isNil:
+    asyncfutures.setCallSoonProc(callSoon)
+
 when defined(windows) or defined(nimdoc):
   import winlean, sets, hashes
   type
@@ -214,15 +220,17 @@ when defined(windows) or defined(nimdoc):
     result.callbacks = initDeque[proc ()](64)
 
   var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
-  proc getGlobalDispatcher*(): PDispatcher =
-    ## Retrieves the global thread-local dispatcher.
-    if gDisp.isNil: gDisp = newDispatcher()
-    result = gDisp
 
   proc setGlobalDispatcher*(disp: PDispatcher) =
     if not gDisp.isNil:
       assert gDisp.callbacks.len == 0
     gDisp = disp
+    initCallSoonProc()
+
+  proc getGlobalDispatcher*(): PDispatcher =
+    if gDisp.isNil:
+      setGlobalDispatcher(newDispatcher())
+    result = gDisp
 
   proc register*(fd: AsyncFD) =
     ## Registers ``fd`` with the dispatcher.
@@ -1081,14 +1089,17 @@ else:
     result.callbacks = initDeque[proc ()](InitDelayedCallbackListSize)
 
   var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
-  proc getGlobalDispatcher*(): PDispatcher =
-    if gDisp.isNil: gDisp = newDispatcher()
-    result = gDisp
 
   proc setGlobalDispatcher*(disp: PDispatcher) =
     if not gDisp.isNil:
       assert gDisp.callbacks.len == 0
     gDisp = disp
+    initCallSoonProc()
+
+  proc getGlobalDispatcher*(): PDispatcher =
+    if gDisp.isNil:
+      setGlobalDispatcher(newDispatcher())
+    result = gDisp
 
   proc register*(fd: AsyncFD) =
     let p = getGlobalDispatcher()
@@ -1601,7 +1612,7 @@ proc recvLine*(socket: AsyncFD): Future[string] {.async.} =
       return
     add(result, c)
 
-proc callSoon*(cbproc: proc ()) =
+proc callSoon(cbproc: proc ()) =
   ## Schedule `cbproc` to be called as soon as possible.
   ## The callback is called when control returns to the event loop.
   getGlobalDispatcher().callbacks.addLast(cbproc)
diff --git a/tests/async/tasyncrecursion.nim b/tests/async/tasyncrecursion.nim
index 54482edab..1aeebe9b4 100644
--- a/tests/async/tasyncrecursion.nim
+++ b/tests/async/tasyncrecursion.nim
@@ -17,5 +17,6 @@ proc asyncRecursionTest*(): Future[int] {.async.} =
     inc(i)
 
 when isMainModule:
+  setGlobalDispatcher(newDispatcher())
   var i = waitFor asyncRecursionTest()
   echo i
diff --git a/tests/async/tcallbacks.nim b/tests/async/tcallbacks.nim
new file mode 100644
index 000000000..8c08032cd
--- /dev/null
+++ b/tests/async/tcallbacks.nim
@@ -0,0 +1,20 @@
+discard """
+  exitcode: 0
+  output: '''3
+2
+1
+5
+'''
+"""
+import asyncfutures
+
+let f1: Future[int] = newFuture[int]()
+f1.addCallback(proc() = echo 1)
+f1.addCallback(proc() = echo 2)
+f1.addCallback(proc() = echo 3)
+f1.complete(10)
+
+let f2: Future[int] = newFuture[int]()
+f2.addCallback(proc() = echo 4)
+f2.callback = proc() = echo 5
+f2.complete(10)