diff options
author | Araq <rumpf_a@web.de> | 2011-07-16 18:34:18 +0200 |
---|---|---|
committer | Araq <rumpf_a@web.de> | 2011-07-16 18:34:18 +0200 |
commit | 42e6130b2c37345963c0b5469e12a287b88bf3eb (patch) | |
tree | 98a957645b04b43ddf86a7c499a8fc7d9cad69aa /lib | |
parent | fe5df368c18c79ee76fb63cb64121e1f9c3946bc (diff) | |
download | Nim-42e6130b2c37345963c0b5469e12a287b88bf3eb.tar.gz |
first steps to explicit channels for thread communication; added mainThreadId
Diffstat (limited to 'lib')
-rwxr-xr-x | lib/pure/ropes.nim | 10 | ||||
-rwxr-xr-x | lib/system.nim | 7 | ||||
-rwxr-xr-x | lib/system/inboxes.nim | 59 | ||||
-rwxr-xr-x | lib/system/mmdisp.nim | 20 | ||||
-rwxr-xr-x | lib/system/threads.nim | 12 |
5 files changed, 95 insertions, 13 deletions
diff --git a/lib/pure/ropes.nim b/lib/pure/ropes.nim index 69737576f..4a6c3f530 100755 --- a/lib/pure/ropes.nim +++ b/lib/pure/ropes.nim @@ -145,17 +145,17 @@ proc rope*(i: BiggestInt): PRope {.rtl, extern: "nro$1BiggestInt".} = proc rope*(f: BiggestFloat): PRope {.rtl, extern: "nro$1BiggestFloat".} = ## Converts a float to a rope. result = rope($f) - -proc disableCache*() {.rtl, extern: "nro$1".} = - ## the cache is discarded and disabled. The GC will reuse its used memory. - cache = nil - cacheEnabled = false proc enableCache*() {.rtl, extern: "nro$1".} = ## Enables the caching of leaves. This reduces the memory footprint at ## the cost of runtime efficiency. cacheEnabled = true +proc disableCache*() {.rtl, extern: "nro$1".} = + ## the cache is discarded and disabled. The GC will reuse its used memory. + cache = nil + cacheEnabled = false + proc `&`*(a, b: PRope): PRope {.rtl, extern: "nroConcRopeRope".} = ## the concatenation operator for ropes. if a == nil: diff --git a/lib/system.nim b/lib/system.nim index b6a119ddd..a8f8ba09b 100755 --- a/lib/system.nim +++ b/lib/system.nim @@ -783,7 +783,7 @@ const hasThreadSupport = compileOption("threads") hasSharedHeap = defined(boehmgc) # don't share heaps; every thread has its own -when hasThreadSupport and not hasSharedHeap: +when hasThreadSupport: {.pragma: rtlThreadVar, threadvar.} else: {.pragma: rtlThreadVar.} @@ -835,7 +835,7 @@ proc insert*[T](x: var seq[T], item: T, i = 0) {.noSideEffect.} = setLen(x, xl+1) var j = xl-1 while j >= i: - x[j+1] = x[j] + shallowCopy(x[j+1], x[j]) dec(j) x[i] = item @@ -1411,7 +1411,8 @@ var ## writes an error message and terminates the program. `outOfMemHook` can ## be used to raise an exception in case of OOM like so: ## - ## code-block:: nimrod + ## .. code-block:: nimrod + ## ## var gOutOfMem: ref EOutOfMemory ## new(gOutOfMem) # need to be allocated *before* OOM really happened! ## gOutOfMem.msg = "out of memory" diff --git a/lib/system/inboxes.nim b/lib/system/inboxes.nim index 7b522c7ae..b2db103cb 100755 --- a/lib/system/inboxes.nim +++ b/lib/system/inboxes.nim @@ -205,9 +205,8 @@ proc send*[TMsg](receiver: TThreadId[TMsg], msg: TMsg) = var q = cast[PInbox](getInBoxMem(receiver[])) sendImpl(q) -proc llRecv(res: pointer, typ: PNimType) = +proc llRecv(q: PInbox, res: pointer, typ: PNimType) = # to save space, the generic is as small as possible - var q = cast[PInbox](getInBoxMem()) acquireSys(q.lock) q.ready = true while q.count <= 0: @@ -222,7 +221,8 @@ proc llRecv(res: pointer, typ: PNimType) = proc recv*[TMsg](): TMsg = ## receives a message from its internal message queue. This blocks until ## a message has arrived! You may use ``peek`` to avoid the blocking. - llRecv(addr(result), cast[PNimType](getTypeInfo(result))) + var q = cast[PInbox](getInBoxMem()) + llRecv(q, addr(result), cast[PNimType](getTypeInfo(result))) proc peek*(): int = ## returns the current number of messages in the inbox. @@ -242,3 +242,56 @@ proc ready*[TMsg](t: var TThread[TMsg]): bool = var q = cast[PInbox](getInBoxMem(t)) result = q.ready +# ---------------------- channel support ------------------------------------- + +type + TChannel*[TMsg] = TInbox ## a channel for thread communication + TChannelId*[TMsg] = ptr TChannel[TMsg] ## the current implementation uses + ## a pointer as a channel ID. + +proc open*[TMsg](c: var TChannel[TMsg]) = + ## opens a channel `c` for inter thread communication. + initInbox(addr(c)) + +proc close*[TMsg](c: var TChannel[TMsg]) = + ## closes a channel `c` and frees its associated resources. + freeInbox(addr(c)) + +proc channelId*[TMsg](c: var TChannel[TMsg]): TChannelId[TMsg] {.inline.} = + ## returns the channel ID of `c`. + result = addr(c) + +proc send*[TMsg](c: var TChannel[TMsg], msg: TMsg) = + ## sends a message to a channel. `msg` is deeply copied. + var q = cast[PInbox](addr(c)) + sendImpl(q) + +proc send*[TMsg](c: TChannelId[TMsg], msg: TMsg) = + ## sends a message to a thread. `msg` is deeply copied. + var q = cast[PInbox](c) + sendImpl(q) + +proc peek*[TMsg](c: var TChannel[TMsg]): int = + ## returns the current number of messages in the channel `c`. + var q = cast[PInbox](addr(c)) + lockInbox(q): + result = q.count + +proc peek*[TMsg](c: TChannelId[TMsg]): int = + ## returns the current number of messages in the channel `c`. + var q = cast[PInbox](c) + lockInbox(q): + result = q.count + +proc recv*[TMsg](c: TChannelId[TMsg]): TMsg = + ## receives a message from the channel `c`. This blocks until + ## a message has arrived! You may use ``peek`` to avoid the blocking. + var q = cast[PInbox](c) + llRecv(q, addr(result), cast[PNimType](getTypeInfo(result))) + +proc recv*[TMsg](c: var TChannel[TMsg]): TMsg = + ## receives a message from the channel `c`. This blocks until + ## a message has arrived! You may use ``peek`` to avoid the blocking. + var q = cast[PInbox](addr(c)) + llRecv(q, addr(result), cast[PNimType](getTypeInfo(result))) + diff --git a/lib/system/mmdisp.nim b/lib/system/mmdisp.nim index e5efff615..0cbee1d47 100755 --- a/lib/system/mmdisp.nim +++ b/lib/system/mmdisp.nim @@ -133,6 +133,26 @@ when defined(boehmgc): proc asgnRefNoCycle(dest: ppointer, src: pointer) {.compilerproc, inline.} = dest[] = src + type + TMemRegion = object {.final, pure.} + + var + dummy {.rtlThreadVar.}: int + + proc rawAlloc(r: var TMemRegion, size: int): pointer = + result = boehmAlloc(size) + if result == nil: raiseOutOfMem() + proc rawAlloc0(r: var TMemRegion, size: int): pointer = + result = alloc(size) + zeroMem(result, size) + proc realloc(r: var TMemRegion, p: Pointer, newsize: int): pointer = + result = boehmRealloc(p, newsize) + if result == nil: raiseOutOfMem() + proc rawDealloc(r: var TMemRegion, p: Pointer) = boehmDealloc(p) + + proc deallocOsPages(r: var TMemRegion) {.inline.} = nil + proc deallocOsPages() {.inline.} = nil + include "system/cellsets" elif defined(nogc): # Even though we don't want the GC, we cannot simply use C's memory manager diff --git a/lib/system/threads.nim b/lib/system/threads.nim index 3e99afcde..823844c55 100755 --- a/lib/system/threads.nim +++ b/lib/system/threads.nim @@ -266,7 +266,10 @@ proc initInbox(p: pointer) proc freeInbox(p: pointer) when not defined(boehmgc) and not hasSharedHeap: proc deallocOsPages() - + +when defined(mainThread): + initInbox(addr(mainThread.inbox)) + template ThreadProcWrapperBody(closure: expr) = ThreadVarSetValue(globalsSlot, closure) var t = cast[ptr TThread[TMsg]](closure) @@ -379,6 +382,10 @@ proc myThreadId*[TMsg](): TThreadId[TMsg] = ## returns the thread ID of the thread that calls this proc. result = cast[TThreadId[TMsg]](ThreadVarGetValue(globalsSlot)) +proc mainThreadId*[TMsg](): TThreadId[TMsg] = + ## returns the thread ID of the main thread. + result = cast[TThreadId[TMsg]](addr(mainThread)) + when useStackMaskHack: proc runMain(tp: proc () {.thread.}) {.compilerproc.} = var mainThread: TThread[pointer] @@ -388,7 +395,8 @@ when useStackMaskHack: # --------------------------- lock handling ---------------------------------- type - TLock* = TSysLock ## Nimrod lock; not re-entrant! + TLock* = TSysLock ## Nimrod lock; whether this is re-entrant + ## or not is unspecified! const noDeadlocks = false # compileOption("deadlockPrevention") |