# Mu synchronizes between routines using channels rather than locks, like
# Erlang and Go.
#
# Key properties of channels:
#
# a) Writing to a full channel or reading from an empty one will put the
# current routine in 'waiting' state until the operation can be completed.
#
# b) Writing to a channel implicitly performs a deep copy. This prevents
# addresses from being shared between routines through a channel, and
# therefore eliminates race conditions on any data sent over one.
#
# There's still a narrow window for race conditions: the ingredients passed in
# to 'start-running'. Pass only channels into routines and you should be fine.
# Any other mutable ingredients will require locks.
scenario channel [
# a value written to the sink end of a channel can be read back from its
# source end
run [
local-scope
# new-channel hands back both ends of a channel created with capacity 3
source:&:source:num, sink:&:sink:num <- new-channel 3/capacity
sink <- write sink, 34
# save the value read and the accompanying flag at raw locations 10 and 11,
# where the checks below can see them
10:num/raw, 11:bool/raw, source <- read source
]
memory-should-contain [
10 <- 34
11 <- 0 # read was successful
]
]
container channel:_elem [
lock:bool # inefficient but simple: serialize all reads as well as writes
first-full:num # start of the stored values; the reader's position
first-free:num # one past the stored values; the writer's position
# A circular buffer contains values from index first-full up to (but not
# including) index first-free. The reader always modifies it at first-full,
# while the writer always modifies it at first-free.
data:&:@:_elem
]
# Since channels have two ends, and since it's an error to use either end from
# multiple routines, let's distinguish the ends.
container source:_elem [
# the end of a channel that 'read' is called on; wraps the underlying channel
chan:&:channel:_elem
]
container sink:_elem [
# the end of a channel that 'write' is called on; wraps the underlying channel
chan:&:channel:_elem
]
def new-channel capacity:num -> in:&:source:_elem, out:&:sink:_elem [
local-scope
load-ingredients
result:&:channel:_elem <- new {(channel _elem): type}
*result <- put *result, first-full:offset, 0
*result <- put *result, first-free# new type to help incrementally scan arrays
container stream:_elem [
index:num # position in 'data' of the next element to hand out
data:&:@:_elem # the array being scanned; never modified by the stream
]
# Wrap the array 's' in a freshly allocated stream positioned at its first
# element. A null array yields a null stream.
def new-stream s:&:@:_elem -> result:&:stream:_elem [
  local-scope
  load-inputs
  {
    # nothing to wrap
    break-if s
    return null
  }
  result <- new {(stream _elem): type}
  *result <- put *result, data:offset, s
  *result <- put *result, index:offset, 0
]
# Move the stream back to the start of its data so it can be scanned again.
# A null stream is returned untouched.
def rewind in:&:stream:_elem -> in:&:stream:_elem [
  local-scope
  load-inputs
  {
    # no stream, nothing to reset
    break-if in
    return
  }
  *in <- put *in, index:offset, 0
]
# Return the element at the stream's current position and advance past it.
# Once the position reaches the end of the data, 'empty?' is true, 'result'
# is the contents of a newly allocated _elem, and the position is left alone.
# A null stream is an error.
def read in:&:stream:_elem -> result:_elem, empty?:bool, in:&:stream:_elem [
  local-scope
  load-inputs
  assert in, [cannot read; stream has no data]
  buf:&:@:_elem <- get *in, data:offset
  pos:num <- get *in, index:offset
  size:num <- length *buf
  empty? <- copy false
  exhausted?:bool <- greater-or-equal pos, size
  {
    break-unless exhausted?
    # allocate an _elem purely to have some value of the right type to return
    blank:&:_elem <- new _elem:type
    return *blank, true
  }
  result <- index *buf, pos
  # remember that this element has been consumed
  pos <- add pos, 1
  *in <- put *in, index:offset, pos
]
# Return the element at the stream's current position without advancing.
# Once the position reaches the end of the data, 'empty?' is true and
# 'result' is the contents of a newly allocated _elem.
# A null stream is an error.
def peek in:&:stream:_elem -> result:_elem, empty?:bool [
  local-scope
  load-inputs
  assert in, [cannot peek; stream has no data]
  buf:&:@:_elem <- get *in, data:offset
  pos:num <- get *in, index:offset
  size:num <- length *buf
  empty? <- copy false
  exhausted?:bool <- greater-or-equal pos, size
  {
    break-unless exhausted?
    # allocate an _elem purely to have some value of the right type to return
    blank:&:_elem <- new _elem:type
    return *blank, true
  }
  result <- index *buf, pos
]
# Return the text from the stream's current position up to (but not
# including) the next newline, and advance the stream past that newline.
# If no newline remains, the rest of the data is returned and the stream is
# left at the end of its data.
# A null stream is an error.
def read-line in:&:stream:char -> result:text, in:&:stream:char [
  local-scope
  load-inputs
  assert in, [cannot read-line; stream has no data]
  idx:num <- get *in, index:offset
  s:text <- get *in, data:offset
  # assumes find-next returns the length of 's' when no newline is found --
  # TODO confirm against its definition
  next-idx:num <- find-next s, 10/newline, idx
  result <- copy-range s, idx, next-idx
  idx <- add next-idx, 1  # skip newline
  # When there was no newline to skip, idx is now one past the length of 's'.
  # Clamp it so the saved position never exceeds the length; otherwise a
  # later read-line on the exhausted stream would ask copy-range for a range
  # starting beyond the end of 's'.
  len:num <- length *s
  {
    past-end?:bool <- greater-than idx, len
    break-unless past-end?
    idx <- copy len
  }
  # write back
  *in <- put *in, index:offset, idx
]
# Report whether the stream's position has reached (or passed) the end of its
# data, i.e. whether there is nothing left to read.
# A null stream is an error.
def end-of-stream? in:&:stream:_elem -> result:bool [
  local-scope
  load-inputs
  assert in, [cannot check end-of-stream?; stream has no data]
  buf:&:@:_elem <- get *in, data:offset
  size:num <- length *buf
  pos:num <- get *in, index:offset
  result <- greater-or-equal pos, size
]