Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for multiple builder and block selection #24

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 19 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ hyper-util = { version = "0.1", features = ["full"] }
serde_json = "1.0.96"
reth-rpc-layer = { git = "https://github.com/paradigmxyz/reth.git", tag = "v1.0.7" }
reth-optimism-payload-builder = { git = "https://github.com/paradigmxyz/reth.git", tag = "v1.0.7", features = ["optimism"] }
futures = "0.3.31"

[dev-dependencies]
anyhow = "1.0"
Expand Down
16 changes: 12 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use tracing_subscriber::EnvFilter;

mod error;
mod proxy;
mod selector;
mod server;

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -45,7 +46,7 @@ struct Args {

/// URL of the builder execution engine
#[arg(long, env)]
builder_url: String,
builder_urls: Vec<String>,

/// Use the proposer to sync the builder node
#[arg(long, env, default_value = "false")]
Expand Down Expand Up @@ -112,12 +113,19 @@ async fn main() -> Result<()> {
// Initialize the l2 client
let l2_client = create_client(&args.l2_url, jwt_secret)?;

// Initialize the builder client
let builder_client = create_client(&args.builder_url, builder_jwt_secret)?;
// Initialize the builder clients
let builder_clients = args
.builder_urls
.iter()
.map(|url| create_client(url, builder_jwt_secret))
.collect::<Result<Vec<_>>>()?;

let eth_engine_api = EthEngineApi::new(
Arc::new(l2_client),
Arc::new(builder_client),
builder_clients
.iter()
.map(|c| Arc::new(c.clone()))
.collect(),
args.boost_sync,
);
let mut module: RpcModule<()> = RpcModule::new(());
Expand Down
28 changes: 28 additions & 0 deletions src/selector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use jsonrpsee::core::ClientError;
use op_alloy_rpc_types_engine::OptimismExecutionPayloadEnvelopeV3;

// Define a trait for choosing a payload
pub trait PayloadSelector {
fn select_payload(
&self,
local_payload: Result<OptimismExecutionPayloadEnvelopeV3, ClientError>,
builder_payloads: Vec<Result<OptimismExecutionPayloadEnvelopeV3, ClientError>>,
) -> Result<OptimismExecutionPayloadEnvelopeV3, ClientError>;
}

pub struct DefaultPayloadSelector;

impl PayloadSelector for DefaultPayloadSelector {
fn select_payload(
&self,
local_payload: Result<OptimismExecutionPayloadEnvelopeV3, ClientError>,
builder_payloads: Vec<Result<OptimismExecutionPayloadEnvelopeV3, ClientError>>,
) -> Result<OptimismExecutionPayloadEnvelopeV3, ClientError> {
builder_payloads
.iter()
.filter_map(|payload| payload.as_ref().ok())
.max_by_key(|p| p.block_value)
.map(|p| Ok(p.clone()))
.unwrap_or(local_payload)
}
}
117 changes: 66 additions & 51 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use alloy_rpc_types_engine::{
ExecutionPayload, ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, PayloadId,
PayloadStatus,
};
use futures::future::join_all;
use jsonrpsee::core::{async_trait, ClientError, RpcResult};
use jsonrpsee::http_client::transport::HttpBackend;
use jsonrpsee::http_client::HttpClient;
Expand All @@ -16,6 +17,8 @@ use reth_rpc_layer::AuthClientService;
use std::sync::Arc;
use tracing::{error, info};

use crate::selector::{DefaultPayloadSelector, PayloadSelector};

#[rpc(server, client, namespace = "engine")]
pub trait EngineApi {
#[method(name = "forkchoiceUpdatedV3")]
Expand All @@ -42,20 +45,22 @@ pub trait EngineApi {

pub struct EthEngineApi<S = AuthClientService<HttpBackend>> {
l2_client: Arc<HttpClient<S>>,
builder_client: Arc<HttpClient<S>>,
builder_clients: Vec<Arc<HttpClient<S>>>,
payload_selector: Arc<dyn PayloadSelector + Send + Sync>,
boost_sync: bool,
}

impl<S> EthEngineApi<S> {
pub fn new(
l2_client: Arc<HttpClient<S>>,
builder_client: Arc<HttpClient<S>>,
builder_clients: Vec<Arc<HttpClient<S>>>,
boost_sync: bool,
) -> Self {
Self {
l2_client,
builder_client,
builder_clients,
boost_sync,
payload_selector: Arc::new(DefaultPayloadSelector),
}
}
}
Expand Down Expand Up @@ -85,11 +90,12 @@ impl EngineApiServer for EthEngineApi {
};

if should_send_to_builder {
// async call to builder to trigger payload building and sync
let builder = self.builder_client.clone();
let attr = payload_attributes.clone();
tokio::spawn(async move {
builder.fork_choice_updated_v3(fork_choice_state, attr).await.map(|response| {
// async call to each builder to trigger payload building and sync
for builder in self.builder_clients.iter() {
let builder = builder.clone();
let attr = payload_attributes.clone();
tokio::spawn(async move {
builder.fork_choice_updated_v3(fork_choice_state, attr).await.map(|response| {
let payload_id_str = response.payload_id.map(|id| id.to_string()).unwrap_or_default();
if response.is_invalid() {
error!(message = "builder rejected fork_choice_updated_v3 with attributes", "payload_id" = payload_id_str, "validation_error" = %response.payload_status.status);
Expand All @@ -99,7 +105,8 @@ impl EngineApiServer for EthEngineApi {
}).map_err(|e| {
error!(message = "error calling fork_choice_updated_v3 to builder", "error" = %e, "head_block_hash" = %fork_choice_state.head_block_hash);
})
});
});
}
} else {
info!(message = "no payload attributes provided or no_tx_pool is set", "head_block_hash" = %fork_choice_state.head_block_hash);
}
Expand All @@ -126,47 +133,53 @@ impl EngineApiServer for EthEngineApi {
) -> RpcResult<OptimismExecutionPayloadEnvelopeV3> {
info!(message = "received get_payload_v3", "payload_id" = %payload_id);
let l2_client_future = self.l2_client.get_payload_v3(payload_id);
let builder_client_future = Box::pin(async {
let payload = self.builder_client.get_payload_v3(payload_id).await.map_err(|e| {
error!(message = "error calling get_payload_v3 from builder", "error" = %e, "payload_id" = %payload_id);
e
})?;

info!(message = "received payload from builder", "payload_id" = %payload_id, "block_hash" = %payload.as_v1_payload().block_hash);

// Send the payload to the local execution engine with engine_newPayload to validate the block from the builder.
// Otherwise, we do not want to risk the network to a halt since op-node will not be able to propose the block.
// If validation fails, return the local block since that one has already been validated.
let payload_status = self.l2_client.new_payload_v3(payload.execution_payload.clone(), vec![], payload.parent_beacon_block_root).await.map_err(|e| {
error!(message = "error calling new_payload_v3 to validate builder payload", "error" = %e, "payload_id" = %payload_id);
e
})?;
if payload_status.is_invalid() {
let builder_client_futures = self.builder_clients.iter().map(|builder| {
let builder = builder.clone();
Box::pin(async move {
let payload = builder.get_payload_v3(payload_id).await.map_err(|e| {
error!(message = "error calling get_payload_v3 from builder", "error" = %e, "payload_id" = %payload_id);
e
})?;

info!(message = "received payload from builder", "payload_id" = %payload_id, "block_hash" = %payload.as_v1_payload().block_hash);

// Send the payload to the local execution engine with engine_newPayload to validate the block from the builder.
// Otherwise, we do not want to risk the network to a halt since op-node will not be able to propose the block.
// If validation fails, return the local block since that one has already been validated.
let payload_status = self.l2_client.new_payload_v3(payload.execution_payload.clone(), vec![], payload.parent_beacon_block_root).await.map_err(|e| {
error!(message = "error calling new_payload_v3 to validate builder payload", "error" = %e, "payload_id" = %payload_id);
e
})?;
if payload_status.is_invalid() {
error!(message = "builder payload was not valid", "payload_status" = %payload_status.status, "payload_id" = %payload_id);
Err(ClientError::Call(ErrorObject::owned(
INVALID_REQUEST_CODE,
"Builder payload was not valid",
None::<String>,
)))
} else {
info!(message = "received payload status from local execution engine validating builder payload", "payload_id" = %payload_id);
Ok(payload)
}
});

let (l2_payload, builder_payload) = tokio::join!(l2_client_future, builder_client_future);

builder_payload.or(l2_payload).map_err(|e| match e {
ClientError::Call(err) => err, // Already an ErrorObjectOwned, so just return it
other_error => {
error!(
message = "error calling get_payload_v3",
"error" = %other_error,
"payload_id" = %payload_id
);
ErrorCode::InternalError.into()
}
})
)))
} else {
info!(message = "received payload status from local execution engine validating builder payload", "payload_id" = %payload_id);
Ok(payload)
}
})
}).collect::<Vec<_>>();

let (l2_payload, builder_payloads) =
tokio::join!(l2_client_future, join_all(builder_client_futures));

self.payload_selector
.select_payload(l2_payload, builder_payloads)
.map_err(|e| match e {
ClientError::Call(err) => err, // Already an ErrorObjectOwned, so just return it
other_error => {
error!(
message = "error calling get_payload_v3",
"error" = %other_error,
"payload_id" = %payload_id
);
ErrorCode::InternalError.into()
}
})
}

async fn new_payload_v3(
Expand All @@ -180,11 +193,12 @@ impl EngineApiServer for EthEngineApi {

// async call to builder to sync the builder node
if self.boost_sync {
let builder = self.builder_client.clone();
let builder_payload = payload.clone();
let builder_versioned_hashes = versioned_hashes.clone();
tokio::spawn(async move {
builder.new_payload_v3(builder_payload, builder_versioned_hashes, parent_beacon_block_root).await
for builder in self.builder_clients.iter() {
let builder = builder.clone();
let builder_payload = payload.clone();
let builder_versioned_hashes = versioned_hashes.clone();
tokio::spawn(async move {
builder.new_payload_v3(builder_payload, builder_versioned_hashes, parent_beacon_block_root).await
.map(|response: PayloadStatus| {
if response.is_invalid() {
error!(message = "builder rejected new_payload_v3", "block_hash" = %block_hash);
Expand All @@ -195,7 +209,8 @@ impl EngineApiServer for EthEngineApi {
error!(message = "error calling new_payload_v3 to builder", "error" = %e, "block_hash" = %block_hash);
e
})
});
});
}
}

self.l2_client
Expand Down
Loading