Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Decision Making and Plumb to Ingress #44

Merged
merged 4 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/adapters/engines/action.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use crate::WholePercent;

/// An [Action] describes an effectful operation affecting the deployments.
/// Actions describe decisions made by the [DecisionEngine].
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Action {
/// Ramp the canary to 100% traffic and decommission the control deployment.
/// Ramp the canary to 100% traffic and decommission the baseline deployment.
Promote,
/// Ramp the control to 100% traffic and decommission the canary deployment.
Yank,
/// Ramp the baseline to 100% traffic and decommission the canary deployment.
Rollback,
/// RampUp indicates the amount of traffic provided to the canary should increase
/// by one unit.
RampUp,
/// RampDown indicates the amount of traffic provided to the canary should decrease.
RampDown,
RampTo(WholePercent),
// NB: We don't have a no-op action type, which might be something the DecisionEngine
// provides, except that I'm picturing this Action type as part of the interface
// into the Ingress, so the Ingress just won't hear back anything from the engine
Expand Down
63 changes: 55 additions & 8 deletions src/adapters/engines/chi.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,38 @@
use crate::pipeline::TimeoutBehavior;
use crate::{
metrics::ResponseStatusCode,
stats::{CategoricalObservation, ContingencyTable, Group},
pipeline::{StageConfig, StageDetails},
stats::{chi_square_test, CategoricalObservation, ContingencyTable, Group},
};
use tokio::time::Instant;

use super::DecisionEngine;
use super::{Action, DecisionEngine};

/// The [ChiSquareEngine] uses the Chi Square statistical
/// significance test to determine whether the canary should be promoted or not.
#[derive(Default)]
pub struct ChiSquareEngine {
table: ContingencyTable<5, ResponseStatusCode>,
stages: StageConfig,
start_time: Instant,
}

impl ChiSquareEngine {
pub fn new() -> Self {
Self::default()
let start_time = Instant::now();
Self {
table: Default::default(),
stages: Default::default(),
start_time,
}
}

/// Restarts the stage timer by overwriting `start_time` with the current instant.
pub fn reset_start_time(&mut self) {
self.start_time = Instant::now();
}

/// Resets the stage timer, then delegates to `StageConfig::advance` and
/// returns its result unchanged.
fn advance(&mut self) -> Option<&StageDetails> {
// Reset the timer before advancing so the timeout is measured from this point.
self.reset_start_time();
self.stages.advance()
}
}

Expand All @@ -35,9 +53,38 @@ impl DecisionEngine<CategoricalObservation<5, ResponseStatusCode>> for ChiSquare
}
}

fn compute(&mut self) -> Option<super::Action> {
// TODO: Remember to call self.control_data.set_experimental_total
// for calculating the expected values.
todo!()
fn compute(&mut self) -> Option<Action> {
// • Check if we even have an active stage.
let stage = self.stages.current()?;

// • Otherwise, we know we can proceed with tabulation.
// • Compute the p-value.
let is_significant = chi_square_test(&self.table, stage.badness_confidence_limit());
if is_significant {
// Welp, it's time to roll back.
// TODO: Must check to see if the canary is actually worse than the baseline.
// This will be a false positive if you're actually *fixing* a bug.
return Some(Action::Rollback);
}
let timed_out = stage.has_timed_out(self.start_time);
match (timed_out, stage.timeout_behavior()) {
// If we've timed out, but there's no significant failure, then
// we advance the stage.
(true, TimeoutBehavior::Advance) => {
let details = self.advance();
match details {
Some(details) => Some(Action::RampTo(details.canary_traffic())),
None => Some(Action::Promote),
}
}
// Otherwise, we keep observing.
(false, TimeoutBehavior::Advance) => None,
}
}
}

/// `Default` delegates to [`ChiSquareEngine::new`], so a default engine records
/// the instant it was constructed as its `start_time`.
impl Default for ChiSquareEngine {
fn default() -> Self {
Self::new()
}
}
29 changes: 23 additions & 6 deletions src/pipeline/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::adapters::{batch_observations, EngineController};
use crate::adapters::{batch_observations, Action, EngineController};
use crate::{
adapters::{repeat_query, DecisionEngine, Ingress, Monitor},
metrics::ResponseStatusCode,
Expand All @@ -8,6 +8,9 @@ use bon::bon;
use miette::Result;

pub(crate) use percent::WholePercent;
pub(crate) use stages::{StageConfig, StageDetails, TimeoutBehavior};
use tokio::pin;
use tokio_stream::StreamExt as _;

/// An alias for the Response Code-based monitor.
pub type ResponseMonitor = Box<dyn Monitor<Item = CategoricalObservation<5, ResponseStatusCode>>>;
Expand Down Expand Up @@ -41,7 +44,7 @@ impl Pipeline {

/// Run the pipeline, spawning tasks for the monitor, ingress, and engine to run
/// independently.
pub async fn run(self) -> Result<()> {
pub async fn run(mut self) -> Result<()> {
// TODO: Handle graceful shutdown.
// • First, create a shutdown channel which we
// pass to each thread.
Expand All @@ -57,11 +60,25 @@ impl Pipeline {
let observations = batch_observations(event_stream, DEFAULT_QUERY_FREQUENCY);
// • We push observations from the Monitor into the
// DecisionEngine.
let _controller =
EngineController::new(DEFAULT_QUERY_FREQUENCY).run(self.engine, observations);
let actions = EngineController::new(DEFAULT_QUERY_FREQUENCY).run(self.engine, observations);
pin!(actions);
// • Pipe actions to the Ingress.

todo!();
while let Some(action) = actions.next().await {
match action {
Action::Promote => {
self.ingress.promote_canary().await?;
return Ok(());
}
Action::Rollback => {
self.ingress.rollback_canary().await?;
return Ok(());
}
Action::RampTo(percent) => {
self.ingress.set_canary_traffic(percent).await?;
}
}
}
unreachable!("Connection to Decision Engine terminated early.");
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/pipeline/percent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ impl DecimalPercent {
// Using a newtype allows us to ensure they're correct by construction.
#[nutype(
validate(less_or_equal = 100),
derive(Debug, Display, Copy, Clone, PartialEq, Eq, TryFrom, Into)
derive(
Debug, Display, Copy, Clone, PartialEq, Eq, TryFrom, Into, PartialOrd, Ord, Hash
)
)]
pub(crate) struct WholePercent(u8);

Expand Down
14 changes: 9 additions & 5 deletions src/pipeline/stages/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use tokio::time::Duration;
use tokio::time::{Duration, Instant};

use crate::pipeline::percent::{DecimalPercent, WholePercent};

Expand All @@ -12,6 +12,7 @@ const DEFAULT_CANARY_STAGE_CONFIDENCE: [f64; 4] = [99.0, 95.0, 90.0, 90.0];
pub struct StageConfig {
current_stage: usize,
stages: Vec<StageDetails>,
start_time: Instant,
}

impl StageConfig {
Expand All @@ -24,7 +25,7 @@ impl StageConfig {
self.current()
}

fn current(&self) -> Option<&StageDetails> {
pub fn current(&self) -> Option<&StageDetails> {
self.stages.get(self.current_stage)
}
}
Expand Down Expand Up @@ -59,14 +60,16 @@ impl Default for StageConfig {
Self {
current_stage,
stages,
start_time: Instant::now(),
}
}
}

#[cfg(test)]
mod tests {
use crate::pipeline::stages::config::{
DEFAULT_CANARY_STAGE_CONFIDENCE, DEFAULT_CANARY_STAGE_TRAFFIC,
use crate::{
pipeline::stages::config::{DEFAULT_CANARY_STAGE_CONFIDENCE, DEFAULT_CANARY_STAGE_TRAFFIC},
WholePercent,
};

use super::StageConfig;
Expand All @@ -78,7 +81,8 @@ mod tests {
let mut conf = StageConfig::default();
let mut stage = conf.current();
for i in 0..4 {
let expected_traffic = Some(DEFAULT_CANARY_STAGE_TRAFFIC[i]);
let expected_traffic =
Some(WholePercent::try_new(DEFAULT_CANARY_STAGE_TRAFFIC[i]).unwrap());
let observed_traffic = stage.map(|st| st.canary_traffic());
let expected_confidence = Some(DEFAULT_CANARY_STAGE_CONFIDENCE[i]);
let observed_confidence = stage.map(|st| st.badness_confidence_limit());
Expand Down
36 changes: 34 additions & 2 deletions src/pipeline/stages/details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ pub struct StageDetails {
}

impl StageDetails {
pub(crate) fn canary_traffic(&self) -> u8 {
self.canary_traffic.into()
pub(crate) fn canary_traffic(&self) -> WholePercent {
self.canary_traffic
}

pub(crate) fn badness_confidence_limit(&self) -> f64 {
Expand All @@ -34,6 +34,11 @@ impl StageDetails {
pub(crate) fn timeout_behavior(&self) -> TimeoutBehavior {
self.timeout_behavior
}

/// Returns true if strictly more than this stage's `timeout` has elapsed
/// between `start_time` and the current time.
pub(crate) fn has_timed_out(&self, start_time: tokio::time::Instant) -> bool {
// NOTE(review): presumably `duration_since` saturates to zero when
// `start_time` is in the future, which would make this return `false`
// rather than panic — confirm against the tokio documentation.
tokio::time::Instant::now().duration_since(start_time) > self.timeout
}
}

#[bon]
Expand All @@ -52,3 +57,30 @@ impl StageDetails {
}
}
}

#[cfg(test)]
mod tests {
use tokio::time::{Duration, Instant};

use crate::{pipeline::percent::DecimalPercent, WholePercent};

use super::StageDetails;

/// This test checks whether `has_timed_out` works by checking it against
/// start times in the distant past and the far future.
#[test]
fn is_timed_out() {
let one_year_in_seconds = 60 * 60 * 24 * 365;
let distant_past = Instant::now() - Duration::from_secs(one_year_in_seconds);
let far_future = Instant::now() + Duration::from_secs(one_year_in_seconds);
let example = StageDetails {
canary_traffic: WholePercent::try_new(100).unwrap(),
badness_confidence_limit: DecimalPercent::try_new(100.0).unwrap(),
timeout: Duration::from_secs(5 * 60), // five minutes
timeout_behavior: crate::pipeline::stages::TimeoutBehavior::Advance,
};

// A stage that started a year ago is far past its five-minute timeout.
assert_eq!(true, example.has_timed_out(distant_past));
// A start time that has not happened yet must not count as timed out.
assert_eq!(false, example.has_timed_out(far_future));
}
}
5 changes: 4 additions & 1 deletion src/pipeline/stages/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
pub(crate) use config::StageConfig;
pub(crate) use details::StageDetails;

mod config;
mod details;

Expand All @@ -10,7 +13,7 @@ mod details;
/// of confidence, which only tells us how confident we are that
/// the deployment *isn't* safe).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum TimeoutBehavior {
pub(crate) enum TimeoutBehavior {
/// If we don't have confidence that the deployment is bad,
/// we advance to the next stage.
Advance,
Expand Down
2 changes: 1 addition & 1 deletion src/stats/chi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use super::{Categorical, ContingencyTable};

/// Alpha represents the alpha cutoff, expressed as a floating point from [0, 1] inclusive.
/// For example, 0.95 is the standard 5% confidence interval.
fn chi_square_test<const N: usize, C: Categorical<N>>(
pub fn chi_square_test<const N: usize, C: Categorical<N>>(
table: &ContingencyTable<N, C>,
alpha: f64,
) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion src/stats/contingency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl<const N: usize, C: Categorical<N>> ContingencyTable<N, C> {
expected_in_category * total_observed / expected_total
}

/// calculate the expected count for the category with index `i`.
/// calculate the observed count for the category with index `i`.
pub fn observed_by_index(&self, i: usize) -> u32 {
self.observed.get_count_by_index(i)
}
Expand Down
Loading