From 226834fa9995fd39a2f5775396ceda6a4b097cb6 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Thu, 31 Oct 2024 17:36:14 -0300 Subject: [PATCH 01/16] initial commit --- .../lib/config/src/configs/da_client/eigen.rs | 1 + core/lib/env_config/src/da_client.rs | 2 ++ core/lib/protobuf_config/src/da_client.rs | 3 ++ .../src/proto/config/da_client.proto | 1 + core/node/da_clients/src/eigen/client.rs | 1 + core/node/da_clients/src/eigen/sdk.rs | 35 ++++++++++++++++++- eigenda-integration.md | 1 - 7 files changed, 42 insertions(+), 2 deletions(-) diff --git a/core/lib/config/src/configs/da_client/eigen.rs b/core/lib/config/src/configs/da_client/eigen.rs index f2c05a0f61ef..8460c1beb171 100644 --- a/core/lib/config/src/configs/da_client/eigen.rs +++ b/core/lib/config/src/configs/da_client/eigen.rs @@ -5,6 +5,7 @@ use zksync_basic_types::secrets::PrivateKey; pub struct EigenConfig { pub rpc_node_url: String, pub inclusion_polling_interval_ms: u64, + pub authenticated_dispersal: bool, } #[derive(Clone, Debug, PartialEq)] diff --git a/core/lib/env_config/src/da_client.rs b/core/lib/env_config/src/da_client.rs index 8ceeb215faf4..40eaad074da7 100644 --- a/core/lib/env_config/src/da_client.rs +++ b/core/lib/env_config/src/da_client.rs @@ -250,6 +250,7 @@ mod tests { DA_CLIENT="Eigen" DA_RPC_NODE_URL="localhost:12345" DA_INCLUSION_POLLING_INTERVAL_MS="1000" + DA_AUTHENTICATED_DISPERSAL="true" "#; lock.set_env(config); @@ -259,6 +260,7 @@ mod tests { DAClientConfig::Eigen(EigenConfig { rpc_node_url: "localhost:12345".to_string(), inclusion_polling_interval_ms: 1000, + authenticated_dispersal: true, }) ); } diff --git a/core/lib/protobuf_config/src/da_client.rs b/core/lib/protobuf_config/src/da_client.rs index 341a6a9e4f43..5cd8fccb9af3 100644 --- a/core/lib/protobuf_config/src/da_client.rs +++ b/core/lib/protobuf_config/src/da_client.rs @@ -58,6 +58,8 @@ impl ProtoRepr for proto::DataAvailabilityClient { .clone(), inclusion_polling_interval_ms: *required(&conf.inclusion_polling_interval_ms) .context("inclusion_polling_interval_ms")?, + authenticated_dispersal: *required(&conf.authenticated_dispersal) + .context("authenticated_dispersal")?, }), proto::data_availability_client::Config::ObjectStore(conf) => { ObjectStore(object_store_proto::ObjectStore::read(conf)?) @@ -98,6 +100,7 @@ impl ProtoRepr for proto::DataAvailabilityClient { Eigen(config) => proto::data_availability_client::Config::Eigen(proto::EigenConfig { rpc_node_url: Some(config.rpc_node_url.clone()), inclusion_polling_interval_ms: Some(config.inclusion_polling_interval_ms), + authenticated_dispersal: Some(config.authenticated_dispersal), }), ObjectStore(config) => proto::data_availability_client::Config::ObjectStore( object_store_proto::ObjectStore::build(config), diff --git a/core/lib/protobuf_config/src/proto/config/da_client.proto b/core/lib/protobuf_config/src/proto/config/da_client.proto index 0a302120d775..303a5c4cf12a 100644 --- a/core/lib/protobuf_config/src/proto/config/da_client.proto +++ b/core/lib/protobuf_config/src/proto/config/da_client.proto @@ -39,6 +39,7 @@ message CelestiaConfig { message EigenConfig { optional string rpc_node_url = 1; optional uint64 inclusion_polling_interval_ms = 2; + optional bool authenticated_dispersal = 3; } message DataAvailabilityClient { diff --git a/core/node/da_clients/src/eigen/client.rs b/core/node/da_clients/src/eigen/client.rs index d977620526aa..ccd85aac4256 100644 --- a/core/node/da_clients/src/eigen/client.rs +++ b/core/node/da_clients/src/eigen/client.rs @@ -28,6 +28,7 @@ impl EigenClient { config.rpc_node_url, config.inclusion_polling_interval_ms, private_key, + config.authenticated_dispersal, ) .await?, ), diff --git a/core/node/da_clients/src/eigen/sdk.rs b/core/node/da_clients/src/eigen/sdk.rs index 7ab7ea3ce33b..f288781eff4a 100644 --- a/core/node/da_clients/src/eigen/sdk.rs +++ b/core/node/da_clients/src/eigen/sdk.rs @@ -23,6 +23,7 @@ pub struct RawEigenClient { polling_interval: Duration, private_key: SecretKey, account_id: String, + authenticated_dispersal: bool, } pub(crate) const DATA_CHUNK_SIZE: usize = 32; @@ -34,6 +35,7 @@ impl RawEigenClient { rpc_node_url: String, inclusion_polling_interval_ms: u64, private_key: SecretKey, + authenticated_dispersal: bool, ) -> anyhow::Result { let endpoint = Endpoint::from_str(rpc_node_url.as_str())?.tls_config(ClientTlsConfig::new())?; @@ -49,10 +51,34 @@ impl RawEigenClient { polling_interval, private_key, account_id, + authenticated_dispersal, }) } - pub async fn dispatch_blob(&self, data: Vec) -> anyhow::Result { + async fn dispatch_blob_non_authenticated(&self, data: Vec) -> anyhow::Result { + let padded_data = convert_by_padding_empty_byte(&data); + let request = disperser::DisperseBlobRequest { + data: padded_data, + custom_quorum_numbers: vec![], + account_id: String::default(), // Account Id is not used in non-authenticated mode + }; + + let mut client_clone = self.client.clone(); + let disperse_reply = client_clone.disperse_blob(request).await?.into_inner(); + + let verification_proof = self + .await_for_inclusion(client_clone, disperse_reply) + .await?; + let blob_id = format!( + "{}:{}", + verification_proof.batch_id, verification_proof.blob_index + ); + tracing::info!("Blob dispatch confirmed, blob id: {}", blob_id); + + Ok(blob_id) + } + + async fn dispatch_blob_authenticated(&self, data: Vec) -> anyhow::Result { let mut client_clone = self.client.clone(); let (tx, rx) = mpsc::channel(Self::BUFFER_SIZE); @@ -98,6 +124,13 @@ impl RawEigenClient { Ok(blob_id) } + pub async fn dispatch_blob(&self, data: Vec) -> anyhow::Result { + match self.authenticated_dispersal { + true => self.dispatch_blob_authenticated(data).await, + false => self.dispatch_blob_non_authenticated(data).await, + } + } + async fn disperse_data( &self, data: Vec, diff --git a/eigenda-integration.md b/eigenda-integration.md index 5151fdd8c075..7cfe0be1e66c 100644 --- a/eigenda-integration.md +++ b/eigenda-integration.md @@ -226,4 +226,3 @@ Note that: ```bash git submodule update --init --recursive && zkstack dev contracts ``` - From 7f5d01e498b93b8de45574bec200e45e7075a791 Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Fri, 1 Nov 2024 13:08:26 -0300 Subject: [PATCH 02/16] Add tests --- Cargo.lock | 9 +++ core/node/da_clients/Cargo.toml | 1 + core/node/da_clients/src/eigen/client.rs | 77 +++++++++++++++++++++++- core/node/da_clients/src/eigen/sdk.rs | 59 +++++++++++++++++- 4 files changed, 143 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 814b02df2b81..4db2026ff983 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4522,6 +4522,14 @@ dependencies = [ "log", ] +[[package]] +name = "kzgpad-rs" +version = "0.1.0" +source = "git+https://github.com/Layr-Labs/kzgpad-rs.git?tag=v0.1.0#b5f8c8d3d6482407dc118cb1f51597a017a1cc89" +dependencies = [ + "rand 0.8.5", +] + [[package]] name = "lalrpop" version = "0.20.2" @@ -10930,6 +10938,7 @@ dependencies = [ "hex", "http 1.1.0", "jsonrpsee 0.23.2", + "kzgpad-rs", "parity-scale-codec", "pbjson-types", "prost 0.12.6", diff --git a/core/node/da_clients/Cargo.toml b/core/node/da_clients/Cargo.toml index 84f528a5df17..317a889bd8d3 100644 --- a/core/node/da_clients/Cargo.toml +++ b/core/node/da_clients/Cargo.toml @@ -56,3 +56,4 @@ pbjson-types.workspace = true # Eigen dependencies tokio-stream.workspace = true rlp.workspace = true +kzgpad-rs = { git = "https://github.com/Layr-Labs/kzgpad-rs.git", tag = "v0.1.0" } diff --git a/core/node/da_clients/src/eigen/client.rs b/core/node/da_clients/src/eigen/client.rs index 2528e264b111..deeeea624189 100644 --- a/core/node/da_clients/src/eigen/client.rs +++ b/core/node/da_clients/src/eigen/client.rs @@ -29,7 +29,7 @@ impl EigenClient { config.disperser_rpc, config.status_query_interval, private_key, - config.authenticated_dispersal, + config.authenticaded, ) .await?; Ok(EigenClient { @@ -71,3 +71,78 @@ impl DataAvailabilityClient for EigenClient { Some(1920 * 1024) // 2mb - 128kb as a buffer } } + +#[cfg(test)] +impl EigenClient { + pub async fn get_blob_data(&self, blob_id: &str) -> anyhow::Result>, DAError> { + self.client.get_blob_data(blob_id).await + /*match &self.disperser { + Disperser::Remote(remote_client) => remote_client.get_blob_data(blob_id).await, + Disperser::Memory(memstore) => memstore.clone().get_blob_data(blob_id).await, + }*/ + } +} +#[cfg(test)] +mod tests { + use zksync_config::configs::da_client::eigen::DisperserConfig; + use zksync_types::secrets::PrivateKey; + + use super::*; + use crate::eigen::blob_info::BlobInfo; + #[tokio::test] + async fn test_non_auth_dispersal() { + let config = EigenConfig::Disperser(DisperserConfig { + custom_quorum_numbers: None, + disperser_rpc: "https://disperser-holesky.eigenda.xyz:443".to_string(), + eth_confirmation_depth: -1, + eigenda_eth_rpc: String::default(), + eigenda_svc_manager_address: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(), + blob_size_limit: 2 * 1024 * 1024, // 2MB + status_query_timeout: 1800, // 30 minutes + status_query_interval: 5, // 5 seconds + wait_for_finalization: false, + authenticaded: false, + }); + let secrets = EigenSecrets { + private_key: PrivateKey::from_str( + "d08aa7ae1bb5ddd46c3c2d8cdb5894ab9f54dec467233686ca42629e826ac4c6", + ) + .unwrap(), + }; + let client = EigenClient::new(config, secrets).await.unwrap(); + let data = vec![1; 20]; + let result = client.dispatch_blob(0, data).await.unwrap(); + let blob_info: BlobInfo = + rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap(); + // TODO: once rlp encoding is added to the client, we can check the contents of the blob_id + assert!(result.blob_id.len() > 0); + } + #[tokio::test] + async fn test_auth_dispersal() { + let config = EigenConfig::Disperser(DisperserConfig { + custom_quorum_numbers: None, + disperser_rpc: "https://disperser-holesky.eigenda.xyz:443".to_string(), + eth_confirmation_depth: -1, + eigenda_eth_rpc: String::default(), + eigenda_svc_manager_address: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(), + blob_size_limit: 2 * 1024 * 1024, // 2MB + status_query_timeout: 1800, // 30 minutes + status_query_interval: 5, // 5 seconds + wait_for_finalization: false, + authenticaded: true, + }); + let secrets = EigenSecrets { + private_key: PrivateKey::from_str( + "d08aa7ae1bb5ddd46c3c2d8cdb5894ab9f54dec467233686ca42629e826ac4c6", + ) + .unwrap(), + }; + let client = EigenClient::new(config, secrets).await.unwrap(); + let data = vec![1; 20]; + let result = client.dispatch_blob(0, data).await.unwrap(); + let blob_info: BlobInfo = + rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap(); + // TODO: once rlp encoding is added to the client, we can check the contents of the blob_id + assert!(result.blob_id.len() > 0); + } +} diff --git a/core/node/da_clients/src/eigen/sdk.rs b/core/node/da_clients/src/eigen/sdk.rs index c24ba0b96a13..0aac30af8685 100644 --- a/core/node/da_clients/src/eigen/sdk.rs +++ b/core/node/da_clients/src/eigen/sdk.rs @@ -7,6 +7,8 @@ use tonic::{ transport::{Channel, ClientTlsConfig, Endpoint}, Streaming, }; +#[cfg(test)] +use zksync_da_client::types::DAError; use super::disperser::BlobInfo; use crate::eigen::{ @@ -68,16 +70,24 @@ impl RawEigenClient { let mut client_clone = self.client.clone(); let disperse_reply = client_clone.disperse_blob(request).await?.into_inner(); - let verification_proof = self + let blob_info = self .await_for_inclusion(client_clone, disperse_reply) .await?; + + let verification_proof = blob_info + .blob_verification_proof + .clone() + .ok_or_else(|| anyhow::anyhow!("No blob verification proof in response"))?; let blob_id = format!( "{}:{}", verification_proof.batch_id, verification_proof.blob_index ); tracing::info!("Blob dispatch confirmed, blob id: {}", blob_id); - Ok(blob_id) + let blob_id = blob_info::BlobInfo::try_from(blob_info) + .map_err(|e| anyhow::anyhow!("Failed to convert blob info: {}", e))?; + + Ok(hex::encode(rlp::encode(&blob_id))) } async fn dispatch_blob_authenticated(&self, data: Vec) -> anyhow::Result { @@ -248,6 +258,51 @@ impl RawEigenClient { } } } + + #[cfg(test)] + pub async fn get_blob_data(&self, blob_id: &str) -> anyhow::Result>, DAError> { + use anyhow::anyhow; + use zksync_da_client::types::DAError; + + use crate::eigen::blob_info::BlobInfo; + + let commit = hex::decode(blob_id).map_err(|_| DAError { + error: anyhow!("Failed to decode blob_id"), + is_retriable: false, + })?; + let blob_info: BlobInfo = rlp::decode(&commit).map_err(|_| DAError { + error: anyhow!("Failed to decode blob_info"), + is_retriable: false, + })?; + let blob_index = blob_info.blob_verification_proof.blob_index; + let batch_header_hash = blob_info + .blob_verification_proof + .batch_medatada + .batch_header_hash; + let get_response = self + .client + .clone() + .retrieve_blob(disperser::RetrieveBlobRequest { + batch_header_hash, + blob_index, + }) + .await + .map_err(|e| DAError { + error: anyhow!(e), + is_retriable: true, + })? + .into_inner(); + + if get_response.data.len() == 0 { + return Err(DAError { + error: anyhow!("Failed to get blob data"), + is_retriable: false, + }); + } + //TODO: remove zkgpad_rs + let data = kzgpad_rs::remove_empty_byte_from_padded_bytes(&get_response.data); + return Ok(Some(data)); + } } fn get_account_id(secret_key: &SecretKey) -> String { From ab036542e10255c8914989df0bfed47a83e4d4ca Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Fri, 1 Nov 2024 14:48:05 -0300 Subject: [PATCH 03/16] Add memstore --- Cargo.lock | 2 + core/node/da_clients/Cargo.toml | 2 + core/node/da_clients/src/eigen/client.rs | 7 + core/node/da_clients/src/eigen/memstore.rs | 232 +++++++++++++++++++++ core/node/da_clients/src/eigen/mod.rs | 1 + 5 files changed, 244 insertions(+) create mode 100644 core/node/da_clients/src/eigen/memstore.rs diff --git a/Cargo.lock b/Cargo.lock index 4db2026ff983..b5d05d3fa60c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10942,6 +10942,7 @@ dependencies = [ "parity-scale-codec", "pbjson-types", "prost 0.12.6", + "rand 0.8.5", "reqwest 0.12.9", "ripemd", "rlp", @@ -10950,6 +10951,7 @@ dependencies = [ "serde", "serde_json", "sha2 0.10.8", + "sha3 0.10.8", "subxt-metadata", "subxt-signer", "tokio", diff --git a/core/node/da_clients/Cargo.toml b/core/node/da_clients/Cargo.toml index 317a889bd8d3..915e78ee70de 100644 --- a/core/node/da_clients/Cargo.toml +++ b/core/node/da_clients/Cargo.toml @@ -57,3 +57,5 @@ pbjson-types.workspace = true tokio-stream.workspace = true rlp.workspace = true kzgpad-rs = { git = "https://github.com/Layr-Labs/kzgpad-rs.git", tag = "v0.1.0" } +rand.workspace = true +sha3.workspace = true diff --git a/core/node/da_clients/src/eigen/client.rs b/core/node/da_clients/src/eigen/client.rs index deeeea624189..07d9dc3c3186 100644 --- a/core/node/da_clients/src/eigen/client.rs +++ b/core/node/da_clients/src/eigen/client.rs @@ -82,6 +82,13 @@ impl EigenClient { }*/ } } + +pub fn to_retriable_error(error: anyhow::Error) -> DAError { + DAError { + error, + is_retriable: true, + } +} #[cfg(test)] mod tests { use zksync_config::configs::da_client::eigen::DisperserConfig; diff --git a/core/node/da_clients/src/eigen/memstore.rs b/core/node/da_clients/src/eigen/memstore.rs new file mode 100644 index 000000000000..e2249b96dc3d --- /dev/null +++ b/core/node/da_clients/src/eigen/memstore.rs @@ -0,0 +1,232 @@ +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, + time::{Duration, Instant}, +}; + +use anyhow::Error; +use rand::{rngs::OsRng, Rng, RngCore}; +use sha3::{Digest, Keccak256}; +use tokio::time::interval; +use zksync_config::configs::da_client::eigen::MemStoreConfig; +use zksync_da_client::types::{DAError, DispatchResponse, InclusionData}; + +use super::{ + blob_info::{self, BlobInfo}, + client::to_retriable_error, +}; + +#[derive(Debug, PartialEq)] +pub enum MemStoreError { + BlobToLarge, + BlobAlreadyExists, + IncorrectCommitment, + #[cfg(test)] + BlobNotFound, +} + +impl Into for MemStoreError { + fn into(self) -> Error { + match self { + MemStoreError::BlobToLarge => Error::msg("Blob too large"), + MemStoreError::BlobAlreadyExists => Error::msg("Blob already exists"), + MemStoreError::IncorrectCommitment => Error::msg("Incorrect commitment"), + #[cfg(test)] + MemStoreError::BlobNotFound => Error::msg("Blob not found"), + } + } +} + +#[derive(Debug)] +struct MemStoreData { + store: HashMap>, + key_starts: HashMap, +} + +#[derive(Clone, Debug)] +pub struct MemStore { + config: MemStoreConfig, + data: Arc>, +} + +impl MemStore { + pub fn new(config: MemStoreConfig) -> Arc { + let memstore = Arc::new(Self { + config, + data: Arc::new(RwLock::new(MemStoreData { + store: HashMap::new(), + key_starts: HashMap::new(), + })), + }); + let store_clone = Arc::clone(&memstore); + tokio::spawn(async move { + store_clone.pruning_loop().await; + }); + memstore + } + + async fn put_blob(self: Arc, value: Vec) -> Result, MemStoreError> { + tokio::time::sleep(Duration::from_millis(self.config.put_latency)).await; + if value.len() as u64 > self.config.max_blob_size_bytes { + return Err(MemStoreError::BlobToLarge.into()); + } + + // todo: Encode blob? + + let mut entropy = [0u8; 10]; + OsRng.fill_bytes(&mut entropy); + + let mut hasher = Keccak256::new(); + hasher.update(&entropy); + let mock_batch_root = hasher.finalize().to_vec(); + + let block_num = OsRng.gen_range(0u32..1000); + + let blob_info = blob_info::BlobInfo { + blob_header: blob_info::BlobHeader { + commitment: blob_info::G1Commitment { + // todo: generate real commitment + x: vec![0u8; 32], + y: vec![0u8; 32], + }, + data_length: value.len() as u32, + blob_quorum_params: vec![blob_info::BlobQuorumParam { + quorum_number: 1, + adversary_threshold_percentage: 29, + confirmation_threshold_percentage: 30, + chunk_length: 300, + }], + }, + blob_verification_proof: blob_info::BlobVerificationProof { + batch_medatada: blob_info::BatchMetadata { + batch_header: blob_info::BatchHeader { + batch_root: mock_batch_root.clone(), + quorum_numbers: vec![0x1, 0x0], + quorum_signed_percentages: vec![0x60, 0x90], + reference_block_number: block_num, + }, + signatory_record_hash: mock_batch_root, + fee: vec![], + confirmation_block_number: block_num, + batch_header_hash: vec![], + }, + batch_id: 69, + blob_index: 420, + inclusion_proof: entropy.to_vec(), + quorum_indexes: vec![0x1, 0x0], + }, + }; + + let cert_bytes = rlp::encode(&blob_info).to_vec(); + + let key = String::from_utf8_lossy( + blob_info + .blob_verification_proof + .inclusion_proof + .clone() + .as_slice(), + ) + .to_string(); + + let mut data = self.data.write().unwrap(); + + if data.store.contains_key(key.as_str()) { + return Err(MemStoreError::BlobAlreadyExists); + } + + data.key_starts.insert(key.clone(), Instant::now()); + data.store.insert(key, value); + Ok(cert_bytes) + } + + pub async fn store_blob( + self: Arc, + blob_data: Vec, + ) -> Result { + let request_id = self + .put_blob(blob_data) + .await + .map_err(|err| to_retriable_error(err.into()))?; + Ok(DispatchResponse { + blob_id: hex::encode(request_id), + }) + } + + pub async fn get_inclusion_data( + self: Arc, + blob_id: &str, + ) -> anyhow::Result, DAError> { + let rlp_encoded_bytes = hex::decode(blob_id).map_err(|_| DAError { + error: MemStoreError::IncorrectCommitment.into(), + is_retriable: false, + })?; + let blob_info: BlobInfo = rlp::decode(&rlp_encoded_bytes).map_err(|_| DAError { + error: MemStoreError::IncorrectCommitment.into(), + is_retriable: false, + })?; + let inclusion_data = blob_info.blob_verification_proof.inclusion_proof; + Ok(Some(InclusionData { + data: inclusion_data, + })) + } + + #[cfg(test)] + pub async fn get_blob_data( + self: Arc, + blob_id: &str, + ) -> anyhow::Result>, DAError> { + tokio::time::sleep(Duration::from_millis(self.config.get_latency)).await; + let request_id = hex::decode(blob_id).map_err(|_| DAError { + error: MemStoreError::IncorrectCommitment.into(), + is_retriable: false, + })?; + let blob_info: BlobInfo = rlp::decode(&request_id).map_err(|_| DAError { + error: MemStoreError::IncorrectCommitment.into(), + is_retriable: false, + })?; + let key = String::from_utf8_lossy( + blob_info + .blob_verification_proof + .inclusion_proof + .clone() + .as_slice(), + ) + .to_string(); + + let data = self.data.read().map_err(|_| DAError { + error: MemStoreError::BlobNotFound.into(), + is_retriable: false, + })?; + match data.store.get(&key) { + Some(value) => Ok(Some(value.clone())), + None => Err(DAError { + error: MemStoreError::BlobNotFound.into(), + is_retriable: false, + }), + } + } + + async fn prune_expired(self: Arc) { + let mut data = self.data.write().unwrap(); + let mut to_remove = vec![]; + for (key, start) in data.key_starts.iter() { + if start.elapsed() > Duration::from_secs(self.config.blob_expiration) { + to_remove.push(key.clone()); + } + } + for key in to_remove { + data.store.remove(&key); + data.key_starts.remove(&key); + } + } + + async fn pruning_loop(self: Arc) { + let mut interval = interval(Duration::from_secs(self.config.blob_expiration)); + + loop { + interval.tick().await; + let self_clone = Arc::clone(&self); + self_clone.prune_expired().await; + } + } +} diff --git a/core/node/da_clients/src/eigen/mod.rs b/core/node/da_clients/src/eigen/mod.rs index f5fd9a9f8637..52f2e39e4590 100644 --- a/core/node/da_clients/src/eigen/mod.rs +++ b/core/node/da_clients/src/eigen/mod.rs @@ -1,5 +1,6 @@ mod blob_info; mod client; +mod memstore; mod sdk; pub use self::client::EigenClient; From c565e84af026f49666e8fefb0847f59b6d3bee6c Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Fri, 1 Nov 2024 14:51:09 -0300 Subject: [PATCH 04/16] Add assert to tests --- core/node/da_clients/src/eigen/client.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/core/node/da_clients/src/eigen/client.rs b/core/node/da_clients/src/eigen/client.rs index deeeea624189..5df7b1b02fb0 100644 --- a/core/node/da_clients/src/eigen/client.rs +++ b/core/node/da_clients/src/eigen/client.rs @@ -111,11 +111,12 @@ mod tests { }; let client = EigenClient::new(config, secrets).await.unwrap(); let data = vec![1; 20]; - let result = client.dispatch_blob(0, data).await.unwrap(); + let result = client.dispatch_blob(0, data.clone()).await.unwrap(); let blob_info: BlobInfo = rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap(); - // TODO: once rlp encoding is added to the client, we can check the contents of the blob_id - assert!(result.blob_id.len() > 0); + // TODO: once get inclusion data is added, check it + let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap(); + assert_eq!(retrieved_data.unwrap(), data); } #[tokio::test] async fn test_auth_dispersal() { @@ -139,10 +140,11 @@ mod tests { }; let client = EigenClient::new(config, secrets).await.unwrap(); let data = vec![1; 20]; - let result = client.dispatch_blob(0, data).await.unwrap(); + let result = client.dispatch_blob(0, data.clone()).await.unwrap(); let blob_info: BlobInfo = rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap(); - // TODO: once rlp encoding is added to the client, we can check the contents of the blob_id - assert!(result.blob_id.len() > 0); + // TODO: once get inclusion data is added, check it + let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap(); + assert_eq!(retrieved_data.unwrap(), data); } } From dc5b721ce2a606af1df5c93624ff7dded723d3a6 Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Fri, 1 Nov 2024 15:31:31 -0300 Subject: [PATCH 05/16] Add rest of memstore --- core/node/da_clients/src/eigen/client.rs | 77 +++++++++++++++--------- core/node/da_clients/src/eigen/mod.rs | 11 ++++ core/node/da_clients/src/eigen/sdk.rs | 29 +++------ 3 files changed, 68 insertions(+), 49 deletions(-) diff --git a/core/node/da_clients/src/eigen/client.rs b/core/node/da_clients/src/eigen/client.rs index 77d15a15765c..2d0fa0094d0c 100644 --- a/core/node/da_clients/src/eigen/client.rs +++ b/core/node/da_clients/src/eigen/client.rs @@ -9,12 +9,12 @@ use zksync_da_client::{ DataAvailabilityClient, }; -use super::sdk::RawEigenClient; +use super::{memstore::MemStore, sdk::RawEigenClient, Disperser}; use crate::utils::to_non_retriable_da_error; #[derive(Debug, Clone)] pub struct EigenClient { - client: Arc, + client: Disperser, } impl EigenClient { @@ -22,24 +22,14 @@ impl EigenClient { let private_key = SecretKey::from_str(secrets.private_key.0.expose_secret().as_str()) .map_err(|e| anyhow::anyhow!("Failed to parse private key: {}", e))?; - match config { + let disperser: Disperser = match config.clone() { EigenConfig::Disperser(config) => { - // TODO: add complete config - let client = RawEigenClient::new( - config.disperser_rpc, - config.status_query_interval, - private_key, - config.authenticaded, - ) - .await?; - Ok(EigenClient { - client: Arc::new(client), - }) + let client = RawEigenClient::new(private_key, config).await?; + Disperser::Remote(Arc::new(client)) } - EigenConfig::MemStore(_) => { - todo!() - } - } + EigenConfig::MemStore(config) => Disperser::Memory(MemStore::new(config)), + }; + Ok(Self { client: disperser }) } } @@ -50,13 +40,16 @@ impl DataAvailabilityClient for EigenClient { _: u32, // batch number data: Vec, ) -> Result { - let blob_id = self - .client - .dispatch_blob(data) - .await - .map_err(to_non_retriable_da_error)?; - - Ok(DispatchResponse::from(blob_id)) + match &self.client { + Disperser::Remote(remote_disperser) => { + let blob_id = remote_disperser + .dispatch_blob(data) + .await + .map_err(to_non_retriable_da_error)?; + Ok(DispatchResponse::from(blob_id)) + } + Disperser::Memory(memstore) => memstore.clone().store_blob(data).await, + } } async fn get_inclusion_data(&self, _: &str) -> Result, DAError> { @@ -75,11 +68,10 @@ impl DataAvailabilityClient for EigenClient { #[cfg(test)] impl EigenClient { pub async fn get_blob_data(&self, blob_id: &str) -> anyhow::Result>, DAError> { - self.client.get_blob_data(blob_id).await - /*match &self.disperser { + match &self.client { Disperser::Remote(remote_client) => remote_client.get_blob_data(blob_id).await, Disperser::Memory(memstore) => memstore.clone().get_blob_data(blob_id).await, - }*/ + } } } @@ -91,11 +83,38 @@ pub fn to_retriable_error(error: anyhow::Error) -> DAError { } #[cfg(test)] mod tests { - use zksync_config::configs::da_client::eigen::DisperserConfig; + use zksync_config::configs::da_client::eigen::{DisperserConfig, MemStoreConfig}; use zksync_types::secrets::PrivateKey; use super::*; use crate::eigen::blob_info::BlobInfo; + + #[tokio::test] + async fn test_eigenda_memory_disperser() { + let config = EigenConfig::MemStore(MemStoreConfig { + max_blob_size_bytes: 2 * 1024 * 1024, // 2MB, + blob_expiration: 60 * 2, + get_latency: 0, + put_latency: 0, + }); + let secrets = EigenSecrets { + private_key: PrivateKey::from_str( + "d08aa7ae1bb5ddd46c3c2d8cdb5894ab9f54dec467233686ca42629e826ac4c6", + ) + .unwrap(), + }; + let client = EigenClient::new(config, secrets).await.unwrap(); + let data = vec![1u8; 100]; + let result = client.dispatch_blob(0, data.clone()).await.unwrap(); + + let blob_info: BlobInfo = + rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap(); + // TODO: once get inclusion data is added, check it + + let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap(); + assert_eq!(retrieved_data.unwrap(), data); + } + #[tokio::test] async fn test_non_auth_dispersal() { let config = EigenConfig::Disperser(DisperserConfig { diff --git a/core/node/da_clients/src/eigen/mod.rs b/core/node/da_clients/src/eigen/mod.rs index 52f2e39e4590..60df8a6385e7 100644 --- a/core/node/da_clients/src/eigen/mod.rs +++ b/core/node/da_clients/src/eigen/mod.rs @@ -3,6 +3,11 @@ mod client; mod memstore; mod sdk; +use std::sync::Arc; + +use memstore::MemStore; +use sdk::RawEigenClient; + pub use self::client::EigenClient; #[allow(clippy::all)] @@ -14,3 +19,9 @@ pub(crate) mod disperser { pub(crate) mod common { include!("generated/common.rs"); } + +#[derive(Clone, Debug)] +pub(crate) enum Disperser { + Remote(Arc), + Memory(Arc), +} diff --git a/core/node/da_clients/src/eigen/sdk.rs b/core/node/da_clients/src/eigen/sdk.rs index 0aac30af8685..02e7c9bddba7 100644 --- a/core/node/da_clients/src/eigen/sdk.rs +++ b/core/node/da_clients/src/eigen/sdk.rs @@ -7,6 +7,7 @@ use tonic::{ transport::{Channel, ClientTlsConfig, Endpoint}, Streaming, }; +use zksync_config::configs::da_client::eigen::DisperserConfig; #[cfg(test)] use zksync_da_client::types::DAError; @@ -24,10 +25,8 @@ use crate::eigen::{ #[derive(Debug, Clone)] pub struct RawEigenClient { client: DisperserClient, - polling_interval: Duration, private_key: SecretKey, - account_id: String, - authenticated_dispersal: bool, + config: DisperserConfig, } pub(crate) const DATA_CHUNK_SIZE: usize = 32; @@ -35,27 +34,17 @@ pub(crate) const DATA_CHUNK_SIZE: usize = 32; impl RawEigenClient { pub(crate) const BUFFER_SIZE: usize = 1000; - pub async fn new( - rpc_node_url: String, - inclusion_polling_interval_ms: u64, - private_key: SecretKey, - authenticated_dispersal: bool, - ) -> anyhow::Result { - let endpoint = - Endpoint::from_str(rpc_node_url.as_str())?.tls_config(ClientTlsConfig::new())?; + pub async fn new(private_key: SecretKey, config: DisperserConfig) -> anyhow::Result { + let endpoint = Endpoint::from_str(&config.disperser_rpc.as_str())? + .tls_config(ClientTlsConfig::new())?; let client = DisperserClient::connect(endpoint) .await .map_err(|e| anyhow::anyhow!("Failed to connect to Disperser server: {}", e))?; - let polling_interval = Duration::from_millis(inclusion_polling_interval_ms); - - let account_id = get_account_id(&private_key); Ok(RawEigenClient { client, - polling_interval, private_key, - account_id, - authenticated_dispersal, + config, }) } @@ -144,7 +133,7 @@ impl RawEigenClient { } pub async fn dispatch_blob(&self, data: Vec) -> anyhow::Result { - match self.authenticated_dispersal { + match self.config.authenticaded { true => self.dispatch_blob_authenticated(data).await, false => self.dispatch_blob_non_authenticated(data).await, } @@ -159,7 +148,7 @@ impl RawEigenClient { payload: Some(DisperseRequest(disperser::DisperseBlobRequest { data, custom_quorum_numbers: vec![], - account_id: self.account_id.clone(), + account_id: get_account_id(&self.private_key), })), }; @@ -233,7 +222,7 @@ impl RawEigenClient { }; loop { - tokio::time::sleep(self.polling_interval).await; + tokio::time::sleep(Duration::from_millis(self.config.status_query_interval)).await; let resp = client .get_blob_status(polling_request.clone()) .await? From 1bac89ad09373d72160b920e3abbd6c08cb46f56 Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Fri, 1 Nov 2024 17:01:19 -0300 Subject: [PATCH 06/16] Add soft confirmations --- core/node/da_clients/src/eigen/client.rs | 34 ++++++++++++++++++++++-- core/node/da_clients/src/eigen/sdk.rs | 18 ++++++++++--- 2 files changed, 47 insertions(+), 5 deletions(-) diff --git a/core/node/da_clients/src/eigen/client.rs b/core/node/da_clients/src/eigen/client.rs index 2d0fa0094d0c..f689a501be3e 100644 --- a/core/node/da_clients/src/eigen/client.rs +++ b/core/node/da_clients/src/eigen/client.rs @@ -125,7 +125,7 @@ mod tests { eigenda_svc_manager_address: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(), blob_size_limit: 2 * 1024 * 1024, // 2MB status_query_timeout: 1800, // 30 minutes - status_query_interval: 5, // 5 seconds + status_query_interval: 5, // 5 ms wait_for_finalization: false, authenticaded: false, }); @@ -154,7 +154,7 @@ mod tests { eigenda_svc_manager_address: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(), blob_size_limit: 2 * 1024 * 1024, // 2MB status_query_timeout: 1800, // 30 minutes - status_query_interval: 5, // 5 seconds + status_query_interval: 5, // 5 ms wait_for_finalization: false, authenticaded: true, }); @@ -173,4 +173,34 @@ mod tests { let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap(); assert_eq!(retrieved_data.unwrap(), data); } + + #[tokio::test] + async fn test_wait_for_finalization() { + let config = EigenConfig::Disperser(DisperserConfig { + custom_quorum_numbers: None, + disperser_rpc: "https://disperser-holesky.eigenda.xyz:443".to_string(), + eth_confirmation_depth: -1, + eigenda_eth_rpc: String::default(), + eigenda_svc_manager_address: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(), + blob_size_limit: 2 * 1024 * 1024, // 2MB + status_query_timeout: 1800, // 30 minutes + status_query_interval: 5000, // 5000 ms + wait_for_finalization: true, + authenticaded: true, + }); + let secrets = EigenSecrets { + private_key: PrivateKey::from_str( + "d08aa7ae1bb5ddd46c3c2d8cdb5894ab9f54dec467233686ca42629e826ac4c6", + ) + .unwrap(), + }; + let client = EigenClient::new(config, secrets).await.unwrap(); + let data = vec![1; 20]; + let result = client.dispatch_blob(0, data.clone()).await.unwrap(); + let blob_info: BlobInfo = + rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap(); + // TODO: once get inclusion data is added, check it + let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap(); + assert_eq!(retrieved_data.unwrap(), data); + } } diff --git a/core/node/da_clients/src/eigen/sdk.rs b/core/node/da_clients/src/eigen/sdk.rs index 02e7c9bddba7..92d3b4773fa6 100644 --- a/core/node/da_clients/src/eigen/sdk.rs +++ b/core/node/da_clients/src/eigen/sdk.rs @@ -1,7 +1,7 @@ use std::{str::FromStr, time::Duration}; use secp256k1::{ecdsa::RecoverableSignature, SecretKey}; -use tokio::sync::mpsc; +use tokio::{sync::mpsc, time::Instant}; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tonic::{ transport::{Channel, ClientTlsConfig, Endpoint}, @@ -221,7 +221,8 @@ impl RawEigenClient { request_id: disperse_blob_reply.request_id, }; - loop { + let start_time = Instant::now(); + while Instant::now() - start_time < Duration::from_secs(self.config.status_query_timeout) { tokio::time::sleep(Duration::from_millis(self.config.status_query_interval)).await; let resp = client .get_blob_status(polling_request.clone()) @@ -236,7 +237,16 @@ impl RawEigenClient { disperser::BlobStatus::InsufficientSignatures => { return Err(anyhow::anyhow!("Insufficient signatures")) } - disperser::BlobStatus::Confirmed | disperser::BlobStatus::Finalized => { + disperser::BlobStatus::Confirmed => { + println!("Blob dispatch confirmed"); + if !self.config.wait_for_finalization { + let blob_info = resp + .info + .ok_or_else(|| anyhow::anyhow!("No blob header in response"))?; + return Ok(blob_info); + } + } + disperser::BlobStatus::Finalized => { let blob_info = resp .info .ok_or_else(|| anyhow::anyhow!("No blob header in response"))?; @@ -246,6 +256,8 @@ impl RawEigenClient { _ => return Err(anyhow::anyhow!("Received unknown blob status")), } } + + Err(anyhow::anyhow!("Failed to disperse blob (timeout)")) } #[cfg(test)] From 70b10a95188dda67e4d8b4d73a927ae87a86e0f0 Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Fri, 1 Nov 2024 18:00:55 -0300 Subject: [PATCH 07/16] Remove print --- core/node/da_clients/src/eigen/sdk.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/core/node/da_clients/src/eigen/sdk.rs b/core/node/da_clients/src/eigen/sdk.rs index 92d3b4773fa6..f079e1a6b42f 100644 --- a/core/node/da_clients/src/eigen/sdk.rs +++ b/core/node/da_clients/src/eigen/sdk.rs @@ -238,7 +238,6 @@ impl RawEigenClient { return Err(anyhow::anyhow!("Insufficient signatures")) } disperser::BlobStatus::Confirmed => { - println!("Blob dispatch confirmed"); if !self.config.wait_for_finalization { let blob_info = resp .info From 46838fcaed9ed5e26c8ede5777894ff773a18812 Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Fri, 1 Nov 2024 18:14:32 -0300 Subject: [PATCH 08/16] Address pr comments --- core/node/da_clients/src/eigen/client.rs | 23 ++++++++++++---------- core/node/da_clients/src/eigen/memstore.rs | 19 ++---------------- 2 files changed, 15 insertions(+), 27 deletions(-) diff --git a/core/node/da_clients/src/eigen/client.rs b/core/node/da_clients/src/eigen/client.rs index 2d0fa0094d0c..3e389476bad3 100644 --- a/core/node/da_clients/src/eigen/client.rs +++ b/core/node/da_clients/src/eigen/client.rs @@ -40,16 +40,19 @@ impl DataAvailabilityClient for EigenClient { _: u32, // batch number data: Vec, ) -> Result { - match &self.client { - Disperser::Remote(remote_disperser) => { - let blob_id = remote_disperser - .dispatch_blob(data) - .await - .map_err(to_non_retriable_da_error)?; - Ok(DispatchResponse::from(blob_id)) - } - Disperser::Memory(memstore) => memstore.clone().store_blob(data).await, - } + let blob_id = match &self.client { + Disperser::Remote(remote_disperser) => remote_disperser + .dispatch_blob(data) + .await + .map_err(to_non_retriable_da_error)?, + Disperser::Memory(memstore) => memstore + .clone() + .put_blob(data) + .await + .map_err(to_non_retriable_da_error)?, + }; + + Ok(DispatchResponse::from(blob_id)) } async fn get_inclusion_data(&self, _: &str) -> Result, DAError> { diff --git a/core/node/da_clients/src/eigen/memstore.rs b/core/node/da_clients/src/eigen/memstore.rs index e2249b96dc3d..dc47260b9ec8 100644 --- a/core/node/da_clients/src/eigen/memstore.rs +++ b/core/node/da_clients/src/eigen/memstore.rs @@ -65,14 +65,12 @@ impl MemStore { memstore } - async fn put_blob(self: Arc, value: Vec) -> Result, MemStoreError> { + pub async fn put_blob(self: Arc, value: Vec) -> Result { tokio::time::sleep(Duration::from_millis(self.config.put_latency)).await; if value.len() as u64 > self.config.max_blob_size_bytes { return Err(MemStoreError::BlobToLarge.into()); } - // todo: Encode blob? - let mut entropy = [0u8; 10]; OsRng.fill_bytes(&mut entropy); @@ -136,20 +134,7 @@ impl MemStore { data.key_starts.insert(key.clone(), Instant::now()); data.store.insert(key, value); - Ok(cert_bytes) - } - - pub async fn store_blob( - self: Arc, - blob_data: Vec, - ) -> Result { - let request_id = self - .put_blob(blob_data) - .await - .map_err(|err| to_retriable_error(err.into()))?; - Ok(DispatchResponse { - blob_id: hex::encode(request_id), - }) + Ok(hex::encode(cert_bytes)) } pub async fn get_inclusion_data( From ec1a65a2bcd62fcd5ca7d8c6adb4a104a6cade33 Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Fri, 1 Nov 2024 18:23:54 -0300 Subject: [PATCH 09/16] Remove to retriable error --- core/node/da_clients/src/eigen/client.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/core/node/da_clients/src/eigen/client.rs b/core/node/da_clients/src/eigen/client.rs index 2a7e2a77fa1e..0a47f1fb0624 100644 --- a/core/node/da_clients/src/eigen/client.rs +++ b/core/node/da_clients/src/eigen/client.rs @@ -77,13 +77,6 @@ impl EigenClient { } } } - -pub fn to_retriable_error(error: anyhow::Error) -> DAError { - DAError { - error, - is_retriable: true, - } -} #[cfg(test)] mod tests { use zksync_config::configs::da_client::eigen::{DisperserConfig, MemStoreConfig}; From edafcffd8aee4c66f4582f6304375b03876bbaa7 Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Mon, 4 Nov 2024 11:06:55 -0300 Subject: [PATCH 10/16] Fix conflicts --- core/node/da_clients/src/eigen/client.rs | 11 +++++++++-- core/node/da_clients/src/eigen/memstore.rs | 5 +---- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/core/node/da_clients/src/eigen/client.rs b/core/node/da_clients/src/eigen/client.rs index 17f9b2833ca1..6c721df5da35 100644 --- a/core/node/da_clients/src/eigen/client.rs +++ b/core/node/da_clients/src/eigen/client.rs @@ -10,7 +10,7 @@ use zksync_da_client::{ DataAvailabilityClient, }; -use super::{memstore::MemStore, sdk::RawEigenClient, Disperser,blob_info::BlobInfo}; +use super::{blob_info::BlobInfo, memstore::MemStore, sdk::RawEigenClient, Disperser}; use crate::utils::to_non_retriable_da_error; #[derive(Debug, Clone)] @@ -191,7 +191,14 @@ mod tests { let blob_info: BlobInfo = rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap(); - // TODO: once get inclusion data is added, check it + let expected_inclusion_data = blob_info.blob_verification_proof.inclusion_proof; + let actual_inclusion_data = client + .get_inclusion_data(&result.blob_id) + .await + .unwrap() + .unwrap() + .data; + assert_eq!(expected_inclusion_data, actual_inclusion_data); let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap(); assert_eq!(retrieved_data.unwrap(), data); diff --git a/core/node/da_clients/src/eigen/memstore.rs b/core/node/da_clients/src/eigen/memstore.rs index dc47260b9ec8..e3443c58e41b 100644 --- a/core/node/da_clients/src/eigen/memstore.rs +++ b/core/node/da_clients/src/eigen/memstore.rs @@ -11,10 +11,7 @@ use tokio::time::interval; use zksync_config::configs::da_client::eigen::MemStoreConfig; use zksync_da_client::types::{DAError, DispatchResponse, InclusionData}; -use super::{ - blob_info::{self, BlobInfo}, - client::to_retriable_error, -}; +use super::blob_info::{self, BlobInfo}; #[derive(Debug, PartialEq)] pub enum MemStoreError { From 3bd58123fe0b37dda2b0f1f753e305157ce79332 Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Mon, 4 Nov 2024 11:38:51 -0300 Subject: [PATCH 11/16] Add inclusion data --- core/node/da_clients/src/eigen/client.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/core/node/da_clients/src/eigen/client.rs b/core/node/da_clients/src/eigen/client.rs index 76b8c98df4b1..fe1c0bd8c5dd 100644 --- a/core/node/da_clients/src/eigen/client.rs +++ b/core/node/da_clients/src/eigen/client.rs @@ -229,7 +229,14 @@ mod tests { let result = client.dispatch_blob(0, data.clone()).await.unwrap(); let blob_info: BlobInfo = rlp::decode(&hex::decode(result.blob_id.clone()).unwrap()).unwrap(); - // TODO: once get inclusion data is added, check it + let expected_inclusion_data = blob_info.blob_verification_proof.inclusion_proof; + let actual_inclusion_data = client + .get_inclusion_data(&result.blob_id) + .await + .unwrap() + .unwrap() + .data; + assert_eq!(expected_inclusion_data, actual_inclusion_data); let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap(); assert_eq!(retrieved_data.unwrap(), data); } From 860bc28e1793027d8230434f9ab14d6ebc5426a8 Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Mon, 4 Nov 2024 13:52:52 -0300 Subject: [PATCH 12/16] Change status query timeout to millis --- core/node/da_clients/src/eigen/client.rs | 6 +++--- core/node/da_clients/src/eigen/sdk.rs | 3 ++- eigenda-integration.md | 4 ++-- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/core/node/da_clients/src/eigen/client.rs b/core/node/da_clients/src/eigen/client.rs index fe1c0bd8c5dd..dc5919275f92 100644 --- a/core/node/da_clients/src/eigen/client.rs +++ b/core/node/da_clients/src/eigen/client.rs @@ -106,7 +106,7 @@ mod tests { eigenda_eth_rpc: String::default(), eigenda_svc_manager_address: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(), blob_size_limit: 2 * 1024 * 1024, // 2MB - status_query_timeout: 1800, // 30 minutes + status_query_timeout: 1800000, // 30 minutes status_query_interval: 5, // 5 ms wait_for_finalization: false, authenticaded: false, @@ -143,7 +143,7 @@ mod tests { eigenda_eth_rpc: String::default(), eigenda_svc_manager_address: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(), blob_size_limit: 2 * 1024 * 1024, // 2MB - status_query_timeout: 1800, // 30 minutes + status_query_timeout: 1800000, // 30 minutes status_query_interval: 5, // 5 ms wait_for_finalization: false, authenticaded: true, @@ -213,7 +213,7 @@ mod tests { eigenda_eth_rpc: String::default(), eigenda_svc_manager_address: "0xD4A7E1Bd8015057293f0D0A557088c286942e84b".to_string(), blob_size_limit: 2 * 1024 * 1024, // 2MB - status_query_timeout: 1800, // 30 minutes + status_query_timeout: 1800000, // 30 minutes status_query_interval: 5000, // 5000 ms wait_for_finalization: true, authenticaded: true, diff --git a/core/node/da_clients/src/eigen/sdk.rs b/core/node/da_clients/src/eigen/sdk.rs index f079e1a6b42f..9f007631d702 100644 --- a/core/node/da_clients/src/eigen/sdk.rs +++ b/core/node/da_clients/src/eigen/sdk.rs @@ -222,7 +222,8 @@ impl RawEigenClient { }; let start_time = Instant::now(); - while Instant::now() - start_time < Duration::from_secs(self.config.status_query_timeout) { + while Instant::now() - start_time < Duration::from_millis(self.config.status_query_timeout) + { tokio::time::sleep(Duration::from_millis(self.config.status_query_interval)).await; let resp = client .get_blob_status(polling_request.clone()) diff --git a/eigenda-integration.md b/eigenda-integration.md index 99d86e5f4730..6209996977b4 100644 --- a/eigenda-integration.md +++ b/eigenda-integration.md @@ -40,8 +40,8 @@ da_client: eigenda_eth_rpc: eigenda_svc_manager_address: '0xD4A7E1Bd8015057293f0D0A557088c286942e84b' blob_size_limit: 2097152 - status_query_timeout: 1800 - status_query_interval: 5 + status_query_timeout: 1800000 # ms + status_query_interval: 5 # ms wait_for_finalization: false authenticated: false ``` From 22fd8a2dbb71eb5d6d7a5cc2227c5829080e5af0 Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Mon, 4 Nov 2024 17:48:25 -0300 Subject: [PATCH 13/16] Document memstore --- core/node/da_clients/src/eigen/memstore.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/core/node/da_clients/src/eigen/memstore.rs b/core/node/da_clients/src/eigen/memstore.rs index e3443c58e41b..e8c90fc5f177 100644 --- a/core/node/da_clients/src/eigen/memstore.rs +++ b/core/node/da_clients/src/eigen/memstore.rs @@ -40,6 +40,8 @@ struct MemStoreData { key_starts: HashMap, } +/// This struct represents a memory store for blobs. +/// It should be used for testing purposes only. #[derive(Clone, Debug)] pub struct MemStore { config: MemStoreConfig, @@ -62,6 +64,7 @@ impl MemStore { memstore } + /// Saves a blob to the memory store, it harcodes the blob info, since we don't care about it in a memory based store pub async fn put_blob(self: Arc, value: Vec) -> Result { tokio::time::sleep(Duration::from_millis(self.config.put_latency)).await; if value.len() as u64 > self.config.max_blob_size_bytes { @@ -134,6 +137,7 @@ impl MemStore { Ok(hex::encode(cert_bytes)) } + /// It returns the inclusion proof pub async fn get_inclusion_data( self: Arc, blob_id: &str, @@ -152,6 +156,7 @@ impl MemStore { })) } + /// This function is only used on tests, it returns the blob data #[cfg(test)] pub async fn get_blob_data( self: Arc, @@ -188,6 +193,7 @@ impl MemStore { } } + /// After some time has passed, blobs are removed from the store async fn prune_expired(self: Arc) { let mut data = self.data.write().unwrap(); let mut to_remove = vec![]; @@ -202,6 +208,7 @@ impl MemStore { } } + /// Loop used to prune expired blobs async fn pruning_loop(self: Arc) { let mut interval = interval(Duration::from_secs(self.config.blob_expiration)); From 84cbecd63fe21853e7987df497f7792ad4249151 Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Mon, 4 Nov 2024 18:01:55 -0300 Subject: [PATCH 14/16] Fix typo --- core/node/da_clients/src/eigen/sdk.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/node/da_clients/src/eigen/sdk.rs b/core/node/da_clients/src/eigen/sdk.rs index 02e7c9bddba7..a1fed693af05 100644 --- a/core/node/da_clients/src/eigen/sdk.rs +++ b/core/node/da_clients/src/eigen/sdk.rs @@ -133,7 +133,7 @@ impl RawEigenClient { } pub async fn dispatch_blob(&self, data: Vec) -> anyhow::Result { - match self.config.authenticaded { + match self.config.authenticated { true => self.dispatch_blob_authenticated(data).await, false => self.dispatch_blob_non_authenticated(data).await, } From 1476acf820948c9a198d1d7a36796359d4fe7d09 Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Mon, 4 Nov 2024 18:02:43 -0300 Subject: [PATCH 15/16] Fix typo --- core/node/da_clients/src/eigen/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/node/da_clients/src/eigen/client.rs b/core/node/da_clients/src/eigen/client.rs index 59f6d2937716..52572158af12 100644 --- a/core/node/da_clients/src/eigen/client.rs +++ b/core/node/da_clients/src/eigen/client.rs @@ -216,7 +216,7 @@ mod tests { status_query_timeout: 1800000, // 30 minutes status_query_interval: 5000, // 5000 ms wait_for_finalization: true, - authenticaded: true, + authenticated: true, }); let secrets = EigenSecrets { private_key: PrivateKey::from_str( From 0b646992eb69569d4fa584e9e1b11cf40ef4fe67 Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Tue, 5 Nov 2024 14:40:40 -0300 Subject: [PATCH 16/16] Format --- core/node/da_clients/src/eigen/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/node/da_clients/src/eigen/client.rs b/core/node/da_clients/src/eigen/client.rs index 3696b6340adb..272217fdc5ae 100644 --- a/core/node/da_clients/src/eigen/client.rs +++ b/core/node/da_clients/src/eigen/client.rs @@ -254,7 +254,7 @@ mod tests { let retrieved_data = client.get_blob_data(&result.blob_id).await.unwrap(); assert_eq!(retrieved_data.unwrap(), data); } - + #[tokio::test] async fn test_eigenda_dispatch_blob_too_large() { let config = EigenConfig::MemStore(MemStoreConfig {