about summary refs log tree commit diff stats
path: root/archive/2.vm/075channel.mu
diff options
context:
space:
mode:
authorKartik Agaram <vc@akkartik.com>2020-01-01 17:04:37 -0800
committerKartik Agaram <vc@akkartik.com>2020-01-01 17:04:37 -0800
commit2a4088119cf41175457414dfa59bd4064b8f0562 (patch)
tree64fe184e399f9870ebd481a90eec34d51e5dff68 /archive/2.vm/075channel.mu
parent23fd294d85959c6b476bcdc35ed6ad508cc99b8f (diff)
downloadmu-2a4088119cf41175457414dfa59bd4064b8f0562.tar.gz
5852
Diffstat (limited to 'archive/2.vm/075channel.mu')
-rw-r--r--archive/2.vm/075channel.mu510
1 files changed, 0 insertions, 510 deletions
diff --git a/archive/2.vm/075channel.mu b/archive/2.vm/075channel.mu
deleted file mode 100644
index c64492cb..00000000
--- a/archive/2.vm/075channel.mu
+++ /dev/null
@@ -1,510 +0,0 @@
-# Mu synchronizes between routines using channels rather than locks, like
-# Erlang and Go.
-#
-# The key property of channels: Writing to a full channel or reading from an
-# empty one will put the current routine in 'waiting' state until the
-# operation can be completed.
-#
-# Beware of addresses passed into channels. They can cause race conditions.
-
-scenario channel [
-  run [
-    local-scope
-    source:&:source:num, sink:&:sink:num <- new-channel 3/capacity
-    sink <- write sink, 34
-    10:num/raw, 11:bool/raw, source <- read source
-  ]
-  memory-should-contain [
-    10 <- 34
-    11 <- 0  # read was successful
-  ]
-]
-
-container channel:_elem [
-  lock:bool  # inefficient but simple: serialize all reads as well as writes
-  first-full:num  # for write
-  first-free:num  # for read
-  # A circular buffer contains values from index first-full up to (but not
-  # including) index first-free. The reader always modifies it at first-full,
-  # while the writer always modifies it at first-free.
-  data:&:@:_elem
-]
-
-# Since channels have two ends, and since it's an error to use either end from
-# multiple routines, let's distinguish the ends.
-
-container source:_elem [
-  chan:&:channel:_elem
-]
-
-container sink:_elem [
-  chan:&:channel:_elem
-]
-
-def new-channel capacity:num -> in:&:source:_elem, out:&:sink:_elem [
-  local-scope
-  load-inputs
-  result:&:channel:_elem <- new {(channel _elem): type}
-  *result <- put *result, first-full:offset, 0
-  *result <- put *result, first-free:offset, 0
-  capacity <- add capacity, 1  # unused slot for 'full?' below
-  data:&:@:_elem <- new _elem:type, capacity
-  *result <- put *result, data:offset, data
-  in <- new {(source _elem): type}
-  *in <- put *in, chan:offset, result
-  out <- new {(sink _elem): type}
-  *out <- put *out, chan:offset, result
-]
-
-# write a value to a channel
-def write out:&:sink:_elem, val:_elem -> out:&:sink:_elem [
-  local-scope
-  load-inputs
-  assert out, [write to null channel]
-  chan:&:channel:_elem <- get *out, chan:offset
-  <channel-write-initial>
-  # block until lock is acquired AND queue has room
-  lock:location <- get-location *chan, lock:offset
-#?   $print [write], 10/newline
-  {
-#?     $print [trying to acquire lock for writing], 10/newline
-    wait-for-reset-then-set lock
-#?     $print [lock acquired for writing], 10/newline
-    full?:bool <- channel-full? chan
-    break-unless full?
-#?     $print [but channel is full; relinquishing lock], 10/newline
-    # channel is full; relinquish lock and give a reader the opportunity to
-    # create room on it
-    reset lock
-    current-routine-is-blocked
-    switch  # avoid spinlocking
-    loop
-  }
-  current-routine-is-unblocked
-#?   $print [performing write], 10/newline
-  # store a deep copy of val
-  circular-buffer:&:@:_elem <- get *chan, data:offset
-  free:num <- get *chan, first-free:offset
-  *circular-buffer <- put-index *circular-buffer, free, val
-  # mark its slot as filled
-  free <- add free, 1
-  {
-    # wrap free around to 0 if necessary
-    len:num <- length *circular-buffer
-    at-end?:bool <- greater-or-equal free, len
-    break-unless at-end?
-    free <- copy 0
-  }
-  # write back
-  *chan <- put *chan, first-free:offset, free
-#?   $print [relinquishing lock after writing], 10/newline
-  reset lock
-]
-
-# read a value from a channel
-def read in:&:source:_elem -> result:_elem, eof?:bool, in:&:source:_elem [
-  local-scope
-  load-inputs
-  assert in, [read on null channel]
-  eof? <- copy false  # default result
-  chan:&:channel:_elem <- get *in, chan:offset
-  # block until lock is acquired AND queue has data
-  lock:location <- get-location *chan, lock:offset
-#?   $print [read], 10/newline
-  {
-#?     $print [trying to acquire lock for reading], 10/newline
-    wait-for-reset-then-set lock
-#?     $print [lock acquired for reading], 10/newline
-    empty?:bool <- channel-empty? chan
-    break-unless empty?
-#?     $print [but channel is empty; relinquishing lock], 10/newline
-    # channel is empty; relinquish lock and give a writer the opportunity to
-    # add to it
-    reset lock
-    current-routine-is-blocked
-    <channel-read-empty>
-    switch  # avoid spinlocking
-    loop
-  }
-  current-routine-is-unblocked
-  # pull result off
-  full:num <- get *chan, first-full:offset
-  circular-buffer:&:@:_elem <- get *chan, data:offset
-  result <- index *circular-buffer, full
-  # clear the slot
-  empty:&:_elem <- new _elem:type
-  *circular-buffer <- put-index *circular-buffer, full, *empty
-  # mark its slot as empty
-  full <- add full, 1
-  {
-    # wrap full around to 0 if necessary
-    len:num <- length *circular-buffer
-    at-end?:bool <- greater-or-equal full, len
-    break-unless at-end?
-    full <- copy 0
-  }
-  # write back
-  *chan <- put *chan, first-full:offset, full
-#?   $print [relinquishing lock after reading], 10/newline
-  reset lock
-]
-
-# todo: create a notion of iterator and iterable so we can read/write whole
-# aggregates (arrays, lists, ..) of _elems at once.
-
-scenario channel-initialization [
-  run [
-    local-scope
-    source:&:source:num <- new-channel 3/capacity
-    chan:&:channel:num <- get *source, chan:offset
-    10:num/raw <- get *chan, first-full:offset
-    11:num/raw <- get *chan, first-free:offset
-  ]
-  memory-should-contain [
-    10 <- 0  # first-full
-    11 <- 0  # first-free
-  ]
-]
-
-scenario channel-write-increments-free [
-  local-scope
-  _, sink:&:sink:num <- new-channel 3/capacity
-  run [
-    sink <- write sink, 34
-    chan:&:channel:num <- get *sink, chan:offset
-    10:num/raw <- get *chan, first-full:offset
-    11:num/raw <- get *chan, first-free:offset
-  ]
-  memory-should-contain [
-    10 <- 0  # first-full
-    11 <- 1  # first-free
-  ]
-]
-
-scenario channel-read-increments-full [
-  local-scope
-  source:&:source:num, sink:&:sink:num <- new-channel 3/capacity
-  sink <- write sink, 34
-  run [
-    _, _, source <- read source
-    chan:&:channel:num <- get *source, chan:offset
-    10:num/raw <- get *chan, first-full:offset
-    11:num/raw <- get *chan, first-free:offset
-  ]
-  memory-should-contain [
-    10 <- 1  # first-full
-    11 <- 1  # first-free
-  ]
-]
-
-scenario channel-wrap [
-  local-scope
-  # channel with just 1 slot
-  source:&:source:num, sink:&:sink:num <- new-channel 1/capacity
-  chan:&:channel:num <- get *source, chan:offset
-  # write and read a value
-  sink <- write sink, 34
-  _, _, source <- read source
-  run [
-    # first-free will now be 1
-    10:num/raw <- get *chan, first-free:offset
-    11:num/raw <- get *chan, first-free:offset
-    # write second value, verify that first-free wraps
-    sink <- write sink, 34
-    20:num/raw <- get *chan, first-free:offset
-    # read second value, verify that first-full wraps
-    _, _, source <- read source
-    30:num/raw <- get *chan, first-full:offset
-  ]
-  memory-should-contain [
-    10 <- 1  # first-free after first write
-    11 <- 1  # first-full after first read
-    20 <- 0  # first-free after second write, wrapped
-    30 <- 0  # first-full after second read, wrapped
-  ]
-]
-
-scenario channel-new-empty-not-full [
-  run [
-    local-scope
-    source:&:source:num <- new-channel 3/capacity
-    chan:&:channel:num <- get *source, chan:offset
-    10:bool/raw <- channel-empty? chan
-    11:bool/raw <- channel-full? chan
-  ]
-  memory-should-contain [
-    10 <- 1  # empty?
-    11 <- 0  # full?
-  ]
-]
-
-scenario channel-write-not-empty [
-  local-scope
-  source:&:source:num, sink:&:sink:num <- new-channel 3/capacity
-  chan:&:channel:num <- get *source, chan:offset
-  run [
-    sink <- write sink, 34
-    10:bool/raw <- channel-empty? chan
-    11:bool/raw <- channel-full? chan
-  ]
-  memory-should-contain [
-    10 <- 0  # empty?
-    11 <- 0  # full?
-  ]
-]
-
-scenario channel-write-full [
-  local-scope
-  source:&:source:num, sink:&:sink:num <- new-channel 1/capacity
-  chan:&:channel:num <- get *source, chan:offset
-  run [
-    sink <- write sink, 34
-    10:bool/raw <- channel-empty? chan
-    11:bool/raw <- channel-full? chan
-  ]
-  memory-should-contain [
-    10 <- 0  # empty?
-    11 <- 1  # full?
-  ]
-]
-
-scenario channel-read-not-full [
-  local-scope
-  source:&:source:num, sink:&:sink:num <- new-channel 1/capacity
-  chan:&:channel:num <- get *source, chan:offset
-  sink <- write sink, 34
-  run [
-    _, _, source <- read source
-    10:bool/raw <- channel-empty? chan
-    11:bool/raw <- channel-full? chan
-  ]
-  memory-should-contain [
-    10 <- 1  # empty?
-    11 <- 0  # full?
-  ]
-]
-
-scenario channel-clear [
-  local-scope
-  # create a channel with a few items
-  source:&:source:num, sink:&:sink:num <- new-channel 3/capacity
-  chan:&:channel:num <- get *sink, chan:offset
-  write sink, 30
-  write sink, 31
-  write sink, 32
-  run [
-    clear source
-    10:bool/raw <- channel-empty? chan
-  ]
-  memory-should-contain [
-    10 <- 1  # after the call to 'clear', the channel should be empty
-  ]
-]
-
-def clear in:&:source:_elem -> in:&:source:_elem [
-  local-scope
-  load-inputs
-  chan:&:channel:_elem <- get *in, chan:offset
-  {
-    empty?:bool <- channel-empty? chan
-    break-if empty?
-    _, _, in <- read in
-    loop
-  }
-]
-
-## cancelling channels
-
-# every channel comes with a boolean signifying if it's been closed
-# initially this boolean is false
-container channel:_elem [
-  closed?:bool
-]
-
-# a channel can be closed from either the source or the sink
-# both routines can modify the 'closed?' bit, but they can only ever set it, so this is a benign race
-def close x:&:source:_elem -> x:&:source:_elem [
-  local-scope
-  load-inputs
-  chan:&:channel:_elem <- get *x, chan:offset
-  *chan <- put *chan, closed?:offset, true
-]
-def close x:&:sink:_elem -> x:&:sink:_elem [
-  local-scope
-  load-inputs
-  chan:&:channel:_elem <- get *x, chan:offset
-  *chan <- put *chan, closed?:offset, true
-]
-
-# once a channel is closed from one side, no further operations are expected from that side
-# if a channel is closed for reading,
-#   no further writes will be let through
-# if a channel is closed for writing,
-#   future reads continue until the channel empties,
-#   then the channel is also closed for reading
-after <channel-write-initial> [
-  closed?:bool <- get *chan, closed?:offset
-  return-if closed?
-]
-after <channel-read-empty> [
-  closed?:bool <- get *chan, closed?:offset
-  {
-    break-unless closed?
-    empty-result:&:_elem <- new _elem:type
-    current-routine-is-unblocked
-    return *empty-result, true
-  }
-]
-
-## helpers
-
-# An empty channel has first-free and first-full both at the same value.
-def channel-empty? chan:&:channel:_elem -> result:bool [
-  local-scope
-  load-inputs
-  # return chan.first-full == chan.first-free
-  full:num <- get *chan, first-full:offset
-  free:num <- get *chan, first-free:offset
-  result <- equal full, free
-]
-
-# A full channel has first-free just before first-full, wasting one slot.
-# (Other alternatives: https://www.snellman.net/blog/archive/2016-12-13-ring-buffers)
-def channel-full? chan:&:channel:_elem -> result:bool [
-  local-scope
-  load-inputs
-  # tmp = chan.first-free + 1
-  tmp:num <- get *chan, first-free:offset
-  tmp <- add tmp, 1
-  {
-    # if tmp == chan.capacity, tmp = 0
-    len:num <- capacity chan
-    at-end?:bool <- greater-or-equal tmp, len
-    break-unless at-end?
-    tmp <- copy 0
-  }
-  # return chan.first-full == tmp
-  full:num <- get *chan, first-full:offset
-  result <- equal full, tmp
-]
-
-def capacity chan:&:channel:_elem -> result:num [
-  local-scope
-  load-inputs
-  q:&:@:_elem <- get *chan, data:offset
-  result <- length *q
-]
-
-## helpers for channels of characters in particular
-
-def buffer-lines in:&:source:char, buffered-out:&:sink:char -> buffered-out:&:sink:char, in:&:source:char [
-  local-scope
-  load-inputs
-  # repeat forever
-  eof?:bool <- copy false
-  {
-    line:&:buffer:char <- new-buffer 30
-    # read characters from 'in' until newline, copy into line
-    {
-      +next-character
-      c:char, eof?:bool, in <- read in
-      break-if eof?
-      # drop a character on backspace
-      {
-        # special-case: if it's a backspace
-        backspace?:bool <- equal c, 8
-        break-unless backspace?
-        # drop previous character
-        {
-          buffer-length:num <- get *line, length:offset
-          buffer-empty?:bool <- equal buffer-length, 0
-          break-if buffer-empty?
-          buffer-length <- subtract buffer-length, 1
-          *line <- put *line, length:offset, buffer-length
-        }
-        # and don't append this one
-        loop +next-character
-      }
-      # append anything else
-      line <- append line, c
-      line-done?:bool <- equal c, 10/newline
-      break-if line-done?
-      loop
-    }
-    # copy line into 'buffered-out'
-    i:num <- copy 0
-    line-contents:text <- get *line, data:offset
-    max:num <- get *line, length:offset
-    {
-      done?:bool <- greater-or-equal i, max
-      break-if done?
-      c:char <- index *line-contents, i
-      buffered-out <- write buffered-out, c
-      i <- add i, 1
-      loop
-    }
-    {
-      break-unless eof?
-      buffered-out <- close buffered-out
-      return
-    }
-    loop
-  }
-]
-
-scenario buffer-lines-blocks-until-newline [
-  run [
-    local-scope
-    source:&:source:char, sink:&:sink:char <- new-channel 10/capacity
-    _, buffered-stdin:&:sink:char/buffered-stdin <- new-channel 10/capacity
-    buffered-chan:&:channel:char <- get *buffered-stdin, chan:offset
-    empty?:bool <- channel-empty? buffered-chan
-    assert empty?, [ 
-F buffer-lines-blocks-until-newline: channel should be empty after init]
-    # buffer stdin into buffered-stdin, try to read from buffered-stdin
-    buffer-routine:num <- start-running buffer-lines, source, buffered-stdin
-    wait-for-routine-to-block buffer-routine
-    empty? <- channel-empty? buffered-chan
-    assert empty?:bool, [ 
-F buffer-lines-blocks-until-newline: channel should be empty after buffer-lines bring-up]
-    # write 'a'
-    sink <- write sink, 97/a
-    restart buffer-routine
-    wait-for-routine-to-block buffer-routine
-    empty? <- channel-empty? buffered-chan
-    assert empty?:bool, [ 
-F buffer-lines-blocks-until-newline: channel should be empty after writing 'a']
-    # write 'b'
-    sink <- write sink, 98/b
-    restart buffer-routine
-    wait-for-routine-to-block buffer-routine
-    empty? <- channel-empty? buffered-chan
-    assert empty?:bool, [ 
-F buffer-lines-blocks-until-newline: channel should be empty after writing 'b']
-    # write newline
-    sink <- write sink, 10/newline
-    restart buffer-routine
-    wait-for-routine-to-block buffer-routine
-    empty? <- channel-empty? buffered-chan
-    data-emitted?:bool <- not empty?
-    assert data-emitted?, [ 
-F buffer-lines-blocks-until-newline: channel should contain data after writing newline]
-    trace 1, [test], [reached end]
-  ]
-  trace-should-contain [
-    test: reached end
-  ]
-]
-
-def drain source:&:source:char -> result:text, source:&:source:char [
-  local-scope
-  load-inputs
-  buf:&:buffer:char <- new-buffer 30
-  {
-    c:char, done?:bool <- read source
-    break-if done?
-    buf <- append buf, c
-    loop
-  }
-  result <- buffer-to-array buf
-]