diff options
-rw-r--r-- | 073wait.cc | 89 | ||||
-rw-r--r-- | 075channel.mu | 60 |
2 files changed, 101 insertions, 48 deletions
diff --git a/073wait.cc b/073wait.cc index a642c86a..0d7c3e4e 100644 --- a/073wait.cc +++ b/073wait.cc @@ -5,18 +5,22 @@ :(scenario wait_for_location) def f1 [ - 1:number <- copy 0 + 10:number <- copy 34 start-running f2 - 2:location <- copy 1/unsafe - wait-for-location 2:location - # now wait for f2 to run and modify location 1 before using its value - 3:number <- copy 1:number + 20:location <- copy 10/unsafe + wait-for-reset-then-set 20:location + # wait for f2 to run and reset location 1 + 30:number <- copy 10:number ] def f2 [ - 1:number <- copy 34 + 10:location <- copy 0/unsafe ] -# if we got the synchronization wrong we'd be storing 0 in location 3 -+mem: storing 34 in location 3 ++schedule: f1 ++run: waiting for location 10 to reset ++schedule: f2 ++schedule: waking up routine 1 ++schedule: f1 ++mem: storing 1 in location 30 //: define the new state that all routines can be in @@ -25,9 +29,8 @@ WAITING, :(before "End routine Fields") // only if state == WAITING int waiting_on_location; -int old_value_of_waiting_location; :(before "End routine Constructor") -waiting_on_location = old_value_of_waiting_location = 0; +waiting_on_location = 0; :(before "End Mu Test Teardown") if (Passed && any_routines_waiting()) { @@ -59,30 +62,61 @@ void dump_waiting_routines() { } } -//: primitive recipe to put routines in that state +//: Primitive recipe to put routines in that state. +//: This primitive is also known elsewhere as compare-and-set (CAS). Used to +//: build locks. :(before "End Primitive Recipe Declarations") -WAIT_FOR_LOCATION, +WAIT_FOR_RESET_THEN_SET, :(before "End Primitive Recipe Numbers") -put(Recipe_ordinal, "wait-for-location", WAIT_FOR_LOCATION); +put(Recipe_ordinal, "wait-for-reset-then-set", WAIT_FOR_RESET_THEN_SET); :(before "End Primitive Recipe Checks") -case WAIT_FOR_LOCATION: { +case WAIT_FOR_RESET_THEN_SET: { if (SIZE(inst.ingredients) != 1) { - raise << maybe(get(Recipe, r).name) << "'wait-for-location' requires exactly one ingredient, but got '" << inst.original_string << "'\n" << end(); + raise << maybe(get(Recipe, r).name) << "'wait-for-reset-then-set' requires exactly one ingredient, but got '" << inst.original_string << "'\n" << end(); break; } if (!is_mu_location(inst.ingredients.at(0))) { - raise << maybe(get(Recipe, r).name) << "'wait-for-location' requires a location ingredient, but got '" << inst.ingredients.at(0).original_string << "'\n" << end(); + raise << maybe(get(Recipe, r).name) << "'wait-for-reset-then-set' requires a location ingredient, but got '" << inst.ingredients.at(0).original_string << "'\n" << end(); } break; } :(before "End Primitive Recipe Implementations") -case WAIT_FOR_LOCATION: { - int loc = ingredients.at(0).at(0); +case WAIT_FOR_RESET_THEN_SET: { + int loc = static_cast<int>(ingredients.at(0).at(0)); + trace(9998, "run") << "wait: *" << loc << " = " << get_or_insert(Memory, loc) << end(); + if (get_or_insert(Memory, loc) == 0) { + trace(9998, "run") << "location " << loc << " is already 0; setting" << end(); + put(Memory, loc, 1); + break; + } + trace(9998, "run") << "waiting for location " << loc << " to reset" << end(); Current_routine->state = WAITING; Current_routine->waiting_on_location = loc; - Current_routine->old_value_of_waiting_location = get_or_insert(Memory, loc); - trace(9998, "run") << "waiting for location " << loc << " to change from " << no_scientific(get_or_insert(Memory, loc)) << end(); + break; +} + +//: Counterpart to unlock a lock. +:(before "End Primitive Recipe Declarations") +RESET, +:(before "End Primitive Recipe Numbers") +put(Recipe_ordinal, "reset", RESET); +:(before "End Primitive Recipe Checks") +case RESET: { + if (SIZE(inst.ingredients) != 1) { + raise << maybe(get(Recipe, r).name) << "'reset' requires exactly one ingredient, but got '" << inst.original_string << "'\n" << end(); + break; + } + if (!is_mu_location(inst.ingredients.at(0))) { + raise << maybe(get(Recipe, r).name) << "'reset' requires a location ingredient, but got '" << inst.ingredients.at(0).original_string << "'\n" << end(); + } + break; +} +:(before "End Primitive Recipe Implementations") +case RESET: { + int loc = static_cast<int>(ingredients.at(0).at(0)); + put(Memory, loc, 0); + trace(9998, "run") << "reset: *" << loc << " = " << get_or_insert(Memory, loc) << end(); break; } @@ -91,17 +125,18 @@ case WAIT_FOR_LOCATION: { :(before "End Scheduler State Transitions") for (int i = 0; i < SIZE(Routines); ++i) { if (Routines.at(i)->state != WAITING) continue; - if (Routines.at(i)->waiting_on_location && - get_or_insert(Memory, Routines.at(i)->waiting_on_location) != Routines.at(i)->old_value_of_waiting_location) { - trace(9999, "schedule") << "waking up routine\n" << end(); + int loc = Routines.at(i)->waiting_on_location; + if (loc && get_or_insert(Memory, loc) == 0) { + trace(9999, "schedule") << "waking up routine " << Routines.at(i)->id << end(); + put(Memory, loc, 1); Routines.at(i)->state = RUNNING; - Routines.at(i)->waiting_on_location = Routines.at(i)->old_value_of_waiting_location = 0; + Routines.at(i)->waiting_on_location = 0; } } -//: primitive to help compute locations to wait for -//: only supports elements inside containers, no arrays or containers within -//: containers yet. +//: Primitive to help compute locations to wait on. +//: Only supports elements immediately inside containers; no arrays or +//: containers within containers yet. :(scenario get_location) def main [ diff --git a/075channel.mu b/075channel.mu index 7bd371fe..7720ccdd 100644 --- a/075channel.mu +++ b/075channel.mu @@ -1,17 +1,14 @@ -# Mu synchronizes using channels rather than locks, like Erlang and Go. -# -# The two ends of a channel will usually belong to different routines, but -# each end should (currently) only be used by a single one. Don't try to read -# from or write to it from multiple routines at once. +# Mu synchronizes between routines using channels rather than locks, like +# Erlang and Go. # # Key properties of channels: # -# a) Writing to a full channel or reading from an empty one will put the -# current routine in 'waiting' state until the operation can be completed. +# a) Writing to a full channel or reading from an empty one will put the +# current routine in 'waiting' state until the operation can be completed. # -# b) Writing to a channel implicitly performs a deep copy, to prevent -# addresses from being shared between routines, thereby causing race -# conditions. +# b) Writing to a channel implicitly performs a deep copy. This prevents +# addresses from being shared between routines, and therefore eliminates all +# possibility of race conditions. scenario channel [ run [ @@ -27,9 +24,7 @@ scenario channel [ ] container channel:_elem [ - # To avoid locking, writer and reader will never write to the same location. - # So channels will include fields in pairs, one for the writer and one for the - # reader. + lock:boolean # inefficient but simple: serialize all reads as well as writes first-full:number # for write first-free:number # for read # A circular buffer contains values from index first-full up to (but not @@ -70,13 +65,23 @@ def write out:address:sink:_elem, val:_elem -> out:address:sink:_elem [ assert out, [write to null channel] chan:address:channel:_elem <- get *out, chan:offset <channel-write-initial> + # block until lock is acquired AND queue has room + lock:location <- get-location *chan, lock:offset +#? $print [write], 10/newline { - # block if chan is full - full:boolean <- channel-full? chan - break-unless full - full-address:location <- get-location *chan, first-full:offset - wait-for-location full-address +#? $print [trying to acquire lock for writing], 10/newline + wait-for-reset-then-set lock +#? $print [lock acquired for writing], 10/newline + full?:boolean <- channel-full? chan + break-unless full? +#? $print [but channel is full; relinquishing lock], 10/newline + # channel is full; relinquish lock and give a reader the opportunity to + # create room on it + reset lock + switch # avoid spinlocking + loop } +#? $print [performing write], 10/newline # store a deep copy of val circular-buffer:address:array:_elem <- get *chan, data:offset free:number <- get *chan, first-free:offset @@ -93,6 +98,8 @@ def write out:address:sink:_elem, val:_elem -> out:address:sink:_elem [ } # write back *chan <- put *chan, first-free:offset, free +#? $print [relinquishing lock after writing], 10/newline + reset lock ] def read in:address:source:_elem -> result:_elem, eof?:boolean, in:address:source:_elem [ @@ -101,13 +108,22 @@ def read in:address:source:_elem -> result:_elem, eof?:boolean, in:address:sourc assert in, [read on null channel] eof? <- copy 0/false # default result chan:address:channel:_elem <- get *in, chan:offset + # block until lock is acquired AND queue has data + lock:location <- get-location *chan, lock:offset +#? $print [read], 10/newline { - # block if chan is empty +#? $print [trying to acquire lock for reading], 10/newline + wait-for-reset-then-set lock +#? $print [lock acquired for reading], 10/newline empty?:boolean <- channel-empty? chan break-unless empty? +#? $print [but channel is empty; relinquishing lock], 10/newline + # channel is empty; relinquish lock and give a writer the opportunity to + # add to it + reset lock <channel-read-empty> - free-address:location <- get-location *chan, first-free:offset - wait-for-location free-address + switch # avoid spinlocking + loop } # pull result off full:number <- get *chan, first-full:offset @@ -127,6 +143,8 @@ def read in:address:source:_elem -> result:_elem, eof?:boolean, in:address:sourc } # write back *chan <- put *chan, first-full:offset, full +#? $print [relinquishing lock after reading], 10/newline + reset lock ] def clear in:address:source:_elem -> in:address:source:_elem [ |