From 6e1eeeebfb453fa7c871869c19375ce60fbd7413 Mon Sep 17 00:00:00 2001 From: Kartik Agaram Date: Sat, 27 Jul 2019 16:01:55 -0700 Subject: 5485 - promote SubX to top-level --- archive/2.vm/075channel.mu | 510 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 510 insertions(+) create mode 100644 archive/2.vm/075channel.mu (limited to 'archive/2.vm/075channel.mu') diff --git a/archive/2.vm/075channel.mu b/archive/2.vm/075channel.mu new file mode 100644 index 00000000..c64492cb --- /dev/null +++ b/archive/2.vm/075channel.mu @@ -0,0 +1,510 @@ +# Mu synchronizes between routines using channels rather than locks, like +# Erlang and Go. +# +# The key property of channels: Writing to a full channel or reading from an +# empty one will put the current routine in 'waiting' state until the +# operation can be completed. +# +# Beware of addresses passed into channels. They can cause race conditions. + +scenario channel [ + run [ + local-scope + source:&:source:num, sink:&:sink:num <- new-channel 3/capacity + sink <- write sink, 34 + 10:num/raw, 11:bool/raw, source <- read source + ] + memory-should-contain [ + 10 <- 34 + 11 <- 0 # read was successful + ] +] + +container channel:_elem [ + lock:bool # inefficient but simple: serialize all reads as well as writes + first-full:num # for write + first-free:num # for read + # A circular buffer contains values from index first-full up to (but not + # including) index first-free. The reader always modifies it at first-full, + # while the writer always modifies it at first-free. + data:&:@:_elem +] + +# Since channels have two ends, and since it's an error to use either end from +# multiple routines, let's distinguish the ends. + +container source:_elem [ + chan:&:channel:_elem +] + +container sink:_elem [ + chan:&:channel:_elem +] + +def new-channel capacity:num -> in:&:source:_elem, out:&:sink:_elem [ + local-scope + load-inputs + result:&:channel:_elem <- new {(channel _elem): type} + *result <- put *result, first-full:offset, 0 + *result <- put *result, first-free:offset, 0 + capacity <- add capacity, 1 # unused slot for 'full?' below + data:&:@:_elem <- new _elem:type, capacity + *result <- put *result, data:offset, data + in <- new {(source _elem): type} + *in <- put *in, chan:offset, result + out <- new {(sink _elem): type} + *out <- put *out, chan:offset, result +] + +# write a value to a channel +def write out:&:sink:_elem, val:_elem -> out:&:sink:_elem [ + local-scope + load-inputs + assert out, [write to null channel] + chan:&:channel:_elem <- get *out, chan:offset + + # block until lock is acquired AND queue has room + lock:location <- get-location *chan, lock:offset +#? $print [write], 10/newline + { +#? $print [trying to acquire lock for writing], 10/newline + wait-for-reset-then-set lock +#? $print [lock acquired for writing], 10/newline + full?:bool <- channel-full? chan + break-unless full? +#? $print [but channel is full; relinquishing lock], 10/newline + # channel is full; relinquish lock and give a reader the opportunity to + # create room on it + reset lock + current-routine-is-blocked + switch # avoid spinlocking + loop + } + current-routine-is-unblocked +#? $print [performing write], 10/newline + # store a deep copy of val + circular-buffer:&:@:_elem <- get *chan, data:offset + free:num <- get *chan, first-free:offset + *circular-buffer <- put-index *circular-buffer, free, val + # mark its slot as filled + free <- add free, 1 + { + # wrap free around to 0 if necessary + len:num <- length *circular-buffer + at-end?:bool <- greater-or-equal free, len + break-unless at-end? + free <- copy 0 + } + # write back + *chan <- put *chan, first-free:offset, free +#? $print [relinquishing lock after writing], 10/newline + reset lock +] + +# read a value from a channel +def read in:&:source:_elem -> result:_elem, eof?:bool, in:&:source:_elem [ + local-scope + load-inputs + assert in, [read on null channel] + eof? <- copy false # default result + chan:&:channel:_elem <- get *in, chan:offset + # block until lock is acquired AND queue has data + lock:location <- get-location *chan, lock:offset +#? $print [read], 10/newline + { +#? $print [trying to acquire lock for reading], 10/newline + wait-for-reset-then-set lock +#? $print [lock acquired for reading], 10/newline + empty?:bool <- channel-empty? chan + break-unless empty? +#? $print [but channel is empty; relinquishing lock], 10/newline + # channel is empty; relinquish lock and give a writer the opportunity to + # add to it + reset lock + current-routine-is-blocked + + switch # avoid spinlocking + loop + } + current-routine-is-unblocked + # pull result off + full:num <- get *chan, first-full:offset + circular-buffer:&:@:_elem <- get *chan, data:offset + result <- index *circular-buffer, full + # clear the slot + empty:&:_elem <- new _elem:type + *circular-buffer <- put-index *circular-buffer, full, *empty + # mark its slot as empty + full <- add full, 1 + { + # wrap full around to 0 if necessary + len:num <- length *circular-buffer + at-end?:bool <- greater-or-equal full, len + break-unless at-end? + full <- copy 0 + } + # write back + *chan <- put *chan, first-full:offset, full +#? $print [relinquishing lock after reading], 10/newline + reset lock +] + +# todo: create a notion of iterator and iterable so we can read/write whole +# aggregates (arrays, lists, ..) of _elems at once. + +scenario channel-initialization [ + run [ + local-scope + source:&:source:num <- new-channel 3/capacity + chan:&:channel:num <- get *source, chan:offset + 10:num/raw <- get *chan, first-full:offset + 11:num/raw <- get *chan, first-free:offset + ] + memory-should-contain [ + 10 <- 0 # first-full + 11 <- 0 # first-free + ] +] + +scenario channel-write-increments-free [ + local-scope + _, sink:&:sink:num <- new-channel 3/capacity + run [ + sink <- write sink, 34 + chan:&:channel:num <- get *sink, chan:offset + 10:num/raw <- get *chan, first-full:offset + 11:num/raw <- get *chan, first-free:offset + ] + memory-should-contain [ + 10 <- 0 # first-full + 11 <- 1 # first-free + ] +] + +scenario channel-read-increments-full [ + local-scope + source:&:source:num, sink:&:sink:num <- new-channel 3/capacity + sink <- write sink, 34 + run [ + _, _, source <- read source + chan:&:channel:num <- get *source, chan:offset + 10:num/raw <- get *chan, first-full:offset + 11:num/raw <- get *chan, first-free:offset + ] + memory-should-contain [ + 10 <- 1 # first-full + 11 <- 1 # first-free + ] +] + +scenario channel-wrap [ + local-scope + # channel with just 1 slot + source:&:source:num, sink:&:sink:num <- new-channel 1/capacity + chan:&:channel:num <- get *source, chan:offset + # write and read a value + sink <- write sink, 34 + _, _, source <- read source + run [ + # first-free will now be 1 + 10:num/raw <- get *chan, first-free:offset + 11:num/raw <- get *chan, first-free:offset + # write second value, verify that first-free wraps + sink <- write sink, 34 + 20:num/raw <- get *chan, first-free:offset + # read second value, verify that first-full wraps + _, _, source <- read source + 30:num/raw <- get *chan, first-full:offset + ] + memory-should-contain [ + 10 <- 1 # first-free after first write + 11 <- 1 # first-full after first read + 20 <- 0 # first-free after second write, wrapped + 30 <- 0 # first-full after second read, wrapped + ] +] + +scenario channel-new-empty-not-full [ + run [ + local-scope + source:&:source:num <- new-channel 3/capacity + chan:&:channel:num <- get *source, chan:offset + 10:bool/raw <- channel-empty? chan + 11:bool/raw <- channel-full? chan + ] + memory-should-contain [ + 10 <- 1 # empty? + 11 <- 0 # full? + ] +] + +scenario channel-write-not-empty [ + local-scope + source:&:source:num, sink:&:sink:num <- new-channel 3/capacity + chan:&:channel:num <- get *source, chan:offset + run [ + sink <- write sink, 34 + 10:bool/raw <- channel-empty? chan + 11:bool/raw <- channel-full? chan + ] + memory-should-contain [ + 10 <- 0 # empty? + 11 <- 0 # full? + ] +] + +scenario channel-write-full [ + local-scope + source:&:source:num, sink:&:sink:num <- new-channel 1/capacity + chan:&:channel:num <- get *source, chan:offset + run [ + sink <- write sink, 34 + 10:bool/raw <- channel-empty? chan + 11:bool/raw <- channel-full? chan + ] + memory-should-contain [ + 10 <- 0 # empty? + 11 <- 1 # full? + ] +] + +scenario channel-read-not-full [ + local-scope + source:&:source:num, sink:&:sink:num <- new-channel 1/capacity + chan:&:channel:num <- get *source, chan:offset + sink <- write sink, 34 + run [ + _, _, source <- read source + 10:bool/raw <- channel-empty? chan + 11:bool/raw <- channel-full? chan + ] + memory-should-contain [ + 10 <- 1 # empty? + 11 <- 0 # full? + ] +] + +scenario channel-clear [ + local-scope + # create a channel with a few items + source:&:source:num, sink:&:sink:num <- new-channel 3/capacity + chan:&:channel:num <- get *sink, chan:offset + write sink, 30 + write sink, 31 + write sink, 32 + run [ + clear source + 10:bool/raw <- channel-empty? chan + ] + memory-should-contain [ + 10 <- 1 # after the call to 'clear', the channel should be empty + ] +] + +def clear in:&:source:_elem -> in:&:source:_elem [ + local-scope + load-inputs + chan:&:channel:_elem <- get *in, chan:offset + { + empty?:bool <- channel-empty? chan + break-if empty? + _, _, in <- read in + loop + } +] + +## cancelling channels + +# every channel comes with a boolean signifying if it's been closed +# initially this boolean is false +container channel:_elem [ + closed?:bool +] + +# a channel can be closed from either the source or the sink +# both routines can modify the 'closed?' bit, but they can only ever set it, so this is a benign race +def close x:&:source:_elem -> x:&:source:_elem [ + local-scope + load-inputs + chan:&:channel:_elem <- get *x, chan:offset + *chan <- put *chan, closed?:offset, true +] +def close x:&:sink:_elem -> x:&:sink:_elem [ + local-scope + load-inputs + chan:&:channel:_elem <- get *x, chan:offset + *chan <- put *chan, closed?:offset, true +] + +# once a channel is closed from one side, no further operations are expected from that side +# if a channel is closed for reading, +# no further writes will be let through +# if a channel is closed for writing, +# future reads continue until the channel empties, +# then the channel is also closed for reading +after [ + closed?:bool <- get *chan, closed?:offset + return-if closed? +] +after [ + closed?:bool <- get *chan, closed?:offset + { + break-unless closed? + empty-result:&:_elem <- new _elem:type + current-routine-is-unblocked + return *empty-result, true + } +] + +## helpers + +# An empty channel has first-free and first-full both at the same value. +def channel-empty? chan:&:channel:_elem -> result:bool [ + local-scope + load-inputs + # return chan.first-full == chan.first-free + full:num <- get *chan, first-full:offset + free:num <- get *chan, first-free:offset + result <- equal full, free +] + +# A full channel has first-free just before first-full, wasting one slot. +# (Other alternatives: https://www.snellman.net/blog/archive/2016-12-13-ring-buffers) +def channel-full? chan:&:channel:_elem -> result:bool [ + local-scope + load-inputs + # tmp = chan.first-free + 1 + tmp:num <- get *chan, first-free:offset + tmp <- add tmp, 1 + { + # if tmp == chan.capacity, tmp = 0 + len:num <- capacity chan + at-end?:bool <- greater-or-equal tmp, len + break-unless at-end? + tmp <- copy 0 + } + # return chan.first-full == tmp + full:num <- get *chan, first-full:offset + result <- equal full, tmp +] + +def capacity chan:&:channel:_elem -> result:num [ + local-scope + load-inputs + q:&:@:_elem <- get *chan, data:offset + result <- length *q +] + +## helpers for channels of characters in particular + +def buffer-lines in:&:source:char, buffered-out:&:sink:char -> buffered-out:&:sink:char, in:&:source:char [ + local-scope + load-inputs + # repeat forever + eof?:bool <- copy false + { + line:&:buffer:char <- new-buffer 30 + # read characters from 'in' until newline, copy into line + { + +next-character + c:char, eof?:bool, in <- read in + break-if eof? + # drop a character on backspace + { + # special-case: if it's a backspace + backspace?:bool <- equal c, 8 + break-unless backspace? + # drop previous character + { + buffer-length:num <- get *line, length:offset + buffer-empty?:bool <- equal buffer-length, 0 + break-if buffer-empty? + buffer-length <- subtract buffer-length, 1 + *line <- put *line, length:offset, buffer-length + } + # and don't append this one + loop +next-character + } + # append anything else + line <- append line, c + line-done?:bool <- equal c, 10/newline + break-if line-done? + loop + } + # copy line into 'buffered-out' + i:num <- copy 0 + line-contents:text <- get *line, data:offset + max:num <- get *line, length:offset + { + done?:bool <- greater-or-equal i, max + break-if done? + c:char <- index *line-contents, i + buffered-out <- write buffered-out, c + i <- add i, 1 + loop + } + { + break-unless eof? + buffered-out <- close buffered-out + return + } + loop + } +] + +scenario buffer-lines-blocks-until-newline [ + run [ + local-scope + source:&:source:char, sink:&:sink:char <- new-channel 10/capacity + _, buffered-stdin:&:sink:char/buffered-stdin <- new-channel 10/capacity + buffered-chan:&:channel:char <- get *buffered-stdin, chan:offset + empty?:bool <- channel-empty? buffered-chan + assert empty?, [ +F buffer-lines-blocks-until-newline: channel should be empty after init] + # buffer stdin into buffered-stdin, try to read from buffered-stdin + buffer-routine:num <- start-running buffer-lines, source, buffered-stdin + wait-for-routine-to-block buffer-routine + empty? <- channel-empty? buffered-chan + assert empty?:bool, [ +F buffer-lines-blocks-until-newline: channel should be empty after buffer-lines bring-up] + # write 'a' + sink <- write sink, 97/a + restart buffer-routine + wait-for-routine-to-block buffer-routine + empty? <- channel-empty? buffered-chan + assert empty?:bool, [ +F buffer-lines-blocks-until-newline: channel should be empty after writing 'a'] + # write 'b' + sink <- write sink, 98/b + restart buffer-routine + wait-for-routine-to-block buffer-routine + empty? <- channel-empty? buffered-chan + assert empty?:bool, [ +F buffer-lines-blocks-until-newline: channel should be empty after writing 'b'] + # write newline + sink <- write sink, 10/newline + restart buffer-routine + wait-for-routine-to-block buffer-routine + empty? <- channel-empty? buffered-chan + data-emitted?:bool <- not empty? + assert data-emitted?, [ +F buffer-lines-blocks-until-newline: channel should contain data after writing newline] + trace 1, [test], [reached end] + ] + trace-should-contain [ + test: reached end + ] +] + +def drain source:&:source:char -> result:text, source:&:source:char [ + local-scope + load-inputs + buf:&:buffer:char <- new-buffer 30 + { + c:char, done?:bool <- read source + break-if done? + buf <- append buf, c + loop + } + result <- buffer-to-array buf +] -- cgit 1.4.1-2-gfad0