Skip to content

Commit

Permalink
feat(eigen-client-extra-features): minimal client changes (#362)
Browse files Browse the repository at this point in the history
* initial commit

* remove anyhow err

* Format code

* Fix compilation

---------

Co-authored-by: Gianbelinche <[email protected]>
  • Loading branch information
juan518munoz and gianbelinche authored Nov 28, 2024
1 parent 99c6d42 commit 163c41a
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 41 deletions.
6 changes: 3 additions & 3 deletions core/node/da_clients/src/eigen/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use zksync_da_client::{
};

use super::{blob_info::BlobInfo, sdk::RawEigenClient};
use crate::utils::to_non_retriable_da_error;
use crate::utils::to_retriable_da_error;

/// EigenClient is a client for the Eigen DA service.
/// It can be configured to use one of two dispersal methods:
Expand Down Expand Up @@ -50,7 +50,7 @@ impl DataAvailabilityClient for EigenClient {
.client
.dispatch_blob(data)
.await
.map_err(to_non_retriable_da_error)?;
.map_err(to_retriable_da_error)?;

Ok(DispatchResponse::from(blob_id))
}
Expand All @@ -59,7 +59,7 @@ impl DataAvailabilityClient for EigenClient {
let blob_info = self
.get_commitment(blob_id)
.await
.map_err(to_non_retriable_da_error)?;
.map_err(to_retriable_da_error)?;
let rlp_encoded_bytes = hex::decode(blob_info).map_err(|_| DAError {
error: anyhow!("Failed to decode blob_id"),
is_retriable: false,
Expand Down
78 changes: 40 additions & 38 deletions core/node/da_clients/src/eigen/sdk.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::{str::FromStr, time::Duration};
use std::{str::FromStr, sync::Arc, time::Duration};

use backon::{ConstantBuilder, Retryable};
use secp256k1::{ecdsa::RecoverableSignature, SecretKey};
use tokio::{sync::mpsc, time::Instant};
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tokio::{
sync::{mpsc, Mutex},
time::Instant,
};
use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt};
use tonic::{
transport::{Channel, ClientTlsConfig, Endpoint},
Streaming,
Expand All @@ -28,7 +31,7 @@ use crate::eigen::{

#[derive(Debug, Clone)]
pub(crate) struct RawEigenClient {
client: DisperserClient<Channel>,
client: Arc<Mutex<DisperserClient<Channel>>>,
private_key: SecretKey,
pub config: EigenConfig,
verifier: Verifier,
Expand All @@ -38,15 +41,12 @@ pub(crate) const DATA_CHUNK_SIZE: usize = 32;
pub(crate) const AVG_BLOCK_TIME: u64 = 12;

impl RawEigenClient {
pub(crate) const BUFFER_SIZE: usize = 1000;
const BLOB_SIZE_LIMIT: usize = 1024 * 1024 * 2; // 2 MB

pub async fn new(private_key: SecretKey, config: EigenConfig) -> anyhow::Result<Self> {
let endpoint =
Endpoint::from_str(config.disperser_rpc.as_str())?.tls_config(ClientTlsConfig::new())?;
let client = DisperserClient::connect(endpoint)
.await
.map_err(|e| anyhow::anyhow!("Failed to connect to Disperser server: {}", e))?;
let client = Arc::new(Mutex::new(DisperserClient::connect(endpoint).await?));

let verifier_config = VerifierConfig {
rpc_url: config.eigenda_eth_rpc.clone(),
Expand Down Expand Up @@ -80,8 +80,13 @@ impl RawEigenClient {
account_id: String::default(), // Account Id is not used in non-authenticated mode
};

let mut client_clone = self.client.clone();
let disperse_reply = client_clone.disperse_blob(request).await?.into_inner();
let disperse_reply = self
.client
.lock()
.await
.disperse_blob(request)
.await?
.into_inner();

Ok(hex::encode(disperse_reply.request_id))
}
Expand All @@ -106,24 +111,27 @@ impl RawEigenClient {
}

async fn dispatch_blob_authenticated(&self, data: Vec<u8>) -> anyhow::Result<String> {
let mut client_clone = self.client.clone();
let (tx, rx) = mpsc::channel(Self::BUFFER_SIZE);

let response_stream = client_clone.disperse_blob_authenticated(ReceiverStream::new(rx));
let padded_data = convert_by_padding_empty_byte(&data);
let (tx, rx) = mpsc::unbounded_channel();

// 1. send DisperseBlobRequest
self.disperse_data(padded_data, &tx).await?;
let padded_data = convert_by_padding_empty_byte(&data);
self.disperse_data(padded_data, &tx)?;

// this await is blocked until the first response on the stream, so we only await after sending the `DisperseBlobRequest`
let mut response_stream = response_stream.await?.into_inner();
let mut response_stream = self
.client
.clone()
.lock()
.await
.disperse_blob_authenticated(UnboundedReceiverStream::new(rx))
.await?;
let response_stream = response_stream.get_mut();

// 2. receive BlobAuthHeader
let blob_auth_header = self.receive_blob_auth_header(&mut response_stream).await?;
let blob_auth_header = self.receive_blob_auth_header(response_stream).await?;

// 3. sign and send BlobAuthHeader
self.submit_authentication_data(blob_auth_header.clone(), &tx)
.await?;
self.submit_authentication_data(blob_auth_header.clone(), &tx)?;

// 4. receive DisperseBlobReply
let reply = response_stream
Expand All @@ -141,11 +149,8 @@ impl RawEigenClient {
}

pub async fn get_inclusion_data(&self, blob_id: &str) -> anyhow::Result<String> {
let client_clone = self.client.clone();
let disperse_time = Instant::now();
let blob_info = self
.await_for_inclusion(client_clone, blob_id.to_string())
.await?;
let blob_info = self.await_for_inclusion(blob_id.to_string()).await?;

let blob_info = blob_info::BlobInfo::try_from(blob_info)
.map_err(|e| anyhow::anyhow!("Failed to convert blob info: {}", e))?;
Expand Down Expand Up @@ -181,10 +186,10 @@ impl RawEigenClient {
}
}

async fn disperse_data(
fn disperse_data(
&self,
data: Vec<u8>,
tx: &mpsc::Sender<disperser::AuthenticatedRequest>,
tx: &mpsc::UnboundedSender<disperser::AuthenticatedRequest>,
) -> anyhow::Result<()> {
let req = disperser::AuthenticatedRequest {
payload: Some(DisperseRequest(disperser::DisperseBlobRequest {
Expand All @@ -195,14 +200,13 @@ impl RawEigenClient {
};

tx.send(req)
.await
.map_err(|e| anyhow::anyhow!("Failed to send DisperseBlobRequest: {}", e))
}

async fn submit_authentication_data(
fn submit_authentication_data(
&self,
blob_auth_header: BlobAuthHeader,
tx: &mpsc::Sender<disperser::AuthenticatedRequest>,
tx: &mpsc::UnboundedSender<disperser::AuthenticatedRequest>,
) -> anyhow::Result<()> {
// TODO: replace challenge_parameter with actual auth header when it is available
let digest = zksync_basic_types::web3::keccak256(
Expand All @@ -226,7 +230,6 @@ impl RawEigenClient {
};

tx.send(req)
.await
.map_err(|e| anyhow::anyhow!("Failed to send AuthenticationData: {}", e))
}

Expand Down Expand Up @@ -254,18 +257,16 @@ impl RawEigenClient {
}
}

async fn await_for_inclusion(
&self,
client: DisperserClient<Channel>,
request_id: String,
) -> anyhow::Result<DisperserBlobInfo> {
async fn await_for_inclusion(&self, request_id: String) -> anyhow::Result<DisperserBlobInfo> {
let polling_request = disperser::BlobStatusRequest {
request_id: hex::decode(request_id)?,
};

let blob_info = (|| async {
let mut client_clone = client.clone();
let resp = client_clone
let resp = self
.client
.lock()
.await
.get_blob_status(polling_request.clone())
.await?
.into_inner();
Expand Down Expand Up @@ -331,7 +332,8 @@ impl RawEigenClient {
.batch_header_hash;
let get_response = self
.client
.clone()
.lock()
.await
.retrieve_blob(disperser::RetrieveBlobRequest {
batch_header_hash,
blob_index,
Expand Down

0 comments on commit 163c41a

Please sign in to comment.