diff --git a/core/node/da_clients/src/eigen/client.rs b/core/node/da_clients/src/eigen/client.rs index aa7b701998c6..a97c6feb3293 100644 --- a/core/node/da_clients/src/eigen/client.rs +++ b/core/node/da_clients/src/eigen/client.rs @@ -32,6 +32,11 @@ impl EigenClient { client: Arc::new(client), }) } + + pub async fn get_commitment(&self, blob_id: &str) -> anyhow::Result { + let blob_info = self.client.get_inclusion_data(blob_id).await?; + Ok(blob_info) + } } #[async_trait] @@ -60,7 +65,11 @@ impl DataAvailabilityClient for EigenClient { } async fn get_inclusion_data(&self, blob_id: &str) -> Result, DAError> { - let rlp_encoded_bytes = hex::decode(blob_id).map_err(|_| DAError { + let blob_info = self + .get_commitment(blob_id) + .await + .map_err(to_non_retriable_da_error)?; + let rlp_encoded_bytes = hex::decode(blob_info).map_err(|_| DAError { error: anyhow!("Failed to decode blob_id"), is_retriable: false, })?; @@ -123,8 +132,11 @@ mod tests { let client = EigenClient::new(config, secrets).await.unwrap(); let data = vec![1; 20]; let result = client.dispatch_blob(0, data.clone()).await.unwrap(); + + let blob_info_str = client.get_commitment(&result.blob_id).await.unwrap(); + let blob_info: BlobInfo = - rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap(); + rlp::decode(&hex::decode(blob_info_str.clone()).unwrap()).unwrap(); let expected_inclusion_data = blob_info.blob_verification_proof.inclusion_proof; let actual_inclusion_data = client .get_inclusion_data(&result.blob_id) @@ -133,7 +145,7 @@ mod tests { .unwrap() .data; assert_eq!(expected_inclusion_data, actual_inclusion_data); - let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap(); + let retrieved_data = client.get_blob_data(&blob_info_str).await.unwrap(); assert_eq!(retrieved_data.unwrap(), data); } @@ -163,8 +175,10 @@ mod tests { let client = EigenClient::new(config, secrets).await.unwrap(); let data = vec![1; 20]; let result = client.dispatch_blob(0, data.clone()).await.unwrap(); + let blob_info_str = client.get_commitment(&result.blob_id).await.unwrap(); + let blob_info: BlobInfo = - rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap(); + rlp::decode(&hex::decode(blob_info_str.clone()).unwrap()).unwrap(); let expected_inclusion_data = blob_info.blob_verification_proof.inclusion_proof; let actual_inclusion_data = client .get_inclusion_data(&result.blob_id) @@ -173,7 +187,7 @@ mod tests { .unwrap() .data; assert_eq!(expected_inclusion_data, actual_inclusion_data); - let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap(); + let retrieved_data = client.get_blob_data(&blob_info_str).await.unwrap(); assert_eq!(retrieved_data.unwrap(), data); } @@ -203,8 +217,10 @@ mod tests { let client = EigenClient::new(config, secrets).await.unwrap(); let data = vec![1; 20]; let result = client.dispatch_blob(0, data.clone()).await.unwrap(); + let blob_info_str = client.get_commitment(&result.blob_id).await.unwrap(); + let blob_info: BlobInfo = - rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap(); + rlp::decode(&hex::decode(blob_info_str.clone()).unwrap()).unwrap(); let expected_inclusion_data = blob_info.blob_verification_proof.inclusion_proof; let actual_inclusion_data = client .get_inclusion_data(&result.blob_id) @@ -213,7 +229,7 @@ mod tests { .unwrap() .data; assert_eq!(expected_inclusion_data, actual_inclusion_data); - let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap(); + let retrieved_data = client.get_blob_data(&blob_info_str).await.unwrap(); assert_eq!(retrieved_data.unwrap(), data); } @@ -277,8 +293,10 @@ mod tests { let client = EigenClient::new(config, secrets).await.unwrap(); let data = vec![1; 20]; let result = client.dispatch_blob(0, data.clone()).await.unwrap(); + let blob_info_str = client.get_commitment(&result.blob_id).await.unwrap(); + let blob_info: BlobInfo = - rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap(); + rlp::decode(&hex::decode(blob_info_str.clone()).unwrap()).unwrap(); let expected_inclusion_data = blob_info.blob_verification_proof.inclusion_proof; let actual_inclusion_data = client .get_inclusion_data(&result.blob_id) @@ -287,7 +305,7 @@ mod tests { .unwrap() .data; assert_eq!(expected_inclusion_data, actual_inclusion_data); - let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap(); + let retrieved_data = client.get_blob_data(&blob_info_str).await.unwrap(); assert_eq!(retrieved_data.unwrap(), data); } @@ -317,8 +335,10 @@ mod tests { let client = EigenClient::new(config, secrets).await.unwrap(); let data = vec![1; 20]; let result = client.dispatch_blob(0, data.clone()).await.unwrap(); + let blob_info_str = client.get_commitment(&result.blob_id).await.unwrap(); + let blob_info: BlobInfo = - rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap(); + rlp::decode(&hex::decode(blob_info_str.clone()).unwrap()).unwrap(); let expected_inclusion_data = blob_info.blob_verification_proof.inclusion_proof; let actual_inclusion_data = client .get_inclusion_data(&result.blob_id) @@ -327,7 +347,7 @@ mod tests { .unwrap() .data; assert_eq!(expected_inclusion_data, actual_inclusion_data); - let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap(); + let retrieved_data = client.get_blob_data(&blob_info_str).await.unwrap(); assert_eq!(retrieved_data.unwrap(), data); } } diff --git a/core/node/da_clients/src/eigen/sdk.rs b/core/node/da_clients/src/eigen/sdk.rs index 6f250ba35862..36dfd4fc498e 100644 --- a/core/node/da_clients/src/eigen/sdk.rs +++ b/core/node/da_clients/src/eigen/sdk.rs @@ -9,7 +9,6 @@ use tonic::{ Streaming, }; use zksync_config::EigenConfig; -#[cfg(test)] use zksync_da_client::types::DAError; use super::{ @@ -79,28 +78,7 @@ impl RawEigenClient { let mut client_clone = self.client.clone(); let disperse_reply = client_clone.disperse_blob(request).await?.into_inner(); - let disperse_time = Instant::now(); - let blob_info = self - .await_for_inclusion(client_clone, disperse_reply) - .await?; - let disperse_elapsed = Instant::now() - disperse_time; - - let blob_info = blob_info::BlobInfo::try_from(blob_info) - .map_err(|e| anyhow::anyhow!("Failed to convert blob info: {}", e))?; - self.verifier - .verify_commitment(blob_info.blob_header.commitment.clone(), data) - .map_err(|_| anyhow::anyhow!("Failed to verify commitment"))?; - - self.loop_verify_certificate(blob_info.clone(), disperse_elapsed) - .await?; - let verification_proof = blob_info.blob_verification_proof.clone(); - let blob_id = format!( - "{}:{}", - verification_proof.batch_id, verification_proof.blob_index - ); - tracing::info!("Blob dispatch confirmed, blob id: {}", blob_id); - - Ok(hex::encode(rlp::encode(&blob_info))) + Ok(hex::encode(disperse_reply.request_id)) } async fn loop_verify_certificate( @@ -126,7 +104,6 @@ impl RawEigenClient { let mut client_clone = self.client.clone(); let (tx, rx) = mpsc::channel(Self::BUFFER_SIZE); - let disperse_time = Instant::now(); let response_stream = client_clone.disperse_blob_authenticated(ReceiverStream::new(rx)); let padded_data = convert_by_padding_empty_byte(&data); @@ -155,18 +132,28 @@ impl RawEigenClient { let disperser::authenticated_reply::Payload::DisperseReply(disperse_reply) = reply else { return Err(anyhow::anyhow!("Unexpected response from server")); }; + Ok(hex::encode(disperse_reply.request_id)) + } - // 5. poll for blob status until it reaches the Confirmed state + pub async fn get_inclusion_data(&self, blob_id: &str) -> anyhow::Result { + let client_clone = self.client.clone(); + let disperse_time = Instant::now(); let blob_info = self - .await_for_inclusion(client_clone, disperse_reply) + .await_for_inclusion(client_clone, blob_id.to_string()) .await?; let blob_info = blob_info::BlobInfo::try_from(blob_info) .map_err(|e| anyhow::anyhow!("Failed to convert blob info: {}", e))?; let disperse_elapsed = Instant::now() - disperse_time; + let data = self + .get_blob_data(&hex::encode(rlp::encode(&blob_info))) + .await?; + if data.is_none() { + return Err(anyhow::anyhow!("Failed to get blob data")); + } self.verifier - .verify_commitment(blob_info.blob_header.commitment.clone(), data) + .verify_commitment(blob_info.blob_header.commitment.clone(), data.unwrap()) .map_err(|_| anyhow::anyhow!("Failed to verify commitment"))?; self.loop_verify_certificate(blob_info.clone(), disperse_elapsed) @@ -265,10 +252,10 @@ impl RawEigenClient { async fn await_for_inclusion( &self, client: DisperserClient, - disperse_blob_reply: DisperseBlobReply, + request_id: String, ) -> anyhow::Result { let polling_request = disperser::BlobStatusRequest { - request_id: disperse_blob_reply.request_id, + request_id: hex::decode(request_id)?, }; let blob_info = (|| async { @@ -318,14 +305,13 @@ impl RawEigenClient { Ok(blob_info) } - #[cfg(test)] - pub async fn get_blob_data(&self, blob_id: &str) -> anyhow::Result>, DAError> { + pub async fn get_blob_data(&self, blob_info: &str) -> anyhow::Result>, DAError> { use anyhow::anyhow; use zksync_da_client::types::DAError; use crate::eigen::blob_info::BlobInfo; - let commit = hex::decode(blob_id).map_err(|_| DAError { + let commit = hex::decode(blob_info).map_err(|_| DAError { error: anyhow!("Failed to decode blob_id"), is_retriable: false, })?; @@ -398,7 +384,6 @@ fn convert_by_padding_empty_byte(data: &[u8]) -> Vec { valid_data } -#[cfg(test)] fn remove_empty_byte_from_padded_bytes(data: &[u8]) -> Vec { let parse_size = DATA_CHUNK_SIZE;