diff --git a/src/adapters/engines/action.rs b/src/adapters/engines/action.rs index cf95747..7afd952 100644 --- a/src/adapters/engines/action.rs +++ b/src/adapters/engines/action.rs @@ -1,16 +1,16 @@ +use crate::WholePercent; + /// An [Action] describes an effectful operation affecting the deployments. /// Actions describe decisions made by the [DecisionEngine]. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum Action { - /// Ramp the canary to 100% traffic and decommission the control deployment. + /// Ramp the canary to 100% traffic and decommission the baseline deployment. Promote, - /// Ramp the control to 100% traffic and decommission the canary deployment. - Yank, + /// Ramp the baseline to 100% traffic and decommission the canary deployment. + Rollback, /// RampUp indicates the amount of traffic provided to the canary should increase /// by one unit. - RampUp, - /// RampDown indicates the amount of traffic provided to the canary should decrease. - RampDown, + RampTo(WholePercent), // NB: We don't have a no-op action type, which might be something the DecisionEngine // provides, except that I'm picturing this Action type as part of the interface // into the Ingress, so the Ingress just won't hear back anything from the engine diff --git a/src/adapters/engines/chi.rs b/src/adapters/engines/chi.rs index 0f76b5c..a124305 100644 --- a/src/adapters/engines/chi.rs +++ b/src/adapters/engines/chi.rs @@ -1,20 +1,38 @@ +use crate::pipeline::TimeoutBehavior; use crate::{ metrics::ResponseStatusCode, - stats::{CategoricalObservation, ContingencyTable, Group}, + pipeline::{StageConfig, StageDetails}, + stats::{chi_square_test, CategoricalObservation, ContingencyTable, Group}, }; +use tokio::time::Instant; -use super::DecisionEngine; +use super::{Action, DecisionEngine}; /// The [ChiSquareEngine] uses the Chi Square statistical /// significance test to determine whether the canary should be promoted or not. -#[derive(Default)] pub struct ChiSquareEngine { table: ContingencyTable<5, ResponseStatusCode>, + stages: StageConfig, + start_time: Instant, } impl ChiSquareEngine { pub fn new() -> Self { - Self::default() + let start_time = Instant::now(); + Self { + table: Default::default(), + stages: Default::default(), + start_time, + } + } + + pub fn reset_start_time(&mut self) { + self.start_time = Instant::now(); + } + + fn advance(&mut self) -> Option<&StageDetails> { + self.reset_start_time(); + self.stages.advance() } } @@ -35,9 +53,38 @@ impl DecisionEngine> for ChiSquare } } - fn compute(&mut self) -> Option { - // TODO: Remember to call self.control_data.set_experimental_total - // for calculating the expected values. - todo!() + fn compute(&mut self) -> Option { + // • Check if we even have an active stage. + let stage = self.stages.current()?; + + // • Otherwise, we know we can proceed with tabulation. + // • Compute the p-value. + let is_significant = chi_square_test(&self.table, stage.badness_confidence_limit()); + if is_significant { + // Welp, it's time to roll back. + // TODO: Must check to see if the canary is actually worse than the baseline. + // This will be a false positive if you're actually *fixing* a bug. + return Some(Action::Rollback); + } + let timed_out = stage.has_timed_out(self.start_time); + match (timed_out, stage.timeout_behavior()) { + // If we've timed out, but there's no significant failure, then + // we advance the stage. + (true, TimeoutBehavior::Advance) => { + let details = self.advance(); + match details { + Some(details) => Some(Action::RampTo(details.canary_traffic())), + None => Some(Action::Promote), + } + } + // Otherwise, we keep observing. + (false, TimeoutBehavior::Advance) => None, + } + } +} + +impl Default for ChiSquareEngine { + fn default() -> Self { + Self::new() } } diff --git a/src/pipeline/mod.rs b/src/pipeline/mod.rs index b263c3d..0c87a9e 100644 --- a/src/pipeline/mod.rs +++ b/src/pipeline/mod.rs @@ -1,4 +1,4 @@ -use crate::adapters::{batch_observations, EngineController}; +use crate::adapters::{batch_observations, Action, EngineController}; use crate::{ adapters::{repeat_query, DecisionEngine, Ingress, Monitor}, metrics::ResponseStatusCode, @@ -8,6 +8,9 @@ use bon::bon; use miette::Result; pub(crate) use percent::WholePercent; +pub(crate) use stages::{StageConfig, StageDetails, TimeoutBehavior}; +use tokio::pin; +use tokio_stream::StreamExt as _; /// An alias for the Response Code-based monitor. pub type ResponseMonitor = Box>>; @@ -41,7 +44,7 @@ impl Pipeline { /// Run the pipeline, spawning tasks for the monitor, ingress, and engine to run /// independently. - pub async fn run(self) -> Result<()> { + pub async fn run(mut self) -> Result<()> { // TODO: Handle graceful shutdown. // • First, create a shutdown channel which we // pass to each thread. @@ -57,11 +60,25 @@ impl Pipeline { let observations = batch_observations(event_stream, DEFAULT_QUERY_FREQUENCY); // • We push observations from the Monitor into the // DecisionEngine. - let _controller = - EngineController::new(DEFAULT_QUERY_FREQUENCY).run(self.engine, observations); + let actions = EngineController::new(DEFAULT_QUERY_FREQUENCY).run(self.engine, observations); + pin!(actions); // • Pipe actions to the Ingress. - - todo!(); + while let Some(action) = actions.next().await { + match action { + Action::Promote => { + self.ingress.promote_canary().await?; + return Ok(()); + } + Action::Rollback => { + self.ingress.rollback_canary().await?; + return Ok(()); + } + Action::RampTo(percent) => { + self.ingress.set_canary_traffic(percent).await?; + } + } + } + unreachable!("Connection to Decision Engine terminated early."); } } diff --git a/src/pipeline/percent.rs b/src/pipeline/percent.rs index 109a631..cbfe31a 100644 --- a/src/pipeline/percent.rs +++ b/src/pipeline/percent.rs @@ -20,7 +20,9 @@ impl DecimalPercent { // Using a newtype allows us to ensure they're correct by construction. #[nutype( validate(less_or_equal = 100), - derive(Debug, Display, Copy, Clone, PartialEq, Eq, TryFrom, Into) + derive( + Debug, Display, Copy, Clone, PartialEq, Eq, TryFrom, Into, PartialOrd, Ord, Hash + ) )] pub(crate) struct WholePercent(u8); diff --git a/src/pipeline/stages/config.rs b/src/pipeline/stages/config.rs index 517c649..82fde73 100644 --- a/src/pipeline/stages/config.rs +++ b/src/pipeline/stages/config.rs @@ -1,4 +1,4 @@ -use tokio::time::Duration; +use tokio::time::{Duration, Instant}; use crate::pipeline::percent::{DecimalPercent, WholePercent}; @@ -12,6 +12,7 @@ const DEFAULT_CANARY_STAGE_CONFIDENCE: [f64; 4] = [99.0, 95.0, 90.0, 90.0]; pub struct StageConfig { current_stage: usize, stages: Vec, + start_time: Instant, } impl StageConfig { @@ -24,7 +25,7 @@ impl StageConfig { self.current() } - fn current(&self) -> Option<&StageDetails> { + pub fn current(&self) -> Option<&StageDetails> { self.stages.get(self.current_stage) } } @@ -59,14 +60,16 @@ impl Default for StageConfig { Self { current_stage, stages, + start_time: Instant::now(), } } } #[cfg(test)] mod tests { - use crate::pipeline::stages::config::{ - DEFAULT_CANARY_STAGE_CONFIDENCE, DEFAULT_CANARY_STAGE_TRAFFIC, + use crate::{ + pipeline::stages::config::{DEFAULT_CANARY_STAGE_CONFIDENCE, DEFAULT_CANARY_STAGE_TRAFFIC}, + WholePercent, }; use super::StageConfig; @@ -78,7 +81,8 @@ mod tests { let mut conf = StageConfig::default(); let mut stage = conf.current(); for i in 0..4 { - let expected_traffic = Some(DEFAULT_CANARY_STAGE_TRAFFIC[i]); + let expected_traffic = + Some(WholePercent::try_new(DEFAULT_CANARY_STAGE_TRAFFIC[i]).unwrap()); let observed_traffic = stage.map(|st| st.canary_traffic()); let expected_confidence = Some(DEFAULT_CANARY_STAGE_CONFIDENCE[i]); let observed_confidence = stage.map(|st| st.badness_confidence_limit()); diff --git a/src/pipeline/stages/details.rs b/src/pipeline/stages/details.rs index c71a54c..27ce2b9 100644 --- a/src/pipeline/stages/details.rs +++ b/src/pipeline/stages/details.rs @@ -19,8 +19,8 @@ pub struct StageDetails { } impl StageDetails { - pub(crate) fn canary_traffic(&self) -> u8 { - self.canary_traffic.into() + pub(crate) fn canary_traffic(&self) -> WholePercent { + self.canary_traffic } pub(crate) fn badness_confidence_limit(&self) -> f64 { @@ -34,6 +34,11 @@ impl StageDetails { pub(crate) fn timeout_behavior(&self) -> TimeoutBehavior { self.timeout_behavior } + + /// Returns true if the current time is beyond the timeout limit. + pub(crate) fn has_timed_out(&self, start_time: tokio::time::Instant) -> bool { + tokio::time::Instant::now().duration_since(start_time) > self.timeout + } } #[bon] @@ -52,3 +57,30 @@ impl StageDetails { } } } + +#[cfg(test)] +mod tests { + use tokio::time::{Duration, Instant}; + + use crate::{pipeline::percent::DecimalPercent, WholePercent}; + + use super::StageDetails; + + /// This test checks whether is_timed_out works by checking it against + /// times in the past and future. + #[test] + fn is_timed_out() { + let one_year_in_seconds = 60 * 60 * 24 * 365; + let distant_past = Instant::now() - Duration::from_secs(one_year_in_seconds); + let far_future = Instant::now() + Duration::from_secs(one_year_in_seconds); + let example = StageDetails { + canary_traffic: WholePercent::try_new(100).unwrap(), + badness_confidence_limit: DecimalPercent::try_new(100.0).unwrap(), + timeout: Duration::from_secs(5 * 60), // five minutes + timeout_behavior: crate::pipeline::stages::TimeoutBehavior::Advance, + }; + + assert_eq!(true, example.has_timed_out(distant_past)); + assert_eq!(false, example.has_timed_out(far_future)); + } +} diff --git a/src/pipeline/stages/mod.rs b/src/pipeline/stages/mod.rs index 7a99a46..a122f26 100644 --- a/src/pipeline/stages/mod.rs +++ b/src/pipeline/stages/mod.rs @@ -1,3 +1,6 @@ +pub(crate) use config::StageConfig; +pub(crate) use details::StageDetails; + mod config; mod details; @@ -10,7 +13,7 @@ mod details; /// of confidence, which only tells us how confident we are that /// the deployment *isn't* safe). #[derive(Clone, Copy, Debug, PartialEq, Eq)] -enum TimeoutBehavior { +pub(crate) enum TimeoutBehavior { /// If we don't have confidence that the deployment is bad, /// we advance to the next stage. Advance, diff --git a/src/stats/chi.rs b/src/stats/chi.rs index 2bec847..b867050 100644 --- a/src/stats/chi.rs +++ b/src/stats/chi.rs @@ -6,7 +6,7 @@ use super::{Categorical, ContingencyTable}; /// Alpha represents the alpha cutoff, expressed as a floating point from [0, 1] inclusive. /// For example, 0.95 is the standard 5% confidency interval. -fn chi_square_test>( +pub fn chi_square_test>( table: &ContingencyTable, alpha: f64, ) -> bool { diff --git a/src/stats/contingency.rs b/src/stats/contingency.rs index 034cf90..cb47439 100644 --- a/src/stats/contingency.rs +++ b/src/stats/contingency.rs @@ -49,7 +49,7 @@ impl> ContingencyTable { expected_in_category * total_observed / expected_total } - /// calculate the expected count for the category with index `i`. + /// calculate the observed count for the category with index `i`. pub fn observed_by_index(&self, i: usize) -> u32 { self.observed.get_count_by_index(i) } diff --git a/src/stats/mod.rs b/src/stats/mod.rs index c62c6dd..77a1e64 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -1,105 +1,9 @@ -use std::collections::HashMap; - pub use categorical::Categorical; +pub use chi::chi_square_test; pub use contingency::ContingencyTable; pub use group::Group; pub use observation::{CategoricalObservation, Observation}; -use crate::metrics::ResponseStatusCode; - -// TODO: Before long, we can delete this file since this is an -// old and mostly incorrect implement of X2. - -/// The alpha cutoff is the amount of confidence must have in the result -/// to feel comfortable that the result is not due to chance, but instead -/// do to the independent variable. The valu is expressed as a confidence -/// percentage: 0.05 means we are 95% confident that the observed difference -/// is not due to chance, but actually because the experimental group differs -/// from the control group. -const DEFAULT_ALPHA_CUTOFF: f64 = 0.05; - -/// The [ChiSquareEngine] calculates the Chi Square test statistic -/// based on the data stored in its contingency tables. -// #[deprecated = "Use crate::adapters::engines::ChiSquareEngine instead"] -struct ChiSquareEngine { - control: Table, - experimental: Table, - total_control_count: usize, - total_experimental_count: usize, - alpha_cutoff: f64, -} - -impl Default for ChiSquareEngine { - fn default() -> Self { - Self::new() - } -} - -impl ChiSquareEngine { - pub fn new() -> Self { - Self { - control: HashMap::default(), - experimental: HashMap::default(), - total_control_count: 0, - total_experimental_count: 0, - alpha_cutoff: DEFAULT_ALPHA_CUTOFF, - } - } - - pub fn add_observation(&mut self, obs: CategoricalObservation<5, ResponseStatusCode>) { - // Fetch the count of observations for the given group. - let entry = match obs.group { - Group::Control => { - self.total_control_count += 1; - self.control.entry(obs.outcome) - } - Group::Experimental => { - self.total_experimental_count += 1; - self.experimental.entry(obs.outcome) - } - }; - // Increment the count. - entry.and_modify(|count| *count += 1).or_insert(1); - } - - /// calculate the test statistic from the contingency tables. - pub fn calc_test_statistic(&self) -> f64 { - let mut error = 0.0; - let categories = [ - ResponseStatusCode::_1XX, - ResponseStatusCode::_2XX, - ResponseStatusCode::_3XX, - ResponseStatusCode::_4XX, - ResponseStatusCode::_5XX, - ]; - // For each category, we calculate the squared error between the - // expected and the observed probabilies. - for category in categories { - let expected = self.expected_frequency(category); - let observed = self.observed_frequency(category); - error += (observed - expected).powi(2) / expected; - } - error - } - - /// calculate the expected frequency for this category. - fn expected_frequency(&self, category: ResponseStatusCode) -> f64 { - let observation_count = self.control[&category] as f64; - let total_count = self.control[&category] as f64; - observation_count / total_count - } - - /// calculate the observed frequency for this category. - fn observed_frequency(&self, category: ResponseStatusCode) -> f64 { - let observation_count = self.experimental[&category] as f64; - let total_count = self.experimental[&category] as f64; - observation_count / total_count - } -} - -/// This type maps the dependent variable to its count. -type Table = HashMap; - /// For modeling categorical data. mod categorical; /// contains the engine to calculate the chi square test statistic.