diff options
author | Rokas Kupstys <rokups@zoho.com> | 2017-03-02 15:53:50 +0200 |
---|---|---|
committer | Andreas Rumpf <rumpf_a@web.de> | 2017-03-02 14:53:50 +0100 |
commit | cd2721242ad8faf7013911ac57bfcfff62578a2f (patch) | |
tree | 6d2473c58fa857e14c66ba3dc5b5351ca84debb7 /lib | |
parent | 34a3d40d18ef4ff73c629e38738068fe509e3c6c (diff) | |
download | Nim-cd2721242ad8faf7013911ac57bfcfff62578a2f.tar.gz |
Fix waiting on coroutines (#5463)
Public coroutine API returns a safe reference to specific running coroutine. Fixes bug where multiple coroutines executing same procedure would identify as same coroutine. Greatly optimizes `alive()` (and as a result of that `wait()`) calls. Coroutine struct is allocated together with stack as memory unmanaged by GC.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/pure/coro.nim | 63 |
1 files changed, 36 insertions, 27 deletions
diff --git a/lib/pure/coro.nim b/lib/pure/coro.nim index e053f4427..b6ef30e7c 100644 --- a/lib/pure/coro.nim +++ b/lib/pure/coro.nim @@ -23,7 +23,6 @@ when not nimCoroutines and not defined(nimdoc): {.error: "Coroutines require -d:nimCoroutines".} import os -import macros import lists include system/timers @@ -120,27 +119,38 @@ const CORO_FINISHED = 2 type - Stack = object + Stack {.pure.} = object top: pointer # Top of the stack. Pointer used for deallocating stack if we own it. bottom: pointer # Very bottom of the stack, acts as unique stack identifier. size: int - Coroutine = ref object + Coroutine {.pure.} = object execContext: Context fn: proc() state: int lastRun: Ticks sleepTime: float stack: Stack + reference: CoroutineRef + + CoroutinePtr = ptr Coroutine + + CoroutineRef* = ref object + ## CoroutineRef holds a pointer to actual coroutine object. Public API always returns + ## CoroutineRef instead of CoroutinePtr in order to allow holding a reference to coroutine + ## object while it can be safely deallocated by coroutine scheduler loop. In this case + ## Coroutine.reference.coro is set to nil. Public API checks for for it being nil and + ## gracefully fails if it is nil. + coro: CoroutinePtr CoroutineLoopContext = ref object - coroutines: DoublyLinkedList[Coroutine] - current: DoublyLinkedNode[Coroutine] + coroutines: DoublyLinkedList[CoroutinePtr] + current: DoublyLinkedNode[CoroutinePtr] loop: Coroutine var ctx {.threadvar.}: CoroutineLoopContext -proc getCurrent(): Coroutine = +proc getCurrent(): CoroutinePtr = ## Returns current executing coroutine object. var node = ctx.current if node != nil: @@ -151,7 +161,7 @@ proc initialize() = ## Initializes coroutine state of current thread. if ctx == nil: ctx = CoroutineLoopContext() - ctx.coroutines = initDoublyLinkedList[Coroutine]() + ctx.coroutines = initDoublyLinkedList[CoroutinePtr]() ctx.loop = Coroutine() ctx.loop.state = CORO_EXECUTING when coroBackend == CORO_BACKEND_FIBERS: @@ -159,7 +169,7 @@ proc initialize() = proc runCurrentTask() -proc switchTo(current, to: Coroutine) = +proc switchTo(current, to: CoroutinePtr) = ## Switches execution from `current` into `to` context. to.lastRun = getTicks() # Update position of current stack so gc invoked from another stack knows how much to scan. @@ -192,7 +202,7 @@ proc suspend*(sleepTime: float=0) = ## Until then other coroutines are executed. var current = getCurrent() current.sleepTime = sleepTime - switchTo(current, ctx.loop) + switchTo(current, addr(ctx.loop)) proc runCurrentTask() = ## Starts execution of current coroutine and updates it's state through coroutine's life. @@ -218,31 +228,33 @@ proc runCurrentTask() = suspend(0) # Exit coroutine without returning from coroExecWithStack() doAssert false -proc start*(c: proc(), stacksize: int=defaultStackSize) = +proc start*(c: proc(), stacksize: int=defaultStackSize): CoroutineRef {.discardable.} = ## Schedule coroutine for execution. It does not run immediately. if ctx == nil: initialize() - var coro = Coroutine() - coro.fn = c + var coro: CoroutinePtr when coroBackend == CORO_BACKEND_FIBERS: + coro = cast[CoroutinePtr](alloc0(sizeof(Coroutine))) coro.execContext = CreateFiberEx(stacksize, stacksize, FIBER_FLAG_FLOAT_SWITCH, (proc(p: pointer): void {.stdcall.} = runCurrentTask()), nil) coro.stack.size = stacksize else: - var stack: pointer - while stack == nil: - stack = alloc0(stacksize) - coro.stack.top = stack + coro = cast[CoroutinePtr](alloc0(sizeof(Coroutine) + stacksize)) + coro.stack.top = cast[pointer](cast[ByteAddress](coro) + sizeof(Coroutine)) + coro.stack.bottom = cast[pointer](cast[ByteAddress](coro.stack.top) + stacksize) when coroBackend == CORO_BACKEND_UCONTEXT: discard getcontext(coro.execContext) - coro.execContext.uc_stack.ss_sp = cast[pointer](cast[ByteAddress](stack) + stacksize) - coro.execContext.uc_stack.ss_size = coro.stack.size - coro.execContext.uc_link = addr ctx.loop.execContext + coro.execContext.uc_stack.ss_sp = coro.stack.top + coro.execContext.uc_stack.ss_size = stacksize + coro.execContext.uc_link = addr(ctx.loop.execContext) makecontext(coro.execContext, runCurrentTask, 0) + coro.fn = c coro.stack.size = stacksize coro.state = CORO_CREATED + coro.reference = CoroutineRef(coro: coro) ctx.coroutines.append(coro) + return coro.reference proc run*() = initialize() @@ -256,7 +268,7 @@ proc run*() = var remaining = current.sleepTime - (float(getTicks() - current.lastRun) / 1_000_000_000) if remaining <= 0: # Save main loop context. Suspending coroutine will resume after this statement with - switchTo(ctx.loop, current) + switchTo(addr(ctx.loop), current) else: if minDelay > 0 and remaining > 0: minDelay = min(remaining, minDelay) @@ -269,14 +281,14 @@ proc run*() = # If first coroutine ends then `prev` is nil even if more coroutines # are to be scheduled. next = ctx.current.next + current.reference.coro = nil ctx.coroutines.remove(ctx.current) GC_removeStack(current.stack.bottom) when coroBackend == CORO_BACKEND_FIBERS: DeleteFiber(current.execContext) else: dealloc(current.stack.top) - current.stack.top = nil - current.stack.bottom = nil + dealloc(current) ctx.current = next elif ctx.current == nil or ctx.current.next == nil: ctx.current = ctx.coroutines.head @@ -284,13 +296,10 @@ proc run*() = else: ctx.current = ctx.current.next -proc alive*(c: proc()): bool = +proc alive*(c: CoroutineRef): bool = c.coro != nil and c.coro.state != CORO_FINISHED ## Returns ``true`` if coroutine has not returned, ``false`` otherwise. - for coro in items(ctx.coroutines): - if coro.fn == c: - return coro.state != CORO_FINISHED -proc wait*(c: proc(), interval=0.01) = +proc wait*(c: CoroutineRef, interval=0.01) = ## Returns only after coroutine ``c`` has returned. ``interval`` is time in seconds how often. while alive(c): suspend(interval) |