about summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorKartik K. Agaram <vc@akkartik.com>2014-11-07 21:39:00 -0800
committerKartik K. Agaram <vc@akkartik.com>2014-11-07 21:39:00 -0800
commit643f35e5c563ad139d6d15d964713f016a21cbc9 (patch)
tree1f359937558b7d380b022aa1079851f0fb639b70
parent66741bc8ef82776b480713fcfc298f315bfa6530 (diff)
downloadmu-643f35e5c563ad139d6d15d964713f016a21cbc9.tar.gz
259 - finally channel read/write can cause routine to sleep
-rw-r--r--mu.arc32
-rw-r--r--mu.arc.t48
2 files changed, 74 insertions, 6 deletions
diff --git a/mu.arc b/mu.arc
index ba5b39d3..ef5cb539 100644
--- a/mu.arc
+++ b/mu.arc
@@ -187,9 +187,9 @@
 (def run fn-names
   (each it fn-names
     (enq make-routine.it running-routines*))
-  ; simple round-robin scheduler
   (while (or (~empty running-routines*)
              (~empty sleeping-routines*))
+    (detect-deadlock)
     (point continue
     (each (routine _) canon.sleeping-routines*
       (awhen (case rep.routine!sleep.1
@@ -208,6 +208,7 @@
       (trace "schedule" "skipping cycle " curr-cycle*)
       (++ curr-cycle*)
       (continue))
+    ; simple round-robin scheduler
     (= routine* deq.running-routines*)
     (trace "schedule" top.routine*!fn-name)
     (routine-mark:run-for-time-slice scheduling-interval*)
@@ -219,6 +220,15 @@
         :else
           (enq-limit routine* completed-routines*)))))
 
+(def detect-deadlock ()
+  (when (and empty.running-routines*
+             (~some 'literal (map (fn(_) rep._!sleep.1)
+                                  keys.sleeping-routines*)))
+    (each (routine _) sleeping-routines*
+      (wipe sleeping-routines*.routine)
+      (= rep.routine!error "deadlock detected")
+      (enq routine completed-routines*))))
+
 (def die (msg)
   (= rep.routine*!error msg)
   (= rep.routine*!stack-trace rep.routine*!call-stack)
@@ -498,8 +508,6 @@
                   (assert (m arg.0))
                 sleep
                   (let operand arg.0
-                    (assert (~pos 'deref metadata.operand)
-                            "sleep doesn't support indirect addressing yet")
                     (if (is ty.operand 'literal)
                       (let delay v.operand
                         (trace "run" "sleeping until " (+ curr-cycle* delay))
@@ -859,6 +867,15 @@
   ((default-scope scope-address) <- new (scope literal) (30 literal))
   ((chan channel) <- arg)
   ((val tagged-value) <- arg)
+  { begin
+    ((full boolean) <- full? (chan channel))
+    (break-unless (full boolean))
+    ; todo: race condition: what if consumer routine reads between previous
+    ; instruction and next?
+    ((watch boolean-address) <- get-address (chan channel) (read-watch offset))
+    ((watch boolean-address deref) <- copy (nil literal))
+    (sleep (watch boolean-address))
+  }
   ((q tagged-value-array-address) <- get (chan channel) (circular-buffer offset))
   ((free integer-address) <- get-address (chan channel) (first-free offset))
   ((dest tagged-value-address) <- index-address (q tagged-value-array-address deref) (free integer-address deref))
@@ -877,6 +894,15 @@
 (init-fn read
   ((default-scope scope-address) <- new (scope literal) (30 literal))
   ((chan channel) <- arg)
+  { begin
+    ((empty boolean) <- empty? (chan channel))
+    (break-unless (empty boolean))
+    ; todo: race condition: what if producer routine writes between previous
+    ; instruction and next?
+    ((watch boolean-address) <- get-address (chan channel) (write-watch offset))
+    ((watch boolean-address deref) <- copy (nil literal))
+    (sleep (watch boolean-address deref))
+  }
   ((full integer-address) <- get-address (chan channel) (first-full offset))
   ((q tagged-value-array-address) <- get (chan channel) (circular-buffer offset))
   ((result tagged-value) <- index (q tagged-value-array-address deref) (full integer-address deref))
diff --git a/mu.arc.t b/mu.arc.t
index 1c18eeeb..d2b0d70f 100644
--- a/mu.arc.t
+++ b/mu.arc.t
@@ -1784,6 +1784,8 @@
 (= scheduling-interval* 1)
 (run 'f1 'f2)
 ;? (prn canon.memory*)
+(let last-routine (deq completed-routines*)
+  (aif rep.last-routine!error (prn "error - " it)))
 (if (~is memory*.2 4)  ; successor of value
   (prn "F - scheduler handles routines blocking on a memory location"))
 ;? (quit)
@@ -2027,10 +2029,50 @@
         (~is nil memory*.5))
   (prn "F - a channel after reading may be empty"))
 
-; We'd like to block routines when they write to a full channel or read from
-; an empty channel.
+; The key property of channels; writing to a full channel blocks the current
+; routine until it creates space. Ditto reading from an empty channel.
+
+(reset)
+(new-trace "channel-read-block")
+(add-fns
+  '((main
+      ((1 channel-address) <- new-channel (3 literal))
+      ; channel is empty, but receives a read
+      ((2 tagged-value) (1 channel-address deref) <- read (1 channel-address deref)))))
+;? (set dump-trace*)
+;? (= dump-trace* (obj whitelist '("run")))
+(run 'main)
+;? (prn int-canon.memory*)
+;? (prn sleeping-routines*)
+; read should cause the routine to sleep, and
+; the sole sleeping routine should trigger the deadlock detector
+(let last-routine (deq completed-routines*)
+  (when (or (no rep.last-routine!error)
+            (~posmatch "deadlock" rep.last-routine!error))
+    (prn "F - 'read' on empty channel blocks (puts the routine to sleep until the channel gets data)")))
+;? (quit)
 
-; TODO
+(reset)
+(new-trace "channel-write-block")
+(add-fns
+  '((main
+      ((1 channel-address) <- new-channel (1 literal))
+      ((2 integer-address) <- new (integer literal))
+      ((2 integer-address deref) <- copy (34 literal))
+      ((3 tagged-value-address) <- new-tagged-value (integer-address literal) (2 integer-address))
+      ((1 channel-address deref) <- write (1 channel-address deref) (3 tagged-value-address deref))
+      ; channel has capacity 1, but receives a second write
+      ((1 channel-address deref) <- write (1 channel-address deref) (3 tagged-value-address deref)))))
+;? (set dump-trace*)
+;? (= dump-trace* (obj whitelist '("run")))
+(run 'main)
+;? (prn int-canon.memory*)
+; second write should cause the routine to sleep, and
+; the sole sleeping routine should trigger the deadlock detector
+(let last-routine (deq completed-routines*)
+  (when (or (no rep.last-routine!error)
+            (~posmatch "deadlock" rep.last-routine!error))
+    (prn "F - 'write' on full channel blocks (puts the routine to sleep until the channel gets data)")))
 
 ; But how will the sleeping routines wake up? Our scheduler can't watch for
 ; changes to arbitrary values, just tell us if a specific raw location becomes