author     Araq <rumpf_a@web.de>  2011-06-02 13:02:40 +0200
committer  Araq <rumpf_a@web.de>  2011-06-02 13:02:40 +0200
commit     3260702a6044cdae89cf673ad1983aa3342127de (patch)
tree       40439bfaf9f4ecb4929547e387998b282eee408c /lib
parent     d0bfc3665fd0131dad516d2fcd7cfe73c3a6f122 (diff)
download   Nim-3260702a6044cdae89cf673ad1983aa3342127de.tar.gz
first steps to thread local heaps
Diffstat (limited to 'lib')
-rwxr-xr-x   lib/core/threads.nim      348
-rw-r--r--   lib/prelude.nim            23
-rwxr-xr-x   lib/system.nim             30
-rwxr-xr-x   lib/system/alloc.nim       71
-rw-r--r--   lib/system/atomics.nim     41
-rwxr-xr-x   lib/system/excpt.nim      108
-rwxr-xr-x   lib/system/gc.nim          43
-rwxr-xr-x   lib/system/repr.nim         2
-rwxr-xr-x   lib/system/systhread.nim   98
-rwxr-xr-x   lib/system/threads.nim    481
-rw-r--r--   lib/wrappers/zmq.nim      298

11 files changed, 927 insertions(+), 616 deletions(-)
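The commit deletes the old lib/core/threads.nim, which required turning the GC off around threaded code, and introduces a system-level threads module in which every thread owns its own garbage-collected heap. For reference, the usage example from the new lib/system/threads.nim documentation boils down to the following (compile with --threads:on; note that the old GC_disable/GC_enable dance is no longer part of the pattern):

    var
      thr: array [0..4, TThread[tuple[a,b: int]]]
      L: TLock

    proc threadFunc(interval: tuple[a,b: int]) {.procvar.} =
      for i in interval.a..interval.b:
        Aquire(L)   # serialize access to stdout
        echo i
        Release(L)

    InitLock(L)
    for i in 0..high(thr):
      createThread(thr[i], threadFunc, (i*10, i*10+5))
    joinThreads(thr)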
diff --git a/lib/core/threads.nim b/lib/core/threads.nim deleted file mode 100755 index 338d9166d..000000000 --- a/lib/core/threads.nim +++ /dev/null @@ -1,348 +0,0 @@ -# -# -# Nimrod's Runtime Library -# (c) Copyright 2011 Andreas Rumpf -# -# See the file "copying.txt", included in this -# distribution, for details about the copyright. -# - -## Basic thread support for Nimrod. Note that Nimrod's default GC is still -## single-threaded. This means that you MUST turn off the GC while multiple -## threads are executing that allocate GC'ed memory. The alternative is to -## compile with ``--gc:none`` or ``--gc:boehm``. -## -## Example: -## -## .. code-block:: nimrod -## -## var -## thr: array [0..4, TThread[tuple[a,b: int]]] -## L: TLock -## -## proc threadFunc(interval: tuple[a,b: int]) {.procvar.} = -## for i in interval.a..interval.b: -## Aquire(L) # lock stdout -## echo i -## Release(L) -## -## InitLock(L) -## -## GC_disable() # native GC does not support multiple threads yet :-( -## for i in 0..high(thr): -## createThread(thr[i], threadFunc, (i*10, i*10+5)) -## for i in 0..high(thr): -## joinThread(thr[i]) -## GC_enable() - -when not compileOption("threads"): - {.error: "Thread support requires ``--threads:on`` commandline switch".} - -when not defined(boehmgc) and not defined(nogc) and false: - {.error: "Thread support requires --gc:boehm or --gc:none".} - -include "lib/system/systhread" - -# We jump through some hops here to ensure that Nimrod thread procs can have -# the Nimrod calling convention. This is needed because thread procs are -# ``stdcall`` on Windows and ``noconv`` on UNIX. Alternative would be to just -# use ``stdcall`` since it is mapped to ``noconv`` on UNIX anyway. However, -# the current approach will likely result in less problems later when we have -# GC'ed closures in Nimrod. - -type - TThreadProcClosure {.pure, final.}[TParam] = object - fn: proc (p: TParam) - threadLocalStorage: pointer - stackBottom: pointer - data: TParam - -when defined(windows): - type - THandle = int - TSysThread = THandle - TWinThreadProc = proc (x: pointer): int32 {.stdcall.} - - proc CreateThread(lpThreadAttributes: Pointer, dwStackSize: int32, - lpStartAddress: TWinThreadProc, - lpParameter: Pointer, - dwCreationFlags: int32, lpThreadId: var int32): THandle {. - stdcall, dynlib: "kernel32", importc: "CreateThread".} - - when false: - proc winSuspendThread(hThread: TSysThread): int32 {. - stdcall, dynlib: "kernel32", importc: "SuspendThread".} - - proc winResumeThread(hThread: TSysThread): int32 {. - stdcall, dynlib: "kernel32", importc: "ResumeThread".} - - proc WaitForMultipleObjects(nCount: int32, - lpHandles: ptr array[0..10, THandle], - bWaitAll: int32, - dwMilliseconds: int32): int32 {. - stdcall, dynlib: "kernel32", importc: "WaitForMultipleObjects".} - - proc WaitForSingleObject(hHandle: THANDLE, dwMilliseconds: int32): int32 {. - stdcall, dynlib: "kernel32", importc: "WaitForSingleObject".} - - proc TerminateThread(hThread: THandle, dwExitCode: int32): int32 {. 
- stdcall, dynlib: "kernel32", importc: "TerminateThread".} - - {.push stack_trace:off.} - proc threadProcWrapper[TParam](closure: pointer): int32 {.stdcall.} = - var c = cast[ptr TThreadProcClosure[TParam]](closure) - SetThreadLocalStorage(c.threadLocalStorage) - c.fn(c.data) - # implicitely return 0 - {.pop.} - -else: - type - TSysThread {.importc: "pthread_t", header: "<sys/types.h>".} = int - Ttimespec {.importc: "struct timespec", - header: "<time.h>", final, pure.} = object - tv_sec: int - tv_nsec: int - - proc pthread_create(a1: var TSysThread, a2: ptr int, - a3: proc (x: pointer) {.noconv.}, - a4: pointer): cint {.importc: "pthread_create", - header: "<pthread.h>".} - proc pthread_join(a1: TSysThread, a2: ptr pointer): cint {. - importc, header: "<pthread.h>".} - - proc pthread_cancel(a1: TSysThread): cint {. - importc: "pthread_cancel", header: "<pthread.h>".} - - proc AquireSysTimeoutAux(L: var TSysLock, timeout: var Ttimespec): cint {. - importc: "pthread_mutex_timedlock", header: "<time.h>".} - - proc AquireSysTimeout(L: var TSysLock, msTimeout: int) {.inline.} = - var a: Ttimespec - a.tv_sec = msTimeout div 1000 - a.tv_nsec = (msTimeout mod 1000) * 1000 - var res = AquireSysTimeoutAux(L, a) - if res != 0'i32: - raise newException(EResourceExhausted, $strerror(res)) - - {.push stack_trace:off.} - proc threadProcWrapper[TParam](closure: pointer) {.noconv.} = - var c = cast[ptr TThreadProcClosure[TParam]](closure) - SetThreadLocalStorage(c.threadLocalStorage) - c.fn(c.data) - {.pop.} - - -const - noDeadlocks = true # compileOption("deadlockPrevention") - -type - TLock* = TSysLock - TThread* {.pure, final.}[TParam] = object ## Nimrod thread. - sys: TSysThread - c: TThreadProcClosure[TParam] - -when nodeadlocks: - var - deadlocksPrevented* = 0 ## counts the number of times a - ## deadlock has been prevented - -proc InitLock*(lock: var TLock) {.inline.} = - ## Initializes the lock `lock`. - InitSysLock(lock) - -proc OrderedLocks(g: PGlobals): bool = - for i in 0 .. g.locksLen-2: - if g.locks[i] >= g.locks[i+1]: return false - result = true - -proc TryAquire*(lock: var TLock): bool {.inline.} = - ## Try to aquires the lock `lock`. Returns `true` on success. - when noDeadlocks: - result = TryAquireSys(lock) - if not result: return - # we have to add it to the ordered list. Oh, and we might fail if there# - # there is no space in the array left ... - var g = GetGlobals() - if g.locksLen >= len(g.locks): - ReleaseSys(lock) - raise newException(EResourceExhausted, "cannot aquire additional lock") - # find the position to add: - var p = addr(lock) - var L = g.locksLen-1 - var i = 0 - while i <= L: - assert g.locks[i] != nil - if g.locks[i] < p: inc(i) # in correct order - elif g.locks[i] == p: return # thread already holds lock - else: - # do the crazy stuff here: - while L >= i: - g.locks[L+1] = g.locks[L] - dec L - g.locks[i] = p - inc(g.locksLen) - assert OrderedLocks(g) - return - # simply add to the end: - g.locks[g.locksLen] = p - inc(g.locksLen) - assert OrderedLocks(g) - else: - result = TryAquireSys(lock) - -proc Aquire*(lock: var TLock) = - ## Aquires the lock `lock`. 
- when nodeadlocks: - var g = GetGlobals() - var p = addr(lock) - var L = g.locksLen-1 - var i = 0 - while i <= L: - assert g.locks[i] != nil - if g.locks[i] < p: inc(i) # in correct order - elif g.locks[i] == p: return # thread already holds lock - else: - # do the crazy stuff here: - if g.locksLen >= len(g.locks): - raise newException(EResourceExhausted, "cannot aquire additional lock") - while L >= i: - ReleaseSys(cast[ptr TSysLock](g.locks[L])[]) - g.locks[L+1] = g.locks[L] - dec L - # aquire the current lock: - AquireSys(lock) - g.locks[i] = p - inc(g.locksLen) - # aquire old locks in proper order again: - L = g.locksLen-1 - inc i - while i <= L: - AquireSys(cast[ptr TSysLock](g.locks[i])[]) - inc(i) - # DANGER: We can only modify this global var if we gained every lock! - # NO! We need an atomic increment. Crap. - discard system.atomicInc(deadlocksPrevented, 1) - assert OrderedLocks(g) - return - - # simply add to the end: - if g.locksLen >= len(g.locks): - raise newException(EResourceExhausted, "cannot aquire additional lock") - AquireSys(lock) - g.locks[g.locksLen] = p - inc(g.locksLen) - assert OrderedLocks(g) - else: - AquireSys(lock) - -proc Release*(lock: var TLock) = - ## Releases the lock `lock`. - when nodeadlocks: - var g = GetGlobals() - var p = addr(lock) - var L = g.locksLen - for i in countdown(L-1, 0): - if g.locks[i] == p: - for j in i..L-2: g.locks[j] = g.locks[j+1] - dec g.locksLen - break - ReleaseSys(lock) - -proc joinThread*[TParam](t: TThread[TParam]) {.inline.} = - ## waits for the thread `t` until it has terminated. - when hostOS == "windows": - discard WaitForSingleObject(t.sys, -1'i32) - else: - discard pthread_join(t.sys, nil) - -proc destroyThread*[TParam](t: var TThread[TParam]) {.inline.} = - ## forces the thread `t` to terminate. This is potentially dangerous if - ## you don't have full control over `t` and its aquired resources. - when hostOS == "windows": - discard TerminateThread(t.sys, 1'i32) - else: - discard pthread_cancel(t.sys) - -proc createThread*[TParam](t: var TThread[TParam], - tp: proc (param: TParam), - param: TParam) = - ## creates a new thread `t` and starts its execution. Entry point is the - ## proc `tp`. `param` is passed to `tp`. 
- t.c.threadLocalStorage = AllocThreadLocalStorage() - t.c.data = param - t.c.fn = tp - when hostOS == "windows": - var dummyThreadId: int32 - t.sys = CreateThread(nil, 0'i32, threadProcWrapper[TParam], - addr(t.c), 0'i32, dummyThreadId) - else: - if pthread_create(t.sys, nil, threadProcWrapper[TParam], addr(t.c)) != 0: - raise newException(EIO, "cannot create thread") - -when isMainModule: - import os - - var - thr: array [0..5, TThread[tuple[a, b: int]]] - L, M, N: TLock - - proc doNothing() = nil - - proc threadFunc(interval: tuple[a, b: int]) {.procvar.} = - doNothing() - for i in interval.a..interval.b: - when nodeadlocks: - case i mod 6 - of 0: - Aquire(L) # lock stdout - Aquire(M) - Aquire(N) - of 1: - Aquire(L) - Aquire(N) # lock stdout - Aquire(M) - of 2: - Aquire(M) - Aquire(L) - Aquire(N) - of 3: - Aquire(M) - Aquire(N) - Aquire(L) - of 4: - Aquire(N) - Aquire(M) - Aquire(L) - of 5: - Aquire(N) - Aquire(L) - Aquire(M) - else: assert false - else: - Aquire(L) # lock stdout - Aquire(M) - - echo i - os.sleep(10) - when nodeadlocks: - echo "deadlocks prevented: ", deadlocksPrevented - when nodeadlocks: - Release(N) - Release(M) - Release(L) - - InitLock(L) - InitLock(M) - InitLock(N) - - proc main = - for i in 0..high(thr): - createThread(thr[i], threadFunc, (i*100, i*100+50)) - for i in 0..high(thr): - joinThread(thr[i]) - - GC_disable() - main() - GC_enable() - diff --git a/lib/prelude.nim b/lib/prelude.nim new file mode 100644 index 000000000..372708f31 --- /dev/null +++ b/lib/prelude.nim @@ -0,0 +1,23 @@ +# +# +# Nimrod's Runtime Library +# (c) Copyright 2011 Andreas Rumpf +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +## This is an include file that simply imports common modules for your +## convenience: +## +## .. code-block:: nimrod +## include prelude +## +## Same as: +## +## .. code-block:: nimrod +## import os, strutils, times, parseutils, parseopt + +import os, strutils, times, parseutils, parseopt + + diff --git a/lib/system.nim b/lib/system.nim index 9a9e4fb06..ef9685574 100755 --- a/lib/system.nim +++ b/lib/system.nim @@ -778,6 +778,12 @@ proc compileOption*(option, arg: string): bool {. const hasThreadSupport = compileOption("threads") + hasSharedHeap = false # don't share heaps, so every thread has its own heap + +when hasThreadSupport and not hasSharedHeap: + {.pragma: rtlThreadVar, threadvar.} +else: + {.pragma: rtlThreadVar.} include "system/inclrtl" @@ -1448,12 +1454,6 @@ proc quit*(errorcode: int = QuitSuccess) {. when not defined(EcmaScript) and not defined(NimrodVM): {.push stack_trace: off.} - proc atomicInc*(memLoc: var int, x: int): int {.inline.} - ## atomic increment of `memLoc`. Returns the value after the operation. - - proc atomicDec*(memLoc: var int, x: int): int {.inline.} - ## atomic decrement of `memLoc`. Returns the value after the operation. - proc initGC() proc initStackBottom() {.inline.} = @@ -1666,7 +1666,23 @@ when not defined(EcmaScript) and not defined(NimrodVM): # ---------------------------------------------------------------------------- - include "system/systhread" + proc atomicInc*(memLoc: var int, x: int): int {.inline.} + ## atomic increment of `memLoc`. Returns the value after the operation. + + proc atomicDec*(memLoc: var int, x: int): int {.inline.} + ## atomic decrement of `memLoc`. Returns the value after the operation. 
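The atomicInc/atomicDec declarations above return the value after the operation, so they can double as fetch-and-add primitives. A minimal sketch of that use (the counter and the two procs are illustrative, not from the diff):

    var activeWorkers: int   # shared between threads

    proc enter() =
      # value after the increment: 1 means this caller was first
      if atomicInc(activeWorkers, 1) == 1:
        echo "first worker started"

    proc leave() =
      # symmetric decrement; the returned value is discarded here
      discard atomicDec(activeWorkers, 1)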
+ + include "system/atomics" + + type + PSafePoint = ptr TSafePoint + TSafePoint {.compilerproc, final.} = object + prev: PSafePoint # points to next safe point ON THE STACK + status: int + context: C_JmpBuf + + when hasThreadSupport: + include "system/threads" include "system/excpt" # we cannot compile this with stack tracing on # as it would recurse endlessly! diff --git a/lib/system/alloc.nim b/lib/system/alloc.nim index 2280415e1..c12dc253a 100755 --- a/lib/system/alloc.nim +++ b/lib/system/alloc.nim @@ -1,7 +1,7 @@ # # # Nimrod's Runtime Library -# (c) Copyright 2009 Andreas Rumpf +# (c) Copyright 2011 Andreas Rumpf # # See the file "copying.txt", included in this # distribution, for details about the copyright. @@ -80,7 +80,7 @@ else: # system immediately. const - ChunkOsReturn = 256 * PageSize + ChunkOsReturn = 256 * PageSize # 1 MB InitialMemoryRequest = ChunkOsReturn div 2 # < ChunkOsReturn! SmallChunkSize = PageSize @@ -101,6 +101,7 @@ type next: ptr TFreeCell # next free cell in chunk (overlaid with refcount) zeroField: int # 0 means cell is not used (overlaid with typ field) # 1 means cell is manually managed pointer + # otherwise a PNimType is stored in there PChunk = ptr TBaseChunk PBigChunk = ptr TBigChunk @@ -151,6 +152,7 @@ type TAllocator {.final, pure.} = object llmem: PLLChunk currMem, maxMem, freeMem: int # memory sizes (allocated from OS) + lastSize: int # needed for the case that OS gives us pages linearly freeSmallChunks: array[0..SmallChunkSize div MemAlign-1, PSmallChunk] freeChunksList: PBigChunk # XXX make this a datastructure with O(1) access chunkStarts: TIntSet @@ -167,10 +169,7 @@ proc getMaxMem(a: var TAllocator): int = # maxPagesCount may not be up to date. Thus we use the # maximum of these both values here: return max(a.currMem, a.maxMem) - -var - allocator: TAllocator - + proc llAlloc(a: var TAllocator, size: int): pointer = # *low-level* alloc for the memory managers data structures. Deallocation # is never done. 
@@ -192,10 +191,10 @@ proc IntSetGet(t: TIntSet, key: int): PTrunk = it = it.next result = nil -proc IntSetPut(t: var TIntSet, key: int): PTrunk = +proc IntSetPut(a: var TAllocator, t: var TIntSet, key: int): PTrunk = result = IntSetGet(t, key) if result == nil: - result = cast[PTrunk](llAlloc(allocator, sizeof(result[]))) + result = cast[PTrunk](llAlloc(a, sizeof(result[]))) result.next = t.data[key and high(t.data)] t.data[key and high(t.data)] = result result.key = key @@ -208,8 +207,8 @@ proc Contains(s: TIntSet, key: int): bool = else: result = false -proc Incl(s: var TIntSet, key: int) = - var t = IntSetPut(s, key shr TrunkShift) +proc Incl(a: var TAllocator, s: var TIntSet, key: int) = + var t = IntSetPut(a, s, key shr TrunkShift) var u = key and TrunkMask t.bits[u shr IntShift] = t.bits[u shr IntShift] or (1 shl (u and IntMask)) @@ -219,18 +218,6 @@ proc Excl(s: var TIntSet, key: int) = var u = key and TrunkMask t.bits[u shr IntShift] = t.bits[u shr IntShift] and not (1 shl (u and IntMask)) - -proc ContainsOrIncl(s: var TIntSet, key: int): bool = - var t = IntSetGet(s, key shr TrunkShift) - if t != nil: - var u = key and TrunkMask - result = (t.bits[u shr IntShift] and (1 shl (u and IntMask))) != 0 - if not result: - t.bits[u shr IntShift] = t.bits[u shr IntShift] or - (1 shl (u and IntMask)) - else: - Incl(s, key) - result = false # ------------- chunk management ---------------------------------------------- proc pageIndex(c: PChunk): int {.inline.} = @@ -241,9 +228,7 @@ proc pageIndex(p: pointer): int {.inline.} = proc pageAddr(p: pointer): PChunk {.inline.} = result = cast[PChunk](cast[TAddress](p) and not PageMask) - assert(Contains(allocator.chunkStarts, pageIndex(result))) - -var lastSize = PageSize + #assert(Contains(allocator.chunkStarts, pageIndex(result))) proc requestOsChunks(a: var TAllocator, size: int): PBigChunk = incCurrMem(a, size) @@ -263,6 +248,7 @@ proc requestOsChunks(a: var TAllocator, size: int): PBigChunk = #echo("Next already allocated!") next.prevSize = size # set result.prevSize: + var lastSize = if a.lastSize != 0: a.lastSize else: PageSize var prv = cast[TAddress](result) -% lastSize assert((nxt and PageMask) == 0) var prev = cast[PChunk](prv) @@ -271,7 +257,7 @@ proc requestOsChunks(a: var TAllocator, size: int): PBigChunk = result.prevSize = lastSize else: result.prevSize = 0 # unknown - lastSize = size # for next request + a.lastSize = size # for next request proc freeOsChunks(a: var TAllocator, p: pointer, size: int) = # update next.prevSize: @@ -287,8 +273,8 @@ proc freeOsChunks(a: var TAllocator, p: pointer, size: int) = dec(a.freeMem, size) #c_fprintf(c_stdout, "[Alloc] back to OS: %ld\n", size) -proc isAccessible(p: pointer): bool {.inline.} = - result = Contains(allocator.chunkStarts, pageIndex(p)) +proc isAccessible(a: TAllocator, p: pointer): bool {.inline.} = + result = Contains(a.chunkStarts, pageIndex(p)) proc contains[T](list, x: T): bool = var it = list @@ -337,7 +323,7 @@ proc updatePrevSize(a: var TAllocator, c: PBigChunk, prevSize: int) {.inline.} = var ri = cast[PChunk](cast[TAddress](c) +% c.size) assert((cast[TAddress](ri) and PageMask) == 0) - if isAccessible(ri): + if isAccessible(a, ri): ri.prevSize = prevSize proc freeBigChunk(a: var TAllocator, c: PBigChunk) = @@ -347,7 +333,7 @@ proc freeBigChunk(a: var TAllocator, c: PBigChunk) = when coalescRight: var ri = cast[PChunk](cast[TAddress](c) +% c.size) assert((cast[TAddress](ri) and PageMask) == 0) - if isAccessible(ri) and chunkUnused(ri): + if isAccessible(a, ri) and 
chunkUnused(ri): assert(not isSmallChunk(ri)) if not isSmallChunk(ri): ListRemove(a.freeChunksList, cast[PBigChunk](ri)) @@ -357,7 +343,7 @@ proc freeBigChunk(a: var TAllocator, c: PBigChunk) = if c.prevSize != 0: var le = cast[PChunk](cast[TAddress](c) -% c.prevSize) assert((cast[TAddress](le) and PageMask) == 0) - if isAccessible(le) and chunkUnused(le): + if isAccessible(a, le) and chunkUnused(le): assert(not isSmallChunk(le)) if not isSmallChunk(le): ListRemove(a.freeChunksList, cast[PBigChunk](le)) @@ -366,7 +352,7 @@ proc freeBigChunk(a: var TAllocator, c: PBigChunk) = c = cast[PBigChunk](le) if c.size < ChunkOsReturn: - incl(a.chunkStarts, pageIndex(c)) + incl(a, a.chunkStarts, pageIndex(c)) updatePrevSize(a, c, c.size) ListAdd(a.freeChunksList, c) c.used = false @@ -383,7 +369,7 @@ proc splitChunk(a: var TAllocator, c: PBigChunk, size: int) = rest.prevSize = size updatePrevSize(a, c, rest.size) c.size = size - incl(a.chunkStarts, pageIndex(rest)) + incl(a, a.chunkStarts, pageIndex(rest)) ListAdd(a.freeChunksList, rest) proc getBigChunk(a: var TAllocator, size: int): PBigChunk = @@ -410,7 +396,7 @@ proc getBigChunk(a: var TAllocator, size: int): PBigChunk = result = requestOsChunks(a, size) result.prevSize = 0 # XXX why is this needed? result.used = true - incl(a.chunkStarts, pageIndex(result)) + incl(a, a.chunkStarts, pageIndex(result)) dec(a.freeMem, size) proc getSmallChunk(a: var TAllocator): PSmallChunk = @@ -472,7 +458,7 @@ proc rawAlloc(a: var TAllocator, requestedSize: int): pointer = assert c.size == size result = addr(c.data) assert((cast[TAddress](result) and (MemAlign-1)) == 0) - assert(isAccessible(result)) + assert(isAccessible(a, result)) proc rawDealloc(a: var TAllocator, p: pointer) = var c = pageAddr(p) @@ -509,7 +495,7 @@ proc rawDealloc(a: var TAllocator, p: pointer) = freeBigChunk(a, cast[PBigChunk](c)) proc isAllocatedPtr(a: TAllocator, p: pointer): bool = - if isAccessible(p): + if isAccessible(a, p): var c = pageAddr(p) if not chunkUnused(c): if isSmallChunk(c): @@ -522,11 +508,12 @@ proc isAllocatedPtr(a: TAllocator, p: pointer): bool = var c = cast[PBigChunk](c) result = p == addr(c.data) and cast[ptr TFreeCell](p).zeroField >% 1 +var + allocator {.rtlThreadVar.}: TAllocator + # ---------------------- interface to programs ------------------------------- when not defined(useNimRtl): - var heapLock: TSysLock - InitSysLock(HeapLock) proc unlockedAlloc(size: int): pointer {.inline.} = result = rawAlloc(allocator, size+sizeof(TFreeCell)) @@ -545,18 +532,18 @@ when not defined(useNimRtl): assert(not isAllocatedPtr(allocator, x)) proc alloc(size: int): pointer = - when hasThreadSupport: AquireSys(HeapLock) + when hasThreadSupport and hasSharedHeap: AquireSys(HeapLock) result = unlockedAlloc(size) - when hasThreadSupport: ReleaseSys(HeapLock) + when hasThreadSupport and hasSharedHeap: ReleaseSys(HeapLock) proc alloc0(size: int): pointer = result = alloc(size) zeroMem(result, size) proc dealloc(p: pointer) = - when hasThreadSupport: AquireSys(HeapLock) + when hasThreadSupport and hasSharedHeap: AquireSys(HeapLock) unlockedDealloc(p) - when hasThreadSupport: ReleaseSys(HeapLock) + when hasThreadSupport and hasSharedHeap: ReleaseSys(HeapLock) proc ptrSize(p: pointer): int = var x = cast[pointer](cast[TAddress](p) -% sizeof(TFreeCell)) diff --git a/lib/system/atomics.nim b/lib/system/atomics.nim new file mode 100644 index 000000000..31c25c5af --- /dev/null +++ b/lib/system/atomics.nim @@ -0,0 +1,41 @@ +# +# +# Nimrod's Runtime Library +# (c) Copyright 2011 Andreas 
Rumpf +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +## Atomic operations for Nimrod. + +when (defined(gcc) or defined(llvm_gcc)) and hasThreadSupport: + proc sync_add_and_fetch(p: var int, val: int): int {. + importc: "__sync_add_and_fetch", nodecl.} + proc sync_sub_and_fetch(p: var int, val: int): int {. + importc: "__sync_sub_and_fetch", nodecl.} +elif defined(vcc) and hasThreadSupport: + proc sync_add_and_fetch(p: var int, val: int): int {. + importc: "NimXadd", nodecl.} +else: + proc sync_add_and_fetch(p: var int, val: int): int {.inline.} = + inc(p, val) + result = p + +proc atomicInc(memLoc: var int, x: int): int = + when hasThreadSupport: + result = sync_add_and_fetch(memLoc, x) + else: + inc(memLoc, x) + result = memLoc + +proc atomicDec(memLoc: var int, x: int): int = + when hasThreadSupport: + when defined(sync_sub_and_fetch): + result = sync_sub_and_fetch(memLoc, x) + else: + result = sync_add_and_fetch(memLoc, -x) + else: + dec(memLoc, x) + result = memLoc + diff --git a/lib/system/excpt.nim b/lib/system/excpt.nim index bbe608fdc..3ef39902a 100755 --- a/lib/system/excpt.nim +++ b/lib/system/excpt.nim @@ -10,9 +10,6 @@ # Exception handling code. This is difficult because it has # to work if there is no more memory (but it doesn't yet!). -const - MaxLocksPerThread = 10 - var stackTraceNewLine* = "\n" ## undocumented feature; it is replaced by ``<br>`` ## for CGI applications @@ -35,111 +32,10 @@ proc chckRange(i, a, b: int): int {.inline, compilerproc.} proc chckRangeF(x, a, b: float): float {.inline, compilerproc.} proc chckNil(p: pointer) {.inline, compilerproc.} -type - PSafePoint = ptr TSafePoint - TSafePoint {.compilerproc, final.} = object - prev: PSafePoint # points to next safe point ON THE STACK - status: int - context: C_JmpBuf - -when hasThreadSupport: - # Support for thread local storage: - when defined(windows): - type - TThreadVarSlot {.compilerproc.} = distinct int32 - - proc TlsAlloc(): TThreadVarSlot {. - importc: "TlsAlloc", stdcall, dynlib: "kernel32".} - proc TlsSetValue(dwTlsIndex: TThreadVarSlot, lpTlsValue: pointer) {. - importc: "TlsSetValue", stdcall, dynlib: "kernel32".} - proc TlsGetValue(dwTlsIndex: TThreadVarSlot): pointer {. - importc: "TlsGetValue", stdcall, dynlib: "kernel32".} - - proc ThreadVarAlloc(): TThreadVarSlot {.compilerproc, inline.} = - result = TlsAlloc() - proc ThreadVarSetValue(s: TThreadVarSlot, value: pointer) {. - compilerproc, inline.} = - TlsSetValue(s, value) - proc ThreadVarGetValue(s: TThreadVarSlot): pointer {. - compilerproc, inline.} = - result = TlsGetValue(s) - - else: - {.passL: "-pthread".} - {.passC: "-pthread".} - type - TThreadVarSlot {.importc: "pthread_key_t", pure, final, - header: "<sys/types.h>".} = object - - proc pthread_getspecific(a1: TThreadVarSlot): pointer {. - importc: "pthread_getspecific", header: "<pthread.h>".} - proc pthread_key_create(a1: ptr TThreadVarSlot, - destruct: proc (x: pointer) {.noconv.}): int32 {. - importc: "pthread_key_create", header: "<pthread.h>".} - proc pthread_key_delete(a1: TThreadVarSlot): int32 {. - importc: "pthread_key_delete", header: "<pthread.h>".} - - proc pthread_setspecific(a1: TThreadVarSlot, a2: pointer): int32 {. 
- importc: "pthread_setspecific", header: "<pthread.h>".} - - proc specificDestroy(mem: pointer) {.noconv.} = - # we really need a thread-safe 'dealloc' here: - dealloc(mem) - - proc ThreadVarAlloc(): TThreadVarSlot {.compilerproc, inline.} = - discard pthread_key_create(addr(result), specificDestroy) - proc ThreadVarSetValue(s: TThreadVarSlot, value: pointer) {. - compilerproc, inline.} = - discard pthread_setspecific(s, value) - proc ThreadVarGetValue(s: TThreadVarSlot): pointer {.compilerproc, inline.} = - result = pthread_getspecific(s) - - type - TGlobals* {.final, pure.} = object - excHandler: PSafePoint - currException: ref E_Base - framePtr: PFrame - locksLen*: int - locks*: array [0..MaxLocksPerThread-1, pointer] - buf: string # cannot be allocated on the stack! - assertBuf: string # we need a different buffer for - # assert, as it raises an exception and - # exception handler needs the buffer too - gAssertionFailed: ref EAssertionFailed - tempFrames: array [0..127, PFrame] # cannot be allocated on the stack! - data: float # compiler should add thread local variables here! - PGlobals* = ptr TGlobals - - # XXX it'd be more efficient to not use a global variable for the - # thread storage slot, but to rely on the implementation to assign slot 0 - # for us... ;-) - var globalsSlot = ThreadVarAlloc() - #const globalsSlot = TThreadVarSlot(0) - #assert checkSlot.int == globalsSlot.int - - proc NewGlobals(): PGlobals = - result = cast[PGlobals](alloc0(sizeof(TGlobals))) - new(result.gAssertionFailed) - result.buf = newStringOfCap(2000) - result.assertBuf = newStringOfCap(2000) - - proc AllocThreadLocalStorage*(): pointer {.inl.} = - isMultiThreaded = true - result = NewGlobals() - - proc SetThreadLocalStorage*(p: pointer) {.inl.} = - ThreadVarSetValue(globalsSlot, p) - - proc GetGlobals*(): PGlobals {.compilerRtl, inl.} = - result = cast[PGlobals](ThreadVarGetValue(globalsSlot)) - - # create for the main thread: - ThreadVarSetValue(globalsSlot, NewGlobals()) - when hasThreadSupport: template ThreadGlobals = - var globals = GetGlobals() - template `||`(varname: expr): expr = globals.varname + var currentThread = ThisThread() + template `||`(varname: expr): expr = currentThread.g.varname else: template ThreadGlobals = nil # nothing diff --git a/lib/system/gc.nim b/lib/system/gc.nim index ab8f19674..d276d6cec 100755 --- a/lib/system/gc.nim +++ b/lib/system/gc.nim @@ -15,10 +15,6 @@ # stack overflows when traversing deep datastructures. This is comparable to # an incremental and generational GC. It should be well-suited for soft real # time applications (like games). -# -# Future Improvements: -# * Support for multi-threading. However, locks for the reference counting -# might turn out to be too slow. const CycleIncrease = 2 # is a multiplicative increase @@ -64,10 +60,10 @@ type stat: TGcStat var - stackBottom: pointer - gch: TGcHeap - cycleThreshold: int = InitialCycleThreshold - recGcLock: int = 0 + stackBottom {.rtlThreadVar.}: pointer + gch {.rtlThreadVar.}: TGcHeap + cycleThreshold {.rtlThreadVar.}: int = InitialCycleThreshold + recGcLock {.rtlThreadVar.}: int = 0 # we use a lock to prevent the garbage collector to be triggered in a # finalizer; the collector should not call itself this way! Thus every # object allocated by a finalizer will not trigger a garbage collection. 
@@ -186,6 +182,15 @@ proc doOperation(p: pointer, op: TWalkOp) proc forAllChildrenAux(dest: Pointer, mt: PNimType, op: TWalkOp) # we need the prototype here for debugging purposes +when hasThreadSupport and hasSharedHeap: + template `--`(x: expr): expr = atomicDec(x, rcIncrement) <% rcIncrement + template `++`(x: expr): stmt = discard atomicInc(x, rcIncrement) +else: + template `--`(x: expr): expr = + Dec(x, rcIncrement) + x <% rcIncrement + template `++`(x: expr): stmt = Inc(x, rcIncrement) + proc prepareDealloc(cell: PCell) = if cell.typ.finalizer != nil: # the finalizer could invoke something that @@ -219,13 +224,13 @@ proc decRef(c: PCell) {.inline.} = writeCell("broken cell", c) assert(c.refcount >=% rcIncrement) #if c.refcount <% rcIncrement: quit("leck mich") - if atomicDec(c.refcount, rcIncrement) <% rcIncrement: + if --c.refcount: rtlAddZCT(c) elif canBeCycleRoot(c): rtlAddCycleRoot(c) proc incRef(c: PCell) {.inline.} = - discard atomicInc(c.refcount, rcIncrement) + ++c.refcount if canBeCycleRoot(c): rtlAddCycleRoot(c) @@ -245,10 +250,10 @@ proc asgnRefNoCycle(dest: ppointer, src: pointer) {.compilerProc, inline.} = # cycle is possible. if src != nil: var c = usrToCell(src) - discard atomicInc(c.refcount, rcIncrement) + ++c.refcount if dest[] != nil: var c = usrToCell(dest[]) - if atomicDec(c.refcount, rcIncrement) <% rcIncrement: + if --c.refcount: rtlAddZCT(c) dest[] = src @@ -517,7 +522,17 @@ proc gcMark(p: pointer) {.inline.} = proc markThreadStacks(gch: var TGcHeap) = when hasThreadSupport: - nil + var it = threadList + while it != nil: + # mark registers: + for i in 0 .. high(it.registers): gcMark(it.registers[i]) + var sp = cast[TAddress](it.stackBottom) + var max = cast[TAddress](it.stackTop) + # XXX unroll this loop: + while sp <=% max: + gcMark(cast[ppointer](sp)[]) + sp = sp +% sizeof(pointer) + it = it.next # ----------------- stack management -------------------------------------- # inspired from Smart Eiffel @@ -684,7 +699,7 @@ proc unmarkStackAndRegisters(gch: var TGcHeap) = # decRef(d[i]) inlined: cannot create a cycle and must not aquire lock var c = d[i] # XXX no need for an atomic dec here: - if atomicDec(c.refcount, rcIncrement) <% rcIncrement: + if --c.refcount: addZCT(gch.zct, c) assert c.typ != nil gch.decStack.len = 0 diff --git a/lib/system/repr.nim b/lib/system/repr.nim index 395adc2ca..d9997001d 100755 --- a/lib/system/repr.nim +++ b/lib/system/repr.nim @@ -97,7 +97,7 @@ proc reprSetAux(result: var string, p: pointer, typ: PNimType) = inc(elemCounter) if typ.size <= 8: for i in 0..sizeof(int64)*8-1: - if (u and (1 shl i)) != 0: + if (u and (1'i64 shl int64(i))) != 0'i64: if elemCounter > 0: add result, ", " addSetElem(result, i+typ.node.len, typ.base) inc(elemCounter) diff --git a/lib/system/systhread.nim b/lib/system/systhread.nim deleted file mode 100755 index c83062942..000000000 --- a/lib/system/systhread.nim +++ /dev/null @@ -1,98 +0,0 @@ -# -# -# Nimrod's Runtime Library -# (c) Copyright 2011 Andreas Rumpf -# -# See the file "copying.txt", included in this -# distribution, for details about the copyright. -# - -const - maxThreads = 256 - SystemInclude = defined(hasThreadSupport) - -when not SystemInclude: - # ugly hack: this file is then included from core/threads, so we have - # thread support: - const hasThreadSupport = true - - include "lib/system/ansi_c" - -when (defined(gcc) or defined(llvm_gcc)) and hasThreadSupport: - proc sync_add_and_fetch(p: var int, val: int): int {. 
- importc: "__sync_add_and_fetch", nodecl.} - proc sync_sub_and_fetch(p: var int, val: int): int {. - importc: "__sync_sub_and_fetch", nodecl.} -elif defined(vcc) and hasThreadSupport: - proc sync_add_and_fetch(p: var int, val: int): int {. - importc: "NimXadd", nodecl.} -else: - proc sync_add_and_fetch(p: var int, val: int): int {.inline.} = - inc(p, val) - result = p - -proc atomicInc(memLoc: var int, x: int): int = - when hasThreadSupport: - result = sync_add_and_fetch(memLoc, x) - else: - inc(memLoc, x) - result = memLoc - -proc atomicDec(memLoc: var int, x: int): int = - when hasThreadSupport: - when defined(sync_sub_and_fetch): - result = sync_sub_and_fetch(memLoc, x) - else: - result = sync_add_and_fetch(memLoc, -x) - else: - dec(memLoc, x) - result = memLoc - -when defined(Windows): - type - TSysLock {.final, pure.} = object # CRITICAL_SECTION in WinApi - DebugInfo: pointer - LockCount: int32 - RecursionCount: int32 - OwningThread: int - LockSemaphore: int - Reserved: int32 - - proc InitSysLock(L: var TSysLock) {.stdcall, - dynlib: "kernel32", importc: "InitializeCriticalSection".} - ## Initializes the lock `L`. - - proc TryAquireSysAux(L: var TSysLock): int32 {.stdcall, - dynlib: "kernel32", importc: "TryEnterCriticalSection".} - ## Tries to aquire the lock `L`. - - proc TryAquireSys(L: var TSysLock): bool {.inline.} = - result = TryAquireSysAux(L) != 0'i32 - - proc AquireSys(L: var TSysLock) {.stdcall, - dynlib: "kernel32", importc: "EnterCriticalSection".} - ## Aquires the lock `L`. - - proc ReleaseSys(L: var TSysLock) {.stdcall, - dynlib: "kernel32", importc: "LeaveCriticalSection".} - ## Releases the lock `L`. - -else: - type - TSysLock {.importc: "pthread_mutex_t", pure, final, - header: "<sys/types.h>".} = object - - proc InitSysLock(L: var TSysLock, attr: pointer = nil) {. - importc: "pthread_mutex_init", header: "<pthread.h>".} - - proc AquireSys(L: var TSysLock) {. - importc: "pthread_mutex_lock", header: "<pthread.h>".} - proc TryAquireSysAux(L: var TSysLock): cint {. - importc: "pthread_mutex_trylock", header: "<pthread.h>".} - - proc TryAquireSys(L: var TSysLock): bool {.inline.} = - result = TryAquireSysAux(L) == 0'i32 - - proc ReleaseSys(L: var TSysLock) {. - importc: "pthread_mutex_unlock", header: "<pthread.h>".} - diff --git a/lib/system/threads.nim b/lib/system/threads.nim new file mode 100755 index 000000000..052335baa --- /dev/null +++ b/lib/system/threads.nim @@ -0,0 +1,481 @@ +# +# +# Nimrod's Runtime Library +# (c) Copyright 2011 Andreas Rumpf +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +## Thread support for Nimrod. **Note**: This is part of the system module. +## Do not import it directly. To active thread support you need to compile +## with the ``--threads:on`` command line switch. +## +## Nimrod's memory model for threads is quite different from other common +## programming languages (C, Pascal): Each thread has its own +## (garbage collected) heap and sharing of memory is restricted. This helps +## to prevent race conditions and improves efficiency. See the manual for +## details of this memory model. +## +## Example: +## +## .. 
code-block:: nimrod +## +## var +## thr: array [0..4, TThread[tuple[a,b: int]]] +## L: TLock +## +## proc threadFunc(interval: tuple[a,b: int]) {.procvar.} = +## for i in interval.a..interval.b: +## Aquire(L) # lock stdout +## echo i +## Release(L) +## +## InitLock(L) +## +## for i in 0..high(thr): +## createThread(thr[i], threadFunc, (i*10, i*10+5)) +## joinThreads(thr) + +const + maxRegisters = 256 # don't think there is an arch with more registers + maxLocksPerThread* = 10 ## max number of locks a thread can hold + ## at the same time + +when defined(Windows): + type + TSysLock {.final, pure.} = object # CRITICAL_SECTION in WinApi + DebugInfo: pointer + LockCount: int32 + RecursionCount: int32 + OwningThread: int + LockSemaphore: int + Reserved: int32 + + proc InitSysLock(L: var TSysLock) {.stdcall, + dynlib: "kernel32", importc: "InitializeCriticalSection".} + ## Initializes the lock `L`. + + proc TryAquireSysAux(L: var TSysLock): int32 {.stdcall, + dynlib: "kernel32", importc: "TryEnterCriticalSection".} + ## Tries to aquire the lock `L`. + + proc TryAquireSys(L: var TSysLock): bool {.inline.} = + result = TryAquireSysAux(L) != 0'i32 + + proc AquireSys(L: var TSysLock) {.stdcall, + dynlib: "kernel32", importc: "EnterCriticalSection".} + ## Aquires the lock `L`. + + proc ReleaseSys(L: var TSysLock) {.stdcall, + dynlib: "kernel32", importc: "LeaveCriticalSection".} + ## Releases the lock `L`. + + type + THandle = int + TSysThread = THandle + TWinThreadProc = proc (x: pointer): int32 {.stdcall.} + + proc CreateThread(lpThreadAttributes: Pointer, dwStackSize: int32, + lpStartAddress: TWinThreadProc, + lpParameter: Pointer, + dwCreationFlags: int32, + lpThreadId: var int32): TSysThread {. + stdcall, dynlib: "kernel32", importc: "CreateThread".} + + proc winSuspendThread(hThread: TSysThread): int32 {. + stdcall, dynlib: "kernel32", importc: "SuspendThread".} + + proc winResumeThread(hThread: TSysThread): int32 {. + stdcall, dynlib: "kernel32", importc: "ResumeThread".} + + proc WaitForMultipleObjects(nCount: int32, + lpHandles: ptr TSysThread, + bWaitAll: int32, + dwMilliseconds: int32): int32 {. + stdcall, dynlib: "kernel32", importc: "WaitForMultipleObjects".} + + proc WaitForSingleObject(hHandle: TSysThread, dwMilliseconds: int32): int32 {. + stdcall, dynlib: "kernel32", importc: "WaitForSingleObject".} + + proc TerminateThread(hThread: TSysThread, dwExitCode: int32): int32 {. + stdcall, dynlib: "kernel32", importc: "TerminateThread".} + + type + TThreadVarSlot {.compilerproc.} = distinct int32 + + proc TlsAlloc(): TThreadVarSlot {. + importc: "TlsAlloc", stdcall, dynlib: "kernel32".} + proc TlsSetValue(dwTlsIndex: TThreadVarSlot, lpTlsValue: pointer) {. + importc: "TlsSetValue", stdcall, dynlib: "kernel32".} + proc TlsGetValue(dwTlsIndex: TThreadVarSlot): pointer {. + importc: "TlsGetValue", stdcall, dynlib: "kernel32".} + + proc ThreadVarAlloc(): TThreadVarSlot {.compilerproc, inline.} = + result = TlsAlloc() + proc ThreadVarSetValue(s: TThreadVarSlot, value: pointer) {. + compilerproc, inline.} = + TlsSetValue(s, value) + proc ThreadVarGetValue(s: TThreadVarSlot): pointer {. + compilerproc, inline.} = + result = TlsGetValue(s) + +else: + {.passL: "-pthread".} + {.passC: "-pthread".} + + type + TSysLock {.importc: "pthread_mutex_t", pure, final, + header: "<sys/types.h>".} = object + + proc InitSysLock(L: var TSysLock, attr: pointer = nil) {. + importc: "pthread_mutex_init", header: "<pthread.h>".} + + proc AquireSys(L: var TSysLock) {. 
+ importc: "pthread_mutex_lock", header: "<pthread.h>".} + proc TryAquireSysAux(L: var TSysLock): cint {. + importc: "pthread_mutex_trylock", header: "<pthread.h>".} + + proc TryAquireSys(L: var TSysLock): bool {.inline.} = + result = TryAquireSysAux(L) == 0'i32 + + proc ReleaseSys(L: var TSysLock) {. + importc: "pthread_mutex_unlock", header: "<pthread.h>".} + + type + TSysThread {.importc: "pthread_t", header: "<sys/types.h>", + final, pure.} = object + Tpthread_attr {.importc: "pthread_attr_t", + header: "<sys/types.h>", final, pure.} = object + + Ttimespec {.importc: "struct timespec", + header: "<time.h>", final, pure.} = object + tv_sec: int + tv_nsec: int + + proc pthread_attr_init(a1: var TPthread_attr) {. + importc, header: "<pthread.h>".} + proc pthread_attr_setstacksize(a1: var TPthread_attr, a2: int) {. + importc, header: "<pthread.h>".} + + proc pthread_create(a1: var TSysThread, a2: var TPthread_attr, + a3: proc (x: pointer) {.noconv.}, + a4: pointer): cint {.importc: "pthread_create", + header: "<pthread.h>".} + proc pthread_join(a1: TSysThread, a2: ptr pointer): cint {. + importc, header: "<pthread.h>".} + + proc pthread_cancel(a1: TSysThread): cint {. + importc: "pthread_cancel", header: "<pthread.h>".} + + proc AquireSysTimeoutAux(L: var TSysLock, timeout: var Ttimespec): cint {. + importc: "pthread_mutex_timedlock", header: "<time.h>".} + + proc AquireSysTimeout(L: var TSysLock, msTimeout: int) {.inline.} = + var a: Ttimespec + a.tv_sec = msTimeout div 1000 + a.tv_nsec = (msTimeout mod 1000) * 1000 + var res = AquireSysTimeoutAux(L, a) + if res != 0'i32: raise newException(EResourceExhausted, $strerror(res)) + + type + TThreadVarSlot {.importc: "pthread_key_t", pure, final, + header: "<sys/types.h>".} = object + + proc pthread_getspecific(a1: TThreadVarSlot): pointer {. + importc: "pthread_getspecific", header: "<pthread.h>".} + proc pthread_key_create(a1: ptr TThreadVarSlot, + destruct: proc (x: pointer) {.noconv.}): int32 {. + importc: "pthread_key_create", header: "<pthread.h>".} + proc pthread_key_delete(a1: TThreadVarSlot): int32 {. + importc: "pthread_key_delete", header: "<pthread.h>".} + + proc pthread_setspecific(a1: TThreadVarSlot, a2: pointer): int32 {. + importc: "pthread_setspecific", header: "<pthread.h>".} + + proc ThreadVarAlloc(): TThreadVarSlot {.compilerproc, inline.} = + discard pthread_key_create(addr(result), nil) + proc ThreadVarSetValue(s: TThreadVarSlot, value: pointer) {. + compilerproc, inline.} = + discard pthread_setspecific(s, value) + proc ThreadVarGetValue(s: TThreadVarSlot): pointer {.compilerproc, inline.} = + result = pthread_getspecific(s) + +type + TGlobals {.final, pure.} = object + excHandler: PSafePoint + currException: ref E_Base + framePtr: PFrame + buf: string # cannot be allocated on the stack! + assertBuf: string # we need a different buffer for + # assert, as it raises an exception and + # exception handler needs the buffer too + gAssertionFailed: ref EAssertionFailed + tempFrames: array [0..127, PFrame] # cannot be allocated on the stack! + data: float # compiler should add thread local variables here! 
+ +proc initGlobals(g: var TGlobals) = + new(g.gAssertionFailed) + g.buf = newStringOfCap(2000) + g.assertBuf = newStringOfCap(2000) + + +type + PGcThread = ptr TGcThread + TGcThread {.pure.} = object + sys: TSysThread + next, prev: PGcThread + stackBottom, stackTop: pointer + stackSize: int + g: TGlobals + locksLen: int + locks: array [0..MaxLocksPerThread-1, pointer] + registers: array[0..maxRegisters-1, pointer] # register contents for GC + +# XXX it'd be more efficient to not use a global variable for the +# thread storage slot, but to rely on the implementation to assign slot 0 +# for us... ;-) +var globalsSlot = ThreadVarAlloc() +#const globalsSlot = TThreadVarSlot(0) +#assert checkSlot.int == globalsSlot.int + +proc ThisThread(): PGcThread {.compilerRtl, inl.} = + result = cast[PGcThread](ThreadVarGetValue(globalsSlot)) + +# create for the main thread. Note: do not insert this data into the list +# of all threads; it's not to be stopped etc. +when not defined(useNimRtl): + var mainThread: TGcThread + initGlobals(mainThread.g) + ThreadVarSetValue(globalsSlot, addr(mainThread)) + + var heapLock: TSysLock + InitSysLock(HeapLock) + + var + threadList: PGcThread + + proc registerThread(t: PGcThread) = + # we need to use the GC global lock here! + AquireSys(HeapLock) + t.prev = nil + t.next = threadList + if threadList != nil: + assert(threadList.prev == nil) + threadList.prev = t + threadList = t + ReleaseSys(HeapLock) + + proc unregisterThread(t: PGcThread) = + # we need to use the GC global lock here! + AquireSys(HeapLock) + if t == threadList: threadList = t.next + if t.next != nil: t.next.prev = t.prev + if t.prev != nil: t.prev.next = t.next + # so that a thread can be unregistered twice which might happen if the + # code executes `destroyThread`: + t.next = nil + t.prev = nil + ReleaseSys(HeapLock) + + # on UNIX, the GC uses ``SIGFREEZE`` to tell every thread to stop so that + # the GC can examine the stacks? + + proc stopTheWord() = + nil + +# We jump through some hops here to ensure that Nimrod thread procs can have +# the Nimrod calling convention. This is needed because thread procs are +# ``stdcall`` on Windows and ``noconv`` on UNIX. Alternative would be to just +# use ``stdcall`` since it is mapped to ``noconv`` on UNIX anyway. However, +# the current approach will likely result in less problems later when we have +# GC'ed closures in Nimrod. + +type + TThread* {.pure, final.}[TParam] = object of TGcThread ## Nimrod thread. + fn: proc (p: TParam) + data: TParam + +template ThreadProcWrapperBody(closure: expr) = + when not hasSharedHeap: initGC() # init the GC for this thread + ThreadVarSetValue(globalsSlot, closure) + var t = cast[ptr TThread[TParam]](closure) + when not hasSharedHeap: stackBottom = addr(t) + t.stackBottom = addr(t) + registerThread(t) + try: + t.fn(t.data) + finally: + unregisterThread(t) + +{.push stack_trace:off.} +when defined(windows): + proc threadProcWrapper[TParam](closure: pointer): int32 {.stdcall.} = + ThreadProcWrapperBody(closure) + # implicitely return 0 +else: + proc threadProcWrapper[TParam](closure: pointer) {.noconv.} = + ThreadProcWrapperBody(closure) +{.pop.} + +proc joinThread*[TParam](t: TThread[TParam]) {.inline.} = + ## waits for the thread `t` to finish. + when hostOS == "windows": + discard WaitForSingleObject(t.sys, -1'i32) + else: + discard pthread_join(t.sys, nil) + +proc joinThreads*[TParam](t: openArray[TThread[TParam]]) = + ## waits for every thread in `t` to finish. 
+ when hostOS == "windows": + var a: array[0..255, TSysThread] + assert a.len >= t.len + for i in 0..t.high: a[i] = t[i].sys + discard WaitForMultipleObjects(t.len, cast[ptr TSysThread](addr(a)), 1, -1) + else: + for i in 0..t.high: joinThread(t[i]) + +proc destroyThread*[TParam](t: var TThread[TParam]) {.inline.} = + ## forces the thread `t` to terminate. This is potentially dangerous if + ## you don't have full control over `t` and its aquired resources. + when hostOS == "windows": + discard TerminateThread(t.sys, 1'i32) + else: + discard pthread_cancel(t.sys) + unregisterThread(addr(t.gcInfo)) + +proc createThread*[TParam](t: var TThread[TParam], + tp: proc (param: TParam), + param: TParam, + stackSize = 1024*256*sizeof(int)) = + ## creates a new thread `t` and starts its execution. Entry point is the + ## proc `tp`. `param` is passed to `tp`. + t.data = param + t.fn = tp + t.stackSize = stackSize + when hostOS == "windows": + var dummyThreadId: int32 + t.sys = CreateThread(nil, stackSize, threadProcWrapper[TParam], + addr(t), 0'i32, dummyThreadId) + else: + var a: Tpthread_attr + pthread_attr_init(a) + pthread_attr_setstacksize(a, stackSize) + if pthread_create(t.sys, a, threadProcWrapper[TParam], addr(t)) != 0: + raise newException(EIO, "cannot create thread") + +# --------------------------- lock handling ---------------------------------- + +type + TLock* = TSysLock ## Nimrod lock + +const + noDeadlocks = false # compileOption("deadlockPrevention") + +when nodeadlocks: + var + deadlocksPrevented* = 0 ## counts the number of times a + ## deadlock has been prevented + +proc InitLock*(lock: var TLock) {.inline.} = + ## Initializes the lock `lock`. + InitSysLock(lock) + +proc OrderedLocks(g: PGcThread): bool = + for i in 0 .. g.locksLen-2: + if g.locks[i] >= g.locks[i+1]: return false + result = true + +proc TryAquire*(lock: var TLock): bool {.inline.} = + ## Try to aquires the lock `lock`. Returns `true` on success. + when noDeadlocks: + result = TryAquireSys(lock) + if not result: return + # we have to add it to the ordered list. Oh, and we might fail if + # there is no space in the array left ... + var g = ThisThread() + if g.locksLen >= len(g.locks): + ReleaseSys(lock) + raise newException(EResourceExhausted, "cannot aquire additional lock") + # find the position to add: + var p = addr(lock) + var L = g.locksLen-1 + var i = 0 + while i <= L: + assert g.locks[i] != nil + if g.locks[i] < p: inc(i) # in correct order + elif g.locks[i] == p: return # thread already holds lock + else: + # do the crazy stuff here: + while L >= i: + g.locks[L+1] = g.locks[L] + dec L + g.locks[i] = p + inc(g.locksLen) + assert OrderedLocks(g) + return + # simply add to the end: + g.locks[g.locksLen] = p + inc(g.locksLen) + assert OrderedLocks(g) + else: + result = TryAquireSys(lock) + +proc Aquire*(lock: var TLock) = + ## Aquires the lock `lock`. 
+ when nodeadlocks: + var g = ThisThread() + var p = addr(lock) + var L = g.locksLen-1 + var i = 0 + while i <= L: + assert g.locks[i] != nil + if g.locks[i] < p: inc(i) # in correct order + elif g.locks[i] == p: return # thread already holds lock + else: + # do the crazy stuff here: + if g.locksLen >= len(g.locks): + raise newException(EResourceExhausted, "cannot aquire additional lock") + while L >= i: + ReleaseSys(cast[ptr TSysLock](g.locks[L])[]) + g.locks[L+1] = g.locks[L] + dec L + # aquire the current lock: + AquireSys(lock) + g.locks[i] = p + inc(g.locksLen) + # aquire old locks in proper order again: + L = g.locksLen-1 + inc i + while i <= L: + AquireSys(cast[ptr TSysLock](g.locks[i])[]) + inc(i) + # DANGER: We can only modify this global var if we gained every lock! + # NO! We need an atomic increment. Crap. + discard system.atomicInc(deadlocksPrevented, 1) + assert OrderedLocks(g) + return + + # simply add to the end: + if g.locksLen >= len(g.locks): + raise newException(EResourceExhausted, "cannot aquire additional lock") + AquireSys(lock) + g.locks[g.locksLen] = p + inc(g.locksLen) + assert OrderedLocks(g) + else: + AquireSys(lock) + +proc Release*(lock: var TLock) = + ## Releases the lock `lock`. + when nodeadlocks: + var g = ThisThread() + var p = addr(lock) + var L = g.locksLen + for i in countdown(L-1, 0): + if g.locks[i] == p: + for j in i..L-2: g.locks[j] = g.locks[j+1] + dec g.locksLen + break + ReleaseSys(lock) + diff --git a/lib/wrappers/zmq.nim b/lib/wrappers/zmq.nim new file mode 100644 index 000000000..8ebda26f9 --- /dev/null +++ b/lib/wrappers/zmq.nim @@ -0,0 +1,298 @@ +# Nimrod wrapper of 0mq +# Generated by c2nim with modifications and enhancement from Andreas Rumpf +# Original licence follows: + +# +# Copyright (c) 2007-2011 iMatix Corporation +# Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file +# +# This file is part of 0MQ. +# +# 0MQ is free software; you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# 0MQ is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +# Generated from zmq version 2.1.5 + +## Nimrod 0mq wrapper. This file contains the low level C wrappers as well as +## some higher level constructs. The higher level constructs are easily +## recognizable because they are the only ones that have documentation. + + +{.deadCodeElim: on.} +when defined(windows): + const + zmqdll* = "zmq.dll" +elif defined(macosx): + const + zmqdll* = "libzmq.dylib" +else: + const + zmqdll* = "libzmq.so" + +# A number random enough not to collide with different errno ranges on +# different OSes. The assumption is that error_t is at least 32-bit type. +const + HAUSNUMERO* = 156384712 + # On Windows platform some of the standard POSIX errnos are not defined. 
+ ENOTSUP* = (HAUSNUMERO + 1) + EPROTONOSUPPORT* = (HAUSNUMERO + 2) + ENOBUFS* = (HAUSNUMERO + 3) + ENETDOWN* = (HAUSNUMERO + 4) + EADDRINUSE* = (HAUSNUMERO + 5) + EADDRNOTAVAIL* = (HAUSNUMERO + 6) + ECONNREFUSED* = (HAUSNUMERO + 7) + EINPROGRESS* = (HAUSNUMERO + 8) + # Native 0MQ error codes. + EFSM* = (HAUSNUMERO + 51) + ENOCOMPATPROTO* = (HAUSNUMERO + 52) + ETERM* = (HAUSNUMERO + 53) + EMTHREAD* = (HAUSNUMERO + 54) + # Maximal size of "Very Small Message". VSMs are passed by value + # to avoid excessive memory allocation/deallocation. + # If VMSs larger than 255 bytes are required, type of 'vsm_size' + # field in msg_t structure should be modified accordingly. + MAX_VSM_SIZE* = 30 + + POLLIN* = 1 + POLLOUT* = 2 + POLLERR* = 4 + + STREAMER* = 1 + FORWARDER* = 2 + QUEUE* = 3 + + PAIR* = 0 + PUB* = 1 + SUB* = 2 + REQ* = 3 + REP* = 4 + DEALER* = 5 + ROUTER* = 6 + PULL* = 7 + PUSH* = 8 + XPUB* = 9 + XSUB* = 10 + XREQ* = DEALER # Old alias, remove in 3.x + XREP* = ROUTER # Old alias, remove in 3.x + UPSTREAM* = PULL # Old alias, remove in 3.x + DOWNSTREAM* = PUSH # Old alias, remove in 3.x + +type + # Message types. These integers may be stored in 'content' member of the + # message instead of regular pointer to the data. + TMsgTypes* = enum + DELIMITER = 31, + VSM = 32 + # Message flags. MSG_SHARED is strictly speaking not a message flag + # (it has no equivalent in the wire format), however, making it a flag + # allows us to pack the stucture tighter and thus improve performance. + TMsgFlags* = enum + MSG_MORE = 1, + MSG_SHARED = 128, + MSG_MASK = 129 # Merges all the flags + # A message. Note that 'content' is not a pointer to the raw data. + # Rather it is pointer to zmq::msg_content_t structure + # (see src/msg_content.hpp for its definition). + TMsg*{.pure, final.} = object + content*: pointer + flags*: char + vsm_size*: char + vsm_data*: array[0..MAX_VSM_SIZE - 1, char] + + TFreeFn = proc (data, hint: pointer) {.noconv.} + + TContext {.final, pure.} = object + PContext* = ptr TContext + + # Socket Types + TSocket {.final, pure.} = object + PSocket* = ptr TSocket + + # Socket options. + TSockOptions* = enum + HWM = 1, + SWAP = 3, + AFFINITY = 4, + IDENTITY = 5, + SUBSCRIBE = 6, + UNSUBSCRIBE = 7, + RATE = 8, + RECOVERY_IVL = 9, + MCAST_LOOP = 10, + SNDBUF = 11, + RCVBUF = 12, + RCVMORE = 13, + FD = 14, + EVENTS = 15, + theTYPE = 16, + LINGER = 17, + RECONNECT_IVL = 18, + BACKLOG = 19, + RECOVERY_IVL_MSEC = 20, # opt. recovery time, reconcile in 3.x + RECONNECT_IVL_MAX = 21 + + # Send/recv options. + TSendRecvOptions* = enum + NOBLOCK, SNDMORE + + TPollItem*{.pure, final.} = object + socket*: PSocket + fd*: cint + events*: cshort + revents*: cshort + +# Run-time API version detection + +proc version*(major: var cint, minor: var cint, patch: var cint){.cdecl, + importc: "zmq_version", dynlib: zmqdll.} +#**************************************************************************** +# 0MQ errors. +#**************************************************************************** + +# This function retrieves the errno as it is known to 0MQ library. The goal +# of this function is to make the code 100% portable, including where 0MQ +# compiled with certain CRT library (on Windows) is linked to an +# application that uses different CRT library. + +proc errno*(): cint{.cdecl, importc: "zmq_errno", dynlib: zmqdll.} +# Resolves system errors and 0MQ errors to human-readable string. 
+ +proc strerror*(errnum: cint): cstring {.cdecl, importc: "zmq_strerror", + dynlib: zmqdll.} +#**************************************************************************** +# 0MQ message definition. +#**************************************************************************** + +proc msg_init*(msg: var TMsg): cint{.cdecl, importc: "zmq_msg_init", + dynlib: zmqdll.} +proc msg_init*(msg: var TMsg, size: int): cint{.cdecl, + importc: "zmq_msg_init_size", dynlib: zmqdll.} +proc msg_init*(msg: var TMsg, data: cstring, size: int, + ffn: TFreeFn, hint: pointer): cint{.cdecl, + importc: "zmq_msg_init_data", dynlib: zmqdll.} +proc msg_close*(msg: var TMsg): cint {.cdecl, importc: "zmq_msg_close", + dynlib: zmqdll.} +proc msg_move*(dest, src: var TMsg): cint{.cdecl, + importc: "zmq_msg_move", dynlib: zmqdll.} +proc msg_copy*(dest, src: var TMsg): cint{.cdecl, + importc: "zmq_msg_copy", dynlib: zmqdll.} +proc msg_data*(msg: var TMsg): cstring {.cdecl, importc: "zmq_msg_data", + dynlib: zmqdll.} +proc msg_size*(msg: var TMsg): int {.cdecl, importc: "zmq_msg_size", + dynlib: zmqdll.} + +#**************************************************************************** +# 0MQ infrastructure (a.k.a. context) initialisation & termination. +#**************************************************************************** + +proc init*(io_threads: cint): PContext {.cdecl, importc: "zmq_init", + dynlib: zmqdll.} +proc term*(context: PContext): cint {.cdecl, importc: "zmq_term", + dynlib: zmqdll.} +#**************************************************************************** +# 0MQ socket definition. +#**************************************************************************** + +proc socket*(context: PContext, theType: cint): PSocket {.cdecl, + importc: "zmq_socket", dynlib: zmqdll.} +proc close*(s: PSocket): cint{.cdecl, importc: "zmq_close", dynlib: zmqdll.} +proc setsockopt*(s: PSocket, option: cint, optval: pointer, + optvallen: int): cint {.cdecl, importc: "zmq_setsockopt", + dynlib: zmqdll.} +proc getsockopt*(s: PSocket, option: cint, optval: pointer, + optvallen: ptr int): cint{.cdecl, + importc: "zmq_getsockopt", dynlib: zmqdll.} +proc bindAddr*(s: PSocket, address: cstring): cint{.cdecl, importc: "zmq_bind", + dynlib: zmqdll.} +proc connect*(s: PSocket, address: cstring): cint{.cdecl, + importc: "zmq_connect", dynlib: zmqdll.} +proc send*(s: PSocket, msg: var TMsg, flags: cint): cint{.cdecl, + importc: "zmq_send", dynlib: zmqdll.} +proc recv*(s: PSocket, msg: var TMsg, flags: cint): cint{.cdecl, + importc: "zmq_recv", dynlib: zmqdll.} +#**************************************************************************** +# I/O multiplexing. +#**************************************************************************** + +proc poll*(items: ptr TPollItem, nitems: cint, timeout: int): cint{. + cdecl, importc: "zmq_poll", dynlib: zmqdll.} + +#**************************************************************************** +# Built-in devices +#**************************************************************************** + +proc device*(device: cint, insocket, outsocket: PSocket): cint{. 
+ cdecl, importc: "zmq_device", dynlib: zmqdll.} + +type + EZmq* = object of ESynch ## exception that is raised if something fails + TConnection* {.pure, final.} = object ## a connection + c*: PContext ## the embedded context + s*: PSocket ## the embedded socket + + TConnectionMode* = enum ## connection mode + conPAIR = 0, + conPUB = 1, + conSUB = 2, + conREQ = 3, + conREP = 4, + conDEALER = 5, + conROUTER = 6, + conPULL = 7, + conPUSH = 8, + conXPUB = 9, + conXSUB = 10 + +proc zmqError*() {.noinline, noreturn.} = + ## raises EZmq with error message from `zmq.strerror`. + var e: ref EZmq + new(e) + e.msg = $strerror(errno()) + raise e + +proc open*(address: string, server: bool, mode: TConnectionMode = conDEALER, + numthreads = 4): TConnection = + ## opens a new connection. If `server` is true, it uses `bindAddr` for the + ## underlying socket, otherwise it opens the socket with `connect`. + result.c = init(cint(numthreads)) + if result.c == nil: zmqError() + result.s = socket(result.c, cint(ord(mode))) + if result.s == nil: zmqError() + if server: + if bindAddr(result.s, address) != 0'i32: zmqError() + else: + if connect(result.s, address) != 0'i32: zmqError() + +proc close*(c: var TConnection) = + ## closes the connection. + if close(c.s) != 0'i32: zmqError() + if term(c.c) != 0'i32: zmqError() + +proc send*(c: var TConnection, msg: string) = + ## sends a message over the connection. + var m: TMsg + if msg_init(m, msg.len) != 0'i32: zmqError() + copyMem(msg_data(m), cstring(msg), msg.len) + if send(c.s, m, 0'i32) != 0'i32: zmqError() + discard msg_close(m) + +proc receive*(c: var TConnection): string = + ## receives a message from a connection. + var m: TMsg + if msg_init(m) != 0'i32: zmqError() + if recv(c.s, m, 0'i32) != 0'i32: zmqError() + result = newString(msg_size(m)) + copyMem(addr(result[0]), msg_data(m), result.len) + discard msg_close(m) + |
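Taken together, the high-level procs of the new zmq wrapper make a request/reply exchange short. A hedged sketch (the address, the message strings, and running both ends in one process are illustrative only; real code would split client and server across processes):

    import zmq

    var server = open("tcp://127.0.0.1:5555", server = true, mode = conREP)
    var client = open("tcp://127.0.0.1:5555", server = false, mode = conREQ)

    client.send("hello")
    echo server.receive()      # --> "hello"
    server.send("world")
    echo client.receive()      # --> "world"

    close(client)
    close(server)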