Skip to content

Commit

Permalink
Merge pull request #21 from wack/robbie/pipeline-run
Browse files Browse the repository at this point in the history
Instantiate the Monitor in the Pipeline.
  • Loading branch information
RobbieMcKinstry authored Nov 11, 2024
2 parents 53a6a04 + 7ddc226 commit eff0048
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 34 deletions.
11 changes: 10 additions & 1 deletion src/adapters/engines/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ use crate::stats::Observation;

use super::{Action, DecisionEngine};

// TODO: This type is not currently used, and I haven't really
// incorporated it into my mental model of the Pipeline.
// I think it's supposed to wrap the monitor and control
// configurables, but I'm not planning to bubble those
// configurables up to the CLI until a future release.
/// An [EngineController] is a wrapper around a DecisionEngine that
/// controls how and when its called. It essentially converts the
/// [DecisionEngine] into an async stream that only emits [Action]s
Expand All @@ -30,10 +35,14 @@ pub struct EngineController {
}

impl EngineController {
pub fn new(maximum_duration: tokio::time::Duration) -> Self {
Self { maximum_duration }
}

/// Convert this controller into a stream that emits [Action]s from the Engine.
pub fn run<T: Observation>(
self,
mut engine: impl DecisionEngine<T>,
mut engine: Box<dyn DecisionEngine<T>>,
observations: impl Stream<Item = Vec<T>>,
) -> impl Stream<Item = Action> {
stream! {
Expand Down
1 change: 1 addition & 0 deletions src/adapters/engines/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::stats::Observation;

pub use action::Action;
pub use chi::ChiSquareEngine;
pub use controller::EngineController;

/// The decision engine receives observations from the monitor
/// and determines whether the canary should be promoted, yanked,
Expand Down
12 changes: 6 additions & 6 deletions src/adapters/monitors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ pub trait Monitor {
// amount of time.
/// [repeat_query] runs the query on an interval and returns a stream of items.
/// This function runs indefinitely.
pub fn repeat_query<T: Monitor>(
mut observer: T,
pub fn repeat_query<T: Observation>(
mut observer: Box<dyn Monitor<Item = T>>,
duration: tokio::time::Duration,
) -> impl tokio_stream::Stream<Item = T::Item> {
) -> impl tokio_stream::Stream<Item = T> {
// • Everything happens in this stream closure, which desugars
// into a background thread and a channel write at yield points.
async_stream::stream! {
Expand All @@ -78,10 +78,10 @@ pub fn repeat_query<T: Monitor>(
// TODO: Honestly, this function can be inlined where used.
/// Batch observations together into maximally sized chunks, and dump
/// them to a stream every so often.
pub fn batch_observations<T: Monitor>(
obs: impl tokio_stream::Stream<Item = T::Item>,
pub fn batch_observations<T: Observation>(
obs: impl tokio_stream::Stream<Item = T>,
duration: tokio::time::Duration,
) -> impl tokio_stream::Stream<Item = Vec<T::Item>> {
) -> impl tokio_stream::Stream<Item = Vec<T>> {
obs.chunks_timeout(DEFAULT_BATCH_SIZE, duration)
}

Expand Down
55 changes: 28 additions & 27 deletions src/pipeline/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::adapters::{batch_observations, EngineController};
use crate::{
adapters::{DecisionEngine, Ingress, Monitor},
adapters::{repeat_query, DecisionEngine, Ingress, Monitor},
metrics::ResponseStatusCode,
stats::CategoricalObservation,
};
Expand All @@ -16,6 +17,11 @@ pub struct Pipeline {
monitor: ResponseMonitor,
}

// Check for new metrics every 60s. TODO: This number is set because
// AWS Cloudwatch only populates new metrics once per minute. This number
// can be adjusted in the future.
const DEFAULT_QUERY_FREQUENCY: tokio::time::Duration = tokio::time::Duration::from_secs(60);

#[bon]
impl Pipeline {
#[builder]
Expand All @@ -31,36 +37,31 @@ impl Pipeline {
}
}

/// Run the pipeline, spawning tasks for the monitor, ingress, and engine to run
/// independently.
pub async fn run(self) -> Result<()> {
// TODO: Handle graceful shutdown.
// • First, create a shutdown channel which we
// pass to each thread.
// TODO: I think because we don't spawn three
// tasks and then `join!` them, we're actually
// running single-threaded. Tokio will smartly
// which to run other threads when one blocks,
// but because tasks are only executed lazily,
// only one task at a time (the active task)
// can ever reach a block point.
// • We spawn the Monitor as a stream.
let event_stream = repeat_query(self.monitor, DEFAULT_QUERY_FREQUENCY);
let observations = batch_observations(event_stream, DEFAULT_QUERY_FREQUENCY);
// • We push observations from the Monitor into the
// DecisionEngine.
let _controller =
EngineController::new(DEFAULT_QUERY_FREQUENCY).run(self.engine, observations);
// • Pipe actions to the Ingress.

todo!();
}
}

#[cfg(test)]
use crate::adapters::{AlwaysPromote, MockIngress};

// TODO: I can probably remove this function since more of the Pipeline setup is
// done in the cmd::Deploy function now.
// TODO: Add some more structure to this. Right now, I'm
// just trying to get the general layout defined and
// all of the actors wired up
#[cfg(test)]
pub async fn setup_pipeline() {
// • First, we create a monitor based on the configuration we've been given.
// It must use dynamic dispatch because we're not sure what kind of
// monitor it is.
let _monitor: Option<Box<dyn Monitor<Item = CategoricalObservation<5, ResponseStatusCode>>>> =
None;
// • Repeat for the Ingress and the Engine.
let _ingress: Box<dyn Ingress> = Box::new(MockIngress);
let _engine: Box<dyn DecisionEngine<CategoricalObservation<5, ResponseStatusCode>>> =
Box::new(AlwaysPromote);

// TODO:
// Define the APIs that each of these things use.

// TODO:
// Now that these types are defined, let's wire them together.
// The DecisionEngine actor takes a stream of Event batches.
todo!();
}

0 comments on commit eff0048

Please sign in to comment.