Skip to content

Commit

Permalink
Move inclusion logic
Browse files Browse the repository at this point in the history
  • Loading branch information
gianbelinche committed Nov 25, 2024
1 parent 99b153a commit cbab8d3
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 44 deletions.
42 changes: 31 additions & 11 deletions core/node/da_clients/src/eigen/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ impl EigenClient {
client: Arc::new(client),
})
}

pub async fn get_commitment(&self, blob_id: &str) -> anyhow::Result<String> {
let blob_info = self.client.get_inclusion_data(blob_id).await?;
Ok(blob_info)
}
}

#[async_trait]
Expand Down Expand Up @@ -60,7 +65,11 @@ impl DataAvailabilityClient for EigenClient {
}

async fn get_inclusion_data(&self, blob_id: &str) -> Result<Option<InclusionData>, DAError> {
let rlp_encoded_bytes = hex::decode(blob_id).map_err(|_| DAError {
let blob_info = self
.get_commitment(blob_id)
.await
.map_err(to_non_retriable_da_error)?;
let rlp_encoded_bytes = hex::decode(blob_info).map_err(|_| DAError {
error: anyhow!("Failed to decode blob_id"),
is_retriable: false,
})?;
Expand Down Expand Up @@ -123,8 +132,11 @@ mod tests {
let client = EigenClient::new(config, secrets).await.unwrap();
let data = vec![1; 20];
let result = client.dispatch_blob(0, data.clone()).await.unwrap();

let blob_info_str = client.get_commitment(&result.blob_id).await.unwrap();

let blob_info: BlobInfo =
rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap();
rlp::decode(&hex::decode(blob_info_str.clone()).unwrap()).unwrap();
let expected_inclusion_data = blob_info.blob_verification_proof.inclusion_proof;
let actual_inclusion_data = client
.get_inclusion_data(&result.blob_id)
Expand All @@ -133,7 +145,7 @@ mod tests {
.unwrap()
.data;
assert_eq!(expected_inclusion_data, actual_inclusion_data);
let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap();
let retrieved_data = client.get_blob_data(&blob_info_str).await.unwrap();
assert_eq!(retrieved_data.unwrap(), data);
}

Expand Down Expand Up @@ -163,8 +175,10 @@ mod tests {
let client = EigenClient::new(config, secrets).await.unwrap();
let data = vec![1; 20];
let result = client.dispatch_blob(0, data.clone()).await.unwrap();
let blob_info_str = client.get_commitment(&result.blob_id).await.unwrap();

let blob_info: BlobInfo =
rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap();
rlp::decode(&hex::decode(blob_info_str.clone()).unwrap()).unwrap();
let expected_inclusion_data = blob_info.blob_verification_proof.inclusion_proof;
let actual_inclusion_data = client
.get_inclusion_data(&result.blob_id)
Expand All @@ -173,7 +187,7 @@ mod tests {
.unwrap()
.data;
assert_eq!(expected_inclusion_data, actual_inclusion_data);
let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap();
let retrieved_data = client.get_blob_data(&blob_info_str).await.unwrap();
assert_eq!(retrieved_data.unwrap(), data);
}

Expand Down Expand Up @@ -203,8 +217,10 @@ mod tests {
let client = EigenClient::new(config, secrets).await.unwrap();
let data = vec![1; 20];
let result = client.dispatch_blob(0, data.clone()).await.unwrap();
let blob_info_str = client.get_commitment(&result.blob_id).await.unwrap();

let blob_info: BlobInfo =
rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap();
rlp::decode(&hex::decode(blob_info_str.clone()).unwrap()).unwrap();
let expected_inclusion_data = blob_info.blob_verification_proof.inclusion_proof;
let actual_inclusion_data = client
.get_inclusion_data(&result.blob_id)
Expand All @@ -213,7 +229,7 @@ mod tests {
.unwrap()
.data;
assert_eq!(expected_inclusion_data, actual_inclusion_data);
let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap();
let retrieved_data = client.get_blob_data(&blob_info_str).await.unwrap();
assert_eq!(retrieved_data.unwrap(), data);
}

Expand Down Expand Up @@ -277,8 +293,10 @@ mod tests {
let client = EigenClient::new(config, secrets).await.unwrap();
let data = vec![1; 20];
let result = client.dispatch_blob(0, data.clone()).await.unwrap();
let blob_info_str = client.get_commitment(&result.blob_id).await.unwrap();

let blob_info: BlobInfo =
rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap();
rlp::decode(&hex::decode(blob_info_str.clone()).unwrap()).unwrap();
let expected_inclusion_data = blob_info.blob_verification_proof.inclusion_proof;
let actual_inclusion_data = client
.get_inclusion_data(&result.blob_id)
Expand All @@ -287,7 +305,7 @@ mod tests {
.unwrap()
.data;
assert_eq!(expected_inclusion_data, actual_inclusion_data);
let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap();
let retrieved_data = client.get_blob_data(&blob_info_str).await.unwrap();
assert_eq!(retrieved_data.unwrap(), data);
}

Expand Down Expand Up @@ -317,8 +335,10 @@ mod tests {
let client = EigenClient::new(config, secrets).await.unwrap();
let data = vec![1; 20];
let result = client.dispatch_blob(0, data.clone()).await.unwrap();
let blob_info_str = client.get_commitment(&result.blob_id).await.unwrap();

let blob_info: BlobInfo =
rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap();
rlp::decode(&hex::decode(blob_info_str.clone()).unwrap()).unwrap();
let expected_inclusion_data = blob_info.blob_verification_proof.inclusion_proof;
let actual_inclusion_data = client
.get_inclusion_data(&result.blob_id)
Expand All @@ -327,7 +347,7 @@ mod tests {
.unwrap()
.data;
assert_eq!(expected_inclusion_data, actual_inclusion_data);
let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap();
let retrieved_data = client.get_blob_data(&blob_info_str).await.unwrap();
assert_eq!(retrieved_data.unwrap(), data);
}
}
51 changes: 18 additions & 33 deletions core/node/da_clients/src/eigen/sdk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use tonic::{
Streaming,
};
use zksync_config::EigenConfig;
#[cfg(test)]
use zksync_da_client::types::DAError;

use super::{
Expand Down Expand Up @@ -79,28 +78,7 @@ impl RawEigenClient {
let mut client_clone = self.client.clone();
let disperse_reply = client_clone.disperse_blob(request).await?.into_inner();

let disperse_time = Instant::now();
let blob_info = self
.await_for_inclusion(client_clone, disperse_reply)
.await?;
let disperse_elapsed = Instant::now() - disperse_time;

let blob_info = blob_info::BlobInfo::try_from(blob_info)
.map_err(|e| anyhow::anyhow!("Failed to convert blob info: {}", e))?;
self.verifier
.verify_commitment(blob_info.blob_header.commitment.clone(), data)
.map_err(|_| anyhow::anyhow!("Failed to verify commitment"))?;

self.loop_verify_certificate(blob_info.clone(), disperse_elapsed)
.await?;
let verification_proof = blob_info.blob_verification_proof.clone();
let blob_id = format!(
"{}:{}",
verification_proof.batch_id, verification_proof.blob_index
);
tracing::info!("Blob dispatch confirmed, blob id: {}", blob_id);

Ok(hex::encode(rlp::encode(&blob_info)))
Ok(hex::encode(disperse_reply.request_id))
}

async fn loop_verify_certificate(
Expand All @@ -126,7 +104,6 @@ impl RawEigenClient {
let mut client_clone = self.client.clone();
let (tx, rx) = mpsc::channel(Self::BUFFER_SIZE);

let disperse_time = Instant::now();
let response_stream = client_clone.disperse_blob_authenticated(ReceiverStream::new(rx));
let padded_data = convert_by_padding_empty_byte(&data);

Expand Down Expand Up @@ -155,18 +132,28 @@ impl RawEigenClient {
let disperser::authenticated_reply::Payload::DisperseReply(disperse_reply) = reply else {
return Err(anyhow::anyhow!("Unexpected response from server"));
};
Ok(hex::encode(disperse_reply.request_id))
}

// 5. poll for blob status until it reaches the Confirmed state
pub async fn get_inclusion_data(&self, blob_id: &str) -> anyhow::Result<String> {
let client_clone = self.client.clone();
let disperse_time = Instant::now();
let blob_info = self
.await_for_inclusion(client_clone, disperse_reply)
.await_for_inclusion(client_clone, blob_id.to_string())
.await?;

let blob_info = blob_info::BlobInfo::try_from(blob_info)
.map_err(|e| anyhow::anyhow!("Failed to convert blob info: {}", e))?;

let disperse_elapsed = Instant::now() - disperse_time;
let data = self
.get_blob_data(&hex::encode(rlp::encode(&blob_info)))
.await?;
if data.is_none() {
return Err(anyhow::anyhow!("Failed to get blob data"));
}
self.verifier
.verify_commitment(blob_info.blob_header.commitment.clone(), data)
.verify_commitment(blob_info.blob_header.commitment.clone(), data.unwrap())
.map_err(|_| anyhow::anyhow!("Failed to verify commitment"))?;

self.loop_verify_certificate(blob_info.clone(), disperse_elapsed)
Expand Down Expand Up @@ -265,10 +252,10 @@ impl RawEigenClient {
async fn await_for_inclusion(
&self,
client: DisperserClient<Channel>,
disperse_blob_reply: DisperseBlobReply,
request_id: String,
) -> anyhow::Result<DisperserBlobInfo> {
let polling_request = disperser::BlobStatusRequest {
request_id: disperse_blob_reply.request_id,
request_id: hex::decode(request_id)?,
};

let blob_info = (|| async {
Expand Down Expand Up @@ -318,14 +305,13 @@ impl RawEigenClient {
Ok(blob_info)
}

#[cfg(test)]
pub async fn get_blob_data(&self, blob_id: &str) -> anyhow::Result<Option<Vec<u8>>, DAError> {
pub async fn get_blob_data(&self, blob_info: &str) -> anyhow::Result<Option<Vec<u8>>, DAError> {
use anyhow::anyhow;
use zksync_da_client::types::DAError;

use crate::eigen::blob_info::BlobInfo;

let commit = hex::decode(blob_id).map_err(|_| DAError {
let commit = hex::decode(blob_info).map_err(|_| DAError {
error: anyhow!("Failed to decode blob_id"),
is_retriable: false,
})?;
Expand Down Expand Up @@ -398,7 +384,6 @@ fn convert_by_padding_empty_byte(data: &[u8]) -> Vec<u8> {
valid_data
}

#[cfg(test)]
fn remove_empty_byte_from_padded_bytes(data: &[u8]) -> Vec<u8> {
let parse_size = DATA_CHUNK_SIZE;

Expand Down

0 comments on commit cbab8d3

Please sign in to comment.