about summary refs log tree commit diff stats
path: root/075channel.mu
diff options
context:
space:
mode:
authorKartik K. Agaram <vc@akkartik.com>2016-09-14 19:25:55 -0700
committerKartik K. Agaram <vc@akkartik.com>2016-09-14 19:25:55 -0700
commitf8e6e864d85fb28bc64ad6af3815a7254497d4d2 (patch)
tree5a402c7af3445aa143e55411f306a1d40ddbc20a /075channel.mu
parent445bc53e6a6b3d87f35f5bffc6a87a9b4d73a2c1 (diff)
downloadmu-f8e6e864d85fb28bc64ad6af3815a7254497d4d2.tar.gz
3351 - new but incomplete synchronization setup
Previously our channels were based on an unconventional
`wait-for-location` primitive that waits for a specific address to
change its contents. This only works as long as a channel has a single
reader and a single writer routine. To support multiple readers and
writers we switch to a more conventional compare-and-set primitive.

There's still a couple of failing scenarios, though -- the ones using
`wait-for-routine-to-block`, because the new approach never blocks on an
empty or full channel, just yields CPU for a time before polling. Hmm,
how to fix this?
Diffstat (limited to '075channel.mu')
-rw-r--r--075channel.mu60
1 files changed, 39 insertions, 21 deletions
diff --git a/075channel.mu b/075channel.mu
index 7bd371fe..7720ccdd 100644
--- a/075channel.mu
+++ b/075channel.mu
@@ -1,17 +1,14 @@
-# Mu synchronizes using channels rather than locks, like Erlang and Go.
-#
-# The two ends of a channel will usually belong to different routines, but
-# each end should (currently) only be used by a single one. Don't try to read
-# from or write to it from multiple routines at once.
+# Mu synchronizes between routines using channels rather than locks, like
+# Erlang and Go.
 #
 # Key properties of channels:
 #
-# a) Writing to a full channel or reading from an empty one will put the
-# current routine in 'waiting' state until the operation can be completed.
+#   a) Writing to a full channel or reading from an empty one will put the
+#   current routine in 'waiting' state until the operation can be completed.
 #
-# b) Writing to a channel implicitly performs a deep copy, to prevent
-# addresses from being shared between routines, thereby causing race
-# conditions.
+#   b) Writing to a channel implicitly performs a deep copy. This prevents
+#   addresses from being shared between routines, and therefore eliminates all
+#   possibility of race conditions.
 
 scenario channel [
   run [
@@ -27,9 +24,7 @@ scenario channel [
 ]
 
 container channel:_elem [
-  # To avoid locking, writer and reader will never write to the same location.
-  # So channels will include fields in pairs, one for the writer and one for the
-  # reader.
+  lock:boolean  # inefficient but simple: serialize all reads as well as writes
   first-full:number  # for write
   first-free:number  # for read
   # A circular buffer contains values from index first-full up to (but not
@@ -70,13 +65,23 @@ def write out:address:sink:_elem, val:_elem -> out:address:sink:_elem [
   assert out, [write to null channel]
   chan:address:channel:_elem <- get *out, chan:offset
   <channel-write-initial>
+  # block until lock is acquired AND queue has room
+  lock:location <- get-location *chan, lock:offset
+#?   $print [write], 10/newline
   {
-    # block if chan is full
-    full:boolean <- channel-full? chan
-    break-unless full
-    full-address:location <- get-location *chan, first-full:offset
-    wait-for-location full-address
+#?     $print [trying to acquire lock for writing], 10/newline
+    wait-for-reset-then-set lock
+#?     $print [lock acquired for writing], 10/newline
+    full?:boolean <- channel-full? chan
+    break-unless full?
+#?     $print [but channel is full; relinquishing lock], 10/newline
+    # channel is full; relinquish lock and give a reader the opportunity to
+    # create room on it
+    reset lock
+    switch  # avoid spinlocking
+    loop
   }
+#?   $print [performing write], 10/newline
   # store a deep copy of val
   circular-buffer:address:array:_elem <- get *chan, data:offset
   free:number <- get *chan, first-free:offset
@@ -93,6 +98,8 @@ def write out:address:sink:_elem, val:_elem -> out:address:sink:_elem [
   }
   # write back
   *chan <- put *chan, first-free:offset, free
+#?   $print [relinquishing lock after writing], 10/newline
+  reset lock
 ]
 
 def read in:address:source:_elem -> result:_elem, eof?:boolean, in:address:source:_elem [
@@ -101,13 +108,22 @@ def read in:address:source:_elem -> result:_elem, eof?:boolean, in:address:sourc
   assert in, [read on null channel]
   eof? <- copy 0/false  # default result
   chan:address:channel:_elem <- get *in, chan:offset
+  # block until lock is acquired AND queue has data
+  lock:location <- get-location *chan, lock:offset
+#?   $print [read], 10/newline
   {
-    # block if chan is empty
+#?     $print [trying to acquire lock for reading], 10/newline
+    wait-for-reset-then-set lock
+#?     $print [lock acquired for reading], 10/newline
     empty?:boolean <- channel-empty? chan
     break-unless empty?
+#?     $print [but channel is empty; relinquishing lock], 10/newline
+    # channel is empty; relinquish lock and give a writer the opportunity to
+    # add to it
+    reset lock
     <channel-read-empty>
-    free-address:location <- get-location *chan, first-free:offset
-    wait-for-location free-address
+    switch  # avoid spinlocking
+    loop
   }
   # pull result off
   full:number <- get *chan, first-full:offset
@@ -127,6 +143,8 @@ def read in:address:source:_elem -> result:_elem, eof?:boolean, in:address:sourc
   }
   # write back
   *chan <- put *chan, first-full:offset, full
+#?   $print [relinquishing lock after reading], 10/newline
+  reset lock
 ]
 
 def clear in:address:source:_elem -> in:address:source:_elem [