Skip to content

Commit

Permalink
Merge pull request #44 from wack/robbie/multi-309
Browse files Browse the repository at this point in the history
Implement Decision Making and Plumb to Ingress
  • Loading branch information
RobbieMcKinstry authored Nov 26, 2024
2 parents 4e22278 + bb1c850 commit 4cbbdc1
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 128 deletions.
12 changes: 6 additions & 6 deletions src/adapters/engines/action.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use crate::WholePercent;

/// An [Action] describes an effectful operation affecting the deployments.
/// Actions describe decisions made by the [DecisionEngine].
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Action {
/// Ramp the canary to 100% traffic and decommission the control deployment.
/// Ramp the canary to 100% traffic and decommission the baseline deployment.
Promote,
/// Ramp the control to 100% traffic and decommission the canary deployment.
Yank,
/// Ramp the baseline to 100% traffic and decommission the canary deployment.
Rollback,
/// RampUp indicates the amount of traffic provided to the canary should increase
/// by one unit.
RampUp,
/// RampDown indicates the amount of traffic provided to the canary should decrease.
RampDown,
RampTo(WholePercent),
// NB: We don't have a no-op action type, which might be something the DecisionEngine
// provides, except that I'm picturing this Action type as part of the interface
// into the Ingress, so the Ingress just won't hear back anything from the engine
Expand Down
63 changes: 55 additions & 8 deletions src/adapters/engines/chi.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,38 @@
use crate::pipeline::TimeoutBehavior;
use crate::{
metrics::ResponseStatusCode,
stats::{CategoricalObservation, ContingencyTable, Group},
pipeline::{StageConfig, StageDetails},
stats::{chi_square_test, CategoricalObservation, ContingencyTable, Group},
};
use tokio::time::Instant;

use super::DecisionEngine;
use super::{Action, DecisionEngine};

/// The [ChiSquareEngine] uses the Chi Square statistical
/// significance test to determine whether the canary should be promoted or not.
#[derive(Default)]
pub struct ChiSquareEngine {
table: ContingencyTable<5, ResponseStatusCode>,
stages: StageConfig,
start_time: Instant,
}

impl ChiSquareEngine {
pub fn new() -> Self {
Self::default()
let start_time = Instant::now();
Self {
table: Default::default(),
stages: Default::default(),
start_time,
}
}

pub fn reset_start_time(&mut self) {
self.start_time = Instant::now();
}

fn advance(&mut self) -> Option<&StageDetails> {
self.reset_start_time();
self.stages.advance()
}
}

Expand All @@ -35,9 +53,38 @@ impl DecisionEngine<CategoricalObservation<5, ResponseStatusCode>> for ChiSquare
}
}

fn compute(&mut self) -> Option<super::Action> {
// TODO: Remember to call self.control_data.set_experimental_total
// for calculating the expected values.
todo!()
fn compute(&mut self) -> Option<Action> {
// • Check if we even have an active stage.
let stage = self.stages.current()?;

// • Otherwise, we know we can proceed with tabulation.
// • Compute the p-value.
let is_significant = chi_square_test(&self.table, stage.badness_confidence_limit());
if is_significant {
// Welp, it's time to roll back.
// TODO: Must check to see if the canary is actually worse than the baseline.
// This will be a false positive if you're actually *fixing* a bug.
return Some(Action::Rollback);
}
let timed_out = stage.has_timed_out(self.start_time);
match (timed_out, stage.timeout_behavior()) {
// If we've timed out, but there's no significant failure, then
// we advance the stage.
(true, TimeoutBehavior::Advance) => {
let details = self.advance();
match details {
Some(details) => Some(Action::RampTo(details.canary_traffic())),
None => Some(Action::Promote),
}
}
// Otherwise, we keep observing.
(false, TimeoutBehavior::Advance) => None,
}
}
}

impl Default for ChiSquareEngine {
fn default() -> Self {
Self::new()
}
}
29 changes: 23 additions & 6 deletions src/pipeline/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::adapters::{batch_observations, EngineController};
use crate::adapters::{batch_observations, Action, EngineController};
use crate::{
adapters::{repeat_query, DecisionEngine, Ingress, Monitor},
metrics::ResponseStatusCode,
Expand All @@ -8,6 +8,9 @@ use bon::bon;
use miette::Result;

pub(crate) use percent::WholePercent;
pub(crate) use stages::{StageConfig, StageDetails, TimeoutBehavior};
use tokio::pin;
use tokio_stream::StreamExt as _;

/// An alias for the Response Code-based monitor.
pub type ResponseMonitor = Box<dyn Monitor<Item = CategoricalObservation<5, ResponseStatusCode>>>;
Expand Down Expand Up @@ -41,7 +44,7 @@ impl Pipeline {

/// Run the pipeline, spawning tasks for the monitor, ingress, and engine to run
/// independently.
pub async fn run(self) -> Result<()> {
pub async fn run(mut self) -> Result<()> {
// TODO: Handle graceful shutdown.
// • First, create a shutdown channel which we
// pass to each thread.
Expand All @@ -57,11 +60,25 @@ impl Pipeline {
let observations = batch_observations(event_stream, DEFAULT_QUERY_FREQUENCY);
// • We push observations from the Monitor into the
// DecisionEngine.
let _controller =
EngineController::new(DEFAULT_QUERY_FREQUENCY).run(self.engine, observations);
let actions = EngineController::new(DEFAULT_QUERY_FREQUENCY).run(self.engine, observations);
pin!(actions);
// • Pipe actions to the Ingress.

todo!();
while let Some(action) = actions.next().await {
match action {
Action::Promote => {
self.ingress.promote_canary().await?;
return Ok(());
}
Action::Rollback => {
self.ingress.rollback_canary().await?;
return Ok(());
}
Action::RampTo(percent) => {
self.ingress.set_canary_traffic(percent).await?;
}
}
}
unreachable!("Connection to Decision Engine terminated early.");
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/pipeline/percent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ impl DecimalPercent {
// Using a newtype allows us to ensure they're correct by construction.
#[nutype(
validate(less_or_equal = 100),
derive(Debug, Display, Copy, Clone, PartialEq, Eq, TryFrom, Into)
derive(
Debug, Display, Copy, Clone, PartialEq, Eq, TryFrom, Into, PartialOrd, Ord, Hash
)
)]
pub(crate) struct WholePercent(u8);

Expand Down
14 changes: 9 additions & 5 deletions src/pipeline/stages/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use tokio::time::Duration;
use tokio::time::{Duration, Instant};

use crate::pipeline::percent::{DecimalPercent, WholePercent};

Expand All @@ -12,6 +12,7 @@ const DEFAULT_CANARY_STAGE_CONFIDENCE: [f64; 4] = [99.0, 95.0, 90.0, 90.0];
pub struct StageConfig {
current_stage: usize,
stages: Vec<StageDetails>,
start_time: Instant,
}

impl StageConfig {
Expand All @@ -24,7 +25,7 @@ impl StageConfig {
self.current()
}

fn current(&self) -> Option<&StageDetails> {
pub fn current(&self) -> Option<&StageDetails> {
self.stages.get(self.current_stage)
}
}
Expand Down Expand Up @@ -59,14 +60,16 @@ impl Default for StageConfig {
Self {
current_stage,
stages,
start_time: Instant::now(),
}
}
}

#[cfg(test)]
mod tests {
use crate::pipeline::stages::config::{
DEFAULT_CANARY_STAGE_CONFIDENCE, DEFAULT_CANARY_STAGE_TRAFFIC,
use crate::{
pipeline::stages::config::{DEFAULT_CANARY_STAGE_CONFIDENCE, DEFAULT_CANARY_STAGE_TRAFFIC},
WholePercent,
};

use super::StageConfig;
Expand All @@ -78,7 +81,8 @@ mod tests {
let mut conf = StageConfig::default();
let mut stage = conf.current();
for i in 0..4 {
let expected_traffic = Some(DEFAULT_CANARY_STAGE_TRAFFIC[i]);
let expected_traffic =
Some(WholePercent::try_new(DEFAULT_CANARY_STAGE_TRAFFIC[i]).unwrap());
let observed_traffic = stage.map(|st| st.canary_traffic());
let expected_confidence = Some(DEFAULT_CANARY_STAGE_CONFIDENCE[i]);
let observed_confidence = stage.map(|st| st.badness_confidence_limit());
Expand Down
36 changes: 34 additions & 2 deletions src/pipeline/stages/details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ pub struct StageDetails {
}

impl StageDetails {
pub(crate) fn canary_traffic(&self) -> u8 {
self.canary_traffic.into()
pub(crate) fn canary_traffic(&self) -> WholePercent {
self.canary_traffic
}

pub(crate) fn badness_confidence_limit(&self) -> f64 {
Expand All @@ -34,6 +34,11 @@ impl StageDetails {
pub(crate) fn timeout_behavior(&self) -> TimeoutBehavior {
self.timeout_behavior
}

/// Returns true if the current time is beyond the timeout limit.
pub(crate) fn has_timed_out(&self, start_time: tokio::time::Instant) -> bool {
tokio::time::Instant::now().duration_since(start_time) > self.timeout
}
}

#[bon]
Expand All @@ -52,3 +57,30 @@ impl StageDetails {
}
}
}

#[cfg(test)]
mod tests {
use tokio::time::{Duration, Instant};

use crate::{pipeline::percent::DecimalPercent, WholePercent};

use super::StageDetails;

/// This test checks whether is_timed_out works by checking it against
/// times in the past and future.
#[test]
fn is_timed_out() {
let one_year_in_seconds = 60 * 60 * 24 * 365;
let distant_past = Instant::now() - Duration::from_secs(one_year_in_seconds);
let far_future = Instant::now() + Duration::from_secs(one_year_in_seconds);
let example = StageDetails {
canary_traffic: WholePercent::try_new(100).unwrap(),
badness_confidence_limit: DecimalPercent::try_new(100.0).unwrap(),
timeout: Duration::from_secs(5 * 60), // five minutes
timeout_behavior: crate::pipeline::stages::TimeoutBehavior::Advance,
};

assert_eq!(true, example.has_timed_out(distant_past));
assert_eq!(false, example.has_timed_out(far_future));
}
}
5 changes: 4 additions & 1 deletion src/pipeline/stages/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
pub(crate) use config::StageConfig;
pub(crate) use details::StageDetails;

mod config;
mod details;

Expand All @@ -10,7 +13,7 @@ mod details;
/// of confidence, which only tells us how confident we are that
/// the deployment *isn't* safe).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum TimeoutBehavior {
pub(crate) enum TimeoutBehavior {
/// If we don't have confidence that the deployment is bad,
/// we advance to the next stage.
Advance,
Expand Down
2 changes: 1 addition & 1 deletion src/stats/chi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use super::{Categorical, ContingencyTable};

/// Alpha represents the alpha cutoff, expressed as a floating point from [0, 1] inclusive.
/// For example, 0.95 is the standard 5% confidency interval.
fn chi_square_test<const N: usize, C: Categorical<N>>(
pub fn chi_square_test<const N: usize, C: Categorical<N>>(
table: &ContingencyTable<N, C>,
alpha: f64,
) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion src/stats/contingency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl<const N: usize, C: Categorical<N>> ContingencyTable<N, C> {
expected_in_category * total_observed / expected_total
}

/// calculate the expected count for the category with index `i`.
/// calculate the observed count for the category with index `i`.
pub fn observed_by_index(&self, i: usize) -> u32 {
self.observed.get_count_by_index(i)
}
Expand Down
Loading

0 comments on commit 4cbbdc1

Please sign in to comment.