about summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorKartik K. Agaram <vc@akkartik.com>2016-09-14 19:25:55 -0700
committerKartik K. Agaram <vc@akkartik.com>2016-09-14 19:25:55 -0700
commitf8e6e864d85fb28bc64ad6af3815a7254497d4d2 (patch)
tree5a402c7af3445aa143e55411f306a1d40ddbc20a
parent445bc53e6a6b3d87f35f5bffc6a87a9b4d73a2c1 (diff)
downloadmu-f8e6e864d85fb28bc64ad6af3815a7254497d4d2.tar.gz
3351 - new but incomplete synchronization setup
Previously our channels were based on an unconventional
`wait-for-location` primitive that waits for a specific address to
change its contents. This only works as long as a channel has a single
reader and a single writer routine. To support multiple readers and
writers we switch to a more conventional compare-and-set primitive.

There's still a couple of failing scenarios, though -- the ones using
`wait-for-routine-to-block`, because the new approach never blocks on an
empty or full channel, just yields CPU for a time before polling. Hmm,
how to fix this?
-rw-r--r--073wait.cc89
-rw-r--r--075channel.mu60
2 files changed, 101 insertions, 48 deletions
diff --git a/073wait.cc b/073wait.cc
index a642c86a..0d7c3e4e 100644
--- a/073wait.cc
+++ b/073wait.cc
@@ -5,18 +5,22 @@
 
 :(scenario wait_for_location)
 def f1 [
-  1:number <- copy 0
+  10:number <- copy 34
   start-running f2
-  2:location <- copy 1/unsafe
-  wait-for-location 2:location
-  # now wait for f2 to run and modify location 1 before using its value
-  3:number <- copy 1:number
+  20:location <- copy 10/unsafe
+  wait-for-reset-then-set 20:location
+  # wait for f2 to run and reset location 1
+  30:number <- copy 10:number
 ]
 def f2 [
-  1:number <- copy 34
+  10:location <- copy 0/unsafe
 ]
-# if we got the synchronization wrong we'd be storing 0 in location 3
-+mem: storing 34 in location 3
++schedule: f1
++run: waiting for location 10 to reset
++schedule: f2
++schedule: waking up routine 1
++schedule: f1
++mem: storing 1 in location 30
 
 //: define the new state that all routines can be in
 
@@ -25,9 +29,8 @@ WAITING,
 :(before "End routine Fields")
 // only if state == WAITING
 int waiting_on_location;
-int old_value_of_waiting_location;
 :(before "End routine Constructor")
-waiting_on_location = old_value_of_waiting_location = 0;
+waiting_on_location = 0;
 
 :(before "End Mu Test Teardown")
 if (Passed && any_routines_waiting()) {
@@ -59,30 +62,61 @@ void dump_waiting_routines() {
   }
 }
 
-//: primitive recipe to put routines in that state
+//: Primitive recipe to put routines in that state.
+//: This primitive is also known elsewhere as compare-and-set (CAS). Used to
+//: build locks.
 
 :(before "End Primitive Recipe Declarations")
-WAIT_FOR_LOCATION,
+WAIT_FOR_RESET_THEN_SET,
 :(before "End Primitive Recipe Numbers")
-put(Recipe_ordinal, "wait-for-location", WAIT_FOR_LOCATION);
+put(Recipe_ordinal, "wait-for-reset-then-set", WAIT_FOR_RESET_THEN_SET);
 :(before "End Primitive Recipe Checks")
-case WAIT_FOR_LOCATION: {
+case WAIT_FOR_RESET_THEN_SET: {
   if (SIZE(inst.ingredients) != 1) {
-    raise << maybe(get(Recipe, r).name) << "'wait-for-location' requires exactly one ingredient, but got '" << inst.original_string << "'\n" << end();
+    raise << maybe(get(Recipe, r).name) << "'wait-for-reset-then-set' requires exactly one ingredient, but got '" << inst.original_string << "'\n" << end();
     break;
   }
   if (!is_mu_location(inst.ingredients.at(0))) {
-    raise << maybe(get(Recipe, r).name) << "'wait-for-location' requires a location ingredient, but got '" << inst.ingredients.at(0).original_string << "'\n" << end();
+    raise << maybe(get(Recipe, r).name) << "'wait-for-reset-then-set' requires a location ingredient, but got '" << inst.ingredients.at(0).original_string << "'\n" << end();
   }
   break;
 }
 :(before "End Primitive Recipe Implementations")
-case WAIT_FOR_LOCATION: {
-  int loc = ingredients.at(0).at(0);
+case WAIT_FOR_RESET_THEN_SET: {
+  int loc = static_cast<int>(ingredients.at(0).at(0));
+  trace(9998, "run") << "wait: *" << loc << " = " << get_or_insert(Memory, loc) << end();
+  if (get_or_insert(Memory, loc) == 0) {
+    trace(9998, "run") << "location " << loc << " is already 0; setting" << end();
+    put(Memory, loc, 1);
+    break;
+  }
+  trace(9998, "run") << "waiting for location " << loc << " to reset" << end();
   Current_routine->state = WAITING;
   Current_routine->waiting_on_location = loc;
-  Current_routine->old_value_of_waiting_location = get_or_insert(Memory, loc);
-  trace(9998, "run") << "waiting for location " << loc << " to change from " << no_scientific(get_or_insert(Memory, loc)) << end();
+  break;
+}
+
+//: Counterpart to unlock a lock.
+:(before "End Primitive Recipe Declarations")
+RESET,
+:(before "End Primitive Recipe Numbers")
+put(Recipe_ordinal, "reset", RESET);
+:(before "End Primitive Recipe Checks")
+case RESET: {
+  if (SIZE(inst.ingredients) != 1) {
+    raise << maybe(get(Recipe, r).name) << "'reset' requires exactly one ingredient, but got '" << inst.original_string << "'\n" << end();
+    break;
+  }
+  if (!is_mu_location(inst.ingredients.at(0))) {
+    raise << maybe(get(Recipe, r).name) << "'reset' requires a location ingredient, but got '" << inst.ingredients.at(0).original_string << "'\n" << end();
+  }
+  break;
+}
+:(before "End Primitive Recipe Implementations")
+case RESET: {
+  int loc = static_cast<int>(ingredients.at(0).at(0));
+  put(Memory, loc, 0);
+  trace(9998, "run") << "reset: *" << loc << " = " << get_or_insert(Memory, loc) << end();
   break;
 }
 
@@ -91,17 +125,18 @@ case WAIT_FOR_LOCATION: {
 :(before "End Scheduler State Transitions")
 for (int i = 0; i < SIZE(Routines); ++i) {
   if (Routines.at(i)->state != WAITING) continue;
-  if (Routines.at(i)->waiting_on_location &&
-      get_or_insert(Memory, Routines.at(i)->waiting_on_location) != Routines.at(i)->old_value_of_waiting_location) {
-    trace(9999, "schedule") << "waking up routine\n" << end();
+  int loc = Routines.at(i)->waiting_on_location;
+  if (loc && get_or_insert(Memory, loc) == 0) {
+    trace(9999, "schedule") << "waking up routine " << Routines.at(i)->id << end();
+    put(Memory, loc, 1);
     Routines.at(i)->state = RUNNING;
-    Routines.at(i)->waiting_on_location = Routines.at(i)->old_value_of_waiting_location = 0;
+    Routines.at(i)->waiting_on_location = 0;
   }
 }
 
-//: primitive to help compute locations to wait for
-//: only supports elements inside containers, no arrays or containers within
-//: containers yet.
+//: Primitive to help compute locations to wait on.
+//: Only supports elements immediately inside containers; no arrays or
+//: containers within containers yet.
 
 :(scenario get_location)
 def main [
diff --git a/075channel.mu b/075channel.mu
index 7bd371fe..7720ccdd 100644
--- a/075channel.mu
+++ b/075channel.mu
@@ -1,17 +1,14 @@
-# Mu synchronizes using channels rather than locks, like Erlang and Go.
-#
-# The two ends of a channel will usually belong to different routines, but
-# each end should (currently) only be used by a single one. Don't try to read
-# from or write to it from multiple routines at once.
+# Mu synchronizes between routines using channels rather than locks, like
+# Erlang and Go.
 #
 # Key properties of channels:
 #
-# a) Writing to a full channel or reading from an empty one will put the
-# current routine in 'waiting' state until the operation can be completed.
+#   a) Writing to a full channel or reading from an empty one will put the
+#   current routine in 'waiting' state until the operation can be completed.
 #
-# b) Writing to a channel implicitly performs a deep copy, to prevent
-# addresses from being shared between routines, thereby causing race
-# conditions.
+#   b) Writing to a channel implicitly performs a deep copy. This prevents
+#   addresses from being shared between routines, and therefore eliminates all
+#   possibility of race conditions.
 
 scenario channel [
   run [
@@ -27,9 +24,7 @@ scenario channel [
 ]
 
 container channel:_elem [
-  # To avoid locking, writer and reader will never write to the same location.
-  # So channels will include fields in pairs, one for the writer and one for the
-  # reader.
+  lock:boolean  # inefficient but simple: serialize all reads as well as writes
   first-full:number  # for write
   first-free:number  # for read
   # A circular buffer contains values from index first-full up to (but not
@@ -70,13 +65,23 @@ def write out:address:sink:_elem, val:_elem -> out:address:sink:_elem [
   assert out, [write to null channel]
   chan:address:channel:_elem <- get *out, chan:offset
   <channel-write-initial>
+  # block until lock is acquired AND queue has room
+  lock:location <- get-location *chan, lock:offset
+#?   $print [write], 10/newline
   {
-    # block if chan is full
-    full:boolean <- channel-full? chan
-    break-unless full
-    full-address:location <- get-location *chan, first-full:offset
-    wait-for-location full-address
+#?     $print [trying to acquire lock for writing], 10/newline
+    wait-for-reset-then-set lock
+#?     $print [lock acquired for writing], 10/newline
+    full?:boolean <- channel-full? chan
+    break-unless full?
+#?     $print [but channel is full; relinquishing lock], 10/newline
+    # channel is full; relinquish lock and give a reader the opportunity to
+    # create room on it
+    reset lock
+    switch  # avoid spinlocking
+    loop
   }
+#?   $print [performing write], 10/newline
   # store a deep copy of val
   circular-buffer:address:array:_elem <- get *chan, data:offset
   free:number <- get *chan, first-free:offset
@@ -93,6 +98,8 @@ def write out:address:sink:_elem, val:_elem -> out:address:sink:_elem [
   }
   # write back
   *chan <- put *chan, first-free:offset, free
+#?   $print [relinquishing lock after writing], 10/newline
+  reset lock
 ]
 
 def read in:address:source:_elem -> result:_elem, eof?:boolean, in:address:source:_elem [
@@ -101,13 +108,22 @@ def read in:address:source:_elem -> result:_elem, eof?:boolean, in:address:sourc
   assert in, [read on null channel]
   eof? <- copy 0/false  # default result
   chan:address:channel:_elem <- get *in, chan:offset
+  # block until lock is acquired AND queue has data
+  lock:location <- get-location *chan, lock:offset
+#?   $print [read], 10/newline
   {
-    # block if chan is empty
+#?     $print [trying to acquire lock for reading], 10/newline
+    wait-for-reset-then-set lock
+#?     $print [lock acquired for reading], 10/newline
     empty?:boolean <- channel-empty? chan
     break-unless empty?
+#?     $print [but channel is empty; relinquishing lock], 10/newline
+    # channel is empty; relinquish lock and give a writer the opportunity to
+    # add to it
+    reset lock
     <channel-read-empty>
-    free-address:location <- get-location *chan, first-free:offset
-    wait-for-location free-address
+    switch  # avoid spinlocking
+    loop
   }
   # pull result off
   full:number <- get *chan, first-full:offset
@@ -127,6 +143,8 @@ def read in:address:source:_elem -> result:_elem, eof?:boolean, in:address:sourc
   }
   # write back
   *chan <- put *chan, first-full:offset, full
+#?   $print [relinquishing lock after reading], 10/newline
+  reset lock
 ]
 
 def clear in:address:source:_elem -> in:address:source:_elem [