https://github.com/akkartik/mu/blob/master/075channel.mu
  1 # Mu synchronizes between routines using channels rather than locks, like
  2 # Erlang and Go.
  3 #
  4 # The key property of channels: Writing to a full channel or reading from an
  5 # empty one will put the current routine in 'waiting' state until the
  6 # operation can be completed.
  7 #
  8 # Beware of addresses passed into channels. They can cause race conditions.
  9 
 10 scenario channel [
 11   run [
 12     local-scope
 13     source:&:source:num, sink:&:sink:num <- new-channel 3/capacity
 14     sink <- write sink, 34  # one value in...
 15     10:num/raw, 11:bool/raw, source <- read source  # ...and straight back out: location 10 gets the value, 11 the eof? flag
 16   ]
 17   memory-should-contain [
 18     10 <- 34  # the value that was written
 19     11 <- 0  # read was successful
 20   ]
 21 ]
 22 
 23 container channel:_elem [
 24   lock:bool  # inefficient but simple: serialize all reads as well as writes
 25   first-full:num  # index of the oldest unread value; advanced by 'read'
 26   first-free:num  # index of the next empty slot; advanced by 'write'
 27   # A circular buffer contains values from index first-full up to (but not
 28   # including) index first-free. The reader always modifies it at first-full,
 29   # while the writer always modifies it at first-free.
 30   data:&:@:_elem  # backing array; new-channel allocates one slot more than the requested capacity
 31 ]
 32 
 33 # Since channels have two ends, and since it's an error to use either end from
 34 # multiple routines, let's distinguish the ends.
 35 
 36 container source:_elem [
 37   chan:&:channel:_elem  # the channel this end reads from (see 'read')
 38 ]
 39 
 40 container sink:_elem [
 41   chan:&:channel:_elem  # the channel this end writes to (see 'write')
 42 ]
 43 
 44 def new-channel capacity:num -> in:&:source:_elem, out:&:sink:_elem [
 45   local-scope
 46   load-inputs
 47   shared:&:channel:_elem <- new {(channel _elem): type}  # the one channel both ends will point at
 48   slots:num <- add capacity, 1  # one slot always stays unused so that 'full' can be told apart from 'empty'
 49   storage:&:@:_elem <- new _elem:type, slots
 50   *shared <- put *shared, data:offset, storage
 51   *shared <- put *shared, first-free:offset, 0  # writer starts at slot 0
 52   *shared <- put *shared, first-full:offset, 0  # reader starts at slot 0 too, i.e. the channel begins empty
 53   in <- new {(source _elem): type}  # reading end
 54   out <- new {(sink _elem): type}  # writing end
 55   *in <- put *in, chan:offset, shared
 56   *out <- put *out, chan:offset, shared
 57 ]
 58 
 59 # write a value to a channel; blocks while the channel is full, and returns without writing if the channel is closed
 60 def write out:&:sink:_elem, val:_elem -> out:&:sink:_elem [
 61   local-scope
 62   load-inputs
 63   assert out, [write to null channel]
 64   chan:&:channel:_elem <- get *out, chan:offset
 65   <channel-write-initial>
 66   # block until lock is acquired AND queue has room
 67   lock:location <- get-location *chan, lock:offset
 68 #?   $print [write], 10/newline
 69   {
 70 #?     $print [trying to acquire lock for writing], 10/newline
 71     wait-for-reset-then-set lock
 72 #?     $print [lock acquired for writing], 10/newline
 73     full?:bool <- channel-full? chan
 74     break-unless full?  # room available: leave the loop with the lock still held
 75 #?     $print [but channel is full; relinquishing lock], 10/newline
 76     # channel is full; relinquish lock and give a reader the opportunity to
 77     # create room on it
 78     reset lock
 79     current-routine-is-blocked
 80     switch  # avoid spinlocking
 81     loop
 82   }
 83   current-routine-is-unblocked
 84 #?   $print [performing write], 10/newline
 85   # store val in the slot at index first-free
 86   circular-buffer:&:@:_elem <- get *chan, data:offset
 87   free:num <- get *chan, first-free:offset
 88   *circular-buffer <- put-index *circular-buffer, free, val
 89   # mark its slot as filled
 90   free <- add free, 1
 91   {
 92     # wrap free around to 0 if necessary
 93     len:num <- length *circular-buffer
 94     at-end?:bool <- greater-or-equal free, len
 95     break-unless at-end?
 96     free <- copy 0
 97   }
 98   # write back
 99   *chan <- put *chan, first-free:offset, free
100 #?   $print [relinquishing lock after writing], 10/newline
101   reset lock
102 ]
103 
104 # read a value from a channel; blocks while the channel is empty, and returns an empty value with eof? set once it is both empty and closed
105 def read in:&:source:_elem -> result:_elem, eof?:bool, in:&:source:_elem [
106   local-scope
107   load-inputs
108   assert in, [read on null channel]
109   eof? <- copy false  # default result
110   chan:&:channel:_elem <- get *in, chan:offset
111   # block until lock is acquired AND queue has data
112   lock:location <- get-location *chan, lock:offset
113 #?   $print [read], 10/newline
114   {
115 #?     $print [trying to acquire lock for reading], 10/newline
116     wait-for-reset-then-set lock
117 #?     $print [lock acquired for reading], 10/newline
118     empty?:bool <- channel-empty? chan
119     break-unless empty?  # data available: leave the loop with the lock still held
120 #?     $print [but channel is empty; relinquishing lock], 10/newline
121     # channel is empty; relinquish lock and give a writer the opportunity to
122     # add to it
123     reset lock
124     current-routine-is-blocked
125     <channel-read-empty>
126     switch  # avoid spinlocking
127     loop
128   }
129   current-routine-is-unblocked
130   # pull result off
131   full:num <- get *chan, first-full:offset
132   circular-buffer:&:@:_elem <- get *chan, data:offset
133   result <- index *circular-buffer, full
134   # clear the slot
135   empty:&:_elem <- new _elem:type  # a fresh allocation supplies the empty value that overwrites the slot
136   *circular-buffer <- put-index *circular-buffer, full, *empty
137   # mark its slot as empty
138   full <- add full, 1
139   {
140     # wrap full around to 0 if necessary
141     len:num <- length *circular-buffer
142     at-end?:bool <- greater-or-equal full, len
143     break-unless at-end?
144     full <- copy 0
145   }
146   # write back
147   *chan <- put *chan, first-full:offset, full
148 #?   $print [relinquishing lock after reading], 10/newline
149   reset lock
150 ]
151 
152 # todo: create a notion of iterator and iterable so we can read/write whole
153 # aggregates (arrays, lists, ..) of _elems at once.
154 
155 scenario channel-initialization [
156   run [
157     local-scope
158     source:&:source:num <- new-channel 3/capacity  # only the source end is needed to reach the channel
159     chan:&:channel:num <- get *source, chan:offset
160     10:num/raw <- get *chan, first-full:offset
161     11:num/raw <- get *chan, first-free:offset
162   ]
163   memory-should-contain [
164     10 <- 0  # first-full starts at slot 0
165     11 <- 0  # first-free starts at slot 0
166   ]
167 ]
168 
169 scenario channel-write-increments-free [
170   local-scope
171   _, sink:&:sink:num <- new-channel 3/capacity  # the source end is not used in this scenario
172   run [
173     sink <- write sink, 34
174     chan:&:channel:num <- get *sink, chan:offset
175     10:num/raw <- get *chan, first-full:offset
176     11:num/raw <- get *chan, first-free:offset
177   ]
178   memory-should-contain [
179     10 <- 0  # first-full: untouched by a write
180     11 <- 1  # first-free: advanced past the slot just written
181   ]
182 ]
183 
184 scenario channel-read-increments-full [
185   local-scope
186   source:&:source:num, sink:&:sink:num <- new-channel 3/capacity
187   sink <- write sink, 34  # setup: give the read below something to consume
188   run [
189     _, _, source <- read source
190     chan:&:channel:num <- get *source, chan:offset
191     10:num/raw <- get *chan, first-full:offset
192     11:num/raw <- get *chan, first-free:offset
193   ]
194   memory-should-contain [
195     10 <- 1  # first-full: advanced past the slot just read
196     11 <- 1  # first-free: left where the setup write put it
197   ]
198 ]
199 
200 scenario channel-wrap [
201   local-scope
202   # channel with just 1 slot
203   source:&:source:num, sink:&:sink:num <- new-channel 1/capacity
204   chan:&:channel:num <- get *source, chan:offset
205   # write and read a value
206   sink <- write sink, 34
207   _, _, source <- read source
208   run [
209     # first-free and first-full will both be 1 now
210     10:num/raw <- get *chan, first-free:offset
211     11:num/raw <- get *chan, first-full:offset
212     # write second value, verify that first-free wraps
213     sink <- write sink, 34
214     20:num/raw <- get *chan, first-free:offset
215     # read second value, verify that first-full wraps
216     _, _, source <- read source
217     30:num/raw <- get *chan, first-full:offset
218   ]
219   memory-should-contain [
220     10 <- 1  # first-free after first write
221     11 <- 1  # first-full after first read
222     20 <- 0  # first-free after second write, wrapped
223     30 <- 0  # first-full after second read, wrapped
224   ]
225 ]
226 
227 scenario channel-new-empty-not-full [
228   run [
229     local-scope
230     source:&:source:num <- new-channel 3/capacity  # fresh channel, nothing written yet
231     chan:&:channel:num <- get *source, chan:offset
232     10:bool/raw <- channel-empty? chan
233     11:bool/raw <- channel-full? chan
234   ]
235   memory-should-contain [
236     10 <- 1  # empty?
237     11 <- 0  # full?
238   ]
239 ]
240 
241 scenario channel-write-not-empty [
242   local-scope
243   source:&:source:num, sink:&:sink:num <- new-channel 3/capacity
244   chan:&:channel:num <- get *source, chan:offset
245   run [
246     sink <- write sink, 34  # one value in a channel of capacity 3
247     10:bool/raw <- channel-empty? chan
248     11:bool/raw <- channel-full? chan
249   ]
250   memory-should-contain [
251     10 <- 0  # empty?
252     11 <- 0  # full?
253   ]
254 ]
255 
256 scenario channel-write-full [
257   local-scope
258   source:&:source:num, sink:&:sink:num <- new-channel 1/capacity  # capacity 1, so a single write fills it
259   chan:&:channel:num <- get *source, chan:offset
260   run [
261     sink <- write sink, 34
262     10:bool/raw <- channel-empty? chan
263     11:bool/raw <- channel-full? chan
264   ]
265   memory-should-contain [
266     10 <- 0  # empty?
267     11 <- 1  # full?
268   ]
269 ]
270 
271 scenario channel-read-not-full [
272   local-scope
273   source:&:source:num, sink:&:sink:num <- new-channel 1/capacity
274   chan:&:channel:num <- get *source, chan:offset
275   sink <- write sink, 34  # setup: fill the capacity-1 channel
276   run [
277     _, _, source <- read source  # consuming the only value should leave the channel empty again
278     10:bool/raw <- channel-empty? chan
279     11:bool/raw <- channel-full? chan
280   ]
281   memory-should-contain [
282     10 <- 1  # empty?
283     11 <- 0  # full?
284   ]
285 ]
286 
287 scenario channel-clear [
288   local-scope
289   # create a channel with a few items
290   source:&:source:num, sink:&:sink:num <- new-channel 3/capacity
291   chan:&:channel:num <- get *sink, chan:offset
292   write sink, 30
293   write sink, 31
294   write sink, 32  # third write: the capacity-3 channel is now full
295   run [
296     clear source  # reads values off the source end until channel-empty? holds
297     10:bool/raw <- channel-empty? chan
298   ]
299   memory-should-contain [
300     10 <- 1  # after the call to 'clear', the channel should be empty
301   ]
302 ]
303 
304 def clear in:&:source:_elem -> in:&:source:_elem [
305   local-scope
306   load-inputs
307   pending:&:channel:_elem <- get *in, chan:offset  # the channel behind this source end
308   {
309     drained?:bool <- channel-empty? pending  # re-checked before every read, so 'read' is only called when data is present
310     break-if drained?
311     _, _, in <- read in  # throw away both the value and the eof? flag
312     loop
313   }
314 ]
315 
316 ## cancelling channels
317 
318 # every channel comes with a boolean signifying if it's been closed
319 # initially this boolean is false
320 container channel:_elem [
321   closed?:bool  # set to true by 'close'; consulted at the <channel-write-initial> and <channel-read-empty> waypoints
322 ]
323 
324 # a channel can be closed from either the source or the sink
325 # both routines can modify the 'closed?' bit, but they can only ever set it, so this is a benign race
326 def close x:&:source:_elem -> x:&:source:_elem [
327   local-scope
328   load-inputs
329   shared:&:channel:_elem <- get *x, chan:offset  # the channel behind this source end
330   *shared <- put *shared, closed?:offset, true  # the flag is only ever set here, never cleared
331 ]
332 def close x:&:sink:_elem -> x:&:sink:_elem [
333   local-scope
334   load-inputs
335   shared:&:channel:_elem <- get *x, chan:offset  # the channel behind this sink end
336   *shared <- put *shared, closed?:offset, true  # the flag is only ever set here, never cleared
337 ]
338 
339 # once a channel is closed from one side, no further operations are expected from that side
340 # if a channel is closed for reading,
341 #   no further writes will be let through
342 # if a channel is closed for writing,
343 #   future reads continue until the channel empties,
344 #   then the channel is also closed for reading
345 after <channel-write-initial> [
346   closed?:bool <- get *chan, closed?:offset  # 'chan' is the local already set up in 'write' when the waypoint is reached
347   return-if closed?  # closed channel: leave 'write' before the lock is touched or the value stored
348 ]
349 after <channel-read-empty> [
350   closed?:bool <- get *chan, closed?:offset  # runs at the waypoint in 'read', i.e. after the channel was found empty
351   {
352     break-unless closed?  # still open: fall through and keep waiting for a writer
353     empty-result:&:_elem <- new _elem:type  # fresh allocation used as the placeholder result
354     current-routine-is-unblocked  # undo the current-routine-is-blocked issued just before the waypoint
355     return *empty-result, true  # second product is eof?
356   }
357 ]
358 
359 ## helpers
360 
361 # A channel is empty exactly when its read index and its write index coincide.
362 def channel-empty? chan:&:channel:_elem -> result:bool [
363   local-scope
364   load-inputs
365   # result = (chan.first-free == chan.first-full)
366   write-index:num <- get *chan, first-free:offset
367   read-index:num <- get *chan, first-full:offset
368   result <- equal write-index, read-index
369 ]
370 
371 # A channel is full once first-free sits one slot (with wraparound) behind first-full; one slot stays unused.
372 # (Other alternatives: https://www.snellman.net/blog/archive/2016-12-13-ring-buffers)
373 def channel-full? chan:&:channel:_elem -> result:bool [
374   local-scope
375   load-inputs
376   # successor = chan.first-free + 1
377   successor:num <- get *chan, first-free:offset
378   successor <- add successor, 1
379   {
380     # wrap successor around to 0 when it runs off the end of the buffer
381     slots:num <- capacity chan
382     past-end?:bool <- greater-or-equal successor, slots
383     break-unless past-end?
384     successor <- copy 0
385   }
386   # full iff the wrapped successor collides with chan.first-full
387   oldest:num <- get *chan, first-full:offset
388   result <- equal oldest, successor
389 ]
390 
391 def capacity chan:&:channel:_elem -> result:num [
392   local-scope
393   load-inputs
394   slots:&:@:_elem <- get *chan, data:offset  # the backing array
395   result <- length *slots  # allocated slot count, which includes the extra slot added by new-channel
396 ]
397 
398 ## helpers for channels of characters in particular
399 
400 def buffer-lines in:&:source:char, buffered-out:&:sink:char -> buffered-out:&:sink:char, in:&:source:char [
401   local-scope
402   load-inputs
403   # repeat until 'in' reports eof?: gather one line, then forward it to 'buffered-out'
404   eof?:bool <- copy false
405   {
406     line:&:buffer:char <- new-buffer 30  # a fresh buffer for every line
407     # read characters from 'in' until newline, copy into line
408     {
409       +next-character
410       c:char, eof?:bool, in <- read in
411       break-if eof?  # input closed: stop gathering; whatever is in 'line' is still forwarded below
412       # drop a character on backspace
413       {
414         # special-case: if it's a backspace
415         backspace?:bool <- equal c, 8
416         break-unless backspace?
417         # drop previous character
418         {
419           buffer-length:num <- get *line, length:offset
420           buffer-empty?:bool <- equal buffer-length, 0
421           break-if buffer-empty?  # nothing to erase
422           buffer-length <- subtract buffer-length, 1
423           *line <- put *line, length:offset, buffer-length  # shrinking the length is what drops the character
424         }
425         # and don't append this one
426         loop +next-character
427       }
428       # append anything else
429       line <- append line, c
430       line-done?:bool <- equal c, 10/newline
431       break-if line-done?  # the newline itself has already been appended
432       loop
433     }
434     # copy line into 'buffered-out'
435     i:num <- copy 0
436     line-contents:text <- get *line, data:offset
437     max:num <- get *line, length:offset  # only the first 'length' characters of the buffer's data are sent
438     {
439       done?:bool <- greater-or-equal i, max
440       break-if done?
441       c:char <- index *line-contents, i
442       buffered-out <- write buffered-out, c
443       i <- add i, 1
444       loop
445     }
446     {
447       break-unless eof?
448       buffered-out <- close buffered-out  # pass the end-of-input on to whoever reads the buffered channel
449       return
450     }
451     loop
452   }
453 ]
454 
455 scenario buffer-lines-blocks-until-newline [
456   run [
457     local-scope
458     source:&:source:char, sink:&:sink:char <- new-channel 10/capacity  # unbuffered input: the scenario writes to 'sink', buffer-lines reads 'source'
459     _, buffered-stdin:&:sink:char/buffered-stdin <- new-channel 10/capacity
460     buffered-chan:&:channel:char <- get *buffered-stdin, chan:offset  # inspected directly with channel-empty? below
461     empty?:bool <- channel-empty? buffered-chan
462     assert empty?, [ 
463 F buffer-lines-blocks-until-newline: channel should be empty after init]
464     # buffer stdin into buffered-stdin, try to read from buffered-stdin
465     buffer-routine:num <- start-running buffer-lines, source, buffered-stdin
466     wait-for-routine-to-block buffer-routine
467     empty? <- channel-empty? buffered-chan
468     assert empty?:bool, [ 
469 F buffer-lines-blocks-until-newline: channel should be empty after buffer-lines bring-up]
470     # write 'a'
471     sink <- write sink, 97/a
472     restart buffer-routine
473     wait-for-routine-to-block buffer-routine
474     empty? <- channel-empty? buffered-chan
475     assert empty?:bool, [ 
476 F buffer-lines-blocks-until-newline: channel should be empty after writing 'a']
477     # write 'b'
478     sink <- write sink, 98/b
479     restart buffer-routine
480     wait-for-routine-to-block buffer-routine
481     empty? <- channel-empty? buffered-chan
482     assert empty?:bool, [ 
483 F buffer-lines-blocks-until-newline: channel should be empty after writing 'b']
484     # write newline
485     sink <- write sink, 10/newline
486     restart buffer-routine
487     wait-for-routine-to-block buffer-routine
488     empty? <- channel-empty? buffered-chan
489     data-emitted?:bool <- not empty?  # only now should the buffered channel hold data
490     assert data-emitted?, [ 
491 F buffer-lines-blocks-until-newline: channel should contain data after writing newline]
492     trace 1, [test], [reached end]  # marker checked by trace-should-contain below
493   ]
494   trace-should-contain [
495     test: reached end
496   ]
497 ]
498 
499 def drain source:&:source:char -> result:text, source:&:source:char [
500   local-scope  # drain: gather every character read from 'source' until 'read' reports eof?, and return them as one text
501   load-inputs
502   buf:&:buffer:char <- new-buffer 30  # accumulates the characters; 30 is the size handed to new-buffer
503   {
504     c:char, done?:bool, source <- read source  # save 'source' back, as every other call to 'read' in this file does
505     break-if done?  # eof?: the channel is empty and closed
506     buf <- append buf, c
507     loop
508   }
509   result <- buffer-to-array buf
510 ]
510 ]