From f8a0b16a1b1abb4c5c9a96a3e44746ebc4a00153 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Wed, 27 Nov 2024 11:42:42 -0300 Subject: [PATCH 1/4] initial commit --- core/node/da_clients/src/eigen/client.rs | 4 +- core/node/da_clients/src/eigen/sdk.rs | 80 +++++++++++++----------- 2 files changed, 46 insertions(+), 38 deletions(-) diff --git a/core/node/da_clients/src/eigen/client.rs b/core/node/da_clients/src/eigen/client.rs index aa7b701998c6..c8fd45a7dd8b 100644 --- a/core/node/da_clients/src/eigen/client.rs +++ b/core/node/da_clients/src/eigen/client.rs @@ -11,7 +11,7 @@ use zksync_da_client::{ }; use super::{blob_info::BlobInfo, sdk::RawEigenClient}; -use crate::utils::to_non_retriable_da_error; +use crate::utils::to_retriable_da_error; /// EigenClient is a client for the Eigen DA service. /// It can be configured to use one of two dispersal methods: @@ -54,7 +54,7 @@ impl DataAvailabilityClient for EigenClient { .client .dispatch_blob(data) .await - .map_err(to_non_retriable_da_error)?; + .map_err(to_retriable_da_error)?; Ok(DispatchResponse::from(blob_id)) } diff --git a/core/node/da_clients/src/eigen/sdk.rs b/core/node/da_clients/src/eigen/sdk.rs index 6f250ba35862..0e50b73206dd 100644 --- a/core/node/da_clients/src/eigen/sdk.rs +++ b/core/node/da_clients/src/eigen/sdk.rs @@ -1,9 +1,12 @@ -use std::{str::FromStr, time::Duration}; +use std::{str::FromStr, sync::Arc, time::Duration}; use backon::{ConstantBuilder, Retryable}; use secp256k1::{ecdsa::RecoverableSignature, SecretKey}; -use tokio::{sync::mpsc, time::Instant}; -use tokio_stream::{wrappers::ReceiverStream, StreamExt}; +use tokio::{ + sync::{mpsc, Mutex}, + time::Instant, +}; +use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt}; use tonic::{ transport::{Channel, ClientTlsConfig, Endpoint}, Streaming, @@ -29,7 +32,7 @@ use crate::eigen::{ #[derive(Debug, Clone)] pub(crate) struct RawEigenClient { - client: DisperserClient, + client: Arc>>, private_key: SecretKey, pub config: EigenConfig, verifier: Verifier, @@ -39,14 +42,14 @@ pub(crate) const DATA_CHUNK_SIZE: usize = 32; pub(crate) const AVG_BLOCK_TIME: u64 = 12; impl RawEigenClient { - pub(crate) const BUFFER_SIZE: usize = 1000; - pub async fn new(private_key: SecretKey, config: EigenConfig) -> anyhow::Result { let endpoint = Endpoint::from_str(config.disperser_rpc.as_str())?.tls_config(ClientTlsConfig::new())?; - let client = DisperserClient::connect(endpoint) - .await - .map_err(|e| anyhow::anyhow!("Failed to connect to Disperser server: {}", e))?; + let client = Arc::new(Mutex::new( + DisperserClient::connect(endpoint) + .await + .map_err(|e| anyhow::anyhow!("Failed to connect to Disperser server: {}", e))?, + )); let verifier_config = VerifierConfig { verify_certs: true, @@ -76,13 +79,16 @@ impl RawEigenClient { account_id: String::default(), // Account Id is not used in non-authenticated mode }; - let mut client_clone = self.client.clone(); - let disperse_reply = client_clone.disperse_blob(request).await?.into_inner(); + let disperse_reply = self + .client + .lock() + .await + .disperse_blob(request) + .await? + .into_inner(); let disperse_time = Instant::now(); - let blob_info = self - .await_for_inclusion(client_clone, disperse_reply) - .await?; + let blob_info = self.await_for_inclusion(disperse_reply).await?; let disperse_elapsed = Instant::now() - disperse_time; let blob_info = blob_info::BlobInfo::try_from(blob_info) @@ -123,25 +129,29 @@ impl RawEigenClient { } async fn dispatch_blob_authenticated(&self, data: Vec) -> anyhow::Result { - let mut client_clone = self.client.clone(); - let (tx, rx) = mpsc::channel(Self::BUFFER_SIZE); + let (tx, rx) = mpsc::unbounded_channel(); let disperse_time = Instant::now(); - let response_stream = client_clone.disperse_blob_authenticated(ReceiverStream::new(rx)); - let padded_data = convert_by_padding_empty_byte(&data); // 1. send DisperseBlobRequest - self.disperse_data(padded_data, &tx).await?; + let padded_data = convert_by_padding_empty_byte(&data); + self.disperse_data(padded_data, &tx)?; // this await is blocked until the first response on the stream, so we only await after sending the `DisperseBlobRequest` - let mut response_stream = response_stream.await?.into_inner(); + let mut response_stream = self + .client + .clone() + .lock() + .await + .disperse_blob_authenticated(UnboundedReceiverStream::new(rx)) + .await?; + let response_stream = response_stream.get_mut(); // 2. receive BlobAuthHeader - let blob_auth_header = self.receive_blob_auth_header(&mut response_stream).await?; + let blob_auth_header = self.receive_blob_auth_header(response_stream).await?; // 3. sign and send BlobAuthHeader - self.submit_authentication_data(blob_auth_header.clone(), &tx) - .await?; + self.submit_authentication_data(blob_auth_header.clone(), &tx)?; // 4. receive DisperseBlobReply let reply = response_stream @@ -157,9 +167,7 @@ impl RawEigenClient { }; // 5. poll for blob status until it reaches the Confirmed state - let blob_info = self - .await_for_inclusion(client_clone, disperse_reply) - .await?; + let blob_info = self.await_for_inclusion(disperse_reply).await?; let blob_info = blob_info::BlobInfo::try_from(blob_info) .map_err(|e| anyhow::anyhow!("Failed to convert blob info: {}", e))?; @@ -189,10 +197,10 @@ impl RawEigenClient { } } - async fn disperse_data( + fn disperse_data( &self, data: Vec, - tx: &mpsc::Sender, + tx: &mpsc::UnboundedSender, ) -> anyhow::Result<()> { let req = disperser::AuthenticatedRequest { payload: Some(DisperseRequest(disperser::DisperseBlobRequest { @@ -203,14 +211,13 @@ impl RawEigenClient { }; tx.send(req) - .await .map_err(|e| anyhow::anyhow!("Failed to send DisperseBlobRequest: {}", e)) } - async fn submit_authentication_data( + fn submit_authentication_data( &self, blob_auth_header: BlobAuthHeader, - tx: &mpsc::Sender, + tx: &mpsc::UnboundedSender, ) -> anyhow::Result<()> { // TODO: replace challenge_parameter with actual auth header when it is available let digest = zksync_basic_types::web3::keccak256( @@ -234,7 +241,6 @@ impl RawEigenClient { }; tx.send(req) - .await .map_err(|e| anyhow::anyhow!("Failed to send AuthenticationData: {}", e)) } @@ -264,7 +270,6 @@ impl RawEigenClient { async fn await_for_inclusion( &self, - client: DisperserClient, disperse_blob_reply: DisperseBlobReply, ) -> anyhow::Result { let polling_request = disperser::BlobStatusRequest { @@ -272,8 +277,10 @@ impl RawEigenClient { }; let blob_info = (|| async { - let mut client_clone = client.clone(); - let resp = client_clone + let resp = self + .client + .lock() + .await .get_blob_status(polling_request.clone()) .await? .into_inner(); @@ -340,7 +347,8 @@ impl RawEigenClient { .batch_header_hash; let get_response = self .client - .clone() + .lock() + .await .retrieve_blob(disperser::RetrieveBlobRequest { batch_header_hash, blob_index, From cb0db56a3c3bc2d1d468f759dfafc6ce6f5c7236 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Wed, 27 Nov 2024 14:56:05 -0300 Subject: [PATCH 2/4] remove anyhow err --- core/node/da_clients/src/eigen/sdk.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/core/node/da_clients/src/eigen/sdk.rs b/core/node/da_clients/src/eigen/sdk.rs index 0e50b73206dd..efff76894382 100644 --- a/core/node/da_clients/src/eigen/sdk.rs +++ b/core/node/da_clients/src/eigen/sdk.rs @@ -45,11 +45,7 @@ impl RawEigenClient { pub async fn new(private_key: SecretKey, config: EigenConfig) -> anyhow::Result { let endpoint = Endpoint::from_str(config.disperser_rpc.as_str())?.tls_config(ClientTlsConfig::new())?; - let client = Arc::new(Mutex::new( - DisperserClient::connect(endpoint) - .await - .map_err(|e| anyhow::anyhow!("Failed to connect to Disperser server: {}", e))?, - )); + let client = Arc::new(Mutex::new(DisperserClient::connect(endpoint).await?)); let verifier_config = VerifierConfig { verify_certs: true, From d609a0ed6bf9974ee4a0564006e2ffc9833fc0f0 Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Thu, 28 Nov 2024 16:02:56 -0300 Subject: [PATCH 3/4] Format code --- core/node/da_clients/src/eigen/sdk.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/core/node/da_clients/src/eigen/sdk.rs b/core/node/da_clients/src/eigen/sdk.rs index 56d186de9d29..208a2f138c7e 100644 --- a/core/node/da_clients/src/eigen/sdk.rs +++ b/core/node/da_clients/src/eigen/sdk.rs @@ -150,9 +150,7 @@ impl RawEigenClient { pub async fn get_inclusion_data(&self, blob_id: &str) -> anyhow::Result { let disperse_time = Instant::now(); - let blob_info = self - .await_for_inclusion(blob_id.to_string()) - .await?; + let blob_info = self.await_for_inclusion(blob_id.to_string()).await?; let blob_info = blob_info::BlobInfo::try_from(blob_info) .map_err(|e| anyhow::anyhow!("Failed to convert blob info: {}", e))?; @@ -259,10 +257,7 @@ impl RawEigenClient { } } - async fn await_for_inclusion( - &self, - request_id: String, - ) -> anyhow::Result { + async fn await_for_inclusion(&self, request_id: String) -> anyhow::Result { let polling_request = disperser::BlobStatusRequest { request_id: hex::decode(request_id)?, }; From b26be02aee5e68ee315c36ef93e6f7cbb6a66768 Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Thu, 28 Nov 2024 16:06:03 -0300 Subject: [PATCH 4/4] Fix compilation --- core/node/da_clients/src/eigen/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/node/da_clients/src/eigen/client.rs b/core/node/da_clients/src/eigen/client.rs index 22480c59dcdc..1066065adc10 100644 --- a/core/node/da_clients/src/eigen/client.rs +++ b/core/node/da_clients/src/eigen/client.rs @@ -59,7 +59,7 @@ impl DataAvailabilityClient for EigenClient { let blob_info = self .get_commitment(blob_id) .await - .map_err(to_non_retriable_da_error)?; + .map_err(to_retriable_da_error)?; let rlp_encoded_bytes = hex::decode(blob_info).map_err(|_| DAError { error: anyhow!("Failed to decode blob_id"), is_retriable: false,