about summary refs log blame commit diff stats
path: root/066stream.mu
blob: b3202f65daefac3abb144fdccc1795930e190a3d (plain) (tree)
85b2f61b ^
pre { line-height: 125%; }
td.linenos .normal { color: inherit; background-color: transparent; padding-left: 5px; padding-right: 5px; }
span.linenos { color: inherit; background-color: transparent; padding-left: 5px; padding-right: 5px; }
td.linenos .special { color: #000000; background-color: #ffffc0; padding-left: 5px; padding-right: 5px; }
span.linenos.special { color: #000000; background-color: #ffffc0; padding-left: 5px; padding-right: 5px; }
.highlight .hll { background-color: #ffffcc }
.highlight .c { color: #888888 } /* Comment */
.highlight .err { color: #a61717; background-color: #e3d2d2 } /* Error */
.highlight .k { color: #008800; font-weight: bold } /* Keyword */
.highlight .ch { color: #888888 } /* Comment.Hashbang */
.highlight .cm { color: #888888 } /* Comment.Multiline */
.highlight .cp { color: #cc0000; font-weight: bold } /* Comment.Preproc */
.highlight .cpf { color: #888888 } /* Comment.PreprocFile */
.highlight .c1 { color: #888888 } /* Comment.Single */
.highlight .cs { color: #cc0000; font-weight: bold; background-color: #fff0f0 } /* Comment.Special */
.highlight .gd { color: #000000; background-color: #ffdddd } /* Generic.Deleted */
.highlight .ge { font-style: italic } /* Generic.Emph */
.highlight .ges { font-weight: bold; font-style: italic } /* Generic.EmphStrong */
.highlight .gr { color: #aa0000 } /* Generic.Error */
.highlight .gh { color: #333333 } /* Generic.Heading */
.highlight .gi { color: #000000; background-color: #ddffdd } /* Generic.Inserted */
.highlight .go { color: #888888 } /* Generic.Output */
.highlight .gp { color: #555555 } /* Generic.Prompt */
.highlight .gs { font-weight: bold } /* Generic.Strong */
.highlight .gu { color: #666666 } /* Generic.Subheading */
.highlight .gt { color: #aa0000 } /* Generic.Traceback */
.highlight .kc { color: #008800; font-weight: bold } /* Keyword.Constant */
.highlight .kd { color: #008800; font-weight: bold } /* Keyword.Declaration */
.highlight .kn { color: #008800; font-weight: bold } /* Keyword.Namespace */
.highlight .kp { color: #008800 } /* Keyword.Pseudo */
.highlight .kr { color: #008800; font-weight: bold } /* Keyword.Reserved */
.highlight .kt { color: #888888; font-weight: bold } /* Keyword.Type */
.highlight .m { color: #0000DD; font-weight: bold } /* Literal.Number */
.highlight .s { color: #dd2200; background-color: #fff0f0 } /* Literal.String */
.highlight .na { color: #336699 } /* Name.Attribute */
.highlight .nb { color: #003388 } /* Name.Builtin */
.highlight .nc { color: #bb0066; font-weight: bold } /* Name.Class */
.highlight .no { color: #003366; font-weight: bold } /* Name.Constant */
.highlight .nd { color: #555555 } /* Name.Decorator */
.highlight .ne { color: #bb0066; font-weight: bold } /* Name.Exception */
.highlight .nf { color: #0066bb; font-weight: bold } /* Name.Function */
.highlight .nl { color: #336699; font-style: italic } /* Name.Label */
.highlight .nn { color: #bb0066; font-weight: bold } /* Name.Namespace */
.highlight .py { color: #336699; font-weight: bold } /* Name.Property */
.highlight .nt { color: #bb0066; font-weight: bold } /* Name.Tag */
.highlight .nv { color: #336699 } /* Name.Variable */
.highlight .ow { color: #008800 } /* Operator.Word */
.highlight .w { color: #bbbbbb } /* Text.Whitespace */
.highlight .mb { color: #0000DD; font-weight: bold } /* Literal.Number.Bin */
.highlight .mf { color: #0000DD; font-weight: bold } /* Literal.Number.Float */
.highlight .mh { color: #0000DD; font-weight: bold } /* Literal.Number.Hex */
.highlight .mi { color: #0000DD; font-weight: bold } /* Literal.Number.Integer */
.highlight .mo { color: #0000DD; font-weight: bold } /* Literal.Number.Oct */
.highlight .sa { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Affix */
.highlight .sb { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Backtick */
.highlight .sc { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Char */
.highlight .dl { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Delimiter */
.highlight .sd { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Doc */
.highlight .s2 { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Double */
.highlight .se { color: #0044dd; background-color: #fff0f0 } /* Literal.String.Escape */
.highlight .sh { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Heredoc */
.highlight .si { color: #3333bb; background-color: #fff0f0 } /* Literal.String.Interpol */
.highlight .sx { color: #22bb22; background-color: #f0fff0 } /* Literal.String.Other */
.highlight .sr { color: #008800; background-color: #fff0ff } /* Literal.String.Regex */
.highlight .s1 { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Single */
.highlight .ss { color: #aa6600; background-color: #fff0f0 } /* Literal.String.Symbol */
.highlight .bp { color: #003388 } /* Name.Builtin.Pseudo */
.highlight .fm { color: #0066bb; font-weight: bold } /* Name.Function.Magic */
.highlight .vc { color: #336699 } /* Name.Variable.Class */
.highlight .vg { color: #dd7700 } /* Name.Variable.Global */
.highlight .vi { color: #3333bb } /* Name.Variable.Instance */
.highlight .vm { color: #336699 } /* Name.Variable.Magic */
.highlight .il { color: #0000DD; font-weight: bold } /* Literal.Number.Integer.Long */
# Mu synchronizes between routines using channels rather than locks, like
# Erlang and Go.
#
# Key properties of channels:
#
#   a) Writing to a full channel or reading from an empty one will put the
#   current routine in 'waiting' state until the operation can be completed.
#
#   b) Writing to a channel implicitly performs a deep copy. This prevents
#   addresses from being shared between routines, and therefore rules out
#   race conditions on any data sent through channels.
#
# There's still a narrow window for race conditions: the ingredients passed
# into 'start-running'. Pass only channels into routines and you should be
# fine; any other mutable ingredients will require locks.

scenario channel [
  run [
    local-scope
    # round trip through a 3-slot channel: write one value, then read it back
    reader:&:source:num, writer:&:sink:num <- new-channel 3/capacity
    writer <- write writer, 34
    10:num/raw, 11:bool/raw, reader <- read reader
  ]
  memory-should-contain [
    10 <- 34  # the value that was written
    11 <- 0  # empty? is false: read was successful
  ]
]

# State shared by both ends of a channel: a circular buffer of values plus the
# positions where the reader and the writer operate on it.
container channel:_elem [
  lock:bool  # inefficient but simple: serialize all reads as well as writes
  first-full:num  # for read: index of the oldest value not yet read
  first-free:num  # for write: index of the next slot to fill
  # A circular buffer contains values from index first-full up to (but not
  # including) index first-free. The reader always modifies it at first-full,
  # while the writer always modifies it at first-free.
  data:&:@:_elem
]

# Since channels have two ends, and since it's an error to use either end from
# multiple routines, let's distinguish the ends.

# The reading end of a channel; 'read', 'clear' and 'close' accept it.
container source:_elem [
  chan:&:channel:_elem  # the underlying channel state
]

# The writing end of a channel; 'write' and 'close' accept it.
container sink:_elem [
  chan:&:channel:_elem  # the underlying channel state
]

def new-channel capacity:num -> in:&:source:_elem, out:&:sink:_elem [
  local-scope
  load-ingredients
  result:&:channel:_elem <- new {(channel _elem): type}
  *result <- put *result, first-full:offset, 0
  *result <- put *result, first-free
# A stream wraps an array so it can be scanned incrementally, one element at
# a time (see read, peek, read-line and end-of-stream? below).
container stream:_elem [
  index:num  # position of the next element read/peek will return; reset to 0 by new-stream and rewind
  data:&:@:_elem  # the array being scanned
]

# Wrap array 's' in a fresh stream positioned at its first element.
# Returns null when 's' is null.
def new-stream s:&:@:_elem -> result:&:stream:_elem [
  local-scope
  load-inputs
  {
    break-if s
    return null
  }
  result <- new {(stream _elem): type}
  *result <- put *result, data:offset, s
  *result <- put *result, index:offset, 0
]

# Move the stream back to its first element so it can be scanned again.
# A null stream is passed through untouched.
def rewind in:&:stream:_elem -> in:&:stream:_elem [
  local-scope
  load-inputs
  {
    break-unless in
    *in <- put *in, index:offset, 0
  }
]

# Consume the next element of the stream.
# Normally returns that element with 'empty?' false and advances the index.
# Once every element has been consumed, returns the contents of a freshly
# allocated _elem with 'empty?' true and leaves the index where it is.
# Asserts that 'in' is non-null.
def read in:&:stream:_elem -> result:_elem, empty?:bool, in:&:stream:_elem [
  local-scope
  load-inputs
  assert in, [cannot read; stream has no data]
  pos:num <- get *in, index:offset
  arr:&:@:_elem <- get *in, data:offset
  n:num <- length *arr
  {
    more?:bool <- lesser-than pos, n
    break-if more?
    # nothing left: hand back a placeholder value
    blank:&:_elem <- new _elem:type
    return *blank, true
  }
  empty? <- copy false
  result <- index *arr, pos
  pos <- add pos, 1
  *in <- put *in, index:offset, pos
]

# Return the next element of the stream without consuming it.
# Once every element has been consumed, returns the contents of a freshly
# allocated _elem with 'empty?' true.
# Asserts that 'in' is non-null.
def peek in:&:stream:_elem -> result:_elem, empty?:bool [
  local-scope
  load-inputs
  assert in, [cannot peek; stream has no data]
  pos:num <- get *in, index:offset
  arr:&:@:_elem <- get *in, data:offset
  n:num <- length *arr
  {
    more?:bool <- lesser-than pos, n
    break-if more?
    # nothing left: hand back a placeholder value
    blank:&:_elem <- new _elem:type
    return *blank, true
  }
  empty? <- copy false
  result <- index *arr, pos
]

# Consume one line from a stream of characters.
# Returns the characters from the current position up to (not including) the
# next newline, and moves the index past that newline. If no newline remains,
# the rest of the stream is returned and the index stops at the end of the
# data. It never runs past the end, so repeated calls at end of stream keep
# returning empty lines instead of reading from beyond the array.
# Asserts that 'in' is non-null.
def read-line in:&:stream:char -> result:text, in:&:stream:char [
  local-scope
  load-inputs
  assert in, [cannot read-line; stream has no data]
  idx:num <- get *in, index:offset
  s:text <- get *in, data:offset
  len:num <- length *s
  next-idx:num <- find-next s, 10/newline, idx
  result <- copy-range s, idx, next-idx
  idx <- add next-idx, 1  # skip newline
  {
    # Skipping only makes sense if a newline was actually found. If next-idx
    # hit the end of the data (assumes find-next returns the text's length
    # when no newline remains), clamp so the index never exceeds the length.
    past-end?:bool <- greater-than idx, len
    break-unless past-end?
    idx <- copy len
  }
  # write back
  *in <- put *in, index:offset, idx
]

# True once the stream's index has reached (or passed) the length of its
# array, i.e. there is nothing left to read.
# Asserts that 'in' is non-null.
def end-of-stream? in:&:stream:_elem -> result:bool [
  local-scope
  load-inputs
  assert in, [cannot check end-of-stream?; stream has no data]
  arr:&:@:_elem <- get *in, data:offset
  n:num <- length *arr
  pos:num <- get *in, index:offset
  more?:bool <- lesser-than pos, n
  result <- not more?
]
lass="o">-buffer:&:@:_elem <- get *chan, data:offset result <- index *circular-buffer, full # clear the slot empty:&:_elem <- new _elem:type *circular-buffer <- put-index *circular-buffer, full, *empty # mark its slot as empty full <- add full, 1 { # wrap full around to 0 if necessary len:num <- length *circular-buffer at-end?:bool <- greater-or-equal full, len break-unless at-end? full <- copy 0 } # write back *chan <- put *chan, first-full:offset, full #? $print [relinquishing lock after reading], 10/newline reset lock ] # todo: create a notion of iterator and iterable so we can read/write whole # aggregates (arrays, lists, ..) of _elems at once. scenario channel-initialization [ run [ local-scope source:&:source:num <- new-channel 3/capacity chan:&:channel:num <- get *source, chan:offset 10:num/raw <- get *chan, first-full:offset 11:num/raw <- get *chan, first-free:offset ] memory-should-contain [ 10 <- 0 # first-full 11 <- 0 # first-free ] ] scenario channel-write-increments-free [ local-scope _, sink:&:sink:num <- new-channel 3/capacity run [ sink <- write sink, 34 chan:&:channel:num <- get *sink, chan:offset 10:num/raw <- get *chan, first-full:offset 11:num/raw <- get *chan, first-free:offset ] memory-should-contain [ 10 <- 0 # first-full 11 <- 1 # first-free ] ] scenario channel-read-increments-full [ local-scope source:&:source:num, sink:&:sink:num <- new-channel 3/capacity sink <- write sink, 34 run [ _, _, source <- read source chan:&:channel:num <- get *source, chan:offset 10:num/raw <- get *chan, first-full:offset 11:num/raw <- get *chan, first-free:offset ] memory-should-contain [ 10 <- 1 # first-full 11 <- 1 # first-free ] ] scenario channel-wrap [ local-scope # channel with just 1 slot source:&:source:num, sink:&:sink:num <- new-channel 1/capacity chan:&:channel:num <- get *source, chan:offset # write and read a value sink <- write sink, 34 _, _, source <- read source run [ # first-free will now be 1 10:num/raw <- get *chan, first-free:offset 
11:num/raw <- get *chan, first-free:offset # write second value, verify that first-free wraps sink <- write sink, 34 20:num/raw <- get *chan, first-free:offset # read second value, verify that first-full wraps _, _, source <- read source 30:num/raw <- get *chan, first-full:offset ] memory-should-contain [ 10 <- 1 # first-free after first write 11 <- 1 # first-full after first read 20 <- 0 # first-free after second write, wrapped 30 <- 0 # first-full after second read, wrapped ] ] scenario channel-new-empty-not-full [ run [ local-scope source:&:source:num <- new-channel 3/capacity chan:&:channel:num <- get *source, chan:offset 10:bool/raw <- channel-empty? chan 11:bool/raw <- channel-full? chan ] memory-should-contain [ 10 <- 1 # empty? 11 <- 0 # full? ] ] scenario channel-write-not-empty [ local-scope source:&:source:num, sink:&:sink:num <- new-channel 3/capacity chan:&:channel:num <- get *source, chan:offset run [ sink <- write sink, 34 10:bool/raw <- channel-empty? chan 11:bool/raw <- channel-full? chan ] memory-should-contain [ 10 <- 0 # empty? 11 <- 0 # full? ] ] scenario channel-write-full [ local-scope source:&:source:num, sink:&:sink:num <- new-channel 1/capacity chan:&:channel:num <- get *source, chan:offset run [ sink <- write sink, 34 10:bool/raw <- channel-empty? chan 11:bool/raw <- channel-full? chan ] memory-should-contain [ 10 <- 0 # empty? 11 <- 1 # full? ] ] scenario channel-read-not-full [ local-scope source:&:source:num, sink:&:sink:num <- new-channel 1/capacity chan:&:channel:num <- get *source, chan:offset sink <- write sink, 34 run [ _, _, source <- read source 10:bool/raw <- channel-empty? chan 11:bool/raw <- channel-full? chan ] memory-should-contain [ 10 <- 1 # empty? 11 <- 0 # full? 
] ] scenario channel-clear [ local-scope # create a channel with a few items source:&:source:num, sink:&:sink:num <- new-channel 3/capacity chan:&:channel:num <- get *sink, chan:offset write sink, 30 write sink, 31 write sink, 32 run [ clear source 10:bool/raw <- channel-empty? chan ] memory-should-contain [ 10 <- 1 # after the call to 'clear', the channel should be empty ] ] def clear in:&:source:_elem -> in:&:source:_elem [ local-scope load-ingredients chan:&:channel:_elem <- get *in, chan:offset { empty?:bool <- channel-empty? chan break-if empty? _, _, in <- read in loop } ] ## cancelling channels # every channel comes with a boolean signifying if it's been closed # initially this boolean is false container channel:_elem [ closed?:bool ] # a channel can be closed from either the source or the sink # both routines can modify the 'closed?' bit, but they can only ever set it, so this is a benign race def close x:&:source:_elem -> x:&:source:_elem [ local-scope load-ingredients chan:&:channel:_elem <- get *x, chan:offset *chan <- put *chan, closed?:offset, 1/true ] def close x:&:sink:_elem -> x:&:sink:_elem [ local-scope load-ingredients chan:&:channel:_elem <- get *x, chan:offset *chan <- put *chan, closed?:offset, 1/true ] # once a channel is closed from one side, no further operations are expected from that side # if a channel is closed for reading, # no further writes will be let through # if a channel is closed for writing, # future reads continue until the channel empties, # then the channel is also closed for reading after <channel-write-initial> [ closed?:bool <- get *chan, closed?:offset return-if closed? ] after <channel-read-empty> [ closed?:bool <- get *chan, closed?:offset { break-unless closed? empty-result:&:_elem <- new _elem:type current-routine-is-unblocked return *empty-result, 1/true } ] ## helpers # An empty channel has first-free and first-full both at the same value. def channel-empty? 
chan:&:channel:_elem -> result:bool [ local-scope load-ingredients # return chan.first-full == chan.first-free full:num <- get *chan, first-full:offset free:num <- get *chan, first-free:offset result <- equal full, free ] # A full channel has first-free just before first-full, wasting one slot. # (Other alternatives: https://www.snellman.net/blog/archive/2016-12-13-ring-buffers) def channel-full? chan:&:channel:_elem -> result:bool [ local-scope load-ingredients # tmp = chan.first-free + 1 tmp:num <- get *chan, first-free:offset tmp <- add tmp, 1 { # if tmp == chan.capacity, tmp = 0 len:num <- capacity chan at-end?:bool <- greater-or-equal tmp, len break-unless at-end? tmp <- copy 0 } # return chan.first-full == tmp full:num <- get *chan, first-full:offset result <- equal full, tmp ] def capacity chan:&:channel:_elem -> result:num [ local-scope load-ingredients q:&:@:_elem <- get *chan, data:offset result <- length *q ] ## helpers for channels of characters in particular def buffer-lines in:&:source:char, buffered-out:&:sink:char -> buffered-out:&:sink:char, in:&:source:char [ local-scope load-ingredients # repeat forever eof?:bool <- copy 0/false { line:&:buffer:char <- new-buffer 30 # read characters from 'in' until newline, copy into line { +next-character c:char, eof?:bool, in <- read in break-if eof? # drop a character on backspace { # special-case: if it's a backspace backspace?:bool <- equal c, 8 break-unless backspace? # drop previous character { buffer-length:num <- get *line, length:offset buffer-empty?:bool <- equal buffer-length, 0 break-if buffer-empty? buffer-length <- subtract buffer-length, 1 *line <- put *line, length:offset, buffer-length } # and don't append this one loop +next-character } # append anything else line <- append line, c line-done?:bool <- equal c, 10/newline break-if line-done? 
loop } # copy line into 'buffered-out' i:num <- copy 0 line-contents:text <- get *line, data:offset max:num <- get *line, length:offset { done?:bool <- greater-or-equal i, max break-if done? c:char <- index *line-contents, i buffered-out <- write buffered-out, c i <- add i, 1 loop } { break-unless eof? buffered-out <- close buffered-out return } loop } ] scenario buffer-lines-blocks-until-newline [ run [ local-scope source:&:source:char, sink:&:sink:char <- new-channel 10/capacity _, buffered-stdin:&:sink:char/buffered-stdin <- new-channel 10/capacity buffered-chan:&:channel:char <- get *buffered-stdin, chan:offset empty?:bool <- channel-empty? buffered-chan assert empty?, [ F buffer-lines-blocks-until-newline: channel should be empty after init] # buffer stdin into buffered-stdin, try to read from buffered-stdin buffer-routine:num <- start-running buffer-lines, source, buffered-stdin wait-for-routine-to-block buffer-routine empty? <- channel-empty? buffered-chan assert empty?:bool, [ F buffer-lines-blocks-until-newline: channel should be empty after buffer-lines bring-up] # write 'a' sink <- write sink, 97/a restart buffer-routine wait-for-routine-to-block buffer-routine empty? <- channel-empty? buffered-chan assert empty?:bool, [ F buffer-lines-blocks-until-newline: channel should be empty after writing 'a'] # write 'b' sink <- write sink, 98/b restart buffer-routine wait-for-routine-to-block buffer-routine empty? <- channel-empty? buffered-chan assert empty?:bool, [ F buffer-lines-blocks-until-newline: channel should be empty after writing 'b'] # write newline sink <- write sink, 10/newline restart buffer-routine wait-for-routine-to-block buffer-routine empty? <- channel-empty? buffered-chan data-emitted?:bool <- not empty? 
assert data-emitted?, [ F buffer-lines-blocks-until-newline: channel should contain data after writing newline] trace 1, [test], [reached end] ] trace-should-contain [ test: reached end ] ] def drain source:&:source:char -> result:text, source:&:source:char [ local-scope load-ingredients buf:&:buffer:char <- new-buffer 30 { c:char, done?:bool <- read source break-if done? buf <- append buf, c loop } result <- buffer-to-array buf ]