From 4c5af898fdd27640ffd90276d027d40ce73edda0 Mon Sep 17 00:00:00 2001 From: juan518munoz <62400508+juan518munoz@users.noreply.github.com> Date: Tue, 19 Nov 2024 17:32:08 -0300 Subject: [PATCH] feat(eigen-client-m0-implementation): grafana metrics (#334) * initial commit * initial commit * fix ambiguous job name & add docker restart command * fix integration test command * update readme, remove fetching lambda repo * reorganize readme instructions * Fix concurrent dispatcher (#338) * Add shutdown to dispatch batches * Add JoinSet * Format code * Fix unbounded channel breaking authenticated dispersal * Fix pr comments * feat(eigen-client-m0-implementation): optimize concurrent dispatcher (#345) * initial commit * optimize dispatch_batches fn * remove commented code * remove needless variables * optimize inclusion_poller fn * break loop if dispatch fail * remove client_lock variable * switch to retriable err * replace arbitrary value with config --------- Co-authored-by: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> --- core/node/da_clients/src/eigen/client.rs | 4 +- .../src/eigen/eigenda-integration.md | 63 +++- core/node/da_clients/src/eigen/sdk.rs | 79 +++-- core/node/da_dispatcher/src/da_dispatcher.rs | 322 +++++++++--------- 4 files changed, 259 insertions(+), 209 deletions(-) diff --git a/core/node/da_clients/src/eigen/client.rs b/core/node/da_clients/src/eigen/client.rs index 44ca7355ad47..3e74840a403a 100644 --- a/core/node/da_clients/src/eigen/client.rs +++ b/core/node/da_clients/src/eigen/client.rs @@ -11,7 +11,7 @@ use zksync_da_client::{ }; use super::{blob_info::BlobInfo, memstore::MemStore, sdk::RawEigenClient, Disperser}; -use crate::utils::to_non_retriable_da_error; +use crate::utils::{to_non_retriable_da_error, to_retriable_da_error}; /// EigenClient is a client for the Eigen DA service. /// It can be configured to use one of two dispersal methods: @@ -58,7 +58,7 @@ impl DataAvailabilityClient for EigenClient { Disperser::Remote(remote_disperser) => remote_disperser .dispatch_blob(data) .await - .map_err(to_non_retriable_da_error)?, + .map_err(to_retriable_da_error)?, Disperser::Memory(memstore) => memstore .clone() .put_blob(data) diff --git a/core/node/da_clients/src/eigen/eigenda-integration.md b/core/node/da_clients/src/eigen/eigenda-integration.md index 3dbe81e09525..985881d3f6c0 100644 --- a/core/node/da_clients/src/eigen/eigenda-integration.md +++ b/core/node/da_clients/src/eigen/eigenda-integration.md @@ -76,7 +76,27 @@ cargo install --path zkstack_cli/crates/zkstack --force --locked zkstack containers --observability true ``` -3. Create `eigen_da` chain +3. Temporary metrics setup (until `era-observabilty` changes are also merged) + +a. Setup the observability container at least once so the `era-observability` directory is cloned. + +```bash +zkstack containers --observability true +``` + +b. Add `lambda` remote to the `era-observability` project: + +```bash +cd era-observability && git remote add lambda https://github.com/lambdaclass/era-observability.git +``` + +c. Fetch and checkout the `eigenda` branch: + +```bash +git fetch lambda && git checkout eigenda +``` + +4. Create `eigen_da` chain ```bash zkstack chain create \ @@ -91,7 +111,7 @@ zkstack chain create \ --set-as-default false ``` -4. Initialize created ecosystem +5. Initialize created ecosystem ```bash zkstack ecosystem init \ @@ -107,7 +127,42 @@ zkstack ecosystem init \ You may enable observability here if you want to. -5. Start the server +6. 
Setup grafana dashboard for Data Availability + +a. Get the running port of the eigen_da chain in the `chains/eigen_da/configs/general.yaml` file: + +```yaml +prometheus: + listener_port: 3414 # <- this is the port +``` + +(around line 108) + +Then modify the `era-observability/etc/prometheus/prometheus.yml` with the retrieved port: + +```yaml +- job_name: 'zksync' + scrape_interval: 5s + honor_labels: true + static_configs: + - targets: ['host.docker.internal:3312'] # <- change this to the port +``` + +b. Enable the Data Availability Grafana dashboard + +```bash +mv era-observability/additional_dashboards/EigenDA.json era-observability/dashboards/EigenDA.json +``` + +c. Restart the era-observability container + +```bash +docker ps --filter "label=com.docker.compose.project=era-observability" -q | xargs docker restart +``` + +(this can also be done through the docker dashboard) + +7. Start the server ```bash zkstack server --chain eigen_da @@ -125,7 +180,7 @@ And with the server running on one terminal, you can run the server integration following command: ```bash -zkstack dev test --chain eigen_da +zkstack dev test integration --chain eigen_da ``` ## Mainnet/Testnet setup diff --git a/core/node/da_clients/src/eigen/sdk.rs b/core/node/da_clients/src/eigen/sdk.rs index b7ae37260ff3..bc43dfa9090e 100644 --- a/core/node/da_clients/src/eigen/sdk.rs +++ b/core/node/da_clients/src/eigen/sdk.rs @@ -1,8 +1,11 @@ -use std::{str::FromStr, time::Duration}; +use std::{str::FromStr, sync::Arc, time::Duration}; use secp256k1::{ecdsa::RecoverableSignature, SecretKey}; -use tokio::{sync::mpsc, time::Instant}; -use tokio_stream::{wrappers::ReceiverStream, StreamExt}; +use tokio::{ + sync::{mpsc, Mutex}, + time::Instant, +}; +use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt}; use tonic::{ transport::{Channel, ClientTlsConfig, Endpoint}, Streaming, @@ -28,7 +31,7 @@ use crate::eigen::{ #[derive(Debug, Clone)] pub(crate) struct RawEigenClient { - client: DisperserClient, + client: Arc>>, private_key: SecretKey, pub config: DisperserConfig, verifier: Verifier, @@ -37,14 +40,14 @@ pub(crate) struct RawEigenClient { pub(crate) const DATA_CHUNK_SIZE: usize = 32; impl RawEigenClient { - pub(crate) const BUFFER_SIZE: usize = 1000; - pub async fn new(private_key: SecretKey, config: DisperserConfig) -> anyhow::Result { let endpoint = Endpoint::from_str(config.disperser_rpc.as_str())?.tls_config(ClientTlsConfig::new())?; - let client = DisperserClient::connect(endpoint) - .await - .map_err(|e| anyhow::anyhow!("Failed to connect to Disperser server: {}", e))?; + let client = Arc::new(Mutex::new( + DisperserClient::connect(endpoint) + .await + .map_err(|e| anyhow::anyhow!("Failed to connect to Disperser server: {}", e))?, + )); let verifier_config = VerifierConfig { verify_certs: true, @@ -72,13 +75,16 @@ impl RawEigenClient { account_id: String::default(), // Account Id is not used in non-authenticated mode }; - let mut client_clone = self.client.clone(); - let disperse_reply = client_clone.disperse_blob(request).await?.into_inner(); + let disperse_reply = self + .client + .lock() + .await + .disperse_blob(request) + .await? 
+ .into_inner(); let disperse_time = Instant::now(); - let blob_info = self - .await_for_inclusion(client_clone, disperse_reply) - .await?; + let blob_info = self.await_for_inclusion(disperse_reply).await?; let disperse_elapsed = Instant::now() - disperse_time; let blob_info = blob_info::BlobInfo::try_from(blob_info) @@ -118,25 +124,29 @@ impl RawEigenClient { } async fn dispatch_blob_authenticated(&self, data: Vec) -> anyhow::Result { - let mut client_clone = self.client.clone(); - let (tx, rx) = mpsc::channel(Self::BUFFER_SIZE); + let (tx, rx) = mpsc::unbounded_channel(); let disperse_time = Instant::now(); - let response_stream = client_clone.disperse_blob_authenticated(ReceiverStream::new(rx)); - let padded_data = convert_by_padding_empty_byte(&data); // 1. send DisperseBlobRequest - self.disperse_data(padded_data, &tx).await?; + let padded_data = convert_by_padding_empty_byte(&data); + self.disperse_data(padded_data, &tx)?; // this await is blocked until the first response on the stream, so we only await after sending the `DisperseBlobRequest` - let mut response_stream = response_stream.await?.into_inner(); + let mut response_stream = self + .client + .clone() + .lock() + .await + .disperse_blob_authenticated(UnboundedReceiverStream::new(rx)) + .await?; + let response_stream = response_stream.get_mut(); // 2. receive BlobAuthHeader - let blob_auth_header = self.receive_blob_auth_header(&mut response_stream).await?; + let blob_auth_header = self.receive_blob_auth_header(response_stream).await?; // 3. sign and send BlobAuthHeader - self.submit_authentication_data(blob_auth_header.clone(), &tx) - .await?; + self.submit_authentication_data(blob_auth_header.clone(), &tx)?; // 4. receive DisperseBlobReply let reply = response_stream @@ -152,9 +162,7 @@ impl RawEigenClient { }; // 5. 
poll for blob status until it reaches the Confirmed state - let blob_info = self - .await_for_inclusion(client_clone, disperse_reply) - .await?; + let blob_info = self.await_for_inclusion(disperse_reply).await?; let blob_info = blob_info::BlobInfo::try_from(blob_info) .map_err(|e| anyhow::anyhow!("Failed to convert blob info: {}", e))?; @@ -183,10 +191,10 @@ impl RawEigenClient { } } - async fn disperse_data( + fn disperse_data( &self, data: Vec, - tx: &mpsc::Sender, + tx: &mpsc::UnboundedSender, ) -> anyhow::Result<()> { let req = disperser::AuthenticatedRequest { payload: Some(DisperseRequest(disperser::DisperseBlobRequest { @@ -197,14 +205,13 @@ impl RawEigenClient { }; tx.send(req) - .await .map_err(|e| anyhow::anyhow!("Failed to send DisperseBlobRequest: {}", e)) } - async fn submit_authentication_data( + fn submit_authentication_data( &self, blob_auth_header: BlobAuthHeader, - tx: &mpsc::Sender, + tx: &mpsc::UnboundedSender, ) -> anyhow::Result<()> { // TODO: replace challenge_parameter with actual auth header when it is available let digest = zksync_basic_types::web3::keccak256( @@ -228,7 +235,6 @@ impl RawEigenClient { }; tx.send(req) - .await .map_err(|e| anyhow::anyhow!("Failed to send AuthenticationData: {}", e)) } @@ -258,7 +264,6 @@ impl RawEigenClient { async fn await_for_inclusion( &self, - mut client: DisperserClient, disperse_blob_reply: DisperseBlobReply, ) -> anyhow::Result { let polling_request = disperser::BlobStatusRequest { @@ -269,7 +274,10 @@ impl RawEigenClient { while Instant::now() - start_time < Duration::from_millis(self.config.status_query_timeout) { tokio::time::sleep(Duration::from_millis(self.config.status_query_interval)).await; - let resp = client + let resp = self + .client + .lock() + .await .get_blob_status(polling_request.clone()) .await? 
.into_inner(); @@ -326,7 +334,8 @@ impl RawEigenClient { .batch_header_hash; let get_response = self .client - .clone() + .lock() + .await .retrieve_blob(disperser::RetrieveBlobRequest { batch_header_hash, blob_index, diff --git a/core/node/da_dispatcher/src/da_dispatcher.rs b/core/node/da_dispatcher/src/da_dispatcher.rs index 52d2168c1cf7..3279313548e1 100644 --- a/core/node/da_dispatcher/src/da_dispatcher.rs +++ b/core/node/da_dispatcher/src/da_dispatcher.rs @@ -2,10 +2,18 @@ use std::{collections::HashSet, future::Future, sync::Arc, time::Duration}; use anyhow::Context; use chrono::Utc; -use futures::future::join_all; use rand::Rng; -use tokio::sync::{mpsc, watch::Receiver, Mutex, Notify}; -use zksync_config::{configs::da_dispatcher::DEFAULT_MAX_CONCURRENT_REQUESTS, DADispatcherConfig}; +use tokio::{ + sync::{ + watch::{self, Receiver}, + Mutex, Notify, + }, + task::JoinSet, +}; +use zksync_config::{ + configs::da_dispatcher::{DEFAULT_MAX_CONCURRENT_REQUESTS, DEFAULT_POLLING_INTERVAL_MS}, + DADispatcherConfig, +}; use zksync_da_client::{ types::{DAError, InclusionData}, DataAvailabilityClient, @@ -63,99 +71,75 @@ impl DataAvailabilityDispatcher { } async fn dispatch_batches(&self, stop_receiver: Receiver) -> anyhow::Result<()> { - let (tx, mut rx) = mpsc::channel( - self.config - .max_concurrent_requests - .unwrap_or(DEFAULT_MAX_CONCURRENT_REQUESTS) as usize, - ); - let next_expected_batch = Arc::new(Mutex::new(None)); + let mut pending_batches = HashSet::new(); + let mut dispatcher_tasks: JoinSet> = JoinSet::new(); - let stop_receiver_clone = stop_receiver.clone(); - let pool_clone = self.pool.clone(); - let config_clone = self.config.clone(); - let next_expected_batch_clone = next_expected_batch.clone(); - let pending_blobs_reader = tokio::spawn(async move { - // Used to avoid sending the same batch multiple times - let mut pending_batches = HashSet::new(); - loop { - if *stop_receiver_clone.borrow() { - tracing::info!("Stop signal received, da_dispatcher is shutting down"); - break; - } - - let mut conn = pool_clone.connection_tagged("da_dispatcher").await?; - let batches = conn - .data_availability_dal() - .get_ready_for_da_dispatch_l1_batches( - config_clone.max_rows_to_dispatch() as usize - ) - .await?; - drop(conn); - for batch in batches { - if pending_batches.contains(&batch.l1_batch_number.0) { - continue; - } - - // This should only happen once. - // We can't assume that the first batch is always 1 because the dispatcher can be restarted - // and resume from a different batch. 
- let mut next_expected_batch_lock = next_expected_batch_clone.lock().await; - if next_expected_batch_lock.is_none() { - next_expected_batch_lock.replace(batch.l1_batch_number); - } - - pending_batches.insert(batch.l1_batch_number.0); - METRICS.blobs_pending_dispatch.inc_by(1); - tx.send(batch).await?; - } - - tokio::time::sleep(Duration::from_secs(5)).await; + let (shutdown_tx, shutdown_rx) = watch::channel(false); + let notifier = Arc::new(Notify::new()); + loop { + if *stop_receiver.clone().borrow() { + tracing::info!("Stop signal received, da_dispatcher is shutting down"); + break; + } + if *shutdown_rx.borrow() { + tracing::error!("A blob dispatch failed, da_dispatcher is shutting down"); + break; } - Ok::<(), anyhow::Error>(()) - }); - let pool = self.pool.clone(); - let config = self.config.clone(); - let client = self.client.clone(); - let request_semaphore = self.request_semaphore.clone(); - let notifier = Arc::new(Notify::new()); - let pending_blobs_sender = tokio::spawn(async move { - let mut spawned_requests = vec![]; - let notifier = notifier.clone(); - loop { - if *stop_receiver.borrow() { - break; + let mut conn = self.pool.connection_tagged("da_dispatcher").await?; + let batches = conn + .data_availability_dal() + .get_ready_for_da_dispatch_l1_batches(self.config.max_rows_to_dispatch() as usize) + .await?; + drop(conn); + let shutdown_tx = shutdown_tx.clone(); + for batch in batches { + if pending_batches.contains(&batch.l1_batch_number.0) { + continue; } - let batch = match rx.recv().await { - Some(batch) => batch, - None => continue, // Should never happen - }; + // This should only happen once. + // We can't assume that the first batch is always 1 because the dispatcher can be restarted + // and resume from a different batch. + let mut next_expected_batch_lock = next_expected_batch.lock().await; + if next_expected_batch_lock.is_none() { + next_expected_batch_lock.replace(batch.l1_batch_number); + } + drop(next_expected_batch_lock); - // Block until we can send the request - let permit = request_semaphore.clone().acquire_owned().await?; + pending_batches.insert(batch.l1_batch_number.0); + METRICS.blobs_pending_dispatch.inc_by(1); - let client = client.clone(); - let pool = pool.clone(); - let config = config.clone(); - let next_expected_batch = next_expected_batch.clone(); + let request_semaphore = self.request_semaphore.clone(); + let client = self.client.clone(); + let config = self.config.clone(); let notifier = notifier.clone(); - let request = tokio::spawn(async move { - let _permit = permit; // move permit into scope + let shutdown_rx = shutdown_rx.clone(); + let shutdown_tx = shutdown_tx.clone(); + let next_expected_batch = next_expected_batch.clone(); + let pool = self.pool.clone(); + dispatcher_tasks.spawn(async move { + let permit = request_semaphore.clone().acquire_owned().await?; let dispatch_latency = METRICS.blob_dispatch_latency.start(); - let dispatch_response = - retry(config.max_retries(), batch.l1_batch_number, || { - client.dispatch_blob(batch.l1_batch_number.0, batch.pubdata.clone()) - }) - .await - .with_context(|| { - format!( - "failed to dispatch a blob with batch_number: {}, pubdata_len: {}", - batch.l1_batch_number, - batch.pubdata.len() - ) - })?; + + let result = retry(config.max_retries(), batch.l1_batch_number, || { + client.dispatch_blob(batch.l1_batch_number.0, batch.pubdata.clone()) + }) + .await; + drop(permit); + if result.is_err() { + shutdown_tx.clone().send(true)?; + notifier.notify_waiters(); + }; + + let dispatch_response = 
result.with_context(|| { + format!( + "failed to dispatch a blob with batch_number: {}, pubdata_len: {}", + batch.l1_batch_number, + batch.pubdata.len() + ) + })?; let dispatch_latency_duration = dispatch_latency.observe(); let sent_at = Utc::now().naive_utc(); @@ -169,6 +153,12 @@ impl DataAvailabilityDispatcher { batch.l1_batch_number > next_expected_batch }) { + if *shutdown_rx.clone().borrow() { + return Err(anyhow::anyhow!( + "Batch {} failed to disperse: Shutdown signal received", + batch.l1_batch_number + )); + } notifier.clone().notified().await; } @@ -184,14 +174,14 @@ impl DataAvailabilityDispatcher { // Update the next expected batch number next_expected_batch - .lock() - .await - .replace(batch.l1_batch_number + 1); + .lock() + .await + .replace(batch.l1_batch_number + 1); notifier.notify_waiters(); METRICS - .last_dispatched_l1_batch - .set(batch.l1_batch_number.0 as usize); + .last_dispatched_l1_batch + .set(batch.l1_batch_number.0 as usize); METRICS.blob_size.observe(batch.pubdata.len()); METRICS.blobs_dispatched.inc_by(1); METRICS.blobs_pending_dispatch.dec_by(1); @@ -200,81 +190,67 @@ impl DataAvailabilityDispatcher { batch.l1_batch_number, batch.pubdata.len(), ); - - Ok::<(), anyhow::Error>(()) + Ok(()) }); - spawned_requests.push(request); } - join_all(spawned_requests).await; - Ok::<(), anyhow::Error>(()) - }); - let results = join_all(vec![pending_blobs_reader, pending_blobs_sender]).await; - for result in results { - result??; + // Sleep so we prevent hammering the database + tokio::time::sleep(Duration::from_secs( + self.config + .polling_interval_ms + .unwrap_or(DEFAULT_POLLING_INTERVAL_MS) as u64, + )) + .await; + } + + while let Some(next) = dispatcher_tasks.join_next().await { + match next { + Ok(value) => match value { + Ok(_) => (), + Err(err) => { + dispatcher_tasks.shutdown().await; + return Err(err); + } + }, + Err(err) => { + dispatcher_tasks.shutdown().await; + return Err(err.into()); + } + } } + Ok(()) } async fn inclusion_poller(&self, stop_receiver: Receiver) -> anyhow::Result<()> { - let (tx, mut rx) = mpsc::channel( - self.config - .max_concurrent_requests - .unwrap_or(DEFAULT_MAX_CONCURRENT_REQUESTS) as usize, - ); + let mut pending_inclusions = HashSet::new(); + let mut inclusion_tasks: JoinSet> = JoinSet::new(); - let stop_receiver_clone = stop_receiver.clone(); - let pool_clone = self.pool.clone(); - let pending_inclusion_reader = tokio::spawn(async move { - let mut pending_inclusions = HashSet::new(); - loop { - if *stop_receiver_clone.borrow() { - break; - } + loop { + if *stop_receiver.borrow() { + break; + } - let mut conn = pool_clone.connection_tagged("da_dispatcher").await?; - let pending_blobs = conn - .data_availability_dal() - .get_da_blob_ids_awaiting_inclusion() - .await?; - drop(conn); + let mut conn = self.pool.connection_tagged("da_dispatcher").await?; + let pending_blobs = conn + .data_availability_dal() + .get_da_blob_ids_awaiting_inclusion() + .await?; + drop(conn); - for blob_info in pending_blobs.into_iter().flatten() { - if pending_inclusions.contains(&blob_info.blob_id) { - continue; - } - pending_inclusions.insert(blob_info.blob_id.clone()); - tx.send(blob_info).await?; - } - tokio::time::sleep(Duration::from_secs(5)).await; - } - Ok::<(), anyhow::Error>(()) - }); - - let pool = self.pool.clone(); - let config = self.config.clone(); - let client = self.client.clone(); - let semaphore = self.request_semaphore.clone(); - let pending_inclusion_sender = tokio::spawn(async move { - let mut spawned_requests = vec![]; - loop { - if 
*stop_receiver.borrow() { - break; + for blob_info in pending_blobs.into_iter().flatten() { + if pending_inclusions.contains(&blob_info.blob_id) { + continue; } - let blob_info = match rx.recv().await { - Some(blob_info) => blob_info, - None => continue, // Should never happen - }; - - // Block until we can send the request - let permit = semaphore.clone().acquire_owned().await?; - - let client = client.clone(); - let pool = pool.clone(); - let config = config.clone(); - let request = tokio::spawn(async move { - let _permit = permit; // move permit into scope + pending_inclusions.insert(blob_info.blob_id.clone()); + + let client = self.client.clone(); + let config = self.config.clone(); + let pool = self.pool.clone(); + let request_semaphore = self.request_semaphore.clone(); + inclusion_tasks.spawn(async move { let inclusion_data = if config.use_dummy_inclusion_data() { + let _permit = request_semaphore.acquire_owned().await?; client .get_inclusion_data(blob_info.blob_id.as_str()) .await @@ -284,11 +260,11 @@ impl DataAvailabilityDispatcher { blob_info.blob_id, blob_info.l1_batch_number ) })? - } else { - // if the inclusion verification is disabled, we don't need to wait for the inclusion - // data before committing the batch, so simply return an empty vector - Some(InclusionData { data: vec![] }) - }; + } else { + // if the inclusion verification is disabled, we don't need to wait for the inclusion + // data before committing the batch, so simply return an empty vector + Some(InclusionData { data: vec![] }) + }; let Some(inclusion_data) = inclusion_data else { return Ok(()); @@ -300,7 +276,7 @@ impl DataAvailabilityDispatcher { L1BatchNumber(blob_info.l1_batch_number.0), inclusion_data.data.as_slice(), ) - .await?; + .await?; drop(conn); let inclusion_latency = Utc::now().signed_duration_since(blob_info.sent_at); @@ -317,19 +293,29 @@ impl DataAvailabilityDispatcher { blob_info.l1_batch_number, inclusion_latency.num_seconds() ); - - Ok::<(), anyhow::Error>(()) + Ok(()) }); - spawned_requests.push(request); } - join_all(spawned_requests).await; - Ok::<(), anyhow::Error>(()) - }); - let results = join_all(vec![pending_inclusion_reader, pending_inclusion_sender]).await; - for result in results { - result??; + // Sleep so we prevent hammering the database + tokio::time::sleep(Duration::from_secs( + self.config + .polling_interval_ms + .unwrap_or(DEFAULT_POLLING_INTERVAL_MS) as u64, + )) + .await; } + + while let Some(next) = inclusion_tasks.join_next().await { + match next { + Ok(_) => (), + Err(e) => { + inclusion_tasks.shutdown().await; + return Err(e.into()); + } + } + } + Ok(()) } }
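
The reworked `dispatch_batches` loop in this patch replaces the old channel-plus-`join_all` pipeline with a single polling loop that fans work out into a `tokio::task::JoinSet`, bounds concurrency with a semaphore, and uses a `watch` channel to stop spawning new work once any dispersal fails. Below is a minimal, self-contained sketch of that pattern, not the production code from the diff: `dispatch_blob` is a hypothetical stand-in for the real client call, and the batch numbers and concurrency limit are illustrative. It assumes `tokio` (with the `full` feature set) and `anyhow` as dependencies.

```rust
use std::{sync::Arc, time::Duration};

use tokio::{
    sync::{watch, Semaphore},
    task::JoinSet,
};

// Illustrative stand-in for the real dispersal call made in the patch.
async fn dispatch_blob(batch_number: u32) -> anyhow::Result<()> {
    tokio::time::sleep(Duration::from_millis(10)).await;
    if batch_number == 42 {
        anyhow::bail!("dispersal failed for batch {batch_number}");
    }
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let semaphore = Arc::new(Semaphore::new(8)); // cap on in-flight dispersals
    let (shutdown_tx, shutdown_rx) = watch::channel(false);
    let mut tasks: JoinSet<anyhow::Result<()>> = JoinSet::new();

    for batch_number in 0..100u32 {
        // Stop spawning new work once any task has reported a failure.
        if *shutdown_rx.borrow() {
            break;
        }
        let semaphore = semaphore.clone();
        let shutdown_tx = shutdown_tx.clone();
        tasks.spawn(async move {
            // Acquire inside the task so spawning never blocks the polling loop.
            let _permit = semaphore.acquire_owned().await?;
            let result = dispatch_blob(batch_number).await;
            if result.is_err() {
                // Tell the outer loop (and sibling tasks) to wind down.
                let _ = shutdown_tx.send(true);
            }
            result
        });
    }

    // Drain the set; abort the remaining tasks on the first error.
    while let Some(joined) = tasks.join_next().await {
        if let Err(err) = joined? {
            tasks.shutdown().await;
            return Err(err);
        }
    }
    Ok(())
}
```

Draining the `JoinSet` after the spawning loop is what lets the first task error abort the remaining in-flight dispersals instead of being silently swallowed, which is the behaviour the patch aims for with `dispatcher_tasks.shutdown().await`.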