summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorcheatfate <ka@hardcore.kiev.ua>2016-05-18 00:53:53 +0300
committercheatfate <ka@hardcore.kiev.ua>2016-05-18 00:53:53 +0300
commitaba60e54d58a0425ae707efef4e3dfcf1a54064d (patch)
tree999699e9fb1120e0480bc0dcda1abc5cc75d7c87
parentf4f7edf00f19e5abcd4f3a4631261e109d6b8c4d (diff)
downloadNim-aba60e54d58a0425ae707efef4e3dfcf1a54064d.tar.gz
Resolve bugs with `deep recursion` of asyncdispatch.
Introduce callSoon() implementation.
Patch tests to use waitFor() instead of asyncCheck()
-rw-r--r--lib/pure/asyncdispatch.nim27
-rw-r--r--tests/async/tasyncdiscard.nim2
-rw-r--r--tests/async/tasynctry.nim10
-rw-r--r--tests/ccgbugs/twrong_string_asgn.nim2
4 files changed, 31 insertions, 10 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim
index 8345d43e5..139492916 100644
--- a/lib/pure/asyncdispatch.nim
+++ b/lib/pure/asyncdispatch.nim
@@ -11,7 +11,7 @@ include "system/inclrtl"
 
 import os, oids, tables, strutils, macros, times, heapqueue
 
-import nativesockets, net
+import nativesockets, net, queues
 
 export Port, SocketFlag
 
@@ -155,6 +155,9 @@ type
 
 when not defined(release):
   var currentID = 0
+
+proc callSoon*(cbproc: proc ()) {.gcsafe.}
+
 proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
   ## Creates a new future.
   ##
@@ -257,7 +260,7 @@ proc `callback=`*(future: FutureBase, cb: proc () {.closure,gcsafe.}) =
   ## passes ``future`` as a param to the callback.
   future.cb = cb
   if future.finished:
-    future.cb()
+    callSoon(future.cb)
 
 proc `callback=`*[T](future: Future[T],
     cb: proc (future: Future[T]) {.closure,gcsafe.}) =
@@ -355,11 +358,17 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
 type
   PDispatcherBase = ref object of RootRef
     timers: HeapQueue[tuple[finishAt: float, fut: Future[void]]]
+    callbacks: Queue[proc ()]
 
 proc processTimers(p: PDispatcherBase) {.inline.} =
   while p.timers.len > 0 and epochTime() >= p.timers[0].finishAt:
     p.timers.pop().fut.complete()
 
+proc processPendingCallbacks(p: PDispatcherBase) =
+  while p.callbacks.len > 0:
+    var cb = p.callbacks.dequeue()
+    cb()
+
 proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} =
   # If dispatcher has active timers this proc returns the timeout
   # of the nearest timer. Returns `timeout` otherwise.
@@ -403,6 +412,7 @@ when defined(windows) or defined(nimdoc):
     result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
     result.handles = initSet[AsyncFD]()
     result.timers.newHeapQueue()
+    result.callbacks = initQueue[proc ()](64)
 
   var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
   proc getGlobalDispatcher*(): PDispatcher =
@@ -429,7 +439,7 @@ when defined(windows) or defined(nimdoc):
   proc poll*(timeout = 500) =
     ## Waits for completion events and processes them.
     let p = getGlobalDispatcher()
-    if p.handles.len == 0 and p.timers.len == 0:
+    if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0:
       raise newException(ValueError,
         "No handles or timers registered in dispatcher.")
 
@@ -469,6 +479,8 @@ when defined(windows) or defined(nimdoc):
 
     # Timer processing.
     processTimers(p)
+    # Callback queue processing
+    processPendingCallbacks(p)
 
   var connectExPtr: pointer = nil
   var acceptExPtr: pointer = nil
@@ -930,6 +942,7 @@ else:
     new result
     result.selector = newSelector()
     result.timers.newHeapQueue()
+    result.callbacks = initQueue[proc ()](64)
 
   var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
   proc getGlobalDispatcher*(): PDispatcher =
@@ -1025,7 +1038,10 @@ else:
         # (e.g. socket disconnected).
         discard
 
+    # Timer processing.
     processTimers(p)
+    # Callback queue processing
+    processPendingCallbacks(p)
 
   proc connect*(socket: AsyncFD, address: string, port: Port,
     domain = AF_INET): Future[void] =
@@ -1604,6 +1620,11 @@ proc recvLine*(socket: AsyncFD): Future[string] {.async.} =
       return
     add(result, c)
 
+proc callSoon*(cbproc: proc ()) = 
+  ## Schedule `cbproc` to be called as soon as possible.
+  ## The callback is called when control returns to the event loop.
+  getGlobalDispatcher().callbacks.enqueue(cbproc)
+
 proc runForever*() =
   ## Begins a never ending global dispatcher poll loop.
   while true:
diff --git a/tests/async/tasyncdiscard.nim b/tests/async/tasyncdiscard.nim
index 71aba29e2..e7c87ad42 100644
--- a/tests/async/tasyncdiscard.nim
+++ b/tests/async/tasyncdiscard.nim
@@ -36,4 +36,4 @@ proc main {.async.} =
   discard await g()
   echo 6
 
-asyncCheck main()
+waitFor(main())
diff --git a/tests/async/tasynctry.nim b/tests/async/tasynctry.nim
index f77198e2e..5930f296f 100644
--- a/tests/async/tasynctry.nim
+++ b/tests/async/tasynctry.nim
@@ -48,7 +48,7 @@ proc catch() {.async.} =
   except OSError, EInvalidField:
     assert false
 
-asyncCheck catch()
+waitFor catch()
 
 proc test(): Future[bool] {.async.} =
   result = false
@@ -92,13 +92,13 @@ proc test4(): Future[int] {.async.} =
     result = 2
 
 var x = test()
-assert x.read
+assert x.waitFor()
 
 x = test2()
-assert x.read
+assert x.waitFor()
 
 var y = test3()
-assert y.read == 2
+assert y.waitFor() == 2
 
 y = test4()
-assert y.read == 2
+assert y.waitFor() == 2
diff --git a/tests/ccgbugs/twrong_string_asgn.nim b/tests/ccgbugs/twrong_string_asgn.nim
index b62e70e7c..669b7f8f5 100644
--- a/tests/ccgbugs/twrong_string_asgn.nim
+++ b/tests/ccgbugs/twrong_string_asgn.nim
@@ -16,4 +16,4 @@ x.callback =
   proc () =
     finished = true
 
-while not finished: discard
+while not finished: poll()