summary refs log tree commit diff stats
path: root/lib/pure/coro.nim
blob: ead3849c6b9863d41389589a8e684bc46a0393fa (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
#
#
#            Nim's Runtime Library
#        (c) Copyright 2015 Rokas Kupstys
#
#    See the file "copying.txt", included in this
#    distribution, for details about the copyright.
#

## Nim coroutines implementation, supports several context switching methods:
## --------  ------------
## ucontext  available on unix and alike (default)
## setjmp    available on unix and alike (x86/64 only)
## fibers    available and required on windows.
## --------  ------------
##
## -d:nimCoroutines               Required to build this module.
## -d:nimCoroutinesUcontext       Use ucontext backend.
## -d:nimCoroutinesSetjmp         Use setjmp backend.
## -d:nimCoroutinesSetjmpBundled  Use bundled setjmp implementation.
##
## Unstable API.

when not nimCoroutines and not defined(nimdoc):
  when defined(noNimCoroutines):
    {.error: "Coroutines can not be used with -d:noNimCoroutines".}
  else:
    {.error: "Coroutines require -d:nimCoroutines".}

import os
import lists
include system/timers

const defaultStackSize = 512 * 1024

proc GC_addStack(bottom: pointer) {.cdecl, importc.}
proc GC_removeStack(bottom: pointer) {.cdecl, importc.}
proc GC_setActiveStack(bottom: pointer) {.cdecl, importc.}

const
  CORO_BACKEND_UCONTEXT = 0
  CORO_BACKEND_SETJMP = 1
  CORO_BACKEND_FIBERS = 2

when defined(windows):
  const coroBackend = CORO_BACKEND_FIBERS
  when defined(nimCoroutinesUcontext):
    {.warning: "ucontext coroutine backend is not available on windows, defaulting to fibers.".}
  when defined(nimCoroutinesSetjmp):
    {.warning: "setjmp coroutine backend is not available on windows, defaulting to fibers.".}
elif defined(haiku) or defined(openbsd):
  const coroBackend = CORO_BACKEND_SETJMP
  when defined(nimCoroutinesUcontext):
    {.warning: "ucontext coroutine backend is not available on haiku, defaulting to setjmp".}
elif defined(nimCoroutinesSetjmp) or defined(nimCoroutinesSetjmpBundled):
  const coroBackend = CORO_BACKEND_SETJMP
else:
  const coroBackend = CORO_BACKEND_UCONTEXT

when coroBackend == CORO_BACKEND_FIBERS:
  import windows.winlean
  type
    Context = pointer

elif coroBackend == CORO_BACKEND_UCONTEXT:
  type
    stack_t {.importc, header: "<ucontext.h>".} = object
      ss_sp: pointer
      ss_flags: int
      ss_size: int

    ucontext_t {.importc, header: "<ucontext.h>".} = object
      uc_link: ptr ucontext_t
      uc_stack: stack_t

    Context = ucontext_t

  proc getcontext(context: var ucontext_t): int32 {.importc,
      header: "<ucontext.h>".}
  proc setcontext(context: var ucontext_t): int32 {.importc,
      header: "<ucontext.h>".}
  proc swapcontext(fromCtx, toCtx: var ucontext_t): int32 {.importc,
      header: "<ucontext.h>".}
  proc makecontext(context: var ucontext_t, fn: pointer, argc: int32) {.importc,
      header: "<ucontext.h>", varargs.}

elif coroBackend == CORO_BACKEND_SETJMP:
  proc coroExecWithStack*(fn: pointer, stack: pointer) {.noreturn,
      importc: "narch_$1", fastcall.}
  when defined(amd64):
    {.compile: "../arch/x86/amd64.S".}
  elif defined(i386):
    {.compile: "../arch/x86/i386.S".}
  else:
    # coroExecWithStack is defined in assembly. To support other platforms
    # please provide implementation of this procedure.
    {.error: "Unsupported architecture.".}

  when defined(nimCoroutinesSetjmpBundled):
    # Use setjmp/longjmp implementation shipped with compiler.
    when defined(amd64):
      type
        JmpBuf = array[0x50 + 0x10, uint8]
    elif defined(i386):
      type
        JmpBuf = array[0x1C, uint8]
    else:
      # Bundled setjmp/longjmp are defined in assembly. To support other
      # platforms please provide implementations of these procedures.
      {.error: "Unsupported architecture.".}

    proc setjmp(ctx: var JmpBuf): int {.importc: "narch_$1".}
    proc longjmp(ctx: JmpBuf, ret = 1) {.importc: "narch_$1".}
  else:
    # Use setjmp/longjmp implementation provided by the system.
    type
      JmpBuf {.importc: "jmp_buf", header: "<setjmp.h>".} = object

    proc setjmp(ctx: var JmpBuf): int {.importc, header: "<setjmp.h>".}
    proc longjmp(ctx: JmpBuf, ret = 1) {.importc, header: "<setjmp.h>".}

  type
    Context = JmpBuf

when defined(unix):
  # GLibc fails with "*** longjmp causes uninitialized stack frame ***" because
  # our custom stacks are not initialized to a magic value.
  when defined(osx):
    # workaround: error: The deprecated ucontext routines require _XOPEN_SOURCE to be defined
    const extra = " -D_XOPEN_SOURCE"
  else:
    const extra = ""
  {.passc: "-U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=0" & extra.}

const
  CORO_CREATED = 0
  CORO_EXECUTING = 1
  CORO_FINISHED = 2

type
  Stack {.pure.} = object
    top: pointer    # Top of the stack. Pointer used for deallocating stack if we own it.
    bottom: pointer # Very bottom of the stack, acts as unique stack identifier.
    size: int

  Coroutine {.pure.} = object
    execContext: Context
    fn: proc()
    state: int
    lastRun: Ticks
    sleepTime: float
    stack: Stack
    reference: CoroutineRef

  CoroutinePtr = ptr Coroutine

  CoroutineRef* = ref object
    ## CoroutineRef holds a pointer to actual coroutine object. Public API always returns
    ## CoroutineRef instead of CoroutinePtr in order to allow holding a reference to coroutine
    ## object while it can be safely deallocated by coroutine scheduler loop. In this case
    ## Coroutine.reference.coro is set to nil. Public API checks for for it being nil and
    ## gracefully fails if it is nil.
    coro: CoroutinePtr

  CoroutineLoopContext = ref object
    coroutines: DoublyLinkedList[CoroutinePtr]
    current: DoublyLinkedNode[CoroutinePtr]
    loop: Coroutine

var ctx {.threadvar.}: CoroutineLoopContext

proc getCurrent(): CoroutinePtr =
  ## Returns current executing coroutine object.
  var node = ctx.current
  if node != nil:
    return node.value
  return nil

proc initialize() =
  ## Initializes coroutine state of current thread.
  if ctx == nil:
    ctx = CoroutineLoopContext()
    ctx.coroutines = initDoublyLinkedList[CoroutinePtr]()
    ctx.loop = Coroutine()
    ctx.loop.state = CORO_EXECUTING
    when coroBackend == CORO_BACKEND_FIBERS:
      ctx.loop.execContext = ConvertThreadToFiberEx(nil, FIBER_FLAG_FLOAT_SWITCH)

proc runCurrentTask()

proc switchTo(current, to: CoroutinePtr) =
  ## Switches execution from `current` into `to` context.
  to.lastRun = getTicks()
  # Update position of current stack so gc invoked from another stack knows how much to scan.
  GC_setActiveStack(current.stack.bottom)
  var frame = getFrameState()
  block:
    # Execution will switch to another fiber now. We do not need to update current stack
    when coroBackend == CORO_BACKEND_FIBERS:
      SwitchToFiber(to.execContext)
    elif coroBackend == CORO_BACKEND_UCONTEXT:
      discard swapcontext(current.execContext, to.execContext)
    elif coroBackend == CORO_BACKEND_SETJMP:
      var res = setjmp(current.execContext)
      if res == 0:
        if to.state == CORO_EXECUTING:
          # Coroutine is resumed.
          longjmp(to.execContext, 1)
        elif to.state == CORO_CREATED:
          # Coroutine is started.
          coroExecWithStack(runCurrentTask, to.stack.bottom)
          #doAssert false
    else:
      {.error: "Invalid coroutine backend set.".}
  # Execution was just resumed. Restore frame information and set active stack.
  setFrameState(frame)
  GC_setActiveStack(current.stack.bottom)

proc suspend*(sleepTime: float = 0) =
  ## Stops coroutine execution and resumes no sooner than after ``sleeptime`` seconds.
  ## Until then other coroutines are executed.
  var current = getCurrent()
  current.sleepTime = sleepTime
  switchTo(current, addr(ctx.loop))

proc runCurrentTask() =
  ## Starts execution of current coroutine and updates it's state through coroutine's life.
  var sp {.volatile.}: pointer
  sp = addr(sp)
  block:
    var current = getCurrent()
    current.stack.bottom = sp
    # Execution of new fiber just started. Since it was entered not through `switchTo` we
    # have to set active stack here as well. GC_removeStack() has to be called in main loop
    # because we still need stack available in final suspend(0) call from which we will not
    # return.
    GC_addStack(sp)
    # Activate current stack because we are executing in a new coroutine.
    GC_setActiveStack(sp)
    current.state = CORO_EXECUTING
    try:
      current.fn() # Start coroutine execution
    except:
      echo "Unhandled exception in coroutine."
      writeStackTrace()
    current.state = CORO_FINISHED
  suspend(0) # Exit coroutine without returning from coroExecWithStack()
  doAssert false

proc start*(c: proc(), stacksize: int = defaultStackSize): CoroutineRef {.discardable.} =
  ## Schedule coroutine for execution. It does not run immediately.
  if ctx == nil:
    initialize()

  var coro: CoroutinePtr
  when coroBackend == CORO_BACKEND_FIBERS:
    coro = cast[CoroutinePtr](alloc0(sizeof(Coroutine)))
    coro.execContext = CreateFiberEx(stacksize, stacksize,
      FIBER_FLAG_FLOAT_SWITCH,
      (proc(p: pointer): void {.stdcall.} = runCurrentTask()),
      nil)
    coro.stack.size = stacksize
  else:
    coro = cast[CoroutinePtr](alloc0(sizeof(Coroutine) + stacksize))
    coro.stack.top = cast[pointer](cast[ByteAddress](coro) + sizeof(Coroutine))
    coro.stack.bottom = cast[pointer](cast[ByteAddress](coro.stack.top) + stacksize)
    when coroBackend == CORO_BACKEND_UCONTEXT:
      discard getcontext(coro.execContext)
      coro.execContext.uc_stack.ss_sp = coro.stack.top
      coro.execContext.uc_stack.ss_size = stacksize
      coro.execContext.uc_link = addr(ctx.loop.execContext)
      makecontext(coro.execContext, runCurrentTask, 0)
  coro.fn = c
  coro.stack.size = stacksize
  coro.state = CORO_CREATED
  coro.reference = CoroutineRef(coro: coro)
  ctx.coroutines.append(coro)
  return coro.reference

proc run*() =
  initialize()
  ## Starts main coroutine scheduler loop which exits when all coroutines exit.
  ## Calling this proc starts execution of first coroutine.
  ctx.current = ctx.coroutines.head
  var minDelay: float = 0
  while ctx.current != nil:
    var current = getCurrent()

    var remaining = current.sleepTime - (float(getTicks() - current.lastRun) / 1_000_000_000)
    if remaining <= 0:
      # Save main loop context. Suspending coroutine will resume after this statement with
      switchTo(addr(ctx.loop), current)
    else:
      if minDelay > 0 and remaining > 0:
        minDelay = min(remaining, minDelay)
      else:
        minDelay = remaining

    if current.state == CORO_FINISHED:
      var next = ctx.current.prev
      if next == nil:
        # If first coroutine ends then `prev` is nil even if more coroutines
        # are to be scheduled.
        next = ctx.current.next
      current.reference.coro = nil
      ctx.coroutines.remove(ctx.current)
      GC_removeStack(current.stack.bottom)
      when coroBackend == CORO_BACKEND_FIBERS:
        DeleteFiber(current.execContext)
      else:
        dealloc(current.stack.top)
      dealloc(current)
      ctx.current = next
    elif ctx.current == nil or ctx.current.next == nil:
      ctx.current = ctx.coroutines.head
      os.sleep(int(minDelay * 1000))
    else:
      ctx.current = ctx.current.next

proc alive*(c: CoroutineRef): bool = c.coro != nil and c.coro.state != CORO_FINISHED
  ## Returns ``true`` if coroutine has not returned, ``false`` otherwise.

proc wait*(c: CoroutineRef, interval = 0.01) =
  ## Returns only after coroutine ``c`` has returned. ``interval`` is time in seconds how often.
  while alive(c):
    suspend(interval)