Skip to content

Commit

Permalink
Attempt to plumb plugins up to the CLI flags (#11)
Browse files Browse the repository at this point in the history
Attempt to plumb plugins up to the CLI flags

----------

Add fmt::Display to ResponseStatusCode.

----------

Dirt simple PR to implement displaying of response codes.

----------

Remove dead contingency table code.

This was dead code because I reimplemented contingency tables
in a better way.

-------------

Fix Type Incompatibilities between Actors

I had the type signatures incorrect for the actors. Some took Observation,
other T, and the incompatibility is that an Observation is a Group + T,
so DecisionEngines were actually able to be constructed. This commit
fixes the issue and adds a small smoke test to construct a fake
DecisionEngine as proof.

--------

Implement Chi Square engine and add different types of contingency tables.
  • Loading branch information
RobbieMcKinstry authored Oct 28, 2024
1 parent 367c263 commit a269065
Show file tree
Hide file tree
Showing 23 changed files with 545 additions and 129 deletions.
77 changes: 77 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ path = "src/bin/main.rs"
[dependencies]
async-stream = "0.3.6"
async-trait = "0.1.83"
bon = "2.3.0"
# aws-config = { version = "1.1.7", features = ["behavior-version-latest"] }
# aws-sdk-cloudwatchlogs = "1.52.0"
chrono = "0.4.38"
Expand Down
1 change: 1 addition & 0 deletions src/adapters/engines/action.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/// An [Action] describes an effectful operation affecting the deployments.
/// Actions describe decisions made by the [DecisionEngine].
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Action {
/// Ramp the canary to 100% traffic and decommission the control deployment.
Promote,
Expand Down
45 changes: 45 additions & 0 deletions src/adapters/engines/chi.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use crate::{
metrics::ResponseStatusCode,
stats::{EmpiricalTable, ExpectationTable, Group, Observation},
};

use super::DecisionEngine;

/// The [ChiSquareEngine] uses the Chi Square statistical
/// significance test to determine whether the canary should be promoted or not.
#[derive(Default)]
pub struct ChiSquareEngine {
control_data: ExpectationTable<ResponseStatusCode>,
experimental_data: EmpiricalTable<ResponseStatusCode>,
}

impl DecisionEngine<ResponseStatusCode> for ChiSquareEngine {
// TODO: From writing this method, it's apparent there should be a Vec implementation
// that adds Vec::len() to the total and concats the vectors together, because
// otherwise we're wasting a ton of cycles just incrementing counters.
fn add_observation(&mut self, observation: Observation<ResponseStatusCode>) {
match observation.group {
Group::Control => {
// • Increment the number of observations for this category.
self.control_data.increment(observation.outcome);
}
Group::Experimental => {
// • Increment the number of observations in the canary contingency table.
self.experimental_data.increment(observation.outcome);
// • Then, let the control contingency table know that there was
// another experimental observation.
self.control_data.increment_experimental_total();
}
}
}

fn compute(&mut self) -> Option<super::Action> {
todo!()
}
}

impl ChiSquareEngine {
pub fn new() -> Self {
Self::default()
}
}
5 changes: 4 additions & 1 deletion src/adapters/engines/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use tokio::{pin, select, time::interval};
use tokio_stream::wrappers::IntervalStream;
use tokio_stream::StreamExt;

use crate::stats::Observation;

use super::{Action, DecisionEngine, HashableCategory};

/// An [EngineController] is a wrapper around a DecisionEngine that
Expand Down Expand Up @@ -32,9 +34,10 @@ impl EngineController {
pub fn run<T: HashableCategory>(
self,
mut engine: impl DecisionEngine<T>,
observations: impl Stream<Item = Vec<T>>,
observations: impl Stream<Item = Vec<Observation<T>>>,
) -> impl Stream<Item = Action> {
stream! {
// TODO: Set the MissedTickBehavior on the internal.
// TODO: Implement the stream controls.
let timer = IntervalStream::new(interval(self.maximum_duration));
// Pin our streams to the stack for iteration.
Expand Down
23 changes: 16 additions & 7 deletions src/adapters/engines/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::stats::EnumerableCategory;
use crate::stats::{EnumerableCategory, Observation};
use std::hash::Hash;

pub use action::Action;
pub use chi::ChiSquareEngine;

/// Helper trait, since these requirements are often used by
/// our implementation of `ContingencyTables`.
Expand All @@ -14,17 +15,16 @@ impl<T: EnumerableCategory + Hash + Eq> HashableCategory for T {}
pub trait DecisionEngine<T: HashableCategory> {
/// [add_observation] provides a new observation that the engine
/// should take under advisement before making a decision.
fn add_observation(&mut self, observation: T);
fn add_observation(&mut self, observation: Observation<T>);

/// [compute] will ask the engine to run over all known observations.
/// The engine isn't required to output an [Action]. It might determine
/// there isn't enough data to make an affirmative decision.
fn compute(&mut self) -> Option<Action>;
}

pub type BoxEngine<T> = Box<dyn DecisionEngine<T>>;

mod action;
mod chi;
mod controller;

/// The AlwaysPromote decision engine will always return the Promote
Expand All @@ -34,7 +34,7 @@ pub struct AlwaysPromote;

#[cfg(test)]
impl<T: HashableCategory> DecisionEngine<T> for AlwaysPromote {
fn add_observation(&mut self, _: T) {}
fn add_observation(&mut self, _: Observation<T>) {}

fn compute(&mut self) -> Option<Action> {
// true to its name, it will always promote the canary.
Expand All @@ -44,11 +44,20 @@ impl<T: HashableCategory> DecisionEngine<T> for AlwaysPromote {

#[cfg(test)]
mod tests {
use super::DecisionEngine;
use crate::metrics::ResponseStatusCode;
use super::{AlwaysPromote, DecisionEngine};
use crate::{adapters::Action, metrics::ResponseStatusCode};
use static_assertions::assert_obj_safe;

// We expect the DesignEngine to be boxed, and we expect
// it to use response codes as input.
assert_obj_safe!(DecisionEngine<ResponseStatusCode>);

/// This test is mostly for TDD: I want need to see the DecisionEngine
/// API is in action before I'm happy enough to move on to integrating
/// it with the rest of the system.
#[test]
fn mock_decision_engine() {
let mut engine: Box<dyn DecisionEngine<ResponseStatusCode>> = Box::new(AlwaysPromote);
assert_eq!(Some(Action::Promote), engine.compute());
}
}
12 changes: 7 additions & 5 deletions src/adapters/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use futures_core::stream::Stream;
use tokio::sync::mpsc::Sender;

use crate::stats::Observation;
use crate::{metrics::ResponseStatusCode, stats::Observation};

pub use engines::*;
pub use ingresses::*;
Expand All @@ -10,20 +10,22 @@ pub use monitors::*;
pub struct CloudwatchLogs {
/// The AWS client for querying Cloudwatch Logs.
client: Box<dyn ObservationEmitter>,
outbox: Sender<Observation>,
outbox: Sender<Observation<ResponseStatusCode>>,
}

// TODO: This must be a Boxed Async function since it needs
// to perform nonblocking network IO.
/// An ObservationEmitter returns the next set of observations when queried.
/// The list of Observations may be empty if no observations occurred in the window.
pub trait ObservationEmitter: Send + Sync {
fn emit_next(&mut self) -> Vec<Observation>;
fn emit_next(&mut self) -> Vec<Observation<ResponseStatusCode>>;
}

impl CloudwatchLogs {
/// Create a new [CloudwatchLogsAdapter] using a provided AWS client.
pub fn new(client: impl ObservationEmitter + 'static) -> impl Stream<Item = Observation> {
pub fn new(
client: impl ObservationEmitter + 'static,
) -> impl Stream<Item = Observation<ResponseStatusCode>> {
let (outbox, mut inbox) = tokio::sync::mpsc::channel(1024);
let adapter = Self {
client: Box::new(client),
Expand Down Expand Up @@ -68,7 +70,7 @@ mod tests {

struct FakeObservationEmitter;
impl ObservationEmitter for FakeObservationEmitter {
fn emit_next(&mut self) -> Vec<super::Observation> {
fn emit_next(&mut self) -> Vec<Observation<ResponseStatusCode>> {
vec![Observation {
group: Group::Control,
outcome: ResponseStatusCode::_2XX,
Expand Down
18 changes: 18 additions & 0 deletions src/cmd/deploy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use crate::Pipeline;
use miette::Result;

#[derive(Default)]
pub struct Deploy;

impl Deploy {
pub fn new() -> Self {
Self
}

/// deploy the canary, monitoring it, and ultimately promoting
/// or yanking the deployment.
pub async fn dispatch(self) -> Result<()> {
// • Set up our deployment pipeline.
Pipeline::run().await
}
}
4 changes: 4 additions & 0 deletions src/cmd/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
/// A subcommand to deploy the canary, and self-promote it as we gain
/// statistical confidence in its correctness.
pub use deploy::Deploy;
/// A subcommand to print the version of this executable.
pub use version::Version;

mod deploy;
mod version;
6 changes: 5 additions & 1 deletion src/config/command.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use clap::Subcommand;
use miette::Result;

use crate::cmd::Version;
use crate::cmd::{Deploy, Version};

/// one of the top-level commands accepted by
/// the canary CLI.
#[derive(Subcommand, Clone)]
pub enum CanaryCommand {
/// Deploy the canary, dynamically ramping traffic up as canary gains statistical
/// confidence in the canary.
Deploy,
/// Print the CLI version and exit
Version,
}
Expand All @@ -15,6 +18,7 @@ impl CanaryCommand {
/// dispatch the user-provided arguments to the command handler.
pub async fn dispatch(&self) -> Result<()> {
match self.clone() {
Self::Deploy => Deploy::new().dispatch().await,
Self::Version => Version::new().dispatch(),
}
}
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub use config::Flags;
pub use pipeline::Pipeline;

/// An adapter connects to some observable resource (like `CloudWatch`) and
/// emits events, like failed and succeeded requests.
Expand Down
Loading

0 comments on commit a269065

Please sign in to comment.