Skip to content

Commit

Permalink
Load Config and set up execution pipeline.
Browse files Browse the repository at this point in the history
This commit sets up the execution pipeline. I had some scratch code in
a function `setup_pipeline` in `src/pipeline/mod.rs` that would create
a mock pipeline. This commit instantiates (mostly real) objects in
`src/cmd/deploy.rs` instead. Since objects like the `Monitor` are
initialized based on user config, I think it makes sense that the config
plumbing lives in the `cmd` module instead of the `pipeline` module,
which offers better separation between setup logic and application
logic.

This commit also create a real AWS CloudWatch `Monitor`. The previous
version, which I ultimately deprecated, was way out of place in the wrong
module.
  • Loading branch information
RobbieMcKinstry committed Nov 11, 2024
1 parent 1787b98 commit 7ca52fc
Show file tree
Hide file tree
Showing 8 changed files with 1,366 additions and 19 deletions.
1,279 changes: 1,276 additions & 3 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ path = "src/bin/main.rs"
[dependencies]
async-stream = "0.3.6"
async-trait = "0.1.83"
aws-config = { version = "1.1.7", features = ["behavior-version-latest"] }
aws-sdk-cloudwatch = "1.54.0"
bon = "2.3.0"
# aws-config = { version = "1.1.7", features = ["behavior-version-latest"] }
# aws-sdk-cloudwatchlogs = "1.52.0"
chrono = "0.4.38"
clap = { version = "4.3", features = ["derive"] }
futures-core = "0.3.31"
Expand Down
1 change: 1 addition & 0 deletions src/adapters/engines/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::stats::Observation;

pub use action::Action;
pub use chi::ChiSquareEngine;

/// The decision engine receives observations from the monitor
/// and determines whether the canary should be promoted, yanked,
Expand Down
32 changes: 32 additions & 0 deletions src/adapters/monitors/cloudwatch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use async_trait::async_trait;
use aws_config::BehaviorVersion;

use crate::{metrics::ResponseStatusCode, stats::CategoricalObservation};
use aws_sdk_cloudwatch::client::Client as AwsClient;

use super::Monitor;

pub struct CloudWatch {
client: AwsClient,
}

impl CloudWatch {
pub async fn new() -> Self {
// We don't need a particular version, but we should pin to a particular
// behavior so it doens't accidently slip if `latest` gets updated
// without our knowledge.
let behavior = BehaviorVersion::v2024_03_28();
let config = aws_config::load_defaults(behavior).await;
let client = aws_sdk_cloudwatch::Client::new(&config);
Self { client }
}
}

#[async_trait]
impl Monitor for CloudWatch {
type Item = CategoricalObservation<5, ResponseStatusCode>;

async fn query(&mut self) -> Vec<Self::Item> {
todo!("CloudWatch monitoring not yet implemented.");
}
}
25 changes: 24 additions & 1 deletion src/adapters/monitors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use crate::stats::Observation;
use crate::{pipeline::ResponseMonitor, stats::Observation};
use async_trait::async_trait;
use miette::Result;
use tokio::{pin, time::interval};
use tokio_stream::{wrappers::IntervalStream, StreamExt};

pub use cloudwatch::CloudWatch;

/// The maximum number of observations that can be recevied before we
/// recompute statistical significance.
/// If this number is too low, we'll be performing compute-intensive
Expand All @@ -25,6 +28,23 @@ pub trait Monitor {
/// function was called.
// TODO: This should return a result which we should account for in error handling.
async fn query(&mut self) -> Vec<Self::Item>;

/// Load a response code monitor using the configuration provided by the user.
async fn load_from_conf() -> Result<ResponseMonitor>
// TODO: Without this bound, the trait isn't object safe, but with the bound, we have to
// know the concrete type statically because we have to know how to instantiate Self.
// Ultiately, this should just be a function instead of an associated method, probably.
where
Self: Sized,
{
// NB: Currently, only one type of Monitor is supported: the AWS Cloudwatch monitor.
// Effectively, this means we need to read the user's AWS config from the environment
// (or accept it from CLI flags).
// BUG: For now, we require the configuration come from the environment, but in the future
// we should accept CLI flags too.
let monitor = CloudWatch::new().await;
Ok(Box::new(monitor))
}
}

// TODO: Add a call to chunk_timeout to ensure that items are arriving after a particular
Expand Down Expand Up @@ -65,6 +85,9 @@ pub fn batch_observations<T: Monitor>(
obs.chunks_timeout(DEFAULT_BATCH_SIZE, duration)
}

/// The cloudwatch monitor.
mod cloudwatch;

#[cfg(test)]
mod tests {
use static_assertions::assert_obj_safe;
Expand Down
18 changes: 16 additions & 2 deletions src/cmd/deploy.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,32 @@
use crate::Pipeline;
use crate::adapters::ChiSquareEngine;
use crate::adapters::{CloudWatch, MockIngress};
use crate::{adapters::Monitor, Pipeline};
use miette::Result;

#[derive(Default)]
pub struct Deploy;

impl Deploy {
/// initialize the deployment command. Using a constructor here
/// is largely to make the look-and-feel of the cmd API similar
/// between different commands.
pub fn new() -> Self {
Self
}

/// deploy the canary, monitoring it, and ultimately promoting
/// or yanking the deployment.
pub async fn dispatch(self) -> Result<()> {
// • Load the monitor from the user's config.
let monitor = CloudWatch::load_from_conf().await?;
// • Set up our deployment pipeline.
Pipeline::run().await
Pipeline::builder()
.monitor(monitor)
.engine(ChiSquareEngine::new())
.ingress(MockIngress)
.build()
// Run the pipeline to completion.
.run()
.await
}
}
24 changes: 14 additions & 10 deletions src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,41 @@ use crate::{
use bon::bon;
use miette::Result;

/// A local-only shorthand to make the type declaration more readable.
type StatusCode = CategoricalObservation<5, ResponseStatusCode>;
/// An alias for the Response Code-based monitor.
pub type ResponseMonitor = Box<dyn Monitor<Item = CategoricalObservation<5, ResponseStatusCode>>>;

/// Pipeline captures the core logic of canary.
pub struct Pipeline {
engine: Box<dyn DecisionEngine<StatusCode>>,
engine: Box<dyn DecisionEngine<CategoricalObservation<5, ResponseStatusCode>>>,
ingress: Box<dyn Ingress>,
monitor: Box<dyn Monitor<Item = StatusCode>>,
monitor: ResponseMonitor,
}

#[bon]
impl Pipeline {
#[builder]
fn new(
monitor: impl Monitor<Item = CategoricalObservation<5, ResponseStatusCode>> + 'static,
pub fn new(
monitor: ResponseMonitor,
ingress: impl Ingress + 'static,
engine: impl DecisionEngine<CategoricalObservation<5, ResponseStatusCode>> + 'static,
) -> Self {
Self {
engine: Box::new(engine),
ingress: Box::new(ingress),
monitor: Box::new(monitor),
monitor,
}
}

pub async fn run() -> Result<()> {
pub async fn run(self) -> Result<()> {
todo!();
}
}

#[cfg(test)]
use crate::adapters::{AlwaysPromote, MockIngress};

// TODO: I can probably remove this function since more of the Pipeline setup is
// done in the cmd::Deploy function now.
// TODO: Add some more structure to this. Right now, I'm
// just trying to get the general layout defined and
// all of the actors wired up
Expand All @@ -47,10 +49,12 @@ pub async fn setup_pipeline() {
// • First, we create a monitor based on the configuration we've been given.
// It must use dynamic dispatch because we're not sure what kind of
// monitor it is.
let _monitor: Option<Box<dyn Monitor<Item = StatusCode>>> = None;
let _monitor: Option<Box<dyn Monitor<Item = CategoricalObservation<5, ResponseStatusCode>>>> =
None;
// • Repeat for the Ingress and the Engine.
let _ingress: Box<dyn Ingress> = Box::new(MockIngress);
let _engine: Box<dyn DecisionEngine<StatusCode>> = Box::new(AlwaysPromote);
let _engine: Box<dyn DecisionEngine<CategoricalObservation<5, ResponseStatusCode>>> =
Box::new(AlwaysPromote);

// TODO:
// Define the APIs that each of these things use.
Expand Down
2 changes: 1 addition & 1 deletion src/stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const DEFAULT_ALPHA_CUTOFF: f64 = 0.05;
/// The [ChiSquareEngine] calculates the Chi Square test statistic
/// based on the data stored in its contingency tables.
// #[deprecated = "Use crate::adapters::engines::ChiSquareEngine instead"]
pub struct ChiSquareEngine {
struct ChiSquareEngine {
control: Table,
experimental: Table,
total_control_count: usize,
Expand Down

0 comments on commit 7ca52fc

Please sign in to comment.