diff options
author | Araq <rumpf_a@web.de> | 2011-05-17 19:22:29 +0200 |
---|---|---|
committer | Araq <rumpf_a@web.de> | 2011-05-17 19:22:29 +0200 |
commit | aeb0506132bc706750840ba0a79b486745e34c4e (patch) | |
tree | a1e2aeaf5e0309ec9aa5ee2bb1345b837e7af4a5 /lib | |
parent | 6dd8c850513956e38f78c02e173df2c4d7b05cf3 (diff) | |
download | Nim-aeb0506132bc706750840ba0a79b486745e34c4e.tar.gz |
thread support: next iteration
Diffstat (limited to 'lib')
-rwxr-xr-x | lib/core/threads.nim | 171 | ||||
-rwxr-xr-x | lib/system.nim | 6 | ||||
-rwxr-xr-x | lib/system/excpt.nim | 55 | ||||
-rwxr-xr-x | lib/system/gc.nim | 21 | ||||
-rwxr-xr-x | lib/system/systhread.nim | 57 |
5 files changed, 190 insertions, 120 deletions
diff --git a/lib/core/threads.nim b/lib/core/threads.nim index fab8251f6..ff33394d8 100755 --- a/lib/core/threads.nim +++ b/lib/core/threads.nim @@ -32,7 +32,10 @@ ## for i in 0..high(thr): ## joinThread(thr[i]) -when not defined(boehmgc) and not defined(nogc): +when not compileOption("threads"): + {.error: "Thread support requires ``--threads:on`` commandline switch".} + +when not defined(boehmgc) and not defined(nogc) and false: {.error: "Thread support requires --gc:boehm or --gc:none".} # We jump through some hops here to ensure that Nimrod thread procs can have @@ -46,39 +49,13 @@ type TThreadProcClosure {.pure, final.}[TParam] = object fn: proc (p: TParam) data: TParam + threadLocalStorage: pointer -when defined(Windows): - type +when defined(windows): + type THandle = int TSysThread = THandle - TSysLock {.final, pure.} = object # CRITICAL_SECTION in WinApi - DebugInfo: pointer - LockCount: int32 - RecursionCount: int32 - OwningThread: int - LockSemaphore: int - Reserved: int32 - TWinThreadProc = proc (x: pointer): int32 {.stdcall.} - - proc InitSysLock(L: var TSysLock) {.stdcall, - dynlib: "kernel32", importc: "InitializeCriticalSection".} - ## Initializes the lock `L`. - - proc TryAquireSysAux(L: var TSysLock): int32 {.stdcall, - dynlib: "kernel32", importc: "TryEnterCriticalSection".} - ## Tries to aquire the lock `L`. - - proc TryAquireSys(L: var TSysLock): bool {.inline.} = - result = TryAquireSysAux(L) != 0'i32 - - proc AquireSys(L: var TSysLock) {.stdcall, - dynlib: "kernel32", importc: "EnterCriticalSection".} - ## Aquires the lock `L`. - - proc ReleaseSys(L: var TSysLock) {.stdcall, - dynlib: "kernel32", importc: "LeaveCriticalSection".} - ## Releases the lock `L`. proc CreateThread(lpThreadAttributes: Pointer, dwStackSize: int32, lpStartAddress: TWinThreadProc, @@ -105,29 +82,17 @@ when defined(Windows): proc TerminateThread(hThread: THandle, dwExitCode: int32): int32 {. stdcall, dynlib: "kernel32", importc: "TerminateThread".} + {.push stack_trace:off.} proc threadProcWrapper[TParam](closure: pointer): int32 {.stdcall.} = var c = cast[ptr TThreadProcClosure[TParam]](closure) + SetThreadLocalStorage(c.threadLocalStorage) c.fn(c.data) # implicitely return 0 + {.pop.} else: type - TSysLock {.importc: "pthread_mutex_t", header: "<sys/types.h>".} = int TSysThread {.importc: "pthread_t", header: "<sys/types.h>".} = int - - proc InitSysLock(L: var TSysLock, attr: pointer = nil) {. - importc: "pthread_mutex_init", header: "<pthread.h>".} - - proc AquireSys(L: var TSysLock) {. - importc: "pthread_mutex_lock", header: "<pthread.h>".} - proc TryAquireSysAux(L: var TSysLock): cint {. - importc: "pthread_mutex_trylock", header: "<pthread.h>".} - - proc TryAquireSys(L: var TSysLock): bool {.inline.} = - result = TryAquireSysAux(L) == 0'i32 - - proc ReleaseSys(L: var TSysLock) {. - importc: "pthread_mutex_unlock", header: "<pthread.h>".} proc pthread_create(a1: var TSysThread, a2: ptr int, a3: proc (x: pointer) {.noconv.}, @@ -139,14 +104,19 @@ else: proc pthread_cancel(a1: TSysThread): cint {. importc: "pthread_cancel", header: "<pthread.h>".} + {.push stack_trace:off.} proc threadProcWrapper[TParam](closure: pointer) {.noconv.} = var c = cast[ptr TThreadProcClosure[TParam]](closure) + SetThreadLocalStorage(c.threadLocalStorage) c.fn(c.data) + {.pop.} const - noDeadlocks = true # compileOption("deadlockPrevention") + noDeadlocks = false # compileOption("deadlockPrevention") +include "lib/system/systhread" + when noDeadLocks: type TLock* {.pure, final.} = object ## Standard Nimrod Lock type. @@ -281,62 +251,81 @@ proc createThread*[TParam](t: var TThread[TParam], param: TParam) = ## creates a new thread `t` and starts its execution. Entry point is the ## proc `tp`. `param` is passed to `tp`. + t.globals = AllocThreadLocalStorage() t.c.data = param t.c.fn = tp - t.globals = CreateThreadLocalStorage() when hostOS == "windows": var dummyThreadId: int32 t.sys = CreateThread(nil, 0'i32, threadProcWrapper[TParam], addr(t.c), 0'i32, dummyThreadId) else: - discard pthread_create(t.sys, nil, threadProcWrapper[TParam], addr(t.c)) + if pthread_create(t.sys, nil, threadProcWrapper[TParam], addr(t.c)) != 0: + raise newException(EIO, "cannot create thread") when isMainModule: + import os + var - thr: array [0..4, TThread[tuple[a,b: int]]] + thr: array [0..1, TThread[tuple[a,b: int]]] L, M, N: TLock + proc doNothing() = nil + + {.push stack_trace:off.} proc threadFunc(interval: tuple[a,b: int]) {.procvar.} = - for i in interval.a..interval.b: - case i mod 6 - of 0: - Aquire(L) # lock stdout - Aquire(M) - Aquire(N) - of 1: - Aquire(L) - Aquire(N) # lock stdout - Aquire(M) - of 2: - Aquire(M) - Aquire(L) - Aquire(N) - of 3: - Aquire(M) - Aquire(N) - Aquire(L) - of 4: - Aquire(N) - Aquire(M) - Aquire(L) - of 5: - Aquire(N) - Aquire(L) - Aquire(M) - else: assert false - echo i - echo "deadlocks prevented: ", deadlocksPrevented - Release(L) - Release(M) - Release(N) - - InitLock(L) - InitLock(M) - InitLock(N) - - for i in 0..high(thr): - createThread(thr[i], threadFunc, (i*100, i*100+50)) - for i in 0..high(thr): - joinThread(thr[i]) - + doNothing() + when false: + for i in interval.a..interval.b: + when nodeadlocks: + case i mod 6 + of 0: + Aquire(L) # lock stdout + Aquire(M) + Aquire(N) + of 1: + Aquire(L) + Aquire(N) # lock stdout + Aquire(M) + of 2: + Aquire(M) + Aquire(L) + Aquire(N) + of 3: + Aquire(M) + Aquire(N) + Aquire(L) + of 4: + Aquire(N) + Aquire(M) + Aquire(L) + of 5: + Aquire(N) + Aquire(L) + Aquire(M) + else: assert false + else: + Aquire(L) # lock stdout + Aquire(M) + Aquire(N) + + #echo i + os.sleep(10) + stdout.write(i) + when nodeadlocks: + echo "deadlocks prevented: ", deadlocksPrevented + Release(L) + Release(M) + Release(N) + {.pop.} + #InitLock(L) + #InitLock(M) + #InitLock(N) + + proc main = + for i in 0..high(thr): + createThread(thr[i], threadFunc, (i*100, i*100+50)) + for i in 0..high(thr): + joinThread(thr[i]) + + main() diff --git a/lib/system.nim b/lib/system.nim index 1fd17210f..b8093cdad 100755 --- a/lib/system.nim +++ b/lib/system.nim @@ -1369,7 +1369,7 @@ template accumulateResult*(iter: expr) = # we have to compute this here before turning it off in except.nim anyway ... const nimrodStackTrace = compileOption("stacktrace") -{.push checks: off, line_dir: off, debugger: off.} +{.push checks: off, debugger: off.} # obviously we cannot generate checking operations here :-) # because it would yield into an endless recursion # however, stack-traces are available for most parts @@ -1665,6 +1665,7 @@ when not defined(EcmaScript) and not defined(NimrodVM): # ---------------------------------------------------------------------------- + include "system/systhread" include "system/excpt" # we cannot compile this with stack tracing on # as it would recurse endlessly! @@ -1718,12 +1719,11 @@ when not defined(EcmaScript) and not defined(NimrodVM): else: result = n.sons[n.len] - include "system/systhread" {.push stack_trace: off.} include "system/mmdisp" + include "system/sysstr" {.pop.} - include "system/sysstr" include "system/assign" include "system/repr" diff --git a/lib/system/excpt.nim b/lib/system/excpt.nim index e2d3bea08..dac5678e0 100755 --- a/lib/system/excpt.nim +++ b/lib/system/excpt.nim @@ -81,8 +81,12 @@ when hasThreadSupport: proc pthread_setspecific(a1: Tpthread_key, a2: pointer): int32 {. importc: "pthread_setspecific", header: "<pthread.h>".} - proc specificDestroy(mem: pointer) {.noconv.} = dealloc(mem) - + proc specificDestroy(mem: pointer) {.noconv.} = + #aquireSys(heapLock) + #dealloc(mem) + #releaseSys(heapLock) + #c_free(mem) + proc ThreadVarAlloc(): TThreadVarSlot {.compilerproc, inline.} = discard pthread_key_create(addr(result), specificDestroy) proc ThreadVarSetValue(s: TThreadVarSlot, value: pointer) {. @@ -104,31 +108,50 @@ when hasThreadSupport: tempFrames: array [0..127, PFrame] # cannot be allocated on the stack! data: float # compiler should add thread local variables here! PGlobals = ptr TGlobals - - var globalsSlot = ThreadVarAlloc() - proc CreateThreadLocalStorage*(): pointer {.inl.} = + + # it's more efficient to not use a global variable for the thread storage + # slot, but to rely on the implementation to assign slot 0 for us... ;-) + var checkSlot = ThreadVarAlloc() + const globalsSlot = TThreadVarSlot(0) + assert checkSlot.int == globalsSlot.int + + proc AtomicAlloc0(size: int): pointer = + #AquireSys(heapLock) + result = c_malloc(size) + zeroMem(result, size) + #ReleaseSys(heapLock) + + proc NewGlobals(): PGlobals = + result = cast[PGlobals](AtomicAlloc0(sizeof(TGlobals))) + new(result.gAssertionFailed) + result.buf = newStringOfCap(2000) + result.assertBuf = newStringOfCap(2000) + + proc AllocThreadLocalStorage*(): pointer {.inl.} = isMultiThreaded = true - result = alloc0(sizeof(TGlobals)) - ThreadVarSetValue(globalsSlot, result) + result = NewGlobals() + + proc SetThreadLocalStorage*(p: pointer) {.inl.} = + ThreadVarSetValue(globalsSlot, p) proc GetGlobals(): PGlobals {.compilerRtl, inl.} = result = cast[PGlobals](ThreadVarGetValue(globalsSlot)) # create for the main thread: - ThreadVarSetValue(globalsSlot, alloc0(sizeof(TGlobals))) + ThreadVarSetValue(globalsSlot, NewGlobals()) when hasThreadSupport: - template ThreadGlobals = + template ThreadGlobals = var globals = GetGlobals() template `||`(varname: expr): expr = globals.varname - ThreadGlobals() + #ThreadGlobals() else: template ThreadGlobals = nil # nothing template `||`(varname: expr): expr = varname var - framePtr {.compilerproc.}: PFrame # XXX only temporarily a compilerproc + framePtr: PFrame excHandler: PSafePoint = nil # list of exception handlers # a global variable for the root of all try blocks @@ -141,6 +164,11 @@ else: tempFrames: array [0..127, PFrame] # cannot be allocated on the stack! gAssertionFailed: ref EAssertionFailed + new(||gAssertionFailed) + ||buf = newStringOfCap(2000) + ||assertBuf = newStringOfCap(2000) + + proc pushFrame(s: PFrame) {.compilerRtl, inl.} = ThreadGlobals() s.prev = ||framePtr @@ -388,11 +416,6 @@ proc registerSignalHandler() = when not defined(noSignalHandler): registerSignalHandler() # call it in initialization section -# for easier debugging of the GC, this memory is only allocated after the -# signal handlers have been registered -new(||gAssertionFailed) -||buf = newStringOfCap(2000) -||assertBuf = newStringOfCap(2000) proc raiseRangeError(val: biggestInt) {.compilerproc, noreturn, noinline.} = raise newException(EOutOfRange, "value " & $val & " out of range") diff --git a/lib/system/gc.nim b/lib/system/gc.nim index 52de66d48..3a7270539 100755 --- a/lib/system/gc.nim +++ b/lib/system/gc.nim @@ -81,14 +81,14 @@ var proc aquire(gch: var TGcHeap) {.inline.} = when hasThreadSupport: if isMultiThreaded: - aquire(gch.zctLock) - aquire(gch.cycleRootsLock) + aquireSys(gch.zctLock) + aquireSys(gch.cycleRootsLock) proc release(gch: var TGcHeap) {.inline.} = when hasThreadSupport: if isMultiThreaded: - release(gch.zctLock) - release(gch.cycleRootsLock) + releaseSys(gch.zctLock) + releaseSys(gch.cycleRootsLock) proc addZCT(s: var TCellSeq, c: PCell) {.noinline.} = if (c.refcount and rcZct) == 0: @@ -207,18 +207,18 @@ proc prepareDealloc(cell: PCell) = proc rtlAddCycleRoot(c: PCell) {.rtl, inl.} = # we MUST access gch as a global here, because this crosses DLL boundaries! when hasThreadSupport: - if isMultiThreaded: Aquire(gch.cycleRootsLock) + if isMultiThreaded: AquireSys(gch.cycleRootsLock) incl(gch.cycleRoots, c) when hasThreadSupport: - if isMultiThreaded: Release(gch.cycleRootsLock) + if isMultiThreaded: ReleaseSys(gch.cycleRootsLock) proc rtlAddZCT(c: PCell) {.rtl, inl.} = # we MUST access gch as a global here, because this crosses DLL boundaries! when hasThreadSupport: - if isMultiThreaded: Aquire(gch.zctLock) + if isMultiThreaded: AquireSys(gch.zctLock) addZCT(gch.zct, c) when hasThreadSupport: - if isMultiThreaded: Release(gch.zctLock) + if isMultiThreaded: ReleaseSys(gch.zctLock) proc decRef(c: PCell) {.inline.} = when stressGC: @@ -287,9 +287,10 @@ proc initGC() = Init(gch.cycleRoots) Init(gch.decStack) when hasThreadSupport: - InitLock(gch.cycleRootsLock) - InitLock(gch.zctLock) + InitSysLock(gch.cycleRootsLock) + InitSysLock(gch.zctLock) new(gOutOfMem) # reserve space for the EOutOfMemory exception here! + proc forAllSlotsAux(dest: pointer, n: ptr TNimNode, op: TWalkOp) = var d = cast[TAddress](dest) diff --git a/lib/system/systhread.nim b/lib/system/systhread.nim index d9b340ce2..2b5057ff0 100755 --- a/lib/system/systhread.nim +++ b/lib/system/systhread.nim @@ -9,6 +9,12 @@ const maxThreads = 256 + SystemInclude = defined(hasThreadSupport) + +when not SystemInclude: + # ugly hack: this file is then included from core/threads, so we have + # thread support: + const hasThreadSupport = true when (defined(gcc) or defined(llvm_gcc)) and hasThreadSupport: proc sync_add_and_fetch(p: var int, val: int): int {. @@ -40,3 +46,54 @@ proc atomicDec(memLoc: var int, x: int): int = dec(memLoc, x) result = memLoc +when defined(Windows): + type + TSysLock {.final, pure.} = object # CRITICAL_SECTION in WinApi + DebugInfo: pointer + LockCount: int32 + RecursionCount: int32 + OwningThread: int + LockSemaphore: int + Reserved: int32 + + proc InitSysLock(L: var TSysLock) {.stdcall, + dynlib: "kernel32", importc: "InitializeCriticalSection".} + ## Initializes the lock `L`. + + proc TryAquireSysAux(L: var TSysLock): int32 {.stdcall, + dynlib: "kernel32", importc: "TryEnterCriticalSection".} + ## Tries to aquire the lock `L`. + + proc TryAquireSys(L: var TSysLock): bool {.inline.} = + result = TryAquireSysAux(L) != 0'i32 + + proc AquireSys(L: var TSysLock) {.stdcall, + dynlib: "kernel32", importc: "EnterCriticalSection".} + ## Aquires the lock `L`. + + proc ReleaseSys(L: var TSysLock) {.stdcall, + dynlib: "kernel32", importc: "LeaveCriticalSection".} + ## Releases the lock `L`. + +else: + type + TSysLock {.importc: "pthread_mutex_t", header: "<sys/types.h>".} = int + + proc InitSysLock(L: var TSysLock, attr: pointer = nil) {. + importc: "pthread_mutex_init", header: "<pthread.h>".} + + proc AquireSys(L: var TSysLock) {. + importc: "pthread_mutex_lock", header: "<pthread.h>".} + proc TryAquireSysAux(L: var TSysLock): cint {. + importc: "pthread_mutex_trylock", header: "<pthread.h>".} + + proc TryAquireSys(L: var TSysLock): bool {.inline.} = + result = TryAquireSysAux(L) == 0'i32 + + proc ReleaseSys(L: var TSysLock) {. + importc: "pthread_mutex_unlock", header: "<pthread.h>".} + +when SystemInclude: + var heapLock: TSysLock + InitSysLock(HeapLock) + |