https://github.com/akkartik/mu/blob/master/075channel.mu
1
2
3
4
5
6
7
8
9
# Smoke test: a value written to the sink end comes back out of the source
# end, and the read does not report end-of-file.
scenario channel [
  run [
    local-scope
    in:&:source:num, out:&:sink:num <- new-channel 3/capacity
    out <- write out, 34
    10:num/raw, 11:bool/raw, in <- read in
  ]
  memory-should-contain [
    10 <- 34  # the value that was written
    11 <- 0  # eof? flag returned by 'read'
  ]
]
22
# State shared by the two ends of a channel, parameterized by element type.
container channel:_elem [
  lock:bool  # taken by both 'write' and 'read' before they touch the fields below
  first-full:num  # index of the slot 'read' takes its next value from
  first-free:num  # index of the slot 'write' stores its next value into
  # 'data' is used as a circular buffer: both indices wrap back to 0 on
  # reaching its length, the channel counts as empty when they are equal,
  # and 'new-channel' allocates one slot more than the requested capacity.
  data:&:@:_elem
]
32
33
34
35
# Read-side handle of a channel; 'read', 'clear' and 'close' accept it.
container source:_elem [
  chan:&:channel:_elem  # channel state, shared with the sink created alongside it in 'new-channel'
]
39
# Write-side handle of a channel; 'write' and 'close' accept it.
container sink:_elem [
  chan:&:channel:_elem  # channel state, shared with the source created alongside it in 'new-channel'
]
43
# Create a channel and return its two ends: 'in' for reading and 'out' for
# writing. Both ends point at the same underlying channel state.
def new-channel capacity:num -> in:&:source:_elem, out:&:sink:_elem [
  local-scope
  load-inputs
  shared:&:channel:_elem <- new {(channel _elem): type}
  # the buffer gets one slot more than requested; see channel-full? for how
  # the extra slot is used
  slots:num <- add capacity, 1
  buffer:&:@:_elem <- new _elem:type, slots
  *shared <- put *shared, data:offset, buffer
  # both indices start at 0, which channel-empty? treats as empty
  *shared <- put *shared, first-free:offset, 0
  *shared <- put *shared, first-full:offset, 0
  # wrap the shared state in one handle per direction
  in <- new {(source _elem): type}
  out <- new {(sink _elem): type}
  *in <- put *in, chan:offset, shared
  *out <- put *out, chan:offset, shared
]
58
59
# Store 'val' in the channel behind 'out', waiting while the channel is full.
# Fails an assertion if 'out' is null. Code tangled in at
# <channel-write-initial> (see the 'after' fragment in this file) can return
# early before anything is written.
def write out:&:sink:_elem, val:_elem -> out:&:sink:_elem [
  local-scope
  load-inputs
  assert out, [write to null channel]
  chan:&:channel:_elem <- get *out, chan:offset
  <channel-write-initial>
  # location of the channel's lock field, handed to the lock instructions below
  lock:location <- get-location *chan, lock:offset
  # loop until we hold the lock AND the channel has room
  {
    # take the lock
    wait-for-reset-then-set lock
    # leave the loop, still holding the lock, once there is room
    full?:bool <- channel-full? chan
    break-unless full?
    # no room: release the lock so that a reader can take a value out,
    # mark this routine blocked, give up the processor and try again
    reset lock
    current-routine-is-blocked
    switch
    loop
  }
  current-routine-is-unblocked
  # store 'val' in the slot at first-free
  circular-buffer:&:@:_elem <- get *chan, data:offset
  free:num <- get *chan, first-free:offset
  *circular-buffer <- put-index *circular-buffer, free, val
  # advance first-free, wrapping to 0 at the end of the buffer
  free <- add free, 1
  {
    len:num <- length *circular-buffer
    at-end?:bool <- greater-or-equal free, len
    break-unless at-end?
    free <- copy 0
  }
  # publish the new index
  *chan <- put *chan, first-free:offset, free
  # release the lock
  reset lock
]
103
104
# Take the value at first-full out of the channel behind 'in', waiting while
# the channel is empty. Fails an assertion if 'in' is null.
# 'eof?' is false on the path written here; code tangled in at
# <channel-read-empty> (see the 'after' fragment in this file) can return
# early with a different value.
def read in:&:source:_elem -> result:_elem, eof?:bool, in:&:source:_elem [
  local-scope
  load-inputs
  assert in, [read on null channel]
  eof? <- copy false
  chan:&:channel:_elem <- get *in, chan:offset
  # location of the channel's lock field, handed to the lock instructions below
  lock:location <- get-location *chan, lock:offset
  # loop until we hold the lock AND the channel has something in it
  {
    # take the lock
    wait-for-reset-then-set lock
    # leave the loop, still holding the lock, once there is a value to read
    empty?:bool <- channel-empty? chan
    break-unless empty?
    # nothing to read: release the lock so that a writer can add a value,
    # mark this routine blocked, give up the processor and try again
    reset lock
    current-routine-is-blocked
    <channel-read-empty>
    switch
    loop
  }
  current-routine-is-unblocked
  # fetch the value in the slot at first-full
  full:num <- get *chan, first-full:offset
  circular-buffer:&:@:_elem <- get *chan, data:offset
  result <- index *circular-buffer, full
  # overwrite that slot with a freshly allocated value
  # NOTE(review): presumably so the buffer stops holding on to the value just read -- confirm
  empty:&:_elem <- new _elem:type
  *circular-buffer <- put-index *circular-buffer, full, *empty
  # advance first-full, wrapping to 0 at the end of the buffer
  full <- add full, 1
  {
    len:num <- length *circular-buffer
    at-end?:bool <- greater-or-equal full, len
    break-unless at-end?
    full <- copy 0
  }
  # publish the new index
  *chan <- put *chan, first-full:offset, full
  # release the lock
  reset lock
]
151
152
153
154
# A freshly created channel has both of its indices at 0.
scenario channel-initialization [
  run [
    local-scope
    in:&:source:num <- new-channel 3/capacity
    state:&:channel:num <- get *in, chan:offset
    11:num/raw <- get *state, first-free:offset
    10:num/raw <- get *state, first-full:offset
  ]
  memory-should-contain [
    10 <- 0  # first-full
    11 <- 0  # first-free
  ]
]
168
# One write advances first-free and leaves first-full where it was.
scenario channel-write-increments-free [
  local-scope
  _, out:&:sink:num <- new-channel 3/capacity
  run [
    out <- write out, 34
    state:&:channel:num <- get *out, chan:offset
    11:num/raw <- get *state, first-free:offset
    10:num/raw <- get *state, first-full:offset
  ]
  memory-should-contain [
    10 <- 0  # first-full
    11 <- 1  # first-free
  ]
]
183
# Reading back a value that was written advances first-full to meet first-free.
scenario channel-read-increments-full [
  local-scope
  in:&:source:num, out:&:sink:num <- new-channel 3/capacity
  out <- write out, 34
  run [
    _, _, in <- read in
    state:&:channel:num <- get *in, chan:offset
    11:num/raw <- get *state, first-free:offset
    10:num/raw <- get *state, first-full:offset
  ]
  memory-should-contain [
    10 <- 1  # first-full
    11 <- 1  # first-free
  ]
]
199
# Both indices of a channel wrap back to 0 on reaching the end of the buffer.
scenario channel-wrap [
  local-scope
  # capacity 1: 'new-channel' allocates one slot more, so the buffer has 2 slots
  source:&:source:num, sink:&:sink:num <- new-channel 1/capacity
  chan:&:channel:num <- get *source, chan:offset
  # one write followed by one read moves both indices from 0 to 1
  sink <- write sink, 34
  _, _, source <- read source
  run [
    # record both indices before wrapping; location 11 must come from
    # first-full so that the read side is checked as well
    10:num/raw <- get *chan, first-free:offset
    11:num/raw <- get *chan, first-full:offset
    # a second write pushes first-free past the last slot
    sink <- write sink, 34
    20:num/raw <- get *chan, first-free:offset
    # a second read pushes first-full past the last slot
    _, _, source <- read source
    30:num/raw <- get *chan, first-full:offset
  ]
  memory-should-contain [
    10 <- 1  # first-free after the first write
    11 <- 1  # first-full after the first read
    20 <- 0  # first-free after the second write: wrapped
    30 <- 0  # first-full after the second read: wrapped
  ]
]
226
# A freshly created channel reports empty and not full.
scenario channel-new-empty-not-full [
  run [
    local-scope
    in:&:source:num <- new-channel 3/capacity
    state:&:channel:num <- get *in, chan:offset
    10:bool/raw <- channel-empty? state
    11:bool/raw <- channel-full? state
  ]
  memory-should-contain [
    10 <- 1  # empty
    11 <- 0  # not full
  ]
]
240
# After one write into a 3-capacity channel it is neither empty nor full.
scenario channel-write-not-empty [
  local-scope
  in:&:source:num, out:&:sink:num <- new-channel 3/capacity
  state:&:channel:num <- get *in, chan:offset
  run [
    out <- write out, 34
    10:bool/raw <- channel-empty? state
    11:bool/raw <- channel-full? state
  ]
  memory-should-contain [
    10 <- 0  # not empty
    11 <- 0  # not full
  ]
]
255
# After one write into a 1-capacity channel it is full.
scenario channel-write-full [
  local-scope
  in:&:source:num, out:&:sink:num <- new-channel 1/capacity
  state:&:channel:num <- get *in, chan:offset
  run [
    out <- write out, 34
    10:bool/raw <- channel-empty? state
    11:bool/raw <- channel-full? state
  ]
  memory-should-contain [
    10 <- 0  # not empty
    11 <- 1  # full
  ]
]
270
# Reading the only value out of a full 1-capacity channel leaves it empty
# and no longer full.
scenario channel-read-not-full [
  local-scope
  in:&:source:num, out:&:sink:num <- new-channel 1/capacity
  state:&:channel:num <- get *in, chan:offset
  out <- write out, 34
  run [
    _, _, in <- read in
    10:bool/raw <- channel-empty? state
    11:bool/raw <- channel-full? state
  ]
  memory-should-contain [
    10 <- 1  # empty
    11 <- 0  # not full
  ]
]
286
# 'clear' leaves a channel that held several values empty.
scenario channel-clear [
  local-scope
  in:&:source:num, out:&:sink:num <- new-channel 3/capacity
  state:&:channel:num <- get *out, chan:offset
  # fill the channel to capacity
  write out, 30
  write out, 31
  write out, 32
  run [
    clear in
    10:bool/raw <- channel-empty? state
  ]
  memory-should-contain [
    10 <- 1  # empty after clear
  ]
]
303
# Discard everything currently in the channel behind 'in' by reading values
# out one at a time until channel-empty? reports true.
def clear in:&:source:_elem -> in:&:source:_elem [
  local-scope
  load-inputs
  state:&:channel:_elem <- get *in, chan:offset
  {
    drained?:bool <- channel-empty? state
    return-if drained?
    # throw away one value and check again
    _, _, in <- read in
    loop
  }
]
315
316
317
318
319
# Adds one more field to the 'channel' container: a flag that either end can
# set with 'close', and that the fragments tangled into 'write' and 'read'
# consult.
container channel:_elem [
  closed?:bool  # set to true by 'close'; never reset in this file
]
323
324
325
# Mark the channel behind a source (read-side) handle as closed.
def close x:&:source:_elem -> x:&:source:_elem [
  local-scope
  load-inputs
  state:&:channel:_elem <- get *x, chan:offset
  *state <- put *state, closed?:offset, true
]
# Mark the channel behind a sink (write-side) handle as closed.
def close x:&:sink:_elem -> x:&:sink:_elem [
  local-scope
  load-inputs
  state:&:channel:_elem <- get *x, chan:offset
  *state <- put *state, closed?:offset, true
]
338
339
340
341
342
343
344
# Tangled into 'write' at <channel-write-initial>, right after 'chan' has been
# loaded and before the lock is touched: a write to a closed channel returns
# immediately without storing the value.
after <channel-write-initial> [
  closed?:bool <- get *chan, closed?:offset
  return-if closed?
]
# Tangled into 'read' at <channel-read-empty>, i.e. on the path where the
# channel was found empty, the lock has been released and the routine has
# been marked blocked: if the channel is also closed, stop waiting and return
# with eof? set to true.
after <channel-read-empty> [
  closed?:bool <- get *chan, closed?:offset
  {
    break-unless closed?
    # freshly allocated value to hand back as 'result'
    empty-result:&:_elem <- new _elem:type
    # undo the current-routine-is-blocked that 'read' issues just before this point
    current-routine-is-unblocked
    return *empty-result, true
  }
]
358
359
360
361
# A channel is empty when its read index (first-full) and its write index
# (first-free) coincide.
def channel-empty? chan:&:channel:_elem -> result:bool [
  local-scope
  load-inputs
  write-index:num <- get *chan, first-free:offset
  read-index:num <- get *chan, first-full:offset
  result <- equal read-index, write-index
]
370
371
372
# A channel is full when advancing the write index (first-free) by one slot,
# with wrap-around, would make it land on the read index (first-full).
def channel-full? chan:&:channel:_elem -> result:bool [
  local-scope
  load-inputs
  read-index:num <- get *chan, first-full:offset
  # where first-free would be after one more write
  next-free:num <- get *chan, first-free:offset
  next-free <- add next-free, 1
  {
    slots:num <- capacity chan
    past-end?:bool <- greater-or-equal next-free, slots
    break-unless past-end?
    # wrap around to the start of the buffer
    next-free <- copy 0
  }
  result <- equal read-index, next-free
]
390
# Number of slots in the channel's buffer. This is the length of the 'data'
# array, i.e. one more than the capacity that was passed to 'new-channel'.
def capacity chan:&:channel:_elem -> result:num [
  local-scope
  load-inputs
  buffer:&:@:_elem <- get *chan, data:offset
  result <- length *buffer
]
397
398
399
# Copy characters from 'in' to 'buffered-out' a line at a time. Characters
# are held back until a newline (or eof on 'in') arrives and are then written
# out together. A backspace (character 8) is not passed on; it removes the
# most recently held-back character, if any.
# Once 'in' reports eof, whatever is held back is written out, 'buffered-out'
# is closed, and the recipe returns.
def buffer-lines in:&:source:char, buffered-out:&:sink:char -> buffered-out:&:sink:char, in:&:source:char [
  local-scope
  load-inputs
  eof?:bool <- copy false
  {
    # each iteration of this outer loop handles one line
    pending:&:buffer:char <- new-buffer 30
    # phase 1: collect characters until newline or eof
    {
      +next-character
      c:char, eof?:bool, in <- read in
      break-if eof?
      {
        erase?:bool <- equal c, 8
        break-unless erase?
        # backspace: shrink the pending line by one character unless it is
        # already empty
        {
          held:num <- get *pending, length:offset
          nothing-held?:bool <- equal held, 0
          break-if nothing-held?
          held <- subtract held, 1
          *pending <- put *pending, length:offset, held
        }
        # the backspace itself is never appended
        loop +next-character
      }
      pending <- append pending, c
      end-of-line?:bool <- equal c, 10/newline
      loop-unless end-of-line?
    }
    # phase 2: write out everything that was collected, in order
    count:num <- get *pending, length:offset
    chars:text <- get *pending, data:offset
    idx:num <- copy 0
    {
      flushed?:bool <- greater-or-equal idx, count
      break-if flushed?
      next:char <- index *chars, idx
      buffered-out <- write buffered-out, next
      idx <- add idx, 1
      loop
    }
    # phase 3: shut down after eof, otherwise go on to the next line
    {
      break-unless eof?
      buffered-out <- close buffered-out
      return
    }
    loop
  }
]
454
# 'buffer-lines' passes nothing on to its output channel until it has seen a
# newline on its input channel.
scenario buffer-lines-blocks-until-newline [
  run [
    local-scope
    unbuffered-in:&:source:char, unbuffered-out:&:sink:char <- new-channel 10/capacity
    _, buffered-stdin:&:sink:char/buffered-stdin <- new-channel 10/capacity
    buffered-state:&:channel:char <- get *buffered-stdin, chan:offset
    idle?:bool <- channel-empty? buffered-state
    assert idle?, [
F buffer-lines-blocks-until-newline: channel should be empty after init]
    # run buffer-lines as its own routine and let it block on its first read
    worker:num <- start-running buffer-lines, unbuffered-in, buffered-stdin
    wait-for-routine-to-block worker
    idle? <- channel-empty? buffered-state
    assert idle?, [
F buffer-lines-blocks-until-newline: channel should be empty after buffer-lines bring-up]
    # 'a' alone must not come out the other side
    unbuffered-out <- write unbuffered-out, 97/a
    restart worker
    wait-for-routine-to-block worker
    idle? <- channel-empty? buffered-state
    assert idle?, [
F buffer-lines-blocks-until-newline: channel should be empty after writing 'a']
    # neither must 'b'
    unbuffered-out <- write unbuffered-out, 98/b
    restart worker
    wait-for-routine-to-block worker
    idle? <- channel-empty? buffered-state
    assert idle?, [
F buffer-lines-blocks-until-newline: channel should be empty after writing 'b']
    # the newline releases the whole line
    unbuffered-out <- write unbuffered-out, 10/newline
    restart worker
    wait-for-routine-to-block worker
    idle? <- channel-empty? buffered-state
    line-released?:bool <- not idle?
    assert line-released?, [
F buffer-lines-blocks-until-newline: channel should contain data after writing newline]
    trace 1, [test], [reached end]
  ]
  trace-should-contain [
    test: reached end
  ]
]
498
# Read characters from 'source' until 'read' reports eof, and return
# everything read up to that point as one text.
def drain source:&:source:char -> result:text, source:&:source:char [
  local-scope
  load-inputs
  collected:&:buffer:char <- new-buffer 30
  {
    next:char, eof?:bool <- read source
    break-if eof?
    collected <- append collected, next
    loop
  }
  result <- buffer-to-array collected
]