From e5f0ed11c445c960364db9da53f80aec5008a024 Mon Sep 17 00:00:00 2001 From: Robbie McKinstry Date: Tue, 26 Nov 2024 15:56:19 -0500 Subject: [PATCH 1/4] WIP: implement decision engine decision making --- src/adapters/engines/action.rs | 6 +-- src/adapters/engines/chi.rs | 34 ++++++++++-- src/pipeline/mod.rs | 1 + src/pipeline/stages/config.rs | 2 +- src/pipeline/stages/details.rs | 5 ++ src/pipeline/stages/mod.rs | 4 +- src/stats/chi.rs | 2 +- src/stats/mod.rs | 94 +--------------------------------- 8 files changed, 45 insertions(+), 103 deletions(-) diff --git a/src/adapters/engines/action.rs b/src/adapters/engines/action.rs index cf95747..eec776c 100644 --- a/src/adapters/engines/action.rs +++ b/src/adapters/engines/action.rs @@ -2,10 +2,10 @@ /// Actions describe decisions made by the [DecisionEngine]. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum Action { - /// Ramp the canary to 100% traffic and decommission the control deployment. + /// Ramp the canary to 100% traffic and decommission the baseline deployment. Promote, - /// Ramp the control to 100% traffic and decommission the canary deployment. - Yank, + /// Ramp the baseline to 100% traffic and decommission the canary deployment. + Rollback, /// RampUp indicates the amount of traffic provided to the canary should increase /// by one unit. RampUp, diff --git a/src/adapters/engines/chi.rs b/src/adapters/engines/chi.rs index 0f76b5c..c54306a 100644 --- a/src/adapters/engines/chi.rs +++ b/src/adapters/engines/chi.rs @@ -1,15 +1,15 @@ use crate::{ - metrics::ResponseStatusCode, - stats::{CategoricalObservation, ContingencyTable, Group}, + metrics::ResponseStatusCode, pipeline::StageConfig, stats::{chi_square_test, CategoricalObservation, ContingencyTable, Group} }; -use super::DecisionEngine; +use super::{Action, DecisionEngine}; /// The [ChiSquareEngine] uses the Chi Square statistical /// significance test to determine whether the canary should be promoted or not. #[derive(Default)] pub struct ChiSquareEngine { table: ContingencyTable<5, ResponseStatusCode>, + stages: StageConfig, } impl ChiSquareEngine { @@ -35,9 +35,35 @@ impl DecisionEngine> for ChiSquare } } - fn compute(&mut self) -> Option { + fn compute(&mut self) -> Option { + // • Check if we even have an active stage. + let stage = match self.stages.current() { + // If not, then we've already emitted our final action. + None => return None, + Some(stage) => stage, + }; + + // • Otherwise, we know we can proceed with tabulation. + // • Compute the p-value. + let is_significant = chi_square_test(&self.table, stage.badness_confidence_limit()); + let timed_out = stage.is_timed_out(); + match (is_significant, timed_out, stage.timeout_behavior()) { + } + + /* + + if is_significant { + // Welp, it's time to roll back. + // TODO: Must check to see if the canary is actually worse than the baseline. + // This will be a false positive if you're actually *fixing* a bug. + return Some(Action::Rollback); + } + // Now, we check to see if this stage has timed out yet. + stage.is_timed_out() + // TODO: Remember to call self.control_data.set_experimental_total // for calculating the expected values. todo!() + */ } } diff --git a/src/pipeline/mod.rs b/src/pipeline/mod.rs index b263c3d..21e4c6c 100644 --- a/src/pipeline/mod.rs +++ b/src/pipeline/mod.rs @@ -8,6 +8,7 @@ use bon::bon; use miette::Result; pub(crate) use percent::WholePercent; +pub (crate) use stages::StageConfig; /// An alias for the Response Code-based monitor. pub type ResponseMonitor = Box>>; diff --git a/src/pipeline/stages/config.rs b/src/pipeline/stages/config.rs index 517c649..dfabab7 100644 --- a/src/pipeline/stages/config.rs +++ b/src/pipeline/stages/config.rs @@ -24,7 +24,7 @@ impl StageConfig { self.current() } - fn current(&self) -> Option<&StageDetails> { + pub fn current(&self) -> Option<&StageDetails> { self.stages.get(self.current_stage) } } diff --git a/src/pipeline/stages/details.rs b/src/pipeline/stages/details.rs index c71a54c..683cab4 100644 --- a/src/pipeline/stages/details.rs +++ b/src/pipeline/stages/details.rs @@ -34,6 +34,11 @@ impl StageDetails { pub(crate) fn timeout_behavior(&self) -> TimeoutBehavior { self.timeout_behavior } + + /// Returns true if the current time is beyond the timeout limit. + pub(crate) fn is_timed_out(&self, start_time: tokio::time::Instant) -> bool { + todo!(); + } } #[bon] diff --git a/src/pipeline/stages/mod.rs b/src/pipeline/stages/mod.rs index 7a99a46..4613e7e 100644 --- a/src/pipeline/stages/mod.rs +++ b/src/pipeline/stages/mod.rs @@ -1,3 +1,5 @@ +pub(crate) use config::StageConfig; + mod config; mod details; @@ -10,7 +12,7 @@ mod details; /// of confidence, which only tells us how confident we are that /// the deployment *isn't* safe). #[derive(Clone, Copy, Debug, PartialEq, Eq)] -enum TimeoutBehavior { +pub (crate) enum TimeoutBehavior { /// If we don't have confidence that the deployment is bad, /// we advance to the next stage. Advance, diff --git a/src/stats/chi.rs b/src/stats/chi.rs index 2bec847..b867050 100644 --- a/src/stats/chi.rs +++ b/src/stats/chi.rs @@ -6,7 +6,7 @@ use super::{Categorical, ContingencyTable}; /// Alpha represents the alpha cutoff, expressed as a floating point from [0, 1] inclusive. /// For example, 0.95 is the standard 5% confidency interval. -fn chi_square_test>( +pub fn chi_square_test>( table: &ContingencyTable, alpha: f64, ) -> bool { diff --git a/src/stats/mod.rs b/src/stats/mod.rs index c62c6dd..72588a9 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -4,102 +4,10 @@ pub use categorical::Categorical; pub use contingency::ContingencyTable; pub use group::Group; pub use observation::{CategoricalObservation, Observation}; +pub use chi::chi_square_test; use crate::metrics::ResponseStatusCode; -// TODO: Before long, we can delete this file since this is an -// old and mostly incorrect implement of X2. - -/// The alpha cutoff is the amount of confidence must have in the result -/// to feel comfortable that the result is not due to chance, but instead -/// do to the independent variable. The valu is expressed as a confidence -/// percentage: 0.05 means we are 95% confident that the observed difference -/// is not due to chance, but actually because the experimental group differs -/// from the control group. -const DEFAULT_ALPHA_CUTOFF: f64 = 0.05; - -/// The [ChiSquareEngine] calculates the Chi Square test statistic -/// based on the data stored in its contingency tables. -// #[deprecated = "Use crate::adapters::engines::ChiSquareEngine instead"] -struct ChiSquareEngine { - control: Table, - experimental: Table, - total_control_count: usize, - total_experimental_count: usize, - alpha_cutoff: f64, -} - -impl Default for ChiSquareEngine { - fn default() -> Self { - Self::new() - } -} - -impl ChiSquareEngine { - pub fn new() -> Self { - Self { - control: HashMap::default(), - experimental: HashMap::default(), - total_control_count: 0, - total_experimental_count: 0, - alpha_cutoff: DEFAULT_ALPHA_CUTOFF, - } - } - - pub fn add_observation(&mut self, obs: CategoricalObservation<5, ResponseStatusCode>) { - // Fetch the count of observations for the given group. - let entry = match obs.group { - Group::Control => { - self.total_control_count += 1; - self.control.entry(obs.outcome) - } - Group::Experimental => { - self.total_experimental_count += 1; - self.experimental.entry(obs.outcome) - } - }; - // Increment the count. - entry.and_modify(|count| *count += 1).or_insert(1); - } - - /// calculate the test statistic from the contingency tables. - pub fn calc_test_statistic(&self) -> f64 { - let mut error = 0.0; - let categories = [ - ResponseStatusCode::_1XX, - ResponseStatusCode::_2XX, - ResponseStatusCode::_3XX, - ResponseStatusCode::_4XX, - ResponseStatusCode::_5XX, - ]; - // For each category, we calculate the squared error between the - // expected and the observed probabilies. - for category in categories { - let expected = self.expected_frequency(category); - let observed = self.observed_frequency(category); - error += (observed - expected).powi(2) / expected; - } - error - } - - /// calculate the expected frequency for this category. - fn expected_frequency(&self, category: ResponseStatusCode) -> f64 { - let observation_count = self.control[&category] as f64; - let total_count = self.control[&category] as f64; - observation_count / total_count - } - - /// calculate the observed frequency for this category. - fn observed_frequency(&self, category: ResponseStatusCode) -> f64 { - let observation_count = self.experimental[&category] as f64; - let total_count = self.experimental[&category] as f64; - observation_count / total_count - } -} - -/// This type maps the dependent variable to its count. -type Table = HashMap; - /// For modeling categorical data. mod categorical; /// contains the engine to calculate the chi square test statistic. From 1015bc8bf932a38940d078c84f76b6c5084afa67 Mon Sep 17 00:00:00 2001 From: Robbie McKinstry Date: Tue, 26 Nov 2024 16:46:45 -0500 Subject: [PATCH 2/4] add fn to check if the stage has timed out --- src/adapters/engines/chi.rs | 30 ++++++++++++++++++++++-------- src/pipeline/stages/config.rs | 4 +++- src/pipeline/stages/details.rs | 33 +++++++++++++++++++++++++++++++-- src/stats/mod.rs | 4 ---- 4 files changed, 56 insertions(+), 15 deletions(-) diff --git a/src/adapters/engines/chi.rs b/src/adapters/engines/chi.rs index c54306a..fc8ec2a 100644 --- a/src/adapters/engines/chi.rs +++ b/src/adapters/engines/chi.rs @@ -1,3 +1,5 @@ +use tokio::time::Instant; + use crate::{ metrics::ResponseStatusCode, pipeline::StageConfig, stats::{chi_square_test, CategoricalObservation, ContingencyTable, Group} }; @@ -6,15 +8,24 @@ use super::{Action, DecisionEngine}; /// The [ChiSquareEngine] uses the Chi Square statistical /// significance test to determine whether the canary should be promoted or not. -#[derive(Default)] pub struct ChiSquareEngine { table: ContingencyTable<5, ResponseStatusCode>, stages: StageConfig, + start_time: Instant } impl ChiSquareEngine { pub fn new() -> Self { - Self::default() + let start_time = Instant::now(); + Self { + table: Default::default(), + stages: Default::default(), + start_time, + } + } + + pub fn reset_start_time(&mut self) { + self.start_time = Instant::now(); } } @@ -37,17 +48,14 @@ impl DecisionEngine> for ChiSquare fn compute(&mut self) -> Option { // • Check if we even have an active stage. - let stage = match self.stages.current() { - // If not, then we've already emitted our final action. - None => return None, - Some(stage) => stage, - }; + let stage = self.stages.current()?; // • Otherwise, we know we can proceed with tabulation. // • Compute the p-value. let is_significant = chi_square_test(&self.table, stage.badness_confidence_limit()); - let timed_out = stage.is_timed_out(); + let timed_out = stage.has_timed_out(self.start_time); match (is_significant, timed_out, stage.timeout_behavior()) { + _ => todo!(), } /* @@ -67,3 +75,9 @@ impl DecisionEngine> for ChiSquare */ } } + +impl Default for ChiSquareEngine { + fn default() -> Self { + Self::new() + } +} diff --git a/src/pipeline/stages/config.rs b/src/pipeline/stages/config.rs index dfabab7..eefef39 100644 --- a/src/pipeline/stages/config.rs +++ b/src/pipeline/stages/config.rs @@ -1,4 +1,4 @@ -use tokio::time::Duration; +use tokio::time::{Duration, Instant}; use crate::pipeline::percent::{DecimalPercent, WholePercent}; @@ -12,6 +12,7 @@ const DEFAULT_CANARY_STAGE_CONFIDENCE: [f64; 4] = [99.0, 95.0, 90.0, 90.0]; pub struct StageConfig { current_stage: usize, stages: Vec, + start_time: Instant, } impl StageConfig { @@ -59,6 +60,7 @@ impl Default for StageConfig { Self { current_stage, stages, + start_time: Instant::now(), } } } diff --git a/src/pipeline/stages/details.rs b/src/pipeline/stages/details.rs index 683cab4..bd31bdb 100644 --- a/src/pipeline/stages/details.rs +++ b/src/pipeline/stages/details.rs @@ -36,8 +36,8 @@ impl StageDetails { } /// Returns true if the current time is beyond the timeout limit. - pub(crate) fn is_timed_out(&self, start_time: tokio::time::Instant) -> bool { - todo!(); + pub(crate) fn has_timed_out(&self, start_time: tokio::time::Instant) -> bool { + tokio::time::Instant::now().duration_since(start_time) > self.timeout } } @@ -57,3 +57,32 @@ impl StageDetails { } } } + +#[cfg(test)] +mod tests { + use tokio::time::{Duration, Instant}; + + use crate::{pipeline::percent::DecimalPercent, WholePercent}; + + use super::StageDetails; + + + /// This test checks whether is_timed_out works by checking it against + /// times in the past and future. + #[test] + fn is_timed_out() { + let one_year_in_seconds = 60*60*24*365; + let distant_past = Instant::now() - Duration::from_secs(one_year_in_seconds); + let far_future = Instant::now() + Duration::from_secs(one_year_in_seconds); + let example = StageDetails { + canary_traffic: WholePercent::try_new(100).unwrap(), + badness_confidence_limit: DecimalPercent::try_new(100.0).unwrap(), + timeout: Duration::from_secs(5*60), // five minutes + timeout_behavior: crate::pipeline::stages::TimeoutBehavior::Advance, + }; + + assert_eq!(true, example.has_timed_out(distant_past)); + assert_eq!(false, example.has_timed_out(far_future)); + + } +} diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 72588a9..28f40c4 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -1,13 +1,9 @@ -use std::collections::HashMap; - pub use categorical::Categorical; pub use contingency::ContingencyTable; pub use group::Group; pub use observation::{CategoricalObservation, Observation}; pub use chi::chi_square_test; -use crate::metrics::ResponseStatusCode; - /// For modeling categorical data. mod categorical; /// contains the engine to calculate the chi square test statistic. From cce86801f6d322e254af871d2e12c080ae67a7ac Mon Sep 17 00:00:00 2001 From: Robbie McKinstry Date: Tue, 26 Nov 2024 16:58:47 -0500 Subject: [PATCH 3/4] Implement compute function --- src/adapters/engines/action.rs | 6 +++--- src/adapters/engines/chi.rs | 34 +++++++++++++++++++++++++--------- src/pipeline/mod.rs | 2 +- src/pipeline/percent.rs | 2 +- src/pipeline/stages/config.rs | 6 +++--- src/pipeline/stages/details.rs | 4 ++-- src/pipeline/stages/mod.rs | 1 + 7 files changed, 36 insertions(+), 19 deletions(-) diff --git a/src/adapters/engines/action.rs b/src/adapters/engines/action.rs index eec776c..7afd952 100644 --- a/src/adapters/engines/action.rs +++ b/src/adapters/engines/action.rs @@ -1,3 +1,5 @@ +use crate::WholePercent; + /// An [Action] describes an effectful operation affecting the deployments. /// Actions describe decisions made by the [DecisionEngine]. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] @@ -8,9 +10,7 @@ pub enum Action { Rollback, /// RampUp indicates the amount of traffic provided to the canary should increase /// by one unit. - RampUp, - /// RampDown indicates the amount of traffic provided to the canary should decrease. - RampDown, + RampTo(WholePercent), // NB: We don't have a no-op action type, which might be something the DecisionEngine // provides, except that I'm picturing this Action type as part of the interface // into the Ingress, so the Ingress just won't hear back anything from the engine diff --git a/src/adapters/engines/chi.rs b/src/adapters/engines/chi.rs index fc8ec2a..2971e36 100644 --- a/src/adapters/engines/chi.rs +++ b/src/adapters/engines/chi.rs @@ -1,8 +1,8 @@ use tokio::time::Instant; - use crate::{ - metrics::ResponseStatusCode, pipeline::StageConfig, stats::{chi_square_test, CategoricalObservation, ContingencyTable, Group} + metrics::ResponseStatusCode, pipeline::{StageConfig, StageDetails}, stats::{chi_square_test, CategoricalObservation, ContingencyTable, Group} }; +use crate::pipeline::TimeoutBehavior; use super::{Action, DecisionEngine}; @@ -27,6 +27,11 @@ impl ChiSquareEngine { pub fn reset_start_time(&mut self) { self.start_time = Instant::now(); } + + fn advance(&mut self) -> Option<&StageDetails> { + self.reset_start_time(); + self.stages.advance() + } } impl DecisionEngine> for ChiSquareEngine { @@ -53,19 +58,30 @@ impl DecisionEngine> for ChiSquare // • Otherwise, we know we can proceed with tabulation. // • Compute the p-value. let is_significant = chi_square_test(&self.table, stage.badness_confidence_limit()); - let timed_out = stage.has_timed_out(self.start_time); - match (is_significant, timed_out, stage.timeout_behavior()) { - _ => todo!(), - } - - /* - if is_significant { // Welp, it's time to roll back. // TODO: Must check to see if the canary is actually worse than the baseline. // This will be a false positive if you're actually *fixing* a bug. return Some(Action::Rollback); } + let timed_out = stage.has_timed_out(self.start_time); + match (timed_out, stage.timeout_behavior()) { + // If we've timed out, but there's no significant failure, then + // we advance the stage. + (true, TimeoutBehavior::Advance) => { + let details = self.advance(); + match details { + Some(details) => Some(Action::RampTo(details.canary_traffic())), + None => Some(Action::Promote), + } + }, + // Otherwise, we keep observing. + (false, TimeoutBehavior::Advance) => None, + } + + /* + + // Now, we check to see if this stage has timed out yet. stage.is_timed_out() diff --git a/src/pipeline/mod.rs b/src/pipeline/mod.rs index 21e4c6c..5234a4a 100644 --- a/src/pipeline/mod.rs +++ b/src/pipeline/mod.rs @@ -8,7 +8,7 @@ use bon::bon; use miette::Result; pub(crate) use percent::WholePercent; -pub (crate) use stages::StageConfig; +pub (crate) use stages::{StageDetails, StageConfig, TimeoutBehavior}; /// An alias for the Response Code-based monitor. pub type ResponseMonitor = Box>>; diff --git a/src/pipeline/percent.rs b/src/pipeline/percent.rs index 109a631..edf32d7 100644 --- a/src/pipeline/percent.rs +++ b/src/pipeline/percent.rs @@ -20,7 +20,7 @@ impl DecimalPercent { // Using a newtype allows us to ensure they're correct by construction. #[nutype( validate(less_or_equal = 100), - derive(Debug, Display, Copy, Clone, PartialEq, Eq, TryFrom, Into) + derive(Debug, Display, Copy, Clone, PartialEq, Eq, TryFrom, Into, PartialOrd, Ord, Hash) )] pub(crate) struct WholePercent(u8); diff --git a/src/pipeline/stages/config.rs b/src/pipeline/stages/config.rs index eefef39..ff59cfd 100644 --- a/src/pipeline/stages/config.rs +++ b/src/pipeline/stages/config.rs @@ -67,9 +67,9 @@ impl Default for StageConfig { #[cfg(test)] mod tests { - use crate::pipeline::stages::config::{ + use crate::{pipeline::stages::config::{ DEFAULT_CANARY_STAGE_CONFIDENCE, DEFAULT_CANARY_STAGE_TRAFFIC, - }; + }, WholePercent}; use super::StageConfig; @@ -80,7 +80,7 @@ mod tests { let mut conf = StageConfig::default(); let mut stage = conf.current(); for i in 0..4 { - let expected_traffic = Some(DEFAULT_CANARY_STAGE_TRAFFIC[i]); + let expected_traffic = Some(WholePercent::try_new(DEFAULT_CANARY_STAGE_TRAFFIC[i]).unwrap()); let observed_traffic = stage.map(|st| st.canary_traffic()); let expected_confidence = Some(DEFAULT_CANARY_STAGE_CONFIDENCE[i]); let observed_confidence = stage.map(|st| st.badness_confidence_limit()); diff --git a/src/pipeline/stages/details.rs b/src/pipeline/stages/details.rs index bd31bdb..bb1c6f4 100644 --- a/src/pipeline/stages/details.rs +++ b/src/pipeline/stages/details.rs @@ -19,8 +19,8 @@ pub struct StageDetails { } impl StageDetails { - pub(crate) fn canary_traffic(&self) -> u8 { - self.canary_traffic.into() + pub(crate) fn canary_traffic(&self) -> WholePercent { + self.canary_traffic } pub(crate) fn badness_confidence_limit(&self) -> f64 { diff --git a/src/pipeline/stages/mod.rs b/src/pipeline/stages/mod.rs index 4613e7e..b24b5ec 100644 --- a/src/pipeline/stages/mod.rs +++ b/src/pipeline/stages/mod.rs @@ -1,4 +1,5 @@ pub(crate) use config::StageConfig; +pub (crate) use details::StageDetails; mod config; mod details; From bb1c850d5372d8285185083919b9090f3d796a31 Mon Sep 17 00:00:00 2001 From: Robbie McKinstry Date: Tue, 26 Nov 2024 17:34:10 -0500 Subject: [PATCH 4/4] Plumb actions into ingress --- src/adapters/engines/chi.rs | 25 ++++++++----------------- src/pipeline/mod.rs | 30 +++++++++++++++++++++++------- src/pipeline/percent.rs | 4 +++- src/pipeline/stages/config.rs | 10 ++++++---- src/pipeline/stages/details.rs | 8 +++----- src/pipeline/stages/mod.rs | 4 ++-- src/stats/contingency.rs | 2 +- src/stats/mod.rs | 2 +- 8 files changed, 47 insertions(+), 38 deletions(-) diff --git a/src/adapters/engines/chi.rs b/src/adapters/engines/chi.rs index 2971e36..a124305 100644 --- a/src/adapters/engines/chi.rs +++ b/src/adapters/engines/chi.rs @@ -1,8 +1,10 @@ -use tokio::time::Instant; +use crate::pipeline::TimeoutBehavior; use crate::{ - metrics::ResponseStatusCode, pipeline::{StageConfig, StageDetails}, stats::{chi_square_test, CategoricalObservation, ContingencyTable, Group} + metrics::ResponseStatusCode, + pipeline::{StageConfig, StageDetails}, + stats::{chi_square_test, CategoricalObservation, ContingencyTable, Group}, }; -use crate::pipeline::TimeoutBehavior; +use tokio::time::Instant; use super::{Action, DecisionEngine}; @@ -11,7 +13,7 @@ use super::{Action, DecisionEngine}; pub struct ChiSquareEngine { table: ContingencyTable<5, ResponseStatusCode>, stages: StageConfig, - start_time: Instant + start_time: Instant, } impl ChiSquareEngine { @@ -69,26 +71,15 @@ impl DecisionEngine> for ChiSquare // If we've timed out, but there's no significant failure, then // we advance the stage. (true, TimeoutBehavior::Advance) => { - let details = self.advance(); + let details = self.advance(); match details { Some(details) => Some(Action::RampTo(details.canary_traffic())), None => Some(Action::Promote), } - }, + } // Otherwise, we keep observing. (false, TimeoutBehavior::Advance) => None, } - - /* - - - // Now, we check to see if this stage has timed out yet. - stage.is_timed_out() - - // TODO: Remember to call self.control_data.set_experimental_total - // for calculating the expected values. - todo!() - */ } } diff --git a/src/pipeline/mod.rs b/src/pipeline/mod.rs index 5234a4a..0c87a9e 100644 --- a/src/pipeline/mod.rs +++ b/src/pipeline/mod.rs @@ -1,4 +1,4 @@ -use crate::adapters::{batch_observations, EngineController}; +use crate::adapters::{batch_observations, Action, EngineController}; use crate::{ adapters::{repeat_query, DecisionEngine, Ingress, Monitor}, metrics::ResponseStatusCode, @@ -8,7 +8,9 @@ use bon::bon; use miette::Result; pub(crate) use percent::WholePercent; -pub (crate) use stages::{StageDetails, StageConfig, TimeoutBehavior}; +pub(crate) use stages::{StageConfig, StageDetails, TimeoutBehavior}; +use tokio::pin; +use tokio_stream::StreamExt as _; /// An alias for the Response Code-based monitor. pub type ResponseMonitor = Box>>; @@ -42,7 +44,7 @@ impl Pipeline { /// Run the pipeline, spawning tasks for the monitor, ingress, and engine to run /// independently. - pub async fn run(self) -> Result<()> { + pub async fn run(mut self) -> Result<()> { // TODO: Handle graceful shutdown. // • First, create a shutdown channel which we // pass to each thread. @@ -58,11 +60,25 @@ impl Pipeline { let observations = batch_observations(event_stream, DEFAULT_QUERY_FREQUENCY); // • We push observations from the Monitor into the // DecisionEngine. - let _controller = - EngineController::new(DEFAULT_QUERY_FREQUENCY).run(self.engine, observations); + let actions = EngineController::new(DEFAULT_QUERY_FREQUENCY).run(self.engine, observations); + pin!(actions); // • Pipe actions to the Ingress. - - todo!(); + while let Some(action) = actions.next().await { + match action { + Action::Promote => { + self.ingress.promote_canary().await?; + return Ok(()); + } + Action::Rollback => { + self.ingress.rollback_canary().await?; + return Ok(()); + } + Action::RampTo(percent) => { + self.ingress.set_canary_traffic(percent).await?; + } + } + } + unreachable!("Connection to Decision Engine terminated early."); } } diff --git a/src/pipeline/percent.rs b/src/pipeline/percent.rs index edf32d7..cbfe31a 100644 --- a/src/pipeline/percent.rs +++ b/src/pipeline/percent.rs @@ -20,7 +20,9 @@ impl DecimalPercent { // Using a newtype allows us to ensure they're correct by construction. #[nutype( validate(less_or_equal = 100), - derive(Debug, Display, Copy, Clone, PartialEq, Eq, TryFrom, Into, PartialOrd, Ord, Hash) + derive( + Debug, Display, Copy, Clone, PartialEq, Eq, TryFrom, Into, PartialOrd, Ord, Hash + ) )] pub(crate) struct WholePercent(u8); diff --git a/src/pipeline/stages/config.rs b/src/pipeline/stages/config.rs index ff59cfd..82fde73 100644 --- a/src/pipeline/stages/config.rs +++ b/src/pipeline/stages/config.rs @@ -67,9 +67,10 @@ impl Default for StageConfig { #[cfg(test)] mod tests { - use crate::{pipeline::stages::config::{ - DEFAULT_CANARY_STAGE_CONFIDENCE, DEFAULT_CANARY_STAGE_TRAFFIC, - }, WholePercent}; + use crate::{ + pipeline::stages::config::{DEFAULT_CANARY_STAGE_CONFIDENCE, DEFAULT_CANARY_STAGE_TRAFFIC}, + WholePercent, + }; use super::StageConfig; @@ -80,7 +81,8 @@ mod tests { let mut conf = StageConfig::default(); let mut stage = conf.current(); for i in 0..4 { - let expected_traffic = Some(WholePercent::try_new(DEFAULT_CANARY_STAGE_TRAFFIC[i]).unwrap()); + let expected_traffic = + Some(WholePercent::try_new(DEFAULT_CANARY_STAGE_TRAFFIC[i]).unwrap()); let observed_traffic = stage.map(|st| st.canary_traffic()); let expected_confidence = Some(DEFAULT_CANARY_STAGE_CONFIDENCE[i]); let observed_confidence = stage.map(|st| st.badness_confidence_limit()); diff --git a/src/pipeline/stages/details.rs b/src/pipeline/stages/details.rs index bb1c6f4..27ce2b9 100644 --- a/src/pipeline/stages/details.rs +++ b/src/pipeline/stages/details.rs @@ -37,7 +37,7 @@ impl StageDetails { /// Returns true if the current time is beyond the timeout limit. pub(crate) fn has_timed_out(&self, start_time: tokio::time::Instant) -> bool { - tokio::time::Instant::now().duration_since(start_time) > self.timeout + tokio::time::Instant::now().duration_since(start_time) > self.timeout } } @@ -66,23 +66,21 @@ mod tests { use super::StageDetails; - /// This test checks whether is_timed_out works by checking it against /// times in the past and future. #[test] fn is_timed_out() { - let one_year_in_seconds = 60*60*24*365; + let one_year_in_seconds = 60 * 60 * 24 * 365; let distant_past = Instant::now() - Duration::from_secs(one_year_in_seconds); let far_future = Instant::now() + Duration::from_secs(one_year_in_seconds); let example = StageDetails { canary_traffic: WholePercent::try_new(100).unwrap(), badness_confidence_limit: DecimalPercent::try_new(100.0).unwrap(), - timeout: Duration::from_secs(5*60), // five minutes + timeout: Duration::from_secs(5 * 60), // five minutes timeout_behavior: crate::pipeline::stages::TimeoutBehavior::Advance, }; assert_eq!(true, example.has_timed_out(distant_past)); assert_eq!(false, example.has_timed_out(far_future)); - } } diff --git a/src/pipeline/stages/mod.rs b/src/pipeline/stages/mod.rs index b24b5ec..a122f26 100644 --- a/src/pipeline/stages/mod.rs +++ b/src/pipeline/stages/mod.rs @@ -1,5 +1,5 @@ pub(crate) use config::StageConfig; -pub (crate) use details::StageDetails; +pub(crate) use details::StageDetails; mod config; mod details; @@ -13,7 +13,7 @@ mod details; /// of confidence, which only tells us how confident we are that /// the deployment *isn't* safe). #[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub (crate) enum TimeoutBehavior { +pub(crate) enum TimeoutBehavior { /// If we don't have confidence that the deployment is bad, /// we advance to the next stage. Advance, diff --git a/src/stats/contingency.rs b/src/stats/contingency.rs index 034cf90..cb47439 100644 --- a/src/stats/contingency.rs +++ b/src/stats/contingency.rs @@ -49,7 +49,7 @@ impl> ContingencyTable { expected_in_category * total_observed / expected_total } - /// calculate the expected count for the category with index `i`. + /// calculate the observed count for the category with index `i`. pub fn observed_by_index(&self, i: usize) -> u32 { self.observed.get_count_by_index(i) } diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 28f40c4..77a1e64 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -1,8 +1,8 @@ pub use categorical::Categorical; +pub use chi::chi_square_test; pub use contingency::ContingencyTable; pub use group::Group; pub use observation::{CategoricalObservation, Observation}; -pub use chi::chi_square_test; /// For modeling categorical data. mod categorical;