about summary refs log tree commit diff stats
path: root/075channel.mu
diff options
context:
space:
mode:
Diffstat (limited to '075channel.mu')
-rw-r--r--075channel.mu60
1 files changed, 39 insertions, 21 deletions
diff --git a/075channel.mu b/075channel.mu
index 7bd371fe..7720ccdd 100644
--- a/075channel.mu
+++ b/075channel.mu
@@ -1,17 +1,14 @@
-# Mu synchronizes using channels rather than locks, like Erlang and Go.
-#
-# The two ends of a channel will usually belong to different routines, but
-# each end should (currently) only be used by a single one. Don't try to read
-# from or write to it from multiple routines at once.
+# Mu synchronizes between routines using channels rather than locks, like
+# Erlang and Go.
 #
 # Key properties of channels:
 #
-# a) Writing to a full channel or reading from an empty one will put the
-# current routine in 'waiting' state until the operation can be completed.
+#   a) Writing to a full channel or reading from an empty one will put the
+#   current routine in 'waiting' state until the operation can be completed.
 #
-# b) Writing to a channel implicitly performs a deep copy, to prevent
-# addresses from being shared between routines, thereby causing race
-# conditions.
+#   b) Writing to a channel implicitly performs a deep copy. This prevents
+#   addresses from being shared between routines, and therefore eliminates all
+#   possibility of race conditions.
 
 scenario channel [
   run [
@@ -27,9 +24,7 @@ scenario channel [
 ]
 
 container channel:_elem [
-  # To avoid locking, writer and reader will never write to the same location.
-  # So channels will include fields in pairs, one for the writer and one for the
-  # reader.
+  lock:boolean  # inefficient but simple: serialize all reads as well as writes
   first-full:number  # for write
   first-free:number  # for read
   # A circular buffer contains values from index first-full up to (but not
@@ -70,13 +65,23 @@ def write out:address:sink:_elem, val:_elem -> out:address:sink:_elem [
   assert out, [write to null channel]
   chan:address:channel:_elem <- get *out, chan:offset
   <channel-write-initial>
+  # block until lock is acquired AND queue has room
+  lock:location <- get-location *chan, lock:offset
+#?   $print [write], 10/newline
   {
-    # block if chan is full
-    full:boolean <- channel-full? chan
-    break-unless full
-    full-address:location <- get-location *chan, first-full:offset
-    wait-for-location full-address
+#?     $print [trying to acquire lock for writing], 10/newline
+    wait-for-reset-then-set lock
+#?     $print [lock acquired for writing], 10/newline
+    full?:boolean <- channel-full? chan
+    break-unless full?
+#?     $print [but channel is full; relinquishing lock], 10/newline
+    # channel is full; relinquish lock and give a reader the opportunity to
+    # create room on it
+    reset lock
+    switch  # avoid spinlocking
+    loop
   }
+#?   $print [performing write], 10/newline
   # store a deep copy of val
   circular-buffer:address:array:_elem <- get *chan, data:offset
   free:number <- get *chan, first-free:offset
@@ -93,6 +98,8 @@ def write out:address:sink:_elem, val:_elem -> out:address:sink:_elem [
   }
   # write back
   *chan <- put *chan, first-free:offset, free
+#?   $print [relinquishing lock after writing], 10/newline
+  reset lock
 ]
 
 def read in:address:source:_elem -> result:_elem, eof?:boolean, in:address:source:_elem [
@@ -101,13 +108,22 @@ def read in:address:source:_elem -> result:_elem, eof?:boolean, in:address:sourc
   assert in, [read on null channel]
   eof? <- copy 0/false  # default result
   chan:address:channel:_elem <- get *in, chan:offset
+  # block until lock is acquired AND queue has data
+  lock:location <- get-location *chan, lock:offset
+#?   $print [read], 10/newline
   {
-    # block if chan is empty
+#?     $print [trying to acquire lock for reading], 10/newline
+    wait-for-reset-then-set lock
+#?     $print [lock acquired for reading], 10/newline
     empty?:boolean <- channel-empty? chan
     break-unless empty?
+#?     $print [but channel is empty; relinquishing lock], 10/newline
+    # channel is empty; relinquish lock and give a writer the opportunity to
+    # add to it
+    reset lock
     <channel-read-empty>
-    free-address:location <- get-location *chan, first-free:offset
-    wait-for-location free-address
+    switch  # avoid spinlocking
+    loop
   }
   # pull result off
   full:number <- get *chan, first-full:offset
@@ -127,6 +143,8 @@ def read in:address:source:_elem -> result:_elem, eof?:boolean, in:address:sourc
   }
   # write back
   *chan <- put *chan, first-full:offset, full
+#?   $print [relinquishing lock after reading], 10/newline
+  reset lock
 ]
 
 def clear in:address:source:_elem -> in:address:source:_elem [