Skip to content

Commit

Permalink
Merge branch 'eigen-client-extra-features' into eigen-client-small-ch…
Browse files Browse the repository at this point in the history
…anges
  • Loading branch information
gianbelinche committed Nov 28, 2024
2 parents 5187c05 + daace34 commit fd0e4bd
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 56 deletions.
42 changes: 31 additions & 11 deletions core/node/da_clients/src/eigen/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ impl EigenClient {
client: Arc::new(client),
})
}

pub async fn get_commitment(&self, blob_id: &str) -> anyhow::Result<String> {
let blob_info = self.client.get_inclusion_data(blob_id).await?;
Ok(blob_info)
}
}

#[async_trait]
Expand All @@ -51,7 +56,11 @@ impl DataAvailabilityClient for EigenClient {
}

async fn get_inclusion_data(&self, blob_id: &str) -> Result<Option<InclusionData>, DAError> {
let rlp_encoded_bytes = hex::decode(blob_id).map_err(|_| DAError {
let blob_info = self
.get_commitment(blob_id)
.await
.map_err(to_non_retriable_da_error)?;
let rlp_encoded_bytes = hex::decode(blob_info).map_err(|_| DAError {
error: anyhow!("Failed to decode blob_id"),
is_retriable: false,
})?;
Expand Down Expand Up @@ -113,8 +122,11 @@ mod tests {
let client = EigenClient::new(config, secrets).await.unwrap();
let data = vec![1; 20];
let result = client.dispatch_blob(0, data.clone()).await.unwrap();

let blob_info_str = client.get_commitment(&result.blob_id).await.unwrap();

let blob_info: BlobInfo =
rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap();
rlp::decode(&hex::decode(blob_info_str.clone()).unwrap()).unwrap();
let expected_inclusion_data = blob_info.blob_verification_proof.inclusion_proof;
let actual_inclusion_data = client
.get_inclusion_data(&result.blob_id)
Expand All @@ -123,7 +135,7 @@ mod tests {
.unwrap()
.data;
assert_eq!(expected_inclusion_data, actual_inclusion_data);
let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap();
let retrieved_data = client.get_blob_data(&blob_info_str).await.unwrap();
assert_eq!(retrieved_data.unwrap(), data);
}

Expand Down Expand Up @@ -152,8 +164,10 @@ mod tests {
let client = EigenClient::new(config, secrets).await.unwrap();
let data = vec![1; 20];
let result = client.dispatch_blob(0, data.clone()).await.unwrap();
let blob_info_str = client.get_commitment(&result.blob_id).await.unwrap();

let blob_info: BlobInfo =
rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap();
rlp::decode(&hex::decode(blob_info_str.clone()).unwrap()).unwrap();
let expected_inclusion_data = blob_info.blob_verification_proof.inclusion_proof;
let actual_inclusion_data = client
.get_inclusion_data(&result.blob_id)
Expand All @@ -162,7 +176,7 @@ mod tests {
.unwrap()
.data;
assert_eq!(expected_inclusion_data, actual_inclusion_data);
let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap();
let retrieved_data = client.get_blob_data(&blob_info_str).await.unwrap();
assert_eq!(retrieved_data.unwrap(), data);
}

Expand Down Expand Up @@ -191,8 +205,10 @@ mod tests {
let client = EigenClient::new(config, secrets).await.unwrap();
let data = vec![1; 20];
let result = client.dispatch_blob(0, data.clone()).await.unwrap();
let blob_info_str = client.get_commitment(&result.blob_id).await.unwrap();

let blob_info: BlobInfo =
rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap();
rlp::decode(&hex::decode(blob_info_str.clone()).unwrap()).unwrap();
let expected_inclusion_data = blob_info.blob_verification_proof.inclusion_proof;
let actual_inclusion_data = client
.get_inclusion_data(&result.blob_id)
Expand All @@ -201,7 +217,7 @@ mod tests {
.unwrap()
.data;
assert_eq!(expected_inclusion_data, actual_inclusion_data);
let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap();
let retrieved_data = client.get_blob_data(&blob_info_str).await.unwrap();
assert_eq!(retrieved_data.unwrap(), data);
}

Expand Down Expand Up @@ -230,8 +246,10 @@ mod tests {
let client = EigenClient::new(config, secrets).await.unwrap();
let data = vec![1; 20];
let result = client.dispatch_blob(0, data.clone()).await.unwrap();
let blob_info_str = client.get_commitment(&result.blob_id).await.unwrap();

let blob_info: BlobInfo =
rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap();
rlp::decode(&hex::decode(blob_info_str.clone()).unwrap()).unwrap();
let expected_inclusion_data = blob_info.blob_verification_proof.inclusion_proof;
let actual_inclusion_data = client
.get_inclusion_data(&result.blob_id)
Expand All @@ -240,7 +258,7 @@ mod tests {
.unwrap()
.data;
assert_eq!(expected_inclusion_data, actual_inclusion_data);
let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap();
let retrieved_data = client.get_blob_data(&blob_info_str).await.unwrap();
assert_eq!(retrieved_data.unwrap(), data);
}

Expand Down Expand Up @@ -269,8 +287,10 @@ mod tests {
let client = EigenClient::new(config, secrets).await.unwrap();
let data = vec![1; 20];
let result = client.dispatch_blob(0, data.clone()).await.unwrap();
let blob_info_str = client.get_commitment(&result.blob_id).await.unwrap();

let blob_info: BlobInfo =
rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap();
rlp::decode(&hex::decode(blob_info_str.clone()).unwrap()).unwrap();
let expected_inclusion_data = blob_info.blob_verification_proof.inclusion_proof;
let actual_inclusion_data = client
.get_inclusion_data(&result.blob_id)
Expand All @@ -279,7 +299,7 @@ mod tests {
.unwrap()
.data;
assert_eq!(expected_inclusion_data, actual_inclusion_data);
let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap();
let retrieved_data = client.get_blob_data(&blob_info_str).await.unwrap();
assert_eq!(retrieved_data.unwrap(), data);
}
}
54 changes: 19 additions & 35 deletions core/node/da_clients/src/eigen/sdk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use tonic::{
Streaming,
};
use zksync_config::EigenConfig;
#[cfg(test)]
use zksync_da_client::types::DAError;

use super::{
Expand All @@ -23,7 +22,7 @@ use crate::eigen::{
self,
authenticated_request::Payload::{AuthenticationData, DisperseRequest},
disperser_client::DisperserClient,
AuthenticatedReply, BlobAuthHeader, DisperseBlobReply,
AuthenticatedReply, BlobAuthHeader,
},
};

Expand All @@ -50,7 +49,6 @@ impl RawEigenClient {
.map_err(|e| anyhow::anyhow!("Failed to connect to Disperser server: {}", e))?;

let verifier_config = VerifierConfig {
verify_certs: true,
rpc_url: config.eigenda_eth_rpc.clone(),
svc_manager_addr: config.eigenda_svc_manager_address.clone(),
max_blob_size: Self::BLOB_SIZE_LIMIT as u32,
Expand Down Expand Up @@ -85,28 +83,7 @@ impl RawEigenClient {
let mut client_clone = self.client.clone();
let disperse_reply = client_clone.disperse_blob(request).await?.into_inner();

let disperse_time = Instant::now();
let blob_info = self
.await_for_inclusion(client_clone, disperse_reply)
.await?;
let disperse_elapsed = Instant::now() - disperse_time;

let blob_info = blob_info::BlobInfo::try_from(blob_info)
.map_err(|e| anyhow::anyhow!("Failed to convert blob info: {}", e))?;
self.verifier
.verify_commitment(blob_info.blob_header.commitment.clone(), data)
.map_err(|_| anyhow::anyhow!("Failed to verify commitment"))?;

self.perform_verification(blob_info.clone(), disperse_elapsed)
.await?;
let verification_proof = blob_info.blob_verification_proof.clone();
let blob_id = format!(
"{}:{}",
verification_proof.batch_id, verification_proof.blob_index
);
tracing::info!("Blob dispatch confirmed, blob id: {}", blob_id);

Ok(hex::encode(rlp::encode(&blob_info)))
Ok(hex::encode(disperse_reply.request_id))
}

async fn perform_verification(
Expand All @@ -132,7 +109,6 @@ impl RawEigenClient {
let mut client_clone = self.client.clone();
let (tx, rx) = mpsc::channel(Self::BUFFER_SIZE);

let disperse_time = Instant::now();
let response_stream = client_clone.disperse_blob_authenticated(ReceiverStream::new(rx));
let padded_data = convert_by_padding_empty_byte(&data);

Expand Down Expand Up @@ -161,18 +137,28 @@ impl RawEigenClient {
let disperser::authenticated_reply::Payload::DisperseReply(disperse_reply) = reply else {
return Err(anyhow::anyhow!("Unexpected response from server"));
};
Ok(hex::encode(disperse_reply.request_id))
}

// 5. poll for blob status until it reaches the Confirmed state
pub async fn get_inclusion_data(&self, blob_id: &str) -> anyhow::Result<String> {
let client_clone = self.client.clone();
let disperse_time = Instant::now();
let blob_info = self
.await_for_inclusion(client_clone, disperse_reply)
.await_for_inclusion(client_clone, blob_id.to_string())
.await?;

let blob_info = blob_info::BlobInfo::try_from(blob_info)
.map_err(|e| anyhow::anyhow!("Failed to convert blob info: {}", e))?;

let disperse_elapsed = Instant::now() - disperse_time;
let data = self
.get_blob_data(&hex::encode(rlp::encode(&blob_info)))
.await?;
if data.is_none() {
return Err(anyhow::anyhow!("Failed to get blob data"));
}
self.verifier
.verify_commitment(blob_info.blob_header.commitment.clone(), data)
.verify_commitment(blob_info.blob_header.commitment.clone(), data.unwrap())
.map_err(|_| anyhow::anyhow!("Failed to verify commitment"))?;

self.perform_verification(blob_info.clone(), disperse_elapsed)
Expand Down Expand Up @@ -271,10 +257,10 @@ impl RawEigenClient {
async fn await_for_inclusion(
&self,
client: DisperserClient<Channel>,
disperse_blob_reply: DisperseBlobReply,
request_id: String,
) -> anyhow::Result<DisperserBlobInfo> {
let polling_request = disperser::BlobStatusRequest {
request_id: disperse_blob_reply.request_id,
request_id: hex::decode(request_id)?,
};

let blob_info = (|| async {
Expand Down Expand Up @@ -324,14 +310,13 @@ impl RawEigenClient {
Ok(blob_info)
}

#[cfg(test)]
pub async fn get_blob_data(&self, blob_id: &str) -> anyhow::Result<Option<Vec<u8>>, DAError> {
pub async fn get_blob_data(&self, blob_info: &str) -> anyhow::Result<Option<Vec<u8>>, DAError> {
use anyhow::anyhow;
use zksync_da_client::types::DAError;

use crate::eigen::blob_info::BlobInfo;

let commit = hex::decode(blob_id).map_err(|_| DAError {
let commit = hex::decode(blob_info).map_err(|_| DAError {
error: anyhow!("Failed to decode blob_id"),
is_retriable: false,
})?;
Expand Down Expand Up @@ -404,7 +389,6 @@ fn convert_by_padding_empty_byte(data: &[u8]) -> Vec<u8> {
valid_data
}

#[cfg(test)]
fn remove_empty_byte_from_padded_bytes(data: &[u8]) -> Vec<u8> {
let parse_size = DATA_CHUNK_SIZE;

Expand Down
10 changes: 0 additions & 10 deletions core/node/da_clients/src/eigen/verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ pub enum VerificationError {
/// Configuration for the verifier used for authenticated dispersals
#[derive(Debug, Clone)]
pub struct VerifierConfig {
pub verify_certs: bool,
pub rpc_url: String,
pub svc_manager_addr: String,
pub max_blob_size: u32,
Expand Down Expand Up @@ -470,9 +469,6 @@ impl Verifier {

/// Verifies that the certificate is valid
pub async fn verify_certificate(&self, cert: BlobInfo) -> Result<(), VerificationError> {
if !self.cfg.verify_certs {
return Ok(());
}
self.verify_batch(cert.clone()).await?;
self.verify_merkle_proof(cert.clone())?;
self.verify_security_params(cert.clone()).await?;
Expand All @@ -490,7 +486,6 @@ mod test {
#[test]
fn test_verify_commitment() {
let verifier = super::Verifier::new(super::VerifierConfig {
verify_certs: true,
rpc_url: "https://ethereum-holesky-rpc.publicnode.com".to_string(),
svc_manager_addr: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(),
max_blob_size: 2 * 1024 * 1024,
Expand Down Expand Up @@ -519,7 +514,6 @@ mod test {
#[test]
fn test_verify_merkle_proof() {
let verifier = super::Verifier::new(super::VerifierConfig {
verify_certs: true,
rpc_url: "https://ethereum-holesky-rpc.publicnode.com".to_string(),
svc_manager_addr: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(),
max_blob_size: 2 * 1024 * 1024,
Expand Down Expand Up @@ -609,7 +603,6 @@ mod test {
#[test]
fn test_hash_blob_header() {
let verifier = super::Verifier::new(super::VerifierConfig {
verify_certs: true,
rpc_url: "https://ethereum-holesky-rpc.publicnode.com".to_string(),
svc_manager_addr: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(),
max_blob_size: 2 * 1024 * 1024,
Expand Down Expand Up @@ -655,7 +648,6 @@ mod test {
#[test]
fn test_inclusion_proof() {
let verifier = super::Verifier::new(super::VerifierConfig {
verify_certs: true,
rpc_url: "https://ethereum-holesky-rpc.publicnode.com".to_string(),
svc_manager_addr: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(),
max_blob_size: 2 * 1024 * 1024,
Expand Down Expand Up @@ -684,7 +676,6 @@ mod test {
#[tokio::test]
async fn test_verify_batch() {
let verifier = super::Verifier::new(super::VerifierConfig {
verify_certs: true,
rpc_url: "https://ethereum-holesky-rpc.publicnode.com".to_string(),
svc_manager_addr: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(),
max_blob_size: 2 * 1024 * 1024,
Expand Down Expand Up @@ -774,7 +765,6 @@ mod test {
#[tokio::test]
async fn test_verify_security_params() {
let verifier = super::Verifier::new(super::VerifierConfig {
verify_certs: true,
rpc_url: "https://ethereum-holesky-rpc.publicnode.com".to_string(),
svc_manager_addr: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(),
max_blob_size: 2 * 1024 * 1024,
Expand Down

0 comments on commit fd0e4bd

Please sign in to comment.