1 # Mu synchronizes between routines using channels rather than locks, like
  2 # Erlang and Go.
  3 #
  4 # Key properties of channels:
  5 #
  6 #   a) Writing to a full channel or reading from an empty one will put the
  7 #   current routine in 'waiting' state until the operation can be completed.
  8 #
  9 #   b) Writing to a channel implicitly performs a deep copy. This prevents
 10 #   addresses from being shared between routines, and therefore eliminates all
 11 #   possibility of race conditions.
 12 #
 13 # There's still a narrow window for race conditions: the ingredients passed in
 14 # to 'start-running'. Pass only channels into routines and you should be fine.
 15 # Any other mutable ingredients will require locks.
 16 
 17 scenario channel [
      # round trip: one value written to a fresh channel can be read back
 18   run [
 19   ¦ local-scope
 20   ¦ source:&:source:num, sink:&:sink:num <- new-channel 3/capacity
 21   ¦ sink <- write sink, 34
 22   ¦ 10:num/raw, 11:bool/raw, source <- read source
 23   ]
 24   memory-should-contain [
 25   ¦ 10 <- 34
 26   ¦ 11 <- 0  # eof? is false: read was successful
 27   ]
 28 ]
 29 
 30 container channel:_elem [
 31   lock:bool  # inefficient but simple: serialize all reads as well as writes
 32   first-full:num  # index of the next slot to read; advanced by 'read'
 33   first-free:num  # index of the next slot to write; advanced by 'write'
 34   # A circular buffer contains values from index first-full up to (but not
 35   # including) index first-free. The reader always modifies it at first-full,
 36   # while the writer always modifies it at first-free.
 37   data:&:@:_elem  # the circular buffer; 'new-channel' allocates capacity+1 slots for it
 38 ]
 39 
 40 # Since channels have two ends, and since it's an error to use either end from
 41 # multiple routines, let's distinguish the ends.
 42 
 43 container source:_elem [
 44   chan:&:channel:_elem  # the underlying channel; 'read', 'clear' and 'close' take this end
 45 ]
 46 
 47 container sink:_elem [
 48   chan:&:channel:_elem  # the underlying channel; 'write' and 'close' take this end
 49 ]
 50 
 51 def new-channel capacity:num -> in:&:source:_elem, out:&:sink:_elem [
      # Create an empty channel able to hold 'capacity' values and return its
      # two ends; both ends refer to the same underlying channel.
 52   local-scope
 53   load-ingredients
 54   chan:&:channel:_elem <- new {(channel _elem): type}
      # both positions start at slot 0, so the channel starts out empty
 55   *chan <- put *chan, first-full:offset, 0
 56   *chan <- put *chan, first-free:offset, 0
 57   capacity <- add capacity, 1  # one slot always stays unused; see 'channel-full?'
 58   slots:&:@:_elem <- new _elem:type, capacity
 59   *chan <- put *chan, data:offset, slots
      # wrap the channel in a read end and a write end
 60   in <- new {(source _elem): type}
 61   *in <- put *in, chan:offset, chan
 62   out <- new {(sink _elem): type}
 63   *out <- put *out, chan:offset, chan
 64 ]
 65 
 66 # write a deep copy of a value to a channel, blocking while the channel is full
 67 def write out:&:sink:_elem, val:_elem -> out:&:sink:_elem [
 68   local-scope
 69   load-ingredients
 70   assert out, [write to null channel]
 71   chan:&:channel:_elem <- get *out, chan:offset
      # extension point: the 'after <channel-write-initial>' fragment in this
      # file returns here without writing anything if the channel is closed
 72   <channel-write-initial>
 73   # block until lock is acquired AND queue has room
 74   lock:location <- get-location *chan, lock:offset
 75 #?   $print [write], 10/newline
 76   {
 77 #?     $print [trying to acquire lock for writing], 10/newline
 78   ¦ wait-for-reset-then-set lock
 79 #?     $print [lock acquired for writing], 10/newline
 80   ¦ full?:bool <- channel-full? chan
 81   ¦ break-unless full?
 82 #?     $print [but channel is full; relinquishing lock], 10/newline
 83   ¦ # channel is full; relinquish lock and give a reader the opportunity to
 84   ¦ # create room on it
 85   ¦ reset lock
 86   ¦ current-routine-is-blocked
 87   ¦ switch  # avoid spinlocking
      ¦ # NOTE(review): 'closed?' is only checked once, before this loop; a writer
      ¦ # already waiting here does not re-check it -- confirm that is intended
 88   ¦ loop
 89   }
      # we get here with the lock still set; it is released by the final 'reset lock'
 90   current-routine-is-unblocked
 91 #?   $print [performing write], 10/newline
 92   # store a deep copy of val
 93   circular-buffer:&:@:_elem <- get *chan, data:offset
 94   free:num <- get *chan, first-free:offset
 95   val-copy:_elem <- deep-copy val  # on this instruction rests all Mu's concurrency-safety
 96   *circular-buffer <- put-index *circular-buffer, free, val-copy
 97   # mark its slot as filled
 98   free <- add free, 1
 99   {
100   ¦ # wrap free around to 0 if necessary
101   ¦ len:num <- length *circular-buffer
102   ¦ at-end?:bool <- greater-or-equal free, len
103   ¦ break-unless at-end?
104   ¦ free <- copy 0
105   }
106   # write back
107   *chan <- put *chan, first-free:offset, free
108 #?   $print [relinquishing lock after writing], 10/newline
109   reset lock
110 ]
111 
112 # read a value from a channel, blocking while the channel is empty
113 def read in:&:source:_elem -> result:_elem, eof?:bool, in:&:source:_elem [
      # 'eof?' stays false whenever a value was actually read; the only place it
      # becomes true is the 'after <channel-read-empty>' fragment in this file
114   local-scope
115   load-ingredients
116   assert in, [read on null channel]
117   eof? <- copy 0/false  # default result
118   chan:&:channel:_elem <- get *in, chan:offset
119   # block until lock is acquired AND queue has data
120   lock:location <- get-location *chan, lock:offset
121 #?   $print [read], 10/newline
122   {
123 #?     $print [trying to acquire lock for reading], 10/newline
124   ¦ wait-for-reset-then-set lock
125 #?     $print [lock acquired for reading], 10/newline
126   ¦ empty?:bool <- channel-empty? chan
127   ¦ break-unless empty?
128 #?     $print [but channel is empty; relinquishing lock], 10/newline
129   ¦ # channel is empty; relinquish lock and give a writer the opportunity to
130   ¦ # add to it
131   ¦ reset lock
132   ¦ current-routine-is-blocked
      ¦ # extension point, reached only when the channel is empty and after the
      ¦ # lock has been released; the fragment attached here returns early with
      ¦ # eof? set if the channel has been closed
133   ¦ <channel-read-empty>
134   ¦ switch  # avoid spinlocking
135   ¦ loop
136   }
      # we get here with the lock still set; it is released by the final 'reset lock'
137   current-routine-is-unblocked
138   # pull result off
139   full:num <- get *chan, first-full:offset
140   circular-buffer:&:@:_elem <- get *chan, data:offset
141   result <- index *circular-buffer, full
142   # clear the slot
      # (overwrites it with the contents of a freshly allocated _elem --
      # presumably a zeroed value; confirm against the semantics of 'new')
143   empty:&:_elem <- new _elem:type
144   *circular-buffer <- put-index *circular-buffer, full, *empty
145   # mark its slot as empty
146   full <- add full, 1
147   {
148   ¦ # wrap full around to 0 if necessary
149   ¦ len:num <- length *circular-buffer
150   ¦ at-end?:bool <- greater-or-equal full, len
151   ¦ break-unless at-end?
152   ¦ full <- copy 0
153   }
154   # write back
155   *chan <- put *chan, first-full:offset, full
156 #?   $print [relinquishing lock after reading], 10/newline
157   reset lock
158 ]
159 
160 # todo: create a notion of iterator and iterable so we can read/write whole
161 # aggregates (arrays, lists, ..) of _elems at once.
162 
163 scenario channel-initialization [
      # a fresh channel starts with both positions at slot 0
164   run [
165   ¦ local-scope
166   ¦ source:&:source:num <- new-channel 3/capacity
167   ¦ chan:&:channel:num <- get *source, chan:offset
168   ¦ 10:num/raw <- get *chan, first-full:offset
169   ¦ 11:num/raw <- get *chan, first-free:offset
170   ]
171   memory-should-contain [
172   ¦ 10 <- 0  # first-full
173   ¦ 11 <- 0  # first-free
174   ]
175 ]
176 
177 scenario channel-write-increments-free [
      # a write advances first-free and leaves first-full alone
178   local-scope
179   _, sink:&:sink:num <- new-channel 3/capacity
180   run [
181   ¦ sink <- write sink, 34
182   ¦ chan:&:channel:num <- get *sink, chan:offset
183   ¦ 10:num/raw <- get *chan, first-full:offset
184   ¦ 11:num/raw <- get *chan, first-free:offset
185   ]
186   memory-should-contain [
187   ¦ 10 <- 0  # first-full
188   ¦ 11 <- 1  # first-free
189   ]
190 ]
191 
192 scenario channel-read-increments-full [
      # a read advances first-full and leaves first-free alone
193   local-scope
194   source:&:source:num, sink:&:sink:num <- new-channel 3/capacity
195   sink <- write sink, 34
196   run [
197   ¦ _, _, source <- read source
198   ¦ chan:&:channel:num <- get *source, chan:offset
199   ¦ 10:num/raw <- get *chan, first-full:offset
200   ¦ 11:num/raw <- get *chan, first-free:offset
201   ]
202   memory-should-contain [
203   ¦ 10 <- 1  # first-full
204   ¦ 11 <- 1  # first-free
205   ]
206 ]
207 
208 scenario channel-wrap [
      # both positions wrap back to slot 0 after reaching the end of the buffer
209   local-scope
210   # channel with room for just 1 value (2 slots, one of which stays unused)
211   source:&:source:num, sink:&:sink:num <- new-channel 1/capacity
212   chan:&:channel:num <- get *source, chan:offset
213   # write and read a value
214   sink <- write sink, 34
215   _, _, source <- read source
216   run [
217   ¦ # first-free and first-full will both be 1 now
218   ¦ 10:num/raw <- get *chan, first-free:offset
219   ¦ 11:num/raw <- get *chan, first-full:offset
220   ¦ # write second value, verify that first-free wraps
221   ¦ sink <- write sink, 34
222   ¦ 20:num/raw <- get *chan, first-free:offset
223   ¦ # read second value, verify that first-full wraps
224   ¦ _, _, source <- read source
225   ¦ 30:num/raw <- get *chan, first-full:offset
226   ]
227   memory-should-contain [
228   ¦ 10 <- 1  # first-free after first write
229   ¦ 11 <- 1  # first-full after first read
230   ¦ 20 <- 0  # first-free after second write, wrapped
231   ¦ 30 <- 0  # first-full after second read, wrapped
232   ]
233 ]
234 
235 scenario channel-new-empty-not-full [
      # a fresh channel reports empty and not full
236   run [
237   ¦ local-scope
238   ¦ source:&:source:num <- new-channel 3/capacity
239   ¦ chan:&:channel:num <- get *source, chan:offset
240   ¦ 10:bool/raw <- channel-empty? chan
241   ¦ 11:bool/raw <- channel-full? chan
242   ]
243   memory-should-contain [
244   ¦ 10 <- 1  # empty?
245   ¦ 11 <- 0  # full?
246   ]
247 ]
248 
249 scenario channel-write-not-empty [
      # one write into a 3-capacity channel leaves it neither empty nor full
250   local-scope
251   source:&:source:num, sink:&:sink:num <- new-channel 3/capacity
252   chan:&:channel:num <- get *source, chan:offset
253   run [
254   ¦ sink <- write sink, 34
255   ¦ 10:bool/raw <- channel-empty? chan
256   ¦ 11:bool/raw <- channel-full? chan
257   ]
258   memory-should-contain [
259   ¦ 10 <- 0  # empty?
260   ¦ 11 <- 0  # full?
261   ]
262 ]
263 
264 scenario channel-write-full [
      # a single write fills a 1-capacity channel
265   local-scope
266   source:&:source:num, sink:&:sink:num <- new-channel 1/capacity
267   chan:&:channel:num <- get *source, chan:offset
268   run [
269   ¦ sink <- write sink, 34
270   ¦ 10:bool/raw <- channel-empty? chan
271   ¦ 11:bool/raw <- channel-full? chan
272   ]
273   memory-should-contain [
274   ¦ 10 <- 0  # empty?
275   ¦ 11 <- 1  # full?
276   ]
277 ]
278 
279 scenario channel-read-not-full [
      # reading the only value from a full 1-capacity channel leaves it empty
      # and no longer full
280   local-scope
281   source:&:source:num, sink:&:sink:num <- new-channel 1/capacity
282   chan:&:channel:num <- get *source, chan:offset
283   sink <- write sink, 34
284   run [
285   ¦ _, _, source <- read source
286   ¦ 10:bool/raw <- channel-empty? chan
287   ¦ 11:bool/raw <- channel-full? chan
288   ]
289   memory-should-contain [
290   ¦ 10 <- 1  # empty?
291   ¦ 11 <- 0  # full?
292   ]
293 ]
294 
295 scenario channel-clear [
      # 'clear' discards everything queued on a channel
296   local-scope
297   # create a channel with a few items
298   source:&:source:num, sink:&:sink:num <- new-channel 3/capacity
299   chan:&:channel:num <- get *sink, chan:offset
300   write sink, 30
301   write sink, 31
302   write sink, 32
303   run [
304   ¦ clear source
305   ¦ 10:bool/raw <- channel-empty? chan
306   ]
307   memory-should-contain [
308   ¦ 10 <- 1  # after the call to 'clear', the channel should be empty
309   ]
310 ]
311 
312 def clear in:&:source:_elem -> in:&:source:_elem [
      # Read and discard values from 'in' until 'channel-empty?' reports that
      # nothing is left.
313   local-scope
314   load-ingredients
315   underlying:&:channel:_elem <- get *in, chan:offset
316   {
317   ¦ drained?:bool <- channel-empty? underlying
318   ¦ break-if drained?
319   ¦ _, _, in <- read in
320   ¦ loop
321   }
322 ]
323 
324 ## cancelling channels
325 
326 # every channel comes with a boolean signifying if it's been closed
327 # initially this boolean is false
328 container channel:_elem [
329   closed?:bool  # set to true by 'close' from either end; nothing in this file resets it
330 ]
331 
332 # a channel can be closed from either the source or the sink
333 # both routines can modify the 'closed?' bit, but they can only ever set it, so this is a benign race
334 def close x:&:source:_elem -> x:&:source:_elem [
      # Close a channel from its reading end by setting its 'closed?' flag.
335   local-scope
336   load-ingredients
337   underlying:&:channel:_elem <- get *x, chan:offset
338   *underlying <- put *underlying, closed?:offset, 1/true
339 ]
340 def close x:&:sink:_elem -> x:&:sink:_elem [
      # Close a channel from its writing end by setting its 'closed?' flag.
341   local-scope
342   load-ingredients
343   underlying:&:channel:_elem <- get *x, chan:offset
344   *underlying <- put *underlying, closed?:offset, 1/true
345 ]
346 
347 # once a channel is closed from one side, no further operations are expected from that side
348 # if a channel is closed for reading,
349 #   no further writes will be let through
350 # if a channel is closed for writing,
351 #   future reads continue until the channel empties,
352 #   then the channel is also closed for reading
353 after <channel-write-initial> [
      # attached to the <channel-write-initial> label in 'write', where 'chan' is
      # already in scope: a write to a closed channel returns without storing 'val'
354   closed?:bool <- get *chan, closed?:offset
355   return-if closed?
356 ]
357 after <channel-read-empty> [
      # attached to the <channel-read-empty> label in 'read', which is reached only
      # when the channel was found empty and after 'read' has released the lock
358   closed?:bool <- get *chan, closed?:offset
359   {
360   ¦ break-unless closed?
      ¦ # closed and empty: return the contents of a freshly allocated _elem as a
      ¦ # placeholder result, with eof? set to true
361   ¦ empty-result:&:_elem <- new _elem:type
362   ¦ current-routine-is-unblocked  # pairs with the 'current-routine-is-blocked' just before the label
363   ¦ return *empty-result, 1/true
364   }
365 ]
366 
367 ## helpers
368 
369 # An empty channel has first-free and first-full both at the same value.
370 def channel-empty? chan:&:channel:_elem -> result:bool [
371   local-scope
372   load-ingredients
373   # empty exactly when the read and write positions coincide
374   read-pos:num <- get *chan, first-full:offset
375   write-pos:num <- get *chan, first-free:offset
376   result <- equal read-pos, write-pos
377 ]
378 
379 # A full channel has first-free just before first-full, wasting one slot.
380 # (Other alternatives: https://www.snellman.net/blog/archive/2016-12-13-ring-buffers)
381 def channel-full? chan:&:channel:_elem -> result:bool [
382   local-scope
383   load-ingredients
384   # next-free = where first-free would land after one more write
385   next-free:num <- get *chan, first-free:offset
386   next-free <- add next-free, 1
387   {
388   ¦ # wrap next-free around to 0 once it runs off the end of the buffer
389   ¦ slot-count:num <- capacity chan
390   ¦ past-end?:bool <- greater-or-equal next-free, slot-count
391   ¦ break-unless past-end?
392   ¦ next-free <- copy 0
393   }
394   # full when one more write would make first-free catch up with first-full
395   read-pos:num <- get *chan, first-full:offset
396   result <- equal read-pos, next-free
397 ]
398 
399 def capacity chan:&:channel:_elem -> result:num [
      # Number of slots in the channel's circular buffer. This is one more than
      # the capacity passed to 'new-channel', which adds an unused slot.
400   local-scope
401   load-ingredients
402   slots:&:@:_elem <- get *chan, data:offset
403   result <- length *slots
404 ]
405 
406 ## helpers for channels of characters in particular
407 
408 def buffer-lines in:&:source:char, buffered-out:&:sink:char -> buffered-out:&:sink:char, in:&:source:char [
      # Copy characters from 'in' to 'buffered-out' one whole line at a time:
      # nothing is written until a newline (or eof) is read. A backspace
      # (character 8) removes the previous character of the pending line.
409   local-scope
410   load-ingredients
411   # repeat until 'in' reports eof
412   eof?:bool <- copy 0/false
413   {
414   ¦ line:&:buffer:char <- new-buffer 30
415   ¦ # read characters from 'in' until newline, copy into line
416   ¦ {
417   ¦ ¦ +next-character
418   ¦ ¦ c:char, eof?:bool, in <- read in
419   ¦ ¦ break-if eof?
420   ¦ ¦ # drop a character on backspace
421   ¦ ¦ {
422   ¦ ¦ ¦ # special-case: if it's a backspace
423   ¦ ¦ ¦ backspace?:bool <- equal c, 8
424   ¦ ¦ ¦ break-unless backspace?
425   ¦ ¦ ¦ # drop previous character
426   ¦ ¦ ¦ {
427   ¦ ¦ ¦ ¦ buffer-length:num <- get *line, length:offset
428   ¦ ¦ ¦ ¦ buffer-empty?:bool <- equal buffer-length, 0
429   ¦ ¦ ¦ ¦ break-if buffer-empty?
430   ¦ ¦ ¦ ¦ buffer-length <- subtract buffer-length, 1
431   ¦ ¦ ¦ ¦ *line <- put *line, length:offset, buffer-length
432   ¦ ¦ ¦ }
433   ¦ ¦ ¦ # and don't append this one
434   ¦ ¦ ¦ loop +next-character
435   ¦ ¦ }
436   ¦ ¦ # append anything else
437   ¦ ¦ line <- append line, c
438   ¦ ¦ line-done?:bool <- equal c, 10/newline
439   ¦ ¦ break-if line-done?
440   ¦ ¦ loop
441   ¦ }
442   ¦ # copy line into 'buffered-out' (on eof this flushes any partial line)
443   ¦ i:num <- copy 0
444   ¦ line-contents:text <- get *line, data:offset
445   ¦ max:num <- get *line, length:offset
446   ¦ {
447   ¦ ¦ done?:bool <- greater-or-equal i, max
448   ¦ ¦ break-if done?
449   ¦ ¦ c:char <- index *line-contents, i
450   ¦ ¦ buffered-out <- write buffered-out, c
451   ¦ ¦ i <- add i, 1
452   ¦ ¦ loop
453   ¦ }
      ¦ # once 'in' has reported eof, close the output end and stop
454   ¦ {
455   ¦ ¦ break-unless eof?
456   ¦ ¦ buffered-out <- close buffered-out
457   ¦ ¦ return
458   ¦ }
459   ¦ loop
460   }
461 ]
462 
463 scenario buffer-lines-blocks-until-newline [
      # 'buffer-lines' must not emit anything on its output channel until it has
      # been fed a newline
464   run [
465   ¦ local-scope
466   ¦ source:&:source:char, sink:&:sink:char <- new-channel 10/capacity
467   ¦ _, buffered-stdin:&:sink:char/buffered-stdin <- new-channel 10/capacity
468   ¦ buffered-chan:&:channel:char <- get *buffered-stdin, chan:offset
469   ¦ empty?:bool <- channel-empty? buffered-chan
470   ¦ assert empty?, [ 
471 F buffer-lines-blocks-until-newline: channel should be empty after init]
472   ¦ # buffer stdin into buffered-stdin, try to read from buffered-stdin
473   ¦ buffer-routine:num <- start-running buffer-lines, source, buffered-stdin
474   ¦ wait-for-routine-to-block buffer-routine
475   ¦ empty? <- channel-empty? buffered-chan
476   ¦ assert empty?:bool, [ 
477 F buffer-lines-blocks-until-newline: channel should be empty after buffer-lines bring-up]
478   ¦ # write 'a'
479   ¦ sink <- write sink, 97/a
480   ¦ restart buffer-routine
481   ¦ wait-for-routine-to-block buffer-routine
482   ¦ empty? <- channel-empty? buffered-chan
483   ¦ assert empty?:bool, [ 
484 F buffer-lines-blocks-until-newline: channel should be empty after writing 'a']
485   ¦ # write 'b'
486   ¦ sink <- write sink, 98/b
487   ¦ restart buffer-routine
488   ¦ wait-for-routine-to-block buffer-routine
489   ¦ empty? <- channel-empty? buffered-chan
490   ¦ assert empty?:bool, [ 
491 F buffer-lines-blocks-until-newline: channel should be empty after writing 'b']
492   ¦ # write newline
493   ¦ sink <- write sink, 10/newline
494   ¦ restart buffer-routine
495   ¦ wait-for-routine-to-block buffer-routine
496   ¦ empty? <- channel-empty? buffered-chan
497   ¦ data-emitted?:bool <- not empty?
498   ¦ assert data-emitted?, [ 
499 F buffer-lines-blocks-until-newline: channel should contain data after writing newline]
500   ¦ trace 1, [test], [reached end]
501   ]
502   trace-should-contain [
503   ¦ test: reached end
504   ]
505 ]
506 
507 def drain source:&:source:char -> result:text, source:&:source:char [
      # Read characters from 'source' until 'read' reports eof, and return
      # everything read as a single text.
508   local-scope
509   load-ingredients
510   chars:&:buffer:char <- new-buffer 30
511   {
512   ¦ ch:char, eof?:bool <- read source
513   ¦ break-if eof?
514   ¦ chars <- append chars, ch
515   ¦ loop
516   }
517   result <- buffer-to-array chars
518 ]