summary refs log tree commit diff stats
path: root/lib/system/threads.nim
diff options
context:
space:
mode:
Diffstat (limited to 'lib/system/threads.nim')
-rwxr-xr-xlib/system/threads.nim481
1 files changed, 481 insertions, 0 deletions
diff --git a/lib/system/threads.nim b/lib/system/threads.nim
new file mode 100755
index 000000000..052335baa
--- /dev/null
+++ b/lib/system/threads.nim
@@ -0,0 +1,481 @@
+#
+#
+#            Nimrod's Runtime Library
+#        (c) Copyright 2011 Andreas Rumpf
+#
+#    See the file "copying.txt", included in this
+#    distribution, for details about the copyright.
+#
+
+## Thread support for Nimrod. **Note**: This is part of the system module.
+## Do not import it directly. To activate thread support you need to compile
+## with the ``--threads:on`` command line switch.
+##
+## Nimrod's memory model for threads is quite different from other common 
+## programming languages (C, Pascal): Each thread has its own
+## (garbage collected) heap and sharing of memory is restricted. This helps
+## to prevent race conditions and improves efficiency. See the manual for
+## details of this memory model.
+##
+## Example:
+##
+## .. code-block:: nimrod
+##
+##  var
+##    thr: array [0..4, TThread[tuple[a,b: int]]]
+##    L: TLock
+##  
+##  proc threadFunc(interval: tuple[a,b: int]) {.procvar.} = 
+##    for i in interval.a..interval.b: 
+##      Aquire(L) # lock stdout
+##      echo i
+##      Release(L)
+##
+##  InitLock(L)
+##
+##  for i in 0..high(thr):
+##    createThread(thr[i], threadFunc, (i*10, i*10+5))
+##  joinThreads(thr)
+  
+# Tunable limits of the threading runtime:
+const
+  maxRegisters = 256 # don't think there is an arch with more registers
+                     # (sized for the per-thread register snapshot the GC scans)
+  maxLocksPerThread* = 10 ## max number of locks a thread can hold
+                          ## at the same time
+
+when defined(Windows):
+  type
+    TSysLock {.final, pure.} = object # CRITICAL_SECTION in WinApi
+      DebugInfo: pointer
+      LockCount: int32
+      RecursionCount: int32
+      OwningThread: int
+      LockSemaphore: int
+      Reserved: int32
+          
+  proc InitSysLock(L: var TSysLock) {.stdcall,
+    dynlib: "kernel32", importc: "InitializeCriticalSection".}
+    ## Initializes the lock `L`.
+
+  proc TryAquireSysAux(L: var TSysLock): int32 {.stdcall,
+    dynlib: "kernel32", importc: "TryEnterCriticalSection".}
+    ## Tries to aquire the lock `L`. Returns non-zero on success.
+    
+  proc TryAquireSys(L: var TSysLock): bool {.inline.} = 
+    ## Tries to aquire the lock `L`; true on success.
+    result = TryAquireSysAux(L) != 0'i32
+
+  proc AquireSys(L: var TSysLock) {.stdcall,
+    dynlib: "kernel32", importc: "EnterCriticalSection".}
+    ## Aquires the lock `L`.
+    
+  proc ReleaseSys(L: var TSysLock) {.stdcall,
+    dynlib: "kernel32", importc: "LeaveCriticalSection".}
+    ## Releases the lock `L`.
+
+  type
+    THandle = int
+    TSysThread = THandle
+    # entry-point signature that CreateThread expects (stdcall, int32 result)
+    TWinThreadProc = proc (x: pointer): int32 {.stdcall.}
+
+  proc CreateThread(lpThreadAttributes: Pointer, dwStackSize: int32,
+                     lpStartAddress: TWinThreadProc, 
+                     lpParameter: Pointer,
+                     dwCreationFlags: int32, 
+                     lpThreadId: var int32): TSysThread {.
+    stdcall, dynlib: "kernel32", importc: "CreateThread".}
+
+  proc winSuspendThread(hThread: TSysThread): int32 {.
+    stdcall, dynlib: "kernel32", importc: "SuspendThread".}
+      
+  proc winResumeThread(hThread: TSysThread): int32 {.
+    stdcall, dynlib: "kernel32", importc: "ResumeThread".}
+
+  # NOTE(review): WaitForMultipleObjects can wait on at most
+  # MAXIMUM_WAIT_OBJECTS (64) handles per call.
+  proc WaitForMultipleObjects(nCount: int32,
+                              lpHandles: ptr TSysThread,
+                              bWaitAll: int32,
+                              dwMilliseconds: int32): int32 {.
+    stdcall, dynlib: "kernel32", importc: "WaitForMultipleObjects".}
+
+  proc WaitForSingleObject(hHandle: TSysThread, dwMilliseconds: int32): int32 {.
+    stdcall, dynlib: "kernel32", importc: "WaitForSingleObject".}
+
+  proc TerminateThread(hThread: TSysThread, dwExitCode: int32): int32 {.
+    stdcall, dynlib: "kernel32", importc: "TerminateThread".}
+    
+  type
+    # index of a TLS slot as returned by TlsAlloc
+    TThreadVarSlot {.compilerproc.} = distinct int32
+
+  proc TlsAlloc(): TThreadVarSlot {.
+    importc: "TlsAlloc", stdcall, dynlib: "kernel32".}
+  proc TlsSetValue(dwTlsIndex: TThreadVarSlot, lpTlsValue: pointer) {.
+    importc: "TlsSetValue", stdcall, dynlib: "kernel32".}
+  proc TlsGetValue(dwTlsIndex: TThreadVarSlot): pointer {.
+    importc: "TlsGetValue", stdcall, dynlib: "kernel32".}
+  
+  # thin compilerproc wrappers around the Win32 TLS API.
+  # NOTE(review): a TlsAlloc failure (TLS_OUT_OF_INDEXES) is not checked.
+  proc ThreadVarAlloc(): TThreadVarSlot {.compilerproc, inline.} =
+    result = TlsAlloc()
+  proc ThreadVarSetValue(s: TThreadVarSlot, value: pointer) {.
+                         compilerproc, inline.} =
+    TlsSetValue(s, value)
+  proc ThreadVarGetValue(s: TThreadVarSlot): pointer {.
+                         compilerproc, inline.} =
+    result = TlsGetValue(s)
+
+else:
+  {.passL: "-pthread".}
+  {.passC: "-pthread".}
+
+  type
+    TSysLock {.importc: "pthread_mutex_t", pure, final,
+               header: "<sys/types.h>".} = object
+
+  proc InitSysLock(L: var TSysLock, attr: pointer = nil) {.
+    importc: "pthread_mutex_init", header: "<pthread.h>".}
+    ## Initializes the mutex `L`.
+
+  proc AquireSys(L: var TSysLock) {.
+    importc: "pthread_mutex_lock", header: "<pthread.h>".}
+  proc TryAquireSysAux(L: var TSysLock): cint {.
+    importc: "pthread_mutex_trylock", header: "<pthread.h>".}
+
+  proc TryAquireSys(L: var TSysLock): bool {.inline.} = 
+    ## Tries to aquire the mutex `L`; true on success (trylock returned 0).
+    result = TryAquireSysAux(L) == 0'i32
+
+  proc ReleaseSys(L: var TSysLock) {.
+    importc: "pthread_mutex_unlock", header: "<pthread.h>".}
+
+  type
+    TSysThread {.importc: "pthread_t", header: "<sys/types.h>",
+                 final, pure.} = object
+    Tpthread_attr {.importc: "pthread_attr_t",
+                     header: "<sys/types.h>", final, pure.} = object
+                 
+    Ttimespec {.importc: "struct timespec",
+                header: "<time.h>", final, pure.} = object
+      tv_sec: int
+      tv_nsec: int
+
+  proc pthread_attr_init(a1: var TPthread_attr) {.
+    importc, header: "<pthread.h>".}
+  proc pthread_attr_setstacksize(a1: var TPthread_attr, a2: int) {.
+    importc, header: "<pthread.h>".}
+
+  proc pthread_create(a1: var TSysThread, a2: var TPthread_attr,
+            a3: proc (x: pointer) {.noconv.}, 
+            a4: pointer): cint {.importc: "pthread_create", 
+            header: "<pthread.h>".}
+  proc pthread_join(a1: TSysThread, a2: ptr pointer): cint {.
+    importc, header: "<pthread.h>".}
+
+  proc pthread_cancel(a1: TSysThread): cint {.
+    importc: "pthread_cancel", header: "<pthread.h>".}
+
+  # pthread_mutex_timedlock is declared in <pthread.h>, not <time.h>.
+  proc AquireSysTimeoutAux(L: var TSysLock, timeout: var Ttimespec): cint {.
+    importc: "pthread_mutex_timedlock", header: "<pthread.h>".}
+
+  proc AquireSysTimeout(L: var TSysLock, msTimeout: int) {.inline.} =
+    ## Aquires `L`, raising EResourceExhausted on failure after
+    ## `msTimeout` milliseconds.
+    var a: Ttimespec
+    a.tv_sec = msTimeout div 1000
+    # tv_nsec is in nanoseconds; the previous `* 1000` produced
+    # microseconds.
+    a.tv_nsec = (msTimeout mod 1000) * 1000 * 1000
+    # NOTE(review): POSIX specifies the timeout as an *absolute*
+    # CLOCK_REALTIME deadline; a relative value expires almost
+    # immediately -- TODO: base the deadline on clock_gettime.
+    var res = AquireSysTimeoutAux(L, a)
+    if res != 0'i32: raise newException(EResourceExhausted, $strerror(res))
+
+  type
+    TThreadVarSlot {.importc: "pthread_key_t", pure, final,
+                   header: "<sys/types.h>".} = object
+
+  proc pthread_getspecific(a1: TThreadVarSlot): pointer {.
+    importc: "pthread_getspecific", header: "<pthread.h>".}
+  proc pthread_key_create(a1: ptr TThreadVarSlot, 
+                          destruct: proc (x: pointer) {.noconv.}): int32 {.
+    importc: "pthread_key_create", header: "<pthread.h>".}
+  proc pthread_key_delete(a1: TThreadVarSlot): int32 {.
+    importc: "pthread_key_delete", header: "<pthread.h>".}
+
+  proc pthread_setspecific(a1: TThreadVarSlot, a2: pointer): int32 {.
+    importc: "pthread_setspecific", header: "<pthread.h>".}
+  
+  # compilerproc wrappers over pthread TLS keys; the error codes of
+  # key_create/setspecific are deliberately ignored here.
+  proc ThreadVarAlloc(): TThreadVarSlot {.compilerproc, inline.} =
+    discard pthread_key_create(addr(result), nil)
+  proc ThreadVarSetValue(s: TThreadVarSlot, value: pointer) {.
+                         compilerproc, inline.} =
+    discard pthread_setspecific(s, value)
+  proc ThreadVarGetValue(s: TThreadVarSlot): pointer {.compilerproc, inline.} =
+    result = pthread_getspecific(s)
+
+type
+  TGlobals {.final, pure.} = object
+    # per-thread runtime state (exception handling, stack tracing,
+    # preallocated message buffers)
+    excHandler: PSafePoint
+    currException: ref E_Base
+    framePtr: PFrame
+    buf: string       # cannot be allocated on the stack!
+    assertBuf: string # we need a different buffer for
+                      # assert, as it raises an exception and
+                      # exception handler needs the buffer too
+    gAssertionFailed: ref EAssertionFailed
+    tempFrames: array [0..127, PFrame] # cannot be allocated on the stack!
+    data: float # compiler should add thread local variables here!
+
+proc initGlobals(g: var TGlobals) = 
+  ## Initializes a thread's TGlobals: preallocates the assertion
+  ## exception object and the two 2000-byte message buffers.
+  new(g.gAssertionFailed)
+  g.buf = newStringOfCap(2000)
+  g.assertBuf = newStringOfCap(2000)
+
+
+type
+  PGcThread = ptr TGcThread
+  TGcThread {.pure.} = object
+    # bookkeeping record for one thread; linked into the global thread
+    # list by registerThread/unregisterThread
+    sys: TSysThread
+    next, prev: PGcThread
+    stackBottom, stackTop: pointer
+    stackSize: int
+    g: TGlobals
+    locksLen: int
+    locks: array [0..MaxLocksPerThread-1, pointer]
+    registers: array[0..maxRegisters-1, pointer] # register contents for GC
+
+# XXX it'd be more efficient to not use a global variable for the 
+# thread storage slot, but to rely on the implementation to assign slot 0
+# for us... ;-)
+var globalsSlot = ThreadVarAlloc()
+#const globalsSlot = TThreadVarSlot(0)
+#assert checkSlot.int == globalsSlot.int
+  
+proc ThisThread(): PGcThread {.compilerRtl, inl.} =
+  ## Returns the calling thread's TGcThread record stored in TLS.
+  result = cast[PGcThread](ThreadVarGetValue(globalsSlot))
+
+# create for the main thread. Note: do not insert this data into the list
+# of all threads; it's not to be stopped etc.
+when not defined(useNimRtl):
+  var mainThread: TGcThread
+  initGlobals(mainThread.g)
+  ThreadVarSetValue(globalsSlot, addr(mainThread))
+
+  var heapLock: TSysLock
+  InitSysLock(HeapLock)
+
+  var
+    threadList: PGcThread
+    
+  proc registerThread(t: PGcThread) = 
+    ## Prepends `t` to the doubly linked list of running threads.
+    # we need to use the GC global lock here!
+    AquireSys(HeapLock)
+    t.prev = nil
+    t.next = threadList
+    if threadList != nil: 
+      assert(threadList.prev == nil)
+      threadList.prev = t
+    threadList = t
+    ReleaseSys(HeapLock)
+        
+  proc unregisterThread(t: PGcThread) =
+    ## Removes `t` from the thread list; safe to call more than once.
+    # we need to use the GC global lock here!
+    AquireSys(HeapLock)
+    if t == threadList: threadList = t.next
+    if t.next != nil: t.next.prev = t.prev
+    if t.prev != nil: t.prev.next = t.next
+    # so that a thread can be unregistered twice which might happen if the
+    # code executes `destroyThread`:
+    t.next = nil
+    t.prev = nil
+    ReleaseSys(HeapLock)
+    
+  # on UNIX, the GC could use ``SIGFREEZE`` to tell every thread to stop
+  # so that the GC can examine the stacks.
+  
+  proc stopTheWord() =
+    ## Empty placeholder. NOTE(review): the name looks like a typo for
+    ## "stopTheWorld"; confirm no callers exist before renaming.
+    nil
+    
+# We jump through some hops here to ensure that Nimrod thread procs can have
+# the Nimrod calling convention. This is needed because thread procs are 
+# ``stdcall`` on Windows and ``noconv`` on UNIX. Alternative would be to just
+# use ``stdcall`` since it is mapped to ``noconv`` on UNIX anyway. However, 
+# the current approach will likely result in less problems later when we have
+# GC'ed closures in Nimrod.
+
+type
+  TThread* {.pure, final.}[TParam] = object of TGcThread ## Nimrod thread.
+    fn: proc (p: TParam)   # user entry point
+    data: TParam           # value passed to `fn`
+  
+template ThreadProcWrapperBody(closure: expr) =
+  ## Shared body of the OS entry-point wrappers: sets up GC and TLS for
+  ## the new thread, runs the user proc, and always unregisters the
+  ## thread afterwards. `TParam` comes from the instantiating proc.
+  when not hasSharedHeap: initGC() # init the GC for this thread
+  # `closure` is the ptr TThread[TParam] handed to the OS thread
+  ThreadVarSetValue(globalsSlot, closure)
+  var t = cast[ptr TThread[TParam]](closure)
+  when not hasSharedHeap: stackBottom = addr(t)
+  t.stackBottom = addr(t)
+  registerThread(t)
+  try:
+    t.fn(t.data)
+  finally:
+    unregisterThread(t)
+  
+{.push stack_trace:off.}
+# OS-specific thread entry points; both delegate to ThreadProcWrapperBody.
+when defined(windows):
+  proc threadProcWrapper[TParam](closure: pointer): int32 {.stdcall.} = 
+    ThreadProcWrapperBody(closure)
+    # implicitely return 0
+else:
+  proc threadProcWrapper[TParam](closure: pointer) {.noconv.} = 
+    ThreadProcWrapperBody(closure)
+{.pop.}
+
+proc joinThread*[TParam](t: TThread[TParam]) {.inline.} = 
+  ## waits for the thread `t` to finish.
+  when hostOS == "windows":
+    # -1'i32 corresponds to INFINITE
+    discard WaitForSingleObject(t.sys, -1'i32)
+  else:
+    discard pthread_join(t.sys, nil)
+
+proc joinThreads*[TParam](t: openArray[TThread[TParam]]) = 
+  ## waits for every thread in `t` to finish.
+  # Wait on each thread individually on every platform: the former
+  # Windows path copied the handles into a fixed 256-entry array and
+  # called WaitForMultipleObjects, which is additionally limited to
+  # MAXIMUM_WAIT_OBJECTS (64) handles per call.
+  for i in 0..t.high: joinThread(t[i])
+
+proc destroyThread*[TParam](t: var TThread[TParam]) {.inline.} =
+  ## forces the thread `t` to terminate. This is potentially dangerous if
+  ## you don't have full control over `t` and its aquired resources.
+  when hostOS == "windows":
+    discard TerminateThread(t.sys, 1'i32)
+  else:
+    discard pthread_cancel(t.sys)
+  # TThread has no `gcInfo` field; the thread record itself derives
+  # from TGcThread, so unregister the record's own address.
+  unregisterThread(cast[PGcThread](addr(t)))
+
+proc createThread*[TParam](t: var TThread[TParam], 
+                           tp: proc (param: TParam), 
+                           param: TParam,
+                           stackSize = 1024*256*sizeof(int)) = 
+  ## creates a new thread `t` and starts its execution. Entry point is the
+  ## proc `tp`. `param` is passed to `tp`. `t` must outlive the thread,
+  ## since its address is handed to the OS thread as its closure.
+  t.data = param
+  t.fn = tp
+  t.stackSize = stackSize
+  when hostOS == "windows":
+    var dummyThreadId: int32
+    # NOTE(review): stackSize (int) is narrowed to the int32 dwStackSize
+    # parameter here -- confirm no truncation on 64-bit targets.
+    t.sys = CreateThread(nil, stackSize, threadProcWrapper[TParam],
+                         addr(t), 0'i32, dummyThreadId)
+  else:
+    var a: Tpthread_attr
+    pthread_attr_init(a)
+    pthread_attr_setstacksize(a, stackSize)
+    # NOTE(review): `a` is never destroyed (pthread_attr_destroy is not
+    # imported in this file).
+    if pthread_create(t.sys, a, threadProcWrapper[TParam], addr(t)) != 0:
+      raise newException(EIO, "cannot create thread")
+
+# --------------------------- lock handling ----------------------------------
+
+type
+  TLock* = TSysLock ## Nimrod lock
+  
+const
+  noDeadlocks = false # compileOption("deadlockPrevention")
+                      # when true, Aquire/TryAquire/Release keep each
+                      # thread's held locks sorted by address to prevent
+                      # deadlocks
+
+when nodeadlocks:
+  var
+    deadlocksPrevented* = 0  ## counts the number of times a 
+                             ## deadlock has been prevented
+
+proc InitLock*(lock: var TLock) {.inline.} =
+  ## Initializes the lock `lock`.
+  InitSysLock(lock)
+
+proc OrderedLocks(g: PGcThread): bool = 
+  ## Checks the deadlock-prevention invariant: the addresses stored in
+  ## `g.locks[0 .. g.locksLen-1]` are in strictly ascending order.
+  result = true
+  var idx = 0
+  while idx < g.locksLen - 1:
+    if g.locks[idx] >= g.locks[idx+1]:
+      return false
+    inc(idx)
+
+proc TryAquire*(lock: var TLock): bool {.inline.} = 
+  ## Try to aquires the lock `lock`. Returns `true` on success.
+  when noDeadlocks:
+    result = TryAquireSys(lock)
+    if not result: return
+    # we have to add it to the ordered list. Oh, and we might fail if
+    # there is no space in the array left ...
+    var g = ThisThread()
+    if g.locksLen >= len(g.locks):
+      ReleaseSys(lock)
+      raise newException(EResourceExhausted, "cannot aquire additional lock")
+    # find the position to add:
+    var p = addr(lock)
+    var L = g.locksLen-1
+    var i = 0
+    while i <= L:
+      assert g.locks[i] != nil
+      if g.locks[i] < p: inc(i) # in correct order
+      elif g.locks[i] == p: return # thread already holds lock
+      else:
+        # do the crazy stuff here:
+        # insert `p` at position `i`, shifting greater entries up by one
+        while L >= i:
+          g.locks[L+1] = g.locks[L]
+          dec L
+        g.locks[i] = p
+        inc(g.locksLen)
+        assert OrderedLocks(g)
+        return
+    # simply add to the end:
+    g.locks[g.locksLen] = p
+    inc(g.locksLen)
+    assert OrderedLocks(g)
+  else:
+    result = TryAquireSys(lock)
+
+proc Aquire*(lock: var TLock) =
+  ## Aquires the lock `lock`.
+  when nodeadlocks:
+    # Keep this thread's held locks sorted by address: if `lock` is out
+    # of order, release the locks that sort after it, take `lock`, then
+    # re-aquire the released ones in ascending address order.
+    var g = ThisThread()
+    var p = addr(lock)
+    var L = g.locksLen-1
+    var i = 0
+    while i <= L:
+      assert g.locks[i] != nil
+      if g.locks[i] < p: inc(i) # in correct order
+      elif g.locks[i] == p: return # thread already holds lock
+      else:
+        # do the crazy stuff here:
+        if g.locksLen >= len(g.locks):
+          raise newException(EResourceExhausted, "cannot aquire additional lock")
+        while L >= i:
+          ReleaseSys(cast[ptr TSysLock](g.locks[L])[])
+          g.locks[L+1] = g.locks[L]
+          dec L
+        # aquire the current lock:
+        AquireSys(lock)
+        g.locks[i] = p
+        inc(g.locksLen)
+        # aquire old locks in proper order again:
+        L = g.locksLen-1
+        inc i
+        while i <= L:
+          AquireSys(cast[ptr TSysLock](g.locks[i])[])
+          inc(i)
+        # DANGER: We can only modify this global var if we gained every lock!
+        # NO! We need an atomic increment. Crap.
+        discard system.atomicInc(deadlocksPrevented, 1)
+        assert OrderedLocks(g)
+        return
+        
+    # simply add to the end:
+    if g.locksLen >= len(g.locks):
+      raise newException(EResourceExhausted, "cannot aquire additional lock")
+    AquireSys(lock)
+    g.locks[g.locksLen] = p
+    inc(g.locksLen)
+    assert OrderedLocks(g)
+  else:
+    AquireSys(lock)
+  
+proc Release*(lock: var TLock) =
+  ## Releases the lock `lock`.
+  when nodeadlocks:
+    # remove `lock` from this thread's ordered list of held locks
+    var g = ThisThread()
+    var p = addr(lock)
+    var L = g.locksLen
+    for i in countdown(L-1, 0):
+      if g.locks[i] == p: 
+        for j in i..L-2: g.locks[j] = g.locks[j+1]
+        dec g.locksLen
+        break
+  ReleaseSys(lock)
+