diff options
Diffstat (limited to 'lib/system/threads.nim')
-rwxr-xr-x | lib/system/threads.nim | 256 |
1 files changed, 120 insertions, 136 deletions
diff --git a/lib/system/threads.nim b/lib/system/threads.nim index 86a6a5691..9bb67863b 100755 --- a/lib/system/threads.nim +++ b/lib/system/threads.nim @@ -25,8 +25,8 @@ ## thr: array [0..4, TThread[tuple[a,b: int]]] ## L: TLock ## -## proc threadFunc(interval: tuple[a,b: int]) {.procvar.} = -## for i in interval.a..interval.b: +## proc threadFunc(interval: tuple[a,b: int]) {.thread.} = +## for i in interval.a..interval.b: ## Acquire(L) # lock stdout ## echo i ## Release(L) @@ -41,38 +41,13 @@ const maxRegisters = 256 # don't think there is an arch with more registers maxLocksPerThread* = 10 ## max number of locks a thread can hold ## at the same time + useStackMaskHack = false ## use the stack mask hack for better performance + StackGuardSize = 4096 + ThreadStackMask = 1024*256*sizeof(int)-1 + ThreadStackSize = ThreadStackMask+1 - StackGuardSize -when defined(Windows): - type - TSysLock {.final, pure.} = object # CRITICAL_SECTION in WinApi - DebugInfo: pointer - LockCount: int32 - RecursionCount: int32 - OwningThread: int - LockSemaphore: int - Reserved: int32 - - proc InitSysLock(L: var TSysLock) {.stdcall, noSideEffect, - dynlib: "kernel32", importc: "InitializeCriticalSection".} - ## Initializes the lock `L`. - - proc TryAcquireSysAux(L: var TSysLock): int32 {.stdcall, noSideEffect, - dynlib: "kernel32", importc: "TryEnterCriticalSection".} - ## Tries to acquire the lock `L`. - - proc TryAcquireSys(L: var TSysLock): bool {.inline.} = - result = TryAcquireSysAux(L) != 0'i32 - - proc AcquireSys(L: var TSysLock) {.stdcall, noSideEffect, - dynlib: "kernel32", importc: "EnterCriticalSection".} - ## Acquires the lock `L`. - - proc ReleaseSys(L: var TSysLock) {.stdcall, noSideEffect, - dynlib: "kernel32", importc: "LeaveCriticalSection".} - ## Releases the lock `L`. - +when defined(windows): type - THandle = int TSysThread = THandle TWinThreadProc = proc (x: pointer): int32 {.stdcall.} @@ -95,9 +70,6 @@ when defined(Windows): dwMilliseconds: int32): int32 {. stdcall, dynlib: "kernel32", importc: "WaitForMultipleObjects".} - proc WaitForSingleObject(hHandle: TSysThread, dwMilliseconds: int32): int32 {. - stdcall, dynlib: "kernel32", importc: "WaitForSingleObject".} - proc TerminateThread(hThread: TSysThread, dwExitCode: int32): int32 {. stdcall, dynlib: "kernel32", importc: "TerminateThread".} @@ -116,24 +88,6 @@ else: {.passC: "-pthread".} type - TSysLock {.importc: "pthread_mutex_t", pure, final, - header: "<sys/types.h>".} = object - - proc InitSysLock(L: var TSysLock, attr: pointer = nil) {. - importc: "pthread_mutex_init", header: "<pthread.h>", noSideEffect.} - - proc AcquireSys(L: var TSysLock) {.noSideEffect, - importc: "pthread_mutex_lock", header: "<pthread.h>".} - proc TryAcquireSysAux(L: var TSysLock): cint {.noSideEffect, - importc: "pthread_mutex_trylock", header: "<pthread.h>".} - - proc TryAcquireSys(L: var TSysLock): bool {.inline.} = - result = TryAcquireSysAux(L) == 0'i32 - - proc ReleaseSys(L: var TSysLock) {.noSideEffect, - importc: "pthread_mutex_unlock", header: "<pthread.h>".} - - type TSysThread {.importc: "pthread_t", header: "<sys/types.h>", final, pure.} = object Tpthread_attr {.importc: "pthread_attr_t", @@ -191,57 +145,71 @@ else: proc ThreadVarGetValue(s: TThreadVarSlot): pointer {.inline.} = result = pthread_getspecific(s) -const emulatedThreadVars = defined(macosx) + when useStackMaskHack: + proc pthread_attr_setstack(attr: var TPthread_attr, stackaddr: pointer, + size: int): cint {. + importc: "pthread_attr_setstack", header: "<pthread.h>".} + +const + emulatedThreadVars = true when emulatedThreadVars: # the compiler generates this proc for us, so that we can get the size of - # the thread local var block: + # the thread local var block; we use this only for sanity checking though proc NimThreadVarsSize(): int {.noconv, importc: "NimThreadVarsSize".} -proc ThreadVarsAlloc(size: int): pointer = - result = c_malloc(size) - zeroMem(result, size) -proc ThreadVarsDealloc(p: pointer) {.importc: "free", nodecl.} - +# we preallocate a fixed size for thread local storage, so that no heap +# allocations are needed. Currently less than 7K are used on a 64bit machine. +# We use ``float`` for proper alignment: type + TThreadLocalStorage = array [0..1_000, float] + PGcThread = ptr TGcThread TGcThread {.pure.} = object sys: TSysThread next, prev: PGcThread - stackBottom, stackTop, threadLocalStorage: pointer + stackBottom, stackTop: pointer stackSize: int - locksLen: int - locks: array [0..MaxLocksPerThread-1, pointer] - registers: array[0..maxRegisters-1, pointer] # register contents for GC + inbox: TThreadLocalStorage + when emulatedThreadVars and not useStackMaskHack: + tls: TThreadLocalStorage + else: + nil # XXX it'd be more efficient to not use a global variable for the # thread storage slot, but to rely on the implementation to assign slot 0 # for us... ;-) var globalsSlot = ThreadVarAlloc() #const globalsSlot = TThreadVarSlot(0) -#assert checkSlot.int == globalsSlot.int - -proc ThisThread(): PGcThread {.compilerRtl, inl.} = - result = cast[PGcThread](ThreadVarGetValue(globalsSlot)) +#sysAssert checkSlot.int == globalsSlot.int proc GetThreadLocalVars(): pointer {.compilerRtl, inl.} = - result = cast[PGcThread](ThreadVarGetValue(globalsSlot)).threadLocalStorage + result = addr(cast[PGcThread](ThreadVarGetValue(globalsSlot)).tls) + +when useStackMaskHack: + proc MaskStackPointer(offset: int): pointer {.compilerRtl, inl.} = + var x {.volatile.}: pointer + x = addr(x) + result = cast[pointer]((cast[int](x) and not ThreadStackMask) +% + (0) +% offset) # create for the main thread. Note: do not insert this data into the list # of all threads; it's not to be stopped etc. when not defined(useNimRtl): - var mainThread: TGcThread - - ThreadVarSetValue(globalsSlot, addr(mainThread)) - when emulatedThreadVars: - mainThread.threadLocalStorage = ThreadVarsAlloc(NimThreadVarsSize()) - - initStackBottom() - initGC() + when not useStackMaskHack: + var mainThread: TGcThread + ThreadVarSetValue(globalsSlot, addr(mainThread)) + initStackBottom() + initGC() var heapLock: TSysLock InitSysLock(HeapLock) + when emulatedThreadVars: + if NimThreadVarsSize() > sizeof(TThreadLocalStorage): + echo "too large thread local storage size requested" + quit 1 + var threadList: PGcThread @@ -251,11 +219,11 @@ when not defined(useNimRtl): t.prev = nil t.next = threadList if threadList != nil: - assert(threadList.prev == nil) + sysAssert(threadList.prev == nil) threadList.prev = t threadList = t ReleaseSys(HeapLock) - + proc unregisterThread(t: PGcThread) = # we need to use the GC global lock here! AcquireSys(HeapLock) @@ -270,9 +238,7 @@ when not defined(useNimRtl): # on UNIX, the GC uses ``SIGFREEZE`` to tell every thread to stop so that # the GC can examine the stacks? - - proc stopTheWord() = - nil + proc stopTheWord() = nil # We jump through some hops here to ensure that Nimrod thread procs can have # the Nimrod calling convention. This is needed because thread procs are @@ -286,26 +252,33 @@ type fn: proc (p: TParam) data: TParam +proc initInbox(p: pointer) +proc freeInbox(p: pointer) when not defined(boehmgc) and not hasSharedHeap: proc deallocOsPages() template ThreadProcWrapperBody(closure: expr) = ThreadVarSetValue(globalsSlot, closure) var t = cast[ptr TThread[TParam]](closure) - when emulatedThreadVars: - t.threadLocalStorage = ThreadVarsAlloc(NimThreadVarsSize()) + when useStackMaskHack: + var tls: TThreadLocalStorage when not defined(boehmgc) and not hasSharedHeap: # init the GC for this thread: setStackBottom(addr(t)) initGC() t.stackBottom = addr(t) registerThread(t) + initInbox(addr(t.inbox)) try: + when false: + var a = addr(tls) + var b = MaskStackPointer(1293920-372736-303104-36864) + c_fprintf(c_stdout, "TLS: %p\nmasked: %p\ndiff: %ld\n", + a, b, cast[int](a) - cast[int](b)) t.fn(t.data) finally: # XXX shut-down is not executed when the thread is forced down! - when emulatedThreadVars: - ThreadVarsDealloc(t.threadLocalStorage) + freeInbox(addr(t.inbox)) unregisterThread(t) when defined(deallocOsPages): deallocOsPages() @@ -330,7 +303,7 @@ proc joinThreads*[TParam](t: openArray[TThread[TParam]]) = ## waits for every thread in `t` to finish. when hostOS == "windows": var a: array[0..255, TSysThread] - assert a.len >= t.len + sysAssert a.len >= t.len for i in 0..t.high: a[i] = t[i].sys discard WaitForMultipleObjects(t.len, cast[ptr TSysThread](addr(a)), 1, -1) else: @@ -338,7 +311,7 @@ proc joinThreads*[TParam](t: openArray[TThread[TParam]]) = when false: # XXX a thread should really release its heap here somehow: - proc destroyThread*[TParam](t: var TThread[TParam]) {.inline.} = + proc destroyThread*[TParam](t: var TThread[TParam]) = ## forces the thread `t` to terminate. This is potentially dangerous if ## you don't have full control over `t` and its acquired resources. when hostOS == "windows": @@ -348,28 +321,32 @@ when false: unregisterThread(addr(t)) proc createThread*[TParam](t: var TThread[TParam], - tp: proc (param: TParam), - param: TParam, - stackSize = 1024*256*sizeof(int)) {. - magic: "CreateThread".} = + tp: proc (param: TParam) {.thread.}, + param: TParam) = ## creates a new thread `t` and starts its execution. Entry point is the ## proc `tp`. `param` is passed to `tp`. t.data = param t.fn = tp - t.stackSize = stackSize + t.stackSize = ThreadStackSize when hostOS == "windows": var dummyThreadId: int32 - t.sys = CreateThread(nil, stackSize, threadProcWrapper[TParam], + t.sys = CreateThread(nil, ThreadStackSize, threadProcWrapper[TParam], addr(t), 0'i32, dummyThreadId) if t.sys <= 0: raise newException(EResourceExhausted, "cannot create thread") else: var a: Tpthread_attr pthread_attr_init(a) - pthread_attr_setstacksize(a, stackSize) + pthread_attr_setstacksize(a, ThreadStackSize) if pthread_create(t.sys, a, threadProcWrapper[TParam], addr(t)) != 0: raise newException(EResourceExhausted, "cannot create thread") +when useStackMaskHack: + proc runMain(tp: proc (dummy: pointer) {.thread.}) {.compilerproc.} = + var mainThread: TThread[pointer] + createThread(mainThread, tp, nil) + joinThread(mainThread) + # --------------------------- lock handling ---------------------------------- type @@ -380,18 +357,20 @@ const when nodeadlocks: var - deadlocksPrevented* = 0 ## counts the number of times a + deadlocksPrevented*: int ## counts the number of times a ## deadlock has been prevented + locksLen {.threadvar.}: int + locks {.threadvar.}: array [0..MaxLocksPerThread-1, pointer] + + proc OrderedLocks(): bool = + for i in 0 .. locksLen-2: + if locks[i] >= locks[i+1]: return false + result = true proc InitLock*(lock: var TLock) {.inline.} = ## Initializes the lock `lock`. InitSysLock(lock) -proc OrderedLocks(g: PGcThread): bool = - for i in 0 .. g.locksLen-2: - if g.locks[i] >= g.locks[i+1]: return false - result = true - proc TryAcquire*(lock: var TLock): bool {.inline.} = ## Try to acquires the lock `lock`. Returns `true` on success. result = TryAcquireSys(lock) @@ -399,88 +378,93 @@ proc TryAcquire*(lock: var TLock): bool {.inline.} = if not result: return # we have to add it to the ordered list. Oh, and we might fail if # there is no space in the array left ... - var g = ThisThread() - if g.locksLen >= len(g.locks): + if locksLen >= len(locks): ReleaseSys(lock) raise newException(EResourceExhausted, "cannot acquire additional lock") # find the position to add: var p = addr(lock) - var L = g.locksLen-1 + var L = locksLen-1 var i = 0 while i <= L: - assert g.locks[i] != nil - if g.locks[i] < p: inc(i) # in correct order - elif g.locks[i] == p: return # thread already holds lock + sysAssert locks[i] != nil + if locks[i] < p: inc(i) # in correct order + elif locks[i] == p: return # thread already holds lock else: # do the crazy stuff here: while L >= i: - g.locks[L+1] = g.locks[L] + locks[L+1] = locks[L] dec L - g.locks[i] = p - inc(g.locksLen) - assert OrderedLocks(g) + locks[i] = p + inc(locksLen) + sysAssert OrderedLocks() return # simply add to the end: - g.locks[g.locksLen] = p - inc(g.locksLen) - assert OrderedLocks(g) + locks[locksLen] = p + inc(locksLen) + sysAssert OrderedLocks(g) proc Acquire*(lock: var TLock) = ## Acquires the lock `lock`. when nodeadlocks: - var g = ThisThread() var p = addr(lock) - var L = g.locksLen-1 + var L = locksLen-1 var i = 0 while i <= L: - assert g.locks[i] != nil - if g.locks[i] < p: inc(i) # in correct order - elif g.locks[i] == p: return # thread already holds lock + sysAssert locks[i] != nil + if locks[i] < p: inc(i) # in correct order + elif locks[i] == p: return # thread already holds lock else: # do the crazy stuff here: - if g.locksLen >= len(g.locks): + if locksLen >= len(locks): raise newException(EResourceExhausted, "cannot acquire additional lock") while L >= i: - ReleaseSys(cast[ptr TSysLock](g.locks[L])[]) - g.locks[L+1] = g.locks[L] + ReleaseSys(cast[ptr TSysLock](locks[L])[]) + locks[L+1] = locks[L] dec L # acquire the current lock: AcquireSys(lock) - g.locks[i] = p - inc(g.locksLen) + locks[i] = p + inc(locksLen) # acquire old locks in proper order again: - L = g.locksLen-1 + L = locksLen-1 inc i while i <= L: - AcquireSys(cast[ptr TSysLock](g.locks[i])[]) + AcquireSys(cast[ptr TSysLock](locks[i])[]) inc(i) # DANGER: We can only modify this global var if we gained every lock! # NO! We need an atomic increment. Crap. discard system.atomicInc(deadlocksPrevented, 1) - assert OrderedLocks(g) + sysAssert OrderedLocks(g) return # simply add to the end: - if g.locksLen >= len(g.locks): + if locksLen >= len(locks): raise newException(EResourceExhausted, "cannot acquire additional lock") AcquireSys(lock) - g.locks[g.locksLen] = p - inc(g.locksLen) - assert OrderedLocks(g) + locks[locksLen] = p + inc(locksLen) + sysAssert OrderedLocks(g) else: AcquireSys(lock) proc Release*(lock: var TLock) = ## Releases the lock `lock`. when nodeadlocks: - var g = ThisThread() var p = addr(lock) - var L = g.locksLen + var L = locksLen for i in countdown(L-1, 0): - if g.locks[i] == p: - for j in i..L-2: g.locks[j] = g.locks[j+1] - dec g.locksLen + if locks[i] == p: + for j in i..L-2: locks[j] = locks[j+1] + dec locksLen break ReleaseSys(lock) +# ------------------------ message passing support --------------------------- + +proc getInBoxMem*[TMsg](t: var TThread[TMsg]): pointer {.inline.} = + result = addr(t.inbox) + +proc getInBoxMem*(): pointer {.inline.} = + result = addr(cast[PGcThread](ThreadVarGetValue(globalsSlot)).inbox) + |