From f8e6e864d85fb28bc64ad6af3815a7254497d4d2 Mon Sep 17 00:00:00 2001 From: "Kartik K. Agaram" Date: Wed, 14 Sep 2016 19:25:55 -0700 Subject: 3351 - new but incomplete synchronization setup Previously our channels were based on an unconventional `wait-for-location` primitive that waits for a specific address to change its contents. This only works as long as a channel has a single reader and a single writer routine. To support multiple readers and writers we switch to a more conventional compare-and-set primitive. There's still a couple of failing scenarios, though -- the ones using `wait-for-routine-to-block`, because the new approach never blocks on an empty or full channel, just yields CPU for a time before polling. Hmm, how to fix this? --- 075channel.mu | 60 ++++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 39 insertions(+), 21 deletions(-) (limited to '075channel.mu') diff --git a/075channel.mu b/075channel.mu index 7bd371fe..7720ccdd 100644 --- a/075channel.mu +++ b/075channel.mu @@ -1,17 +1,14 @@ -# Mu synchronizes using channels rather than locks, like Erlang and Go. -# -# The two ends of a channel will usually belong to different routines, but -# each end should (currently) only be used by a single one. Don't try to read -# from or write to it from multiple routines at once. +# Mu synchronizes between routines using channels rather than locks, like +# Erlang and Go. # # Key properties of channels: # -# a) Writing to a full channel or reading from an empty one will put the -# current routine in 'waiting' state until the operation can be completed. +# a) Writing to a full channel or reading from an empty one will put the +# current routine in 'waiting' state until the operation can be completed. # -# b) Writing to a channel implicitly performs a deep copy, to prevent -# addresses from being shared between routines, thereby causing race -# conditions. +# b) Writing to a channel implicitly performs a deep copy. This prevents +# addresses from being shared between routines, and therefore eliminates all +# possibility of race conditions. scenario channel [ run [ @@ -27,9 +24,7 @@ scenario channel [ ] container channel:_elem [ - # To avoid locking, writer and reader will never write to the same location. - # So channels will include fields in pairs, one for the writer and one for the - # reader. + lock:boolean # inefficient but simple: serialize all reads as well as writes first-full:number # for write first-free:number # for read # A circular buffer contains values from index first-full up to (but not @@ -70,13 +65,23 @@ def write out:address:sink:_elem, val:_elem -> out:address:sink:_elem [ assert out, [write to null channel] chan:address:channel:_elem <- get *out, chan:offset + # block until lock is acquired AND queue has room + lock:location <- get-location *chan, lock:offset +#? $print [write], 10/newline { - # block if chan is full - full:boolean <- channel-full? chan - break-unless full - full-address:location <- get-location *chan, first-full:offset - wait-for-location full-address +#? $print [trying to acquire lock for writing], 10/newline + wait-for-reset-then-set lock +#? $print [lock acquired for writing], 10/newline + full?:boolean <- channel-full? chan + break-unless full? +#? $print [but channel is full; relinquishing lock], 10/newline + # channel is full; relinquish lock and give a reader the opportunity to + # create room on it + reset lock + switch # avoid spinlocking + loop } +#? $print [performing write], 10/newline # store a deep copy of val circular-buffer:address:array:_elem <- get *chan, data:offset free:number <- get *chan, first-free:offset @@ -93,6 +98,8 @@ def write out:address:sink:_elem, val:_elem -> out:address:sink:_elem [ } # write back *chan <- put *chan, first-free:offset, free +#? $print [relinquishing lock after writing], 10/newline + reset lock ] def read in:address:source:_elem -> result:_elem, eof?:boolean, in:address:source:_elem [ @@ -101,13 +108,22 @@ def read in:address:source:_elem -> result:_elem, eof?:boolean, in:address:sourc assert in, [read on null channel] eof? <- copy 0/false # default result chan:address:channel:_elem <- get *in, chan:offset + # block until lock is acquired AND queue has data + lock:location <- get-location *chan, lock:offset +#? $print [read], 10/newline { - # block if chan is empty +#? $print [trying to acquire lock for reading], 10/newline + wait-for-reset-then-set lock +#? $print [lock acquired for reading], 10/newline empty?:boolean <- channel-empty? chan break-unless empty? +#? $print [but channel is empty; relinquishing lock], 10/newline + # channel is empty; relinquish lock and give a writer the opportunity to + # add to it + reset lock - free-address:location <- get-location *chan, first-free:offset - wait-for-location free-address + switch # avoid spinlocking + loop } # pull result off full:number <- get *chan, first-full:offset @@ -127,6 +143,8 @@ def read in:address:source:_elem -> result:_elem, eof?:boolean, in:address:sourc } # write back *chan <- put *chan, first-full:offset, full +#? $print [relinquishing lock after reading], 10/newline + reset lock ] def clear in:address:source:_elem -> in:address:source:_elem [ -- cgit 1.4.1-2-gfad0