diff options
author | Kartik Agaram <vc@akkartik.com> | 2020-01-01 17:04:37 -0800 |
---|---|---|
committer | Kartik Agaram <vc@akkartik.com> | 2020-01-01 17:04:37 -0800 |
commit | 2a4088119cf41175457414dfa59bd4064b8f0562 (patch) | |
tree | 64fe184e399f9870ebd481a90eec34d51e5dff68 /archive/2.vm/075channel.mu | |
parent | 23fd294d85959c6b476bcdc35ed6ad508cc99b8f (diff) | |
download | mu-2a4088119cf41175457414dfa59bd4064b8f0562.tar.gz |
5852
Diffstat (limited to 'archive/2.vm/075channel.mu')
-rw-r--r-- | archive/2.vm/075channel.mu | 510 |
1 files changed, 0 insertions, 510 deletions
diff --git a/archive/2.vm/075channel.mu b/archive/2.vm/075channel.mu deleted file mode 100644 index c64492cb..00000000 --- a/archive/2.vm/075channel.mu +++ /dev/null @@ -1,510 +0,0 @@ -# Mu synchronizes between routines using channels rather than locks, like -# Erlang and Go. -# -# The key property of channels: Writing to a full channel or reading from an -# empty one will put the current routine in 'waiting' state until the -# operation can be completed. -# -# Beware of addresses passed into channels. They can cause race conditions. - -scenario channel [ - run [ - local-scope - source:&:source:num, sink:&:sink:num <- new-channel 3/capacity - sink <- write sink, 34 - 10:num/raw, 11:bool/raw, source <- read source - ] - memory-should-contain [ - 10 <- 34 - 11 <- 0 # read was successful - ] -] - -container channel:_elem [ - lock:bool # inefficient but simple: serialize all reads as well as writes - first-full:num # for write - first-free:num # for read - # A circular buffer contains values from index first-full up to (but not - # including) index first-free. The reader always modifies it at first-full, - # while the writer always modifies it at first-free. - data:&:@:_elem -] - -# Since channels have two ends, and since it's an error to use either end from -# multiple routines, let's distinguish the ends. - -container source:_elem [ - chan:&:channel:_elem -] - -container sink:_elem [ - chan:&:channel:_elem -] - -def new-channel capacity:num -> in:&:source:_elem, out:&:sink:_elem [ - local-scope - load-inputs - result:&:channel:_elem <- new {(channel _elem): type} - *result <- put *result, first-full:offset, 0 - *result <- put *result, first-free:offset, 0 - capacity <- add capacity, 1 # unused slot for 'full?' below - data:&:@:_elem <- new _elem:type, capacity - *result <- put *result, data:offset, data - in <- new {(source _elem): type} - *in <- put *in, chan:offset, result - out <- new {(sink _elem): type} - *out <- put *out, chan:offset, result -] - -# write a value to a channel -def write out:&:sink:_elem, val:_elem -> out:&:sink:_elem [ - local-scope - load-inputs - assert out, [write to null channel] - chan:&:channel:_elem <- get *out, chan:offset - <channel-write-initial> - # block until lock is acquired AND queue has room - lock:location <- get-location *chan, lock:offset -#? $print [write], 10/newline - { -#? $print [trying to acquire lock for writing], 10/newline - wait-for-reset-then-set lock -#? $print [lock acquired for writing], 10/newline - full?:bool <- channel-full? chan - break-unless full? -#? $print [but channel is full; relinquishing lock], 10/newline - # channel is full; relinquish lock and give a reader the opportunity to - # create room on it - reset lock - current-routine-is-blocked - switch # avoid spinlocking - loop - } - current-routine-is-unblocked -#? $print [performing write], 10/newline - # store a deep copy of val - circular-buffer:&:@:_elem <- get *chan, data:offset - free:num <- get *chan, first-free:offset - *circular-buffer <- put-index *circular-buffer, free, val - # mark its slot as filled - free <- add free, 1 - { - # wrap free around to 0 if necessary - len:num <- length *circular-buffer - at-end?:bool <- greater-or-equal free, len - break-unless at-end? - free <- copy 0 - } - # write back - *chan <- put *chan, first-free:offset, free -#? $print [relinquishing lock after writing], 10/newline - reset lock -] - -# read a value from a channel -def read in:&:source:_elem -> result:_elem, eof?:bool, in:&:source:_elem [ - local-scope - load-inputs - assert in, [read on null channel] - eof? <- copy false # default result - chan:&:channel:_elem <- get *in, chan:offset - # block until lock is acquired AND queue has data - lock:location <- get-location *chan, lock:offset -#? $print [read], 10/newline - { -#? $print [trying to acquire lock for reading], 10/newline - wait-for-reset-then-set lock -#? $print [lock acquired for reading], 10/newline - empty?:bool <- channel-empty? chan - break-unless empty? -#? $print [but channel is empty; relinquishing lock], 10/newline - # channel is empty; relinquish lock and give a writer the opportunity to - # add to it - reset lock - current-routine-is-blocked - <channel-read-empty> - switch # avoid spinlocking - loop - } - current-routine-is-unblocked - # pull result off - full:num <- get *chan, first-full:offset - circular-buffer:&:@:_elem <- get *chan, data:offset - result <- index *circular-buffer, full - # clear the slot - empty:&:_elem <- new _elem:type - *circular-buffer <- put-index *circular-buffer, full, *empty - # mark its slot as empty - full <- add full, 1 - { - # wrap full around to 0 if necessary - len:num <- length *circular-buffer - at-end?:bool <- greater-or-equal full, len - break-unless at-end? - full <- copy 0 - } - # write back - *chan <- put *chan, first-full:offset, full -#? $print [relinquishing lock after reading], 10/newline - reset lock -] - -# todo: create a notion of iterator and iterable so we can read/write whole -# aggregates (arrays, lists, ..) of _elems at once. - -scenario channel-initialization [ - run [ - local-scope - source:&:source:num <- new-channel 3/capacity - chan:&:channel:num <- get *source, chan:offset - 10:num/raw <- get *chan, first-full:offset - 11:num/raw <- get *chan, first-free:offset - ] - memory-should-contain [ - 10 <- 0 # first-full - 11 <- 0 # first-free - ] -] - -scenario channel-write-increments-free [ - local-scope - _, sink:&:sink:num <- new-channel 3/capacity - run [ - sink <- write sink, 34 - chan:&:channel:num <- get *sink, chan:offset - 10:num/raw <- get *chan, first-full:offset - 11:num/raw <- get *chan, first-free:offset - ] - memory-should-contain [ - 10 <- 0 # first-full - 11 <- 1 # first-free - ] -] - -scenario channel-read-increments-full [ - local-scope - source:&:source:num, sink:&:sink:num <- new-channel 3/capacity - sink <- write sink, 34 - run [ - _, _, source <- read source - chan:&:channel:num <- get *source, chan:offset - 10:num/raw <- get *chan, first-full:offset - 11:num/raw <- get *chan, first-free:offset - ] - memory-should-contain [ - 10 <- 1 # first-full - 11 <- 1 # first-free - ] -] - -scenario channel-wrap [ - local-scope - # channel with just 1 slot - source:&:source:num, sink:&:sink:num <- new-channel 1/capacity - chan:&:channel:num <- get *source, chan:offset - # write and read a value - sink <- write sink, 34 - _, _, source <- read source - run [ - # first-free will now be 1 - 10:num/raw <- get *chan, first-free:offset - 11:num/raw <- get *chan, first-free:offset - # write second value, verify that first-free wraps - sink <- write sink, 34 - 20:num/raw <- get *chan, first-free:offset - # read second value, verify that first-full wraps - _, _, source <- read source - 30:num/raw <- get *chan, first-full:offset - ] - memory-should-contain [ - 10 <- 1 # first-free after first write - 11 <- 1 # first-full after first read - 20 <- 0 # first-free after second write, wrapped - 30 <- 0 # first-full after second read, wrapped - ] -] - -scenario channel-new-empty-not-full [ - run [ - local-scope - source:&:source:num <- new-channel 3/capacity - chan:&:channel:num <- get *source, chan:offset - 10:bool/raw <- channel-empty? chan - 11:bool/raw <- channel-full? chan - ] - memory-should-contain [ - 10 <- 1 # empty? - 11 <- 0 # full? - ] -] - -scenario channel-write-not-empty [ - local-scope - source:&:source:num, sink:&:sink:num <- new-channel 3/capacity - chan:&:channel:num <- get *source, chan:offset - run [ - sink <- write sink, 34 - 10:bool/raw <- channel-empty? chan - 11:bool/raw <- channel-full? chan - ] - memory-should-contain [ - 10 <- 0 # empty? - 11 <- 0 # full? - ] -] - -scenario channel-write-full [ - local-scope - source:&:source:num, sink:&:sink:num <- new-channel 1/capacity - chan:&:channel:num <- get *source, chan:offset - run [ - sink <- write sink, 34 - 10:bool/raw <- channel-empty? chan - 11:bool/raw <- channel-full? chan - ] - memory-should-contain [ - 10 <- 0 # empty? - 11 <- 1 # full? - ] -] - -scenario channel-read-not-full [ - local-scope - source:&:source:num, sink:&:sink:num <- new-channel 1/capacity - chan:&:channel:num <- get *source, chan:offset - sink <- write sink, 34 - run [ - _, _, source <- read source - 10:bool/raw <- channel-empty? chan - 11:bool/raw <- channel-full? chan - ] - memory-should-contain [ - 10 <- 1 # empty? - 11 <- 0 # full? - ] -] - -scenario channel-clear [ - local-scope - # create a channel with a few items - source:&:source:num, sink:&:sink:num <- new-channel 3/capacity - chan:&:channel:num <- get *sink, chan:offset - write sink, 30 - write sink, 31 - write sink, 32 - run [ - clear source - 10:bool/raw <- channel-empty? chan - ] - memory-should-contain [ - 10 <- 1 # after the call to 'clear', the channel should be empty - ] -] - -def clear in:&:source:_elem -> in:&:source:_elem [ - local-scope - load-inputs - chan:&:channel:_elem <- get *in, chan:offset - { - empty?:bool <- channel-empty? chan - break-if empty? - _, _, in <- read in - loop - } -] - -## cancelling channels - -# every channel comes with a boolean signifying if it's been closed -# initially this boolean is false -container channel:_elem [ - closed?:bool -] - -# a channel can be closed from either the source or the sink -# both routines can modify the 'closed?' bit, but they can only ever set it, so this is a benign race -def close x:&:source:_elem -> x:&:source:_elem [ - local-scope - load-inputs - chan:&:channel:_elem <- get *x, chan:offset - *chan <- put *chan, closed?:offset, true -] -def close x:&:sink:_elem -> x:&:sink:_elem [ - local-scope - load-inputs - chan:&:channel:_elem <- get *x, chan:offset - *chan <- put *chan, closed?:offset, true -] - -# once a channel is closed from one side, no further operations are expected from that side -# if a channel is closed for reading, -# no further writes will be let through -# if a channel is closed for writing, -# future reads continue until the channel empties, -# then the channel is also closed for reading -after <channel-write-initial> [ - closed?:bool <- get *chan, closed?:offset - return-if closed? -] -after <channel-read-empty> [ - closed?:bool <- get *chan, closed?:offset - { - break-unless closed? - empty-result:&:_elem <- new _elem:type - current-routine-is-unblocked - return *empty-result, true - } -] - -## helpers - -# An empty channel has first-free and first-full both at the same value. -def channel-empty? chan:&:channel:_elem -> result:bool [ - local-scope - load-inputs - # return chan.first-full == chan.first-free - full:num <- get *chan, first-full:offset - free:num <- get *chan, first-free:offset - result <- equal full, free -] - -# A full channel has first-free just before first-full, wasting one slot. -# (Other alternatives: https://www.snellman.net/blog/archive/2016-12-13-ring-buffers) -def channel-full? chan:&:channel:_elem -> result:bool [ - local-scope - load-inputs - # tmp = chan.first-free + 1 - tmp:num <- get *chan, first-free:offset - tmp <- add tmp, 1 - { - # if tmp == chan.capacity, tmp = 0 - len:num <- capacity chan - at-end?:bool <- greater-or-equal tmp, len - break-unless at-end? - tmp <- copy 0 - } - # return chan.first-full == tmp - full:num <- get *chan, first-full:offset - result <- equal full, tmp -] - -def capacity chan:&:channel:_elem -> result:num [ - local-scope - load-inputs - q:&:@:_elem <- get *chan, data:offset - result <- length *q -] - -## helpers for channels of characters in particular - -def buffer-lines in:&:source:char, buffered-out:&:sink:char -> buffered-out:&:sink:char, in:&:source:char [ - local-scope - load-inputs - # repeat forever - eof?:bool <- copy false - { - line:&:buffer:char <- new-buffer 30 - # read characters from 'in' until newline, copy into line - { - +next-character - c:char, eof?:bool, in <- read in - break-if eof? - # drop a character on backspace - { - # special-case: if it's a backspace - backspace?:bool <- equal c, 8 - break-unless backspace? - # drop previous character - { - buffer-length:num <- get *line, length:offset - buffer-empty?:bool <- equal buffer-length, 0 - break-if buffer-empty? - buffer-length <- subtract buffer-length, 1 - *line <- put *line, length:offset, buffer-length - } - # and don't append this one - loop +next-character - } - # append anything else - line <- append line, c - line-done?:bool <- equal c, 10/newline - break-if line-done? - loop - } - # copy line into 'buffered-out' - i:num <- copy 0 - line-contents:text <- get *line, data:offset - max:num <- get *line, length:offset - { - done?:bool <- greater-or-equal i, max - break-if done? - c:char <- index *line-contents, i - buffered-out <- write buffered-out, c - i <- add i, 1 - loop - } - { - break-unless eof? - buffered-out <- close buffered-out - return - } - loop - } -] - -scenario buffer-lines-blocks-until-newline [ - run [ - local-scope - source:&:source:char, sink:&:sink:char <- new-channel 10/capacity - _, buffered-stdin:&:sink:char/buffered-stdin <- new-channel 10/capacity - buffered-chan:&:channel:char <- get *buffered-stdin, chan:offset - empty?:bool <- channel-empty? buffered-chan - assert empty?, [ -F buffer-lines-blocks-until-newline: channel should be empty after init] - # buffer stdin into buffered-stdin, try to read from buffered-stdin - buffer-routine:num <- start-running buffer-lines, source, buffered-stdin - wait-for-routine-to-block buffer-routine - empty? <- channel-empty? buffered-chan - assert empty?:bool, [ -F buffer-lines-blocks-until-newline: channel should be empty after buffer-lines bring-up] - # write 'a' - sink <- write sink, 97/a - restart buffer-routine - wait-for-routine-to-block buffer-routine - empty? <- channel-empty? buffered-chan - assert empty?:bool, [ -F buffer-lines-blocks-until-newline: channel should be empty after writing 'a'] - # write 'b' - sink <- write sink, 98/b - restart buffer-routine - wait-for-routine-to-block buffer-routine - empty? <- channel-empty? buffered-chan - assert empty?:bool, [ -F buffer-lines-blocks-until-newline: channel should be empty after writing 'b'] - # write newline - sink <- write sink, 10/newline - restart buffer-routine - wait-for-routine-to-block buffer-routine - empty? <- channel-empty? buffered-chan - data-emitted?:bool <- not empty? - assert data-emitted?, [ -F buffer-lines-blocks-until-newline: channel should contain data after writing newline] - trace 1, [test], [reached end] - ] - trace-should-contain [ - test: reached end - ] -] - -def drain source:&:source:char -> result:text, source:&:source:char [ - local-scope - load-inputs - buf:&:buffer:char <- new-buffer 30 - { - c:char, done?:bool <- read source - break-if done? - buf <- append buf, c - loop - } - result <- buffer-to-array buf -] |