summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorAraq <rumpf_a@web.de>2011-07-09 01:18:33 +0200
committerAraq <rumpf_a@web.de>2011-07-09 01:18:33 +0200
commit2565ff8ddec9fcf43fbda2fae6f04806c1bc6e8a (patch)
tree8ab9b57efd41ac133eaf29b0a751d42cce1f66df
parent99bcc233cd8fb3bb9b6f3f0857e477dd9b33c9e8 (diff)
downloadNim-2565ff8ddec9fcf43fbda2fae6f04806c1bc6e8a.tar.gz
basic message passing working
-rwxr-xr-xlib/system.nim7
-rw-r--r--lib/system/inboxes.nim27
-rwxr-xr-xlib/system/threads.nim37
-rwxr-xr-xtodo.txt2
4 files changed, 56 insertions, 17 deletions
diff --git a/lib/system.nim b/lib/system.nim
index 5c7102664..6f20a9b4d 100755
--- a/lib/system.nim
+++ b/lib/system.nim
@@ -236,7 +236,7 @@ type
 
   EInvalidObjectAssignment* =
     object of ESynch ## is raised if an object gets assigned to its
-                     ## farther's object.
+                     ## parent's object.
 
   EInvalidObjectConversion* =
     object of ESynch ## is raised if an object is converted to an incompatible
@@ -261,7 +261,10 @@ type
                              ## that cannot be represented with infinite
                              ## precision -- for example, 2.0 / 3.0, log(1.1) 
                              ## NOTE: Nimrod currently does not detect these!
-
+  EDeadThread* =
+    object of ESynch ## is raised if it is attempted to send a message to a
+                     ## dead thread.
+                     
   TResult* = enum Failure, Success
 
 proc sizeof*[T](x: T): natural {.magic: "SizeOf", noSideEffect.}
diff --git a/lib/system/inboxes.nim b/lib/system/inboxes.nim
index 8f683f612..1f5d56c16 100644
--- a/lib/system/inboxes.nim
+++ b/lib/system/inboxes.nim
@@ -22,6 +22,8 @@ type
   PInbox = ptr TInbox

   TLoadStoreMode = enum mStore, mLoad

 

+const ThreadDeadMask = -2

+

 proc initInbox(p: pointer) =

   var inbox = cast[PInbox](p)

   initSysLock(inbox.lock)

@@ -30,6 +32,9 @@ proc initInbox(p: pointer) =
 

 proc freeInbox(p: pointer) =

   var inbox = cast[PInbox](p)

+  # we need to grab the lock to be save against sending threads!

+  acquireSys(inbox.lock)

+  inbox.mask = ThreadDeadMask

   deallocOsPages(inbox.region)

   deinitSys(inbox.lock)

   deinitSysCond(inbox.cond)

@@ -158,7 +163,6 @@ proc rawSend(q: PInbox, data: pointer, typ: PNimType) =
     q.mask = cap*2 - 1

     q.wr = q.count

     q.rd = 0

-    #echo "came here"

   storeAux(addr(q.data[q.wr * typ.size]), data, typ, q, mStore)

   inc q.count

   q.wr = (q.wr + 1) and q.mask

@@ -177,23 +181,34 @@ template lockInbox(q: expr, action: stmt) =
 proc send*[TMsg](receiver: var TThread[TMsg], msg: TMsg) =

   ## sends a message to a thread. `msg` is deeply copied.

   var q = cast[PInbox](getInBoxMem(receiver))

+  if q.mask == ThreadDeadMask:

+    raise newException(EDeadThread, "cannot send message; thread died")

   acquireSys(q.lock)

   var m: TMsg

   shallowCopy(m, msg)

-  rawSend(q, addr(m), cast[PNimType](getTypeInfo(msg)))

+  var typ = cast[PNimType](getTypeInfo(msg))

+  rawSend(q, addr(m), typ)

+  q.elemType = typ

   releaseSys(q.lock)

   SignalSysCond(q.cond)

 

-proc recv*[TMsg](): TMsg =

-  ## receives a message from its internal message queue. This blocks until

-  ## a message has arrived! You may use ``peek`` to avoid the blocking.

+proc llRecv(res: pointer, typ: PNimType) =

+  # to save space, the generic is as small as possible

   var q = cast[PInbox](getInBoxMem())

   acquireSys(q.lock)

   while q.count <= 0:

     WaitSysCond(q.cond, q.lock)

-  rawRecv(q, addr(result), cast[PNimType](getTypeInfo(result)))

+  if typ != q.elemType:

+    releaseSys(q.lock)

+    raise newException(EInvalidValue, "cannot receive message of wrong type")

+  rawRecv(q, res, typ)

   releaseSys(q.lock)

 

+proc recv*[TMsg](): TMsg =

+  ## receives a message from its internal message queue. This blocks until

+  ## a message has arrived! You may use ``peek`` to avoid the blocking.

+  llRecv(addr(result), cast[PNimType](getTypeInfo(result)))

+

 proc peek*(): int =

   ## returns the current number of messages in the inbox.

   var q = cast[PInbox](getInBoxMem())

diff --git a/lib/system/threads.nim b/lib/system/threads.nim
index 9bb67863b..bd361760d 100755
--- a/lib/system/threads.nim
+++ b/lib/system/threads.nim
@@ -249,7 +249,8 @@ when not defined(useNimRtl):
 
 type
   TThread* {.pure, final.}[TParam] = object of TGcThread ## Nimrod thread.
-    fn: proc (p: TParam)
+    emptyFn: proc ()
+    dataFn: proc (p: TParam)
     data: TParam
 
 proc initInbox(p: pointer)
@@ -268,14 +269,14 @@ template ThreadProcWrapperBody(closure: expr) =
     initGC()
   t.stackBottom = addr(t)
   registerThread(t)
-  initInbox(addr(t.inbox))
   try:
     when false:
       var a = addr(tls)
       var b = MaskStackPointer(1293920-372736-303104-36864)
       c_fprintf(c_stdout, "TLS:    %p\nmasked: %p\ndiff:   %ld\n",
                 a, b, cast[int](a) - cast[int](b))
-    t.fn(t.data)
+    if t.emptyFn == nil: t.dataFn(t.data)
+    else: t.emptyFn()
   finally:
     # XXX shut-down is not executed when the thread is forced down!
     freeInbox(addr(t.inbox))
@@ -326,8 +327,28 @@ proc createThread*[TParam](t: var TThread[TParam],
   ## creates a new thread `t` and starts its execution. Entry point is the
   ## proc `tp`. `param` is passed to `tp`.
   t.data = param
-  t.fn = tp
+  t.dataFn = tp
+  t.stackSize = ThreadStackSize
+  initInbox(addr(t.inbox))
+  when hostOS == "windows":
+    var dummyThreadId: int32
+    t.sys = CreateThread(nil, ThreadStackSize, threadProcWrapper[TParam],
+                         addr(t), 0'i32, dummyThreadId)
+    if t.sys <= 0:
+      raise newException(EResourceExhausted, "cannot create thread")
+  else:
+    var a: Tpthread_attr
+    pthread_attr_init(a)
+    pthread_attr_setstacksize(a, ThreadStackSize)
+    if pthread_create(t.sys, a, threadProcWrapper[TParam], addr(t)) != 0:
+      raise newException(EResourceExhausted, "cannot create thread")
+
+proc createThread*[TParam](t: var TThread[TParam], tp: proc () {.thread.}) =
+  ## creates a new thread `t` and starts its execution. Entry point is the
+  ## proc `tp`.
+  t.emptyFn = tp
   t.stackSize = ThreadStackSize
+  initInbox(addr(t.inbox))
   when hostOS == "windows":
     var dummyThreadId: int32
     t.sys = CreateThread(nil, ThreadStackSize, threadProcWrapper[TParam],
@@ -342,9 +363,9 @@ proc createThread*[TParam](t: var TThread[TParam],
       raise newException(EResourceExhausted, "cannot create thread")
 
 when useStackMaskHack:
-  proc runMain(tp: proc (dummy: pointer) {.thread.}) {.compilerproc.} =
+  proc runMain(tp: proc () {.thread.}) {.compilerproc.} =
     var mainThread: TThread[pointer]
-    createThread(mainThread, tp, nil)
+    createThread(mainThread, tp)
     joinThread(mainThread)
 
 # --------------------------- lock handling ----------------------------------
@@ -462,9 +483,9 @@ proc Release*(lock: var TLock) =
 
 # ------------------------ message passing support ---------------------------
 
-proc getInBoxMem*[TMsg](t: var TThread[TMsg]): pointer {.inline.} =
+proc getInBoxMem[TMsg](t: var TThread[TMsg]): pointer {.inline.} =
   result = addr(t.inbox)
 
-proc getInBoxMem*(): pointer {.inline.} =
+proc getInBoxMem(): pointer {.inline.} =
   result = addr(cast[PGcThread](ThreadVarGetValue(globalsSlot)).inbox)
 
diff --git a/todo.txt b/todo.txt
index 710f9b8aa..3c9754e10 100755
--- a/todo.txt
+++ b/todo.txt
@@ -1,12 +1,12 @@
 High priority (version 0.8.12)
 ==============================
+* ``force_nosideEffect`` or some similar pragma is needed (``loophole``?)
 * test threads on windows
 * test thread analysis: 
   var x = globalString # ok, copied; `x` is mine!
   vs
   var x = globalRef # read access, `x` is theirs!
 
-* test message passing built-ins
 * make threadvar efficient again on linux after testing
 * document Nimrod's threads
 * document Nimrod's two phase symbol lookup for generics