Skip to content

Commit

Permalink
Hook the Monitor into the EngineController.
Browse files Browse the repository at this point in the history
  • Loading branch information
RobbieMcKinstry committed Nov 11, 2024
1 parent 7852d25 commit 7ddc226
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 5 deletions.
6 changes: 5 additions & 1 deletion src/adapters/engines/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,14 @@ pub struct EngineController {
}

impl EngineController {
pub fn new(maximum_duration: tokio::time::Duration) -> Self {
Self { maximum_duration }
}

/// Convert this controller into a stream that emits [Action]s from the Engine.
pub fn run<T: Observation>(
self,
mut engine: impl DecisionEngine<T>,
mut engine: Box<dyn DecisionEngine<T>>,
observations: impl Stream<Item = Vec<T>>,
) -> impl Stream<Item = Action> {
stream! {
Expand Down
1 change: 1 addition & 0 deletions src/adapters/engines/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::stats::Observation;

pub use action::Action;
pub use chi::ChiSquareEngine;
pub use controller::EngineController;

/// The decision engine receives observations from the monitor
/// and determines whether the canary should be promoted, yanked,
Expand Down
6 changes: 3 additions & 3 deletions src/adapters/monitors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ pub fn repeat_query<T: Observation>(
// TODO: Honestly, this function can be inlined where used.
/// Batch observations together into maximally sized chunks, and dump
/// them to a stream every so often.
pub fn batch_observations<T: Monitor>(
obs: impl tokio_stream::Stream<Item = T::Item>,
pub fn batch_observations<T: Observation>(
obs: impl tokio_stream::Stream<Item = T>,
duration: tokio::time::Duration,
) -> impl tokio_stream::Stream<Item = Vec<T::Item>> {
) -> impl tokio_stream::Stream<Item = Vec<T>> {
obs.chunks_timeout(DEFAULT_BATCH_SIZE, duration)
}

Expand Down
7 changes: 6 additions & 1 deletion src/pipeline/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::adapters::{batch_observations, EngineController};
use crate::{
adapters::{repeat_query, DecisionEngine, Ingress, Monitor},
metrics::ResponseStatusCode,
Expand Down Expand Up @@ -50,9 +51,13 @@ impl Pipeline {
// only one task at a time (the active task)
// can ever reach a block point.
// • We spawn the Monitor as a stream.
let _observations = repeat_query(self.monitor, DEFAULT_QUERY_FREQUENCY);
let event_stream = repeat_query(self.monitor, DEFAULT_QUERY_FREQUENCY);
let observations = batch_observations(event_stream, DEFAULT_QUERY_FREQUENCY);
// • We push observations from the Monitor into the
// DecisionEngine.
let _controller =
EngineController::new(DEFAULT_QUERY_FREQUENCY).run(self.engine, observations);
// • Pipe actions to the Ingress.

todo!();
}
Expand Down

0 comments on commit 7ddc226

Please sign in to comment.