1 # Mu synchronizes between routines using channels rather than locks, like
  2 # Erlang and Go.
  3 #
  4 # Key properties of channels:
  5 #
  6 #   a) Writing to a full channel or reading from an empty one will put the
  7 #   current routine in 'waiting' state until the operation can be completed.
  8 #
  9 #   b) Writing to a channel implicitly performs a deep copy. This prevents
 10 #   addresses from being shared between routines, and therefore eliminates all
 11 #   possibility of race conditions.
 12 #
 13 # There's still a narrow window for race conditions: the inputs passed in
 14 # to 'start-running'. Pass only channels into routines and you should be fine.
 15 # Any other mutable inputs will require locks.
 16 
 17 scenario channel [
 18   run [
 19     local-scope
 20     source:&:source:num, sink:&:sink:num <- new-channel 3/capacity
 21     sink <- write sink, 34
 22     10:num/raw, 11:bool/raw, source <- read source
 23   ]
 24   memory-should-contain [
 25     10 <- 34
 26     11 <- 0  # read was successful
 27   ]
 28 ]
 29 
# The shared state behind a source/sink pair: a lock plus a circular buffer
# of _elem values.
container channel:_elem [
  lock:bool  # inefficient but simple: serialize all reads as well as writes
  first-full:num  # index of the oldest unread value; advanced by 'read'
  first-free:num  # index of the next empty slot; advanced by 'write'
  # A circular buffer contains values from index first-full up to (but not
  # including) index first-free. The reader always modifies it at first-full,
  # while the writer always modifies it at first-free.
  data:&:@:_elem
]
 39 
 40 # Since channels have two ends, and since it's an error to use either end from
 41 # multiple routines, let's distinguish the ends.
 42 
# The reading end of a channel.
container source:_elem [
  chan:&:channel:_elem  # the channel that 'read' pulls values from
]
 46 
# The writing end of a channel.
container sink:_elem [
  chan:&:channel:_elem  # the channel that 'write' appends values to
]
 50 
 51 def new-channel capacity:num -> in:&:source:_elem, out:&:sink:_elem [
 52   local-scope
 53   load-inputs
 54   result:&:channel:_elem <- new {(channel _elem): type}
 55   *result <- put *result, first-full:offset, 0
 56   *result <- put *result, first-free:offset, 0
 57   capacity <- add capacity, 1  # unused slot for 'full?' below
 58   data:&:@:_elem <- new _elem:type, capacity
 59   *result <- put *result, data:offset, data
 60   in <- new {(source _elem): type}
 61   *in <- put *in, chan:offset, result
 62   out <- new {(sink _elem): type}
 63   *out <- put *out, chan:offset, result
 64 ]
 65 
 66 # write a value to a channel
 67 def write out:&:sink:_elem, val:_elem -> out:&:sink:_elem [
 68   local-scope
 69   load-inputs
 70   assert out, [write to null channel]
 71   chan:&:channel:_elem <- get *out, chan:offset
 72   <channel-write-initial>
 73   # block until lock is acquired AND queue has room
 74   lock:location <- get-location *chan, lock:offset
 75 #?   $print [write], 10/newline
 76   {
 77 #?     $print [trying to acquire lock for writing], 10/newline
 78     wait-for-reset-then-set lock
 79 #?     $print [lock acquired for writing], 10/newline
 80     full?:bool <- channel-full? chan
 81     break-unless full?
 82 #?     $print [but channel is full; relinquishing lock], 10/newline
 83     # channel is full; relinquish lock and give a reader the opportunity to
 84     # create room on it
 85     reset lock
 86     current-routine-is-blocked
 87     switch  # avoid spinlocking
 88     loop
 89   }
 90   current-routine-is-unblocked
 91 #?   $print [performing write], 10/newline
 92   # store a deep copy of val
 93   circular-buffer:&:@:_elem <- get *chan, data:offset
 94   free:num <- get *chan, first-free:offset
 95   val-copy:_elem <- deep-copy val  # on this instruction rests all Mu's concurrency-safety
 96   *circular-buffer <- put-index *circular-buffer, free, val-copy
 97   # mark its slot as filled
 98   free <- add free, 1
 99   {
100     # wrap free around to 0 if necessary
101     len:num <- length *circular-buffer
102     at-end?:bool <- greater-or-equal free, len
103     break-unless at-end?
104     free <- copy 0
105   }
106   # write back
107   *chan <- put *chan, first-free:offset, free
108 #?   $print [relinquishing lock after writing], 10/newline
109   reset lock
110 ]
111 
# Remove and return the oldest value in the channel behind 'in'.
# Blocks the current routine for as long as the channel is empty.
# 'eof?' is false whenever a value was actually read.
def read in:&:source:_elem -> result:_elem, eof?:bool, in:&:source:_elem [
  local-scope
  load-inputs
  assert in, [read on null channel]
  eof? <- copy 0/false  # unless something below decides otherwise
  chan:&:channel:_elem <- get *in, chan:offset
  # keep trying until we hold the lock AND there is something to read
  lock:location <- get-location *chan, lock:offset
#?   $print [read], 10/newline
  {
#?     $print [trying to acquire lock for reading], 10/newline
    wait-for-reset-then-set lock
#?     $print [lock acquired for reading], 10/newline
    nothing-to-read?:bool <- channel-empty? chan
    break-unless nothing-to-read?
#?     $print [but channel is empty; relinquishing lock], 10/newline
    # nothing there: let go of the lock so that a writer can get in and add
    # a value
    reset lock
    current-routine-is-blocked
    <channel-read-empty>
    switch  # yield to other routines rather than spinlocking
    loop
  }
  current-routine-is-unblocked
  # at this point we hold the lock and know there is a value; grab it
  ring:&:@:_elem <- get *chan, data:offset
  slot:num <- get *chan, first-full:offset
  result <- index *ring, slot
  # overwrite the slot with a freshly allocated (zeroed) _elem
  blank:&:_elem <- new _elem:type
  *ring <- put-index *ring, slot, *blank
  # advance first-full past the slot just emptied..
  slot <- add slot, 1
  {
    # ..wrapping around to 0 at the end of the buffer
    ring-size:num <- length *ring
    past-end?:bool <- greater-or-equal slot, ring-size
    break-unless past-end?
    slot <- copy 0
  }
  # save the new first-full
  *chan <- put *chan, first-full:offset, slot
#?   $print [relinquishing lock after reading], 10/newline
  reset lock
]
159 
160 # todo: create a notion of iterator and iterable so we can read/write whole
161 # aggregates (arrays, lists, ..) of _elems at once.
162 
# a brand-new channel has both of its indices at slot 0
scenario channel-initialization [
  run [
    local-scope
    reader:&:source:num <- new-channel 3/capacity
    fresh:&:channel:num <- get *reader, chan:offset
    10:num/raw <- get *fresh, first-full:offset
    11:num/raw <- get *fresh, first-free:offset
  ]
  memory-should-contain [
    10 <- 0  # first-full
    11 <- 0  # first-free
  ]
]
176 
# a write advances first-free and leaves first-full alone
scenario channel-write-increments-free [
  local-scope
  _, writer:&:sink:num <- new-channel 3/capacity
  run [
    writer <- write writer, 34
    after-write:&:channel:num <- get *writer, chan:offset
    10:num/raw <- get *after-write, first-full:offset
    11:num/raw <- get *after-write, first-free:offset
  ]
  memory-should-contain [
    10 <- 0  # first-full is untouched
    11 <- 1  # first-free moved past the written slot
  ]
]
191 
# a read advances first-full and leaves first-free alone
scenario channel-read-increments-full [
  local-scope
  reader:&:source:num, writer:&:sink:num <- new-channel 3/capacity
  # setup: one value waiting in the channel
  writer <- write writer, 34
  run [
    _, _, reader <- read reader
    after-read:&:channel:num <- get *reader, chan:offset
    10:num/raw <- get *after-read, first-full:offset
    11:num/raw <- get *after-read, first-free:offset
  ]
  memory-should-contain [
    10 <- 1  # first-full moved past the slot that was read
    11 <- 1  # first-free is where the setup write left it
  ]
]
207 
# Checks that first-free and first-full wrap back around to 0 once they run
# off the end of the circular buffer.
scenario channel-wrap [
  local-scope
  # channel with just 1 slot (the buffer has 2, since 'new-channel' adds an
  # unused one)
  source:&:source:num, sink:&:sink:num <- new-channel 1/capacity
  chan:&:channel:num <- get *source, chan:offset
  # write and read a value
  sink <- write sink, 34
  _, _, source <- read source
  run [
    # first-free will now be 1..
    10:num/raw <- get *chan, first-free:offset
    # ..and so will first-full
    # (this used to read first-free a second time, so first-full after the
    # first read was never actually checked)
    11:num/raw <- get *chan, first-full:offset
    # write second value, verify that first-free wraps
    sink <- write sink, 34
    20:num/raw <- get *chan, first-free:offset
    # read second value, verify that first-full wraps
    _, _, source <- read source
    30:num/raw <- get *chan, first-full:offset
  ]
  memory-should-contain [
    10 <- 1  # first-free after first write
    11 <- 1  # first-full after first read
    20 <- 0  # first-free after second write, wrapped
    30 <- 0  # first-full after second read, wrapped
  ]
]
234 
# a brand-new channel reports itself as empty and not full
scenario channel-new-empty-not-full [
  run [
    local-scope
    reader:&:source:num <- new-channel 3/capacity
    fresh:&:channel:num <- get *reader, chan:offset
    10:bool/raw <- channel-empty? fresh
    11:bool/raw <- channel-full? fresh
  ]
  memory-should-contain [
    10 <- 1  # empty?
    11 <- 0  # full?
  ]
]
248 
# one write into a 3-slot channel: no longer empty, but not yet full either
scenario channel-write-not-empty [
  local-scope
  reader:&:source:num, writer:&:sink:num <- new-channel 3/capacity
  shared:&:channel:num <- get *reader, chan:offset
  run [
    writer <- write writer, 34
    10:bool/raw <- channel-empty? shared
    11:bool/raw <- channel-full? shared
  ]
  memory-should-contain [
    10 <- 0  # empty?
    11 <- 0  # full?
  ]
]
263 
# one write into a 1-slot channel fills it up
scenario channel-write-full [
  local-scope
  reader:&:source:num, writer:&:sink:num <- new-channel 1/capacity
  shared:&:channel:num <- get *reader, chan:offset
  run [
    writer <- write writer, 34
    10:bool/raw <- channel-empty? shared
    11:bool/raw <- channel-full? shared
  ]
  memory-should-contain [
    10 <- 0  # empty?
    11 <- 1  # full?
  ]
]
278 
# reading the only value out of a full 1-slot channel leaves it empty again
scenario channel-read-not-full [
  local-scope
  reader:&:source:num, writer:&:sink:num <- new-channel 1/capacity
  shared:&:channel:num <- get *reader, chan:offset
  # setup: fill up the channel
  writer <- write writer, 34
  run [
    _, _, reader <- read reader
    10:bool/raw <- channel-empty? shared
    11:bool/raw <- channel-full? shared
  ]
  memory-should-contain [
    10 <- 1  # empty?
    11 <- 0  # full?
  ]
]
294 
# 'clear' discards everything waiting in a channel
scenario channel-clear [
  local-scope
  # setup: a channel filled to its capacity of 3
  reader:&:source:num, writer:&:sink:num <- new-channel 3/capacity
  shared:&:channel:num <- get *writer, chan:offset
  write writer, 30
  write writer, 31
  write writer, 32
  run [
    clear reader
    10:bool/raw <- channel-empty? shared
  ]
  memory-should-contain [
    10 <- 1  # nothing is left in the channel once 'clear' returns
  ]
]
311 
# Throw away every value currently waiting in the channel behind 'in'.
def clear in:&:source:_elem -> in:&:source:_elem [
  local-scope
  load-inputs
  pending:&:channel:_elem <- get *in, chan:offset
  {
    # stop as soon as there's nothing left to discard
    drained?:bool <- channel-empty? pending
    break-if drained?
    # read one value and ignore it
    _, _, in <- read in
    loop
  }
]
323 
324 ## cancelling channels
325 
326 # every channel comes with a boolean signifying if it's been closed
327 # initially this boolean is false
# Extends the 'channel' container with one more field.
container channel:_elem [
  closed?:bool  # set to true by 'close'; nothing in the code shown here resets it
]
331 
332 # a channel can be closed from either the source or the sink
333 # both routines can modify the 'closed?' bit, but they can only ever set it, so this is a benign race
# Mark a channel as closed, given its reading end.
def close x:&:source:_elem -> x:&:source:_elem [
  local-scope
  load-inputs
  target:&:channel:_elem <- get *x, chan:offset
  *target <- put *target, closed?:offset, 1/true
]
# Mark a channel as closed, given its writing end.
def close x:&:sink:_elem -> x:&:sink:_elem [
  local-scope
  load-inputs
  target:&:channel:_elem <- get *x, chan:offset
  *target <- put *target, closed?:offset, 1/true
]
346 
347 # once a channel is closed from one side, no further operations are expected from that side
348 # if a channel is closed for reading,
349 #   no further writes will be let through
350 # if a channel is closed for writing,
351 #   future reads continue until the channel empties,
352 #   then the channel is also closed for reading
# writes to a closed channel return immediately without storing anything
after <channel-write-initial> [
  {
    closed?:bool <- get *chan, closed?:offset
    break-unless closed?
    return
  }
]
# a read that finds the channel both empty and closed gives up waiting: it
# returns a freshly allocated (zeroed) _elem along with eof? set to true
after <channel-read-empty> [
  closed?:bool <- get *chan, closed?:offset
  {
    break-unless closed?
    # we're about to return rather than wait, so we're not blocked anymore
    current-routine-is-unblocked
    empty-result:&:_elem <- new _elem:type
    return *empty-result, 1/true
  }
]
366 
367 ## helpers
368 
# A channel is empty when first-full has caught up with first-free, i.e.
# when the two indices are equal.
def channel-empty? chan:&:channel:_elem -> result:bool [
  local-scope
  load-inputs
  # result = (chan.first-full == chan.first-free)
  read-index:num <- get *chan, first-full:offset
  write-index:num <- get *chan, first-free:offset
  result <- equal read-index, write-index
]
378 
# A channel is full when first-free sits just before first-full, i.e. when
# advancing first-free by one slot (with wraparound) would land on
# first-full. This wastes one slot of the buffer.
# (Other alternatives: https://www.snellman.net/blog/archive/2016-12-13-ring-buffers)
def channel-full? chan:&:channel:_elem -> result:bool [
  local-scope
  load-inputs
  # where would first-free end up after one more write?
  next-free:num <- get *chan, first-free:offset
  next-free <- add next-free, 1
  {
    # wrap around to 0 past the last slot of the buffer
    slot-count:num <- capacity chan
    past-end?:bool <- greater-or-equal next-free, slot-count
    break-unless past-end?
    next-free <- copy 0
  }
  # result = (chan.first-full == next-free)
  oldest:num <- get *chan, first-full:offset
  result <- equal oldest, next-free
]
398 
# Number of slots in the channel's circular buffer (the length of its 'data'
# array).
def capacity chan:&:channel:_elem -> result:num [
  local-scope
  load-inputs
  slots:&:@:_elem <- get *chan, data:offset
  result <- length *slots
]
405 
406 ## helpers for channels of characters in particular
407 
# Copy characters from 'in' to 'buffered-out' one whole line at a time.
# Nothing is written out until a newline (or eof) arrives, and a backspace
# (character 8) erases the previous not-yet-written character of the line.
# On eof, whatever has accumulated is written out, 'buffered-out' is closed
# and the recipe returns.
def buffer-lines in:&:source:char, buffered-out:&:sink:char -> buffered-out:&:sink:char, in:&:source:char [
  local-scope
  load-inputs
  eof?:bool <- copy 0/false
  # one iteration per line, until eof
  {
    pending:&:buffer:char <- new-buffer 30
    # phase 1: accumulate characters from 'in' until the line is complete
    {
      +next-character
      ch:char, eof?:bool, in <- read in
      break-if eof?
      {
        # backspace gets special treatment
        erase?:bool <- equal ch, 8
        break-unless erase?
        {
          # shrink the line by one character, unless it's already empty
          pending-len:num <- get *pending, length:offset
          nothing-to-erase?:bool <- equal pending-len, 0
          break-if nothing-to-erase?
          pending-len <- subtract pending-len, 1
          *pending <- put *pending, length:offset, pending-len
        }
        # the backspace itself never makes it into the line
        loop +next-character
      }
      # every other character is appended as is, newline included
      pending <- append pending, ch
      newline?:bool <- equal ch, 10/newline
      break-if newline?
      loop
    }
    # phase 2: flush the accumulated line to 'buffered-out'
    count:num <- get *pending, length:offset
    chars:text <- get *pending, data:offset
    pos:num <- copy 0
    {
      flushed?:bool <- greater-or-equal pos, count
      break-if flushed?
      ch:char <- index *chars, pos
      buffered-out <- write buffered-out, ch
      pos <- add pos, 1
      loop
    }
    # phase 3: if the input has ended, pass that on downstream and stop
    {
      break-unless eof?
      buffered-out <- close buffered-out
      return
    }
    loop
  }
]
462 
# 'buffer-lines' holds on to characters until it sees a newline, and only
# then passes anything on to its output channel
scenario buffer-lines-blocks-until-newline [
  run [
    local-scope
    raw-in:&:source:char, raw-out:&:sink:char <- new-channel 10/capacity
    _, buffered-stdin:&:sink:char/buffered-stdin <- new-channel 10/capacity
    downstream:&:channel:char <- get *buffered-stdin, chan:offset
    quiet?:bool <- channel-empty? downstream
    assert quiet?, [ 
F buffer-lines-blocks-until-newline: channel should be empty after init]
    # run buffer-lines in a routine of its own, and let it go until it blocks
    worker:num <- start-running buffer-lines, raw-in, buffered-stdin
    wait-for-routine-to-block worker
    quiet? <- channel-empty? downstream
    assert quiet?, [ 
F buffer-lines-blocks-until-newline: channel should be empty after buffer-lines bring-up]
    # feed it an 'a': nothing should come out yet
    raw-out <- write raw-out, 97/a
    restart worker
    wait-for-routine-to-block worker
    quiet? <- channel-empty? downstream
    assert quiet?, [ 
F buffer-lines-blocks-until-newline: channel should be empty after writing 'a']
    # feed it a 'b': still nothing
    raw-out <- write raw-out, 98/b
    restart worker
    wait-for-routine-to-block worker
    quiet? <- channel-empty? downstream
    assert quiet?, [ 
F buffer-lines-blocks-until-newline: channel should be empty after writing 'b']
    # feed it a newline: now the line should show up downstream
    raw-out <- write raw-out, 10/newline
    restart worker
    wait-for-routine-to-block worker
    quiet? <- channel-empty? downstream
    line-arrived?:bool <- not quiet?
    assert line-arrived?, [ 
F buffer-lines-blocks-until-newline: channel should contain data after writing newline]
    trace 1, [test], [reached end]
  ]
  trace-should-contain [
    test: reached end
  ]
]
506 
# Keep reading characters from 'source' until 'read' reports eof, and return
# everything that was read as a single text.
def drain source:&:source:char -> result:text, source:&:source:char [
  local-scope
  load-inputs
  collected:&:buffer:char <- new-buffer 30
  {
    ch:char, finished?:bool <- read source
    break-if finished?
    collected <- append collected, ch
    loop
  }
  result <- buffer-to-array collected
]