Skip to content

Commit

Permalink
Refactor runloop slightly (#2958)
Browse files Browse the repository at this point in the history
# Description
Mostly a refactor with a few changes to improve observability.

# Changes
Refactors:
* move fairness checks and solutions sorting into `competition` to have
it return the final state of the competition
* combine a bunch of logic to report on the proposed solutions into
`report_on_solutions()`
* rename `solve()` to `try_solve()` and introduce a new function
`solve()` that does all the error handling to make `competition()`
leaner

Observability:
* `info` log whenever we see a new block
* log which solver proposed a solution for which order (to have a
holistic view on the auction in the autopilot without having to stitch
together driver logs
* record `Executed` order events together with `Considered` as suggested
[here](#2921 (comment))
to have a consistent view in the DB

## How to test
e2e tests should still pass
  • Loading branch information
MartinquaXD authored Sep 11, 2024
1 parent a25053d commit 6c6d982
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 104 deletions.
4 changes: 2 additions & 2 deletions crates/autopilot/src/domain/competition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl SolutionWithId {
self.solution.score()
}

pub fn order_ids(&self) -> impl Iterator<Item = &domain::OrderUid> {
pub fn order_ids(&self) -> impl Iterator<Item = &domain::OrderUid> + std::fmt::Debug {
self.solution.order_ids()
}

Expand Down Expand Up @@ -85,7 +85,7 @@ impl Solution {
self.score
}

pub fn order_ids(&self) -> impl Iterator<Item = &domain::OrderUid> {
pub fn order_ids(&self) -> impl Iterator<Item = &domain::OrderUid> + std::fmt::Debug {
self.orders.keys()
}

Expand Down
3 changes: 2 additions & 1 deletion crates/autopilot/src/infra/persistence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,11 @@ impl Persistence {
/// because this is just debugging information.
pub fn store_order_events(
&self,
order_uids: Vec<domain::OrderUid>,
order_uids: impl IntoIterator<Item = domain::OrderUid>,
label: boundary::OrderEventLabel,
) {
let db = self.postgres.clone();
let order_uids = order_uids.into_iter().collect();
tokio::spawn(
async move {
let mut tx = db.pool.acquire().await.expect("failed to acquire tx");
Expand Down
215 changes: 120 additions & 95 deletions crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,44 +182,13 @@ impl RunLoop {

Metrics::pre_processed(single_run_start.elapsed());

let mut solutions = {
let mut solutions = self.competition(auction_id, auction).await;
if solutions.is_empty() {
tracing::info!("no solutions for auction");
return;
}

// Shuffle so that sorting randomly splits ties.
solutions.shuffle(&mut rand::thread_rng());
solutions.sort_unstable_by_key(|participant| participant.solution.score().get().0);
solutions
};
let competition_simulation_block = self.eth.current_block().borrow().number;

// Make sure the winning solution is fair.
while !Self::is_solution_fair(solutions.last(), &solutions, auction) {
let unfair_solution = solutions.pop().expect("must exist");
warn!(
invalidated = unfair_solution.driver.name,
"fairness check invalidated of solution"
);
let solutions = self.competition(auction_id, auction).await;
if solutions.is_empty() {
tracing::info!("no solutions for auction");
return;
}

let considered_orders: HashSet<_> = solutions
.iter()
.flat_map(|solution| solution.solution.order_ids().copied())
.collect();
let winning_orders: HashSet<_> = solutions
.last()
.into_iter()
.flat_map(|solution| solution.solution.order_ids().copied())
.collect();
let considered_orders: Vec<_> = considered_orders
.difference(&winning_orders)
.cloned()
.collect();
self.persistence
.store_order_events(considered_orders, OrderEventLabel::Considered);
let competition_simulation_block = self.eth.current_block().borrow().number;

// TODO: Keep going with other solutions until some deadline.
if let Some(Participant { driver, solution }) = solutions.last() {
Expand All @@ -239,7 +208,6 @@ impl RunLoop {
};

let block_deadline = competition_simulation_block + self.submission_deadline;
let auction_uids = auction.orders.iter().map(|o| o.uid).collect::<HashSet<_>>();

// Post-processing should not be executed asynchronously since it includes steps
// of storing all the competition/auction-related data to the DB.
Expand Down Expand Up @@ -271,18 +239,7 @@ impl RunLoop {
tracing::warn!(?err, driver = %driver.name, "settlement failed");
}
}
let solution_uids = solution.order_ids().copied().collect::<HashSet<_>>();

let unsettled_orders: HashSet<_> = solutions
.iter()
// Report orders that were part of any solution candidate
.flat_map(|p| p.solution.order_ids())
// but not part of the winning one
.filter(|uid| !solution_uids.contains(uid))
// yet still part of the auction (filter out jit orders)
.filter(|uid| auction_uids.contains(uid))
.collect();
Metrics::matched_unsettled(driver, unsettled_orders);
Metrics::single_run_completed(single_run_start.elapsed());
}
}
Expand Down Expand Up @@ -429,6 +386,7 @@ impl RunLoop {
}

/// Runs the solver competition, making all configured drivers participate.
/// Returns all fair solutions sorted by their score (worst to best).
async fn competition(
&self,
id: domain::auction::Id,
Expand All @@ -442,45 +400,80 @@ impl RunLoop {
);
let request = &request;

let order_uids = auction.orders.iter().map(|o| OrderUid(o.uid.0)).collect();
let order_uids = auction.orders.iter().map(|o| OrderUid(o.uid.0));
self.persistence
.store_order_events(order_uids, OrderEventLabel::Ready);

let start = Instant::now();
futures::future::join_all(self.drivers.iter().map(|driver| async move {
let result = self.solve(driver, request).await;
let solutions = match result {
Ok(solutions) => {
Metrics::solve_ok(driver, start.elapsed());
solutions
}
Err(err) => {
Metrics::solve_err(driver, start.elapsed(), &err);
if matches!(err, SolveError::NoSolutions) {
tracing::debug!(driver = %driver.name, "solver found no solution");
} else {
tracing::warn!(?err, driver = %driver.name, "solve error");
}
vec![]
}
};

solutions.into_iter().filter_map(|solution| match solution {
Ok(solution) => {
Metrics::solution_ok(driver);
Some(Participant { driver, solution })
}
Err(err) => {
Metrics::solution_err(driver, &err);
tracing::debug!(?err, driver = %driver.name, "invalid proposed solution");
None
}
})
}))
let mut solutions = futures::future::join_all(
self.drivers
.iter()
.map(|driver| self.solve(driver, request)),
)
.await
.into_iter()
.flatten()
.collect()
.collect::<Vec<_>>();

// Shuffle so that sorting randomly splits ties.
solutions.shuffle(&mut rand::thread_rng());
solutions.sort_unstable_by_key(|participant| participant.solution.score().get().0);

// Make sure the winning solution is fair.
while !Self::is_solution_fair(solutions.last(), &solutions, auction) {
let unfair_solution = solutions.pop().expect("must exist");
warn!(
invalidated = unfair_solution.driver.name,
"fairness check invalidated of solution"
);
}
self.report_on_solutions(&solutions, auction);

solutions
}

/// Records metrics, order events and logs for the given solutions.
/// Expects the winning solution to be the last in the list.
fn report_on_solutions(&self, solutions: &[Participant<'_>], auction: &domain::Auction) {
let Some(winner) = solutions.last() else {
// no solutions means nothing to report
return;
};

solutions.iter().for_each(|solution| {
tracing::debug!(
driver=%solution.driver.name,
orders=?solution.solution.order_ids(),
solution=solution.solution.id(),
"proposed solution"
);
});

let proposed_orders: HashSet<_> = solutions
.iter()
.flat_map(|solution| solution.solution.order_ids().copied())
.collect();
let winning_orders: HashSet<_> = solutions
.last()
.into_iter()
.flat_map(|solution| solution.solution.order_ids().copied())
.collect();
let mut non_winning_orders: HashSet<_> = proposed_orders
.difference(&winning_orders)
.cloned()
.collect();
self.persistence.store_order_events(
non_winning_orders.iter().cloned(),
OrderEventLabel::Considered,
);
self.persistence
.store_order_events(winning_orders, OrderEventLabel::Executing);

let auction_uids = auction.orders.iter().map(|o| o.uid).collect::<HashSet<_>>();

// Report orders that were part of a non-winning solution candidate
// but only if they were part of the auction (filter out jit orders)
non_winning_orders.retain(|uid| auction_uids.contains(uid));
Metrics::matched_unsettled(winner.driver, non_winning_orders);
}

/// Returns true if winning solution is fair or winner is None
Expand Down Expand Up @@ -570,8 +563,49 @@ impl RunLoop {
!unfair
}

/// Computes a driver's solutions for the solver competition.
async fn solve(
/// Sends a `/solve` request to the driver and manages all error cases and
/// records metrics and logs appropriately.
async fn solve<'a>(
&self,
driver: &'a infra::Driver,
request: &solve::Request,
) -> Vec<Participant<'a>> {
let start = Instant::now();
let result = self.try_solve(driver, request).await;
let solutions = match result {
Ok(solutions) => {
Metrics::solve_ok(driver, start.elapsed());
solutions
}
Err(err) => {
Metrics::solve_err(driver, start.elapsed(), &err);
if matches!(err, SolveError::NoSolutions) {
tracing::debug!(driver = %driver.name, "solver found no solution");
} else {
tracing::warn!(?err, driver = %driver.name, "solve error");
}
vec![]
}
};

solutions
.into_iter()
.filter_map(|solution| match solution {
Ok(solution) => {
Metrics::solution_ok(driver);
Some(Participant { driver, solution })
}
Err(err) => {
Metrics::solution_err(driver, &err);
tracing::debug!(?err, driver = %driver.name, "invalid proposed solution");
None
}
})
.collect()
}

/// Sends `/solve` request to the driver and forwards errors to the caller.
async fn try_solve(
&self,
driver: &infra::Driver,
request: &solve::Request,
Expand All @@ -593,13 +627,8 @@ impl RunLoop {
let futures = solutions.into_iter().map(|solution| async {
let solution = solution?;
let solver = solution.solver();
let is_allowed = self
.eth
.contracts()
.authenticator()
.is_solver(solver.into())
.call()
.await;
let authenticator = self.eth.contracts().authenticator();
let is_allowed = authenticator.is_solver(solver.into()).call().await;

match is_allowed {
Ok(true) => Ok(solution),
Expand Down Expand Up @@ -651,10 +680,6 @@ impl RunLoop {
auction_id: i64,
submission_deadline_latest_block: u64,
) -> Result<(), SettleError> {
let order_ids = solved.order_ids().copied().collect();
self.persistence
.store_order_events(order_ids, OrderEventLabel::Executing);

let request = settle::Request {
solution_id: solved.id(),
submission_deadline_latest_block,
Expand Down Expand Up @@ -904,7 +929,7 @@ impl Metrics {
.observe(elapsed.as_secs_f64());
}

fn matched_unsettled(winning: &infra::Driver, unsettled: HashSet<&domain::OrderUid>) {
fn matched_unsettled(winning: &infra::Driver, unsettled: HashSet<domain::OrderUid>) {
if !unsettled.is_empty() {
tracing::debug!(?unsettled, "some orders were matched but not settled");
}
Expand Down
8 changes: 2 additions & 6 deletions crates/autopilot/src/solvable_orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,17 +246,13 @@ impl SolvableOrdersCache {
// spawning a background task since `order_events` table insert operation takes
// a while and the result is ignored.
self.persistence.store_order_events(
invalid_order_uids
.iter()
.map(|id| domain::OrderUid(id.0))
.collect(),
invalid_order_uids.iter().map(|id| domain::OrderUid(id.0)),
OrderEventLabel::Invalid,
);
self.persistence.store_order_events(
filtered_order_events
.iter()
.map(|id| domain::OrderUid(id.0))
.collect(),
.map(|id| domain::OrderUid(id.0)),
OrderEventLabel::Filtered,
);

Expand Down
1 change: 1 addition & 0 deletions crates/ethrpc/src/block_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ pub async fn current_block_stream(
continue;
}

tracing::info!(number=%block.number, hash=?block.hash, "noticed a new block");
if sender.send(block).is_err() {
tracing::debug!("exiting polling loop");
break;
Expand Down

0 comments on commit 6c6d982

Please sign in to comment.