summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rwxr-xr-xdoc/lib.txt2
-rw-r--r--lib/core/locks.nim5
-rwxr-xr-xlib/pure/sockets.nim2
-rwxr-xr-xlib/system.nim2
-rwxr-xr-xlib/system/channels.nim151
-rwxr-xr-xlib/system/threads.nim94
-rwxr-xr-xtodo.txt11
-rwxr-xr-xweb/nimrod.ini2
8 files changed, 93 insertions, 176 deletions
diff --git a/doc/lib.txt b/doc/lib.txt
index 7595028db..ddfaaf46c 100755
--- a/doc/lib.txt
+++ b/doc/lib.txt
@@ -35,7 +35,7 @@ Core
   Nimrod thread support. **Note**: This is part of the system module. Do not
   import it explicitely.
 
-* `inboxes <inboxes.html>`_
+* `channels <channels.html>`_
   Nimrod message passing support for threads. **Note**: This is part of the 
   system module. Do not import it explicitely.
 
diff --git a/lib/core/locks.nim b/lib/core/locks.nim
index 1fffb8e0a..6f139c7a2 100644
--- a/lib/core/locks.nim
+++ b/lib/core/locks.nim
@@ -19,7 +19,8 @@ include "lib/system/syslocks"
 
 type
   TLock* = TSysLock ## Nimrod lock; whether this is re-entrant
-                    ## or not is unspecified!
+                    ## or not is unspecified! However, compilation
+                    ## in preventDeadlocks-mode guarantees re-entrancy.
   TCond* = TSysCond ## Nimrod condition variable
   
 const
@@ -50,7 +51,7 @@ proc DeinitLock*(lock: var TLock) {.inline.} =
   ## Frees the resources associated with the lock.
   DeinitSys(lock)
 
-proc TryAcquire*(lock: var TLock): bool {.inline.} = 
+proc TryAcquire*(lock: var TLock): bool = 
   ## Tries to acquire the given lock. Returns `true` on success.
   result = TryAcquireSys(lock)
   when noDeadlocks:
diff --git a/lib/pure/sockets.nim b/lib/pure/sockets.nim
index 958b3483b..f18cf2ba8 100755
--- a/lib/pure/sockets.nim
+++ b/lib/pure/sockets.nim
@@ -406,9 +406,11 @@ proc connectAsync*(socket: TSocket, name: string, port = TPort(0),
         var err = WSAGetLastError()
         # Windows EINTR doesn't behave same as POSIX.
         if err == WSAEWOULDBLOCK:
+          freeaddrinfo(aiList)
           return
       else:
         if errno == EINTR or errno == EINPROGRESS:
+          freeaddrinfo(aiList)
           return
         
     it = it.ai_next
diff --git a/lib/system.nim b/lib/system.nim
index 1d8d18e1f..2e754ece7 100755
--- a/lib/system.nim
+++ b/lib/system.nim
@@ -1808,7 +1808,7 @@ when not defined(EcmaScript) and not defined(NimrodVM):
 
   include "system/sysio"
   when hasThreadSupport:
-    include "system/inboxes"
+    include "system/channels"
 
   iterator lines*(filename: string): string =
     ## Iterate over any line in the file named `filename`.
diff --git a/lib/system/channels.nim b/lib/system/channels.nim
index d4a4b1eb4..e443dd6c1 100755
--- a/lib/system/channels.nim
+++ b/lib/system/channels.nim
@@ -7,7 +7,7 @@
 #    distribution, for details about the copyright.

 #

 

-## Message passing for threads. **Note**: This is part of the system module.

+## Channel support for threads. **Note**: This is part of the system module.

 ## Do not import it directly. To activate thread support you need to compile

 ## with the ``--threads:on`` command line switch.

 ##

@@ -16,7 +16,7 @@
 

 type

   pbytes = ptr array[0.. 0xffff, byte]

-  TInbox {.pure, final.} = object ## msg queue for a thread

+  TRawChannel {.pure, final.} = object ## msg queue for a thread

     rd, wr, count, mask: int

     data: pbytes

     lock: TSysLock

@@ -24,28 +24,30 @@ type
     elemType: PNimType

     ready: bool

     region: TMemRegion

-  PInbox = ptr TInbox

-  TLoadStoreMode = enum mStore, mLoad

+  PRawChannel = ptr TRawChannel

+  TLoadStoreMode = enum mStore, mLoad
+  TChannel*[TMsg] = TRawChannel ## a channel for thread communication

 

-const ThreadDeadMask = -2

+const ChannelDeadMask = -2

 

-proc initInbox(p: pointer) =

-  var inbox = cast[PInbox](p)

-  initSysLock(inbox.lock)

-  initSysCond(inbox.cond)

-  inbox.mask = -1

+proc initRawChannel(p: pointer) =

+  var c = cast[PRawChannel](p)

+  initSysLock(c.lock)

+  initSysCond(c.cond)

+  c.mask = -1

 

-proc freeInbox(p: pointer) =

-  var inbox = cast[PInbox](p)

+proc deinitRawChannel(p: pointer) =

+  var c = cast[PRawChannel](p)

   # we need to grab the lock to be save against sending threads!

-  acquireSys(inbox.lock)

-  inbox.mask = ThreadDeadMask

-  deallocOsPages(inbox.region)

-  deinitSys(inbox.lock)

-  deinitSysCond(inbox.cond)

-

-proc storeAux(dest, src: Pointer, mt: PNimType, t: PInbox, mode: TLoadStoreMode)

-proc storeAux(dest, src: Pointer, n: ptr TNimNode, t: PInbox,

+  acquireSys(c.lock)

+  c.mask = ChannelDeadMask

+  deallocOsPages(c.region)

+  deinitSys(c.lock)

+  deinitSysCond(c.cond)

+

+proc storeAux(dest, src: Pointer, mt: PNimType, t: PRawChannel, 
+              mode: TLoadStoreMode)

+proc storeAux(dest, src: Pointer, n: ptr TNimNode, t: PRawChannel,

               mode: TLoadStoreMode) =

   var

     d = cast[TAddress](dest)

@@ -62,7 +64,7 @@ proc storeAux(dest, src: Pointer, n: ptr TNimNode, t: PInbox,
     if m != nil: storeAux(dest, src, m, t, mode)

   of nkNone: sysAssert(false)

 

-proc storeAux(dest, src: Pointer, mt: PNimType, t: PInbox, 

+proc storeAux(dest, src: Pointer, mt: PNimType, t: PRawChannel, 

               mode: TLoadStoreMode) =

   var

     d = cast[TAddress](dest)

@@ -138,9 +140,9 @@ proc storeAux(dest, src: Pointer, mt: PNimType, t: PInbox,
       if mode == mStore:

         x[] = Alloc(t.region, mt.base.size)

       else:

-        # XXX we should use the dynamic type here too, but that is not stored in

-        # the inbox at all --> use source[]'s object type? but how? we need a

-        # tyRef to the object!

+        # XXX we should use the dynamic type here too, but that is not stored
+        # in the inbox at all --> use source[]'s object type? but how? we need
+        # a tyRef to the object!

         var obj = newObj(mt.base, mt.base.size)

         unsureAsgnRef(x, obj)

       storeAux(x[], s, mt.base, t, mode)

@@ -148,7 +150,7 @@ proc storeAux(dest, src: Pointer, mt: PNimType, t: PInbox,
   else:

     copyMem(dest, src, mt.size) # copy raw bits

 

-proc rawSend(q: PInbox, data: pointer, typ: PNimType) =

+proc rawSend(q: PRawChannel, data: pointer, typ: PNimType) =

   ## adds an `item` to the end of the queue `q`.

   var cap = q.mask+1

   if q.count >= cap:

@@ -172,19 +174,19 @@ proc rawSend(q: PInbox, data: pointer, typ: PNimType) =
   inc q.count

   q.wr = (q.wr + 1) and q.mask

 

-proc rawRecv(q: PInbox, data: pointer, typ: PNimType) =

+proc rawRecv(q: PRawChannel, data: pointer, typ: PNimType) =

   assert q.count > 0

   dec q.count

   storeAux(data, addr(q.data[q.rd * typ.size]), typ, q, mLoad)

   q.rd = (q.rd + 1) and q.mask

 

-template lockInbox(q: expr, action: stmt) =

+template lockChannel(q: expr, action: stmt) =

   acquireSys(q.lock)

   action

   releaseSys(q.lock)

 

 template sendImpl(q: expr) =  

-  if q.mask == ThreadDeadMask:

+  if q.mask == ChannelDeadMask:

     raise newException(EDeadThread, "cannot send message; thread died")

   acquireSys(q.lock)

   var m: TMsg

@@ -195,17 +197,12 @@ template sendImpl(q: expr) =
   releaseSys(q.lock)

   SignalSysCond(q.cond)

 

-proc send*[TMsg](receiver: var TThread[TMsg], msg: TMsg) =

-  ## sends a message to a thread. `msg` is deeply copied.

-  var q = cast[PInbox](getInBoxMem(receiver))

-  sendImpl(q)

-

-proc send*[TMsg](receiver: TThreadId[TMsg], msg: TMsg) =

+proc send*[TMsg](c: var TChannel[TMsg], msg: TMsg) =

   ## sends a message to a thread. `msg` is deeply copied.

-  var q = cast[PInbox](getInBoxMem(receiver[]))

+  var q = cast[PRawChannel](addr(c))

   sendImpl(q)

 

-proc llRecv(q: PInbox, res: pointer, typ: PNimType) =

+proc llRecv(q: PRawChannel, res: pointer, typ: PNimType) =

   # to save space, the generic is as small as possible

   acquireSys(q.lock)

   q.ready = true

@@ -218,80 +215,30 @@ proc llRecv(q: PInbox, res: pointer, typ: PNimType) =
   rawRecv(q, res, typ)

   releaseSys(q.lock)

 

-proc recv*[TMsg](): TMsg =

-  ## receives a message from its internal message queue. This blocks until

+proc recv*[TMsg](c: var TChannel[TMsg]): TMsg =

+  ## receives a message from the channel `c`. This blocks until

   ## a message has arrived! You may use ``peek`` to avoid the blocking.

-  var q = cast[PInbox](getInBoxMem())

+  var q = cast[PRawChannel](addr(c))

   llRecv(q, addr(result), cast[PNimType](getTypeInfo(result)))

 

-proc peek*(): int =

-  ## returns the current number of messages in the inbox.

-  var q = cast[PInbox](getInBoxMem())

-  lockInbox(q):

-    result = q.count

-

-proc peek*[TMsg](t: var TThread[TMsg]): int =

-  ## returns the current number of messages in the inbox of thread `t`.

-  var q = cast[PInbox](getInBoxMem(t))

-  if q.mask != ThreadDeadMask:

-    lockInbox(q):

+proc peek*[TMsg](c: var TChannel[TMsg]): int =

+  ## returns the current number of messages in the channel `c`.

+  var q = cast[PRawChannel](addr(c))

+  if q.mask != ChannelDeadMask:

+    lockChannel(q):

       result = q.count

 

-proc ready*[TMsg](t: var TThread[TMsg]): bool =

-  ## returns true iff the thread `t` is waiting on ``recv`` for new messages.

-  var q = cast[PInbox](getInBoxMem(t))

-  result = q.ready

-

-# ---------------------- channel support -------------------------------------

-

-type

-  TChannel*[TMsg] = TInbox ## a channel for thread communication

-  TChannelId*[TMsg] = ptr TChannel[TMsg] ## the current implementation uses

-                                         ## a pointer as a channel ID.

-

 proc open*[TMsg](c: var TChannel[TMsg]) =

   ## opens a channel `c` for inter thread communication.

-  initInbox(addr(c))

+  initRawChannel(addr(c))

 

 proc close*[TMsg](c: var TChannel[TMsg]) =

   ## closes a channel `c` and frees its associated resources.

-  freeInbox(addr(c))

-

-proc channelId*[TMsg](c: var TChannel[TMsg]): TChannelId[TMsg] {.inline.} =

-  ## returns the channel ID of `c`.

-  result = addr(c)

-  

-proc send*[TMsg](c: var TChannel[TMsg], msg: TMsg) =

-  ## sends a message to a channel. `msg` is deeply copied.

-  var q = cast[PInbox](addr(c))

-  sendImpl(q)

-

-proc send*[TMsg](c: TChannelId[TMsg], msg: TMsg) =

-  ## sends a message to a thread. `msg` is deeply copied.

-  var q = cast[PInbox](c)

-  sendImpl(q)

-

-proc peek*[TMsg](c: var TChannel[TMsg]): int =

-  ## returns the current number of messages in the channel `c`.

-  var q = cast[PInbox](addr(c))

-  lockInbox(q):

-    result = q.count

-

-proc peek*[TMsg](c: TChannelId[TMsg]): int =

-  ## returns the current number of messages in the channel `c`.

-  var q = cast[PInbox](c)

-  lockInbox(q):

-    result = q.count

-

-proc recv*[TMsg](c: TChannelId[TMsg]): TMsg =

-  ## receives a message from the channel `c`. This blocks until

-  ## a message has arrived! You may use ``peek`` to avoid the blocking.

-  var q = cast[PInbox](c)

-  llRecv(q, addr(result), cast[PNimType](getTypeInfo(result)))

-

-proc recv*[TMsg](c: var TChannel[TMsg]): TMsg =

-  ## receives a message from the channel `c`. This blocks until

-  ## a message has arrived! You may use ``peek`` to avoid the blocking.

-  var q = cast[PInbox](addr(c))

-  llRecv(q, addr(result), cast[PNimType](getTypeInfo(result)))

+  deinitRawChannel(addr(c))

 

+proc ready*[TMsg](c: var TChannel[TMsg]): bool =

+  ## returns true iff some thread is waiting on the channel `c` for
+  ## new messages.

+  var q = cast[PRawChannel](addr(c))

+  result = q.ready

+
diff --git a/lib/system/threads.nim b/lib/system/threads.nim
index 4ec1239a9..2079762f8 100755
--- a/lib/system/threads.nim
+++ b/lib/system/threads.nim
@@ -167,7 +167,6 @@ type
   PGcThread = ptr TGcThread
   TGcThread {.pure.} = object
     sys: TSysThread
-    inbox: TThreadLocalStorage
     when emulatedThreadVars and not useStackMaskHack:
       tls: TThreadLocalStorage
     else:
@@ -249,27 +248,22 @@ when not defined(useNimRtl):
 # GC'ed closures in Nimrod.
 
 type
-  TThread* {.pure, final.}[TMsg] =
+  TThread* {.pure, final.}[TArg] =
       object of TGcThread ## Nimrod thread. A thread is a heavy object (~14K)
                           ## that **must not** be part of a message! Use
                           ## a ``TThreadId`` for that.
-    emptyFn: proc ()
-    dataFn: proc (m: TMsg)
-    data: TMsg
-  TThreadId*[TMsg] = ptr TThread[TMsg] ## the current implementation uses
+    dataFn: proc (m: TArg)
+    when TArg isnot void:
+      data: TArg
+  TThreadId*[TArg] = ptr TThread[TArg] ## the current implementation uses
                                        ## a pointer as a thread ID.
 
-proc initInbox(p: pointer)
-proc freeInbox(p: pointer)
 when not defined(boehmgc) and not hasSharedHeap:
   proc deallocOsPages()
 
-when defined(mainThread):
-  initInbox(addr(mainThread.inbox))
-
 template ThreadProcWrapperBody(closure: expr) =
   ThreadVarSetValue(globalsSlot, closure)
-  var t = cast[ptr TThread[TMsg]](closure)
+  var t = cast[ptr TThread[TArg]](closure)
   when useStackMaskHack:
     var tls: TThreadLocalStorage
   when not defined(boehmgc) and not defined(nogc) and not hasSharedHeap:
@@ -279,9 +273,8 @@ template ThreadProcWrapperBody(closure: expr) =
   when defined(registerThread):
     t.stackBottom = addr(t)
     registerThread(t)
-  if t.emptyFn == nil: t.dataFn(t.data)
-  else: t.emptyFn()
-  freeInbox(addr(t.inbox))
+  if TArg is void: t.dataFn()
+  else: t.dataFn(t.data)
   when defined(registerThread): unregisterThread(t)
   when defined(deallocOsPages): deallocOsPages()
   # Since an unhandled exception terminates the whole process (!), there is
@@ -291,31 +284,30 @@ template ThreadProcWrapperBody(closure: expr) =
   # page!
   
   # mark as not running anymore:
-  t.emptyFn = nil
   t.dataFn = nil
   
 {.push stack_trace:off.}
 when defined(windows):
-  proc threadProcWrapper[TMsg](closure: pointer): int32 {.stdcall.} = 
+  proc threadProcWrapper[TArg](closure: pointer): int32 {.stdcall.} = 
     ThreadProcWrapperBody(closure)
     # implicitely return 0
 else:
-  proc threadProcWrapper[TMsg](closure: pointer) {.noconv.} = 
+  proc threadProcWrapper[TArg](closure: pointer) {.noconv.} = 
     ThreadProcWrapperBody(closure)
 {.pop.}
 
-proc running*[TMsg](t: TThread[TMsg]): bool {.inline.} = 
+proc running*[TArg](t: TThread[TArg]): bool {.inline.} = 
   ## returns true if `t` is running.
-  result = t.emptyFn != nil or t.dataFn != nil
+  result = t.dataFn != nil
 
-proc joinThread*[TMsg](t: TThread[TMsg]) {.inline.} = 
+proc joinThread*[TArg](t: TThread[TArg]) {.inline.} = 
   ## waits for the thread `t` to finish.
   when hostOS == "windows":
     discard WaitForSingleObject(t.sys, -1'i32)
   else:
     discard pthread_join(t.sys, nil)
 
-proc joinThreads*[TMsg](t: openArray[TThread[TMsg]]) = 
+proc joinThreads*[TArg](t: openArray[TThread[TArg]]) = 
   ## waits for every thread in `t` to finish.
   when hostOS == "windows":
     var a: array[0..255, TSysThread]
@@ -327,27 +319,28 @@ proc joinThreads*[TMsg](t: openArray[TThread[TMsg]]) =
 
 when false:
   # XXX a thread should really release its heap here somehow:
-  proc destroyThread*[TMsg](t: var TThread[TMsg]) =
+  proc destroyThread*[TArg](t: var TThread[TArg]) =
     ## forces the thread `t` to terminate. This is potentially dangerous if
     ## you don't have full control over `t` and its acquired resources.
     when hostOS == "windows":
       discard TerminateThread(t.sys, 1'i32)
     else:
       discard pthread_cancel(t.sys)
-    unregisterThread(addr(t))
+    when defined(registerThread): unregisterThread(addr(t))
+    t.dataFn = nil
 
-proc createThread*[TMsg](t: var TThread[TMsg], 
-                         tp: proc (msg: TMsg) {.thread.}, 
-                         param: TMsg) =
+proc createThread*[TArg](t: var TThread[TArg], 
+                         tp: proc (arg: TArg) {.thread.}, 
+                         param: TArg) =
   ## creates a new thread `t` and starts its execution. Entry point is the
-  ## proc `tp`. `param` is passed to `tp`.
-  t.data = param
+  ## proc `tp`. `param` is passed to `tp`. `TArg` can be ``void`` if you
+  ## don't need to pass any data to the thread.
+  when TArg isnot void: t.data = param
   t.dataFn = tp
   when hasSharedHeap: t.stackSize = ThreadStackSize
-  initInbox(addr(t.inbox))
   when hostOS == "windows":
     var dummyThreadId: int32
-    t.sys = CreateThread(nil, ThreadStackSize, threadProcWrapper[TMsg],
+    t.sys = CreateThread(nil, ThreadStackSize, threadProcWrapper[TArg],
                          addr(t), 0'i32, dummyThreadId)
     if t.sys <= 0:
       raise newException(EResourceExhausted, "cannot create thread")
@@ -355,39 +348,20 @@ proc createThread*[TMsg](t: var TThread[TMsg],
     var a: Tpthread_attr
     pthread_attr_init(a)
     pthread_attr_setstacksize(a, ThreadStackSize)
-    if pthread_create(t.sys, a, threadProcWrapper[TMsg], addr(t)) != 0:
+    if pthread_create(t.sys, a, threadProcWrapper[TArg], addr(t)) != 0:
       raise newException(EResourceExhausted, "cannot create thread")
 
-proc createThread*[TMsg](t: var TThread[TMsg], tp: proc () {.thread.}) =
-  ## creates a new thread `t` and starts its execution. Entry point is the
-  ## proc `tp`.
-  t.emptyFn = tp
-  when hasSharedHeap: t.stackSize = ThreadStackSize
-  initInbox(addr(t.inbox))
-  when hostOS == "windows":
-    var dummyThreadId: int32
-    t.sys = CreateThread(nil, ThreadStackSize, threadProcWrapper[TMsg],
-                         addr(t), 0'i32, dummyThreadId)
-    if t.sys <= 0:
-      raise newException(EResourceExhausted, "cannot create thread")
-  else:
-    var a: Tpthread_attr
-    pthread_attr_init(a)
-    pthread_attr_setstacksize(a, ThreadStackSize)
-    if pthread_create(t.sys, a, threadProcWrapper[TMsg], addr(t)) != 0:
-      raise newException(EResourceExhausted, "cannot create thread")
-
-proc threadId*[TMsg](t: var TThread[TMsg]): TThreadId[TMsg] {.inline.} =
+proc threadId*[TArg](t: var TThread[TArg]): TThreadId[TArg] {.inline.} =
   ## returns the thread ID of `t`.
   result = addr(t)
 
-proc myThreadId*[TMsg](): TThreadId[TMsg] =
+proc myThreadId*[TArg](): TThreadId[TArg] =
   ## returns the thread ID of the thread that calls this proc.
-  result = cast[TThreadId[TMsg]](ThreadVarGetValue(globalsSlot))
+  result = cast[TThreadId[TArg]](ThreadVarGetValue(globalsSlot))
 
-proc mainThreadId*[TMsg](): TThreadId[TMsg] =
+proc mainThreadId*[TArg](): TThreadId[TArg] =
   ## returns the thread ID of the main thread.
-  result = cast[TThreadId[TMsg]](addr(mainThread))
+  result = cast[TThreadId[TArg]](addr(mainThread))
 
 when useStackMaskHack:
   proc runMain(tp: proc () {.thread.}) {.compilerproc.} =
@@ -395,11 +369,3 @@ when useStackMaskHack:
     createThread(mainThread, tp)
     joinThread(mainThread)
 
-# ------------------------ message passing support ---------------------------
-
-proc getInBoxMem[TMsg](t: var TThread[TMsg]): pointer {.inline.} =
-  result = addr(t.inbox)
-
-proc getInBoxMem(): pointer {.inline.} =
-  result = addr(cast[PGcThread](ThreadVarGetValue(globalsSlot)).inbox)
-
diff --git a/todo.txt b/todo.txt
index b779eb6dc..ad80e4812 100755
--- a/todo.txt
+++ b/todo.txt
@@ -1,12 +1,11 @@
 Version 0.8.14
 ==============
 
-- optional indentation for 'case' statement
-- test the sort implementation again
-- fix the 'const' issues
 - threads should not have an inbox per default
 - make threadvar efficient again on linux after testing
-
+- fix the 'const' issues
+- test the sort implementation again
+- optional indentation for 'case' statement
 
 version 0.9.0
 =============
@@ -47,7 +46,9 @@ version 0.9.XX
 - checked exceptions
 - fix implicit generic routines
 - think about ``{:}.toTable[int, string]()``
-- nice idea: 
+- mocking support with ``tyProxy`` that does:
+  o.p(x) --> p(o, x) -->  myMacro(o, p, x)
+- nice idea:
 
   p(a, b): 
     echo a
diff --git a/web/nimrod.ini b/web/nimrod.ini
index 156bc5221..c56b8bb84 100755
--- a/web/nimrod.ini
+++ b/web/nimrod.ini
@@ -26,7 +26,7 @@ doc: "tools;c2nim;niminst"
 pdf: "manual;lib;tut1;tut2;nimrodc;c2nim;niminst"
 srcdoc: "core/macros;pure/marshal;core/typeinfo"
 srcdoc: "impure/graphics;impure/re;pure/sockets"
-srcdoc: "system.nim;system/threads.nim;system/inboxes.nim"
+srcdoc: "system.nim;system/threads.nim;system/channels.nim"
 srcdoc: "pure/os;pure/strutils;pure/math;pure/matchers"
 srcdoc: "pure/complex;pure/times;pure/osproc;pure/pegs;pure/dynlib"
 srcdoc: "pure/parseopt;pure/hashes;pure/strtabs;pure/lexbase"