Skip to content

Commit

Permalink
Fix Type Incompatibilities between Actors
Browse files Browse the repository at this point in the history
I had the type signatures incorrect for the actors. Some took Observation,
other T, and the incompatibility is that an Observation is a Group + T,
so DecisionEngines were actually able to be constructed. This commit
fixes the issue and adds a small smoke test to construct a fake
DecisionEngine as proof.
  • Loading branch information
RobbieMcKinstry committed Oct 24, 2024
1 parent add9672 commit 2a19dc0
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 38 deletions.
1 change: 1 addition & 0 deletions src/adapters/engines/action.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/// An [Action] describes an effectful operation affecting the deployments.
/// Actions describe decisions made by the [DecisionEngine].
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Action {
/// Ramp the canary to 100% traffic and decommission the control deployment.
Promote,
Expand Down
5 changes: 4 additions & 1 deletion src/adapters/engines/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use tokio::{pin, select, time::interval};
use tokio_stream::wrappers::IntervalStream;
use tokio_stream::StreamExt;

use crate::stats::Observation;

use super::{Action, DecisionEngine, HashableCategory};

/// An [EngineController] is a wrapper around a DecisionEngine that
Expand Down Expand Up @@ -32,9 +34,10 @@ impl EngineController {
pub fn run<T: HashableCategory>(
self,
mut engine: impl DecisionEngine<T>,
observations: impl Stream<Item = Vec<T>>,
observations: impl Stream<Item = Vec<Observation<T>>>,
) -> impl Stream<Item = Action> {
stream! {
// TODO: Set the MissedTickBehavior on the internal.
// TODO: Implement the stream controls.
let timer = IntervalStream::new(interval(self.maximum_duration));
// Pin our streams to the stack for iteration.
Expand Down
21 changes: 14 additions & 7 deletions src/adapters/engines/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stats::EnumerableCategory;
use crate::stats::{EnumerableCategory, Observation};
use std::hash::Hash;

pub use action::Action;
Expand All @@ -14,16 +14,14 @@ impl<T: EnumerableCategory + Hash + Eq> HashableCategory for T {}
pub trait DecisionEngine<T: HashableCategory> {
/// [add_observation] provides a new observation that the engine
/// should take under advisement before making a decision.
fn add_observation(&mut self, observation: T);
fn add_observation(&mut self, observation: Observation<T>);

/// [compute] will ask the engine to run over all known observations.
/// The engine isn't required to output an [Action]. It might determine
/// there isn't enough data to make an affirmative decision.
fn compute(&mut self) -> Option<Action>;
}

pub type BoxEngine<T> = Box<dyn DecisionEngine<T>>;

mod action;
mod controller;

Expand All @@ -34,7 +32,7 @@ pub struct AlwaysPromote;

#[cfg(test)]
impl<T: HashableCategory> DecisionEngine<T> for AlwaysPromote {
fn add_observation(&mut self, _: T) {}
fn add_observation(&mut self, _: Observation<T>) {}

fn compute(&mut self) -> Option<Action> {
// true to its name, it will always promote the canary.
Expand All @@ -44,11 +42,20 @@ impl<T: HashableCategory> DecisionEngine<T> for AlwaysPromote {

#[cfg(test)]
mod tests {
use super::DecisionEngine;
use crate::metrics::ResponseStatusCode;
use super::{AlwaysPromote, DecisionEngine};
use crate::{adapters::Action, metrics::ResponseStatusCode};
use static_assertions::assert_obj_safe;

// We expect the DesignEngine to be boxed, and we expect
// it to use response codes as input.
assert_obj_safe!(DecisionEngine<ResponseStatusCode>);

/// This test is mostly for TDD: I want need to see the DecisionEngine
/// API is in action before I'm happy enough to move on to integrating
/// it with the rest of the system.
#[test]
fn mock_decision_engine() {
let mut engine: Box<dyn DecisionEngine<ResponseStatusCode>> = Box::new(AlwaysPromote);
assert_eq!(Some(Action::Promote), engine.compute());
}
}
12 changes: 7 additions & 5 deletions src/adapters/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use futures_core::stream::Stream;
use tokio::sync::mpsc::Sender;

use crate::stats::Observation;
use crate::{metrics::ResponseStatusCode, stats::Observation};

pub use engines::*;
pub use ingresses::*;
Expand All @@ -10,20 +10,22 @@ pub use monitors::*;
pub struct CloudwatchLogs {
/// The AWS client for querying Cloudwatch Logs.
client: Box<dyn ObservationEmitter>,
outbox: Sender<Observation>,
outbox: Sender<Observation<ResponseStatusCode>>,
}

// TODO: This must be a Boxed Async function since it needs
// to perform nonblocking network IO.
/// An ObservationEmitter returns the next set of observations when queried.
/// The list of Observations may be empty if no observations occurred in the window.
pub trait ObservationEmitter: Send + Sync {
fn emit_next(&mut self) -> Vec<Observation>;
fn emit_next(&mut self) -> Vec<Observation<ResponseStatusCode>>;
}

impl CloudwatchLogs {
/// Create a new [CloudwatchLogsAdapter] using a provided AWS client.
pub fn new(client: impl ObservationEmitter + 'static) -> impl Stream<Item = Observation> {
pub fn new(
client: impl ObservationEmitter + 'static,
) -> impl Stream<Item = Observation<ResponseStatusCode>> {
let (outbox, mut inbox) = tokio::sync::mpsc::channel(1024);
let adapter = Self {
client: Box::new(client),
Expand Down Expand Up @@ -68,7 +70,7 @@ mod tests {

struct FakeObservationEmitter;
impl ObservationEmitter for FakeObservationEmitter {
fn emit_next(&mut self) -> Vec<super::Observation> {
fn emit_next(&mut self) -> Vec<Observation<ResponseStatusCode>> {
vec![Observation {
group: Group::Control,
outcome: ResponseStatusCode::_2XX,
Expand Down
6 changes: 3 additions & 3 deletions src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ use miette::Result;
pub struct Pipeline {
engine: Box<dyn DecisionEngine<ResponseStatusCode>>,
ingress: Box<dyn Ingress>,
monitor: Box<dyn Monitor<Item = Observation>>,
monitor: Box<dyn Monitor<Item = Observation<ResponseStatusCode>>>,
}

#[bon]
impl Pipeline {
#[builder]
fn new(
monitor: impl Monitor<Item = Observation> + 'static,
monitor: impl Monitor<Item = Observation<ResponseStatusCode>> + 'static,
ingress: impl Ingress + 'static,
engine: impl DecisionEngine<ResponseStatusCode> + 'static,
) -> Self {
Expand All @@ -44,7 +44,7 @@ pub async fn setup_pipeline() {
// • First, we create a monitor based on the configuration we've been given.
// It must use dynamic dispatch because we're not sure what kind of
// monitor it is.
let _monitor: Option<Box<dyn Monitor<Item = Observation>>> = None;
let _monitor: Option<Box<dyn Monitor<Item = ResponseStatusCode>>> = None;
// • Repeat for the Ingress and the Engine.
let _ingress: Box<dyn Ingress> = Box::new(MockIngress);
let _engine: Box<dyn DecisionEngine<ResponseStatusCode>> = Box::new(AlwaysPromote);
Expand Down
10 changes: 10 additions & 0 deletions src/stats/group.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/// The [Group] indicates from whence a given observation
/// was generated: either by a control group deployment or by
/// a canary deployment.
#[derive(Hash, Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub enum Group {
/// The control group is the current running deployment.
Control,
/// The experimental group represents the canary deployment.
Experimental,
}
29 changes: 7 additions & 22 deletions src/stats/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::HashMap;

pub use chi::EnumerableCategory;
pub use group::Group;
pub use observation::Observation;

use crate::metrics::ResponseStatusCode;

Expand Down Expand Up @@ -39,7 +41,7 @@ impl ChiSquareEngine {
}
}

pub fn add_observation(&mut self, obs: Observation) {
pub fn add_observation(&mut self, obs: Observation<ResponseStatusCode>) {
// Fetch the count of observations for the given group.
let entry = match obs.group {
Group::Control => {
Expand Down Expand Up @@ -93,26 +95,9 @@ impl ChiSquareEngine {
/// This type maps the dependent variable to its count.
pub type ContingencyTable = HashMap<ResponseStatusCode, usize>;

/// An [Observation] represents a measured outcome that
/// belongs to either a control group or an experimental
/// group (i.e. canary).
pub struct Observation {
/// The experimental group or the control group.
pub group: Group,
/// The outcome of the observation, by status code.
pub outcome: ResponseStatusCode,
}

/// The [Group] indicates from whence a given observation
/// was generated: either by a control group deployment or by
/// a canary deployment.
#[derive(Hash, Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub enum Group {
/// The control group is the current running deployment.
Control,
/// The experimental group represents the canary deployment.
Experimental,
}

/// contains the engine to calculate the chi square test statistic.
mod chi;
/// `group` defines the two groups.
mod group;
/// An observation represents a group and the observed category.
mod observation;
13 changes: 13 additions & 0 deletions src/stats/observation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use super::{group::Group, EnumerableCategory};

/// An [Observation] represents a measured outcome that
/// belongs to either a control group or an experimental
/// group (i.e. canary).
/// The measured outcome must be categorical (for now).
pub struct Observation<Cat: EnumerableCategory> {
/// The experimental group or the control group.
pub group: Group,
/// The outcome of the observation, bucketed into a specific category.
/// e.g. a response status code's highest order digit, 2XX, 5XX, etc.
pub outcome: Cat,
}

0 comments on commit 2a19dc0

Please sign in to comment.