Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(eigen-client-extra-features): minimal client changes #362

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/node/da_clients/src/eigen/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use zksync_da_client::{
};

use super::{blob_info::BlobInfo, sdk::RawEigenClient};
use crate::utils::to_non_retriable_da_error;
use crate::utils::to_retriable_da_error;

/// EigenClient is a client for the Eigen DA service.
/// It can be configured to use one of two dispersal methods:
Expand Down Expand Up @@ -54,7 +54,7 @@ impl DataAvailabilityClient for EigenClient {
.client
.dispatch_blob(data)
.await
.map_err(to_non_retriable_da_error)?;
.map_err(to_retriable_da_error)?;

Ok(DispatchResponse::from(blob_id))
}
Expand Down
80 changes: 44 additions & 36 deletions core/node/da_clients/src/eigen/sdk.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::{str::FromStr, time::Duration};
use std::{str::FromStr, sync::Arc, time::Duration};

use backon::{ConstantBuilder, Retryable};
use secp256k1::{ecdsa::RecoverableSignature, SecretKey};
use tokio::{sync::mpsc, time::Instant};
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tokio::{
sync::{mpsc, Mutex},
time::Instant,
};
use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt};
use tonic::{
transport::{Channel, ClientTlsConfig, Endpoint},
Streaming,
Expand All @@ -29,7 +32,7 @@ use crate::eigen::{

#[derive(Debug, Clone)]
pub(crate) struct RawEigenClient {
client: DisperserClient<Channel>,
client: Arc<Mutex<DisperserClient<Channel>>>,
private_key: SecretKey,
pub config: EigenConfig,
verifier: Verifier,
Expand All @@ -39,14 +42,14 @@ pub(crate) const DATA_CHUNK_SIZE: usize = 32;
pub(crate) const AVG_BLOCK_TIME: u64 = 12;

impl RawEigenClient {
pub(crate) const BUFFER_SIZE: usize = 1000;

pub async fn new(private_key: SecretKey, config: EigenConfig) -> anyhow::Result<Self> {
let endpoint =
Endpoint::from_str(config.disperser_rpc.as_str())?.tls_config(ClientTlsConfig::new())?;
let client = DisperserClient::connect(endpoint)
.await
.map_err(|e| anyhow::anyhow!("Failed to connect to Disperser server: {}", e))?;
let client = Arc::new(Mutex::new(
DisperserClient::connect(endpoint)
.await
.map_err(|e| anyhow::anyhow!("Failed to connect to Disperser server: {}", e))?,
));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why anyhow here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's actually unneeded, I have removed it!


let verifier_config = VerifierConfig {
verify_certs: true,
Expand Down Expand Up @@ -76,13 +79,16 @@ impl RawEigenClient {
account_id: String::default(), // Account Id is not used in non-authenticated mode
};

let mut client_clone = self.client.clone();
let disperse_reply = client_clone.disperse_blob(request).await?.into_inner();
let disperse_reply = self
.client
.lock()
.await
.disperse_blob(request)
.await?
.into_inner();

let disperse_time = Instant::now();
let blob_info = self
.await_for_inclusion(client_clone, disperse_reply)
.await?;
let blob_info = self.await_for_inclusion(disperse_reply).await?;
let disperse_elapsed = Instant::now() - disperse_time;

let blob_info = blob_info::BlobInfo::try_from(blob_info)
Expand Down Expand Up @@ -123,25 +129,29 @@ impl RawEigenClient {
}

async fn dispatch_blob_authenticated(&self, data: Vec<u8>) -> anyhow::Result<String> {
let mut client_clone = self.client.clone();
let (tx, rx) = mpsc::channel(Self::BUFFER_SIZE);
let (tx, rx) = mpsc::unbounded_channel();

let disperse_time = Instant::now();
let response_stream = client_clone.disperse_blob_authenticated(ReceiverStream::new(rx));
let padded_data = convert_by_padding_empty_byte(&data);

// 1. send DisperseBlobRequest
self.disperse_data(padded_data, &tx).await?;
let padded_data = convert_by_padding_empty_byte(&data);
self.disperse_data(padded_data, &tx)?;

// this await is blocked until the first response on the stream, so we only await after sending the `DisperseBlobRequest`
let mut response_stream = response_stream.await?.into_inner();
let mut response_stream = self
.client
.clone()
.lock()
.await
.disperse_blob_authenticated(UnboundedReceiverStream::new(rx))
.await?;
let response_stream = response_stream.get_mut();

// 2. receive BlobAuthHeader
let blob_auth_header = self.receive_blob_auth_header(&mut response_stream).await?;
let blob_auth_header = self.receive_blob_auth_header(response_stream).await?;

// 3. sign and send BlobAuthHeader
self.submit_authentication_data(blob_auth_header.clone(), &tx)
.await?;
self.submit_authentication_data(blob_auth_header.clone(), &tx)?;

// 4. receive DisperseBlobReply
let reply = response_stream
Expand All @@ -157,9 +167,7 @@ impl RawEigenClient {
};

// 5. poll for blob status until it reaches the Confirmed state
let blob_info = self
.await_for_inclusion(client_clone, disperse_reply)
.await?;
let blob_info = self.await_for_inclusion(disperse_reply).await?;

let blob_info = blob_info::BlobInfo::try_from(blob_info)
.map_err(|e| anyhow::anyhow!("Failed to convert blob info: {}", e))?;
Expand Down Expand Up @@ -189,10 +197,10 @@ impl RawEigenClient {
}
}

async fn disperse_data(
fn disperse_data(
&self,
data: Vec<u8>,
tx: &mpsc::Sender<disperser::AuthenticatedRequest>,
tx: &mpsc::UnboundedSender<disperser::AuthenticatedRequest>,
) -> anyhow::Result<()> {
let req = disperser::AuthenticatedRequest {
payload: Some(DisperseRequest(disperser::DisperseBlobRequest {
Expand All @@ -203,14 +211,13 @@ impl RawEigenClient {
};

tx.send(req)
.await
.map_err(|e| anyhow::anyhow!("Failed to send DisperseBlobRequest: {}", e))
}

async fn submit_authentication_data(
fn submit_authentication_data(
&self,
blob_auth_header: BlobAuthHeader,
tx: &mpsc::Sender<disperser::AuthenticatedRequest>,
tx: &mpsc::UnboundedSender<disperser::AuthenticatedRequest>,
) -> anyhow::Result<()> {
// TODO: replace challenge_parameter with actual auth header when it is available
let digest = zksync_basic_types::web3::keccak256(
Expand All @@ -234,7 +241,6 @@ impl RawEigenClient {
};

tx.send(req)
.await
.map_err(|e| anyhow::anyhow!("Failed to send AuthenticationData: {}", e))
}

Expand Down Expand Up @@ -264,16 +270,17 @@ impl RawEigenClient {

async fn await_for_inclusion(
&self,
client: DisperserClient<Channel>,
disperse_blob_reply: DisperseBlobReply,
) -> anyhow::Result<DisperserBlobInfo> {
let polling_request = disperser::BlobStatusRequest {
request_id: disperse_blob_reply.request_id,
};

let blob_info = (|| async {
let mut client_clone = client.clone();
let resp = client_clone
let resp = self
.client
.lock()
.await
.get_blob_status(polling_request.clone())
.await?
.into_inner();
Expand Down Expand Up @@ -340,7 +347,8 @@ impl RawEigenClient {
.batch_header_hash;
let get_response = self
.client
.clone()
.lock()
.await
.retrieve_blob(disperser::RetrieveBlobRequest {
batch_header_hash,
blob_index,
Expand Down
Loading