summary refs log tree commit diff stats
path: root/lib
diff options
context:
space:
mode:
authorRokas Kupstys <rokups@zoho.com>2017-03-02 15:53:50 +0200
committerAndreas Rumpf <rumpf_a@web.de>2017-03-02 14:53:50 +0100
commitcd2721242ad8faf7013911ac57bfcfff62578a2f (patch)
tree6d2473c58fa857e14c66ba3dc5b5351ca84debb7 /lib
parent34a3d40d18ef4ff73c629e38738068fe509e3c6c (diff)
downloadNim-cd2721242ad8faf7013911ac57bfcfff62578a2f.tar.gz
Fix waiting on coroutines (#5463)
Public coroutine API returns a safe reference to specific running coroutine. Fixes bug where multiple coroutines executing same procedure would identify as same coroutine.
Greatly optimizes `alive()` (and as a result of that `wait()`) calls.
Coroutine struct is allocated together with stack as memory unmanaged by GC.
Diffstat (limited to 'lib')
-rw-r--r--lib/pure/coro.nim63
1 files changed, 36 insertions, 27 deletions
diff --git a/lib/pure/coro.nim b/lib/pure/coro.nim
index e053f4427..b6ef30e7c 100644
--- a/lib/pure/coro.nim
+++ b/lib/pure/coro.nim
@@ -23,7 +23,6 @@ when not nimCoroutines and not defined(nimdoc):
     {.error: "Coroutines require -d:nimCoroutines".}
 
 import os
-import macros
 import lists
 include system/timers
 
@@ -120,27 +119,38 @@ const
   CORO_FINISHED = 2
 
 type
-  Stack = object
+  Stack {.pure.} = object
     top: pointer      # Top of the stack. Pointer used for deallocating stack if we own it.
     bottom: pointer   # Very bottom of the stack, acts as unique stack identifier.
     size: int
 
-  Coroutine = ref object
+  Coroutine {.pure.} = object
     execContext: Context
     fn: proc()
     state: int
     lastRun: Ticks
     sleepTime: float
     stack: Stack
+    reference: CoroutineRef
+
+  CoroutinePtr = ptr Coroutine
+
+  CoroutineRef* = ref object
+    ## CoroutineRef holds a pointer to actual coroutine object. Public API always returns
+    ## CoroutineRef instead of CoroutinePtr in order to allow holding a reference to coroutine
+    ## object while it can be safely deallocated by coroutine scheduler loop. In this case
+    ## Coroutine.reference.coro is set to nil. Public API checks for for it being nil and
+    ## gracefully fails if it is nil.
+    coro: CoroutinePtr
 
   CoroutineLoopContext = ref object
-    coroutines: DoublyLinkedList[Coroutine]
-    current: DoublyLinkedNode[Coroutine]
+    coroutines: DoublyLinkedList[CoroutinePtr]
+    current: DoublyLinkedNode[CoroutinePtr]
     loop: Coroutine
 
 var ctx {.threadvar.}: CoroutineLoopContext
 
-proc getCurrent(): Coroutine =
+proc getCurrent(): CoroutinePtr =
   ## Returns current executing coroutine object.
   var node = ctx.current
   if node != nil:
@@ -151,7 +161,7 @@ proc initialize() =
   ## Initializes coroutine state of current thread.
   if ctx == nil:
     ctx = CoroutineLoopContext()
-    ctx.coroutines = initDoublyLinkedList[Coroutine]()
+    ctx.coroutines = initDoublyLinkedList[CoroutinePtr]()
     ctx.loop = Coroutine()
     ctx.loop.state = CORO_EXECUTING
     when coroBackend == CORO_BACKEND_FIBERS:
@@ -159,7 +169,7 @@ proc initialize() =
 
 proc runCurrentTask()
 
-proc switchTo(current, to: Coroutine) =
+proc switchTo(current, to: CoroutinePtr) =
   ## Switches execution from `current` into `to` context.
   to.lastRun = getTicks()
   # Update position of current stack so gc invoked from another stack knows how much to scan.
@@ -192,7 +202,7 @@ proc suspend*(sleepTime: float=0) =
   ## Until then other coroutines are executed.
   var current = getCurrent()
   current.sleepTime = sleepTime
-  switchTo(current, ctx.loop)
+  switchTo(current, addr(ctx.loop))
 
 proc runCurrentTask() =
   ## Starts execution of current coroutine and updates it's state through coroutine's life.
@@ -218,31 +228,33 @@ proc runCurrentTask() =
   suspend(0)                      # Exit coroutine without returning from coroExecWithStack()
   doAssert false
 
-proc start*(c: proc(), stacksize: int=defaultStackSize) =
+proc start*(c: proc(), stacksize: int=defaultStackSize): CoroutineRef {.discardable.} =
   ## Schedule coroutine for execution. It does not run immediately.
   if ctx == nil:
     initialize()
   
-  var coro = Coroutine()
-  coro.fn = c
+  var coro: CoroutinePtr
   when coroBackend == CORO_BACKEND_FIBERS:
+    coro = cast[CoroutinePtr](alloc0(sizeof(Coroutine)))
     coro.execContext = CreateFiberEx(stacksize, stacksize,
       FIBER_FLAG_FLOAT_SWITCH, (proc(p: pointer): void {.stdcall.} = runCurrentTask()), nil)
     coro.stack.size = stacksize
   else:
-    var stack: pointer
-    while stack == nil:
-      stack = alloc0(stacksize)
-    coro.stack.top = stack
+    coro = cast[CoroutinePtr](alloc0(sizeof(Coroutine) + stacksize))
+    coro.stack.top = cast[pointer](cast[ByteAddress](coro) + sizeof(Coroutine))
+    coro.stack.bottom = cast[pointer](cast[ByteAddress](coro.stack.top) + stacksize)
     when coroBackend == CORO_BACKEND_UCONTEXT:
       discard getcontext(coro.execContext)
-      coro.execContext.uc_stack.ss_sp = cast[pointer](cast[ByteAddress](stack) + stacksize)
-      coro.execContext.uc_stack.ss_size = coro.stack.size
-      coro.execContext.uc_link = addr ctx.loop.execContext
+      coro.execContext.uc_stack.ss_sp = coro.stack.top
+      coro.execContext.uc_stack.ss_size = stacksize
+      coro.execContext.uc_link = addr(ctx.loop.execContext)
       makecontext(coro.execContext, runCurrentTask, 0)
+  coro.fn = c
   coro.stack.size = stacksize
   coro.state = CORO_CREATED
+  coro.reference = CoroutineRef(coro: coro)
   ctx.coroutines.append(coro)
+  return coro.reference
 
 proc run*() =
   initialize()
@@ -256,7 +268,7 @@ proc run*() =
     var remaining = current.sleepTime - (float(getTicks() - current.lastRun) / 1_000_000_000)
     if remaining <= 0:
       # Save main loop context. Suspending coroutine will resume after this statement with
-      switchTo(ctx.loop, current)
+      switchTo(addr(ctx.loop), current)
     else:
       if minDelay > 0 and remaining > 0:
         minDelay = min(remaining, minDelay)
@@ -269,14 +281,14 @@ proc run*() =
         # If first coroutine ends then `prev` is nil even if more coroutines 
         # are to be scheduled.
         next = ctx.current.next
+      current.reference.coro = nil
       ctx.coroutines.remove(ctx.current)
       GC_removeStack(current.stack.bottom)
       when coroBackend == CORO_BACKEND_FIBERS:
         DeleteFiber(current.execContext)
       else:
         dealloc(current.stack.top)
-      current.stack.top = nil
-      current.stack.bottom = nil
+      dealloc(current)
       ctx.current = next
     elif ctx.current == nil or ctx.current.next == nil:
       ctx.current = ctx.coroutines.head
@@ -284,13 +296,10 @@ proc run*() =
     else:
       ctx.current = ctx.current.next
 
-proc alive*(c: proc()): bool =
+proc alive*(c: CoroutineRef): bool = c.coro != nil and c.coro.state != CORO_FINISHED
   ## Returns ``true`` if coroutine has not returned, ``false`` otherwise.
-  for coro in items(ctx.coroutines):
-    if coro.fn == c:
-      return coro.state != CORO_FINISHED
 
-proc wait*(c: proc(), interval=0.01) =
+proc wait*(c: CoroutineRef, interval=0.01) =
   ## Returns only after coroutine ``c`` has returned. ``interval`` is time in seconds how often.
   while alive(c):
     suspend(interval)