Improve autopilot liveness check #2236

Merged: 5 commits, Jan 15, 2024
Changes from 2 commits

45 changes: 30 additions & 15 deletions crates/autopilot/src/run.rs
@@ -46,34 +46,42 @@ use {
signature_validator,
sources::{
balancer_v2::{
pool_fetching::BalancerContracts,
BalancerFactoryKind,
BalancerPoolFetcher,
pool_fetching::BalancerContracts, BalancerFactoryKind, BalancerPoolFetcher,
},
uniswap_v2::{pool_cache::PoolCache, UniV2BaselineSourceParameters},
uniswap_v3::pool_fetching::UniswapV3PoolFetcher,
BaselineSource,
PoolAggregator,
BaselineSource, PoolAggregator,
},
token_info::{CachedTokenInfoFetcher, TokenInfoFetcher},
token_list::{AutoUpdatingTokenList, TokenListConfiguration},
zeroex_api::DefaultZeroExApi,
},
std::{collections::HashSet, sync::Arc, time::Duration},
std::{
collections::HashSet,
sync::{Arc, RwLock},
time::{Duration, Instant},
},
tracing::Instrument,
url::Url,
};

struct Liveness {
solvable_orders_cache: Arc<SolvableOrdersCache>,
pub struct Liveness {
max_auction_age: Duration,
last_auction_time: RwLock<Instant>,
}

#[async_trait::async_trait]
impl LivenessChecking for Liveness {
async fn is_alive(&self) -> bool {
let age = self.solvable_orders_cache.last_update_time().elapsed();
age <= self.max_auction_age
let last_auction_time = self.last_auction_time.read().unwrap();
Contributor:

Ditto: why is it safe to unwrap here?

let auction_age = last_auction_time.elapsed();
auction_age <= self.max_auction_age
}
}

impl Liveness {
pub fn auction(&self) {
*self.last_auction_time.write().unwrap() = Instant::now();
Contributor:

nit: use expect instead of unwrap, with a message explaining why this case cannot occur.

Contributor:

This is the same as mutex.lock().unwrap(), which we commonly do throughout the code base, so I don't think we need to start using .expect() here.
The reasoning is that an error here means the lock was poisoned: a thread panicked while holding it, so the locked value is in an unknown state. There is no good strategy for recovering the "correct" value after that, so we consider panicking on these errors acceptable overall.

}
}
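
The poisoning argument in the thread above can be reproduced with the standard library alone. The sketch below is illustrative and not part of the PR: `poison` and `touch` are hypothetical helper names. It shows that a panic while the write guard is held poisons the `RwLock`, after which `read()` and `write()` return `Err`, and that `expect` behaves like `unwrap` apart from the panic message.

```rust
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Instant;

/// Hypothetical helper: poisons the lock by panicking in another thread
/// while that thread holds the write guard.
fn poison(lock: &Arc<RwLock<Instant>>) {
    let lock = Arc::clone(lock);
    let result = thread::spawn(move || {
        let _guard = lock.write().unwrap();
        panic!("simulated panic while holding the write lock");
    })
    .join();
    // `join` returns `Err` because the spawned thread panicked.
    assert!(result.is_err());
}

/// Hypothetical helper mirroring `Liveness::auction`, written with `expect`
/// as the nit suggests. It still panics on a poisoned lock; only the
/// message differs from `unwrap`.
fn touch(lock: &RwLock<Instant>) {
    *lock
        .write()
        .expect("lock poisoned: a thread panicked while updating the timestamp") =
        Instant::now();
}
```

Calling `touch` after `poison` would panic with the message above, which is the behaviour the second reply argues is acceptable.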

@@ -583,11 +591,12 @@ pub async fn run(args: Arguments) {
.update(block)
.await
.expect("failed to perform initial solvable orders update");
let liveness = Liveness {

let liveness = Arc::new(Liveness {
Contributor:

nit: Liveness could have a constructor that sets last_auction_time itself, given that the initial value never varies and does not need to be overridden in tests.

max_auction_age: args.max_auction_age,
solvable_orders_cache: solvable_orders_cache.clone(),
};
shared::metrics::serve_metrics(Arc::new(liveness), args.metrics_address);
last_auction_time: Instant::now().into(),
});
shared::metrics::serve_metrics(liveness.clone(), args.metrics_address);

let on_settlement_event_updater =
crate::on_settlement_event_updater::OnSettlementEventUpdater {
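
A minimal sketch of the constructor suggested in the review comment above. `Liveness::new` is a hypothetical name, and `is_alive` is written here as a plain synchronous method instead of the async `LivenessChecking` implementation so that the example builds without external crates.

```rust
use std::sync::RwLock;
use std::time::{Duration, Instant};

pub struct Liveness {
    max_auction_age: Duration,
    last_auction_time: RwLock<Instant>,
}

impl Liveness {
    /// Hypothetical constructor: callers pass only the configurable part,
    /// and the timestamp starts at construction time.
    pub fn new(max_auction_age: Duration) -> Self {
        Self {
            max_auction_age,
            last_auction_time: RwLock::new(Instant::now()),
        }
    }

    /// Records that a new auction was just processed (as in the PR).
    pub fn auction(&self) {
        *self.last_auction_time.write().unwrap() = Instant::now();
    }

    /// Synchronous stand-in for `LivenessChecking::is_alive`.
    pub fn is_alive(&self) -> bool {
        let last_auction_time = self.last_auction_time.read().unwrap();
        last_auction_time.elapsed() <= self.max_auction_age
    }
}
```

With a constructor along these lines, both call sites in run.rs could shrink to something like `Arc::new(Liveness::new(args.max_auction_age))`, and the fields could stay private.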
@@ -644,6 +653,7 @@ pub async fn run(args: Arguments) {
in_flight_orders: Default::default(),
fee_policy: args.fee_policy,
persistence: infra::persistence::Persistence::new(args.s3.into().unwrap()).await,
liveness: liveness.clone(),
};
run.run_forever().await;
unreachable!("run loop exited");
@@ -690,7 +700,11 @@ async fn shadow_mode(args: Arguments) -> ! {
.await
};

shared::metrics::serve_metrics(Arc::new(shadow::Liveness), args.metrics_address);
let liveness: Arc<Liveness> = Arc::new(Liveness {
max_auction_age: args.max_auction_age,
last_auction_time: Instant::now().into(),
});
shared::metrics::serve_metrics(liveness.clone(), args.metrics_address);

let shadow = shadow::RunLoop::new(
orderbook,
@@ -699,6 +713,7 @@ async fn shadow_mode(args: Arguments) -> ! {
args.score_cap,
args.solve_deadline,
args.fee_policy,
liveness.clone(),
);
shadow.run_forever().await;

10 changes: 5 additions & 5 deletions crates/autopilot/src/run_loop.rs
@@ -13,6 +13,7 @@ use {
},
infra::{self, blockchain::Ethereum},
protocol::fee,
run::Liveness,
solvable_orders::SolvableOrdersCache,
},
::observe::metrics,
@@ -25,11 +26,7 @@ use {
interaction::InteractionData,
order::{OrderClass, OrderUid},
solver_competition::{
CompetitionAuction,
Order,
Score,
SolverCompetitionDB,
SolverSettlement,
CompetitionAuction, Order, Score, SolverCompetitionDB, SolverSettlement,
},
},
number::nonzero::U256 as NonZeroU256,
@@ -59,6 +56,7 @@ pub struct RunLoop {
pub in_flight_orders: Arc<Mutex<InFlightOrders>>,
pub fee_policy: arguments::FeePolicy,
pub persistence: infra::persistence::Persistence,
pub liveness: Arc<Liveness>,
}

impl RunLoop {
@@ -74,6 +72,8 @@ impl RunLoop {
|| last_block.replace(current_block) != Some(current_block)
{
observe::log_auction_delta(id, &previous, &auction);
self.liveness.auction();

self.single_run(id, auction)
.instrument(tracing::info_span!("auction", id))
.await;
18 changes: 7 additions & 11 deletions crates/autopilot/src/shadow.rs
@@ -16,6 +16,7 @@ use {
solve::{self},
},
protocol::{self, fee},
run::Liveness,
run_loop::{self, observe},
},
::observe::metrics,
@@ -26,20 +27,11 @@ use {
number::nonzero::U256 as NonZeroU256,
primitive_types::{H160, U256},
rand::seq::SliceRandom,
shared::{metrics::LivenessChecking, token_list::AutoUpdatingTokenList},
std::{cmp, time::Duration},
shared::token_list::AutoUpdatingTokenList,
std::{cmp, sync::Arc, time::Duration},
tracing::Instrument,
};

pub struct Liveness;
#[async_trait::async_trait]
impl LivenessChecking for Liveness {
async fn is_alive(&self) -> bool {
// can we somehow check that we keep processing auctions?
true
}
}

pub struct RunLoop {
orderbook: protocol::Orderbook,
drivers: Vec<Driver>,
@@ -49,6 +41,7 @@ pub struct RunLoop {
score_cap: U256,
solve_deadline: Duration,
fee_policy: FeePolicy,
liveness: Arc<Liveness>,
}

impl RunLoop {
@@ -59,6 +52,7 @@ impl RunLoop {
score_cap: U256,
solve_deadline: Duration,
fee_policy: FeePolicy,
liveness: Arc<Liveness>,
) -> Self {
Self {
orderbook,
@@ -69,6 +63,7 @@ impl RunLoop {
score_cap,
solve_deadline,
fee_policy,
liveness,
}
}

@@ -81,6 +76,7 @@ impl RunLoop {
};
observe::log_auction_delta(id, &previous, &auction);
previous = Some(auction.clone());
self.liveness.auction();
Contributor:

I think this is fine for now, but eventually we should consider distinguishing between ready and healthy.
That way we can be more cautious on /healthy and only update its timestamp if self.single_run returned without an error (i.e. we are actually settling auctions), while /ready can be updated whenever we create a new auction (i.e. a new autopilot instance is ready to start working).

Additionally, the current code might run into issues if there are no open orders, because self.next_auction() would return None. We would then report the autopilot as unhealthy although everything works as it should.


self.single_run(id, auction)
.instrument(tracing::info_span!("auction", id))
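
The ready/healthy split proposed in the comment above could look roughly like the sketch below. Everything in it is an assumption for illustration: the `Probes` type, its method names, and the premise that a run reports its outcome through a `Result` (in this diff `single_run` is awaited without a return value being used). The `idle` method is one possible answer to the no-open-orders concern; the thread leaves open whether an idle iteration should count as healthy.

```rust
use std::sync::RwLock;
use std::time::{Duration, Instant};

/// Hypothetical split of the single liveness timestamp into two probes.
pub struct Probes {
    max_age: Duration,
    /// Set whenever a new auction is created; `None` until the first one.
    last_auction: RwLock<Option<Instant>>,
    /// Refreshed only by successful runs or idle iterations.
    last_progress: RwLock<Instant>,
}

impl Probes {
    pub fn new(max_age: Duration) -> Self {
        Self {
            max_age,
            last_auction: RwLock::new(None),
            // Starting the clock at construction gives a start-up grace period.
            last_progress: RwLock::new(Instant::now()),
        }
    }

    /// Call when a new auction was created (would back a `/ready` endpoint).
    pub fn auction_created(&self) {
        *self.last_auction.write().unwrap() = Some(Instant::now());
    }

    /// Call with the outcome of a run; only successes refresh `/healthy`.
    pub fn run_finished<E>(&self, result: &Result<(), E>) {
        if result.is_ok() {
            *self.last_progress.write().unwrap() = Instant::now();
        }
    }

    /// Assumption: an iteration that found no auction (no open orders) still
    /// counts as progress, so an idle but working instance stays healthy.
    pub fn idle(&self) {
        *self.last_progress.write().unwrap() = Instant::now();
    }

    /// True if an auction was created within `max_age`.
    pub fn is_ready(&self) -> bool {
        let last = *self.last_auction.read().unwrap();
        last.map_or(false, |t| t.elapsed() <= self.max_age)
    }

    /// True if a successful run or idle iteration happened within `max_age`.
    pub fn is_healthy(&self) -> bool {
        let last_progress = self.last_progress.read().unwrap();
        last_progress.elapsed() <= self.max_age
    }
}
```

In such a design the run loop would call `auction_created` where this PR calls `self.liveness.auction()`, `run_finished` after each run, and `idle` when `next_auction()` yields `None`.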