diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index db7f7173c3..3bea17fd32 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -56,21 +56,39 @@ use { token_info::{CachedTokenInfoFetcher, TokenInfoFetcher}, token_list::{AutoUpdatingTokenList, TokenListConfiguration}, }, - std::{collections::HashSet, sync::Arc, time::Duration}, + std::{ + collections::HashSet, + sync::{Arc, RwLock}, + time::{Duration, Instant}, + }, tracing::Instrument, url::Url, }; -struct Liveness { - solvable_orders_cache: Arc, +pub struct Liveness { max_auction_age: Duration, + last_auction_time: RwLock, } #[async_trait::async_trait] impl LivenessChecking for Liveness { async fn is_alive(&self) -> bool { - let age = self.solvable_orders_cache.last_update_time().elapsed(); - age <= self.max_auction_age + let last_auction_time = self.last_auction_time.read().unwrap(); + let auction_age = last_auction_time.elapsed(); + auction_age <= self.max_auction_age + } +} + +impl Liveness { + pub fn new(max_auction_age: Duration) -> Liveness { + Liveness { + max_auction_age, + last_auction_time: RwLock::new(Instant::now()), + } + } + + pub fn auction(&self) { + *self.last_auction_time.write().unwrap() = Instant::now(); } } @@ -549,11 +567,9 @@ pub async fn run(args: Arguments) { .update(block) .await .expect("failed to perform initial solvable orders update"); - let liveness = Liveness { - max_auction_age: args.max_auction_age, - solvable_orders_cache: solvable_orders_cache.clone(), - }; - shared::metrics::serve_metrics(Arc::new(liveness), args.metrics_address); + + let liveness = Arc::new(Liveness::new(args.max_auction_age)); + shared::metrics::serve_metrics(liveness.clone(), args.metrics_address); let on_settlement_event_updater = crate::on_settlement_event_updater::OnSettlementEventUpdater { @@ -607,6 +623,7 @@ pub async fn run(args: Arguments) { in_flight_orders: Default::default(), persistence: infra::persistence::Persistence::new(args.s3.into().unwrap(), Arc::new(db)) .await, + liveness: liveness.clone(), }; run.run_forever().await; unreachable!("run loop exited"); @@ -653,7 +670,8 @@ async fn shadow_mode(args: Arguments) -> ! { .await }; - shared::metrics::serve_metrics(Arc::new(shadow::Liveness), args.metrics_address); + let liveness = Arc::new(Liveness::new(args.max_auction_age)); + shared::metrics::serve_metrics(liveness.clone(), args.metrics_address); let shadow = shadow::RunLoop::new( orderbook, @@ -661,6 +679,7 @@ async fn shadow_mode(args: Arguments) -> ! { trusted_tokens, args.score_cap, args.solve_deadline, + liveness.clone(), ); shadow.run_forever().await; diff --git a/crates/autopilot/src/run_loop.rs b/crates/autopilot/src/run_loop.rs index 044e8ccd00..703ef86af6 100644 --- a/crates/autopilot/src/run_loop.rs +++ b/crates/autopilot/src/run_loop.rs @@ -9,6 +9,7 @@ use { solve::{self, TradedAmounts}, }, infra::{self, persistence::dto}, + run::Liveness, solvable_orders::SolvableOrdersCache, }, ::observe::metrics, @@ -49,6 +50,7 @@ pub struct RunLoop { pub max_settlement_transaction_wait: Duration, pub solve_deadline: Duration, pub in_flight_orders: Arc>, + pub liveness: Arc, } impl RunLoop { @@ -64,6 +66,8 @@ impl RunLoop { || last_block.replace(current_block) != Some(current_block) { observe::log_auction_delta(id, &previous, &auction); + self.liveness.auction(); + self.single_run(id, auction) .instrument(tracing::info_span!("auction", id)) .await; diff --git a/crates/autopilot/src/shadow.rs b/crates/autopilot/src/shadow.rs index 7684a0a98f..9f69c1d5f8 100644 --- a/crates/autopilot/src/shadow.rs +++ b/crates/autopilot/src/shadow.rs @@ -16,26 +16,18 @@ use { solve::{self}, }, infra, + run::Liveness, run_loop::{self, observe}, }, ::observe::metrics, number::nonzero::U256 as NonZeroU256, primitive_types::{H160, U256}, rand::seq::SliceRandom, - shared::{metrics::LivenessChecking, token_list::AutoUpdatingTokenList}, - std::{cmp, time::Duration}, + shared::token_list::AutoUpdatingTokenList, + std::{cmp, sync::Arc, time::Duration}, tracing::Instrument, }; -pub struct Liveness; -#[async_trait::async_trait] -impl LivenessChecking for Liveness { - async fn is_alive(&self) -> bool { - // can we somehow check that we keep processing auctions? - true - } -} - pub struct RunLoop { orderbook: infra::shadow::Orderbook, drivers: Vec, @@ -44,6 +36,7 @@ pub struct RunLoop { block: u64, score_cap: U256, solve_deadline: Duration, + liveness: Arc, } impl RunLoop { @@ -53,6 +46,7 @@ impl RunLoop { trusted_tokens: AutoUpdatingTokenList, score_cap: U256, solve_deadline: Duration, + liveness: Arc, ) -> Self { Self { orderbook, @@ -62,6 +56,7 @@ impl RunLoop { block: 0, score_cap, solve_deadline, + liveness, } } @@ -74,6 +69,7 @@ impl RunLoop { }; observe::log_auction_delta(id, &previous, &auction); previous = Some(auction.clone()); + self.liveness.auction(); self.single_run(id, auction) .instrument(tracing::info_span!("auction", id))