From 0f47ff59c11fddfad7c81df426f135f88c4e7a5f Mon Sep 17 00:00:00 2001 From: adel Date: Wed, 2 Oct 2024 00:01:17 +0200 Subject: [PATCH] feat: ExEx proof of concept (#5) * feat(exex): ExEx v0.0.000001 * feat(exex): * feat(exex): Running minimal * feat(exex): * feat(exex): * feat(exex): * feat(exex): Arch * feat(exex): Full node exex * feat(exex): Updated structs before context * feat(exex): lint --- .editorconfig | 18 - .github/workflows/karnot-ci.yml | 14 +- .github/workflows/release.yml | 2 +- CHANGELOG.md | 1 + Cargo.lock | 31 +- Cargo.toml | 5 + configs/presets/devnet.yaml | 4 +- crates/client/devnet/src/lib.rs | 1 + crates/client/mempool/Cargo.toml | 1 + crates/client/mempool/src/block_production.rs | 18 +- crates/client/sync/Cargo.toml | 1 + crates/client/sync/src/l2.rs | 18 + crates/client/sync/src/lib.rs | 3 + crates/node/Cargo.toml | 1 + crates/node/src/extensions/mod.rs | 21 + crates/node/src/extensions/pragma_dispatch.rs | 15 + crates/node/src/main.rs | 123 +++--- crates/node/src/service/block_production.rs | 19 +- crates/node/src/service/exex/mod.rs | 1 + crates/node/src/service/sync.rs | 6 + crates/primitives/exex/Cargo.toml | 30 ++ crates/primitives/exex/src/context.rs | 30 ++ crates/primitives/exex/src/event.rs | 6 + crates/primitives/exex/src/head.rs | 34 ++ crates/primitives/exex/src/launcher.rs | 124 ++++++ crates/primitives/exex/src/lib.rs | 20 + crates/primitives/exex/src/manager.rs | 366 ++++++++++++++++++ crates/primitives/exex/src/notification.rs | 48 +++ infra/pragma/pragma-env.yaml | 2 +- 29 files changed, 869 insertions(+), 94 deletions(-) delete mode 100644 .editorconfig create mode 100644 crates/node/src/extensions/mod.rs create mode 100644 crates/node/src/extensions/pragma_dispatch.rs create mode 100644 crates/node/src/service/exex/mod.rs create mode 100644 crates/primitives/exex/Cargo.toml create mode 100644 crates/primitives/exex/src/context.rs create mode 100644 crates/primitives/exex/src/event.rs create mode 100644 crates/primitives/exex/src/head.rs create mode 100644 crates/primitives/exex/src/launcher.rs create mode 100644 crates/primitives/exex/src/lib.rs create mode 100644 crates/primitives/exex/src/manager.rs create mode 100644 crates/primitives/exex/src/notification.rs diff --git a/.editorconfig b/.editorconfig deleted file mode 100644 index 6c27ac462..000000000 --- a/.editorconfig +++ /dev/null @@ -1,18 +0,0 @@ -# Madara editor configuration - -root = true - -[*] -indent_style=space -indent_size=2 -tab_width=2 -end_of_line=lf -charset=utf-8 -trim_trailing_whitespace=true -insert_final_newline = true - -[*.{rs,toml}] -indent_style=tab -indent_size=tab -tab_width=4 -max_line_length=100 diff --git a/.github/workflows/karnot-ci.yml b/.github/workflows/karnot-ci.yml index 9f594973c..caea07f0b 100644 --- a/.github/workflows/karnot-ci.yml +++ b/.github/workflows/karnot-ci.yml @@ -2,7 +2,7 @@ on: workflow_dispatch: inputs: environment: - description: 'Environment to deploy' + description: "Environment to deploy" required: true type: choice options: @@ -14,14 +14,14 @@ jobs: - uses: actions/checkout@v3 - name: Debug Information run: | - echo "Selected environment: ${{ github.event.inputs.environment }}" - echo "Current directory: $(pwd)" - echo "Directory contents:" - ls -R + echo "Selected environment: ${{ github.event.inputs.environment }}" + echo "Current directory: $(pwd)" + echo "Directory contents:" + ls -R - name: Run Karnot Cloud Pro uses: karnotxyz/kcloud-pro-github-action@main with: - input_file: './infra/pragma/pragma-env.yaml' # Specify your file + input_file: "./infra/pragma/pragma-env.yaml" # Specify your file environment: ${{ github.event.inputs.environment }} KARNOT_CLOUD_URL: ${{ secrets.KARNOT_CLOUD_URL }} - KARNOT_CLOUD_TOKEN: ${{ secrets.KARNOT_CLOUD_TOKEN }} \ No newline at end of file + KARNOT_CLOUD_TOKEN: ${{ secrets.KARNOT_CLOUD_TOKEN }} diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 7bf2dcfba..08f2ca67d 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -26,4 +26,4 @@ jobs: commit: ${{ github.sha }} draft: false prerelease: false - generateReleaseNotes: true \ No newline at end of file + generateReleaseNotes: true diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f6756451..f0337f99d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## Next release +- feat: Madara ExExs proof of concept - feat(cli): launcher script and release workflows - fix: cleaned cli settings for sequencer, devnet and full - feat: move to karnot runner diff --git a/Cargo.lock b/Cargo.lock index 1e1c1d636..ccf6afe6a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5515,6 +5515,7 @@ dependencies = [ "mp-block", "mp-chain-config", "mp-convert", + "mp-exex", "mp-utils", "primitive-types", "rand", @@ -5741,6 +5742,7 @@ dependencies = [ "mp-chain-config", "mp-class", "mp-convert", + "mp-exex", "mp-receipt", "mp-state-update", "mp-transactions", @@ -5816,6 +5818,7 @@ dependencies = [ "mp-chain-config", "mp-class", "mp-convert", + "mp-exex", "mp-receipt", "mp-transactions", "mp-utils", @@ -6004,6 +6007,26 @@ dependencies = [ "thiserror", ] +[[package]] +name = "mp-exex" +version = "0.7.0" +dependencies = [ + "anyhow", + "async-trait", + "futures", + "log", + "mc-block-import", + "mp-block", + "mp-chain-config", + "rayon", + "rstest 0.18.2", + "serde", + "starknet_api", + "tokio", + "tokio-stream", + "tokio-util", +] + [[package]] name = "mp-receipt" version = "0.7.0" @@ -8930,9 +8953,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.15" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" +checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" dependencies = [ "futures-core", "pin-project-lite", @@ -8942,9 +8965,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.11" +version = "0.7.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" +checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a" dependencies = [ "bytes", "futures-core", diff --git a/Cargo.toml b/Cargo.toml index ae7489bcb..790c85026 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ members = [ "crates/primitives/state_update", "crates/primitives/chain_config", "crates/primitives/utils", + "crates/primitives/exex", "crates/proc-macros", "crates/tests", ] @@ -43,6 +44,7 @@ default-members = [ "crates/primitives/state_update", "crates/primitives/chain_config", "crates/primitives/utils", + "crates/primitives/exex", "crates/proc-macros", "crates/tests", ] @@ -92,6 +94,7 @@ mp-receipt = { path = "crates/primitives/receipt", default-features = false } mp-state-update = { path = "crates/primitives/state_update", default-features = false } mp-utils = { path = "crates/primitives/utils", default-features = false } mp-chain-config = { path = "crates/primitives/chain_config", default-features = false } +mp-exex = { path = "crates/primitives/exex", default-features = false } # Madara client mc-telemetry = { path = "crates/client/telemetry" } @@ -176,6 +179,8 @@ env_logger = "0.11.3" mockall = "0.13.0" serial_test = "3.1.1" itertools = "0.13.0" +tokio-stream = "0.1.16" +tokio-util = "0.7.12" [patch.crates-io] starknet-core = { git = "https://github.com/kasarlabs/starknet-rs.git", branch = "fork" } diff --git a/configs/presets/devnet.yaml b/configs/presets/devnet.yaml index a33227182..939b2e796 100644 --- a/configs/presets/devnet.yaml +++ b/configs/presets/devnet.yaml @@ -9,8 +9,8 @@ versioned_constants: "0.13.2": "crates/primitives/chain_config/resources/versioned_constants_13_2.json" eth_core_contract_address: "0xE2Bb56ee936fd6433DC0F6e7e3b8365C906AA057" latest_protocol_version: "0.13.2" -block_time: "30s" -pending_block_update_time: "2s" +block_time: "1s" +pending_block_update_time: "500ms" execution_batch_size: 16 bouncer_config: block_max_capacity: diff --git a/crates/client/devnet/src/lib.rs b/crates/client/devnet/src/lib.rs index 301c05305..2e54ad5ad 100644 --- a/crates/client/devnet/src/lib.rs +++ b/crates/client/devnet/src/lib.rs @@ -331,6 +331,7 @@ mod tests { Arc::clone(&importer), Arc::clone(&mempool), Arc::clone(&l1_data_provider), + Option::None, ) .unwrap(); diff --git a/crates/client/mempool/Cargo.toml b/crates/client/mempool/Cargo.toml index 17bd99e56..f2db88b80 100644 --- a/crates/client/mempool/Cargo.toml +++ b/crates/client/mempool/Cargo.toml @@ -37,6 +37,7 @@ mp-block.workspace = true mp-chain-config.workspace = true mp-class.workspace = true mp-convert.workspace = true +mp-exex.workspace = true mp-receipt.workspace = true mp-state-update.workspace = true mp-transactions.workspace = true diff --git a/crates/client/mempool/src/block_production.rs b/crates/client/mempool/src/block_production.rs index b1ec49dbd..b9b55a14d 100644 --- a/crates/client/mempool/src/block_production.rs +++ b/crates/client/mempool/src/block_production.rs @@ -1,5 +1,6 @@ // TODO: Move this into its own crate. +use anyhow::Context; use blockifier::blockifier::transaction_executor::{TransactionExecutor, VisitedSegmentsMapping}; use blockifier::bouncer::{Bouncer, BouncerWeights, BuiltinCount}; use blockifier::state::cached_state::CommitmentStateDiff; @@ -13,6 +14,7 @@ use mc_exec::{BlockifierStateAdapter, ExecutionContext}; use mp_block::{BlockId, BlockTag, MadaraPendingBlock}; use mp_class::ConvertedClass; use mp_convert::ToFelt; +use mp_exex::{ExExManagerHandle, ExExNotification}; use mp_receipt::from_blockifier_execution_info; use mp_state_update::{ ContractStorageDiffItem, DeclaredClassItem, DeployedContractItem, NonceUpdate, ReplacedClassItem, StateDiff, @@ -20,6 +22,7 @@ use mp_state_update::{ }; use mp_transactions::TransactionWithHash; use mp_utils::graceful_shutdown; +use starknet_api::block::BlockNumber; use starknet_types_core::felt::Felt; use std::collections::VecDeque; use std::mem; @@ -176,6 +179,7 @@ pub struct BlockProductionTask { pub(crate) executor: TransactionExecutor, l1_data_provider: Arc, current_pending_tick: usize, + exex_manager: Option, } impl BlockProductionTask { @@ -189,6 +193,7 @@ impl BlockProductionTask { importer: Arc, mempool: Arc, l1_data_provider: Arc, + exex_manager: Option, ) -> Result { let parent_block_hash = backend .get_block_hash(&BlockId::Tag(BlockTag::Latest))? @@ -218,6 +223,7 @@ impl BlockProductionTask { block: pending_block, declared_classes: vec![], l1_data_provider, + exex_manager, }) } @@ -396,7 +402,6 @@ impl BlockProductionTask { self.continue_block(self.backend.chain_config().bouncer_config.block_max_capacity)?; // Convert the pending block to a closed block and save to db. - let parent_block_hash = Felt::ZERO; // temp parent block hash let new_empty_block = MadaraPendingBlock::new_empty(make_pending_header( parent_block_hash, @@ -427,6 +432,7 @@ impl BlockProductionTask { self.current_pending_tick = 0; log::info!("⛏️ Closed block #{} with {} transactions - {:?}", block_n, n_txs, start_time.elapsed()); + let _ = self.notify_exexs(block_n).context("Sending notification to ExExs"); Ok(()) } @@ -479,4 +485,14 @@ impl BlockProductionTask { fn block_n(&self) -> u64 { self.executor.block_context.block_info().block_number.0 } + + /// Sends a notification to the ExExs that a block has been closed. + fn notify_exexs(&self, block_n: u64) -> anyhow::Result<()> { + let Some(manager) = self.exex_manager.as_ref() else { + return Ok(()); + }; + + let notification = ExExNotification::BlockClosed { new: BlockNumber(block_n) }; + manager.send(notification).map_err(|e| anyhow::anyhow!("Could not send ExEx notification: {}", e)) + } } diff --git a/crates/client/sync/Cargo.toml b/crates/client/sync/Cargo.toml index 900828be0..797800557 100644 --- a/crates/client/sync/Cargo.toml +++ b/crates/client/sync/Cargo.toml @@ -26,6 +26,7 @@ mp-block = { workspace = true } mp-chain-config = { workspace = true } mp-class = { workspace = true } mp-convert = { workspace = true } +mp-exex = { workspace = true } mp-receipt = { workspace = true } mp-transactions = { workspace = true } mp-utils = { workspace = true } diff --git a/crates/client/sync/src/l2.rs b/crates/client/sync/src/l2.rs index faf83a81a..d541291a0 100644 --- a/crates/client/sync/src/l2.rs +++ b/crates/client/sync/src/l2.rs @@ -10,7 +10,10 @@ use mc_block_import::{ use mc_db::MadaraBackend; use mc_db::MadaraStorageError; use mc_telemetry::{TelemetryHandle, VerbosityLevel}; +use mp_exex::ExExManagerHandle; +use mp_exex::ExExNotification; use mp_utils::{channel_wait_or_graceful_shutdown, wait_or_graceful_shutdown, PerfStopwatch}; +use starknet_api::block::BlockNumber; use starknet_api::core::ChainId; use starknet_providers::{ProviderError, SequencerGatewayProvider}; use starknet_types_core::felt::Felt; @@ -41,6 +44,16 @@ pub struct L2StateUpdate { pub block_hash: Felt, } +/// Sends a notification to the ExExs that a block has been imported. +fn notify_exexs(exex_manager: &Option, block_n: u64) -> anyhow::Result<()> { + let Some(manager) = exex_manager.as_ref() else { + return Ok(()); + }; + + let notification = ExExNotification::BlockClosed { new: BlockNumber(block_n) }; + manager.send(notification).map_err(|e| anyhow::anyhow!("Could not send ExEx notification: {}", e)) +} + #[allow(clippy::too_many_arguments)] async fn l2_verify_and_apply_task( backend: Arc, @@ -49,6 +62,7 @@ async fn l2_verify_and_apply_task( validation: BlockValidationContext, backup_every_n_blocks: Option, telemetry: TelemetryHandle, + exex_manager: Option, ) -> anyhow::Result<()> { while let Some(block) = channel_wait_or_graceful_shutdown(pin!(updates_receiver.recv())).await { let BlockImportResult { header, block_hash } = block_import.verify_apply(block, validation.clone()).await?; @@ -66,6 +80,8 @@ async fn l2_verify_and_apply_task( header.global_state_root ); + notify_exexs(&exex_manager, header.block_number)?; + telemetry.send( VerbosityLevel::Info, serde_json::json!({ @@ -185,6 +201,7 @@ pub async fn sync( chain_id: ChainId, telemetry: TelemetryHandle, block_importer: Arc, + exex_manager: Option, ) -> anyhow::Result<()> { let (fetch_stream_sender, fetch_stream_receiver) = mpsc::channel(8); let (block_conv_sender, block_conv_receiver) = mpsc::channel(4); @@ -231,6 +248,7 @@ pub async fn sync( validation.clone(), config.backup_every_n_blocks, telemetry, + exex_manager, )); join_set.spawn(l2_pending_block_task( Arc::clone(backend), diff --git a/crates/client/sync/src/lib.rs b/crates/client/sync/src/lib.rs index 95d132842..260622554 100644 --- a/crates/client/sync/src/lib.rs +++ b/crates/client/sync/src/lib.rs @@ -5,6 +5,7 @@ use mc_block_import::BlockImporter; use mc_db::MadaraBackend; use mc_telemetry::TelemetryHandle; use mp_convert::ToFelt; +use mp_exex::ExExManagerHandle; use starknet_providers::SequencerGatewayProvider; use std::{sync::Arc, time::Duration}; @@ -23,6 +24,7 @@ pub async fn sync( backup_every_n_blocks: Option, telemetry: TelemetryHandle, pending_block_poll_interval: Duration, + exex_manager: Option, ) -> anyhow::Result<()> { let (starting_block, ignore_block_order) = if let Some(starting_block) = starting_block { log::warn!("Forcing unordered state. This will most probably break your database."); @@ -65,6 +67,7 @@ pub async fn sync( backend.chain_config().chain_id.clone(), telemetry, block_importer, + exex_manager, ) .await?; diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index b874518d1..6701a3e50 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -30,6 +30,7 @@ mc-telemetry = { workspace = true } mp-block = { workspace = true } mp-chain-config = { workspace = true } mp-convert = { workspace = true } +mp-exex = { workspace = true } mp-utils = { workspace = true } # Starknet diff --git a/crates/node/src/extensions/mod.rs b/crates/node/src/extensions/mod.rs new file mode 100644 index 000000000..2bb44dd7e --- /dev/null +++ b/crates/node/src/extensions/mod.rs @@ -0,0 +1,21 @@ +mod pragma_dispatch; + +use futures::future::BoxFuture; +use mp_exex::{BoxExEx, BoxedLaunchExEx, ExExContext}; +use pragma_dispatch::exex_pragma_dispatch; + +// Helper function to create a boxed ExEx +fn box_exex(f: F) -> Box +where + F: FnOnce(ExExContext) -> Fut + Send + Sync + 'static, + Fut: futures::Future> + Send + 'static, +{ + Box::new(move |ctx| { + Box::pin(async move { Ok(Box::pin(f(ctx)) as BoxExEx) }) as BoxFuture<'static, anyhow::Result> + }) +} + +/// List of all ExEx that will be ran along Madara. +pub fn madara_exexs() -> Vec<(String, Box)> { + vec![("Pragma Dispatch ExEx".to_string(), box_exex(exex_pragma_dispatch))] +} diff --git a/crates/node/src/extensions/pragma_dispatch.rs b/crates/node/src/extensions/pragma_dispatch.rs new file mode 100644 index 000000000..8eb2f2fa0 --- /dev/null +++ b/crates/node/src/extensions/pragma_dispatch.rs @@ -0,0 +1,15 @@ +//! ExEx of Pragma Dispatcher +//! Adds a new TX at the end of each block, dispatching a message through +//! Hyperlane. + +use futures::StreamExt; +use mp_exex::{ExExContext, ExExEvent}; + +pub async fn exex_pragma_dispatch(mut ctx: ExExContext) -> anyhow::Result<()> { + while let Some(notification) = ctx.notifications.next().await { + let block_number = notification.closed_block(); + log::info!("👋 Hello from the ExEx (triggered at block #{})", block_number); + ctx.events.send(ExExEvent::FinishedHeight(block_number))?; + } + Ok(()) +} diff --git a/crates/node/src/main.rs b/crates/node/src/main.rs index 26766687f..4f6f9ff02 100644 --- a/crates/node/src/main.rs +++ b/crates/node/src/main.rs @@ -3,14 +3,15 @@ #![warn(clippy::unwrap_used)] mod cli; +mod extensions; mod service; mod util; -use std::sync::Arc; - use anyhow::Context; use clap::Parser; +use extensions::madara_exexs; use mc_block_import::BlockImporter; +use std::sync::Arc; use mc_db::DatabaseService; use mc_mempool::{GasPriceProvider, L1DataProvider, Mempool}; @@ -18,6 +19,7 @@ use mc_metrics::MetricsService; use mc_rpc::providers::{AddTransactionProvider, ForwardToProvider, MempoolAddTxProvider}; use mc_telemetry::{SysInfo, TelemetryService}; use mp_convert::ToFelt; +use mp_exex::ExExLauncher; use mp_utils::service::{Service, ServiceGroup}; use starknet_providers::SequencerGatewayProvider; @@ -117,62 +119,69 @@ async fn main() -> anyhow::Result<()> { // Block provider startup. // `rpc_add_txs_method_provider` is a trait object that tells the RPC task where to put the transactions when using the Write endpoints. - let (block_provider_service, rpc_add_txs_method_provider): (_, Arc) = - match run_cmd.is_sequencer() { - // Block production service. (authority) - true => { - let mempool = Arc::new(Mempool::new(Arc::clone(db_service.backend()), Arc::clone(&l1_data_provider))); - - let block_production_service = BlockProductionService::new( - &run_cmd.block_production_params, - &db_service, - Arc::clone(&mempool), - importer, - Arc::clone(&l1_data_provider), - run_cmd.devnet, - prometheus_service.registry(), - telemetry_service.new_handle(), - )?; - - (ServiceGroup::default().with(block_production_service), Arc::new(MempoolAddTxProvider::new(mempool))) - } - // Block sync service. (full node) - false => { - // Feeder gateway sync service. - let sync_service = SyncService::new( - &run_cmd.sync_params, - Arc::clone(&chain_config), - run_cmd.network.context( - "You should provide a `--network` argument to ensure you're syncing from the right FGW", - )?, - &db_service, - importer, - telemetry_service.new_handle(), - ) - .await - .context("Initializing sync service")?; - - ( - ServiceGroup::default().with(sync_service), - // TODO(rate-limit): we may get rate limited with this unconfigured provider? - Arc::new(ForwardToProvider::new(SequencerGatewayProvider::new( - run_cmd - .network - .context( - "You should provide a `--network` argument to ensure you're syncing from the right gateway", - )? - .gateway(), - run_cmd - .network - .context( - "You should provide a `--network` argument to ensure you're syncing from the right FGW", - )? - .feeder_gateway(), - chain_config.chain_id.to_felt(), - ))), + let (block_provider_service, rpc_add_txs_method_provider): (_, Arc) = match run_cmd + .is_sequencer() + { + // Block production service. (authority) + true => { + let mempool = Arc::new(Mempool::new(Arc::clone(db_service.backend()), Arc::clone(&l1_data_provider))); + let mempool_provider = MempoolAddTxProvider::new(Arc::clone(&mempool)); + + // Launch the ExEx manager for configured ExExs - if any. + let exex_manager = ExExLauncher::new(Arc::clone(&chain_config), madara_exexs()).launch().await?; + + let block_production_service = BlockProductionService::new( + &run_cmd.block_production_params, + &db_service, + Arc::clone(&mempool), + importer, + Arc::clone(&l1_data_provider), + run_cmd.devnet, + exex_manager, + prometheus_service.registry(), + telemetry_service.new_handle(), + )?; + + (ServiceGroup::default().with(block_production_service), Arc::new(mempool_provider)) + } + // Block sync service. (full node) + false => { + // TODO(rate-limit): we may get rate limited with this unconfigured provider? + let gateway_provider = ForwardToProvider::new(SequencerGatewayProvider::new( + run_cmd + .network + .context( + "You should provide a `--network` argument to ensure you're syncing from the right gateway", + )? + .gateway(), + run_cmd + .network + .context("You should provide a `--network` argument to ensure you're syncing from the right FGW")? + .feeder_gateway(), + chain_config.chain_id.to_felt(), + )); + + // Launch the ExEx manager for configured ExExs - if any. + let exex_manager = ExExLauncher::new(Arc::clone(&chain_config), madara_exexs()).launch().await?; + + // Feeder gateway sync service. + let sync_service = SyncService::new( + &run_cmd.sync_params, + Arc::clone(&chain_config), + run_cmd + .network + .context("You should provide a `--network` argument to ensure you're syncing from the right FGW")?, + &db_service, + importer, + exex_manager, + telemetry_service.new_handle(), ) - } - }; + .await + .context("Initializing sync service")?; + + (ServiceGroup::default().with(sync_service), Arc::new(gateway_provider)) + } + }; let rpc_service = RpcService::new( &run_cmd.rpc_params, diff --git a/crates/node/src/service/block_production.rs b/crates/node/src/service/block_production.rs index f859ba24b..ce5423c4f 100644 --- a/crates/node/src/service/block_production.rs +++ b/crates/node/src/service/block_production.rs @@ -7,6 +7,7 @@ use mc_devnet::{ChainGenesisDescription, DevnetKeys}; use mc_mempool::{block_production::BlockProductionTask, L1DataProvider, Mempool}; use mc_metrics::MetricsRegistry; use mc_telemetry::TelemetryHandle; +use mp_exex::ExExManagerHandle; use mp_utils::service::Service; use tokio::task::JoinSet; @@ -19,6 +20,7 @@ struct StartParams { l1_data_provider: Arc, is_devnet: bool, n_devnet_contracts: u64, + exex_manager: Option, } pub struct BlockProductionService { @@ -34,6 +36,7 @@ impl BlockProductionService { block_import: Arc, l1_data_provider: Arc, is_devnet: bool, + exex_manager: Option, _metrics_handle: &MetricsRegistry, _telemetry: TelemetryHandle, ) -> anyhow::Result { @@ -49,6 +52,7 @@ impl BlockProductionService { block_import, n_devnet_contracts: config.devnet_contracts, is_devnet, + exex_manager, }), enabled: true, }) @@ -62,8 +66,15 @@ impl Service for BlockProductionService { if !self.enabled { return Ok(()); } - let StartParams { backend, l1_data_provider, mempool, is_devnet, n_devnet_contracts, block_import } = - self.start.take().expect("Service already started"); + let StartParams { + backend, + l1_data_provider, + mempool, + is_devnet, + n_devnet_contracts, + block_import, + exex_manager, + } = self.start.take().expect("Service already started"); if is_devnet { // DEVNET: we the genesis block for the devnet if not deployed, otherwise we only print the devnet keys. @@ -107,7 +118,9 @@ impl Service for BlockProductionService { } join_set.spawn(async move { - BlockProductionTask::new(backend, block_import, mempool, l1_data_provider)?.block_production_task().await?; + BlockProductionTask::new(backend, block_import, mempool, l1_data_provider, exex_manager)? + .block_production_task() + .await?; Ok(()) }); diff --git a/crates/node/src/service/exex/mod.rs b/crates/node/src/service/exex/mod.rs new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/crates/node/src/service/exex/mod.rs @@ -0,0 +1 @@ + diff --git a/crates/node/src/service/sync.rs b/crates/node/src/service/sync.rs index 646061fcd..8883dc791 100644 --- a/crates/node/src/service/sync.rs +++ b/crates/node/src/service/sync.rs @@ -5,6 +5,7 @@ use mc_db::{DatabaseService, MadaraBackend}; use mc_sync::fetch::fetchers::FetchConfig; use mc_telemetry::TelemetryHandle; use mp_chain_config::ChainConfig; +use mp_exex::ExExManagerHandle; use mp_utils::service::Service; use std::sync::Arc; use std::time::Duration; @@ -20,6 +21,7 @@ pub struct SyncService { start_params: Option, disabled: bool, pending_block_poll_interval: Duration, + exex_manager: Option, } impl SyncService { @@ -29,6 +31,7 @@ impl SyncService { network: NetworkType, db: &DatabaseService, block_importer: Arc, + exex_manager: Option, telemetry: TelemetryHandle, ) -> anyhow::Result { let fetch_config = config.block_fetch_config(chain_config.chain_id.clone(), network); @@ -42,6 +45,7 @@ impl SyncService { start_params: Some(telemetry), disabled: config.sync_disabled, pending_block_poll_interval: config.pending_block_poll_interval, + exex_manager, }) } } @@ -58,6 +62,7 @@ impl Service for SyncService { starting_block, pending_block_poll_interval, block_importer, + exex_manager, .. } = self.clone(); let telemetry = self.start_params.take().context("Service already started")?; @@ -73,6 +78,7 @@ impl Service for SyncService { backup_every_n_blocks, telemetry, pending_block_poll_interval, + exex_manager, ) .await }); diff --git a/crates/primitives/exex/Cargo.toml b/crates/primitives/exex/Cargo.toml new file mode 100644 index 000000000..1fa7c01b3 --- /dev/null +++ b/crates/primitives/exex/Cargo.toml @@ -0,0 +1,30 @@ +[package] +description = "Madara Execution Extensions" +name = "mp-exex" +authors.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true +version.workspace = true +homepage.workspace = true + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] + +[dependencies] +mc-block-import = { workspace = true } +mp-block.workspace = true +mp-chain-config = { workspace = true } + +# Other +anyhow.workspace = true +async-trait.workspace = true +futures.workspace = true +log.workspace = true +rayon.workspace = true +rstest = { workspace = true } +serde.workspace = true +starknet_api.workspace = true +tokio = { workspace = true, features = ["signal"] } +tokio-stream.workspace = true +tokio-util.workspace = true diff --git a/crates/primitives/exex/src/context.rs b/crates/primitives/exex/src/context.rs new file mode 100644 index 000000000..2845d870f --- /dev/null +++ b/crates/primitives/exex/src/context.rs @@ -0,0 +1,30 @@ +use std::sync::Arc; + +use tokio::sync::mpsc::UnboundedSender; + +use mp_chain_config::ChainConfig; + +use crate::{notification::ExExNotifications, ExExEvent}; + +/// Captures the context that an `ExEx` has access to. +pub struct ExExContext { + /// The chain config + pub chain_config: Arc, + + /// Channel used to send [`ExExEvent`]s to the rest of the node. + /// + /// # Important + /// + /// The exex should emit a `FinishedHeight` whenever a processed block is safe to prune. + /// Additionally, the exex can pre-emptively emit a `FinishedHeight` event to specify what + /// blocks to receive notifications for. + pub events: UnboundedSender, + + /// Channel to receive [`ExExNotification`]s. + /// + /// # Important + /// + /// Once an [`ExExNotification`] is sent over the channel, it is + /// considered delivered by the node. + pub notifications: ExExNotifications, +} diff --git a/crates/primitives/exex/src/event.rs b/crates/primitives/exex/src/event.rs new file mode 100644 index 000000000..7fca70ad5 --- /dev/null +++ b/crates/primitives/exex/src/event.rs @@ -0,0 +1,6 @@ +use starknet_api::block::BlockNumber; + +/// Events emitted by an `ExEx`. +pub enum ExExEvent { + FinishedHeight(BlockNumber), +} diff --git a/crates/primitives/exex/src/head.rs b/crates/primitives/exex/src/head.rs new file mode 100644 index 000000000..3fea9e9ed --- /dev/null +++ b/crates/primitives/exex/src/head.rs @@ -0,0 +1,34 @@ +use starknet_api::block::BlockNumber; + +/// A head of the ExEx. It contains the highest host block committed to the +/// internal ExEx state. I.e. the latest block that the ExEx has fully +/// processed. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct ExExHead { + /// The head block. + pub block: BlockNumber, +} + +/// The finished height of all `ExEx`'s. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FinishedExExHeight { + /// No `ExEx`'s are installed, so there is no finished height. + NoExExs, + /// Not all `ExExs` have emitted a `FinishedHeight` event yet. + NotReady, + /// The finished height of all `ExEx`'s. + /// + /// This is the lowest common denominator between all `ExEx`'s. + /// + /// This block is used to (amongst other things) determine what blocks are safe to prune. + /// + /// The number is inclusive, i.e. all blocks `<= finished_height` are safe to prune. + Height(BlockNumber), +} + +impl FinishedExExHeight { + /// Returns `true` if not all `ExExs` have emitted a `FinishedHeight` event yet. + pub const fn is_not_ready(&self) -> bool { + matches!(self, Self::NotReady) + } +} diff --git a/crates/primitives/exex/src/launcher.rs b/crates/primitives/exex/src/launcher.rs new file mode 100644 index 000000000..752078791 --- /dev/null +++ b/crates/primitives/exex/src/launcher.rs @@ -0,0 +1,124 @@ +use std::future::Future; +use std::sync::Arc; + +use futures::{ + future::{self, BoxFuture}, + FutureExt, +}; +use mp_chain_config::ChainConfig; + +use crate::{context::ExExContext, ExExHandle, ExExManager, ExExManagerHandle}; + +const DEFAULT_EXEX_MANAGER_CAPACITY: usize = 16; + +pub struct ExExLauncher { + chain_config: Arc, + extensions: Vec<(String, Box)>, +} + +impl ExExLauncher { + /// Create a new `ExExLauncher` with the given extensions. + pub const fn new(chain_config: Arc, extensions: Vec<(String, Box)>) -> Self { + Self { chain_config, extensions } + } + + /// Launches all execution extensions. + /// + /// Spawns all extensions and returns the handle to the exex manager if any extensions are + /// installed. + pub async fn launch(self) -> anyhow::Result> { + let Self { chain_config, extensions } = self; + + if extensions.is_empty() { + // nothing to launch + return Ok(None); + } + + let mut exex_handles = Vec::with_capacity(extensions.len()); + let mut exexes = Vec::with_capacity(extensions.len()); + + for (id, exex) in extensions { + // create a new exex handle + let (handle, events, notifications) = ExExHandle::new(id.clone()); + exex_handles.push(handle); + + // create the launch context for the exex + let context = ExExContext { chain_config: chain_config.clone(), events, notifications }; + + exexes.push(async move { + // init the exex + let exex = exex.launch(context).await.unwrap(); + tokio::spawn(async move { + match exex.await { + Ok(_) => panic!("ExEx {id} finished. ExExes should run indefinitely"), + Err(err) => panic!("ExEx {id} crashed: {err}"), + } + }); + }); + } + + future::join_all(exexes).await; + + let exex_manager = ExExManager::new(exex_handles, DEFAULT_EXEX_MANAGER_CAPACITY); + let handle = exex_manager.handle(); + tokio::spawn(async move { + if let Err(e) = exex_manager.await { + eprintln!("ExExManager error: {:?}", e); + } + }); + Ok(Some(handle)) + } +} + +/// A trait for launching an `ExEx`. +pub trait LaunchExEx: Send { + /// Launches the `ExEx`. + /// + /// The `ExEx` should be able to run independently and emit events on the channels provided in + /// the [`ExExContext`]. + fn launch( + self, + ctx: ExExContext, + ) -> impl Future> + Send>> + Send; +} + +/// A boxed exex future. +pub type BoxExEx = BoxFuture<'static, anyhow::Result<()>>; + +/// A version of [`LaunchExEx`] that returns a boxed future. Makes the trait object-safe. +pub trait BoxedLaunchExEx: Send + Sync { + /// Launches the `ExEx` and returns a boxed future. + fn launch(self: Box, ctx: ExExContext) -> BoxFuture<'static, anyhow::Result>; +} + +/// Implements [`BoxedLaunchExEx`] for any [`LaunchExEx`] that is [Send] and `'static`. +/// +/// Returns a [`BoxFuture`] that resolves to a [`BoxExEx`]. +impl BoxedLaunchExEx for E +where + E: LaunchExEx + Send + Sync + 'static, +{ + fn launch(self: Box, ctx: ExExContext) -> BoxFuture<'static, anyhow::Result> { + async move { + let exex = LaunchExEx::launch(*self, ctx).await?; + Ok(Box::pin(exex) as BoxExEx) + } + .boxed() + } +} + +/// Implements `LaunchExEx` for any closure that takes an [`ExExContext`] and returns a future +/// resolving to an `ExEx`. +impl LaunchExEx for F +where + F: FnOnce(ExExContext) -> Fut + Send, + Fut: Future> + Send, + E: Future> + Send, +{ + fn launch( + self, + ctx: ExExContext, + ) -> impl Future> + Send>> + Send { + self(ctx) + } +} diff --git a/crates/primitives/exex/src/lib.rs b/crates/primitives/exex/src/lib.rs new file mode 100644 index 000000000..b7f7dbb30 --- /dev/null +++ b/crates/primitives/exex/src/lib.rs @@ -0,0 +1,20 @@ +//! Execution Extensions. +//! +//! The code comes from the [reth](https://github.com/paradigmxyz/reth) repository. +//! It has been slightly adapted for Madara & simplified as a proof of concept. +//! -- +//! Source link: +//! https://github.com/paradigmxyz/reth/tree/ea1d04aa75cbd8fcf680c79671290b108642de1a/crates/exex +pub mod context; +pub mod event; +pub mod head; +pub mod launcher; +pub mod manager; +pub mod notification; + +pub use context::ExExContext; +pub use event::ExExEvent; +pub use head::{ExExHead, FinishedExExHeight}; +pub use launcher::{BoxExEx, BoxedLaunchExEx, ExExLauncher, LaunchExEx}; +pub use manager::{ExExHandle, ExExManager, ExExManagerHandle}; +pub use notification::{ExExNotification, ExExNotifications}; diff --git a/crates/primitives/exex/src/manager.rs b/crates/primitives/exex/src/manager.rs new file mode 100644 index 000000000..f7a1d384a --- /dev/null +++ b/crates/primitives/exex/src/manager.rs @@ -0,0 +1,366 @@ +use futures::ready; +use starknet_api::block::BlockNumber; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::{ + collections::VecDeque, + future::poll_fn, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, +}; +use tokio::sync::{ + mpsc::{self, error::SendError, UnboundedReceiver, UnboundedSender}, + watch, +}; +use tokio_util::sync::{PollSendError, PollSender, ReusableBoxFuture}; + +use crate::ExExNotifications; +use crate::{event::ExExEvent, head::FinishedExExHeight, notification::ExExNotification}; + +/// The execution extension manager. +/// +/// The manager is responsible for: +/// +/// - Receiving relevant events from the rest of the node, and sending these to the execution +/// extensions +/// - Backpressure +/// - Error handling +/// - Monitoring +#[derive(Debug)] +pub struct ExExManager { + /// Handles to communicate with the `ExEx`'s. + pub exex_handles: Vec, + + /// [`ExExNotification`] channel from the [`ExExManagerHandle`]s. + pub handle_rx: UnboundedReceiver, + + /// The minimum notification ID currently present in the buffer. + min_id: usize, + /// Monotonically increasing ID for [`ExExNotification`]s. + next_id: usize, + + /// Internal buffer of [`ExExNotification`]s. + /// + /// The first element of the tuple is a monotonically increasing ID unique to the notification + /// (the second element of the tuple). + buffer: VecDeque<(usize, ExExNotification)>, + /// Max size of the internal state notifications buffer. + max_capacity: usize, + /// Current state notifications buffer capacity. + /// + /// Used to inform the execution stage of possible batch sizes. + current_capacity: Arc, + + /// Whether the manager is ready to receive new notifications. + is_ready: watch::Sender, + + /// The finished height of all `ExEx`'s. + finished_height: watch::Sender, + + /// A handle to the `ExEx` manager. + handle: ExExManagerHandle, +} + +impl ExExManager { + /// Create a new [`ExExManager`]. + /// + /// You must provide an [`ExExHandle`] for each `ExEx` and the maximum capacity of the + /// notification buffer in the manager. + /// + /// When the capacity is exceeded (which can happen if an `ExEx` is slow) no one can send + /// notifications over [`ExExManagerHandle`]s until there is capacity again. + pub fn new(handles: Vec, max_capacity: usize) -> Self { + let num_exexs = handles.len(); + + let (handle_tx, handle_rx) = mpsc::unbounded_channel(); + let (is_ready_tx, is_ready_rx) = watch::channel(true); + let (finished_height_tx, finished_height_rx) = + watch::channel(if num_exexs == 0 { FinishedExExHeight::NoExExs } else { FinishedExExHeight::NotReady }); + + let current_capacity = Arc::new(AtomicUsize::new(max_capacity)); + + Self { + exex_handles: handles, + + handle_rx, + + min_id: 0, + next_id: 0, + buffer: VecDeque::with_capacity(max_capacity), + max_capacity, + current_capacity: Arc::clone(¤t_capacity), + + is_ready: is_ready_tx, + finished_height: finished_height_tx, + + handle: ExExManagerHandle { + exex_tx: handle_tx, + num_exexs, + is_ready_receiver: is_ready_rx.clone(), + is_ready: ReusableBoxFuture::new(make_wait_future(is_ready_rx)), + current_capacity, + finished_height: finished_height_rx, + }, + } + } + + /// Returns the handle to the manager. + pub fn handle(&self) -> ExExManagerHandle { + self.handle.clone() + } + /// Updates the current buffer capacity and notifies all `is_ready` watchers of the manager's + /// readiness to receive notifications. + fn update_capacity(&self) { + let capacity = self.max_capacity.saturating_sub(self.buffer.len()); + self.current_capacity.store(capacity, Ordering::Relaxed); + + // we can safely ignore if the channel is closed, since the manager always holds it open + // internally + let _ = self.is_ready.send(capacity > 0); + } + + /// Pushes a new notification into the managers internal buffer, assigning the notification a + /// unique ID. + pub fn push_notification(&mut self, notification: ExExNotification) { + let next_id = self.next_id; + self.buffer.push_back((next_id, notification)); + self.next_id += 1; + } +} + +impl std::future::Future for ExExManager { + type Output = anyhow::Result<()>; + + /// Main loop of the [`ExExManager`]. The order of operations is as follows: + /// 1. Handle incoming ExEx events. + /// 2. Drain [`ExExManagerHandle`] notifications, push them to the internal buffer and update + /// the internal buffer capacity. + /// 3. Send notifications from the internal buffer to those ExExes that are ready to receive new + /// notifications. + /// 4. Remove notifications from the internal buffer that have been sent to **all** ExExes and + /// update the internal buffer capacity. + /// 5. Update the channel with the lowest [`FinishedExExHeight`] among all ExExes. + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + // Handle incoming ExEx events + for exex in &mut this.exex_handles { + while let Poll::Ready(Some(event)) = exex.receiver.poll_recv(cx) { + match event { + ExExEvent::FinishedHeight(height) => exex.finished_height = Some(height), + } + } + } + + // Drain handle notifications + while this.buffer.len() < this.max_capacity { + if let Poll::Ready(Some(notification)) = this.handle_rx.poll_recv(cx) { + this.push_notification(notification); + continue; + } + break; + } + + // Update capacity + this.update_capacity(); + + // Advance all poll senders + let mut min_id = usize::MAX; + for idx in (0..this.exex_handles.len()).rev() { + let mut exex = this.exex_handles.swap_remove(idx); + + // It is a logic error for this to ever underflow since the manager manages the + // notification IDs + let notification_index = exex + .next_notification_id + .checked_sub(this.min_id) + .expect("exex expected notification ID outside the manager's range"); + if let Some(notification) = this.buffer.get(notification_index) { + if let Poll::Ready(Err(err)) = exex.send(cx, notification) { + // The channel was closed, which is irrecoverable for the manager + return Poll::Ready(Err(err.into())); + } + } + min_id = min_id.min(exex.next_notification_id); + this.exex_handles.push(exex); + } + + // Remove processed buffered notifications + this.buffer.retain(|&(id, _)| id >= min_id); + this.min_id = min_id; + + // Update capacity + this.update_capacity(); + + // Update watch channel block number + let finished_height = this + .exex_handles + .iter_mut() + .try_fold(u64::MAX, |curr, exex| exex.finished_height.map_or(Err(()), |height| Ok(height.0.min(curr)))); + + if let Ok(finished_height) = finished_height { + let _ = this.finished_height.send(FinishedExExHeight::Height(BlockNumber(finished_height))); + } + + Poll::Pending + } +} + +/// A handle to communicate with the [`ExExManager`]. +#[derive(Debug)] +pub struct ExExManagerHandle { + /// Channel to send notifications to the `ExEx` manager. + exex_tx: UnboundedSender, + /// The number of `ExEx`'s running on the node. + num_exexs: usize, + /// A watch channel denoting whether the manager is ready for new notifications or not. + /// This is stored internally alongside a `ReusableBoxFuture` representation of the same value. + /// This field is only used to create a new `ReusableBoxFuture` when the handle is cloned, + /// but is otherwise unused. + is_ready_receiver: watch::Receiver, + /// A reusable future that resolves when the manager is ready for new + /// notifications. + is_ready: ReusableBoxFuture<'static, watch::Receiver>, + /// The current capacity of the manager's internal notification buffer. + current_capacity: Arc, + /// The finished height of all `ExEx`'s. + finished_height: watch::Receiver, +} + +impl ExExManagerHandle { + /// Synchronously send a notification over the channel to all execution extensions. + /// + /// Senders should call [`Self::has_capacity`] first. + pub fn send(&self, notification: ExExNotification) -> Result<(), SendError> { + self.exex_tx.send(notification) + } + + /// Asynchronously send a notification over the channel to all execution extensions. + /// + /// The returned future resolves when the notification has been delivered. If there is no + /// capacity in the channel, the future will wait. + pub async fn send_async(&mut self, notification: ExExNotification) -> Result<(), SendError> { + self.ready().await; + self.exex_tx.send(notification) + } + + /// Get the current capacity of the `ExEx` manager's internal notification buffer. + pub fn capacity(&self) -> usize { + self.current_capacity.load(Ordering::Relaxed) + } + + /// Whether there is capacity in the `ExEx` manager's internal notification buffer. + /// + /// If this returns `false`, the owner of the handle should **NOT** send new notifications over + /// the channel until the manager is ready again, as this can lead to unbounded memory growth. + pub fn has_capacity(&self) -> bool { + self.capacity() > 0 + } + + /// Returns `true` if there are `ExEx`'s installed in the node. + pub const fn has_exexs(&self) -> bool { + self.num_exexs > 0 + } + + /// The finished height of all `ExEx`'s. + pub fn finished_height(&self) -> watch::Receiver { + self.finished_height.clone() + } + + /// Wait until the manager is ready for new notifications. + pub async fn ready(&mut self) { + poll_fn(|cx| self.poll_ready(cx)).await + } + + /// Wait until the manager is ready for new notifications. + pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> { + let rx = ready!(self.is_ready.poll(cx)); + self.is_ready.set(make_wait_future(rx)); + Poll::Ready(()) + } +} + +impl Clone for ExExManagerHandle { + fn clone(&self) -> Self { + Self { + exex_tx: self.exex_tx.clone(), + num_exexs: self.num_exexs, + is_ready_receiver: self.is_ready_receiver.clone(), + is_ready: ReusableBoxFuture::new(make_wait_future(self.is_ready_receiver.clone())), + current_capacity: self.current_capacity.clone(), + finished_height: self.finished_height.clone(), + } + } +} + +/// A handle to an `ExEx` used by the [`ExExManager`] to communicate with `ExEx`'s. +/// +/// A handle should be created for each `ExEx` with a unique ID. The channels returned by +/// [`ExExHandle::new`] should be given to the `ExEx`, while the handle itself should be given to +/// the manager in [`ExExManager::new`]. +#[derive(Debug)] +pub struct ExExHandle { + /// The execution extension's ID. + pub id: String, + /// Channel to send [`ExExNotification`]s to the `ExEx`. + pub sender: PollSender, + /// Channel to receive [`ExExEvent`]s from the `ExEx`. + receiver: UnboundedReceiver, + /// The ID of the next notification to send to this `ExEx`. + next_notification_id: usize, + /// The finished block of the `ExEx`. + /// + /// If this is `None`, the `ExEx` has not emitted a `FinishedHeight` event. + finished_height: Option, +} + +impl ExExHandle { + pub fn new(id: String) -> (Self, UnboundedSender, ExExNotifications) { + let (notification_tx, notification_rx) = mpsc::channel(1); + let (event_tx, event_rx) = mpsc::unbounded_channel(); + let notifications = ExExNotifications::new(notification_rx); + + ( + Self { + id: id.clone(), + sender: PollSender::new(notification_tx), + receiver: event_rx, + next_notification_id: 0, + finished_height: None, + }, + event_tx, + notifications, + ) + } + + /// Reserves a slot in the `PollSender` channel and sends the notification if the slot was + /// successfully reserved. + /// + /// When the notification is sent, it is considered delivered. + fn send( + &mut self, + cx: &mut Context<'_>, + (notification_id, notification): &(usize, ExExNotification), + ) -> Poll>> { + match self.sender.poll_reserve(cx) { + Poll::Ready(Ok(())) => (), + other => return other, + } + + match self.sender.send_item(notification.clone()) { + Ok(()) => { + self.next_notification_id = notification_id + 1; + Poll::Ready(Ok(())) + } + Err(err) => Poll::Ready(Err(err)), + } + } +} + +/// Creates a future that resolves once the given watch channel receiver is true. +async fn make_wait_future(mut rx: watch::Receiver) -> watch::Receiver { + let _ = rx.wait_for(|ready| *ready).await; + rx +} diff --git a/crates/primitives/exex/src/notification.rs b/crates/primitives/exex/src/notification.rs new file mode 100644 index 000000000..a24b967d9 --- /dev/null +++ b/crates/primitives/exex/src/notification.rs @@ -0,0 +1,48 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use futures::Stream; +use starknet_api::block::BlockNumber; +use tokio::sync::mpsc::Receiver; + +/// Notifications sent to an `ExEx`. +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub enum ExExNotification { + /// Chain got committed without a reorg, and only the new chain is returned. + BlockClosed { + /// The new chain after commit. + new: BlockNumber, + }, +} + +impl ExExNotification { + /// Returns the committed chain. + pub fn closed_block(&self) -> BlockNumber { + match self { + Self::BlockClosed { new } => *new, + } + } +} + +/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. +#[derive(Debug)] +pub struct ExExNotifications { + notifications: Receiver, +} + +impl ExExNotifications { + /// Creates a new instance of [`ExExNotifications`]. + pub const fn new(notifications: Receiver) -> Self { + Self { notifications } + } +} + +impl Stream for ExExNotifications { + type Item = ExExNotification; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_mut().notifications.poll_recv(cx) + } +} diff --git a/infra/pragma/pragma-env.yaml b/infra/pragma/pragma-env.yaml index df4cb2304..3e5b9b148 100644 --- a/infra/pragma/pragma-env.yaml +++ b/infra/pragma/pragma-env.yaml @@ -7,4 +7,4 @@ environments: config: --network: "devnet" --chain-config-path: "./pragma-devnet.yaml" - --devnet: "" \ No newline at end of file + --devnet: ""