Skip to content

Commit

Permalink
Add builder url to logs
Browse files Browse the repository at this point in the history
  • Loading branch information
avalonche committed Nov 5, 2024
1 parent 09fe1c4 commit 9098595
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 36 deletions.
5 changes: 5 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Ignore build artifacts
target

# Ignore version control directories
.git
15 changes: 8 additions & 7 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
FROM lukemathwalker/cargo-chef:latest AS chef
WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get -y upgrade && apt-get install -y libclang-dev pkg-config

# Prepare build plan
FROM chef AS planner
COPY ./Cargo.toml ./Cargo.lock ./
Expand All @@ -13,14 +10,18 @@ RUN cargo chef prepare
# Build application
FROM chef AS builder
COPY --from=planner /app/recipe.json .

# Install system dependencies
RUN apt-get update && \
apt-get install -y openssl libclang-dev libssl3 && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*

RUN cargo chef cook --release
COPY . .
RUN cargo build --release

FROM debian:stable-slim AS runtime

WORKDIR /app

FROM chef AS final
COPY --from=builder /app/target/release/rollup-boost /usr/local/bin/

ENTRYPOINT ["/usr/local/bin/rollup-boost"]
15 changes: 9 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ use opentelemetry_sdk::trace::Config;
use opentelemetry_sdk::Resource;
use proxy::ProxyLayer;
use reth_rpc_layer::{AuthClientLayer, AuthClientService};
use server::{EngineApiServer, EthEngineApi};
use server::{EngineApiServer, EthEngineApi, HttpClientWrapper};
use std::sync::Arc;
use std::time::Duration;
use std::{net::SocketAddr, path::PathBuf};
use tracing::error;
use tracing::{info, Level};
Expand Down Expand Up @@ -191,15 +192,17 @@ async fn main() -> Result<()> {
fn create_client(
url: &str,
jwt_secret: JwtSecret,
) -> Result<HttpClient<AuthClientService<HttpBackend>>> {
) -> Result<HttpClientWrapper<HttpClient<AuthClientService<HttpBackend>>>> {
// Create a middleware that adds a new JWT token to every request.
let auth_layer = AuthClientLayer::new(jwt_secret);
let client_middleware = tower::ServiceBuilder::new().layer(auth_layer);

HttpClientBuilder::new()
let client = HttpClientBuilder::new()
.set_http_middleware(client_middleware)
.request_timeout(Duration::from_secs(10))
.build(url)
.map_err(|e| Error::InitRPCClient(e.to_string()))
.map_err(|e| Error::InitRPCClient(e.to_string()))?;
Ok(HttpClientWrapper::new(client, url.to_string()))
}

fn init_tracing(endpoint: &str) {
Expand Down Expand Up @@ -265,7 +268,7 @@ mod tests {
let secret = JwtSecret::from_hex(SECRET).unwrap();
let url = format!("http://{}:{}", AUTH_ADDR, AUTH_PORT);
let client = create_client(url.as_str(), secret);
let response = send_request(client.unwrap()).await;
let response = send_request(client.unwrap().client).await;
assert!(response.is_ok());
assert_eq!(response.unwrap(), "You are the dark lord");
}
Expand All @@ -274,7 +277,7 @@ mod tests {
let secret = JwtSecret::random();
let url = format!("http://{}:{}", AUTH_ADDR, AUTH_PORT);
let client = create_client(url.as_str(), secret);
let response = send_request(client.unwrap()).await;
let response = send_request(client.unwrap().client).await;
assert!(response.is_err());
assert!(matches!(
response.unwrap_err(),
Expand Down
72 changes: 49 additions & 23 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,18 @@ pub trait EthApi {
async fn send_raw_transaction(&self, bytes: Bytes) -> RpcResult<B256>;
}

pub struct EthEngineApi<C = HttpClient<AuthClientService<HttpBackend>>> {
pub struct HttpClientWrapper<C = HttpClient<AuthClientService<HttpBackend>>> {
pub client: C,
pub url: String,
}

impl<C> HttpClientWrapper<C> {
pub fn new(client: C, url: String) -> Self {
Self { client, url }
}
}

pub struct EthEngineApi<C = HttpClientWrapper> {
l2_client: Arc<C>,
builder_client: Arc<C>,
boost_sync: bool,
Expand All @@ -143,9 +154,9 @@ impl<C> EthEngineApi<C> {
}

#[async_trait]
impl<C> EthApiServer for EthEngineApi<C>
impl<C> EthApiServer for EthEngineApi<HttpClientWrapper<C>>
where
C: EthApiClient + Send + Sync + 'static,
C: EthApiClient + Send + Sync + Clone + 'static,
{
async fn send_raw_transaction(&self, bytes: Bytes) -> RpcResult<B256> {
debug!(
Expand All @@ -157,22 +168,25 @@ where
metrics.send_raw_tx_count.increment(1);
}

let builder = self.builder_client.clone();
let builder_client = self.builder_client.client.clone();
let url = self.builder_client.url.clone();
let tx_bytes = bytes.clone();
tokio::spawn(async move {
builder.send_raw_transaction(tx_bytes).await.map_err(|e| {
error!(message = "error calling send_raw_transaction for builder", "error" = %e);
builder_client.send_raw_transaction(tx_bytes).await.map_err(|e| {
error!(message = "error calling send_raw_transaction for builder", "url" = url, "error" = %e);
})
});

self.l2_client
.client
.send_raw_transaction(bytes)
.await
.map_err(|e| match e {
ClientError::Call(err) => err, // Already an ErrorObjectOwned, so just return it
other_error => {
error!(
message = "error calling send_raw_transaction for l2 client",
"url" = self.l2_client.url,
"error" = %other_error,
);
ErrorCode::InternalError.into()
Expand All @@ -182,9 +196,9 @@ where
}

#[async_trait]
impl<C> EngineApiServer for EthEngineApi<C>
impl<C> EngineApiServer for EthEngineApi<HttpClientWrapper<C>>
where
C: EngineApiClient + Send + Sync + 'static,
C: EngineApiClient + Send + Sync + Clone + 'static,
{
async fn fork_choice_updated_v3(
&self,
Expand Down Expand Up @@ -251,17 +265,18 @@ where
let builder = self.builder_client.clone();
let attr = payload_attributes.clone();
tokio::spawn(async move {
let _ = builder.fork_choice_updated_v3(fork_choice_state, attr).await.map(|response| {
let _ = builder.client.fork_choice_updated_v3(fork_choice_state, attr).await.map(|response| {
let payload_id_str = response.payload_id.map(|id| id.to_string()).unwrap_or_default();
if response.is_invalid() {
error!(message = "builder rejected fork_choice_updated_v3 with attributes", "payload_id" = payload_id_str, "validation_error" = %response.payload_status.status);
error!(message = "builder rejected fork_choice_updated_v3 with attributes", "url" = builder.url, "payload_id" = payload_id_str, "validation_error" = %response.payload_status.status);
} else {
info!(message = "called fork_choice_updated_v3 to builder with payload attributes", "payload_status" = %response.payload_status.status, "payload_id" = payload_id_str);
info!(message = "called fork_choice_updated_v3 to builder with payload attributes", "url" = builder.url, "payload_status" = %response.payload_status.status, "payload_id" = payload_id_str);
}
})
.map_err(|e| {
error!(
message = "error calling fork_choice_updated_v3 to builder",
"url" = builder.url,
"error" = %e,
"head_block_hash" = %fork_choice_state.head_block_hash
);
Expand All @@ -275,13 +290,15 @@ where
}

self.l2_client
.client
.fork_choice_updated_v3(fork_choice_state, payload_attributes)
.await
.map_err(|e| match e {
ClientError::Call(err) => err, // Already an ErrorObjectOwned, so just return it
other_error => {
error!(
message = "error calling fork_choice_updated_v3 for l2 client",
"url" = self.l2_client.url,
"error" = %other_error,
"head_block_hash" = %fork_choice_state.head_block_hash,
);
Expand All @@ -295,7 +312,7 @@ where
payload_id: PayloadId,
) -> RpcResult<OpExecutionPayloadEnvelopeV3> {
info!(message = "received get_payload_v3", "payload_id" = %payload_id);
let l2_client_future = self.l2_client.get_payload_v3(payload_id);
let l2_client_future = self.l2_client.client.get_payload_v3(payload_id);
let builder_client_future = Box::pin(async move {
if let Some(metrics) = &self.metrics {
metrics.get_payload_count.increment(1);
Expand All @@ -312,8 +329,8 @@ where
});

let builder = self.builder_client.clone();
let payload = builder.get_payload_v3(payload_id).await.map_err(|e| {
error!(message = "error calling get_payload_v3 from builder", "error" = %e, "payload_id" = %payload_id);
let payload = builder.client.get_payload_v3(payload_id).await.map_err(|e| {
error!(message = "error calling get_payload_v3 from builder", "url" = builder.url, "error" = %e, "payload_id" = %payload_id);
e
})?;

Expand All @@ -326,8 +343,8 @@ where
if let Some(metrics) = &self.metrics {
metrics.new_payload_count.increment(1);
}
let payload_status = self.l2_client.new_payload_v3(payload.execution_payload.clone(), vec![], payload.parent_beacon_block_root).await.map_err(|e| {
error!(message = "error calling new_payload_v3 to validate builder payload", "error" = %e, "payload_id" = %payload_id);
let payload_status = self.l2_client.client.new_payload_v3(payload.execution_payload.clone(), vec![], payload.parent_beacon_block_root).await.map_err(|e| {
error!(message = "error calling new_payload_v3 to validate builder payload", "url" = self.l2_client.url, "error" = %e, "payload_id" = %payload_id);
e
})?;
if let Some(mut s) = span {
Expand All @@ -340,7 +357,7 @@ where
}
};
if payload_status.is_invalid() {
error!(message = "builder payload was not valid", "payload_status" = %payload_status.status, "payload_id" = %payload_id);
error!(message = "builder payload was not valid", "url" = builder.url, "payload_status" = %payload_status.status, "payload_id" = %payload_id);
Err(ClientError::Call(ErrorObject::owned(
INVALID_REQUEST_CODE,
"Builder payload was not valid",
Expand All @@ -358,6 +375,7 @@ where
other_error => {
error!(
message = "error calling get_payload_v3",
"url" = self.builder_client.url,
"error" = %other_error,
"payload_id" = %payload_id
);
Expand Down Expand Up @@ -401,19 +419,20 @@ where
.remove_by_parent_hash(&parent_hash)
.await;

let builder = self.builder_client.clone();
let builder = self.builder_client.client.clone();
let builder_url = self.builder_client.url.clone();
let builder_payload = payload.clone();
let builder_versioned_hashes = versioned_hashes.clone();
tokio::spawn(async move {
let _ = builder.new_payload_v3(builder_payload, builder_versioned_hashes, parent_beacon_block_root).await
.map(|response: PayloadStatus| {
if response.is_invalid() {
error!(message = "builder rejected new_payload_v3", "block_hash" = %block_hash);
error!(message = "builder rejected new_payload_v3", "url" = builder_url, "block_hash" = %block_hash);
} else {
info!(message = "called new_payload_v3 to builder", "payload_status" = %response.status, "block_hash" = %block_hash);
info!(message = "called new_payload_v3 to builder", "url" = builder_url, "payload_status" = %response.status, "block_hash" = %block_hash);
}
}).map_err(|e| {
error!(message = "error calling new_payload_v3 to builder", "error" = %e, "block_hash" = %block_hash);
error!(message = "error calling new_payload_v3 to builder", "url" = builder_url, "error" = %e, "block_hash" = %block_hash);
e
});
if let Some(mut spans) = spans {
Expand All @@ -422,6 +441,7 @@ where
});
}
self.l2_client
.client
.new_payload_v3(payload, versioned_hashes, parent_beacon_block_root)
.await
.map_err(|e| match e {
Expand Down Expand Up @@ -536,8 +556,14 @@ mod tests {
.unwrap();

let eth_engine_api = EthEngineApi::new(
Arc::new(l2_client),
Arc::new(builder_client),
Arc::new(HttpClientWrapper::new(
l2_client,
format!("http://{L2_ADDR}"),
)),
Arc::new(HttpClientWrapper::new(
builder_client,
format!("http://{BUILDER_ADDR}"),
)),
boost_sync,
None,
);
Expand Down

0 comments on commit 9098595

Please sign in to comment.