diff options
Diffstat (limited to 'lib/core/threads.nim')
-rwxr-xr-x | lib/core/threads.nim | 227 |
1 files changed, 123 insertions, 104 deletions
diff --git a/lib/core/threads.nim b/lib/core/threads.nim index f6ebb40dd..77253af20 100755 --- a/lib/core/threads.nim +++ b/lib/core/threads.nim @@ -8,8 +8,9 @@ # ## Basic thread support for Nimrod. Note that Nimrod's default GC is still -## single-threaded. This means that either your threads should not allocate -## GC'ed memory, or you should compile with ``--gc:none`` or ``--gc:boehm``. +## single-threaded. This means that you MUST turn off the GC while multiple +## threads are executing that allocate GC'ed memory. The alternative is to +## compile with ``--gc:none`` or ``--gc:boehm``. ## ## Example: ## @@ -27,16 +28,20 @@ ## ## InitLock(L) ## +## GC_disable() # native GC does not support multiple thready yet :-( ## for i in 0..high(thr): ## createThread(thr[i], threadFunc, (i*10, i*10+5)) ## for i in 0..high(thr): ## joinThread(thr[i]) +## GC_enable() when not compileOption("threads"): {.error: "Thread support requires ``--threads:on`` commandline switch".} when not defined(boehmgc) and not defined(nogc) and false: {.error: "Thread support requires --gc:boehm or --gc:none".} + +include "lib/system/systhread" # We jump through some hops here to ensure that Nimrod thread procs can have # the Nimrod calling convention. This is needed because thread procs are @@ -93,6 +98,10 @@ when defined(windows): else: type TSysThread {.importc: "pthread_t", header: "<sys/types.h>".} = int + Ttimespec {.importc: "struct timespec", + header: "<time.h>", final, pure.} = object + tv_sec: int + tv_nsec: int proc pthread_create(a1: var TSysThread, a2: ptr int, a3: proc (x: pointer) {.noconv.}, @@ -104,6 +113,17 @@ else: proc pthread_cancel(a1: TSysThread): cint {. importc: "pthread_cancel", header: "<pthread.h>".} + proc AquireSysTimeoutAux(L: var TSysLock, timeout: var Ttimespec): cint {. + importc: "pthread_mutex_timedlock", header: "<time.h>".} + + proc AquireSysTimeout(L: var TSysLock, msTimeout: int) {.inline.} = + var a: Ttimespec + a.tv_sec = msTimeout div 1000 + a.tv_nsec = (msTimeout mod 1000) * 1000 + var res = AquireSysTimeoutAux(L, a) + if res != 0'i32: + raise newException(EResourceExhausted, $strerror(res)) + {.push stack_trace:off.} proc threadProcWrapper[TParam](closure: pointer) {.noconv.} = var c = cast[ptr TThreadProcClosure[TParam]](closure) @@ -114,121 +134,119 @@ else: const noDeadlocks = false # compileOption("deadlockPrevention") - -include "lib/system/systhread" -when noDeadLocks: - type - TLock* {.pure, final.} = object ## Standard Nimrod Lock type. - key: int # used for identity and global order! - sys: TSysLock - next: ptr TLock -else: - type - TLock* = TSysLock - type + TLock* = TSysLock TThread* {.pure, final.}[TParam] = object ## Nimrod thread. sys: TSysThread c: TThreadProcClosure[TParam] when nodeadlocks: - var - lockList {.threadvar.}: ptr TLock + var deadlocksPrevented* = 0 ## counts the number of times a ## deadlock has been prevented -proc InitLock*(L: var TLock) {.inline.} = - ## Initializes the lock `L`. - when noDeadlocks: - InitSysLock(L.sys) - L.key = cast[int](addr(L)) - else: - InitSysLock(L) +proc InitLock*(lock: var TLock) {.inline.} = + ## Initializes the lock `lock`. + InitSysLock(lock) + +proc OrderedLocks(g: PGlobals): bool = + for i in 0 .. g.locksLen-2: + if g.locks[i] >= g.locks[i+1]: return false + result = true -proc TryAquire*(L: var TLock): bool {.inline.} = - ## Try to aquires the lock `L`. Returns `true` on success. +proc TryAquire*(lock: var TLock): bool {.inline.} = + ## Try to aquires the lock `lock`. Returns `true` on success. when noDeadlocks: - result = TryAquireSys(L.sys) + result = TryAquireSys(lock) + if not result: return + # we have to add it to the ordered list. Oh, and we might fail if there# + # there is no space in the array left ... + var g = GetGlobals() + if g.locksLen >= len(g.locks): + ReleaseSys(lock) + raise newException(EResourceExhausted, "cannot aquire additional lock") + # find the position to add: + var p = addr(lock) + var L = g.locksLen-1 + var i = 0 + while i <= L: + assert g.locks[i] != nil + if g.locks[i] < p: inc(i) # in correct order + elif g.locks[i] == p: return # thread already holds lock + else: + # do the crazy stuff here: + while L >= i: + g.locks[L+1] = g.locks[L] + dec L + g.locks[i] = p + inc(g.locksLen) + assert OrderedLocks(g) + return + # simply add to the end: + g.locks[g.locksLen] = p + inc(g.locksLen) + assert OrderedLocks(g) else: - result = TryAquireSys(L) + result = TryAquireSys(lock) -proc Aquire*(L: var TLock) = - ## Aquires the lock `L`. +proc Aquire*(lock: var TLock) = + ## Aquires the lock `lock`. when nodeadlocks: - # Note: we MUST NOT change the linked list of locks before we have aquired - # the proper locks! This is because the pointer to the next lock is part - # of the lock itself! - assert L.key != 0 - var p = lockList - if p == nil: - # simple case: no lock aquired yet: - AquireSys(L.sys) - locklist = addr(L) - L.next = nil - else: - # check where to put L into the list: - var r = p - var last: ptr TLock = nil - while L.key < r.key: - if r.next == nil: - # best case: L needs to be aquired as last lock, so we can - # skip a good amount of work: - AquireSys(L.sys) - r.next = addr(L) - L.next = nil - return - last = r - r = r.next - # special case: thread already holds L! - if L.key == r.key: return - - # bad case: L needs to be somewhere in between - # release all locks after L: - var rollback = r - while r != nil: - ReleaseSys(r.sys) - r = r.next - # and aquire them in the correct order again: - AquireSys(L.sys) - r = rollback - while r != nil: - assert r.key < L.key - AquireSys(r.sys) - r = r.next - # now that we have all the locks we need, we can insert L - # into our list: - if last != nil: - L.next = last.next - last.next = addr(L) + var g = GetGlobals() + var p = addr(lock) + var L = g.locksLen-1 + var i = 0 + while i <= L: + assert g.locks[i] != nil + if g.locks[i] < p: inc(i) # in correct order + elif g.locks[i] == p: return # thread already holds lock else: - L.next = lockList - lockList = addr(L) - inc(deadlocksPrevented) + # do the crazy stuff here: + if g.locksLen >= len(g.locks): + raise newException(EResourceExhausted, "cannot aquire additional lock") + while L >= i: + ReleaseSys(cast[ptr TSysLock](g.locks[L])[]) + g.locks[L+1] = g.locks[L] + dec L + # aquire the current lock: + AquireSys(lock) + g.locks[i] = p + inc(g.locksLen) + # aquire old locks in proper order again: + L = g.locksLen-1 + inc i + while i <= L: + AquireSys(cast[ptr TSysLock](g.locks[i])[]) + inc(i) + # DANGER: We can only modify this global var if we gained every lock! + # NO! We need an atomic increment. Crap. + discard system.atomicInc(deadlocksPrevented, 1) + assert OrderedLocks(g) + return + + # simply add to the end: + if g.locksLen >= len(g.locks): + raise newException(EResourceExhausted, "cannot aquire additional lock") + AquireSys(lock) + g.locks[g.locksLen] = p + inc(g.locksLen) + assert OrderedLocks(g) else: - AquireSys(L) + AquireSys(lock) -proc Release*(L: var TLock) = - ## Releases the lock `L`. +proc Release*(lock: var TLock) = + ## Releases the lock `lock`. when nodeadlocks: - assert L.key != 0 - var p = lockList - var last: ptr TLock = nil - while true: - # if we don't find the lock, die by reading from nil! - if p.key == L.key: - if last != nil: - last.next = p.next - else: - assert p == lockList - lockList = locklist.next - L.next = nil + var g = GetGlobals() + var p = addr(lock) + var L = g.locksLen + for i in countdown(L-1, 0): + if g.locks[i] == p: + for j in i..L-2: g.locks[j] = g.locks[j+1] + dec g.locksLen break - last = p - p = p.next - ReleaseSys(L.sys) - else: - ReleaseSys(L) + ReleaseSys(lock) proc joinThread*[TParam](t: TThread[TParam]) {.inline.} = ## waits for the thread `t` until it has terminated. @@ -257,7 +275,7 @@ proc createThread*[TParam](t: var TThread[TParam], var dummyThreadId: int32 t.sys = CreateThread(nil, 0'i32, threadProcWrapper[TParam], addr(t.c), 0'i32, dummyThreadId) - else: + else: if pthread_create(t.sys, nil, threadProcWrapper[TParam], addr(t.c)) != 0: raise newException(EIO, "cannot create thread") @@ -265,12 +283,12 @@ when isMainModule: import os var - thr: array [0..1, TThread[tuple[a,b: int]]] + thr: array [0..5, TThread[tuple[a, b: int]]] L, M, N: TLock proc doNothing() = nil - proc threadFunc(interval: tuple[a,b: int]) {.procvar.} = + proc threadFunc(interval: tuple[a, b: int]) {.procvar.} = doNothing() for i in interval.a..interval.b: when nodeadlocks: @@ -302,16 +320,15 @@ when isMainModule: else: assert false else: Aquire(L) # lock stdout - Aquire(M) - Aquire(N) echo i os.sleep(10) when nodeadlocks: echo "deadlocks prevented: ", deadlocksPrevented + when nodeadlocks: + Release(N) + Release(M) Release(L) - Release(M) - Release(N) InitLock(L) InitLock(M) @@ -323,5 +340,7 @@ when isMainModule: for i in 0..high(thr): joinThread(thr[i]) + GC_disable() main() + GC_enable() |