Skip to content

Commit

Permalink
Make /settle call blocking and report tx_hash (#1999)
Browse files Browse the repository at this point in the history
# Description
Currently the autopilot wastes a lot of time waiting for transactions
that will never appear. This happens because the driver's `/settle`
endpoint operates in a fire-and-forget (get request, kick off submission
in background, return immediately). That way the driver does not have
any way to communicate that it will not be able to submit the solution
to the autopilot and it will have to monitor the blockchain until the
deadline is reached.

This PR is slightly related to
#1974 but does not solve
the issue that solutions don't get submitted in the first place.

# Changes
This PR makes it so that `/settle` actually blocks in the driver until
the solution gets submitted and returns the tx_hash. If the driver is
not able to submit the solution (e.g. the simulations for the solution
start reverting) it will return an error to the autopilot which will
immediately move to the next auction.

Note that this is not really how it's supposed to work but this solution
is fine as long as we are running all the drivers (and can therefore
assume a reasonable behavior). We should revisit this decision before we
can encourage external parties to run their own driver.

## How to test
e2e tests
  • Loading branch information
MartinquaXD authored Oct 20, 2023
1 parent 11f62fd commit 1044860
Show file tree
Hide file tree
Showing 12 changed files with 102 additions and 146 deletions.
28 changes: 17 additions & 11 deletions crates/autopilot/src/driver_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,27 @@ impl Driver {
}

pub async fn solve(&self, request: &solve::Request) -> Result<solve::Response> {
self.request_response("solve", request).await
self.request_response("solve", request, None).await
}

pub async fn reveal(&self, request: &reveal::Request) -> Result<reveal::Response> {
self.request_response("reveal", request).await
self.request_response("reveal", request, None).await
}

pub async fn settle(&self, request: &settle::Request) -> Result<settle::Response> {
self.request_response("settle", request).await
pub async fn settle(
&self,
request: &settle::Request,
timeout: std::time::Duration,
) -> Result<settle::Response> {
self.request_response("settle", request, Some(timeout))
.await
}

async fn request_response<Response>(
&self,
path: &str,
request: &impl serde::Serialize,
timeout: Option<std::time::Duration>,
) -> Result<Response>
where
Response: serde::de::DeserializeOwned,
Expand All @@ -54,13 +60,13 @@ impl Driver {
body=%serde_json::to_string_pretty(request).unwrap(),
"request",
);
let mut response = self
.client
.post(url.clone())
.json(request)
.send()
.await
.context("send")?;
let mut request = self.client.post(url.clone()).json(request);

if let Some(timeout) = timeout {
request = request.timeout(timeout);
}

let mut response = request.send().await.context("send")?;
let status = response.status().as_u16();
let body = response_body_with_size_limit(&mut response, RESPONSE_SIZE_LIMIT)
.await
Expand Down
2 changes: 2 additions & 0 deletions crates/autopilot/src/driver_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ pub mod reveal {
pub mod settle {
use {
model::bytes_hex,
primitive_types::H256,
serde::{Deserialize, Serialize},
serde_with::serde_as,
};
Expand All @@ -212,6 +213,7 @@ pub mod settle {
#[serde(rename_all = "camelCase", deny_unknown_fields)]
pub struct Response {
pub calldata: Calldata,
pub tx_hash: H256,
}

#[serde_as]
Expand Down
116 changes: 18 additions & 98 deletions crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,15 @@ use {
},
},
number::nonzero::U256 as NonZeroU256,
primitive_types::{H160, H256, U256},
primitive_types::{H160, U256},
rand::seq::SliceRandom,
shared::{
event_handling::MAX_REORG_BLOCK_COUNT,
remaining_amounts,
token_list::AutoUpdatingTokenList,
},
shared::{remaining_amounts, token_list::AutoUpdatingTokenList},
std::{
collections::{BTreeMap, HashSet},
sync::Arc,
time::{Duration, Instant},
},
tracing::Instrument,
web3::types::Transaction,
};

pub const SOLVE_TIME_LIMIT: Duration = Duration::from_secs(15);
Expand Down Expand Up @@ -290,7 +285,7 @@ impl RunLoop {
}

tracing::info!(driver = %driver.name, "settling");
match self.settle(driver, auction_id, solution, &revealed).await {
match self.settle(driver, solution, &revealed).await {
Ok(()) => Metrics::settle_ok(driver),
Err(err) => {
Metrics::settle_err(driver, &err);
Expand Down Expand Up @@ -413,7 +408,6 @@ impl RunLoop {
async fn settle(
&self,
driver: &Driver,
id: AuctionId,
solved: &Solution,
revealed: &reveal::Response,
) -> Result<(), SettleError> {
Expand All @@ -424,95 +418,27 @@ impl RunLoop {
.collect_vec();
self.database.store_order_events(&events).await;

driver
.settle(&settle::Request {
solution_id: solved.id,
})
let request = settle::Request {
solution_id: solved.id,
};

let tx_hash = driver
.settle(&request, self.max_settlement_transaction_wait)
.await
.map_err(SettleError::Failure)?;
.map_err(SettleError::Failure)?
.tx_hash;

// TODO: React to deadline expiring.
let transaction = self
.wait_for_settlement_transaction(id, solved.account)
.await?;
if let Some(tx) = transaction {
let events = revealed
.orders
.iter()
.map(|uid| (*uid, OrderEventLabel::Traded))
.collect_vec();
self.database.store_order_events(&events).await;
tracing::debug!("settled in tx {:?}", tx.hash);
} else {
tracing::warn!("could not find a mined transaction in time");
}
let events = revealed
.orders
.iter()
.map(|uid| (*uid, OrderEventLabel::Traded))
.collect_vec();
self.database.store_order_events(&events).await;
tracing::debug!(?tx_hash, "solution settled");

Ok(())
}

/// Tries to find a `settle` contract call with calldata ending in `tag`.
///
/// Returns None if no transaction was found within the deadline.
async fn wait_for_settlement_transaction(
&self,
id: AuctionId,
submission_address: H160,
) -> Result<Option<Transaction>, SettleError> {
// Start earlier than current block because there might be a delay when
// receiving the Solver's /execute response during which it already
// started broadcasting the tx.
let start_offset = MAX_REORG_BLOCK_COUNT;
let max_wait_time_blocks = (self.max_settlement_transaction_wait.as_secs_f32()
/ self.network_block_interval.as_secs_f32())
.ceil() as u64;
let current = self.current_block.borrow().number;
let start = current.saturating_sub(start_offset);
let deadline = current.saturating_add(max_wait_time_blocks);
tracing::debug!(
%current, %start, %deadline, ?id, ?submission_address,
"waiting for settlement",
);

// Use the existing event indexing infrastructure to find the transaction. We
// query all settlement events in the block range to get tx hashes and
// query the node for the full calldata.
//
// If the block range was large, we would make the query more efficient by
// moving the starting block up while taking reorgs into account. With
// the current range of 30 blocks this isn't necessary.
//
// We do keep track of hashes we have already seen to reduce load from the node.

let mut seen_transactions: HashSet<H256> = Default::default();
while self.current_block.borrow().number <= deadline {
let mut hashes = self
.database
.recent_settlement_tx_hashes(start..deadline + 1)
.await
.map_err(SettleError::Database)?;
hashes.retain(|hash| !seen_transactions.contains(hash));
for hash in hashes {
let Some(tx) = self
.web3
.eth()
.transaction(web3::types::TransactionId::Hash(hash))
.await
.map_err(|err| SettleError::TransactionFetch(hash, err))?
else {
continue;
};
if tx.input.0.ends_with(&id.to_be_bytes()) && tx.from == Some(submission_address) {
return Ok(Some(tx));
}
seen_transactions.insert(hash);
}
// It would be more correct to wait until just after the last event update run,
// but that is hard to synchronize.
tokio::time::sleep(self.network_block_interval.div_f32(2.)).await;
}
Ok(None)
}

/// Saves the competition data to the database
async fn save_competition(&self, competition: &Competition) -> Result<()> {
self.database.save_competition(competition).await
Expand Down Expand Up @@ -632,10 +558,6 @@ enum RevealError {

#[derive(Debug, thiserror::Error)]
enum SettleError {
#[error("unexpected database error: {0}")]
Database(anyhow::Error),
#[error("error fetching transaction receipts for {0:?}: {1}")]
TransactionFetch(H256, web3::Error),
#[error(transparent)]
Failure(anyhow::Error),
}
Expand Down Expand Up @@ -732,8 +654,6 @@ impl Metrics {

fn settle_err(driver: &Driver, err: &SettleError) {
let label = match err {
SettleError::Database(_) => "internal_error",
SettleError::TransactionFetch(..) => "tx_error",
SettleError::Failure(_) => "error",
};
Self::get()
Expand Down
2 changes: 1 addition & 1 deletion crates/driver/src/boundary/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl Mempool {
self.submitted_transactions.clone(),
web3.clone(),
&web3,
)?;
);
let receipt = submitter
.submit(
settlement.boundary.inner,
Expand Down
40 changes: 24 additions & 16 deletions crates/driver/src/domain/competition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use {
self::solution::settlement,
super::Mempools,
crate::{
domain::competition::solution::Settlement,
domain::{competition::solution::Settlement, eth},
infra::{
self,
blockchain::Ethereum,
Expand Down Expand Up @@ -208,21 +208,25 @@ impl Competition {
.unwrap()
.take()
.ok_or(Error::SolutionNotAvailable)?;
self.mempools.execute(&self.solver, &settlement);
Ok(Settled {
internalized_calldata: settlement
.calldata(
self.eth.contracts().settlement(),
settlement::Internalization::Enable,
)
.into(),
uninternalized_calldata: settlement
.calldata(
self.eth.contracts().settlement(),
settlement::Internalization::Disable,
)
.into(),
})

match self.mempools.execute(&self.solver, &settlement).await {
Err(_) => Err(Error::SubmissionError),
Ok(tx_hash) => Ok(Settled {
internalized_calldata: settlement
.calldata(
self.eth.contracts().settlement(),
settlement::Internalization::Enable,
)
.into(),
uninternalized_calldata: settlement
.calldata(
self.eth.contracts().settlement(),
settlement::Internalization::Disable,
)
.into(),
tx_hash,
}),
}
}

/// The ID of the auction being competed on.
Expand Down Expand Up @@ -264,6 +268,8 @@ pub struct Settled {
/// can manually enforce certain rules which can not be enforced
/// automatically.
pub uninternalized_calldata: Bytes<Vec<u8>>,
/// The transaction hash in which the solution was submitted.
pub tx_hash: eth::TxId,
}

#[derive(Debug, thiserror::Error)]
Expand All @@ -277,4 +283,6 @@ pub enum Error {
DeadlineExceeded(#[from] solution::DeadlineExceeded),
#[error("solver error: {0:?}")]
Solver(#[from] solver::Error),
#[error("failed to submit the solution")]
SubmissionError,
}
30 changes: 20 additions & 10 deletions crates/driver/src/domain/mempools.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
crate::{
domain::competition::solution::Settlement,
domain::{competition::solution::Settlement, eth},
infra::{self, observe, solver::Solver},
},
futures::{future::select_ok, FutureExt},
Expand All @@ -21,17 +21,19 @@ impl Mempools {
}
}

/// Publish a settlement to the mempools. Wait until it is confirmed in the
/// background.
pub fn execute(&self, solver: &Solver, settlement: &Settlement) {
/// Publish a settlement to the mempools.
pub async fn execute(
&self,
solver: &Solver,
settlement: &Settlement,
) -> Result<eth::TxId, AllFailed> {
let auction_id = settlement.auction_id;
let solver_name = solver.name();
tokio::spawn(select_ok(self.0.iter().cloned().map(|mempool| {
let solver = solver.clone();
let settlement = settlement.clone();

let (tx_hash, _remaining_futures) = select_ok(self.0.iter().cloned().map(|mempool| {
async move {
let result = mempool.execute(&solver, settlement.clone()).await;
observe::mempool_executed(&mempool, &settlement, &result);
let result = mempool.execute(solver, settlement.clone()).await;
observe::mempool_executed(&mempool, settlement, &result);
result
}
.instrument(tracing::info_span!(
Expand All @@ -40,7 +42,11 @@ impl Mempools {
?auction_id,
))
.boxed()
})));
}))
.await
.map_err(|_| AllFailed)?;

Ok(tx_hash)
}

/// Defines if the mempools are configured in a way that guarantees that
Expand Down Expand Up @@ -70,3 +76,7 @@ pub enum RevertProtection {
Enabled,
Disabled,
}

#[derive(Debug, Error)]
#[error("none of the submission strategies successfully submitted the solution")]
pub struct AllFailed;
3 changes: 3 additions & 0 deletions crates/driver/src/infra/api/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ enum Kind {
InvalidTokens,
InvalidAmounts,
QuoteSameTokens,
FailedToSubmit,
}

#[derive(Debug, Serialize)]
Expand Down Expand Up @@ -49,6 +50,7 @@ impl From<Kind> for (hyper::StatusCode, axum::Json<Error>) {
"Invalid order specified in the auction, some orders have either a 0 remaining buy \
or sell amount"
}
Kind::FailedToSubmit => "Could not submit the solution to the blockchain",
};
(
hyper::StatusCode::BAD_REQUEST,
Expand Down Expand Up @@ -79,6 +81,7 @@ impl From<competition::Error> for (hyper::StatusCode, axum::Json<Error>) {
competition::Error::SolutionNotAvailable => Kind::SolutionNotAvailable,
competition::Error::DeadlineExceeded(_) => Kind::DeadlineExceeded,
competition::Error::Solver(_) => Kind::SolverFailed,
competition::Error::SubmissionError => Kind::FailedToSubmit,
};
error.into()
}
Expand Down
Loading

0 comments on commit 1044860

Please sign in to comment.