From 939566a73f656f77279bf8e6bf8030ff4e90a202 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Fri, 8 Nov 2024 16:04:34 -0300 Subject: [PATCH 1/8] initial commit --- core/lib/config/src/configs/da_dispatcher.rs | 4 + core/lib/config/src/testonly.rs | 1 + core/lib/env_config/src/da_dispatcher.rs | 5 +- core/lib/protobuf_config/src/da_dispatcher.rs | 2 + .../src/proto/config/da_dispatcher.proto | 1 + core/node/da_dispatcher/src/da_dispatcher.rs | 422 ++++++++++++------ core/node/da_dispatcher/src/metrics.rs | 6 + 7 files changed, 307 insertions(+), 134 deletions(-) diff --git a/core/lib/config/src/configs/da_dispatcher.rs b/core/lib/config/src/configs/da_dispatcher.rs index e9ad6bd3c074..c8bf1b3b8995 100644 --- a/core/lib/config/src/configs/da_dispatcher.rs +++ b/core/lib/config/src/configs/da_dispatcher.rs @@ -6,6 +6,7 @@ pub const DEFAULT_POLLING_INTERVAL_MS: u32 = 5000; pub const DEFAULT_MAX_ROWS_TO_DISPATCH: u32 = 100; pub const DEFAULT_MAX_RETRIES: u16 = 5; pub const DEFAULT_USE_DUMMY_INCLUSION_DATA: bool = false; +pub const DEFAULT_MAX_CONCURRENT_REQUESTS: u32 = 100; #[derive(Debug, Clone, PartialEq, Deserialize)] pub struct DADispatcherConfig { @@ -19,6 +20,8 @@ pub struct DADispatcherConfig { // TODO: run a verification task to check if the L1 contract expects the inclusion proofs to // avoid the scenario where contracts expect real proofs, and server is using dummy proofs. pub use_dummy_inclusion_data: Option, + /// The maximun number of concurrent request to send to the DA server. + pub max_concurrent_requests: Option, } impl DADispatcherConfig { @@ -28,6 +31,7 @@ impl DADispatcherConfig { max_rows_to_dispatch: Some(DEFAULT_MAX_ROWS_TO_DISPATCH), max_retries: Some(DEFAULT_MAX_RETRIES), use_dummy_inclusion_data: Some(DEFAULT_USE_DUMMY_INCLUSION_DATA), + max_concurrent_requests: Some(DEFAULT_MAX_CONCURRENT_REQUESTS), } } diff --git a/core/lib/config/src/testonly.rs b/core/lib/config/src/testonly.rs index 93d502cc4e8a..58cf4015cf0b 100644 --- a/core/lib/config/src/testonly.rs +++ b/core/lib/config/src/testonly.rs @@ -972,6 +972,7 @@ impl Distribution for EncodeDist { max_rows_to_dispatch: self.sample(rng), max_retries: self.sample(rng), use_dummy_inclusion_data: self.sample(rng), + max_concurrent_requests: self.sample(rng), } } } diff --git a/core/lib/env_config/src/da_dispatcher.rs b/core/lib/env_config/src/da_dispatcher.rs index 246752db91ac..805e6b2234b5 100644 --- a/core/lib/env_config/src/da_dispatcher.rs +++ b/core/lib/env_config/src/da_dispatcher.rs @@ -21,12 +21,14 @@ mod tests { interval: u32, rows_limit: u32, max_retries: u16, + max_concurrent_requests: u32, ) -> DADispatcherConfig { DADispatcherConfig { polling_interval_ms: Some(interval), max_rows_to_dispatch: Some(rows_limit), max_retries: Some(max_retries), use_dummy_inclusion_data: Some(true), + max_concurrent_requests: Some(max_concurrent_requests), } } @@ -38,9 +40,10 @@ mod tests { DA_DISPATCHER_MAX_ROWS_TO_DISPATCH=60 DA_DISPATCHER_MAX_RETRIES=7 DA_DISPATCHER_USE_DUMMY_INCLUSION_DATA="true" + DA_DISPATCHER_MAX_CONCURRENT_REQUESTS=10 "#; lock.set_env(config); let actual = DADispatcherConfig::from_env().unwrap(); - assert_eq!(actual, expected_da_layer_config(5000, 60, 7)); + assert_eq!(actual, expected_da_layer_config(5000, 60, 7, 10)); } } diff --git a/core/lib/protobuf_config/src/da_dispatcher.rs b/core/lib/protobuf_config/src/da_dispatcher.rs index d77073bd32cf..e85ff5ae76ed 100644 --- a/core/lib/protobuf_config/src/da_dispatcher.rs +++ b/core/lib/protobuf_config/src/da_dispatcher.rs @@ -12,6 +12,7 @@ impl ProtoRepr 
for proto::DataAvailabilityDispatcher { max_rows_to_dispatch: self.max_rows_to_dispatch, max_retries: self.max_retries.map(|x| x as u16), use_dummy_inclusion_data: self.use_dummy_inclusion_data, + max_concurrent_requests: self.max_concurrent_requests, }) } @@ -21,6 +22,7 @@ impl ProtoRepr for proto::DataAvailabilityDispatcher { max_rows_to_dispatch: this.max_rows_to_dispatch, max_retries: this.max_retries.map(Into::into), use_dummy_inclusion_data: this.use_dummy_inclusion_data, + max_concurrent_requests: this.max_concurrent_requests, } } } diff --git a/core/lib/protobuf_config/src/proto/config/da_dispatcher.proto b/core/lib/protobuf_config/src/proto/config/da_dispatcher.proto index dd366bd5b925..d6329d14b281 100644 --- a/core/lib/protobuf_config/src/proto/config/da_dispatcher.proto +++ b/core/lib/protobuf_config/src/proto/config/da_dispatcher.proto @@ -7,4 +7,5 @@ message DataAvailabilityDispatcher { optional uint32 max_rows_to_dispatch = 2; optional uint32 max_retries = 3; optional bool use_dummy_inclusion_data = 4; + optional uint32 max_concurrent_requests = 5; } diff --git a/core/node/da_dispatcher/src/da_dispatcher.rs b/core/node/da_dispatcher/src/da_dispatcher.rs index f8e6f6b31723..d038bc87eec6 100644 --- a/core/node/da_dispatcher/src/da_dispatcher.rs +++ b/core/node/da_dispatcher/src/da_dispatcher.rs @@ -1,10 +1,11 @@ -use std::{future::Future, time::Duration}; +use std::{collections::HashSet, future::Future, sync::Arc, time::Duration}; use anyhow::Context; use chrono::Utc; +use futures::future::join_all; use rand::Rng; -use tokio::sync::watch::Receiver; -use zksync_config::DADispatcherConfig; +use tokio::sync::{mpsc, watch::Receiver, Mutex, Notify}; +use zksync_config::{configs::da_dispatcher::DEFAULT_MAX_CONCURRENT_REQUESTS, DADispatcherConfig}; use zksync_da_client::{ types::{DAError, InclusionData}, DataAvailabilityClient, @@ -19,6 +20,7 @@ pub struct DataAvailabilityDispatcher { client: Box, pool: ConnectionPool, config: DADispatcherConfig, + request_semaphore: Arc, } impl DataAvailabilityDispatcher { @@ -27,158 +29,312 @@ impl DataAvailabilityDispatcher { config: DADispatcherConfig, client: Box, ) -> Self { + let request_semaphore = Arc::new(tokio::sync::Semaphore::new( + config + .max_concurrent_requests + .unwrap_or(DEFAULT_MAX_CONCURRENT_REQUESTS) as usize, + )); Self { pool, config, client, + request_semaphore, } } - pub async fn run(self, mut stop_receiver: Receiver) -> anyhow::Result<()> { - loop { - if *stop_receiver.borrow() { - break; - } + pub async fn run(self, stop_receiver: Receiver) -> anyhow::Result<()> { + let subtasks = futures::future::join( + async { + if let Err(err) = self.dispatch_batches(stop_receiver.clone()).await { + tracing::error!("dispatch error {err:?}"); + } + }, + async { + if let Err(err) = self.inclusion_poller(stop_receiver.clone()).await { + tracing::error!("poll_for_inclusion error {err:?}"); + } + }, + ); + + tokio::select! 
{ + _ = subtasks => {}, + } + Ok(()) + } - let subtasks = futures::future::join( - async { - if let Err(err) = self.dispatch().await { - tracing::error!("dispatch error {err:?}"); + async fn dispatch_batches(&self, stop_receiver: Receiver) -> anyhow::Result<()> { + let (tx, mut rx) = mpsc::channel( + self.config + .max_concurrent_requests + .unwrap_or(DEFAULT_MAX_CONCURRENT_REQUESTS) as usize, + ); + + let next_expected_batch = Arc::new(Mutex::new(None)); + + let stop_receiver_clone = stop_receiver.clone(); + let pool_clone = self.pool.clone(); + let config_clone = self.config.clone(); + let next_expected_batch_clone = next_expected_batch.clone(); + let pending_blobs_reader = tokio::spawn(async move { + // Used to avoid sending the same batch multiple times + let mut pending_batches = HashSet::new(); + loop { + if *stop_receiver_clone.borrow() { + tracing::info!("Stop signal received, da_dispatcher is shutting down"); + break; + } + + let mut conn = pool_clone.connection_tagged("da_dispatcher").await?; + let batches = conn + .data_availability_dal() + .get_ready_for_da_dispatch_l1_batches( + config_clone.max_rows_to_dispatch() as usize + ) + .await?; + drop(conn); + for batch in batches { + if pending_batches.contains(&batch.l1_batch_number.0) { + continue; } - }, - async { - if let Err(err) = self.poll_for_inclusion().await { - tracing::error!("poll_for_inclusion error {err:?}"); + + // This should only happen once. + // We can't assume that the first batch is always 1 because the dispatcher can be restarted + // and resume from a different batch. + let mut next_expected_batch_lock = next_expected_batch_clone.lock().await; + if next_expected_batch_lock.is_none() { + next_expected_batch_lock.replace(batch.l1_batch_number); } - }, - ); - tokio::select! { - _ = subtasks => {}, - _ = stop_receiver.changed() => { - break; + pending_batches.insert(batch.l1_batch_number.0); + METRICS.blobs_pending_dispatch.inc_by(1); + tx.send(batch).await?; } - } - if tokio::time::timeout(self.config.polling_interval(), stop_receiver.changed()) - .await - .is_ok() - { - break; + tokio::time::sleep(Duration::from_secs(5)).await; } - } + Ok::<(), anyhow::Error>(()) + }); - tracing::info!("Stop signal received, da_dispatcher is shutting down"); - Ok(()) - } + let pool = self.pool.clone(); + let config = self.config.clone(); + let client = self.client.clone(); + let request_semaphore = self.request_semaphore.clone(); + let notifier = Arc::new(Notify::new()); + let pending_blobs_sender = tokio::spawn(async move { + let mut spawned_requests = vec![]; + let notifier = notifier.clone(); + loop { + if *stop_receiver.borrow() { + break; + } - /// Dispatches the blobs to the data availability layer, and saves the blob_id in the database. 
- async fn dispatch(&self) -> anyhow::Result<()> { - let mut conn = self.pool.connection_tagged("da_dispatcher").await?; - let batches = conn - .data_availability_dal() - .get_ready_for_da_dispatch_l1_batches(self.config.max_rows_to_dispatch() as usize) - .await?; - drop(conn); - - for batch in batches { - let dispatch_latency = METRICS.blob_dispatch_latency.start(); - let dispatch_response = retry(self.config.max_retries(), batch.l1_batch_number, || { - self.client - .dispatch_blob(batch.l1_batch_number.0, batch.pubdata.clone()) - }) - .await - .with_context(|| { - format!( - "failed to dispatch a blob with batch_number: {}, pubdata_len: {}", - batch.l1_batch_number, - batch.pubdata.len() - ) - })?; - let dispatch_latency_duration = dispatch_latency.observe(); - - let sent_at = Utc::now().naive_utc(); - - let mut conn = self.pool.connection_tagged("da_dispatcher").await?; - conn.data_availability_dal() - .insert_l1_batch_da( - batch.l1_batch_number, - dispatch_response.blob_id.as_str(), - sent_at, - ) - .await?; - drop(conn); - - METRICS - .last_dispatched_l1_batch - .set(batch.l1_batch_number.0 as usize); - METRICS.blob_size.observe(batch.pubdata.len()); - tracing::info!( - "Dispatched a DA for batch_number: {}, pubdata_size: {}, dispatch_latency: {dispatch_latency_duration:?}", - batch.l1_batch_number, - batch.pubdata.len(), - ); - } + let batch = match rx.recv().await { + Some(batch) => batch, + None => continue, // Should never happen + }; + + // Block until we can send the request + let permit = request_semaphore.clone().acquire_owned().await?; + + let client = client.clone(); + let pool = pool.clone(); + let config = config.clone(); + let next_expected_batch = next_expected_batch.clone(); + let notifier = notifier.clone(); + let request = tokio::spawn(async move { + let _permit = permit; // move permit into scope + let dispatch_latency = METRICS.blob_dispatch_latency.start(); + let dispatch_response = + retry(config.max_retries(), batch.l1_batch_number, || { + client.dispatch_blob(batch.l1_batch_number.0, batch.pubdata.clone()) + }) + .await + .with_context(|| { + format!( + "failed to dispatch a blob with batch_number: {}, pubdata_len: {}", + batch.l1_batch_number, + batch.pubdata.len() + ) + })?; + let dispatch_latency_duration = dispatch_latency.observe(); + + let sent_at = Utc::now().naive_utc(); + + // Before saving the blob in the database, we need to be sure that we are doing it + // in the correct order. 
+ while next_expected_batch + .lock() + .await + .map_or(true, |next_expected_batch| { + batch.l1_batch_number > next_expected_batch + }) + { + notifier.clone().notified().await; + } + + let mut conn = pool.connection_tagged("da_dispatcher").await?; + conn.data_availability_dal() + .insert_l1_batch_da( + batch.l1_batch_number, + dispatch_response.blob_id.as_str(), + sent_at, + ) + .await?; + drop(conn); + // Update the next expected batch number + next_expected_batch + .lock() + .await + .replace(batch.l1_batch_number + 1); + notifier.notify_waiters(); + + METRICS + .last_dispatched_l1_batch + .set(batch.l1_batch_number.0 as usize); + METRICS.blob_size.observe(batch.pubdata.len()); + METRICS.blobs_dispatched.inc_by(1); + METRICS.blobs_pending_dispatch.dec_by(1); + tracing::info!( + "Dispatched a DA for batch_number: {}, pubdata_size: {}, dispatch_latency: {dispatch_latency_duration:?}", + batch.l1_batch_number, + batch.pubdata.len(), + ); + + Ok::<(), anyhow::Error>(()) + }); + spawned_requests.push(request); + } + join_all(spawned_requests).await; + Ok::<(), anyhow::Error>(()) + }); + + let results = join_all(vec![pending_blobs_reader, pending_blobs_sender]).await; + for result in results { + result??; + } Ok(()) } - /// Polls the data availability layer for inclusion data, and saves it in the database. - async fn poll_for_inclusion(&self) -> anyhow::Result<()> { - let mut conn = self.pool.connection_tagged("da_dispatcher").await?; - let blob_info = conn - .data_availability_dal() - .get_first_da_blob_awaiting_inclusion() - .await?; - drop(conn); - - let Some(blob_info) = blob_info else { - return Ok(()); - }; - - let inclusion_data = if self.config.use_dummy_inclusion_data() { - self.client - .get_inclusion_data(blob_info.blob_id.as_str()) - .await - .with_context(|| { - format!( - "failed to get inclusion data for blob_id: {}, batch_number: {}", - blob_info.blob_id, blob_info.l1_batch_number - ) - })? 
- } else { - // if the inclusion verification is disabled, we don't need to wait for the inclusion - // data before committing the batch, so simply return an empty vector - Some(InclusionData { data: vec![] }) - }; - - let Some(inclusion_data) = inclusion_data else { - return Ok(()); - }; - - let mut conn = self.pool.connection_tagged("da_dispatcher").await?; - conn.data_availability_dal() - .save_l1_batch_inclusion_data( - L1BatchNumber(blob_info.l1_batch_number.0), - inclusion_data.data.as_slice(), - ) - .await?; - drop(conn); - - let inclusion_latency = Utc::now().signed_duration_since(blob_info.sent_at); - if let Ok(latency) = inclusion_latency.to_std() { - METRICS.inclusion_latency.observe(latency); - } - METRICS - .last_included_l1_batch - .set(blob_info.l1_batch_number.0 as usize); - - tracing::info!( - "Received an inclusion data for a batch_number: {}, inclusion_latency_seconds: {}", - blob_info.l1_batch_number, - inclusion_latency.num_seconds() + async fn inclusion_poller(&self, stop_receiver: Receiver) -> anyhow::Result<()> { + let (tx, mut rx) = mpsc::channel( + self.config + .max_concurrent_requests + .unwrap_or(DEFAULT_MAX_CONCURRENT_REQUESTS) as usize, ); + let stop_receiver_clone = stop_receiver.clone(); + let pool_clone = self.pool.clone(); + let pending_inclusion_reader = tokio::spawn(async move { + let mut pending_inclusions = HashSet::new(); + loop { + if *stop_receiver_clone.borrow() { + break; + } + + let mut conn = pool_clone.connection_tagged("da_dispatcher").await?; + + // TODO: this query might always return the same blob if the blob is not included + // we should probably change the query to return all blobs that are not included + let blob_info = conn + .data_availability_dal() + .get_first_da_blob_awaiting_inclusion() + .await?; + drop(conn); + + let Some(blob_info) = blob_info else { + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + }; + + if pending_inclusions.contains(&blob_info.blob_id) { + continue; + } + pending_inclusions.insert(blob_info.blob_id.clone()); + tx.send(blob_info).await?; + } + Ok::<(), anyhow::Error>(()) + }); + + let pool = self.pool.clone(); + let config = self.config.clone(); + let client = self.client.clone(); + let semaphore = self.request_semaphore.clone(); + let pending_inclusion_sender = tokio::spawn(async move { + let mut spawned_requests = vec![]; + loop { + if *stop_receiver.borrow() { + break; + } + let blob_info = match rx.recv().await { + Some(blob_info) => blob_info, + None => continue, // Should never happen + }; + + // Block until we can send the request + let permit = semaphore.clone().acquire_owned().await?; + + let client = client.clone(); + let pool = pool.clone(); + let config = config.clone(); + let request = tokio::spawn(async move { + let _permit = permit; // move permit into scope + let inclusion_data = if config.use_dummy_inclusion_data() { + client + .get_inclusion_data(blob_info.blob_id.as_str()) + .await + .with_context(|| { + format!( + "failed to get inclusion data for blob_id: {}, batch_number: {}", + blob_info.blob_id, blob_info.l1_batch_number + ) + })? 
+ } else { + // if the inclusion verification is disabled, we don't need to wait for the inclusion + // data before committing the batch, so simply return an empty vector + Some(InclusionData { data: vec![] }) + }; + + let Some(inclusion_data) = inclusion_data else { + return Ok(()); + }; + + let mut conn = pool.connection_tagged("da_dispatcher").await?; + conn.data_availability_dal() + .save_l1_batch_inclusion_data( + L1BatchNumber(blob_info.l1_batch_number.0), + inclusion_data.data.as_slice(), + ) + .await?; + drop(conn); + + let inclusion_latency = Utc::now().signed_duration_since(blob_info.sent_at); + if let Ok(latency) = inclusion_latency.to_std() { + METRICS.inclusion_latency.observe(latency); + } + METRICS + .last_included_l1_batch + .set(blob_info.l1_batch_number.0 as usize); + METRICS.blobs_included.inc_by(1); + + tracing::info!( + "Received an inclusion data for a batch_number: {}, inclusion_latency_seconds: {}", + blob_info.l1_batch_number, + inclusion_latency.num_seconds() + ); + + Ok::<(), anyhow::Error>(()) + }); + spawned_requests.push(request); + } + join_all(spawned_requests).await; + Ok::<(), anyhow::Error>(()) + }); + + let results = join_all(vec![pending_inclusion_reader, pending_inclusion_sender]).await; + for result in results { + result??; + } Ok(()) } } diff --git a/core/node/da_dispatcher/src/metrics.rs b/core/node/da_dispatcher/src/metrics.rs index 67ac5ed68222..4c21e556abe1 100644 --- a/core/node/da_dispatcher/src/metrics.rs +++ b/core/node/da_dispatcher/src/metrics.rs @@ -19,6 +19,12 @@ pub(super) struct DataAvailabilityDispatcherMetrics { /// Buckets are bytes ranging from 1 KB to 16 MB, which has to satisfy all blob size values. #[metrics(buckets = Buckets::exponential(1_024.0..=16.0 * 1_024.0 * 1_024.0, 2.0), unit = Unit::Bytes)] pub blob_size: Histogram, + /// Amount of pending blobs to be dispatched. + pub blobs_pending_dispatch: Gauge, + /// Total number of blobs dispatched. + pub blobs_dispatched: Gauge, + /// Total number of blobs included. + pub blobs_included: Gauge, /// Number of transactions resent by the DA dispatcher. #[metrics(buckets = Buckets::linear(0.0..=10.0, 1.0))] From 2848d23c3c1f2cf7f47deb145d52c7563e97d166 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Fri, 8 Nov 2024 17:47:10 -0300 Subject: [PATCH 2/8] initial commit --- .../src/eigen/eigenda-integration.md | 28 +++++++++++++++++++ zkstack_cli/crates/common/src/git.rs | 26 +++++++++++++++++ .../commands/ecosystem/setup_observability.rs | 11 +++++++- 3 files changed, 64 insertions(+), 1 deletion(-) diff --git a/core/node/da_clients/src/eigen/eigenda-integration.md b/core/node/da_clients/src/eigen/eigenda-integration.md index 3dbe81e09525..689915eb363e 100644 --- a/core/node/da_clients/src/eigen/eigenda-integration.md +++ b/core/node/da_clients/src/eigen/eigenda-integration.md @@ -113,6 +113,34 @@ You may enable observability here if you want to. zkstack server --chain eigen_da ``` +### Data Availability Grafana Metrics + +1. Get the running port of the eigen_da chain in the `chains/eigen_da/configs/general.yaml` file: + +```yaml +prometheus: + listener_port: 3414 # <- this is the port +``` + +(around line 108) + +Then modify the `era-observability/etc/prometheus/prometheus.yml` with the retrieved port: + +```yaml +scrape_interval: 5s +honor_labels: true +static_configs: + - targets: ['host.docker.internal:3312'] # <- change this to the port +``` + +2. 
Enable the Data Availability Grafana dashboard + +```bash +mv era-observability/additional_dashboards/EigenDA.json era-observability/dashboards/EigenDA.json +``` + +3. Restart the era-observability container + ### Testing Modify the following flag in `core/lib/config/src/configs/da_dispatcher.rs` (then restart the server) diff --git a/zkstack_cli/crates/common/src/git.rs b/zkstack_cli/crates/common/src/git.rs index ea6540c20b29..aa766c7d8ed4 100644 --- a/zkstack_cli/crates/common/src/git.rs +++ b/zkstack_cli/crates/common/src/git.rs @@ -38,3 +38,29 @@ pub fn pull(shell: &Shell, link_to_code: PathBuf) -> anyhow::Result<()> { Cmd::new(cmd!(shell, "git pull origin {current_branch}")).run()?; Ok(()) } + +pub fn add_remote( + shell: &Shell, + link_to_code: PathBuf, + remote_name: &str, + remote_url: &str, +) -> anyhow::Result<()> { + let _dir_guard = shell.push_dir(link_to_code); + match Cmd::new(cmd!(shell, "git remote add {remote_name} {remote_url}")).run() { + Ok(_) => {} + Err(e) => { + if !e.to_string().contains("already exists") { + return Err(e.into()); + } + } + } + + Cmd::new(cmd!(shell, "git fetch {remote_name}")).run()?; + Ok(()) +} + +pub fn checkout(shell: &Shell, link_to_code: PathBuf, branch: &str) -> anyhow::Result<()> { + let _dir_guard = shell.push_dir(link_to_code); + Cmd::new(cmd!(shell, "git checkout {branch}")).run()?; + Ok(()) +} diff --git a/zkstack_cli/crates/zkstack/src/commands/ecosystem/setup_observability.rs b/zkstack_cli/crates/zkstack/src/commands/ecosystem/setup_observability.rs index f20c3c24157e..f35fad77294e 100644 --- a/zkstack_cli/crates/zkstack/src/commands/ecosystem/setup_observability.rs +++ b/zkstack_cli/crates/zkstack/src/commands/ecosystem/setup_observability.rs @@ -20,7 +20,16 @@ pub fn run(shell: &Shell) -> anyhow::Result<()> { ERA_OBSERBAVILITY_GIT_REPO, ERA_OBSERBAVILITY_DIR, )?; - spinner.finish(); + // Add lambda remote and checkout to `eigenda` for DA metrics + git::add_remote( + shell, + path_to_era_observability.clone(), + "lambda", + "https://github.com/lambdaclass/era-observability.git", + )?; + git::checkout(shell, path_to_era_observability.clone(), "eigenda")?; + + spinner.finish(); Ok(()) } From 0046784f058f6ecb8ee477ebb4a44addcc1656c6 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Mon, 11 Nov 2024 12:31:34 -0300 Subject: [PATCH 3/8] fix ambiguous job name & add docker restart command --- .../da_clients/src/eigen/eigenda-integration.md | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/core/node/da_clients/src/eigen/eigenda-integration.md b/core/node/da_clients/src/eigen/eigenda-integration.md index 689915eb363e..b77a1dc41440 100644 --- a/core/node/da_clients/src/eigen/eigenda-integration.md +++ b/core/node/da_clients/src/eigen/eigenda-integration.md @@ -127,10 +127,11 @@ prometheus: Then modify the `era-observability/etc/prometheus/prometheus.yml` with the retrieved port: ```yaml -scrape_interval: 5s -honor_labels: true -static_configs: - - targets: ['host.docker.internal:3312'] # <- change this to the port +- job_name: 'zksync' + scrape_interval: 5s + honor_labels: true + static_configs: + - targets: ['host.docker.internal:3312'] # <- change this to the port ``` 2. Enable the Data Availability Grafana dashboard @@ -141,6 +142,12 @@ mv era-observability/additional_dashboards/EigenDA.json era-observability/dashbo 3. 
Restart the era-observability container +```bash +docker ps --filter "label=com.docker.compose.project=era-observability" -q | xargs docker restart +``` + +(this can also be done through the docker dashboard) + ### Testing Modify the following flag in `core/lib/config/src/configs/da_dispatcher.rs` (then restart the server) From 3ad3e9cc50da238bf68a852059d653e4fe86f159 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Mon, 11 Nov 2024 12:32:42 -0300 Subject: [PATCH 4/8] fix integration test command --- core/node/da_clients/src/eigen/eigenda-integration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/node/da_clients/src/eigen/eigenda-integration.md b/core/node/da_clients/src/eigen/eigenda-integration.md index b77a1dc41440..b400e7dd0fd2 100644 --- a/core/node/da_clients/src/eigen/eigenda-integration.md +++ b/core/node/da_clients/src/eigen/eigenda-integration.md @@ -160,7 +160,7 @@ And with the server running on one terminal, you can run the server integration following command: ```bash -zkstack dev test --chain eigen_da +zkstack dev test integration --chain eigen_da ``` ## Mainnet/Testnet setup From 8e9212b05d4100751fde10d85df02e0b2ce63c28 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Tue, 12 Nov 2024 09:46:44 -0300 Subject: [PATCH 5/8] update readme, remove fetching lambda repo --- .../src/eigen/eigenda-integration.md | 22 ++++++++++++++++ zkstack_cli/crates/common/src/git.rs | 26 ------------------- .../commands/ecosystem/setup_observability.rs | 11 +------- 3 files changed, 23 insertions(+), 36 deletions(-) diff --git a/core/node/da_clients/src/eigen/eigenda-integration.md b/core/node/da_clients/src/eigen/eigenda-integration.md index b400e7dd0fd2..b797567d646f 100644 --- a/core/node/da_clients/src/eigen/eigenda-integration.md +++ b/core/node/da_clients/src/eigen/eigenda-integration.md @@ -115,6 +115,28 @@ zkstack server --chain eigen_da ### Data Availability Grafana Metrics +#### Temporary setup (until `era-observabilty` changes are also merged) + +1. Setup the observability container at least once so the `era-observability` directory is cloned. + +```bash +zkstack containers --observability true +``` + +2. Add `lambda` remote to the `era-observability` project: + +```bash +cd era-observability && git remote add lambda https://github.com/lambdaclass/era-observability.git +``` + +3. Fetch and checkout the `eigenda` branch: + +```bash +git fetch lambda && git checkout eigenda +``` + +#### Steps + 1. 
Get the running port of the eigen_da chain in the `chains/eigen_da/configs/general.yaml` file: ```yaml diff --git a/zkstack_cli/crates/common/src/git.rs b/zkstack_cli/crates/common/src/git.rs index aa766c7d8ed4..ea6540c20b29 100644 --- a/zkstack_cli/crates/common/src/git.rs +++ b/zkstack_cli/crates/common/src/git.rs @@ -38,29 +38,3 @@ pub fn pull(shell: &Shell, link_to_code: PathBuf) -> anyhow::Result<()> { Cmd::new(cmd!(shell, "git pull origin {current_branch}")).run()?; Ok(()) } - -pub fn add_remote( - shell: &Shell, - link_to_code: PathBuf, - remote_name: &str, - remote_url: &str, -) -> anyhow::Result<()> { - let _dir_guard = shell.push_dir(link_to_code); - match Cmd::new(cmd!(shell, "git remote add {remote_name} {remote_url}")).run() { - Ok(_) => {} - Err(e) => { - if !e.to_string().contains("already exists") { - return Err(e.into()); - } - } - } - - Cmd::new(cmd!(shell, "git fetch {remote_name}")).run()?; - Ok(()) -} - -pub fn checkout(shell: &Shell, link_to_code: PathBuf, branch: &str) -> anyhow::Result<()> { - let _dir_guard = shell.push_dir(link_to_code); - Cmd::new(cmd!(shell, "git checkout {branch}")).run()?; - Ok(()) -} diff --git a/zkstack_cli/crates/zkstack/src/commands/ecosystem/setup_observability.rs b/zkstack_cli/crates/zkstack/src/commands/ecosystem/setup_observability.rs index f35fad77294e..f20c3c24157e 100644 --- a/zkstack_cli/crates/zkstack/src/commands/ecosystem/setup_observability.rs +++ b/zkstack_cli/crates/zkstack/src/commands/ecosystem/setup_observability.rs @@ -20,16 +20,7 @@ pub fn run(shell: &Shell) -> anyhow::Result<()> { ERA_OBSERBAVILITY_GIT_REPO, ERA_OBSERBAVILITY_DIR, )?; - - // Add lambda remote and checkout to `eigenda` for DA metrics - git::add_remote( - shell, - path_to_era_observability.clone(), - "lambda", - "https://github.com/lambdaclass/era-observability.git", - )?; - git::checkout(shell, path_to_era_observability.clone(), "eigenda")?; - spinner.finish(); + Ok(()) } From 6f6d786dd500b92bef1489221e2e86f5697f3f25 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Tue, 12 Nov 2024 10:05:57 -0300 Subject: [PATCH 6/8] reorganize readme instructions --- .../src/eigen/eigenda-integration.md | 66 +++++++++---------- 1 file changed, 32 insertions(+), 34 deletions(-) diff --git a/core/node/da_clients/src/eigen/eigenda-integration.md b/core/node/da_clients/src/eigen/eigenda-integration.md index b797567d646f..985881d3f6c0 100644 --- a/core/node/da_clients/src/eigen/eigenda-integration.md +++ b/core/node/da_clients/src/eigen/eigenda-integration.md @@ -76,7 +76,27 @@ cargo install --path zkstack_cli/crates/zkstack --force --locked zkstack containers --observability true ``` -3. Create `eigen_da` chain +3. Temporary metrics setup (until `era-observabilty` changes are also merged) + +a. Setup the observability container at least once so the `era-observability` directory is cloned. + +```bash +zkstack containers --observability true +``` + +b. Add `lambda` remote to the `era-observability` project: + +```bash +cd era-observability && git remote add lambda https://github.com/lambdaclass/era-observability.git +``` + +c. Fetch and checkout the `eigenda` branch: + +```bash +git fetch lambda && git checkout eigenda +``` + +4. Create `eigen_da` chain ```bash zkstack chain create \ @@ -91,7 +111,7 @@ zkstack chain create \ --set-as-default false ``` -4. Initialize created ecosystem +5. Initialize created ecosystem ```bash zkstack ecosystem init \ @@ -107,37 +127,9 @@ zkstack ecosystem init \ You may enable observability here if you want to. -5. 
Start the server - -```bash -zkstack server --chain eigen_da -``` - -### Data Availability Grafana Metrics - -#### Temporary setup (until `era-observabilty` changes are also merged) +6. Setup grafana dashboard for Data Availability -1. Setup the observability container at least once so the `era-observability` directory is cloned. - -```bash -zkstack containers --observability true -``` - -2. Add `lambda` remote to the `era-observability` project: - -```bash -cd era-observability && git remote add lambda https://github.com/lambdaclass/era-observability.git -``` - -3. Fetch and checkout the `eigenda` branch: - -```bash -git fetch lambda && git checkout eigenda -``` - -#### Steps - -1. Get the running port of the eigen_da chain in the `chains/eigen_da/configs/general.yaml` file: +a. Get the running port of the eigen_da chain in the `chains/eigen_da/configs/general.yaml` file: ```yaml prometheus: @@ -156,13 +148,13 @@ Then modify the `era-observability/etc/prometheus/prometheus.yml` with the retri - targets: ['host.docker.internal:3312'] # <- change this to the port ``` -2. Enable the Data Availability Grafana dashboard +b. Enable the Data Availability Grafana dashboard ```bash mv era-observability/additional_dashboards/EigenDA.json era-observability/dashboards/EigenDA.json ``` -3. Restart the era-observability container +c. Restart the era-observability container ```bash docker ps --filter "label=com.docker.compose.project=era-observability" -q | xargs docker restart @@ -170,6 +162,12 @@ docker ps --filter "label=com.docker.compose.project=era-observability" -q | xar (this can also be done through the docker dashboard) +7. Start the server + +```bash +zkstack server --chain eigen_da +``` + ### Testing Modify the following flag in `core/lib/config/src/configs/da_dispatcher.rs` (then restart the server) From 598d6e0a3ed3972930582f6eeef4c69cab9e4c59 Mon Sep 17 00:00:00 2001 From: Gianbelinche <39842759+gianbelinche@users.noreply.github.com> Date: Thu, 14 Nov 2024 14:43:52 -0300 Subject: [PATCH 7/8] Fix concurrent dispatcher (#338) * Add shutdown to dispatch batches * Add JoinSet * Format code * Fix unbounded channel breaking authenticated dispersal * Fix pr comments --- core/node/da_clients/src/eigen/sdk.rs | 29 ++- core/node/da_dispatcher/src/da_dispatcher.rs | 220 +++++++++++-------- 2 files changed, 147 insertions(+), 102 deletions(-) diff --git a/core/node/da_clients/src/eigen/sdk.rs b/core/node/da_clients/src/eigen/sdk.rs index b7ae37260ff3..d4b74f004a5e 100644 --- a/core/node/da_clients/src/eigen/sdk.rs +++ b/core/node/da_clients/src/eigen/sdk.rs @@ -2,7 +2,7 @@ use std::{str::FromStr, time::Duration}; use secp256k1::{ecdsa::RecoverableSignature, SecretKey}; use tokio::{sync::mpsc, time::Instant}; -use tokio_stream::{wrappers::ReceiverStream, StreamExt}; +use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt}; use tonic::{ transport::{Channel, ClientTlsConfig, Endpoint}, Streaming, @@ -37,8 +37,6 @@ pub(crate) struct RawEigenClient { pub(crate) const DATA_CHUNK_SIZE: usize = 32; impl RawEigenClient { - pub(crate) const BUFFER_SIZE: usize = 1000; - pub async fn new(private_key: SecretKey, config: DisperserConfig) -> anyhow::Result { let endpoint = Endpoint::from_str(config.disperser_rpc.as_str())?.tls_config(ClientTlsConfig::new())?; @@ -119,24 +117,25 @@ impl RawEigenClient { async fn dispatch_blob_authenticated(&self, data: Vec) -> anyhow::Result { let mut client_clone = self.client.clone(); - let (tx, rx) = mpsc::channel(Self::BUFFER_SIZE); + let (tx, rx) = 
mpsc::unbounded_channel(); let disperse_time = Instant::now(); - let response_stream = client_clone.disperse_blob_authenticated(ReceiverStream::new(rx)); + let response_stream = + client_clone.disperse_blob_authenticated(UnboundedReceiverStream::new(rx)); let padded_data = convert_by_padding_empty_byte(&data); // 1. send DisperseBlobRequest - self.disperse_data(padded_data, &tx).await?; + self.disperse_data(padded_data, &tx)?; // this await is blocked until the first response on the stream, so we only await after sending the `DisperseBlobRequest` - let mut response_stream = response_stream.await?.into_inner(); + let mut response_stream = response_stream.await?; + let response_stream = response_stream.get_mut(); // 2. receive BlobAuthHeader - let blob_auth_header = self.receive_blob_auth_header(&mut response_stream).await?; + let blob_auth_header = self.receive_blob_auth_header(response_stream).await?; // 3. sign and send BlobAuthHeader - self.submit_authentication_data(blob_auth_header.clone(), &tx) - .await?; + self.submit_authentication_data(blob_auth_header.clone(), &tx)?; // 4. receive DisperseBlobReply let reply = response_stream @@ -183,10 +182,10 @@ impl RawEigenClient { } } - async fn disperse_data( + fn disperse_data( &self, data: Vec, - tx: &mpsc::Sender, + tx: &mpsc::UnboundedSender, ) -> anyhow::Result<()> { let req = disperser::AuthenticatedRequest { payload: Some(DisperseRequest(disperser::DisperseBlobRequest { @@ -197,14 +196,13 @@ impl RawEigenClient { }; tx.send(req) - .await .map_err(|e| anyhow::anyhow!("Failed to send DisperseBlobRequest: {}", e)) } - async fn submit_authentication_data( + fn submit_authentication_data( &self, blob_auth_header: BlobAuthHeader, - tx: &mpsc::Sender, + tx: &mpsc::UnboundedSender, ) -> anyhow::Result<()> { // TODO: replace challenge_parameter with actual auth header when it is available let digest = zksync_basic_types::web3::keccak256( @@ -228,7 +226,6 @@ impl RawEigenClient { }; tx.send(req) - .await .map_err(|e| anyhow::anyhow!("Failed to send AuthenticationData: {}", e)) } diff --git a/core/node/da_dispatcher/src/da_dispatcher.rs b/core/node/da_dispatcher/src/da_dispatcher.rs index 52d2168c1cf7..f10b54db7dba 100644 --- a/core/node/da_dispatcher/src/da_dispatcher.rs +++ b/core/node/da_dispatcher/src/da_dispatcher.rs @@ -4,7 +4,14 @@ use anyhow::Context; use chrono::Utc; use futures::future::join_all; use rand::Rng; -use tokio::sync::{mpsc, watch::Receiver, Mutex, Notify}; +use tokio::{ + sync::{ + mpsc, + watch::{self, Receiver}, + Mutex, Notify, + }, + task::JoinSet, +}; use zksync_config::{configs::da_dispatcher::DEFAULT_MAX_CONCURRENT_REQUESTS, DADispatcherConfig}; use zksync_da_client::{ types::{DAError, InclusionData}, @@ -68,6 +75,7 @@ impl DataAvailabilityDispatcher { .max_concurrent_requests .unwrap_or(DEFAULT_MAX_CONCURRENT_REQUESTS) as usize, ); + let (shutdown_tx, mut shutdown_rx) = watch::channel(false); let next_expected_batch = Arc::new(Mutex::new(None)); @@ -75,7 +83,9 @@ impl DataAvailabilityDispatcher { let pool_clone = self.pool.clone(); let config_clone = self.config.clone(); let next_expected_batch_clone = next_expected_batch.clone(); - let pending_blobs_reader = tokio::spawn(async move { + let mut dispatcher_tasks = JoinSet::new(); + // This task reads pending blocks from the database + dispatcher_tasks.spawn(async move { // Used to avoid sending the same batch multiple times let mut pending_batches = HashSet::new(); loop { @@ -83,7 +93,6 @@ impl DataAvailabilityDispatcher { tracing::info!("Stop signal received, 
da_dispatcher is shutting down"); break; } - let mut conn = pool_clone.connection_tagged("da_dispatcher").await?; let batches = conn .data_availability_dal() @@ -96,7 +105,6 @@ impl DataAvailabilityDispatcher { if pending_batches.contains(&batch.l1_batch_number.0) { continue; } - // This should only happen once. // We can't assume that the first batch is always 1 because the dispatcher can be restarted // and resume from a different batch. @@ -120,99 +128,139 @@ impl DataAvailabilityDispatcher { let client = self.client.clone(); let request_semaphore = self.request_semaphore.clone(); let notifier = Arc::new(Notify::new()); - let pending_blobs_sender = tokio::spawn(async move { - let mut spawned_requests = vec![]; + // This task sends blobs to the dispatcher + dispatcher_tasks.spawn(async move { + let mut spawned_requests = JoinSet::new(); let notifier = notifier.clone(); loop { if *stop_receiver.borrow() { break; } - - let batch = match rx.recv().await { - Some(batch) => batch, - None => continue, // Should never happen - }; - - // Block until we can send the request - let permit = request_semaphore.clone().acquire_owned().await?; - - let client = client.clone(); - let pool = pool.clone(); - let config = config.clone(); - let next_expected_batch = next_expected_batch.clone(); - let notifier = notifier.clone(); - let request = tokio::spawn(async move { - let _permit = permit; // move permit into scope - let dispatch_latency = METRICS.blob_dispatch_latency.start(); - let dispatch_response = - retry(config.max_retries(), batch.l1_batch_number, || { - client.dispatch_blob(batch.l1_batch_number.0, batch.pubdata.clone()) - }) - .await - .with_context(|| { - format!( - "failed to dispatch a blob with batch_number: {}, pubdata_len: {}", - batch.l1_batch_number, - batch.pubdata.len() - ) - })?; - let dispatch_latency_duration = dispatch_latency.observe(); - - let sent_at = Utc::now().naive_utc(); - - // Before saving the blob in the database, we need to be sure that we are doing it - // in the correct order. - while next_expected_batch - .lock() - .await - .map_or(true, |next_expected_batch| { - batch.l1_batch_number > next_expected_batch - }) - { - notifier.clone().notified().await; + tokio::select! { + Some(batch) = rx.recv() => { + let permit = request_semaphore.clone().acquire_owned().await?; + let client = client.clone(); + let pool = pool.clone(); + let config = config.clone(); + let next_expected_batch = next_expected_batch.clone(); + let notifier = notifier.clone(); + let shutdown_tx = shutdown_tx.clone(); + let shutdown_rx = shutdown_rx.clone(); + spawned_requests.spawn(async move { + let _permit = permit; // move permit into scope + let dispatch_latency = METRICS.blob_dispatch_latency.start(); + let result = retry(config.max_retries(), batch.l1_batch_number, || { + client.dispatch_blob(batch.l1_batch_number.0, batch.pubdata.clone()) + }) + .await; + if result.is_err() { + shutdown_tx.send(true)?; + notifier.notify_waiters(); + }; + let dispatch_response = + result + .with_context(|| { + format!( + "failed to dispatch a blob with batch_number: {}, pubdata_len: {}", + batch.l1_batch_number, + batch.pubdata.len() + ) + })?; + let dispatch_latency_duration = dispatch_latency.observe(); + + let sent_at = Utc::now().naive_utc(); + + // Before saving the blob in the database, we need to be sure that we are doing it + // in the correct order. 
+ while next_expected_batch + .lock() + .await + .map_or(true, |next_expected_batch| { + batch.l1_batch_number > next_expected_batch + }) + { + if *shutdown_rx.borrow() { + return Err(anyhow::anyhow!("Batch {} failed to disperse: Shutdown signal received", batch.l1_batch_number)); + } + notifier.clone().notified().await; + } + + let mut conn = pool.connection_tagged("da_dispatcher").await?; + conn.data_availability_dal() + .insert_l1_batch_da( + batch.l1_batch_number, + dispatch_response.blob_id.as_str(), + sent_at, + ) + .await?; + drop(conn); + + // Update the next expected batch number + next_expected_batch + .lock() + .await + .replace(batch.l1_batch_number + 1); + notifier.notify_waiters(); + + METRICS + .last_dispatched_l1_batch + .set(batch.l1_batch_number.0 as usize); + METRICS.blob_size.observe(batch.pubdata.len()); + METRICS.blobs_dispatched.inc_by(1); + METRICS.blobs_pending_dispatch.dec_by(1); + tracing::info!( + "Dispatched a DA for batch_number: {}, pubdata_size: {}, dispatch_latency: {dispatch_latency_duration:?}", + batch.l1_batch_number, + batch.pubdata.len(), + ); + Ok::<(), anyhow::Error>(()) + }); + } + // Check for shutdown signal + _ = shutdown_rx.changed() => { + if *shutdown_rx.borrow() { + println!("Shutdown signal received. Exiting message loop."); + break; + } + } + } + } + while let Some(next) = spawned_requests.join_next().await { + match next { + Ok(value) => + match value { + Ok(_) => (), + Err(err) => { + spawned_requests.shutdown().await; + return Err(err.into()); + } + } + , + Err(err) => { + spawned_requests.shutdown().await; + return Err(err.into()); } - - let mut conn = pool.connection_tagged("da_dispatcher").await?; - conn.data_availability_dal() - .insert_l1_batch_da( - batch.l1_batch_number, - dispatch_response.blob_id.as_str(), - sent_at, - ) - .await?; - drop(conn); - - // Update the next expected batch number - next_expected_batch - .lock() - .await - .replace(batch.l1_batch_number + 1); - notifier.notify_waiters(); - - METRICS - .last_dispatched_l1_batch - .set(batch.l1_batch_number.0 as usize); - METRICS.blob_size.observe(batch.pubdata.len()); - METRICS.blobs_dispatched.inc_by(1); - METRICS.blobs_pending_dispatch.dec_by(1); - tracing::info!( - "Dispatched a DA for batch_number: {}, pubdata_size: {}, dispatch_latency: {dispatch_latency_duration:?}", - batch.l1_batch_number, - batch.pubdata.len(), - ); - - Ok::<(), anyhow::Error>(()) - }); - spawned_requests.push(request); + } } - join_all(spawned_requests).await; Ok::<(), anyhow::Error>(()) }); - let results = join_all(vec![pending_blobs_reader, pending_blobs_sender]).await; - for result in results { - result??; + while let Some(next) = dispatcher_tasks.join_next().await { + match next { + Ok(value) => match value { + Ok(_) => (), + Err(err) => { + dispatcher_tasks.shutdown().await; + return Err(err.into()); + } + }, + Err(err) => { + dispatcher_tasks.shutdown().await; + return Err(err.into()); + } + } } + Ok(()) } From 54edd50b7d61377fa022815a3a4bbf0e527f14ba Mon Sep 17 00:00:00 2001 From: juan518munoz <62400508+juan518munoz@users.noreply.github.com> Date: Tue, 19 Nov 2024 17:28:02 -0300 Subject: [PATCH 8/8] feat(eigen-client-m0-implementation): optimize concurrent dispatcher (#345) * initial commit * optimize dispatch_batches fn * remove commented code * remove needless variables * optimize inclusion_poller fn * break loop if dispatch fail * remove client_lock variable * switch to retriable err * replace arbitrary value with config --- core/node/da_clients/src/eigen/client.rs | 4 +- 
core/node/da_clients/src/eigen/sdk.rs | 56 ++- core/node/da_dispatcher/src/da_dispatcher.rs | 412 ++++++++----------- 3 files changed, 211 insertions(+), 261 deletions(-) diff --git a/core/node/da_clients/src/eigen/client.rs b/core/node/da_clients/src/eigen/client.rs index 44ca7355ad47..3e74840a403a 100644 --- a/core/node/da_clients/src/eigen/client.rs +++ b/core/node/da_clients/src/eigen/client.rs @@ -11,7 +11,7 @@ use zksync_da_client::{ }; use super::{blob_info::BlobInfo, memstore::MemStore, sdk::RawEigenClient, Disperser}; -use crate::utils::to_non_retriable_da_error; +use crate::utils::{to_non_retriable_da_error, to_retriable_da_error}; /// EigenClient is a client for the Eigen DA service. /// It can be configured to use one of two dispersal methods: @@ -58,7 +58,7 @@ impl DataAvailabilityClient for EigenClient { Disperser::Remote(remote_disperser) => remote_disperser .dispatch_blob(data) .await - .map_err(to_non_retriable_da_error)?, + .map_err(to_retriable_da_error)?, Disperser::Memory(memstore) => memstore .clone() .put_blob(data) diff --git a/core/node/da_clients/src/eigen/sdk.rs b/core/node/da_clients/src/eigen/sdk.rs index d4b74f004a5e..bc43dfa9090e 100644 --- a/core/node/da_clients/src/eigen/sdk.rs +++ b/core/node/da_clients/src/eigen/sdk.rs @@ -1,7 +1,10 @@ -use std::{str::FromStr, time::Duration}; +use std::{str::FromStr, sync::Arc, time::Duration}; use secp256k1::{ecdsa::RecoverableSignature, SecretKey}; -use tokio::{sync::mpsc, time::Instant}; +use tokio::{ + sync::{mpsc, Mutex}, + time::Instant, +}; use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt}; use tonic::{ transport::{Channel, ClientTlsConfig, Endpoint}, @@ -28,7 +31,7 @@ use crate::eigen::{ #[derive(Debug, Clone)] pub(crate) struct RawEigenClient { - client: DisperserClient, + client: Arc>>, private_key: SecretKey, pub config: DisperserConfig, verifier: Verifier, @@ -40,9 +43,11 @@ impl RawEigenClient { pub async fn new(private_key: SecretKey, config: DisperserConfig) -> anyhow::Result { let endpoint = Endpoint::from_str(config.disperser_rpc.as_str())?.tls_config(ClientTlsConfig::new())?; - let client = DisperserClient::connect(endpoint) - .await - .map_err(|e| anyhow::anyhow!("Failed to connect to Disperser server: {}", e))?; + let client = Arc::new(Mutex::new( + DisperserClient::connect(endpoint) + .await + .map_err(|e| anyhow::anyhow!("Failed to connect to Disperser server: {}", e))?, + )); let verifier_config = VerifierConfig { verify_certs: true, @@ -70,13 +75,16 @@ impl RawEigenClient { account_id: String::default(), // Account Id is not used in non-authenticated mode }; - let mut client_clone = self.client.clone(); - let disperse_reply = client_clone.disperse_blob(request).await?.into_inner(); + let disperse_reply = self + .client + .lock() + .await + .disperse_blob(request) + .await? 
+ .into_inner(); let disperse_time = Instant::now(); - let blob_info = self - .await_for_inclusion(client_clone, disperse_reply) - .await?; + let blob_info = self.await_for_inclusion(disperse_reply).await?; let disperse_elapsed = Instant::now() - disperse_time; let blob_info = blob_info::BlobInfo::try_from(blob_info) @@ -116,19 +124,22 @@ impl RawEigenClient { } async fn dispatch_blob_authenticated(&self, data: Vec) -> anyhow::Result { - let mut client_clone = self.client.clone(); let (tx, rx) = mpsc::unbounded_channel(); let disperse_time = Instant::now(); - let response_stream = - client_clone.disperse_blob_authenticated(UnboundedReceiverStream::new(rx)); - let padded_data = convert_by_padding_empty_byte(&data); // 1. send DisperseBlobRequest + let padded_data = convert_by_padding_empty_byte(&data); self.disperse_data(padded_data, &tx)?; // this await is blocked until the first response on the stream, so we only await after sending the `DisperseBlobRequest` - let mut response_stream = response_stream.await?; + let mut response_stream = self + .client + .clone() + .lock() + .await + .disperse_blob_authenticated(UnboundedReceiverStream::new(rx)) + .await?; let response_stream = response_stream.get_mut(); // 2. receive BlobAuthHeader @@ -151,9 +162,7 @@ impl RawEigenClient { }; // 5. poll for blob status until it reaches the Confirmed state - let blob_info = self - .await_for_inclusion(client_clone, disperse_reply) - .await?; + let blob_info = self.await_for_inclusion(disperse_reply).await?; let blob_info = blob_info::BlobInfo::try_from(blob_info) .map_err(|e| anyhow::anyhow!("Failed to convert blob info: {}", e))?; @@ -255,7 +264,6 @@ impl RawEigenClient { async fn await_for_inclusion( &self, - mut client: DisperserClient, disperse_blob_reply: DisperseBlobReply, ) -> anyhow::Result { let polling_request = disperser::BlobStatusRequest { @@ -266,7 +274,10 @@ impl RawEigenClient { while Instant::now() - start_time < Duration::from_millis(self.config.status_query_timeout) { tokio::time::sleep(Duration::from_millis(self.config.status_query_interval)).await; - let resp = client + let resp = self + .client + .lock() + .await .get_blob_status(polling_request.clone()) .await? 
.into_inner(); @@ -323,7 +334,8 @@ impl RawEigenClient { .batch_header_hash; let get_response = self .client - .clone() + .lock() + .await .retrieve_blob(disperser::RetrieveBlobRequest { batch_header_hash, blob_index, diff --git a/core/node/da_dispatcher/src/da_dispatcher.rs b/core/node/da_dispatcher/src/da_dispatcher.rs index f10b54db7dba..3279313548e1 100644 --- a/core/node/da_dispatcher/src/da_dispatcher.rs +++ b/core/node/da_dispatcher/src/da_dispatcher.rs @@ -2,17 +2,18 @@ use std::{collections::HashSet, future::Future, sync::Arc, time::Duration}; use anyhow::Context; use chrono::Utc; -use futures::future::join_all; use rand::Rng; use tokio::{ sync::{ - mpsc, watch::{self, Receiver}, Mutex, Notify, }, task::JoinSet, }; -use zksync_config::{configs::da_dispatcher::DEFAULT_MAX_CONCURRENT_REQUESTS, DADispatcherConfig}; +use zksync_config::{ + configs::da_dispatcher::{DEFAULT_MAX_CONCURRENT_REQUESTS, DEFAULT_POLLING_INTERVAL_MS}, + DADispatcherConfig, +}; use zksync_da_client::{ types::{DAError, InclusionData}, DataAvailabilityClient, @@ -70,180 +71,137 @@ impl DataAvailabilityDispatcher { } async fn dispatch_batches(&self, stop_receiver: Receiver) -> anyhow::Result<()> { - let (tx, mut rx) = mpsc::channel( - self.config - .max_concurrent_requests - .unwrap_or(DEFAULT_MAX_CONCURRENT_REQUESTS) as usize, - ); - let (shutdown_tx, mut shutdown_rx) = watch::channel(false); - let next_expected_batch = Arc::new(Mutex::new(None)); + let mut pending_batches = HashSet::new(); + let mut dispatcher_tasks: JoinSet> = JoinSet::new(); - let stop_receiver_clone = stop_receiver.clone(); - let pool_clone = self.pool.clone(); - let config_clone = self.config.clone(); - let next_expected_batch_clone = next_expected_batch.clone(); - let mut dispatcher_tasks = JoinSet::new(); - // This task reads pending blocks from the database - dispatcher_tasks.spawn(async move { - // Used to avoid sending the same batch multiple times - let mut pending_batches = HashSet::new(); - loop { - if *stop_receiver_clone.borrow() { - tracing::info!("Stop signal received, da_dispatcher is shutting down"); - break; - } - let mut conn = pool_clone.connection_tagged("da_dispatcher").await?; - let batches = conn - .data_availability_dal() - .get_ready_for_da_dispatch_l1_batches( - config_clone.max_rows_to_dispatch() as usize - ) - .await?; - drop(conn); - for batch in batches { - if pending_batches.contains(&batch.l1_batch_number.0) { - continue; - } - // This should only happen once. - // We can't assume that the first batch is always 1 because the dispatcher can be restarted - // and resume from a different batch. 
- let mut next_expected_batch_lock = next_expected_batch_clone.lock().await; - if next_expected_batch_lock.is_none() { - next_expected_batch_lock.replace(batch.l1_batch_number); - } - - pending_batches.insert(batch.l1_batch_number.0); - METRICS.blobs_pending_dispatch.inc_by(1); - tx.send(batch).await?; - } - - tokio::time::sleep(Duration::from_secs(5)).await; + let (shutdown_tx, shutdown_rx) = watch::channel(false); + let notifier = Arc::new(Notify::new()); + loop { + if *stop_receiver.clone().borrow() { + tracing::info!("Stop signal received, da_dispatcher is shutting down"); + break; + } + if *shutdown_rx.borrow() { + tracing::error!("A blob dispatch failed, da_dispatcher is shutting down"); + break; } - Ok::<(), anyhow::Error>(()) - }); - let pool = self.pool.clone(); - let config = self.config.clone(); - let client = self.client.clone(); - let request_semaphore = self.request_semaphore.clone(); - let notifier = Arc::new(Notify::new()); - // This task sends blobs to the dispatcher - dispatcher_tasks.spawn(async move { - let mut spawned_requests = JoinSet::new(); - let notifier = notifier.clone(); - loop { - if *stop_receiver.borrow() { - break; + let mut conn = self.pool.connection_tagged("da_dispatcher").await?; + let batches = conn + .data_availability_dal() + .get_ready_for_da_dispatch_l1_batches(self.config.max_rows_to_dispatch() as usize) + .await?; + drop(conn); + let shutdown_tx = shutdown_tx.clone(); + for batch in batches { + if pending_batches.contains(&batch.l1_batch_number.0) { + continue; } - tokio::select! { - Some(batch) = rx.recv() => { - let permit = request_semaphore.clone().acquire_owned().await?; - let client = client.clone(); - let pool = pool.clone(); - let config = config.clone(); - let next_expected_batch = next_expected_batch.clone(); - let notifier = notifier.clone(); - let shutdown_tx = shutdown_tx.clone(); - let shutdown_rx = shutdown_rx.clone(); - spawned_requests.spawn(async move { - let _permit = permit; // move permit into scope - let dispatch_latency = METRICS.blob_dispatch_latency.start(); - let result = retry(config.max_retries(), batch.l1_batch_number, || { - client.dispatch_blob(batch.l1_batch_number.0, batch.pubdata.clone()) - }) - .await; - if result.is_err() { - shutdown_tx.send(true)?; - notifier.notify_waiters(); - }; - let dispatch_response = - result - .with_context(|| { - format!( - "failed to dispatch a blob with batch_number: {}, pubdata_len: {}", - batch.l1_batch_number, - batch.pubdata.len() - ) - })?; - let dispatch_latency_duration = dispatch_latency.observe(); - - let sent_at = Utc::now().naive_utc(); - - // Before saving the blob in the database, we need to be sure that we are doing it - // in the correct order. 
- while next_expected_batch - .lock() - .await - .map_or(true, |next_expected_batch| { - batch.l1_batch_number > next_expected_batch - }) - { - if *shutdown_rx.borrow() { - return Err(anyhow::anyhow!("Batch {} failed to disperse: Shutdown signal received", batch.l1_batch_number)); - } - notifier.clone().notified().await; - } - - let mut conn = pool.connection_tagged("da_dispatcher").await?; - conn.data_availability_dal() - .insert_l1_batch_da( - batch.l1_batch_number, - dispatch_response.blob_id.as_str(), - sent_at, - ) - .await?; - drop(conn); - - // Update the next expected batch number - next_expected_batch - .lock() - .await - .replace(batch.l1_batch_number + 1); - notifier.notify_waiters(); - - METRICS - .last_dispatched_l1_batch - .set(batch.l1_batch_number.0 as usize); - METRICS.blob_size.observe(batch.pubdata.len()); - METRICS.blobs_dispatched.inc_by(1); - METRICS.blobs_pending_dispatch.dec_by(1); - tracing::info!( - "Dispatched a DA for batch_number: {}, pubdata_size: {}, dispatch_latency: {dispatch_latency_duration:?}", - batch.l1_batch_number, - batch.pubdata.len(), - ); - Ok::<(), anyhow::Error>(()) - }); - } - // Check for shutdown signal - _ = shutdown_rx.changed() => { - if *shutdown_rx.borrow() { - println!("Shutdown signal received. Exiting message loop."); - break; - } - } + + // This should only happen once. + // We can't assume that the first batch is always 1 because the dispatcher can be restarted + // and resume from a different batch. + let mut next_expected_batch_lock = next_expected_batch.lock().await; + if next_expected_batch_lock.is_none() { + next_expected_batch_lock.replace(batch.l1_batch_number); } - } - while let Some(next) = spawned_requests.join_next().await { - match next { - Ok(value) => - match value { - Ok(_) => (), - Err(err) => { - spawned_requests.shutdown().await; - return Err(err.into()); - } + drop(next_expected_batch_lock); + + pending_batches.insert(batch.l1_batch_number.0); + METRICS.blobs_pending_dispatch.inc_by(1); + + let request_semaphore = self.request_semaphore.clone(); + let client = self.client.clone(); + let config = self.config.clone(); + let notifier = notifier.clone(); + let shutdown_rx = shutdown_rx.clone(); + let shutdown_tx = shutdown_tx.clone(); + let next_expected_batch = next_expected_batch.clone(); + let pool = self.pool.clone(); + dispatcher_tasks.spawn(async move { + let permit = request_semaphore.clone().acquire_owned().await?; + let dispatch_latency = METRICS.blob_dispatch_latency.start(); + + let result = retry(config.max_retries(), batch.l1_batch_number, || { + client.dispatch_blob(batch.l1_batch_number.0, batch.pubdata.clone()) + }) + .await; + drop(permit); + if result.is_err() { + shutdown_tx.clone().send(true)?; + notifier.notify_waiters(); + }; + + let dispatch_response = result.with_context(|| { + format!( + "failed to dispatch a blob with batch_number: {}, pubdata_len: {}", + batch.l1_batch_number, + batch.pubdata.len() + ) + })?; + let dispatch_latency_duration = dispatch_latency.observe(); + + let sent_at = Utc::now().naive_utc(); + + // Before saving the blob in the database, we need to be sure that we are doing it + // in the correct order. 
+ while next_expected_batch + .lock() + .await + .map_or(true, |next_expected_batch| { + batch.l1_batch_number > next_expected_batch + }) + { + if *shutdown_rx.clone().borrow() { + return Err(anyhow::anyhow!( + "Batch {} failed to disperse: Shutdown signal received", + batch.l1_batch_number + )); } - , - Err(err) => { - spawned_requests.shutdown().await; - return Err(err.into()); + notifier.clone().notified().await; } - } + + let mut conn = pool.connection_tagged("da_dispatcher").await?; + conn.data_availability_dal() + .insert_l1_batch_da( + batch.l1_batch_number, + dispatch_response.blob_id.as_str(), + sent_at, + ) + .await?; + drop(conn); + + // Update the next expected batch number + next_expected_batch + .lock() + .await + .replace(batch.l1_batch_number + 1); + notifier.notify_waiters(); + + METRICS + .last_dispatched_l1_batch + .set(batch.l1_batch_number.0 as usize); + METRICS.blob_size.observe(batch.pubdata.len()); + METRICS.blobs_dispatched.inc_by(1); + METRICS.blobs_pending_dispatch.dec_by(1); + tracing::info!( + "Dispatched a DA for batch_number: {}, pubdata_size: {}, dispatch_latency: {dispatch_latency_duration:?}", + batch.l1_batch_number, + batch.pubdata.len(), + ); + Ok(()) + }); } - Ok::<(), anyhow::Error>(()) - }); + + // Sleep so we prevent hammering the database + tokio::time::sleep(Duration::from_secs( + self.config + .polling_interval_ms + .unwrap_or(DEFAULT_POLLING_INTERVAL_MS) as u64, + )) + .await; + } while let Some(next) = dispatcher_tasks.join_next().await { match next { @@ -251,7 +209,7 @@ impl DataAvailabilityDispatcher { Ok(_) => (), Err(err) => { dispatcher_tasks.shutdown().await; - return Err(err.into()); + return Err(err); } }, Err(err) => { @@ -265,64 +223,34 @@ impl DataAvailabilityDispatcher { } async fn inclusion_poller(&self, stop_receiver: Receiver) -> anyhow::Result<()> { - let (tx, mut rx) = mpsc::channel( - self.config - .max_concurrent_requests - .unwrap_or(DEFAULT_MAX_CONCURRENT_REQUESTS) as usize, - ); + let mut pending_inclusions = HashSet::new(); + let mut inclusion_tasks: JoinSet> = JoinSet::new(); - let stop_receiver_clone = stop_receiver.clone(); - let pool_clone = self.pool.clone(); - let pending_inclusion_reader = tokio::spawn(async move { - let mut pending_inclusions = HashSet::new(); - loop { - if *stop_receiver_clone.borrow() { - break; - } + loop { + if *stop_receiver.borrow() { + break; + } - let mut conn = pool_clone.connection_tagged("da_dispatcher").await?; - let pending_blobs = conn - .data_availability_dal() - .get_da_blob_ids_awaiting_inclusion() - .await?; - drop(conn); + let mut conn = self.pool.connection_tagged("da_dispatcher").await?; + let pending_blobs = conn + .data_availability_dal() + .get_da_blob_ids_awaiting_inclusion() + .await?; + drop(conn); - for blob_info in pending_blobs.into_iter().flatten() { - if pending_inclusions.contains(&blob_info.blob_id) { - continue; - } - pending_inclusions.insert(blob_info.blob_id.clone()); - tx.send(blob_info).await?; - } - tokio::time::sleep(Duration::from_secs(5)).await; - } - Ok::<(), anyhow::Error>(()) - }); - - let pool = self.pool.clone(); - let config = self.config.clone(); - let client = self.client.clone(); - let semaphore = self.request_semaphore.clone(); - let pending_inclusion_sender = tokio::spawn(async move { - let mut spawned_requests = vec![]; - loop { - if *stop_receiver.borrow() { - break; + for blob_info in pending_blobs.into_iter().flatten() { + if pending_inclusions.contains(&blob_info.blob_id) { + continue; } - let blob_info = match rx.recv().await { - 
Some(blob_info) => blob_info, - None => continue, // Should never happen - }; - - // Block until we can send the request - let permit = semaphore.clone().acquire_owned().await?; - - let client = client.clone(); - let pool = pool.clone(); - let config = config.clone(); - let request = tokio::spawn(async move { - let _permit = permit; // move permit into scope + pending_inclusions.insert(blob_info.blob_id.clone()); + + let client = self.client.clone(); + let config = self.config.clone(); + let pool = self.pool.clone(); + let request_semaphore = self.request_semaphore.clone(); + inclusion_tasks.spawn(async move { let inclusion_data = if config.use_dummy_inclusion_data() { + let _permit = request_semaphore.acquire_owned().await?; client .get_inclusion_data(blob_info.blob_id.as_str()) .await @@ -332,11 +260,11 @@ impl DataAvailabilityDispatcher { blob_info.blob_id, blob_info.l1_batch_number ) })? - } else { - // if the inclusion verification is disabled, we don't need to wait for the inclusion - // data before committing the batch, so simply return an empty vector - Some(InclusionData { data: vec![] }) - }; + } else { + // if the inclusion verification is disabled, we don't need to wait for the inclusion + // data before committing the batch, so simply return an empty vector + Some(InclusionData { data: vec![] }) + }; let Some(inclusion_data) = inclusion_data else { return Ok(()); @@ -348,7 +276,7 @@ impl DataAvailabilityDispatcher { L1BatchNumber(blob_info.l1_batch_number.0), inclusion_data.data.as_slice(), ) - .await?; + .await?; drop(conn); let inclusion_latency = Utc::now().signed_duration_since(blob_info.sent_at); @@ -365,19 +293,29 @@ impl DataAvailabilityDispatcher { blob_info.l1_batch_number, inclusion_latency.num_seconds() ); - - Ok::<(), anyhow::Error>(()) + Ok(()) }); - spawned_requests.push(request); } - join_all(spawned_requests).await; - Ok::<(), anyhow::Error>(()) - }); - let results = join_all(vec![pending_inclusion_reader, pending_inclusion_sender]).await; - for result in results { - result??; + // Sleep so we prevent hammering the database + tokio::time::sleep(Duration::from_secs( + self.config + .polling_interval_ms + .unwrap_or(DEFAULT_POLLING_INTERVAL_MS) as u64, + )) + .await; } + + while let Some(next) = inclusion_tasks.join_next().await { + match next { + Ok(_) => (), + Err(e) => { + inclusion_tasks.shutdown().await; + return Err(e.into()); + } + } + } + Ok(()) } }
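
Reviewer note on the dispatch-ordering pattern (patches 1, 7 and 8): blobs are dispatched to the DA layer concurrently, bounded by a semaphore sized from `max_concurrent_requests`, but `insert_l1_batch_da` must still run in strictly increasing batch order. Each task therefore parks after its dispatch finishes until a shared "next expected batch" counter reaches its number, then advances the counter and wakes the other waiters; patch 8 additionally drops the semaphore permit as soon as the dispatch call returns, so tasks waiting for their turn to persist no longer tie up dispatch slots. The sketch below is a minimal illustration of that "dispatch concurrently, persist in order" idea, not the code from these patches: it assumes only the `tokio` crate (rt, macros, sync, time features), swaps the patches' `Mutex<Option<L1BatchNumber>>` plus `Notify` pair for a `tokio::sync::watch` channel, and uses `println!` as a stand-in for the database write.

```rust
use std::{sync::Arc, time::Duration};

use tokio::sync::{watch, Semaphore};

#[tokio::main]
async fn main() {
    // Bound the number of in-flight dispatch requests, like `max_concurrent_requests`.
    let semaphore = Arc::new(Semaphore::new(3));
    // Tracks the next batch number that is allowed to be persisted.
    let (turn_tx, turn_rx) = watch::channel(1u32);
    let turn_tx = Arc::new(turn_tx);

    let mut handles = Vec::new();
    for batch in 1u32..=8 {
        let semaphore = semaphore.clone();
        let turn_tx = turn_tx.clone();
        let mut turn_rx = turn_rx.clone();
        handles.push(tokio::spawn(async move {
            // Take a concurrency slot only for the duration of the (simulated) dispatch.
            let permit = semaphore.acquire_owned().await.expect("semaphore closed");
            // Later batches sleep less, so completions arrive out of batch order.
            tokio::time::sleep(Duration::from_millis((9 - batch) as u64 * 20)).await;
            println!("dispatched batch {batch}");
            drop(permit); // release the slot before queueing up for the ordered write

            // Park until it is this batch's turn; this is what keeps DB writes sequential.
            turn_rx
                .wait_for(|&next| next == batch)
                .await
                .expect("turn sender dropped");
            println!("persisted batch {batch}"); // stand-in for the DB insert

            // Hand the turn to the next batch and wake every waiting task.
            turn_tx.send(batch + 1).expect("no receivers left");
        }));
    }

    for handle in handles {
        handle.await.expect("dispatch task panicked");
    }
}
```

Running this prints the `dispatched` lines out of order but the `persisted` lines strictly as 1 through 8, which is the invariant the dispatcher relies on when it advances `next_expected_batch` and calls `notify_waiters()`.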