diff options
Diffstat (limited to 'lib/pure/coro.nim')
-rw-r--r-- | lib/pure/coro.nim | 430 |
1 files changed, 316 insertions, 114 deletions
diff --git a/lib/pure/coro.nim b/lib/pure/coro.nim index c5724f26f..24836e316 100644 --- a/lib/pure/coro.nim +++ b/lib/pure/coro.nim @@ -7,137 +7,339 @@ # distribution, for details about the copyright. # -when not defined(nimCoroutines): - {.error: "Coroutines require -d:nimCoroutines".} +## Nim coroutines implementation, supports several context switching methods: +## ======== ============ +## ucontext available on unix and alike (default) +## setjmp available on unix and alike (x86/64 only) +## fibers available and required on windows. +## ======== ============ +## +## -d:nimCoroutines Required to build this module. +## -d:nimCoroutinesUcontext Use ucontext backend. +## -d:nimCoroutinesSetjmp Use setjmp backend. +## -d:nimCoroutinesSetjmpBundled Use bundled setjmp implementation. +## +## Unstable API. -import os, times -import macros -import arch -import lists +import system/coro_detection + +when not nimCoroutines and not defined(nimdoc): + when defined(noNimCoroutines): + {.error: "Coroutines can not be used with -d:noNimCoroutines".} + else: + {.error: "Coroutines require -d:nimCoroutines".} + +import std/[os, lists] +include system/timers + +when defined(nimPreviewSlimSystem): + import std/assertions const defaultStackSize = 512 * 1024 +const useOrcArc = defined(gcArc) or defined(gcOrc) or defined(gcAtomicArc) -type Coroutine = ref object - # prev: ptr Coroutine - # next: ptr Coroutine - ctx: JmpBuf - fn: proc() - started: bool - lastRun: float - sleepTime: float - stack: pointer - stacksize: int - -var coroutines = initDoublyLinkedList[Coroutine]() -var current: Coroutine -var mainCtx: JmpBuf - - -proc GC_addStack(starts: pointer) {.cdecl, importc.} -proc GC_removeStack(starts: pointer) {.cdecl, importc.} -proc GC_setCurrentStack(starts, pos: pointer) {.cdecl, importc.} - -proc start*(c: proc(), stacksize: int=defaultStackSize) = - ## Adds coroutine to event loop. It does not run immediately. - var coro = Coroutine() - coro.fn = c - while coro.stack == nil: - coro.stack = alloc0(stacksize) - coro.stacksize = stacksize - coroutines.append(coro) - -{.push stackTrace: off.} -proc suspend*(sleepTime: float=0) = - ## Stops coroutine execution and resumes no sooner than after ``sleeptime`` seconds. +when useOrcArc: + proc nimGC_setStackBottom*(theStackBottom: pointer) = discard + +proc GC_addStack(bottom: pointer) {.cdecl, importc.} +proc GC_removeStack(bottom: pointer) {.cdecl, importc.} +proc GC_setActiveStack(bottom: pointer) {.cdecl, importc.} +proc GC_getActiveStack() : pointer {.cdecl, importc.} + +const + CORO_BACKEND_UCONTEXT = 0 + CORO_BACKEND_SETJMP = 1 + CORO_BACKEND_FIBERS = 2 + +when defined(windows): + const coroBackend = CORO_BACKEND_FIBERS + when defined(nimCoroutinesUcontext): + {.warning: "ucontext coroutine backend is not available on windows, defaulting to fibers.".} + when defined(nimCoroutinesSetjmp): + {.warning: "setjmp coroutine backend is not available on windows, defaulting to fibers.".} +elif defined(haiku) or defined(openbsd): + const coroBackend = CORO_BACKEND_SETJMP + when defined(nimCoroutinesUcontext): + {.warning: "ucontext coroutine backend is not available on haiku, defaulting to setjmp".} +elif defined(nimCoroutinesSetjmp) or defined(nimCoroutinesSetjmpBundled): + const coroBackend = CORO_BACKEND_SETJMP +else: + const coroBackend = CORO_BACKEND_UCONTEXT + +when coroBackend == CORO_BACKEND_FIBERS: + import std/winlean + type + Context = pointer + +elif coroBackend == CORO_BACKEND_UCONTEXT: + type + stack_t {.importc, header: "<ucontext.h>".} = object + ss_sp: pointer + ss_flags: int + ss_size: int + + ucontext_t {.importc, header: "<ucontext.h>".} = object + uc_link: ptr ucontext_t + uc_stack: stack_t + + Context = ucontext_t + + proc getcontext(context: var ucontext_t): int32 {.importc, + header: "<ucontext.h>".} + proc setcontext(context: var ucontext_t): int32 {.importc, + header: "<ucontext.h>".} + proc swapcontext(fromCtx, toCtx: var ucontext_t): int32 {.importc, + header: "<ucontext.h>".} + proc makecontext(context: var ucontext_t, fn: pointer, argc: int32) {.importc, + header: "<ucontext.h>", varargs.} + +elif coroBackend == CORO_BACKEND_SETJMP: + proc coroExecWithStack*(fn: pointer, stack: pointer) {.noreturn, + importc: "narch_$1", fastcall.} + when defined(amd64): + {.compile: "../arch/x86/amd64.S".} + elif defined(i386): + {.compile: "../arch/x86/i386.S".} + else: + # coroExecWithStack is defined in assembly. To support other platforms + # please provide implementation of this procedure. + {.error: "Unsupported architecture.".} + + when defined(nimCoroutinesSetjmpBundled): + # Use setjmp/longjmp implementation shipped with compiler. + when defined(amd64): + type + JmpBuf = array[0x50 + 0x10, uint8] + elif defined(i386): + type + JmpBuf = array[0x1C, uint8] + else: + # Bundled setjmp/longjmp are defined in assembly. To support other + # platforms please provide implementations of these procedures. + {.error: "Unsupported architecture.".} + + proc setjmp(ctx: var JmpBuf): int {.importc: "narch_$1".} + proc longjmp(ctx: JmpBuf, ret = 1) {.importc: "narch_$1".} + else: + # Use setjmp/longjmp implementation provided by the system. + type + JmpBuf {.importc: "jmp_buf", header: "<setjmp.h>".} = object + + proc setjmp(ctx: var JmpBuf): int {.importc, header: "<setjmp.h>".} + proc longjmp(ctx: JmpBuf, ret = 1) {.importc, header: "<setjmp.h>".} + + type + Context = JmpBuf + +when defined(unix): + # GLibc fails with "*** longjmp causes uninitialized stack frame ***" because + # our custom stacks are not initialized to a magic value. + when defined(osx): + # workaround: error: The deprecated ucontext routines require _XOPEN_SOURCE to be defined + const extra = " -D_XOPEN_SOURCE" + else: + const extra = "" + {.passc: "-U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=0" & extra.} + +const + CORO_CREATED = 0 + CORO_EXECUTING = 1 + CORO_FINISHED = 2 + +type + Stack {.pure.} = object + top: pointer # Top of the stack. Pointer used for deallocating stack if we own it. + bottom: pointer # Very bottom of the stack, acts as unique stack identifier. + size: int + + Coroutine {.pure.} = object + execContext: Context + fn: proc() + state: int + lastRun: Ticks + sleepTime: float + stack: Stack + reference: CoroutineRef + + CoroutinePtr = ptr Coroutine + + CoroutineRef* = ref object + ## CoroutineRef holds a pointer to actual coroutine object. Public API always returns + ## CoroutineRef instead of CoroutinePtr in order to allow holding a reference to coroutine + ## object while it can be safely deallocated by coroutine scheduler loop. In this case + ## Coroutine.reference.coro is set to nil. Public API checks for it being nil and + ## gracefully fails if it is nil. + coro: CoroutinePtr + + CoroutineLoopContext = ref object + coroutines: DoublyLinkedList[CoroutinePtr] + current: DoublyLinkedNode[CoroutinePtr] + loop: Coroutine + ncbottom: pointer # non coroutine stack botttom + +var ctx {.threadvar.}: CoroutineLoopContext + +proc getCurrent(): CoroutinePtr = + ## Returns current executing coroutine object. + var node = ctx.current + if node != nil: + return node.value + return nil + +proc initialize() = + ## Initializes coroutine state of current thread. + if ctx == nil: + ctx = CoroutineLoopContext() + ctx.coroutines = initDoublyLinkedList[CoroutinePtr]() + ctx.loop = Coroutine() + ctx.loop.state = CORO_EXECUTING + when not useOrcArc: + ctx.ncbottom = GC_getActiveStack() + when coroBackend == CORO_BACKEND_FIBERS: + ctx.loop.execContext = ConvertThreadToFiberEx(nil, FIBER_FLAG_FLOAT_SWITCH) + +proc runCurrentTask() + +proc switchTo(current, to: CoroutinePtr) = + ## Switches execution from `current` into `to` context. + to.lastRun = getTicks() + # Update position of current stack so gc invoked from another stack knows how much to scan. + when not useOrcArc: + GC_setActiveStack(current.stack.bottom) + nimGC_setStackBottom(current.stack.bottom) + var frame = getFrameState() + block: + # Execution will switch to another fiber now. We do not need to update current stack + when coroBackend == CORO_BACKEND_FIBERS: + SwitchToFiber(to.execContext) + elif coroBackend == CORO_BACKEND_UCONTEXT: + discard swapcontext(current.execContext, to.execContext) + elif coroBackend == CORO_BACKEND_SETJMP: + var res = setjmp(current.execContext) + if res == 0: + if to.state == CORO_EXECUTING: + # Coroutine is resumed. + longjmp(to.execContext, 1) + elif to.state == CORO_CREATED: + # Coroutine is started. + coroExecWithStack(runCurrentTask, to.stack.bottom) + #raiseAssert "unreachable" + else: + {.error: "Invalid coroutine backend set.".} + # Execution was just resumed. Restore frame information and set active stack. + setFrameState(frame) + when not useOrcArc: + GC_setActiveStack(current.stack.bottom) + nimGC_setStackBottom(ctx.ncbottom) + +proc suspend*(sleepTime: float = 0) = + ## Stops coroutine execution and resumes no sooner than after `sleeptime` seconds. ## Until then other coroutines are executed. - ## - ## This is similar to a `yield`:idx:, or a `yieldFrom`:idx in Python. - var oldFrame = getFrame() - var sp {.volatile.}: pointer - GC_setCurrentStack(current.stack, cast[pointer](addr sp)) + var current = getCurrent() current.sleepTime = sleepTime - current.lastRun = epochTime() - if setjmp(current.ctx) == 0: - longjmp(mainCtx, 1) - setFrame(oldFrame) -{.pop.} + nimGC_setStackBottom(ctx.ncbottom) + switchTo(current, addr(ctx.loop)) + +proc runCurrentTask() = + ## Starts execution of current coroutine and updates it's state through coroutine's life. + var sp {.volatile.}: pointer + sp = addr(sp) + block: + var current = getCurrent() + current.stack.bottom = sp + nimGC_setStackBottom(current.stack.bottom) + # Execution of new fiber just started. Since it was entered not through `switchTo` we + # have to set active stack here as well. GC_removeStack() has to be called in main loop + # because we still need stack available in final suspend(0) call from which we will not + # return. + when not useOrcArc: + GC_addStack(sp) + # Activate current stack because we are executing in a new coroutine. + GC_setActiveStack(sp) + current.state = CORO_EXECUTING + try: + current.fn() # Start coroutine execution + except: + echo "Unhandled exception in coroutine." + writeStackTrace() + current.state = CORO_FINISHED + nimGC_setStackBottom(ctx.ncbottom) + suspend(0) # Exit coroutine without returning from coroExecWithStack() + raiseAssert "unreachable" + +proc start*(c: proc(), stacksize: int = defaultStackSize): CoroutineRef {.discardable.} = + ## Schedule coroutine for execution. It does not run immediately. + if ctx == nil: + initialize() + + var coro: CoroutinePtr + when coroBackend == CORO_BACKEND_FIBERS: + coro = cast[CoroutinePtr](alloc0(sizeof(Coroutine))) + coro.execContext = CreateFiberEx(stacksize, stacksize, + FIBER_FLAG_FLOAT_SWITCH, + (proc(p: pointer) {.stdcall.} = runCurrentTask()), nil) + else: + coro = cast[CoroutinePtr](alloc0(sizeof(Coroutine) + stacksize)) + coro.stack.top = cast[pointer](cast[int](coro) + sizeof(Coroutine)) + coro.stack.bottom = cast[pointer](cast[int](coro.stack.top) + stacksize) + when coroBackend == CORO_BACKEND_UCONTEXT: + discard getcontext(coro.execContext) + coro.execContext.uc_stack.ss_sp = coro.stack.top + coro.execContext.uc_stack.ss_size = stacksize + coro.execContext.uc_link = addr(ctx.loop.execContext) + makecontext(coro.execContext, runCurrentTask, 0) + coro.fn = c + coro.stack.size = stacksize + coro.state = CORO_CREATED + coro.reference = CoroutineRef(coro: coro) + ctx.coroutines.append(coro) + return coro.reference proc run*() = - ## Starts main event loop which exits when all coroutines exit. Calling this proc - ## starts execution of first coroutine. - var node = coroutines.head + ## Starts main coroutine scheduler loop which exits when all coroutines exit. + ## Calling this proc starts execution of first coroutine. + initialize() + ctx.current = ctx.coroutines.head var minDelay: float = 0 - var frame: PFrame - while node != nil: - var coro = node.value - current = coro - os.sleep(int(minDelay * 1000)) + while ctx.current != nil: + var current = getCurrent() - var remaining = coro.sleepTime - (epochTime() - coro.lastRun); + var remaining = current.sleepTime - (float(getTicks() - current.lastRun) / 1_000_000_000) if remaining <= 0: - remaining = 0 - let res = setjmp(mainCtx) - if res == 0: - frame = getFrame() - if coro.started: # coroutine resumes - longjmp(coro.ctx, 1) - else: - coro.started = true # coroutine starts - var stackEnd = cast[pointer](cast[ByteAddress](coro.stack) + coro.stacksize) - GC_addStack(coro.stack) - coroSwitchStack(stackEnd) - coro.fn() - coroRestoreStack() - GC_removeStack(coro.stack) - var next = node.prev - coroutines.remove(node) - dealloc(coro.stack) - node = next - setFrame(frame) - else: - setFrame(frame) - - elif remaining > 0: + # Save main loop context. Suspending coroutine will resume after this statement with + switchTo(addr(ctx.loop), current) + else: if minDelay > 0 and remaining > 0: minDelay = min(remaining, minDelay) else: minDelay = remaining - if node == nil or node.next == nil: - node = coroutines.head + if current.state == CORO_FINISHED: + var next = ctx.current.prev + if next == nil: + # If first coroutine ends then `prev` is nil even if more coroutines + # are to be scheduled. + next = ctx.current.next + current.reference.coro = nil + ctx.coroutines.remove(ctx.current) + when not useOrcArc: + GC_removeStack(current.stack.bottom) + when coroBackend == CORO_BACKEND_FIBERS: + DeleteFiber(current.execContext) + else: + dealloc(current.stack.top) + dealloc(current) + ctx.current = next + elif ctx.current == nil or ctx.current.next == nil: + ctx.current = ctx.coroutines.head + os.sleep(int(minDelay * 1000)) else: - node = node.next + ctx.current = ctx.current.next -proc alive*(c: proc()): bool = - ## Returns ``true`` if coroutine has not returned, ``false`` otherwise. - for coro in items(coroutines): - if coro.fn == c: - return true +proc alive*(c: CoroutineRef): bool = c.coro != nil and c.coro.state != CORO_FINISHED + ## Returns `true` if coroutine has not returned, `false` otherwise. -proc wait*(c: proc(), interval=0.01) = - ## Returns only after coroutine ``c`` has returned. ``interval`` is time in seconds how often. +proc wait*(c: CoroutineRef, interval = 0.01) = + ## Returns only after coroutine `c` has returned. `interval` is time in seconds how often. while alive(c): - suspend interval - -when defined(nimCoroutines) and isMainModule: - var stackCheckValue = 1100220033 - proc c2() - - proc c1() = - for i in 0 .. 3: - echo "c1" - suspend 0.05 - echo "c1 exits" - - - proc c2() = - for i in 0 .. 3: - echo "c2" - suspend 0.025 - wait(c1) - echo "c2 exits" - - start(c1) - start(c2) - run() - echo "done ", stackCheckValue + suspend(interval) |