From 64d0a38095eb6076fb16759d8fc3d8457397dbf2 Mon Sep 17 00:00:00 2001 From: juan518munoz <62400508+juan518munoz@users.noreply.github.com> Date: Tue, 12 Nov 2024 10:20:12 -0300 Subject: [PATCH] feat(eigen-client-m0-implementation): concurrent da dispatcher (#333) * initial commit * impl TODO query for concurrent dispatcher --- core/lib/config/src/configs/da_dispatcher.rs | 4 + core/lib/config/src/testonly.rs | 1 + ...76ec553688e69dfd330ca408505b8b2cdee5e.json | 38 ++ core/lib/dal/src/data_availability_dal.rs | 39 ++ core/lib/env_config/src/da_dispatcher.rs | 5 +- core/lib/protobuf_config/src/da_dispatcher.rs | 2 + .../src/proto/config/da_dispatcher.proto | 1 + core/node/da_dispatcher/src/da_dispatcher.rs | 417 ++++++++++++------ core/node/da_dispatcher/src/metrics.rs | 6 + 9 files changed, 379 insertions(+), 134 deletions(-) create mode 100644 core/lib/dal/.sqlx/query-0fdfa0f31142899f3d5f808688d76ec553688e69dfd330ca408505b8b2cdee5e.json diff --git a/core/lib/config/src/configs/da_dispatcher.rs b/core/lib/config/src/configs/da_dispatcher.rs index e9ad6bd3c074..c8bf1b3b8995 100644 --- a/core/lib/config/src/configs/da_dispatcher.rs +++ b/core/lib/config/src/configs/da_dispatcher.rs @@ -6,6 +6,7 @@ pub const DEFAULT_POLLING_INTERVAL_MS: u32 = 5000; pub const DEFAULT_MAX_ROWS_TO_DISPATCH: u32 = 100; pub const DEFAULT_MAX_RETRIES: u16 = 5; pub const DEFAULT_USE_DUMMY_INCLUSION_DATA: bool = false; +pub const DEFAULT_MAX_CONCURRENT_REQUESTS: u32 = 100; #[derive(Debug, Clone, PartialEq, Deserialize)] pub struct DADispatcherConfig { @@ -19,6 +20,8 @@ pub struct DADispatcherConfig { // TODO: run a verification task to check if the L1 contract expects the inclusion proofs to // avoid the scenario where contracts expect real proofs, and server is using dummy proofs. pub use_dummy_inclusion_data: Option, + /// The maximun number of concurrent request to send to the DA server. + pub max_concurrent_requests: Option, } impl DADispatcherConfig { @@ -28,6 +31,7 @@ impl DADispatcherConfig { max_rows_to_dispatch: Some(DEFAULT_MAX_ROWS_TO_DISPATCH), max_retries: Some(DEFAULT_MAX_RETRIES), use_dummy_inclusion_data: Some(DEFAULT_USE_DUMMY_INCLUSION_DATA), + max_concurrent_requests: Some(DEFAULT_MAX_CONCURRENT_REQUESTS), } } diff --git a/core/lib/config/src/testonly.rs b/core/lib/config/src/testonly.rs index 93d502cc4e8a..58cf4015cf0b 100644 --- a/core/lib/config/src/testonly.rs +++ b/core/lib/config/src/testonly.rs @@ -972,6 +972,7 @@ impl Distribution for EncodeDist { max_rows_to_dispatch: self.sample(rng), max_retries: self.sample(rng), use_dummy_inclusion_data: self.sample(rng), + max_concurrent_requests: self.sample(rng), } } } diff --git a/core/lib/dal/.sqlx/query-0fdfa0f31142899f3d5f808688d76ec553688e69dfd330ca408505b8b2cdee5e.json b/core/lib/dal/.sqlx/query-0fdfa0f31142899f3d5f808688d76ec553688e69dfd330ca408505b8b2cdee5e.json new file mode 100644 index 000000000000..355f9993264f --- /dev/null +++ b/core/lib/dal/.sqlx/query-0fdfa0f31142899f3d5f808688d76ec553688e69dfd330ca408505b8b2cdee5e.json @@ -0,0 +1,38 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n l1_batch_number,\n blob_id,\n inclusion_data,\n sent_at\n FROM\n data_availability\n WHERE\n inclusion_data IS NULL\n ORDER BY\n l1_batch_number\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "l1_batch_number", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "blob_id", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "inclusion_data", + "type_info": "Bytea" + }, + { + "ordinal": 3, + "name": "sent_at", + "type_info": "Timestamp" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false, + true, + false + ] + }, + "hash": "0fdfa0f31142899f3d5f808688d76ec553688e69dfd330ca408505b8b2cdee5e" +} diff --git a/core/lib/dal/src/data_availability_dal.rs b/core/lib/dal/src/data_availability_dal.rs index 41dd7efe2732..c427216425b3 100644 --- a/core/lib/dal/src/data_availability_dal.rs +++ b/core/lib/dal/src/data_availability_dal.rs @@ -175,6 +175,45 @@ impl DataAvailabilityDal<'_, '_> { .map(DataAvailabilityBlob::from)) } + pub async fn get_da_blob_ids_awaiting_inclusion( + &mut self, + ) -> DalResult>> { + let rows = sqlx::query!( + r#" + SELECT + l1_batch_number, + blob_id, + inclusion_data, + sent_at + FROM + data_availability + WHERE + inclusion_data IS NULL + ORDER BY + l1_batch_number + "#, + ) + .instrument("get_da_blobs_awaiting_inclusion") + .fetch_all(self.storage) + .await?; + + Ok(rows + .into_iter() + .map(|row| { + let l1_batch_number_u32 = row.l1_batch_number.try_into(); + if let Ok(l1_batch_number) = l1_batch_number_u32 { + Some(DataAvailabilityBlob { + l1_batch_number: L1BatchNumber(l1_batch_number), + blob_id: row.blob_id, + inclusion_data: row.inclusion_data, + sent_at: row.sent_at.and_utc(), + }) + } else { + None + } + }) + .collect()) + } /// Fetches the pubdata and `l1_batch_number` for the L1 batches that are ready for DA dispatch. pub async fn get_ready_for_da_dispatch_l1_batches( &mut self, diff --git a/core/lib/env_config/src/da_dispatcher.rs b/core/lib/env_config/src/da_dispatcher.rs index 246752db91ac..805e6b2234b5 100644 --- a/core/lib/env_config/src/da_dispatcher.rs +++ b/core/lib/env_config/src/da_dispatcher.rs @@ -21,12 +21,14 @@ mod tests { interval: u32, rows_limit: u32, max_retries: u16, + max_concurrent_requests: u32, ) -> DADispatcherConfig { DADispatcherConfig { polling_interval_ms: Some(interval), max_rows_to_dispatch: Some(rows_limit), max_retries: Some(max_retries), use_dummy_inclusion_data: Some(true), + max_concurrent_requests: Some(max_concurrent_requests), } } @@ -38,9 +40,10 @@ mod tests { DA_DISPATCHER_MAX_ROWS_TO_DISPATCH=60 DA_DISPATCHER_MAX_RETRIES=7 DA_DISPATCHER_USE_DUMMY_INCLUSION_DATA="true" + DA_DISPATCHER_MAX_CONCURRENT_REQUESTS=10 "#; lock.set_env(config); let actual = DADispatcherConfig::from_env().unwrap(); - assert_eq!(actual, expected_da_layer_config(5000, 60, 7)); + assert_eq!(actual, expected_da_layer_config(5000, 60, 7, 10)); } } diff --git a/core/lib/protobuf_config/src/da_dispatcher.rs b/core/lib/protobuf_config/src/da_dispatcher.rs index d77073bd32cf..e85ff5ae76ed 100644 --- a/core/lib/protobuf_config/src/da_dispatcher.rs +++ b/core/lib/protobuf_config/src/da_dispatcher.rs @@ -12,6 +12,7 @@ impl ProtoRepr for proto::DataAvailabilityDispatcher { max_rows_to_dispatch: self.max_rows_to_dispatch, max_retries: self.max_retries.map(|x| x as u16), use_dummy_inclusion_data: self.use_dummy_inclusion_data, + max_concurrent_requests: self.max_concurrent_requests, }) } @@ -21,6 +22,7 @@ impl ProtoRepr for proto::DataAvailabilityDispatcher { max_rows_to_dispatch: this.max_rows_to_dispatch, max_retries: this.max_retries.map(Into::into), use_dummy_inclusion_data: this.use_dummy_inclusion_data, + max_concurrent_requests: this.max_concurrent_requests, } } } diff --git a/core/lib/protobuf_config/src/proto/config/da_dispatcher.proto b/core/lib/protobuf_config/src/proto/config/da_dispatcher.proto index dd366bd5b925..d6329d14b281 100644 --- a/core/lib/protobuf_config/src/proto/config/da_dispatcher.proto +++ b/core/lib/protobuf_config/src/proto/config/da_dispatcher.proto @@ -7,4 +7,5 @@ message DataAvailabilityDispatcher { optional uint32 max_rows_to_dispatch = 2; optional uint32 max_retries = 3; optional bool use_dummy_inclusion_data = 4; + optional uint32 max_concurrent_requests = 5; } diff --git a/core/node/da_dispatcher/src/da_dispatcher.rs b/core/node/da_dispatcher/src/da_dispatcher.rs index f8e6f6b31723..52d2168c1cf7 100644 --- a/core/node/da_dispatcher/src/da_dispatcher.rs +++ b/core/node/da_dispatcher/src/da_dispatcher.rs @@ -1,10 +1,11 @@ -use std::{future::Future, time::Duration}; +use std::{collections::HashSet, future::Future, sync::Arc, time::Duration}; use anyhow::Context; use chrono::Utc; +use futures::future::join_all; use rand::Rng; -use tokio::sync::watch::Receiver; -use zksync_config::DADispatcherConfig; +use tokio::sync::{mpsc, watch::Receiver, Mutex, Notify}; +use zksync_config::{configs::da_dispatcher::DEFAULT_MAX_CONCURRENT_REQUESTS, DADispatcherConfig}; use zksync_da_client::{ types::{DAError, InclusionData}, DataAvailabilityClient, @@ -19,6 +20,7 @@ pub struct DataAvailabilityDispatcher { client: Box, pool: ConnectionPool, config: DADispatcherConfig, + request_semaphore: Arc, } impl DataAvailabilityDispatcher { @@ -27,158 +29,307 @@ impl DataAvailabilityDispatcher { config: DADispatcherConfig, client: Box, ) -> Self { + let request_semaphore = Arc::new(tokio::sync::Semaphore::new( + config + .max_concurrent_requests + .unwrap_or(DEFAULT_MAX_CONCURRENT_REQUESTS) as usize, + )); Self { pool, config, client, + request_semaphore, } } - pub async fn run(self, mut stop_receiver: Receiver) -> anyhow::Result<()> { - loop { - if *stop_receiver.borrow() { - break; - } + pub async fn run(self, stop_receiver: Receiver) -> anyhow::Result<()> { + let subtasks = futures::future::join( + async { + if let Err(err) = self.dispatch_batches(stop_receiver.clone()).await { + tracing::error!("dispatch error {err:?}"); + } + }, + async { + if let Err(err) = self.inclusion_poller(stop_receiver.clone()).await { + tracing::error!("poll_for_inclusion error {err:?}"); + } + }, + ); + + tokio::select! { + _ = subtasks => {}, + } + Ok(()) + } - let subtasks = futures::future::join( - async { - if let Err(err) = self.dispatch().await { - tracing::error!("dispatch error {err:?}"); + async fn dispatch_batches(&self, stop_receiver: Receiver) -> anyhow::Result<()> { + let (tx, mut rx) = mpsc::channel( + self.config + .max_concurrent_requests + .unwrap_or(DEFAULT_MAX_CONCURRENT_REQUESTS) as usize, + ); + + let next_expected_batch = Arc::new(Mutex::new(None)); + + let stop_receiver_clone = stop_receiver.clone(); + let pool_clone = self.pool.clone(); + let config_clone = self.config.clone(); + let next_expected_batch_clone = next_expected_batch.clone(); + let pending_blobs_reader = tokio::spawn(async move { + // Used to avoid sending the same batch multiple times + let mut pending_batches = HashSet::new(); + loop { + if *stop_receiver_clone.borrow() { + tracing::info!("Stop signal received, da_dispatcher is shutting down"); + break; + } + + let mut conn = pool_clone.connection_tagged("da_dispatcher").await?; + let batches = conn + .data_availability_dal() + .get_ready_for_da_dispatch_l1_batches( + config_clone.max_rows_to_dispatch() as usize + ) + .await?; + drop(conn); + for batch in batches { + if pending_batches.contains(&batch.l1_batch_number.0) { + continue; } - }, - async { - if let Err(err) = self.poll_for_inclusion().await { - tracing::error!("poll_for_inclusion error {err:?}"); + + // This should only happen once. + // We can't assume that the first batch is always 1 because the dispatcher can be restarted + // and resume from a different batch. + let mut next_expected_batch_lock = next_expected_batch_clone.lock().await; + if next_expected_batch_lock.is_none() { + next_expected_batch_lock.replace(batch.l1_batch_number); } - }, - ); - tokio::select! { - _ = subtasks => {}, - _ = stop_receiver.changed() => { - break; + pending_batches.insert(batch.l1_batch_number.0); + METRICS.blobs_pending_dispatch.inc_by(1); + tx.send(batch).await?; } - } - if tokio::time::timeout(self.config.polling_interval(), stop_receiver.changed()) - .await - .is_ok() - { - break; + tokio::time::sleep(Duration::from_secs(5)).await; } - } + Ok::<(), anyhow::Error>(()) + }); - tracing::info!("Stop signal received, da_dispatcher is shutting down"); - Ok(()) - } + let pool = self.pool.clone(); + let config = self.config.clone(); + let client = self.client.clone(); + let request_semaphore = self.request_semaphore.clone(); + let notifier = Arc::new(Notify::new()); + let pending_blobs_sender = tokio::spawn(async move { + let mut spawned_requests = vec![]; + let notifier = notifier.clone(); + loop { + if *stop_receiver.borrow() { + break; + } - /// Dispatches the blobs to the data availability layer, and saves the blob_id in the database. - async fn dispatch(&self) -> anyhow::Result<()> { - let mut conn = self.pool.connection_tagged("da_dispatcher").await?; - let batches = conn - .data_availability_dal() - .get_ready_for_da_dispatch_l1_batches(self.config.max_rows_to_dispatch() as usize) - .await?; - drop(conn); - - for batch in batches { - let dispatch_latency = METRICS.blob_dispatch_latency.start(); - let dispatch_response = retry(self.config.max_retries(), batch.l1_batch_number, || { - self.client - .dispatch_blob(batch.l1_batch_number.0, batch.pubdata.clone()) - }) - .await - .with_context(|| { - format!( - "failed to dispatch a blob with batch_number: {}, pubdata_len: {}", - batch.l1_batch_number, - batch.pubdata.len() - ) - })?; - let dispatch_latency_duration = dispatch_latency.observe(); - - let sent_at = Utc::now().naive_utc(); - - let mut conn = self.pool.connection_tagged("da_dispatcher").await?; - conn.data_availability_dal() - .insert_l1_batch_da( - batch.l1_batch_number, - dispatch_response.blob_id.as_str(), - sent_at, - ) - .await?; - drop(conn); - - METRICS - .last_dispatched_l1_batch - .set(batch.l1_batch_number.0 as usize); - METRICS.blob_size.observe(batch.pubdata.len()); - tracing::info!( - "Dispatched a DA for batch_number: {}, pubdata_size: {}, dispatch_latency: {dispatch_latency_duration:?}", - batch.l1_batch_number, - batch.pubdata.len(), - ); - } + let batch = match rx.recv().await { + Some(batch) => batch, + None => continue, // Should never happen + }; + + // Block until we can send the request + let permit = request_semaphore.clone().acquire_owned().await?; + + let client = client.clone(); + let pool = pool.clone(); + let config = config.clone(); + let next_expected_batch = next_expected_batch.clone(); + let notifier = notifier.clone(); + let request = tokio::spawn(async move { + let _permit = permit; // move permit into scope + let dispatch_latency = METRICS.blob_dispatch_latency.start(); + let dispatch_response = + retry(config.max_retries(), batch.l1_batch_number, || { + client.dispatch_blob(batch.l1_batch_number.0, batch.pubdata.clone()) + }) + .await + .with_context(|| { + format!( + "failed to dispatch a blob with batch_number: {}, pubdata_len: {}", + batch.l1_batch_number, + batch.pubdata.len() + ) + })?; + let dispatch_latency_duration = dispatch_latency.observe(); + + let sent_at = Utc::now().naive_utc(); + + // Before saving the blob in the database, we need to be sure that we are doing it + // in the correct order. + while next_expected_batch + .lock() + .await + .map_or(true, |next_expected_batch| { + batch.l1_batch_number > next_expected_batch + }) + { + notifier.clone().notified().await; + } + let mut conn = pool.connection_tagged("da_dispatcher").await?; + conn.data_availability_dal() + .insert_l1_batch_da( + batch.l1_batch_number, + dispatch_response.blob_id.as_str(), + sent_at, + ) + .await?; + drop(conn); + + // Update the next expected batch number + next_expected_batch + .lock() + .await + .replace(batch.l1_batch_number + 1); + notifier.notify_waiters(); + + METRICS + .last_dispatched_l1_batch + .set(batch.l1_batch_number.0 as usize); + METRICS.blob_size.observe(batch.pubdata.len()); + METRICS.blobs_dispatched.inc_by(1); + METRICS.blobs_pending_dispatch.dec_by(1); + tracing::info!( + "Dispatched a DA for batch_number: {}, pubdata_size: {}, dispatch_latency: {dispatch_latency_duration:?}", + batch.l1_batch_number, + batch.pubdata.len(), + ); + + Ok::<(), anyhow::Error>(()) + }); + spawned_requests.push(request); + } + join_all(spawned_requests).await; + Ok::<(), anyhow::Error>(()) + }); + + let results = join_all(vec![pending_blobs_reader, pending_blobs_sender]).await; + for result in results { + result??; + } Ok(()) } - /// Polls the data availability layer for inclusion data, and saves it in the database. - async fn poll_for_inclusion(&self) -> anyhow::Result<()> { - let mut conn = self.pool.connection_tagged("da_dispatcher").await?; - let blob_info = conn - .data_availability_dal() - .get_first_da_blob_awaiting_inclusion() - .await?; - drop(conn); - - let Some(blob_info) = blob_info else { - return Ok(()); - }; - - let inclusion_data = if self.config.use_dummy_inclusion_data() { - self.client - .get_inclusion_data(blob_info.blob_id.as_str()) - .await - .with_context(|| { - format!( - "failed to get inclusion data for blob_id: {}, batch_number: {}", - blob_info.blob_id, blob_info.l1_batch_number - ) - })? - } else { - // if the inclusion verification is disabled, we don't need to wait for the inclusion - // data before committing the batch, so simply return an empty vector - Some(InclusionData { data: vec![] }) - }; - - let Some(inclusion_data) = inclusion_data else { - return Ok(()); - }; - - let mut conn = self.pool.connection_tagged("da_dispatcher").await?; - conn.data_availability_dal() - .save_l1_batch_inclusion_data( - L1BatchNumber(blob_info.l1_batch_number.0), - inclusion_data.data.as_slice(), - ) - .await?; - drop(conn); - - let inclusion_latency = Utc::now().signed_duration_since(blob_info.sent_at); - if let Ok(latency) = inclusion_latency.to_std() { - METRICS.inclusion_latency.observe(latency); - } - METRICS - .last_included_l1_batch - .set(blob_info.l1_batch_number.0 as usize); - - tracing::info!( - "Received an inclusion data for a batch_number: {}, inclusion_latency_seconds: {}", - blob_info.l1_batch_number, - inclusion_latency.num_seconds() + async fn inclusion_poller(&self, stop_receiver: Receiver) -> anyhow::Result<()> { + let (tx, mut rx) = mpsc::channel( + self.config + .max_concurrent_requests + .unwrap_or(DEFAULT_MAX_CONCURRENT_REQUESTS) as usize, ); + let stop_receiver_clone = stop_receiver.clone(); + let pool_clone = self.pool.clone(); + let pending_inclusion_reader = tokio::spawn(async move { + let mut pending_inclusions = HashSet::new(); + loop { + if *stop_receiver_clone.borrow() { + break; + } + + let mut conn = pool_clone.connection_tagged("da_dispatcher").await?; + let pending_blobs = conn + .data_availability_dal() + .get_da_blob_ids_awaiting_inclusion() + .await?; + drop(conn); + + for blob_info in pending_blobs.into_iter().flatten() { + if pending_inclusions.contains(&blob_info.blob_id) { + continue; + } + pending_inclusions.insert(blob_info.blob_id.clone()); + tx.send(blob_info).await?; + } + tokio::time::sleep(Duration::from_secs(5)).await; + } + Ok::<(), anyhow::Error>(()) + }); + + let pool = self.pool.clone(); + let config = self.config.clone(); + let client = self.client.clone(); + let semaphore = self.request_semaphore.clone(); + let pending_inclusion_sender = tokio::spawn(async move { + let mut spawned_requests = vec![]; + loop { + if *stop_receiver.borrow() { + break; + } + let blob_info = match rx.recv().await { + Some(blob_info) => blob_info, + None => continue, // Should never happen + }; + + // Block until we can send the request + let permit = semaphore.clone().acquire_owned().await?; + + let client = client.clone(); + let pool = pool.clone(); + let config = config.clone(); + let request = tokio::spawn(async move { + let _permit = permit; // move permit into scope + let inclusion_data = if config.use_dummy_inclusion_data() { + client + .get_inclusion_data(blob_info.blob_id.as_str()) + .await + .with_context(|| { + format!( + "failed to get inclusion data for blob_id: {}, batch_number: {}", + blob_info.blob_id, blob_info.l1_batch_number + ) + })? + } else { + // if the inclusion verification is disabled, we don't need to wait for the inclusion + // data before committing the batch, so simply return an empty vector + Some(InclusionData { data: vec![] }) + }; + + let Some(inclusion_data) = inclusion_data else { + return Ok(()); + }; + + let mut conn = pool.connection_tagged("da_dispatcher").await?; + conn.data_availability_dal() + .save_l1_batch_inclusion_data( + L1BatchNumber(blob_info.l1_batch_number.0), + inclusion_data.data.as_slice(), + ) + .await?; + drop(conn); + + let inclusion_latency = Utc::now().signed_duration_since(blob_info.sent_at); + if let Ok(latency) = inclusion_latency.to_std() { + METRICS.inclusion_latency.observe(latency); + } + METRICS + .last_included_l1_batch + .set(blob_info.l1_batch_number.0 as usize); + METRICS.blobs_included.inc_by(1); + + tracing::info!( + "Received an inclusion data for a batch_number: {}, inclusion_latency_seconds: {}", + blob_info.l1_batch_number, + inclusion_latency.num_seconds() + ); + + Ok::<(), anyhow::Error>(()) + }); + spawned_requests.push(request); + } + join_all(spawned_requests).await; + Ok::<(), anyhow::Error>(()) + }); + + let results = join_all(vec![pending_inclusion_reader, pending_inclusion_sender]).await; + for result in results { + result??; + } Ok(()) } } diff --git a/core/node/da_dispatcher/src/metrics.rs b/core/node/da_dispatcher/src/metrics.rs index 67ac5ed68222..4c21e556abe1 100644 --- a/core/node/da_dispatcher/src/metrics.rs +++ b/core/node/da_dispatcher/src/metrics.rs @@ -19,6 +19,12 @@ pub(super) struct DataAvailabilityDispatcherMetrics { /// Buckets are bytes ranging from 1 KB to 16 MB, which has to satisfy all blob size values. #[metrics(buckets = Buckets::exponential(1_024.0..=16.0 * 1_024.0 * 1_024.0, 2.0), unit = Unit::Bytes)] pub blob_size: Histogram, + /// Amount of pending blobs to be dispatched. + pub blobs_pending_dispatch: Gauge, + /// Total number of blobs dispatched. + pub blobs_dispatched: Gauge, + /// Total number of blobs included. + pub blobs_included: Gauge, /// Number of transactions resent by the DA dispatcher. #[metrics(buckets = Buckets::linear(0.0..=10.0, 1.0))]