1 # Mu synchronizes between routines using channels rather than locks, like
  2 # Erlang and Go.
  3 #
  4 # Key properties of channels:
  5 #
  6 #   a) Writing to a full channel or reading from an empty one will put the
  7 #   current routine in 'waiting' state until the operation can be completed.
  8 #
  9 #   b) Writing to a channel implicitly performs a deep copy. This prevents
 10 #   addresses from being shared between routines, and therefore eliminates all
 11 #   possibility of race conditions.
 12 #
 13 # There's still a narrow window for race conditions: the ingredients passed in
 14 # to 'start-running'. Pass only channels into routines and you should be fine.
 15 # Any other mutable ingredients will require locks.
 16 
 17 scenario channel [
 18   run [
 19     local-scope
 20     source:&:source:num, sink:&:sink:num <- new-channel 3/capacity
 21     sink <- write sink, 34
 22     10:num/raw, 11:bool/raw, source <- read source
 23   ]
 24   memory-should-contain [
 25     10 <- 34
 26     11 <- 0  # read was successful
 27   ]
 28 ]
 29 
 30 container channel:_elem [
 31   lock:bool  # inefficient but simple: serialize all reads as well as writes
 32   first-full:num  # for write
 33   first-free:num  # for read
 34   # A circular buffer contains values from index first-full up to (but not
 35   # including) index first-empty. The reader always modifies it at first-full,
 36   # while the writer always modifies it at first-empty.
 37   data:&:@:_elem
 38 ]
 39 
 40 # Since channels have two ends, and since it's an error to use either end from
 41 # multiple routines, let's distinguish the ends.
 42 
 43 container source:_elem [
 44   chan:&:channel:_elem
 45 ]
 46 
 47 container sink:_elem [
 48   chan:&:channel:_elem
 49 ]
 50 
 51 def new-channel capacity:num -> in:&:source:_elem, out:&:sink:_elem [
 52   local-scope
 53   load-ingredients
 54   result:&:channel:_elem <- new {(channel _elem): type}
 55   *result <- put *result, first-full:offset, 0
 56   *result <- put *result, first-free:offset, 0
 57   capacity <- add capacity, 1  # unused slot for 'full?' below
 58   data:&:@:_elem <- new _elem:type, capacity
 59   *result <- put *result, data:offset, data
 60   in <- new {(source _elem): type}
 61   *in <- put *in, chan:offset, result
 62   out <- new {(sink _elem): type}
 63   *out <- put *out, chan:offset, result
 64 ]
 65 
 66 def write out:&:sink:_elem, val:_elem -> out:&:sink:_elem [
 67   local-scope
 68   load-ingredients
 69   assert out, [write to null channel]
 70   chan:&:channel:_elem <- get *out, chan:offset
 71   <channel-write-initial>
 72   # block until lock is acquired AND queue has room
 73   lock:location <- get-location *chan, lock:offset
 74 #?   $print [write], 10/newline
 75   {
 76 #?     $print [trying to acquire lock for writing], 10/newline
 77     wait-for-reset-then-set lock
 78 #?     $print [lock acquired for writing], 10/newline
 79     full?:bool <- channel-full? chan
 80     break-unless full?
 81 #?     $print [but channel is full; relinquishing lock], 10/newline
 82     # channel is full; relinquish lock and give a reader the opportunity to
 83     # create room on it
 84     reset lock
 85     current-routine-is-blocked
 86     switch  # avoid spinlocking
 87     loop
 88   }
 89   current-routine-is-unblocked
 90 #?   $print [performing write], 10/newline
 91   # store a deep copy of val
 92   circular-buffer:&:@:_elem <- get *chan, data:offset
 93   free:num <- get *chan, first-free:offset
 94   val-copy:_elem <- deep-copy val  # on this instruction rests all Mu's concurrency-safety
 95   *circular-buffer <- put-index *circular-buffer, free, val-copy
 96   # mark its slot as filled
 97   free <- add free, 1
 98   {
 99     # wrap free around to 0 if necessary
100     len:num <- length *circular-buffer
101     at-end?:bool <- greater-or-equal free, len
102     break-unless at-end?
103     free <- copy 0
104   }
105   # write back
106   *chan <- put *chan, first-free:offset, free
107 #?   $print [relinquishing lock after writing], 10/newline
108   reset lock
109 ]
110 
111 def read in:&:source:_elem -> result:_elem, eof?:bool, in:&:source:_elem [
112   local-scope
113   load-ingredients
114   assert in, [read on null channel]
115   eof? <- copy 0/false  # default result
116   chan:&:channel:_elem <- get *in, chan:offset
117   # block until lock is acquired AND queue has data
118   lock:location <- get-location *chan, lock:offset
119 #?   $print [read], 10/newline
120   {
121 #?     $print [trying to acquire lock for reading], 10/newline
122     wait-for-reset-then-set lock
123 #?     $print [lock acquired for reading], 10/newline
124     empty?:bool <- channel-empty? chan
125     break-unless empty?
126 #?     $print [but channel is empty; relinquishing lock], 10/newline
127     # channel is empty; relinquish lock and give a writer the opportunity to
128     # add to it
129     reset lock
130     current-routine-is-blocked
131     <channel-read-empty>
132     switch  # avoid spinlocking
133     loop
134   }
135   current-routine-is-unblocked
136   # pull result off
137   full:num <- get *chan, first-full:offset
138   circular-buffer:&:@:_elem <- get *chan, data:offset
139   result <- index *circular-buffer, full
140   # clear the slot
141   empty:&:_elem <- new _elem:type
142   *circular-buffer <- put-index *circular-buffer, full, *empty
143   # mark its slot as empty
144   full <- add full, 1
145   {
146     # wrap full around to 0 if necessary
147     len:num <- length *circular-buffer
148     at-end?:bool <- greater-or-equal full, len
149     break-unless at-end?
150     full <- copy 0
151   }
152   # write back
153   *chan <- put *chan, first-full:offset, full
154 #?   $print [relinquishing lock after reading], 10/newline
155   reset lock
156 ]
157 
158 # todo: create a notion of iterator and iterable so we can read/write whole
159 # aggregates (arrays, lists, ..) of _elems at once.
160 
161 def clear in:&:source:_elem -> in:&:source:_elem [
162   local-scope
163   load-ingredients
164   chan:&:channel:_elem <- get *in, chan:offset
165   {
166     empty?:bool <- channel-empty? chan
167     break-if empty?
168     _, _, in <- read in
169   }
170 ]
171 
172 scenario channel-initialization [
173   run [
174     local-scope
175     source:&:source:num <- new-channel 3/capacity
176     chan:&:channel:num <- get *source, chan:offset
177     10:num/raw <- get *chan, first-full:offset
178     11:num/raw <- get *chan, first-free:offset
179   ]
180   memory-should-contain [
181     10 <- 0  # first-full
182     11 <- 0  # first-free
183   ]
184 ]
185 
186 scenario channel-write-increments-free [
187   local-scope
188   _, sink:&:sink:num <- new-channel 3/capacity
189   run [
190     sink <- write sink, 34
191     chan:&:channel:num <- get *sink, chan:offset
192     10:num/raw <- get *chan, first-full:offset
193     11:num/raw <- get *chan, first-free:offset
194   ]
195   memory-should-contain [
196     10 <- 0  # first-full
197     11 <- 1  # first-free
198   ]
199 ]
200 
201 scenario channel-read-increments-full [
202   local-scope
203   source:&:source:num, sink:&:sink:num <- new-channel 3/capacity
204   sink <- write sink, 34
205   run [
206     _, _, source <- read source
207     chan:&:channel:num <- get *source, chan:offset
208     10:num/raw <- get *chan, first-full:offset
209     11:num/raw <- get *chan, first-free:offset
210   ]
211   memory-should-contain [
212     10 <- 1  # first-full
213     11 <- 1  # first-free
214   ]
215 ]
216 
217 scenario channel-wrap [
218   local-scope
219   # channel with just 1 slot
220   source:&:source:num, sink:&:sink:num <- new-channel 1/capacity
221   chan:&:channel:num <- get *source, chan:offset
222   # write and read a value
223   sink <- write sink, 34
224   _, _, source <- read source
225   run [
226     # first-free will now be 1
227     10:num/raw <- get *chan, first-free:offset
228     11:num/raw <- get *chan, first-free:offset
229     # write second value, verify that first-free wraps
230     sink <- write sink, 34
231     20:num/raw <- get *chan, first-free:offset
232     # read second value, verify that first-full wraps
233     _, _, source <- read source
234     30:num/raw <- get *chan, first-full:offset
235   ]
236   memory-should-contain [
237     10 <- 1  # first-free after first write
238     11 <- 1  # first-full after first read
239     20 <- 0  # first-free after second write, wrapped
240     30 <- 0  # first-full after second read, wrapped
241   ]
242 ]
243 
244 scenario channel-new-empty-not-full [
245   run [
246     local-scope
247     source:&:source:num <- new-channel 3/capacity
248     chan:&:channel:num <- get *source, chan:offset
249     10:bool/raw <- channel-empty? chan
250     11:bool/raw <- channel-full? chan
251   ]
252   memory-should-contain [
253     10 <- 1  # empty?
254     11 <- 0  # full?
255   ]
256 ]
257 
258 scenario channel-write-not-empty [
259   local-scope
260   source:&:source:num, sink:&:sink:num <- new-channel 3/capacity
261   chan:&:channel:num <- get *source, chan:offset
262   run [
263     sink <- write sink, 34
264     10:bool/raw <- channel-empty? chan
265     11:bool/raw <- channel-full? chan
266   ]
267   memory-should-contain [
268     10 <- 0  # empty?
269     11 <- 0  # full?
270   ]
271 ]
272 
273 scenario channel-write-full [
274   local-scope
275   source:&:source:num, sink:&:sink:num <- new-channel 1/capacity
276   chan:&:channel:num <- get *source, chan:offset
277   run [
278     sink <- write sink, 34
279     10:bool/raw <- channel-empty? chan
280     11:bool/raw <- channel-full? chan
281   ]
282   memory-should-contain [
283     10 <- 0  # empty?
284     11 <- 1  # full?
285   ]
286 ]
287 
288 scenario channel-read-not-full [
289   local-scope
290   source:&:source:num, sink:&:sink:num <- new-channel 1/capacity
291   chan:&:channel:num <- get *source, chan:offset
292   sink <- write sink, 34
293   run [
294     _, _, source <- read source
295     10:bool/raw <- channel-empty? chan
296     11:bool/raw <- channel-full? chan
297   ]
298   memory-should-contain [
299     10 <- 1  # empty?
300     11 <- 0  # full?
301   ]
302 ]
303 
304 ## cancelling channels
305 
306 # every channel comes with a boolean signifying if it's been closed
307 # initially this boolean is false
308 container channel:_elem [
309   closed?:bool
310 ]
311 
312 # a channel can be closed from either the source or the sink
313 # both routines can modify the 'closed?' bit, but they can only ever set it, so this is a benign race
314 def close x:&:source:_elem -> x:&:source:_elem [
315   local-scope
316   load-ingredients
317   chan:&:channel:_elem <- get *x, chan:offset
318   *chan <- put *chan, closed?:offset, 1/true
319 ]
320 def close x:&:sink:_elem -> x:&:sink:_elem [
321   local-scope
322   load-ingredients
323   chan:&:channel:_elem <- get *x, chan:offset
324   *chan <- put *chan, closed?:offset, 1/true
325 ]
326 
327 # once a channel is closed from one side, no further operations are expected from that side
328 # if a channel is closed for reading,
329 #   no further writes will be let through
330 # if a channel is closed for writing,
331 #   future reads continue until the channel empties,
332 #   then the channel is also closed for reading
333 after <channel-write-initial> [
334   closed?:bool <- get *chan, closed?:offset
335   return-if closed?
336 ]
337 after <channel-read-empty> [
338   closed?:bool <- get *chan, closed?:offset
339   {
340     break-unless closed?
341     empty-result:&:_elem <- new _elem:type
342     current-routine-is-unblocked
343     return *empty-result, 1/true
344   }
345 ]
346 
347 ## helpers
348 
349 # An empty channel has first-empty and first-full both at the same value.
350 def channel-empty? chan:&:channel:_elem -> result:bool [
351   local-scope
352   load-ingredients
353   # return chan.first-full == chan.first-free
354   full:num <- get *chan, first-full:offset
355   free:num <- get *chan, first-free:offset
356   result <- equal full, free
357 ]
358 
359 # A full channel has first-empty just before first-full, wasting one slot.
360 # (Other alternatives: https://en.wikipedia.org/wiki/Circular_buffer#Full_.2F_Empty_Buffer_Distinction)
361 def channel-full? chan:&:channel:_elem -> result:bool [
362   local-scope
363   load-ingredients
364   # tmp = chan.first-free + 1
365   tmp:num <- get *chan, first-free:offset
366   tmp <- add tmp, 1
367   {
368     # if tmp == chan.capacity, tmp = 0
369     len:num <- capacity chan
370     at-end?:bool <- greater-or-equal tmp, len
371     break-unless at-end?
372     tmp <- copy 0
373   }
374   # return chan.first-full == tmp
375   full:num <- get *chan, first-full:offset
376   result <- equal full, tmp
377 ]
378 
379 def capacity chan:&:channel:_elem -> result:num [
380   local-scope
381   load-ingredients
382   q:&:@:_elem <- get *chan, data:offset
383   result <- length *q
384 ]
385 
386 ## helpers for channels of characters in particular
387 
388 def buffer-lines in:&:source:char, buffered-out:&:sink:char -> buffered-out:&:sink:char, in:&:source:char [
389   local-scope
390   load-ingredients
391   # repeat forever
392   eof?:bool <- copy 0/false
393   {
394     line:&:buffer <- new-buffer 30
395     # read characters from 'in' until newline, copy into line
396     {
397       +next-character
398       c:char, eof?:bool, in <- read in
399       break-if eof?
400       # drop a character on backspace
401       {
402         # special-case: if it's a backspace
403         backspace?:bool <- equal c, 8
404         break-unless backspace?
405         # drop previous character
406         {
407           buffer-length:num <- get *line, length:offset
408           buffer-empty?:bool <- equal buffer-length, 0
409           break-if buffer-empty?
410           buffer-length <- subtract buffer-length, 1
411           *line <- put *line, length:offset, buffer-length
412         }
413         # and don't append this one
414         loop +next-character
415       }
416       # append anything else
417       line <- append line, c
418       line-done?:bool <- equal c, 10/newline
419       break-if line-done?
420       loop
421     }
422     # copy line into 'buffered-out'
423     i:num <- copy 0
424     line-contents:text <- get *line, data:offset
425     max:num <- get *line, length:offset
426     {
427       done?:bool <- greater-or-equal i, max
428       break-if done?
429       c:char <- index *line-contents, i
430       buffered-out <- write buffered-out, c
431       i <- add i, 1
432       loop
433     }
434     {
435       break-unless eof?
436       buffered-out <- close buffered-out
437       return
438     }
439     loop
440   }
441 ]
442 
443 scenario buffer-lines-blocks-until-newline [
444   run [
445     local-scope
446     source:&:source:char, sink:&:sink:char <- new-channel 10/capacity
447     _, buffered-stdin:&:sink:char/buffered-stdin <- new-channel 10/capacity
448     buffered-chan:&:channel:char <- get *buffered-stdin, chan:offset
449     empty?:bool <- channel-empty? buffered-chan
450     assert empty?, [ 
451 F buffer-lines-blocks-until-newline: channel should be empty after init]
452     # buffer stdin into buffered-stdin, try to read from buffered-stdin
453     buffer-routine:num <- start-running buffer-lines, source, buffered-stdin
454     wait-for-routine-to-block buffer-routine
455     empty? <- channel-empty? buffered-chan
456     assert empty?:bool, [ 
457 F buffer-lines-blocks-until-newline: channel should be empty after buffer-lines bring-up]
458     # write 'a'
459     sink <- write sink, 97/a
460     restart buffer-routine
461     wait-for-routine-to-block buffer-routine
462     empty? <- channel-empty? buffered-chan
463     assert empty?:bool, [ 
464 F buffer-lines-blocks-until-newline: channel should be empty after writing 'a']
465     # write 'b'
466     sink <- write sink, 98/b
467     restart buffer-routine
468     wait-for-routine-to-block buffer-routine
469     empty? <- channel-empty? buffered-chan
470     assert empty?:bool, [ 
471 F buffer-lines-blocks-until-newline: channel should be empty after writing 'b']
472     # write newline
473     sink <- write sink, 10/newline
474     restart buffer-routine
475     wait-for-routine-to-block buffer-routine
476     empty? <- channel-empty? buffered-chan
477     data-emitted?:bool <- not empty?
478     assert data-emitted?, [ 
479 F buffer-lines-blocks-until-newline: channel should contain data after writing newline]
480     trace 1, [test], [reached end]
481   ]
482   trace-should-contain [
483     test: reached end
484   ]
485 ]
486 
487 def drain source:&:source:char -> result:text, source:&:source:char [
488   local-scope
489   load-ingredients
490   buf:&:buffer <- new-buffer 30
491   {
492     c:char, done?:bool <- read source
493     break-if done?
494     buf <- append buf, c
495     loop
496   }
497   result <- buffer-to-array buf
498 ]