summary refs log tree commit diff stats
path: root/lib/pure/asyncdispatch.nim
diff options
context:
space:
mode:
authorcheatfate <ka@hardcore.kiev.ua>2016-05-18 00:53:53 +0300
committercheatfate <ka@hardcore.kiev.ua>2016-05-18 00:53:53 +0300
commitaba60e54d58a0425ae707efef4e3dfcf1a54064d (patch)
tree999699e9fb1120e0480bc0dcda1abc5cc75d7c87 /lib/pure/asyncdispatch.nim
parentf4f7edf00f19e5abcd4f3a4631261e109d6b8c4d (diff)
downloadNim-aba60e54d58a0425ae707efef4e3dfcf1a54064d.tar.gz
Resolve bugs with `deep recursion` of asyncdispatch.
Introduce callSoon() implementation.
Patch tests to use waitFor() instead of asyncCheck()
Diffstat (limited to 'lib/pure/asyncdispatch.nim')
-rw-r--r--lib/pure/asyncdispatch.nim27
1 files changed, 24 insertions, 3 deletions
diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim
index 8345d43e5..139492916 100644
--- a/lib/pure/asyncdispatch.nim
+++ b/lib/pure/asyncdispatch.nim
@@ -11,7 +11,7 @@ include "system/inclrtl"
 
 import os, oids, tables, strutils, macros, times, heapqueue
 
-import nativesockets, net
+import nativesockets, net, queues
 
 export Port, SocketFlag
 
@@ -155,6 +155,9 @@ type
 
 when not defined(release):
   var currentID = 0
+
+proc callSoon*(cbproc: proc ()) {.gcsafe.}
+
 proc newFuture*[T](fromProc: string = "unspecified"): Future[T] =
   ## Creates a new future.
   ##
@@ -257,7 +260,7 @@ proc `callback=`*(future: FutureBase, cb: proc () {.closure,gcsafe.}) =
   ## passes ``future`` as a param to the callback.
   future.cb = cb
   if future.finished:
-    future.cb()
+    callSoon(future.cb)
 
 proc `callback=`*[T](future: Future[T],
     cb: proc (future: Future[T]) {.closure,gcsafe.}) =
@@ -355,11 +358,17 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
 type
   PDispatcherBase = ref object of RootRef
     timers: HeapQueue[tuple[finishAt: float, fut: Future[void]]]
+    callbacks: Queue[proc ()]
 
 proc processTimers(p: PDispatcherBase) {.inline.} =
   while p.timers.len > 0 and epochTime() >= p.timers[0].finishAt:
     p.timers.pop().fut.complete()
 
+proc processPendingCallbacks(p: PDispatcherBase) =
+  while p.callbacks.len > 0:
+    var cb = p.callbacks.dequeue()
+    cb()
+
 proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} =
   # If dispatcher has active timers this proc returns the timeout
   # of the nearest timer. Returns `timeout` otherwise.
@@ -403,6 +412,7 @@ when defined(windows) or defined(nimdoc):
     result.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1)
     result.handles = initSet[AsyncFD]()
     result.timers.newHeapQueue()
+    result.callbacks = initQueue[proc ()](64)
 
   var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
   proc getGlobalDispatcher*(): PDispatcher =
@@ -429,7 +439,7 @@ when defined(windows) or defined(nimdoc):
   proc poll*(timeout = 500) =
     ## Waits for completion events and processes them.
     let p = getGlobalDispatcher()
-    if p.handles.len == 0 and p.timers.len == 0:
+    if p.handles.len == 0 and p.timers.len == 0 and p.callbacks.len == 0:
       raise newException(ValueError,
         "No handles or timers registered in dispatcher.")
 
@@ -469,6 +479,8 @@ when defined(windows) or defined(nimdoc):
 
     # Timer processing.
     processTimers(p)
+    # Callback queue processing
+    processPendingCallbacks(p)
 
   var connectExPtr: pointer = nil
   var acceptExPtr: pointer = nil
@@ -930,6 +942,7 @@ else:
     new result
     result.selector = newSelector()
     result.timers.newHeapQueue()
+    result.callbacks = initQueue[proc ()](64)
 
   var gDisp{.threadvar.}: PDispatcher ## Global dispatcher
   proc getGlobalDispatcher*(): PDispatcher =
@@ -1025,7 +1038,10 @@ else:
         # (e.g. socket disconnected).
         discard
 
+    # Timer processing.
     processTimers(p)
+    # Callback queue processing
+    processPendingCallbacks(p)
 
   proc connect*(socket: AsyncFD, address: string, port: Port,
     domain = AF_INET): Future[void] =
@@ -1604,6 +1620,11 @@ proc recvLine*(socket: AsyncFD): Future[string] {.async.} =
       return
     add(result, c)
 
+proc callSoon*(cbproc: proc ()) = 
+  ## Schedule `cbproc` to be called as soon as possible.
+  ## The callback is called when control returns to the event loop.
+  getGlobalDispatcher().callbacks.enqueue(cbproc)
+
 proc runForever*() =
   ## Begins a never ending global dispatcher poll loop.
   while true: