From 044a3373003630f267ef48265cedced4abc8f57a Mon Sep 17 00:00:00 2001 From: KRD <44735306+KRD-Kai@users.noreply.github.com> Date: Tue, 2 Jan 2024 20:14:33 +0000 Subject: [PATCH 1/4] Add timestamp based liveness check for shadow and regular autopilot --- crates/autopilot/src/run.rs | 33 +++++++++++++++++++++----------- crates/autopilot/src/run_loop.rs | 11 +++++------ crates/autopilot/src/shadow.rs | 21 ++++++++++---------- 3 files changed, 37 insertions(+), 28 deletions(-) diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index 987584cb00..0bd28d582b 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -46,34 +46,36 @@ use { signature_validator, sources::{ balancer_v2::{ - pool_fetching::BalancerContracts, - BalancerFactoryKind, - BalancerPoolFetcher, + pool_fetching::BalancerContracts, BalancerFactoryKind, BalancerPoolFetcher, }, uniswap_v2::{pool_cache::PoolCache, UniV2BaselineSourceParameters}, uniswap_v3::pool_fetching::UniswapV3PoolFetcher, - BaselineSource, - PoolAggregator, + BaselineSource, PoolAggregator, }, token_info::{CachedTokenInfoFetcher, TokenInfoFetcher}, token_list::{AutoUpdatingTokenList, TokenListConfiguration}, zeroex_api::DefaultZeroExApi, }, - std::{collections::HashSet, sync::Arc, time::Duration}, + std::{ + collections::HashSet, + sync::{Arc, RwLock}, + time::{Duration, Instant}, + }, tracing::Instrument, url::Url, }; struct Liveness { - solvable_orders_cache: Arc, max_auction_age: Duration, + last_auction_time: Arc>, } #[async_trait::async_trait] impl LivenessChecking for Liveness { async fn is_alive(&self) -> bool { - let age = self.solvable_orders_cache.last_update_time().elapsed(); - age <= self.max_auction_age + let last_auction_time = self.last_auction_time.read().unwrap(); + let auction_age = last_auction_time.elapsed(); + auction_age <= self.max_auction_age } } @@ -583,9 +585,11 @@ pub async fn run(args: Arguments) { .update(block) .await .expect("failed to perform initial solvable orders update"); + + let now = Arc::new(RwLock::new(Instant::now())); let liveness = Liveness { max_auction_age: args.max_auction_age, - solvable_orders_cache: solvable_orders_cache.clone(), + last_auction_time: now.clone(), }; shared::metrics::serve_metrics(Arc::new(liveness), args.metrics_address); @@ -644,6 +648,7 @@ pub async fn run(args: Arguments) { in_flight_orders: Default::default(), fee_policy: args.fee_policy, persistence: infra::persistence::Persistence::new(args.s3.into().unwrap()).await, + last_auction_time: now.clone(), }; run.run_forever().await; unreachable!("run loop exited"); @@ -690,7 +695,12 @@ async fn shadow_mode(args: Arguments) -> ! { .await }; - shared::metrics::serve_metrics(Arc::new(shadow::Liveness), args.metrics_address); + let now = Arc::new(RwLock::new(Instant::now())); + let liveness = Liveness { + max_auction_age: args.max_auction_age, + last_auction_time: now.clone(), + }; + shared::metrics::serve_metrics(Arc::new(liveness), args.metrics_address); let shadow = shadow::RunLoop::new( orderbook, @@ -699,6 +709,7 @@ async fn shadow_mode(args: Arguments) -> ! { args.score_cap, args.solve_deadline, args.fee_policy, + now.clone(), ); shadow.run_forever().await; diff --git a/crates/autopilot/src/run_loop.rs b/crates/autopilot/src/run_loop.rs index 717b272015..12bea88668 100644 --- a/crates/autopilot/src/run_loop.rs +++ b/crates/autopilot/src/run_loop.rs @@ -25,11 +25,7 @@ use { interaction::InteractionData, order::{OrderClass, OrderUid}, solver_competition::{ - CompetitionAuction, - Order, - Score, - SolverCompetitionDB, - SolverSettlement, + CompetitionAuction, Order, Score, SolverCompetitionDB, SolverSettlement, }, }, number::nonzero::U256 as NonZeroU256, @@ -38,7 +34,7 @@ use { shared::{remaining_amounts, token_list::AutoUpdatingTokenList}, std::{ collections::{BTreeMap, HashMap, HashSet}, - sync::{Arc, Mutex}, + sync::{Arc, Mutex, RwLock}, time::{Duration, Instant}, }, tracing::Instrument, @@ -59,6 +55,7 @@ pub struct RunLoop { pub in_flight_orders: Arc>, pub fee_policy: arguments::FeePolicy, pub persistence: infra::persistence::Persistence, + pub last_auction_time: Arc>, } impl RunLoop { @@ -74,6 +71,8 @@ impl RunLoop { || last_block.replace(current_block) != Some(current_block) { observe::log_auction_delta(id, &previous, &auction); + *self.last_auction_time.write().unwrap() = Instant::now(); + self.single_run(id, auction) .instrument(tracing::info_span!("auction", id)) .await; diff --git a/crates/autopilot/src/shadow.rs b/crates/autopilot/src/shadow.rs index af62a0a86d..dd5a10c24f 100644 --- a/crates/autopilot/src/shadow.rs +++ b/crates/autopilot/src/shadow.rs @@ -26,20 +26,15 @@ use { number::nonzero::U256 as NonZeroU256, primitive_types::{H160, U256}, rand::seq::SliceRandom, - shared::{metrics::LivenessChecking, token_list::AutoUpdatingTokenList}, - std::{cmp, time::Duration}, + shared::token_list::AutoUpdatingTokenList, + std::{ + cmp, + sync::{Arc, RwLock}, + time::{Duration, Instant}, + }, tracing::Instrument, }; -pub struct Liveness; -#[async_trait::async_trait] -impl LivenessChecking for Liveness { - async fn is_alive(&self) -> bool { - // can we somehow check that we keep processing auctions? - true - } -} - pub struct RunLoop { orderbook: protocol::Orderbook, drivers: Vec, @@ -49,6 +44,7 @@ pub struct RunLoop { score_cap: U256, solve_deadline: Duration, fee_policy: FeePolicy, + last_auction_time: Arc>, } impl RunLoop { @@ -59,6 +55,7 @@ impl RunLoop { score_cap: U256, solve_deadline: Duration, fee_policy: FeePolicy, + last_auction_time: Arc>, ) -> Self { Self { orderbook, @@ -69,6 +66,7 @@ impl RunLoop { score_cap, solve_deadline, fee_policy, + last_auction_time, } } @@ -81,6 +79,7 @@ impl RunLoop { }; observe::log_auction_delta(id, &previous, &auction); previous = Some(auction.clone()); + *self.last_auction_time.write().unwrap() = Instant::now(); self.single_run(id, auction) .instrument(tracing::info_span!("auction", id)) From 4dc4e20e82687da80e5e92191190f6a702085385 Mon Sep 17 00:00:00 2001 From: KRD <44735306+KRD-Kai@users.noreply.github.com> Date: Thu, 4 Jan 2024 12:49:56 +0000 Subject: [PATCH 2/4] Add liveness auction method to update timestamp --- crates/autopilot/src/run.rs | 32 ++++++++++++++++++-------------- crates/autopilot/src/run_loop.rs | 7 ++++--- crates/autopilot/src/shadow.rs | 15 ++++++--------- 3 files changed, 28 insertions(+), 26 deletions(-) diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index 0bd28d582b..9e203aba89 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -65,9 +65,9 @@ use { url::Url, }; -struct Liveness { +pub struct Liveness { max_auction_age: Duration, - last_auction_time: Arc>, + last_auction_time: RwLock, } #[async_trait::async_trait] @@ -79,6 +79,12 @@ impl LivenessChecking for Liveness { } } +impl Liveness { + pub fn auction(&self) { + *self.last_auction_time.write().unwrap() = Instant::now(); + } +} + async fn ethrpc(url: &Url) -> blockchain::Rpc { blockchain::Rpc::new(url) .await @@ -586,12 +592,11 @@ pub async fn run(args: Arguments) { .await .expect("failed to perform initial solvable orders update"); - let now = Arc::new(RwLock::new(Instant::now())); - let liveness = Liveness { + let liveness = Arc::new(Liveness { max_auction_age: args.max_auction_age, - last_auction_time: now.clone(), - }; - shared::metrics::serve_metrics(Arc::new(liveness), args.metrics_address); + last_auction_time: Instant::now().into(), + }); + shared::metrics::serve_metrics(liveness.clone(), args.metrics_address); let on_settlement_event_updater = crate::on_settlement_event_updater::OnSettlementEventUpdater { @@ -648,7 +653,7 @@ pub async fn run(args: Arguments) { in_flight_orders: Default::default(), fee_policy: args.fee_policy, persistence: infra::persistence::Persistence::new(args.s3.into().unwrap()).await, - last_auction_time: now.clone(), + liveness: liveness.clone(), }; run.run_forever().await; unreachable!("run loop exited"); @@ -695,12 +700,11 @@ async fn shadow_mode(args: Arguments) -> ! { .await }; - let now = Arc::new(RwLock::new(Instant::now())); - let liveness = Liveness { + let liveness: Arc = Arc::new(Liveness { max_auction_age: args.max_auction_age, - last_auction_time: now.clone(), - }; - shared::metrics::serve_metrics(Arc::new(liveness), args.metrics_address); + last_auction_time: Instant::now().into(), + }); + shared::metrics::serve_metrics(liveness.clone(), args.metrics_address); let shadow = shadow::RunLoop::new( orderbook, @@ -709,7 +713,7 @@ async fn shadow_mode(args: Arguments) -> ! { args.score_cap, args.solve_deadline, args.fee_policy, - now.clone(), + liveness.clone(), ); shadow.run_forever().await; diff --git a/crates/autopilot/src/run_loop.rs b/crates/autopilot/src/run_loop.rs index 12bea88668..50b151c8ee 100644 --- a/crates/autopilot/src/run_loop.rs +++ b/crates/autopilot/src/run_loop.rs @@ -13,6 +13,7 @@ use { }, infra::{self, blockchain::Ethereum}, protocol::fee, + run::Liveness, solvable_orders::SolvableOrdersCache, }, ::observe::metrics, @@ -34,7 +35,7 @@ use { shared::{remaining_amounts, token_list::AutoUpdatingTokenList}, std::{ collections::{BTreeMap, HashMap, HashSet}, - sync::{Arc, Mutex, RwLock}, + sync::{Arc, Mutex}, time::{Duration, Instant}, }, tracing::Instrument, @@ -55,7 +56,7 @@ pub struct RunLoop { pub in_flight_orders: Arc>, pub fee_policy: arguments::FeePolicy, pub persistence: infra::persistence::Persistence, - pub last_auction_time: Arc>, + pub liveness: Arc, } impl RunLoop { @@ -71,7 +72,7 @@ impl RunLoop { || last_block.replace(current_block) != Some(current_block) { observe::log_auction_delta(id, &previous, &auction); - *self.last_auction_time.write().unwrap() = Instant::now(); + self.liveness.auction(); self.single_run(id, auction) .instrument(tracing::info_span!("auction", id)) diff --git a/crates/autopilot/src/shadow.rs b/crates/autopilot/src/shadow.rs index dd5a10c24f..3443807169 100644 --- a/crates/autopilot/src/shadow.rs +++ b/crates/autopilot/src/shadow.rs @@ -16,6 +16,7 @@ use { solve::{self}, }, protocol::{self, fee}, + run::Liveness, run_loop::{self, observe}, }, ::observe::metrics, @@ -27,11 +28,7 @@ use { primitive_types::{H160, U256}, rand::seq::SliceRandom, shared::token_list::AutoUpdatingTokenList, - std::{ - cmp, - sync::{Arc, RwLock}, - time::{Duration, Instant}, - }, + std::{cmp, sync::Arc, time::Duration}, tracing::Instrument, }; @@ -44,7 +41,7 @@ pub struct RunLoop { score_cap: U256, solve_deadline: Duration, fee_policy: FeePolicy, - last_auction_time: Arc>, + liveness: Arc, } impl RunLoop { @@ -55,7 +52,7 @@ impl RunLoop { score_cap: U256, solve_deadline: Duration, fee_policy: FeePolicy, - last_auction_time: Arc>, + liveness: Arc, ) -> Self { Self { orderbook, @@ -66,7 +63,7 @@ impl RunLoop { score_cap, solve_deadline, fee_policy, - last_auction_time, + liveness, } } @@ -79,7 +76,7 @@ impl RunLoop { }; observe::log_auction_delta(id, &previous, &auction); previous = Some(auction.clone()); - *self.last_auction_time.write().unwrap() = Instant::now(); + self.liveness.auction(); self.single_run(id, auction) .instrument(tracing::info_span!("auction", id)) From 385e0b65d815ad33b46d2ade872caa9f73f55fe5 Mon Sep 17 00:00:00 2001 From: KRD <44735306+KRD-Kai@users.noreply.github.com> Date: Mon, 15 Jan 2024 08:32:35 +0000 Subject: [PATCH 3/4] Add liveness constructor --- crates/autopilot/src/run.rs | 24 ++++++++++++++---------- crates/autopilot/src/run_loop.rs | 6 +++++- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index 9e203aba89..ef9ea981e6 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -46,11 +46,14 @@ use { signature_validator, sources::{ balancer_v2::{ - pool_fetching::BalancerContracts, BalancerFactoryKind, BalancerPoolFetcher, + pool_fetching::BalancerContracts, + BalancerFactoryKind, + BalancerPoolFetcher, }, uniswap_v2::{pool_cache::PoolCache, UniV2BaselineSourceParameters}, uniswap_v3::pool_fetching::UniswapV3PoolFetcher, - BaselineSource, PoolAggregator, + BaselineSource, + PoolAggregator, }, token_info::{CachedTokenInfoFetcher, TokenInfoFetcher}, token_list::{AutoUpdatingTokenList, TokenListConfiguration}, @@ -80,6 +83,13 @@ impl LivenessChecking for Liveness { } impl Liveness { + pub fn new(max_auction_age: Duration) -> Liveness { + Liveness { + max_auction_age, + last_auction_time: RwLock::new(Instant::now()), + } + } + pub fn auction(&self) { *self.last_auction_time.write().unwrap() = Instant::now(); } @@ -592,10 +602,7 @@ pub async fn run(args: Arguments) { .await .expect("failed to perform initial solvable orders update"); - let liveness = Arc::new(Liveness { - max_auction_age: args.max_auction_age, - last_auction_time: Instant::now().into(), - }); + let liveness = Arc::new(Liveness::new(args.max_auction_age)); shared::metrics::serve_metrics(liveness.clone(), args.metrics_address); let on_settlement_event_updater = @@ -700,10 +707,7 @@ async fn shadow_mode(args: Arguments) -> ! { .await }; - let liveness: Arc = Arc::new(Liveness { - max_auction_age: args.max_auction_age, - last_auction_time: Instant::now().into(), - }); + let liveness = Arc::new(Liveness::new(args.max_auction_age)); shared::metrics::serve_metrics(liveness.clone(), args.metrics_address); let shadow = shadow::RunLoop::new( diff --git a/crates/autopilot/src/run_loop.rs b/crates/autopilot/src/run_loop.rs index 50b151c8ee..7adf7877eb 100644 --- a/crates/autopilot/src/run_loop.rs +++ b/crates/autopilot/src/run_loop.rs @@ -26,7 +26,11 @@ use { interaction::InteractionData, order::{OrderClass, OrderUid}, solver_competition::{ - CompetitionAuction, Order, Score, SolverCompetitionDB, SolverSettlement, + CompetitionAuction, + Order, + Score, + SolverCompetitionDB, + SolverSettlement, }, }, number::nonzero::U256 as NonZeroU256, From 791bd428943daebf47d168cf95ec31a400225fe6 Mon Sep 17 00:00:00 2001 From: KRD <44735306+KRD-Kai@users.noreply.github.com> Date: Mon, 15 Jan 2024 09:49:40 +0000 Subject: [PATCH 4/4] Format --- crates/autopilot/src/run.rs | 7 +++++-- crates/autopilot/src/run_loop.rs | 6 +++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index 75a187b0df..3bea17fd32 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -44,11 +44,14 @@ use { signature_validator, sources::{ balancer_v2::{ - pool_fetching::BalancerContracts, BalancerFactoryKind, BalancerPoolFetcher, + pool_fetching::BalancerContracts, + BalancerFactoryKind, + BalancerPoolFetcher, }, uniswap_v2::{pool_cache::PoolCache, UniV2BaselineSourceParameters}, uniswap_v3::pool_fetching::UniswapV3PoolFetcher, - BaselineSource, PoolAggregator, + BaselineSource, + PoolAggregator, }, token_info::{CachedTokenInfoFetcher, TokenInfoFetcher}, token_list::{AutoUpdatingTokenList, TokenListConfiguration}, diff --git a/crates/autopilot/src/run_loop.rs b/crates/autopilot/src/run_loop.rs index bc9e8b0a81..703ef86af6 100644 --- a/crates/autopilot/src/run_loop.rs +++ b/crates/autopilot/src/run_loop.rs @@ -18,7 +18,11 @@ use { database::order_events::OrderEventLabel, itertools::Itertools, model::solver_competition::{ - CompetitionAuction, Order, Score, SolverCompetitionDB, SolverSettlement, + CompetitionAuction, + Order, + Score, + SolverCompetitionDB, + SolverSettlement, }, number::nonzero::U256 as NonZeroU256, primitive_types::{H160, H256, U256},