Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: local -> external builder mapping #47

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 90 additions & 38 deletions src/server.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::metrics::ServerMetrics;
use std::num::NonZero;
use std::sync::Arc;

use alloy_primitives::{Bytes, B256};
use alloy_rpc_types_engine::{
ExecutionPayload, ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, PayloadId,
Expand All @@ -18,17 +20,18 @@ use opentelemetry::{Context, KeyValue};
use reth_optimism_payload_builder::{OpPayloadAttributes, OpPayloadBuilderAttributes};
use reth_payload_primitives::PayloadBuilderAttributes;
use reth_rpc_layer::AuthClientService;
use std::num::NonZero;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{debug, error, info};

use crate::metrics::ServerMetrics;

const CACHE_SIZE: usize = 100;

struct PayloadTraceContext {
tracer: Arc<BoxedTracer>,
block_hash_to_payload_ids: Arc<Mutex<LruCache<B256, Vec<PayloadId>>>>,
payload_id_to_span: Arc<Mutex<LruCache<PayloadId, Arc<BoxedSpan>>>>,
local_to_external_payload_ids: Arc<Mutex<LruCache<PayloadId, PayloadId>>>,
}

impl PayloadTraceContext {
Expand All @@ -41,6 +44,9 @@ impl PayloadTraceContext {
payload_id_to_span: Arc::new(Mutex::new(LruCache::new(
NonZero::new(CACHE_SIZE).unwrap(),
))),
local_to_external_payload_ids: Arc::new(Mutex::new(LruCache::new(
NonZero::new(CACHE_SIZE).unwrap(),
))),
}
}

Expand Down Expand Up @@ -84,6 +90,16 @@ impl PayloadTraceContext {
}
block_hash_to_payload_ids.pop(block_hash);
}

async fn store_payload_id_mapping(&self, local_id: PayloadId, external_id: PayloadId) {
let mut local_to_external = self.local_to_external_payload_ids.lock().await;
local_to_external.put(local_id, external_id);
}

async fn get_external_payload_id(&self, local_id: &PayloadId) -> Option<PayloadId> {
let mut store = self.local_to_external_payload_ids.lock().await;
store.get(local_id).copied()
}
}

#[rpc(server, client, namespace = "engine")]
Expand Down Expand Up @@ -210,6 +226,25 @@ where
"has_attributes" = payload_attributes.is_some(),
);

// First get the local payload ID from L2 client
let l2_response = self
.l2_client
.client
.fork_choice_updated_v3(fork_choice_state.clone(), payload_attributes.clone())
.await
.map_err(|e| match e {
ClientError::Call(err) => err,
other_error => {
error!(
message = "error calling fork_choice_updated_v3 for l2 client",
"url" = self.l2_client.url,
"error" = %other_error,
"head_block_hash" = %fork_choice_state.head_block_hash,
);
ErrorCode::InternalError.into()
}
})?;

let use_tx_pool = payload_attributes
.as_ref()
.map(|attr| !attr.no_tx_pool.unwrap_or_default());
Expand All @@ -235,18 +270,23 @@ where
3,
)
.unwrap();
let payload_id = builder_attrs.payload_id();
let local_payload_id = builder_attrs.payload_id();
parent_span.set_attribute(KeyValue::new(
"parent_hash",
fork_choice_state.head_block_hash.to_string(),
));
parent_span
.set_attribute(KeyValue::new("timestamp", builder_attrs.timestamp() as i64));
parent_span.set_attribute(KeyValue::new("payload_id", payload_id.to_string()));
parent_span
.set_attribute(KeyValue::new("payload_id", local_payload_id.to_string()));
let ctx =
Context::current().with_remote_span_context(parent_span.span_context().clone());
self.payload_trace_context
.store(payload_id, fork_choice_state.head_block_hash, parent_span)
.store(
local_payload_id,
fork_choice_state.head_block_hash,
parent_span,
)
.await;
Some(
self.payload_trace_context
Expand All @@ -263,23 +303,41 @@ where
}
let builder = self.builder_client.clone();
let attr = payload_attributes.clone();
let payload_trace_context = self.payload_trace_context.clone();
let local_payload_id = l2_response.payload_id;
tokio::spawn(async move {
let _ = builder.client.fork_choice_updated_v3(fork_choice_state, attr).await.map(|response| {
let payload_id_str = response.payload_id.map(|id| id.to_string()).unwrap_or_default();
if response.is_invalid() {
error!(message = "builder rejected fork_choice_updated_v3 with attributes", "url" = builder.url, "payload_id" = payload_id_str, "validation_error" = %response.payload_status.status);
} else {
info!(message = "called fork_choice_updated_v3 to builder with payload attributes", "url" = builder.url, "payload_status" = %response.payload_status.status, "payload_id" = payload_id_str);
match builder
.client
.fork_choice_updated_v3(fork_choice_state, attr)
.await
{
Ok(response) => {
let external_payload_id = response.payload_id;
if let (Some(local_id), Some(external_id)) =
(local_payload_id, external_payload_id)
{
payload_trace_context
.store_payload_id_mapping(local_id, external_id)
.await;
}
})
.map_err(|e| {
let payload_id_str = external_payload_id
.map(|id| id.to_string())
.unwrap_or_default();
if response.is_invalid() {
error!(message = "builder rejected fork_choice_updated_v3 with attributes", "url" = builder.url, "payload_id" = payload_id_str, "validation_error" = %response.payload_status.status);
} else {
info!(message = "called fork_choice_updated_v3 to builder with payload attributes", "url" = builder.url, "payload_status" = %response.payload_status.status, "payload_id" = payload_id_str);
}
}
Err(e) => {
error!(
message = "error calling fork_choice_updated_v3 to builder",
"url" = builder.url,
"url" = builder.url,
"error" = %e,
"head_block_hash" = %fork_choice_state.head_block_hash
);
});
}
}
if let Some(mut s) = span {
s.end()
};
Expand All @@ -288,22 +346,7 @@ where
info!(message = "no payload attributes provided or no_tx_pool is set", "head_block_hash" = %fork_choice_state.head_block_hash);
}

self.l2_client
.client
.fork_choice_updated_v3(fork_choice_state, payload_attributes)
.await
.map_err(|e| match e {
ClientError::Call(err) => err, // Already an ErrorObjectOwned, so just return it
other_error => {
error!(
message = "error calling fork_choice_updated_v3 for l2 client",
"url" = self.l2_client.url,
"error" = %other_error,
"head_block_hash" = %fork_choice_state.head_block_hash,
);
ErrorCode::InternalError.into()
}
})
Ok(l2_response)
}

async fn get_payload_v3(
Expand All @@ -327,14 +370,21 @@ where
)
});

// Get the external builder's payload ID that corresponds to our local payload ID
let external_payload_id = self
.payload_trace_context
.get_external_payload_id(&payload_id)
.await
.unwrap_or(payload_id); // Fallback to local ID if no mapping exists

let builder = self.builder_client.clone();
let payload = builder.client.get_payload_v3(payload_id).await.map_err(|e| {
error!(message = "error calling get_payload_v3 from builder", "url" = builder.url, "error" = %e, "payload_id" = %payload_id);
let payload = builder.client.get_payload_v3(external_payload_id).await.map_err(|e| {
error!(message = "error calling get_payload_v3 from builder", "url" = builder.url, "error" = %e, "local_payload_id" = %payload_id, "external_payload_id" = %external_payload_id);
e
})?;

let block_hash = ExecutionPayload::from(payload.clone().execution_payload).block_hash();
info!(message = "received payload from builder", "payload_id" = %payload_id, "block_hash" = %block_hash);
info!(message = "received payload from builder", "local_payload_id" = %payload_id, "external_payload_id" = %external_payload_id, "block_hash" = %block_hash);

// Send the payload to the local execution engine with engine_newPayload to validate the block from the builder.
// Otherwise, we do not want to risk the network to a halt since op-node will not be able to propose the block.
Expand All @@ -343,7 +393,7 @@ where
metrics.new_payload_count.increment(1);
}
let payload_status = self.l2_client.client.new_payload_v3(payload.execution_payload.clone(), vec![], payload.parent_beacon_block_root).await.map_err(|e| {
error!(message = "error calling new_payload_v3 to validate builder payload", "url" = self.l2_client.url, "error" = %e, "payload_id" = %payload_id);
error!(message = "error calling new_payload_v3 to validate builder payload", "url" = self.l2_client.url, "error" = %e, "local_payload_id" = %payload_id, "external_payload_id" = %external_payload_id);
e
})?;
if let Some(mut s) = span {
Expand All @@ -356,14 +406,14 @@ where
}
};
if payload_status.is_invalid() {
error!(message = "builder payload was not valid", "url" = builder.url, "payload_status" = %payload_status.status, "payload_id" = %payload_id);
error!(message = "builder payload was not valid", "url" = builder.url, "payload_status" = %payload_status.status, "local_payload_id" = %payload_id, "external_payload_id" = %external_payload_id);
Err(ClientError::Call(ErrorObject::owned(
INVALID_REQUEST_CODE,
"Builder payload was not valid",
None::<String>,
)))
} else {
info!(message = "received payload status from local execution engine validating builder payload", "payload_id" = %payload_id);
info!(message = "received payload status from local execution engine validating builder payload", "local_payload_id" = %payload_id, "external_payload_id" = %external_payload_id);
Ok(payload)
}
});
Expand Down Expand Up @@ -604,6 +654,8 @@ mod tests {

#[tokio::test]
async fn test_server() {
let _ = tracing_subscriber::fmt::try_init();

engine_success().await;
boost_sync_enabled().await;
builder_payload_err().await;
Expand Down