From 05fe4be58a11eb83373d66069cbd64f75846a4ed Mon Sep 17 00:00:00 2001 From: "Kartik K. Agaram" Date: Wed, 14 Sep 2016 20:24:04 -0700 Subject: 3353 Fix failing scenarios in channel layer. We do so by introducing a kludgy new instruction to explicitly signal when a routine is stuck ('blocked') and waiting on another. All this locking and blocking may well be a crap design. We'll see if we find ourselves using these primitives again. Ideally we don't need them for anything else now that we're done building channels. Still some failing scenarios left in chessboard.mu. Let's see how that goes. --- 073wait.cc | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++++------- 075channel.mu | 5 +++++ 2 files changed, 65 insertions(+), 7 deletions(-) diff --git a/073wait.cc b/073wait.cc index 6f1ba14c..8db03834 100644 --- a/073wait.cc +++ b/073wait.cc @@ -363,8 +363,63 @@ for (int i = 0; i < SIZE(Routines); ++i) { } } +//:: helpers for manipulating routines in tests +//: +//: Managing arbitrary scenarios requires the ability to: +//: a) stop the current routine (`switch`) +//: b) restart a routine (`restart`) +//: c) tell when a routine is blocked +//: +//: A routine is blocked either if it's waiting or if it explicitly signals +//: that it's blocked (even as it periodically wakes up and polls for some +//: event). +//: +//: Signalling blockedness might well be a huge hack. But Mu doesn't have Unix +//: signals to avoid polling with, because signals are also pretty hacky. + +:(before "End routine Fields") +bool blocked; +:(before "End routine Constructor") +blocked = false; + +:(before "End Primitive Recipe Declarations") +CURRENT_ROUTINE_IS_BLOCKED, +:(before "End Primitive Recipe Numbers") +put(Recipe_ordinal, "current-routine-is-blocked", CURRENT_ROUTINE_IS_BLOCKED); +:(before "End Primitive Recipe Checks") +case CURRENT_ROUTINE_IS_BLOCKED: { + if (!inst.ingredients.empty()) { + raise << maybe(get(Recipe, r).name) << "'current-routine-is-blocked' should have no ingredients, but got '" << inst.original_string << "'\n" << end(); + break; + } + break; +} +:(before "End Primitive Recipe Implementations") +case CURRENT_ROUTINE_IS_BLOCKED: { + Current_routine->blocked = true; + break; +} + +:(before "End Primitive Recipe Declarations") +CURRENT_ROUTINE_IS_UNBLOCKED, +:(before "End Primitive Recipe Numbers") +put(Recipe_ordinal, "current-routine-is-unblocked", CURRENT_ROUTINE_IS_UNBLOCKED); +:(before "End Primitive Recipe Checks") +case CURRENT_ROUTINE_IS_UNBLOCKED: { + if (!inst.ingredients.empty()) { + raise << maybe(get(Recipe, r).name) << "'current-routine-is-unblocked' should have no ingredients, but got '" << inst.original_string << "'\n" << end(); + break; + } + break; +} +:(before "End Primitive Recipe Implementations") +case CURRENT_ROUTINE_IS_UNBLOCKED: { + Current_routine->blocked = false; + break; +} + //: also allow waiting on a routine to block -//: (just for tests; use wait_for_routine below wherever possible) +//: (just for tests; use wait_for_routine above wherever possible) :(scenario wait_for_routine_to_block) def f1 [ @@ -379,7 +434,7 @@ def f2 [ +schedule: f1 +run: waiting for routine 2 to block +schedule: f2 -+schedule: waking up blocked routine 1 ++schedule: waking up routine 1 because routine 2 is blocked +schedule: f1 # if we got the synchronization wrong we'd be storing 0 in location 11 +mem: storing 34 in location 11 @@ -415,7 +470,6 @@ case WAIT_FOR_ROUTINE_TO_BLOCK: { Current_routine->state = WAITING; Current_routine->waiting_on_routine_to_block = ingredients.at(0).at(0); trace(9998, "run") << "waiting for routine " << ingredients.at(0).at(0) << " to block" << end(); -//? cerr << Current_routine->id << ": waiting for routine " << ingredients.at(0).at(0) << " to block\n"; break; } @@ -429,10 +483,9 @@ for (int i = 0; i < SIZE(Routines); ++i) { assert(id != waiter->id); // routine can't wait on itself for (int j = 0; j < SIZE(Routines); ++j) { const routine* waitee = Routines.at(j); - if (waitee->id == id && waitee->state != RUNNING) { - // routine is WAITING or COMPLETED or DISCONTINUED - trace(9999, "schedule") << "waking up blocked routine " << waiter->id << end(); -//? cerr << id << " is now unblocked (" << waitee->state << "); waking up waiting routine " << waiter->id << '\n'; + if (waitee->id != id) continue; + if (waitee->state != RUNNING || waitee->blocked) { + trace(9999, "schedule") << "waking up routine " << waiter->id << " because routine " << waitee->id << " is blocked" << end(); waiter->state = RUNNING; waiter->waiting_on_routine_to_block = 0; } diff --git a/075channel.mu b/075channel.mu index 7720ccdd..49ebad87 100644 --- a/075channel.mu +++ b/075channel.mu @@ -78,9 +78,11 @@ def write out:address:sink:_elem, val:_elem -> out:address:sink:_elem [ # channel is full; relinquish lock and give a reader the opportunity to # create room on it reset lock + current-routine-is-blocked switch # avoid spinlocking loop } + current-routine-is-unblocked #? $print [performing write], 10/newline # store a deep copy of val circular-buffer:address:array:_elem <- get *chan, data:offset @@ -121,10 +123,12 @@ def read in:address:source:_elem -> result:_elem, eof?:boolean, in:address:sourc # channel is empty; relinquish lock and give a writer the opportunity to # add to it reset lock + current-routine-is-blocked switch # avoid spinlocking loop } + current-routine-is-unblocked # pull result off full:number <- get *chan, first-full:offset circular-buffer:address:array:_elem <- get *chan, data:offset @@ -327,6 +331,7 @@ after [ { break-unless closed? empty-result:address:_elem <- new _elem:type + current-routine-is-unblocked return *empty-result, 1/true } ] -- cgit 1.4.1-2-gfad0