Skip to content

Commit

Permalink
feat(eigen-client-m0-implementation): concurrent da dispatcher (#333)
Browse files Browse the repository at this point in the history
* initial commit

* impl TODO query for concurrent dispatcher
  • Loading branch information
juan518munoz authored Nov 12, 2024
1 parent 9c73b3f commit 64d0a38
Show file tree
Hide file tree
Showing 9 changed files with 379 additions and 134 deletions.
4 changes: 4 additions & 0 deletions core/lib/config/src/configs/da_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub const DEFAULT_POLLING_INTERVAL_MS: u32 = 5000;
pub const DEFAULT_MAX_ROWS_TO_DISPATCH: u32 = 100;
pub const DEFAULT_MAX_RETRIES: u16 = 5;
pub const DEFAULT_USE_DUMMY_INCLUSION_DATA: bool = false;
pub const DEFAULT_MAX_CONCURRENT_REQUESTS: u32 = 100;

#[derive(Debug, Clone, PartialEq, Deserialize)]
pub struct DADispatcherConfig {
Expand All @@ -19,6 +20,8 @@ pub struct DADispatcherConfig {
// TODO: run a verification task to check if the L1 contract expects the inclusion proofs to
// avoid the scenario where contracts expect real proofs, and server is using dummy proofs.
pub use_dummy_inclusion_data: Option<bool>,
/// The maximun number of concurrent request to send to the DA server.
pub max_concurrent_requests: Option<u32>,
}

impl DADispatcherConfig {
Expand All @@ -28,6 +31,7 @@ impl DADispatcherConfig {
max_rows_to_dispatch: Some(DEFAULT_MAX_ROWS_TO_DISPATCH),
max_retries: Some(DEFAULT_MAX_RETRIES),
use_dummy_inclusion_data: Some(DEFAULT_USE_DUMMY_INCLUSION_DATA),
max_concurrent_requests: Some(DEFAULT_MAX_CONCURRENT_REQUESTS),
}
}

Expand Down
1 change: 1 addition & 0 deletions core/lib/config/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -972,6 +972,7 @@ impl Distribution<configs::da_dispatcher::DADispatcherConfig> for EncodeDist {
max_rows_to_dispatch: self.sample(rng),
max_retries: self.sample(rng),
use_dummy_inclusion_data: self.sample(rng),
max_concurrent_requests: self.sample(rng),
}
}
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 39 additions & 0 deletions core/lib/dal/src/data_availability_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,45 @@ impl DataAvailabilityDal<'_, '_> {
.map(DataAvailabilityBlob::from))
}

pub async fn get_da_blob_ids_awaiting_inclusion(
&mut self,
) -> DalResult<Vec<Option<DataAvailabilityBlob>>> {
let rows = sqlx::query!(
r#"
SELECT
l1_batch_number,
blob_id,
inclusion_data,
sent_at
FROM
data_availability
WHERE
inclusion_data IS NULL
ORDER BY
l1_batch_number
"#,
)
.instrument("get_da_blobs_awaiting_inclusion")
.fetch_all(self.storage)
.await?;

Ok(rows
.into_iter()
.map(|row| {
let l1_batch_number_u32 = row.l1_batch_number.try_into();
if let Ok(l1_batch_number) = l1_batch_number_u32 {
Some(DataAvailabilityBlob {
l1_batch_number: L1BatchNumber(l1_batch_number),
blob_id: row.blob_id,
inclusion_data: row.inclusion_data,
sent_at: row.sent_at.and_utc(),
})
} else {
None
}
})
.collect())
}
/// Fetches the pubdata and `l1_batch_number` for the L1 batches that are ready for DA dispatch.
pub async fn get_ready_for_da_dispatch_l1_batches(
&mut self,
Expand Down
5 changes: 4 additions & 1 deletion core/lib/env_config/src/da_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ mod tests {
interval: u32,
rows_limit: u32,
max_retries: u16,
max_concurrent_requests: u32,
) -> DADispatcherConfig {
DADispatcherConfig {
polling_interval_ms: Some(interval),
max_rows_to_dispatch: Some(rows_limit),
max_retries: Some(max_retries),
use_dummy_inclusion_data: Some(true),
max_concurrent_requests: Some(max_concurrent_requests),
}
}

Expand All @@ -38,9 +40,10 @@ mod tests {
DA_DISPATCHER_MAX_ROWS_TO_DISPATCH=60
DA_DISPATCHER_MAX_RETRIES=7
DA_DISPATCHER_USE_DUMMY_INCLUSION_DATA="true"
DA_DISPATCHER_MAX_CONCURRENT_REQUESTS=10
"#;
lock.set_env(config);
let actual = DADispatcherConfig::from_env().unwrap();
assert_eq!(actual, expected_da_layer_config(5000, 60, 7));
assert_eq!(actual, expected_da_layer_config(5000, 60, 7, 10));
}
}
2 changes: 2 additions & 0 deletions core/lib/protobuf_config/src/da_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ impl ProtoRepr for proto::DataAvailabilityDispatcher {
max_rows_to_dispatch: self.max_rows_to_dispatch,
max_retries: self.max_retries.map(|x| x as u16),
use_dummy_inclusion_data: self.use_dummy_inclusion_data,
max_concurrent_requests: self.max_concurrent_requests,
})
}

Expand All @@ -21,6 +22,7 @@ impl ProtoRepr for proto::DataAvailabilityDispatcher {
max_rows_to_dispatch: this.max_rows_to_dispatch,
max_retries: this.max_retries.map(Into::into),
use_dummy_inclusion_data: this.use_dummy_inclusion_data,
max_concurrent_requests: this.max_concurrent_requests,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ message DataAvailabilityDispatcher {
optional uint32 max_rows_to_dispatch = 2;
optional uint32 max_retries = 3;
optional bool use_dummy_inclusion_data = 4;
optional uint32 max_concurrent_requests = 5;
}
Loading

0 comments on commit 64d0a38

Please sign in to comment.