From 2a19dc0fe96c8bd17914008baec797ce25643bf0 Mon Sep 17 00:00:00 2001 From: Robbie McKinstry Date: Thu, 24 Oct 2024 17:39:26 -0400 Subject: [PATCH] Fix Type Incompatibilities between Actors I had the type signatures incorrect for the actors. Some took Observation, other T, and the incompatibility is that an Observation is a Group + T, so DecisionEngines were actually able to be constructed. This commit fixes the issue and adds a small smoke test to construct a fake DecisionEngine as proof. --- src/adapters/engines/action.rs | 1 + src/adapters/engines/controller.rs | 5 ++++- src/adapters/engines/mod.rs | 21 ++++++++++++++------- src/adapters/mod.rs | 12 +++++++----- src/pipeline/mod.rs | 6 +++--- src/stats/group.rs | 10 ++++++++++ src/stats/mod.rs | 29 +++++++---------------------- src/stats/observation.rs | 13 +++++++++++++ 8 files changed, 59 insertions(+), 38 deletions(-) create mode 100644 src/stats/group.rs create mode 100644 src/stats/observation.rs diff --git a/src/adapters/engines/action.rs b/src/adapters/engines/action.rs index 442b9f3..cf95747 100644 --- a/src/adapters/engines/action.rs +++ b/src/adapters/engines/action.rs @@ -1,5 +1,6 @@ /// An [Action] describes an effectful operation affecting the deployments. /// Actions describe decisions made by the [DecisionEngine]. +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum Action { /// Ramp the canary to 100% traffic and decommission the control deployment. Promote, diff --git a/src/adapters/engines/controller.rs b/src/adapters/engines/controller.rs index 3909173..fec3c77 100644 --- a/src/adapters/engines/controller.rs +++ b/src/adapters/engines/controller.rs @@ -4,6 +4,8 @@ use tokio::{pin, select, time::interval}; use tokio_stream::wrappers::IntervalStream; use tokio_stream::StreamExt; +use crate::stats::Observation; + use super::{Action, DecisionEngine, HashableCategory}; /// An [EngineController] is a wrapper around a DecisionEngine that @@ -32,9 +34,10 @@ impl EngineController { pub fn run( self, mut engine: impl DecisionEngine, - observations: impl Stream>, + observations: impl Stream>>, ) -> impl Stream { stream! { + // TODO: Set the MissedTickBehavior on the internal. // TODO: Implement the stream controls. let timer = IntervalStream::new(interval(self.maximum_duration)); // Pin our streams to the stack for iteration. diff --git a/src/adapters/engines/mod.rs b/src/adapters/engines/mod.rs index 304a59a..5b5fbb9 100644 --- a/src/adapters/engines/mod.rs +++ b/src/adapters/engines/mod.rs @@ -1,4 +1,4 @@ -use crate::stats::EnumerableCategory; +use crate::stats::{EnumerableCategory, Observation}; use std::hash::Hash; pub use action::Action; @@ -14,7 +14,7 @@ impl HashableCategory for T {} pub trait DecisionEngine { /// [add_observation] provides a new observation that the engine /// should take under advisement before making a decision. - fn add_observation(&mut self, observation: T); + fn add_observation(&mut self, observation: Observation); /// [compute] will ask the engine to run over all known observations. /// The engine isn't required to output an [Action]. It might determine @@ -22,8 +22,6 @@ pub trait DecisionEngine { fn compute(&mut self) -> Option; } -pub type BoxEngine = Box>; - mod action; mod controller; @@ -34,7 +32,7 @@ pub struct AlwaysPromote; #[cfg(test)] impl DecisionEngine for AlwaysPromote { - fn add_observation(&mut self, _: T) {} + fn add_observation(&mut self, _: Observation) {} fn compute(&mut self) -> Option { // true to its name, it will always promote the canary. @@ -44,11 +42,20 @@ impl DecisionEngine for AlwaysPromote { #[cfg(test)] mod tests { - use super::DecisionEngine; - use crate::metrics::ResponseStatusCode; + use super::{AlwaysPromote, DecisionEngine}; + use crate::{adapters::Action, metrics::ResponseStatusCode}; use static_assertions::assert_obj_safe; // We expect the DesignEngine to be boxed, and we expect // it to use response codes as input. assert_obj_safe!(DecisionEngine); + + /// This test is mostly for TDD: I want need to see the DecisionEngine + /// API is in action before I'm happy enough to move on to integrating + /// it with the rest of the system. + #[test] + fn mock_decision_engine() { + let mut engine: Box> = Box::new(AlwaysPromote); + assert_eq!(Some(Action::Promote), engine.compute()); + } } diff --git a/src/adapters/mod.rs b/src/adapters/mod.rs index 0e60130..5f9f2cc 100644 --- a/src/adapters/mod.rs +++ b/src/adapters/mod.rs @@ -1,7 +1,7 @@ use futures_core::stream::Stream; use tokio::sync::mpsc::Sender; -use crate::stats::Observation; +use crate::{metrics::ResponseStatusCode, stats::Observation}; pub use engines::*; pub use ingresses::*; @@ -10,7 +10,7 @@ pub use monitors::*; pub struct CloudwatchLogs { /// The AWS client for querying Cloudwatch Logs. client: Box, - outbox: Sender, + outbox: Sender>, } // TODO: This must be a Boxed Async function since it needs @@ -18,12 +18,14 @@ pub struct CloudwatchLogs { /// An ObservationEmitter returns the next set of observations when queried. /// The list of Observations may be empty if no observations occurred in the window. pub trait ObservationEmitter: Send + Sync { - fn emit_next(&mut self) -> Vec; + fn emit_next(&mut self) -> Vec>; } impl CloudwatchLogs { /// Create a new [CloudwatchLogsAdapter] using a provided AWS client. - pub fn new(client: impl ObservationEmitter + 'static) -> impl Stream { + pub fn new( + client: impl ObservationEmitter + 'static, + ) -> impl Stream> { let (outbox, mut inbox) = tokio::sync::mpsc::channel(1024); let adapter = Self { client: Box::new(client), @@ -68,7 +70,7 @@ mod tests { struct FakeObservationEmitter; impl ObservationEmitter for FakeObservationEmitter { - fn emit_next(&mut self) -> Vec { + fn emit_next(&mut self) -> Vec> { vec![Observation { group: Group::Control, outcome: ResponseStatusCode::_2XX, diff --git a/src/pipeline/mod.rs b/src/pipeline/mod.rs index be3e89b..810db87 100644 --- a/src/pipeline/mod.rs +++ b/src/pipeline/mod.rs @@ -10,14 +10,14 @@ use miette::Result; pub struct Pipeline { engine: Box>, ingress: Box, - monitor: Box>, + monitor: Box>>, } #[bon] impl Pipeline { #[builder] fn new( - monitor: impl Monitor + 'static, + monitor: impl Monitor> + 'static, ingress: impl Ingress + 'static, engine: impl DecisionEngine + 'static, ) -> Self { @@ -44,7 +44,7 @@ pub async fn setup_pipeline() { // • First, we create a monitor based on the configuration we've been given. // It must use dynamic dispatch because we're not sure what kind of // monitor it is. - let _monitor: Option>> = None; + let _monitor: Option>> = None; // • Repeat for the Ingress and the Engine. let _ingress: Box = Box::new(MockIngress); let _engine: Box> = Box::new(AlwaysPromote); diff --git a/src/stats/group.rs b/src/stats/group.rs new file mode 100644 index 0000000..cb95f4f --- /dev/null +++ b/src/stats/group.rs @@ -0,0 +1,10 @@ +/// The [Group] indicates from whence a given observation +/// was generated: either by a control group deployment or by +/// a canary deployment. +#[derive(Hash, Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] +pub enum Group { + /// The control group is the current running deployment. + Control, + /// The experimental group represents the canary deployment. + Experimental, +} diff --git a/src/stats/mod.rs b/src/stats/mod.rs index a17567d..d8388a5 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -1,6 +1,8 @@ use std::collections::HashMap; pub use chi::EnumerableCategory; +pub use group::Group; +pub use observation::Observation; use crate::metrics::ResponseStatusCode; @@ -39,7 +41,7 @@ impl ChiSquareEngine { } } - pub fn add_observation(&mut self, obs: Observation) { + pub fn add_observation(&mut self, obs: Observation) { // Fetch the count of observations for the given group. let entry = match obs.group { Group::Control => { @@ -93,26 +95,9 @@ impl ChiSquareEngine { /// This type maps the dependent variable to its count. pub type ContingencyTable = HashMap; -/// An [Observation] represents a measured outcome that -/// belongs to either a control group or an experimental -/// group (i.e. canary). -pub struct Observation { - /// The experimental group or the control group. - pub group: Group, - /// The outcome of the observation, by status code. - pub outcome: ResponseStatusCode, -} - -/// The [Group] indicates from whence a given observation -/// was generated: either by a control group deployment or by -/// a canary deployment. -#[derive(Hash, Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] -pub enum Group { - /// The control group is the current running deployment. - Control, - /// The experimental group represents the canary deployment. - Experimental, -} - /// contains the engine to calculate the chi square test statistic. mod chi; +/// `group` defines the two groups. +mod group; +/// An observation represents a group and the observed category. +mod observation; diff --git a/src/stats/observation.rs b/src/stats/observation.rs new file mode 100644 index 0000000..060b58a --- /dev/null +++ b/src/stats/observation.rs @@ -0,0 +1,13 @@ +use super::{group::Group, EnumerableCategory}; + +/// An [Observation] represents a measured outcome that +/// belongs to either a control group or an experimental +/// group (i.e. canary). +/// The measured outcome must be categorical (for now). +pub struct Observation { + /// The experimental group or the control group. + pub group: Group, + /// The outcome of the observation, bucketed into a specific category. + /// e.g. a response status code's highest order digit, 2XX, 5XX, etc. + pub outcome: Cat, +}