1 # Mu synchronizes between routines using channels rather than locks, like
  2 # Erlang and Go.
  3 #
  4 # Key properties of channels:
  5 #
  6 #   a) Writing to a full channel or reading from an empty one will put the
  7 #   current routine in 'waiting' state until the operation can be completed.
  8 #
  9 #   b) Writing to a channel implicitly performs a deep copy. This prevents
 10 #   addresses from being shared between routines, and therefore eliminates all
 11 #   possibility of race conditions.
 12 #
 13 # There's still a narrow window for race conditions: the ingredients passed in
 14 # to 'start-running'. Pass only channels into routines and you should be fine.
 15 # Any other mutable ingredients will require locks.
 16 
 17 scenario channel [
 18   run [
 19   ¦ local-scope
 20   ¦ source:&:source:num, sink:&:sink:num <- new-channel 3/capacity
 21   ¦ sink <- write sink, 34
 22   ¦ 10:num/raw, 11:bool/raw, source <- read source
 23   ]
 24   memory-should-contain [
 25   ¦ 10 <- 34
 26   ¦ 11 <- 0  # read was successful
 27   ]
 28 ]
 29 
 30 container channel:_elem [
 31   lock:bool  # inefficient but simple: serialize all reads as well as writes
 32   first-full:num  # for write
 33   first-free:num  # for read
 34   # A circular buffer contains values from index first-full up to (but not
 35   # including) index first-empty. The reader always modifies it at first-full,
 36   # while the writer always modifies it at first-empty.
 37   data:&:@:_elem
 38 ]
 39 
 40 # Since channels have two ends, and since it's an error to use either end from
 41 # multiple routines, let's distinguish the ends.
 42 
 43 container source:_elem [
 44   chan:&:channel:_elem
 45 ]
 46 
 47 container sink:_elem [
 48   chan:&:channel:_elem
 49 ]
 50 
 51 def new-channel capacity:num -> in:&:source:_elem, out:&:sink:_elem [
 52   local-scope
 53   load-ingredients
 54   result:&:channel:_elem <- new {(channel _elem): type}
 55   *result <- put *result, first-full:offset, 0
 56   *result <- put *result, first-free:offset, 0
 57   capacity <- add capacity, 1  # unused slot for 'full?' below
 58   data:&:@:_elem <- new _elem:type, capacity
 59   *result <- put *result, data:offset, data
 60   in <- new {(source _elem): type}
 61   *in <- put *in, chan:offset, result
 62   out <- new {(sink _elem): type}
 63   *out <- put *out, chan:offset, result
 64 ]
 65 
 66 def write out:&:sink:_elem, val:_elem -> out:&:sink:_elem [
 67   local-scope
 68   load-ingredients
 69   assert out, [write to null channel]
 70   chan:&:channel:_elem <- get *out, chan:offset
 71   <channel-write-initial>
 72   # block until lock is acquired AND queue has room
 73   lock:location <- get-location *chan, lock:offset
 74 #?   $print [write], 10/newline
 75   {
 76 #?     $print [trying to acquire lock for writing], 10/newline
 77   ¦ wait-for-reset-then-set lock
 78 #?     $print [lock acquired for writing], 10/newline
 79   ¦ full?:bool <- channel-full? chan
 80   ¦ break-unless full?
 81 #?     $print [but channel is full; relinquishing lock], 10/newline
 82   ¦ # channel is full; relinquish lock and give a reader the opportunity to
 83   ¦ # create room on it
 84   ¦ reset lock
 85   ¦ current-routine-is-blocked
 86   ¦ switch  # avoid spinlocking
 87   ¦ loop
 88   }
 89   current-routine-is-unblocked
 90 #?   $print [performing write], 10/newline
 91   # store a deep copy of val
 92   circular-buffer:&:@:_elem <- get *chan, data:offset
 93   free:num <- get *chan, first-free:offset
 94   val-copy:_elem <- deep-copy val  # on this instruction rests all Mu's concurrency-safety
 95   *circular-buffer <- put-index *circular-buffer, free, val-copy
 96   # mark its slot as filled
 97   free <- add free, 1
 98   {
 99   ¦ # wrap free around to 0 if necessary
100   ¦ len:num <- length *circular-buffer
101   ¦ at-end?:bool <- greater-or-equal free, len
102   ¦ break-unless at-end?
103   ¦ free <- copy 0
104   }
105   # write back
106   *chan <- put *chan, first-free:offset, free
107 #?   $print [relinquishing lock after writing], 10/newline
108   reset lock
109 ]
110 
111 def read in:&:source:_elem -> result:_elem, eof?:bool, in:&:source:_elem [
112   local-scope
113   load-ingredients
114   assert in, [read on null channel]
115   eof? <- copy 0/false  # default result
116   chan:&:channel:_elem <- get *in, chan:offset
117   # block until lock is acquired AND queue has data
118   lock:location <- get-location *chan, lock:offset
119 #?   $print [read], 10/newline
120   {
121 #?     $print [trying to acquire lock for reading], 10/newline
122   ¦ wait-for-reset-then-set lock
123 #?     $print [lock acquired for reading], 10/newline
124   ¦ empty?:bool <- channel-empty? chan
125   ¦ break-unless empty?
126 #?     $print [but channel is empty; relinquishing lock], 10/newline
127   ¦ # channel is empty; relinquish lock and give a writer the opportunity to
128   ¦ # add to it
129   ¦ reset lock
130   ¦ current-routine-is-blocked
131   ¦ <channel-read-empty>
132   ¦ switch  # avoid spinlocking
133   ¦ loop
134   }
135   current-routine-is-unblocked
136   # pull result off
137   full:num <- get *chan, first-full:offset
138   circular-buffer:&:@:_elem <- get *chan, data:offset
139   result <- index *circular-buffer, full
140   # clear the slot
141   empty:&:_elem <- new _elem:type
142   *circular-buffer <- put-index *circular-buffer, full, *empty
143   # mark its slot as empty
144   full <- add full, 1
145   {
146   ¦ # wrap full around to 0 if necessary
147   ¦ len:num <- length *circular-buffer
148   ¦ at-end?:bool <- greater-or-equal full, len
149   ¦ break-unless at-end?
150   ¦ full <- copy 0
151   }
152   # write back
153   *chan <- put *chan, first-full:offset, full
154 #?   $print [relinquishing lock after reading], 10/newline
155   reset lock
156 ]
157 
158 # todo: create a notion of iterator and iterable so we can read/write whole
159 # aggregates (arrays, lists, ..) of _elems at once.
160 
161 def clear in:&:source:_elem -> in:&:source:_elem [
162   local-scope
163   load-ingredients
164   chan:&:channel:_elem <- get *in, chan:offset
165   {
166   ¦ empty?:bool <- channel-empty? chan
167   ¦ break-if empty?
168   ¦ _, _, in <- read in
169   }
170 ]
171 
172 scenario channel-initialization [
173   run [
174   ¦ local-scope
175   ¦ source:&:source:num <- new-channel 3/capacity
176   ¦ chan:&:channel:num <- get *source, chan:offset
177   ¦ 10:num/raw <- get *chan, first-full:offset
178   ¦ 11:num/raw <- get *chan, first-free:offset
179   ]
180   memory-should-contain [
181   ¦ 10 <- 0  # first-full
182   ¦ 11 <- 0  # first-free
183   ]
184 ]
185 
186 scenario channel-write-increments-free [
187   local-scope
188   _, sink:&:sink:num <- new-channel 3/capacity
189   run [
190   ¦ sink <- write sink, 34
191   ¦ chan:&:channel:num <- get *sink, chan:offset
192   ¦ 10:num/raw <- get *chan, first-full:offset
193   ¦ 11:num/raw <- get *chan, first-free:offset
194   ]
195   memory-should-contain [
196   ¦ 10 <- 0  # first-full
197   ¦ 11 <- 1  # first-free
198   ]
199 ]
200 
201 scenario channel-read-increments-full [
202   local-scope
203   source:&:source:num, sink:&:sink:num <- new-channel 3/capacity
204   sink <- write sink, 34
205   run [
206   ¦ _, _, source <- read source
207   ¦ chan:&:channel:num <- get *source, chan:offset
208   ¦ 10:num/raw <- get *chan, first-full:offset
209   ¦ 11:num/raw <- get *chan, first-free:offset
210   ]
211   memory-should-contain [
212   ¦ 10 <- 1  # first-full
213   ¦ 11 <- 1  # first-free
214   ]
215 ]
216 
217 scenario channel-wrap [
218   local-scope
219   # channel with just 1 slot
220   source:&:source:num, sink:&:sink:num <- new-channel 1/capacity
221   chan:&:channel:num <- get *source, chan:offset
222   # write and read a value
223   sink <- write sink, 34
224   _, _, source <- read source
225   run [
226   ¦ # first-free will now be 1
227   ¦ 10:num/raw <- get *chan, first-free:offset
228   ¦ 11:num/raw <- get *chan, first-free:offset
229   ¦ # write second value, verify that first-free wraps
230   ¦ sink <- write sink, 34
231   ¦ 20:num/raw <- get *chan, first-free:offset
232   ¦ # read second value, verify that first-full wraps
233   ¦ _, _, source <- read source
234   ¦ 30:num/raw <- get *chan, first-full:offset
235   ]
236   memory-should-contain [
237   ¦ 10 <- 1  # first-free after first write
238   ¦ 11 <- 1  # first-full after first read
239   ¦ 20 <- 0  # first-free after second write, wrapped
240   ¦ 30 <- 0  # first-full after second read, wrapped
241   ]
242 ]
243 
244 scenario channel-new-empty-not-full [
245   run [
246   ¦ local-scope
247   ¦ source:&:source:num <- new-channel 3/capacity
248   ¦ chan:&:channel:num <- get *source, chan:offset
249   ¦ 10:bool/raw <- channel-empty? chan
250   ¦ 11:bool/raw <- channel-full? chan
251   ]
252   memory-should-contain [
253   ¦ 10 <- 1  # empty?
254   ¦ 11 <- 0  # full?
255   ]
256 ]
257 
258 scenario channel-write-not-empty [
259   local-scope
260   source:&:source:num, sink:&:sink:num <- new-channel 3/capacity
261   chan:&:channel:num <- get *source, chan:offset
262   run [
263   ¦ sink <- write sink, 34
264   ¦ 10:bool/raw <- channel-empty? chan
265   ¦ 11:bool/raw <- channel-full? chan
266   ]
267   memory-should-contain [
268   ¦ 10 <- 0  # empty?
269   ¦ 11 <- 0  # full?
270   ]
271 ]
272 
273 scenario channel-write-full [
274   local-scope
275   source:&:source:num, sink:&:sink:num <- new-channel 1/capacity
276   chan:&:channel:num <- get *source, chan:offset
277   run [
278   ¦ sink <- write sink, 34
279   ¦ 10:bool/raw <- channel-empty? chan
280   ¦ 11:bool/raw <- channel-full? chan
281   ]
282   memory-should-contain [
283   ¦ 10 <- 0  # empty?
284   ¦ 11 <- 1  # full?
285   ]
286 ]
287 
288 scenario channel-read-not-full [
289   local-scope
290   source:&:source:num, sink:&:sink:num <- new-channel 1/capacity
291   chan:&:channel:num <- get *source, chan:offset
292   sink <- write sink, 34
293   run [
294   ¦ _, _, source <- read source
295   ¦ 10:bool/raw <- channel-empty? chan
296   ¦ 11:bool/raw <- channel-full? chan
297   ]
298   memory-should-contain [
299   ¦ 10 <- 1  # empty?
300   ¦ 11 <- 0  # full?
301   ]
302 ]
303 
304 ## cancelling channels
305 
306 # every channel comes with a boolean signifying if it's been closed
307 # initially this boolean is false
308 container channel:_elem [
309   closed?:bool
310 ]
311 
312 # a channel can be closed from either the source or the sink
313 # both routines can modify the 'closed?' bit, but they can only ever set it, so this is a benign race
314 def close x:&:source:_elem -> x:&:source:_elem [
315   local-scope
316   load-ingredients
317   chan:&:channel:_elem <- get *x, chan:offset
318   *chan <- put *chan, closed?:offset, 1/true
319 ]
320 def close x:&:sink:_elem -> x:&:sink:_elem [
321   local-scope
322   load-ingredients
323   chan:&:channel:_elem <- get *x, chan:offset
324   *chan <- put *chan, closed?:offset, 1/true
325 ]
326 
327 # once a channel is closed from one side, no further operations are expected from that side
328 # if a channel is closed for reading,
329 #   no further writes will be let through
330 # if a channel is closed for writing,
331 #   future reads continue until the channel empties,
332 #   then the channel is also closed for reading
333 after <channel-write-initial> [
334   closed?:bool <- get *chan, closed?:offset
335   return-if closed?
336 ]
337 after <channel-read-empty> [
338   closed?:bool <- get *chan, closed?:offset
339   {
340   ¦ break-unless closed?
341   ¦ empty-result:&:_elem <- new _elem:type
342   ¦ current-routine-is-unblocked
343   ¦ return *empty-result, 1/true
344   }
345 ]
346 
347 ## helpers
348 
349 # An empty channel has first-empty and first-full both at the same value.
350 def channel-empty? chan:&:channel:_elem -> result:bool [
351   local-scope
352   load-ingredients
353   # return chan.first-full == chan.first-free
354   full:num <- get *chan, first-full:offset
355   free:num <- get *chan, first-free:offset
356   result <- equal full, free
357 ]
358 
359 # A full channel has first-empty just before first-full, wasting one slot.
360 # (Other alternatives: https://en.wikipedia.org/wiki/Circular_buffer#Full_.2F_Empty_Buffer_Distinction)
361 def channel-full? chan:&:channel:_elem -> result:bool [
362   local-scope
363   load-ingredients
364   # tmp = chan.first-free + 1
365   tmp:num <- get *chan, first-free:offset
366   tmp <- add tmp, 1
367   {
368   ¦ # if tmp == chan.capacity, tmp = 0
369   ¦ len:num <- capacity chan
370   ¦ at-end?:bool <- greater-or-equal tmp, len
371   ¦ break-unless at-end?
372   ¦ tmp <- copy 0
373   }
374   # return chan.first-full == tmp
375   full:num <- get *chan, first-full:offset
376   result <- equal full, tmp
377 ]
378 
379 def capacity chan:&:channel:_elem -> result:num [
380   local-scope
381   load-ingredients
382   q:&:@:_elem <- get *chan, data:offset
383   result <- length *q
384 ]
385 
386 ## helpers for channels of characters in particular
387 
388 def buffer-lines in:&:source:char, buffered-out:&:sink:char -> buffered-out:&:sink:char, in:&:source:char [
389   local-scope
390   load-ingredients
391   # repeat forever
392   eof?:bool <- copy 0/false
393   {
394   ¦ line:&:buffer <- new-buffer 30
395   ¦ # read characters from 'in' until newline, copy into line
396   ¦ {
397   ¦ ¦ +next-character
398   ¦ ¦ c:char, eof?:bool, in <- read in
399   ¦ ¦ break-if eof?
400   ¦ ¦ # drop a character on backspace
401   ¦ ¦ {
402   ¦ ¦ ¦ # special-case: if it's a backspace
403   ¦ ¦ ¦ backspace?:bool <- equal c, 8
404   ¦ ¦ ¦ break-unless backspace?
405   ¦ ¦ ¦ # drop previous character
406   ¦ ¦ ¦ {
407   ¦ ¦ ¦ ¦ buffer-length:num <- get *line, length:offset
408   ¦ ¦ ¦ ¦ buffer-empty?:bool <- equal buffer-length, 0
409   ¦ ¦ ¦ ¦ break-if buffer-empty?
410   ¦ ¦ ¦ ¦ buffer-length <- subtract buffer-length, 1
411   ¦ ¦ ¦ ¦ *line <- put *line, length:offset, buffer-length
412   ¦ ¦ ¦ }
413   ¦ ¦ ¦ # and don't append this one
414   ¦ ¦ ¦ loop +next-character
415   ¦ ¦ }
416   ¦ ¦ # append anything else
417   ¦ ¦ line <- append line, c
418   ¦ ¦ line-done?:bool <- equal c, 10/newline
419   ¦ ¦ break-if line-done?
420   ¦ ¦ loop
421   ¦ }
422   ¦ # copy line into 'buffered-out'
423   ¦ i:num <- copy 0
424   ¦ line-contents:text <- get *line, data:offset
425   ¦ max:num <- get *line, length:offset
426   ¦ {
427   ¦ ¦ done?:bool <- greater-or-equal i, max
428   ¦ ¦ break-if done?
429   ¦ ¦ c:char <- index *line-contents, i
430   ¦ ¦ buffered-out <- write buffered-out, c
431   ¦ ¦ i <- add i, 1
432   ¦ ¦ loop
433   ¦ }
434   ¦ {
435   ¦ ¦ break-unless eof?
436   ¦ ¦ buffered-out <- close buffered-out
437   ¦ ¦ return
438   ¦ }
439   ¦ loop
440   }
441 ]
442 
443 scenario buffer-lines-blocks-until-newline [
444   run [
445   ¦ local-scope
446   ¦ source:&:source:char, sink:&:sink:char <- new-channel 10/capacity
447   ¦ _, buffered-stdin:&:sink:char/buffered-stdin <- new-channel 10/capacity
448   ¦ buffered-chan:&:channel:char <- get *buffered-stdin, chan:offset
449   ¦ empty?:bool <- channel-empty? buffered-chan
450   ¦ assert empty?, [ 
451 F buffer-lines-blocks-until-newline: channel should be empty after init]
452   ¦ # buffer stdin into buffered-stdin, try to read from buffered-stdin
453   ¦ buffer-routine:num <- start-running buffer-lines, source, buffered-stdin
454   ¦ wait-for-routine-to-block buffer-routine
455   ¦ empty? <- channel-empty? buffered-chan
456   ¦ assert empty?:bool, [ 
457 F buffer-lines-blocks-until-newline: channel should be empty after buffer-lines bring-up]
458   ¦ # write 'a'
459   ¦ sink <- write sink, 97/a
460   ¦ restart buffer-routine
461   ¦ wait-for-routine-to-block buffer-routine
462   ¦ empty? <- channel-empty? buffered-chan
463   ¦ assert empty?:bool, [ 
464 F buffer-lines-blocks-until-newline: channel should be empty after writing 'a']
465   ¦ # write 'b'
466   ¦ sink <- write sink, 98/b
467   ¦ restart buffer-routine
468   ¦ wait-for-routine-to-block buffer-routine
469   ¦ empty? <- channel-empty? buffered-chan
470   ¦ assert empty?:bool, [ 
471 F buffer-lines-blocks-until-newline: channel should be empty after writing 'b']
472   ¦ # write newline
473   ¦ sink <- write sink, 10/newline
474   ¦ restart buffer-routine
475   ¦ wait-for-routine-to-block buffer-routine
476   ¦ empty? <- channel-empty? buffered-chan
477   ¦ data-emitted?:bool <- not empty?
478   ¦ assert data-emitted?, [ 
479 F buffer-lines-blocks-until-newline: channel should contain data after writing newline]
480   ¦ trace 1, [test], [reached end]
481   ]
482   trace-should-contain [
483   ¦ test: reached end
484   ]
485 ]
486 
487 def drain source:&:source:char -> result:text, source:&:source:char [
488   local-scope
489   load-ingredients
490   buf:&:buffer <- new-buffer 30
491   {
492   ¦ c:char, done?:bool <- read source
493   ¦ break-if done?
494   ¦ buf <- append buf, c
495   ¦ loop
496   }
497   result <- buffer-to-array buf
498 ]