From 9067b41bab10224cc3978ee3150c5ed576698864 Mon Sep 17 00:00:00 2001 From: Michael Eberhardt <64731211+neutrinoks@users.noreply.github.com> Date: Mon, 28 Oct 2024 10:08:56 +0400 Subject: [PATCH] feat: enable output-format by return root-event as well --- Cargo.lock | 6 +- Cargo.toml | 5 +- cli/polka-storage-provider/server/src/rpc.rs | 11 +- cli/polka-storage/storagext-cli/Cargo.toml | 1 + .../storagext-cli/src/cmd/market.rs | 109 +++---- .../storagext-cli/src/cmd/mod.rs | 28 +- .../storagext-cli/src/cmd/storage_provider.rs | 165 ++++++----- cli/polka-storage/storagext/Cargo.toml | 1 + .../storagext/src/clients/market.rs | 29 +- .../storagext/src/clients/storage_provider.rs | 44 ++- .../storagext/src/runtime/client.rs | 266 +++++------------- 11 files changed, 289 insertions(+), 376 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0c28eba6..0055e68f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5881,9 +5881,9 @@ dependencies = [ [[package]] name = "libm" -version = "0.2.8" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +checksum = "3bda4c6077b0b08da2c48b172195795498381a7c8988c9e6212a6c55c5b9bd70" [[package]] name = "libp2p" @@ -15762,6 +15762,7 @@ dependencies = [ "itertools 0.13.0", "parity-scale-codec", "primitives-proofs", + "scale-decode", "serde", "serde_json", "sha2 0.10.8", @@ -15786,6 +15787,7 @@ dependencies = [ "hex", "parity-scale-codec", "primitives-proofs", + "scale-decode", "serde", "serde_json", "storagext", diff --git a/Cargo.toml b/Cargo.toml index f99d46bc..b73e97fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,6 +88,8 @@ rand = { version = "0.8.5", default-features = false } rand_chacha = { version = "0.3.1", default-features = false } rand_xorshift = "0.3" rocksdb = { version = "0.21" } +scale-decode = { version = "0.13.1", default-features = false } +scale-encode = { version = "0.7.1", default-features = false } scale-info = { version = "2.11.1", default-features = false } serde = { version = "1.0.197", default-features = false } serde-big-array = { version = "0.3.2" } @@ -111,9 +113,6 @@ tracing-subscriber = "0.3.18" url = "2.5.0" uuid = "1.8.0" -scale-decode = { version = "0.13.1", default-features = false } -scale-encode = { version = "0.7.1", default-features = false } - # Testing rstest = { version = "0.22.0" } diff --git a/cli/polka-storage-provider/server/src/rpc.rs b/cli/polka-storage-provider/server/src/rpc.rs index 7218566e..0f0baae3 100644 --- a/cli/polka-storage-provider/server/src/rpc.rs +++ b/cli/polka-storage-provider/server/src/rpc.rs @@ -89,15 +89,20 @@ impl StorageProviderRpcServer for RpcServerState { )); } - let result = self + let submission_result = self .xt_client .publish_signed_storage_deals(&self.xt_keypair, vec![deal], true) .await? .expect("requested to return submission-result"); + let events = if let Ok(events) = submission_result { + events + } else { + return Err(RpcError::internal_error("pallet returned an error", None)); + }; // We currently just support a single deal and if there's no published deals, // an error MUST've happened - debug_assert_eq!(result.len(), 1); + debug_assert_eq!(events.len(), 1); // We always publish only 1 deal let deal_id = published_deals[0].deal_id; @@ -145,7 +150,7 @@ impl StorageProviderRpcServer for RpcServerState { tracing::info!("{:?}", precommit_result); }); - Ok(result.event[0].deal_id) + Ok(events[0].variant.deal_id) } } diff --git a/cli/polka-storage/storagext-cli/Cargo.toml b/cli/polka-storage/storagext-cli/Cargo.toml index 15929b2c..f445f52e 100644 --- a/cli/polka-storage/storagext-cli/Cargo.toml +++ b/cli/polka-storage/storagext-cli/Cargo.toml @@ -20,6 +20,7 @@ codec.workspace = true frame-support = { workspace = true, features = ["std"] } hex = { workspace = true, features = ["serde", "std"] } primitives-proofs = { workspace = true } +scale-decode.workspace = true serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } subxt = { workspace = true, features = ["jsonrpsee", "substrate-compat"] } diff --git a/cli/polka-storage/storagext-cli/src/cmd/market.rs b/cli/polka-storage/storagext-cli/src/cmd/market.rs index bc06b7b1..cbe95950 100644 --- a/cli/polka-storage/storagext-cli/src/cmd/market.rs +++ b/cli/polka-storage/storagext-cli/src/cmd/market.rs @@ -6,7 +6,10 @@ use primitives_proofs::DealId; use storagext::{ deser::DeserializablePath, multipair::{DebugPair, MultiPairSigner}, - runtime::{market::events as MarketEvents, HashOfPsc, SubmissionResult}, + runtime::{ + market::{events as MktEvents, Event as MktEvent}, + HashOfPsc, SubmissionResult, + }, types::market::DealProposal as SxtDealProposal, MarketClientExt, PolkaStorageConfig, }; @@ -18,6 +21,25 @@ use url::Url; use super::display_submission_result; use crate::{missing_keypair_error, operation_takes_a_while, OutputFormat}; +macro_rules! trace_submission_result { + ($submission_result:expr, $format:expr $(,$par1:expr)*) => ( + if let Some(result) = $submission_result { + if let Ok(events) = result { + tracing::debug!( + $format, + events[0].hash, + $($par1),* + ); + Ok(Some(Ok(events))) + } else { + Ok(Some(result)) + } + } else { + Ok(None) + } + ) +} + #[derive(Debug, Subcommand)] #[command(name = "market", about = "CLI Client to the Market Pallet", version)] pub(crate) enum MarketCommand { @@ -135,14 +157,16 @@ impl MarketCommand { where Client: MarketClientExt, { - operation_takes_a_while(); + if wait_for_finalization { + operation_takes_a_while(); + } match self { MarketCommand::AddBalance { amount } => { let opt_result = Self::add_balance(client, account_keypair, amount, wait_for_finalization) .await?; - display_submission_result::<_>(opt_result, output_format)?; + display_submission_result::<_, _>(opt_result, output_format)?; } MarketCommand::SettleDealPayments { deal_ids } => { if deal_ids.is_empty() { @@ -156,13 +180,13 @@ impl MarketCommand { wait_for_finalization, ) .await?; - display_submission_result::<_>(opt_result, output_format)?; + display_submission_result::<_, _>(opt_result, output_format)?; } MarketCommand::WithdrawBalance { amount } => { let opt_result = Self::withdraw_balance(client, account_keypair, amount, wait_for_finalization) .await?; - display_submission_result::<_>(opt_result, output_format)?; + display_submission_result::<_, _>(opt_result, output_format)?; } MarketCommand::PublishStorageDeals { deals, @@ -185,7 +209,7 @@ impl MarketCommand { wait_for_finalization, ) .await?; - display_submission_result::<_>(opt_result, output_format)?; + display_submission_result::<_, _>(opt_result, output_format)?; } _unsigned => unreachable!("unsigned commands should have been previously handled"), }; @@ -198,23 +222,18 @@ impl MarketCommand { account_keypair: MultiPairSigner, amount: u128, wait_for_finalization: bool, - ) -> Result>, subxt::Error> + ) -> Result>, subxt::Error> where Client: MarketClientExt, { let submission_result = client .add_balance(&account_keypair, amount, wait_for_finalization) .await?; - if let Some(result) = submission_result { - tracing::debug!( - "[{}] Successfully added {} to Market Balance", - result.hash[0], - amount - ); - Ok(Some(result)) - } else { - Ok(None) - } + trace_submission_result!( + submission_result, + "[{}] Successfully added {} to Market Balance", + amount + ) } async fn publish_storage_deals( @@ -223,10 +242,11 @@ impl MarketCommand { client_keypair: MultiPairSigner, deals: Vec, wait_for_finalization: bool, - ) -> Result>, subxt::Error> + ) -> Result>, subxt::Error> where Client: MarketClientExt, { + let n_deals = deals.len(); let submission_result = client .publish_storage_deals( &account_keypair, @@ -235,16 +255,11 @@ impl MarketCommand { wait_for_finalization, ) .await?; - if let Some(result) = submission_result { - tracing::debug!( - "[{}] Successfully published {} storage deals", - result.hash[0], - result.len() - ); - Ok(Some(result)) - } else { - Ok(None) - } + trace_submission_result!( + submission_result, + "[{}] Successfully published {} storage deals", + n_deals + ) } async fn settle_deal_payments( @@ -252,23 +267,19 @@ impl MarketCommand { account_keypair: MultiPairSigner, deal_ids: Vec, wait_for_finalization: bool, - ) -> Result>, subxt::Error> + ) -> Result>, subxt::Error> where Client: MarketClientExt, { + let n_deal_ids = deal_ids.len(); let submission_result = client .settle_deal_payments(&account_keypair, deal_ids, wait_for_finalization) .await?; - if let Some(result) = submission_result { - tracing::debug!( - "[{}] Successfully settled {} deal payments", - result.hash[0], - result.len() - ); - Ok(Some(result)) - } else { - Ok(None) - } + trace_submission_result!( + submission_result, + "[{}] Successfully settled {} deal payments", + n_deal_ids + ) } async fn withdraw_balance( @@ -276,22 +287,20 @@ impl MarketCommand { account_keypair: MultiPairSigner, amount: u128, wait_for_finalization: bool, - ) -> Result>, subxt::Error> + ) -> Result< + Option>, + subxt::Error, + > where Client: MarketClientExt, { let submission_result = client .withdraw_balance(&account_keypair, amount, wait_for_finalization) .await?; - if let Some(result) = submission_result { - tracing::debug!( - "[{}] Successfully withdrew {} from Market Balance", - result.hash[0], - amount - ); - Ok(Some(result)) - } else { - Ok(None) - } + trace_submission_result!( + submission_result, + "[{}] Successfully withdrew {} from Market Balance", + amount + ) } } diff --git a/cli/polka-storage/storagext-cli/src/cmd/mod.rs b/cli/polka-storage/storagext-cli/src/cmd/mod.rs index a1be5f95..e71904ec 100644 --- a/cli/polka-storage/storagext-cli/src/cmd/mod.rs +++ b/cli/polka-storage/storagext-cli/src/cmd/mod.rs @@ -6,27 +6,19 @@ use storagext::runtime::{HashOfPsc, SubmissionResult}; use crate::OutputFormat; -pub(crate) fn display_submission_result( - opt_result: Option>, - _output_format: OutputFormat, +pub(crate) fn display_submission_result( + opt_result: Option>, + output_format: OutputFormat, ) -> Result<(), anyhow::Error> where - Event: subxt::events::StaticEvent, + Event: scale_decode::DecodeAsType + std::fmt::Display + serde::Serialize, { - if let Some(result) = opt_result { - // TODO(@neutrinoks,24.10.24): Check if we can return as root event instead to enable this - // display possibility again. - // let output = output_format.format(&result.event)?; - // match output_format { - // OutputFormat::Plain => println!("[{}] {}", result.hash, output), - // OutputFormat::Json => println!("{}", output), - // } - println!( - "[{}] {}::{}", - result.hash[0], - ::PALLET, - ::EVENT - ); + if let Some(Ok(events)) = opt_result { + let output = output_format.format(&events[0].event)?; + match output_format { + OutputFormat::Plain => println!("[{}] {}", events[0].hash, output), + OutputFormat::Json => println!("{}", output), + } } Ok(()) diff --git a/cli/polka-storage/storagext-cli/src/cmd/storage_provider.rs b/cli/polka-storage/storagext-cli/src/cmd/storage_provider.rs index f44e92a2..626cce68 100644 --- a/cli/polka-storage/storagext-cli/src/cmd/storage_provider.rs +++ b/cli/polka-storage/storagext-cli/src/cmd/storage_provider.rs @@ -7,7 +7,8 @@ use storagext::{ multipair::MultiPairSigner, runtime::{ runtime_types::pallet_storage_provider::sector::ProveCommitSector as RuntimeProveCommitSector, - storage_provider::events as SpEvents, HashOfPsc, SubmissionResult, + storage_provider::{events as SpEvents, Event as SpEvent}, + HashOfPsc, SubmissionResult, }, types::storage_provider::{ FaultDeclaration as SxtFaultDeclaration, ProveCommitSector as SxtProveCommitSector, @@ -23,6 +24,25 @@ use url::Url; use super::display_submission_result; use crate::{missing_keypair_error, operation_takes_a_while, OutputFormat}; +macro_rules! trace_submission_result { + ($submission_result:expr, $format:expr $(,$par1:expr)*) => ( + if let Some(result) = $submission_result { + if let Ok(events) = result { + tracing::debug!( + $format, + events[0].hash, + $($par1),* + ); + Ok(Some(Ok(events))) + } else { + Ok(Some(result)) + } + } else { + Ok(None) + } + ) +} + fn parse_post_proof(src: &str) -> Result { match src { "2KiB" => Ok(RegisteredPoStProof::StackedDRGWindow2KiBV1P1), @@ -155,7 +175,9 @@ impl StorageProviderCommand { where Client: StorageProviderClientExt, { - operation_takes_a_while(); + if wait_for_finalization { + operation_takes_a_while(); + } match self { StorageProviderCommand::RegisterStorageProvider { @@ -170,7 +192,7 @@ impl StorageProviderCommand { wait_for_finalization, ) .await?; - display_submission_result::<_>(opt_result, output_format)?; + display_submission_result::<_, _>(opt_result, output_format)?; } StorageProviderCommand::PreCommit { pre_commit_sectors } => { let opt_result = Self::pre_commit( @@ -180,7 +202,7 @@ impl StorageProviderCommand { wait_for_finalization, ) .await?; - display_submission_result::<_>(opt_result, output_format)?; + display_submission_result::<_, _>(opt_result, output_format)?; } StorageProviderCommand::ProveCommit { prove_commit_sectors, @@ -192,7 +214,7 @@ impl StorageProviderCommand { wait_for_finalization, ) .await?; - display_submission_result::<_>(opt_result, output_format)?; + display_submission_result::<_, _>(opt_result, output_format)?; } StorageProviderCommand::SubmitWindowedProofOfSpaceTime { windowed_post } => { let opt_result = Self::submit_windowed_post( @@ -202,13 +224,13 @@ impl StorageProviderCommand { wait_for_finalization, ) .await?; - display_submission_result::<_>(opt_result, output_format)?; + display_submission_result::<_, _>(opt_result, output_format)?; } StorageProviderCommand::DeclareFaults { faults } => { let opt_result = Self::declare_faults(client, account_keypair, faults, wait_for_finalization) .await?; - display_submission_result::<_>(opt_result, output_format)?; + display_submission_result::<_, _>(opt_result, output_format)?; } StorageProviderCommand::DeclareFaultsRecovered { recoveries } => { let opt_result = Self::declare_faults_recovered( @@ -218,7 +240,7 @@ impl StorageProviderCommand { wait_for_finalization, ) .await?; - display_submission_result::<_>(opt_result, output_format)?; + display_submission_result::<_, _>(opt_result, output_format)?; } StorageProviderCommand::TerminateSectors { terminations } => { let opt_result = Self::terminate_sectors( @@ -228,7 +250,7 @@ impl StorageProviderCommand { wait_for_finalization, ) .await?; - display_submission_result::<_>(opt_result, output_format)?; + display_submission_result::<_, _>(opt_result, output_format)?; } _unsigned => unreachable!("unsigned commands should have been previously handled"), } @@ -243,7 +265,7 @@ impl StorageProviderCommand { post_proof: RegisteredPoStProof, wait_for_finalization: bool, ) -> Result< - Option>, + Option>, subxt::Error, > where @@ -257,17 +279,12 @@ impl StorageProviderCommand { wait_for_finalization, ) .await?; - if let Some(result) = submission_result { - tracing::debug!( - "[{}] Successfully registered {}, seal: {:?} in Storage Provider Pallet", - result.hash[0], - peer_id, - post_proof - ); - Ok(Some(result)) - } else { - Ok(None) - } + trace_submission_result!( + submission_result, + "[{}] Successfully registered {}, seal: {:?} in Storage Provider Pallet", + peer_id, + post_proof + ) } async fn pre_commit( @@ -275,7 +292,10 @@ impl StorageProviderCommand { account_keypair: MultiPairSigner, pre_commit_sectors: Vec, wait_for_finalization: bool, - ) -> Result>, subxt::Error> + ) -> Result< + Option>, + subxt::Error, + > where Client: StorageProviderClientExt, { @@ -288,16 +308,11 @@ impl StorageProviderCommand { let submission_result = client .pre_commit_sectors(&account_keypair, pre_commit_sectors, wait_for_finalization) .await?; - if let Some(result) = submission_result { - tracing::debug!( - "[{}] Successfully pre-commited sectors {:?}.", - result.hash[0], - sector_numbers - ); - Ok(Some(result)) - } else { - Ok(None) - } + trace_submission_result!( + submission_result, + "[{}] Successfully pre-commited sectors {:?}.", + sector_numbers + ) } async fn prove_commit( @@ -305,7 +320,7 @@ impl StorageProviderCommand { account_keypair: MultiPairSigner, prove_commit_sectors: Vec, wait_for_finalization: bool, - ) -> Result>, subxt::Error> + ) -> Result>, subxt::Error> where Client: StorageProviderClientExt, { @@ -326,16 +341,11 @@ impl StorageProviderCommand { wait_for_finalization, ) .await?; - if let Some(result) = submission_result { - tracing::debug!( - "[{}] Successfully proven sector {:?}.", - result.hash[0], - sector_numbers - ); - Ok(Some(result)) - } else { - Ok(None) - } + trace_submission_result!( + submission_result, + "[{}] Successfully proven sector {:?}.", + sector_numbers + ) } async fn submit_windowed_post( @@ -343,7 +353,10 @@ impl StorageProviderCommand { account_keypair: MultiPairSigner, windowed_post: SxtSubmitWindowedPoStParams, wait_for_finalization: bool, - ) -> Result>, subxt::Error> + ) -> Result< + Option>, + subxt::Error, + > where Client: StorageProviderClientExt, { @@ -354,12 +367,7 @@ impl StorageProviderCommand { wait_for_finalization, ) .await?; - if let Some(result) = submission_result { - tracing::debug!("[{}] Successfully submitted proof.", result.hash[0]); - Ok(Some(result)) - } else { - Ok(None) - } + trace_submission_result!(submission_result, "[{}] Successfully submitted proof.") } async fn declare_faults( @@ -367,23 +375,19 @@ impl StorageProviderCommand { account_keypair: MultiPairSigner, faults: Vec, wait_for_finalization: bool, - ) -> Result>, subxt::Error> + ) -> Result>, subxt::Error> where Client: StorageProviderClientExt, { + let n_faults = faults.len(); let submission_result = client .declare_faults(&account_keypair, faults, wait_for_finalization) .await?; - if let Some(result) = submission_result { - tracing::debug!( - "[{}] Successfully declared {} faults.", - result.hash[0], - result.len() - ); - Ok(Some(result)) - } else { - Ok(None) - } + trace_submission_result!( + submission_result, + "[{}] Successfully declared {} faults.", + n_faults + ) } async fn declare_faults_recovered( @@ -391,23 +395,19 @@ impl StorageProviderCommand { account_keypair: MultiPairSigner, recoveries: Vec, wait_for_finalization: bool, - ) -> Result>, subxt::Error> + ) -> Result>, subxt::Error> where Client: StorageProviderClientExt, { + let n_recoveries = recoveries.len(); let submission_result = client .declare_faults_recovered(&account_keypair, recoveries, wait_for_finalization) .await?; - if let Some(result) = submission_result { - tracing::debug!( - "[{}] Successfully declared {} faults.", - result.hash[0], - result.len() - ); - Ok(Some(result)) - } else { - Ok(None) - } + trace_submission_result!( + submission_result, + "[{}] Successfully declared {} faults.", + n_recoveries + ) } async fn terminate_sectors( @@ -415,22 +415,21 @@ impl StorageProviderCommand { account_keypair: MultiPairSigner, terminations: Vec, wait_for_finalization: bool, - ) -> Result>, subxt::Error> + ) -> Result< + Option>, + subxt::Error, + > where Client: StorageProviderClientExt, { + let n_terminations = terminations.len(); let submission_result = client .terminate_sectors(&account_keypair, terminations, wait_for_finalization) .await?; - if let Some(result) = submission_result { - tracing::debug!( - "[{}] Successfully terminated {} sectors.", - result.hash[0], - result.len() - ); - Ok(Some(result)) - } else { - Ok(None) - } + trace_submission_result!( + submission_result, + "[{}] Successfully terminated {} sectors.", + n_terminations + ) } } diff --git a/cli/polka-storage/storagext/Cargo.toml b/cli/polka-storage/storagext/Cargo.toml index 44b21ec6..1fe8622e 100644 --- a/cli/polka-storage/storagext/Cargo.toml +++ b/cli/polka-storage/storagext/Cargo.toml @@ -22,6 +22,7 @@ futures.workspace = true hex = { workspace = true, features = ["serde"] } itertools = { workspace = true } primitives-proofs = { workspace = true, features = ["serde"] } +scale-decode.workspace = true serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } sha2 = { workspace = true } diff --git a/cli/polka-storage/storagext/src/clients/market.rs b/cli/polka-storage/storagext/src/clients/market.rs index afe33728..9b4fe103 100644 --- a/cli/polka-storage/storagext/src/clients/market.rs +++ b/cli/polka-storage/storagext/src/clients/market.rs @@ -7,7 +7,7 @@ use crate::{ runtime::{ self, client::{HashOfPsc, SubmissionResult}, - market::events as MarketEvents, + market::{events as MktEvents, Event as MktEvent}, runtime_types::pallet_market::pallet::{ BalanceEntry, ClientDealProposal as RuntimeClientDealProposal, }, @@ -38,7 +38,7 @@ pub trait MarketClientExt { wait_for_finalization: bool, ) -> impl Future< Output = Result< - Option>, + Option>, subxt::Error, >, > @@ -53,7 +53,7 @@ pub trait MarketClientExt { wait_for_finalization: bool, ) -> impl Future< Output = Result< - Option>, + Option>, subxt::Error, >, > @@ -70,7 +70,7 @@ pub trait MarketClientExt { wait_for_finalization: bool, ) -> impl Future< Output = Result< - Option>, + Option>, subxt::Error, >, > @@ -88,7 +88,7 @@ pub trait MarketClientExt { wait_for_finalization: bool, ) -> impl Future< Output = Result< - Option>, + Option>, subxt::Error, >, > @@ -106,7 +106,7 @@ pub trait MarketClientExt { wait_for_finalization: bool, ) -> impl Future< Output = Result< - Option>, + Option>, subxt::Error, >, > @@ -134,7 +134,10 @@ impl MarketClientExt for crate::runtime::client::Client { account_keypair: &Keypair, amount: Currency, wait_for_finalization: bool, - ) -> Result>, subxt::Error> + ) -> Result< + Option>, + subxt::Error, + > where Keypair: subxt::tx::Signer, { @@ -156,7 +159,7 @@ impl MarketClientExt for crate::runtime::client::Client { account_keypair: &Keypair, amount: Currency, wait_for_finalization: bool, - ) -> Result>, subxt::Error> + ) -> Result>, subxt::Error> where Keypair: subxt::tx::Signer, { @@ -178,15 +181,15 @@ impl MarketClientExt for crate::runtime::client::Client { account_keypair: &Keypair, mut deal_ids: Vec, wait_for_finalization: bool, - ) -> Result>, subxt::Error> + ) -> Result>, subxt::Error> where Keypair: subxt::tx::Signer, { + let n_deal_ids = deal_ids.len(); if deal_ids.len() > MAX_N_DEALS { tracing::warn!("more than {} deal ids, truncating", MAX_N_DEALS); deal_ids.truncate(MAX_N_DEALS); } - let n_deal_ids = deal_ids.len(); // `deal_ids` has been truncated to fit the proper bound, however, // the `BoundedVec` defined in the `runtime::runtime_types` is actually just a newtype // making the `BoundedVec` actually unbounded @@ -214,17 +217,17 @@ impl MarketClientExt for crate::runtime::client::Client { client_keypair: &ClientKeypair, mut deals: Vec, wait_for_finalization: bool, - ) -> Result>, subxt::Error> + ) -> Result>, subxt::Error> where Keypair: subxt::tx::Signer, ClientKeypair: subxt::tx::Signer, { + let n_deals = deals.len(); if deals.len() > MAX_N_DEALS { tracing::warn!("more than {} deals, truncating", MAX_N_DEALS); deals.truncate(MAX_N_DEALS); } - let n_deals = deals.len(); let signed_deal_proposals = deals .into_iter() .map(|deal| deal.sign(client_keypair)) @@ -258,7 +261,7 @@ impl MarketClientExt for crate::runtime::client::Client { account_keypair: &Keypair, mut deals: Vec, wait_for_finalization: bool, - ) -> Result>, subxt::Error> + ) -> Result>, subxt::Error> where Keypair: subxt::tx::Signer, { diff --git a/cli/polka-storage/storagext/src/clients/storage_provider.rs b/cli/polka-storage/storagext/src/clients/storage_provider.rs index a4f64bfe..ebf8ff17 100644 --- a/cli/polka-storage/storagext/src/clients/storage_provider.rs +++ b/cli/polka-storage/storagext/src/clients/storage_provider.rs @@ -16,7 +16,9 @@ use crate::{ proofs::SubmitWindowedPoStParams, sector::ProveCommitSector, storage_provider::StorageProviderState, }, - storage_provider::{calls::types::register_storage_provider::PeerId, events as SpEvents}, + storage_provider::{ + calls::types::register_storage_provider::PeerId, events as SpEvents, Event as SpEvent, + }, }, types::storage_provider::{ FaultDeclaration, RecoveryDeclaration, SectorPreCommitInfo, TerminationDeclaration, @@ -33,7 +35,7 @@ pub trait StorageProviderClientExt { wait_for_finalization: bool, ) -> impl Future< Output = Result< - Option>, + Option>, subxt::Error, >, > @@ -47,7 +49,7 @@ pub trait StorageProviderClientExt { wait_for_finalization: bool, ) -> impl Future< Output = Result< - Option>, + Option>, subxt::Error, >, > @@ -60,7 +62,10 @@ pub trait StorageProviderClientExt { sectors: Vec, wait_for_finalization: bool, ) -> impl Future< - Output = Result>, subxt::Error>, + Output = Result< + Option>, + subxt::Error, + >, > where Keypair: subxt::tx::Signer; @@ -72,7 +77,7 @@ pub trait StorageProviderClientExt { wait_for_finalization: bool, ) -> impl Future< Output = Result< - Option>, + Option>, subxt::Error, >, > @@ -86,7 +91,7 @@ pub trait StorageProviderClientExt { wait_for_finalization: bool, ) -> impl Future< Output = Result< - Option>, + Option>, subxt::Error, >, > @@ -100,7 +105,7 @@ pub trait StorageProviderClientExt { wait_for_finalization: bool, ) -> impl Future< Output = Result< - Option>, + Option>, subxt::Error, >, > @@ -114,7 +119,7 @@ pub trait StorageProviderClientExt { wait_for_finalization: bool, ) -> impl Future< Output = Result< - Option>, + Option>, subxt::Error, >, > @@ -148,7 +153,7 @@ impl StorageProviderClientExt for crate::runtime::client::Client { post_proof: RegisteredPoStProof, wait_for_finalization: bool, ) -> Result< - Option>, + Option>, subxt::Error, > where @@ -174,7 +179,10 @@ impl StorageProviderClientExt for crate::runtime::client::Client { account_keypair: &Keypair, sectors: Vec, wait_for_finalization: bool, - ) -> Result>, subxt::Error> + ) -> Result< + Option>, + subxt::Error, + > where Keypair: subxt::tx::Signer, { @@ -198,7 +206,7 @@ impl StorageProviderClientExt for crate::runtime::client::Client { account_keypair: &Keypair, sectors: Vec, wait_for_finalization: bool, - ) -> Result>, subxt::Error> + ) -> Result>, subxt::Error> where Keypair: subxt::tx::Signer, { @@ -224,7 +232,10 @@ impl StorageProviderClientExt for crate::runtime::client::Client { account_keypair: &Keypair, windowed_post: SubmitWindowedPoStParams, wait_for_finalization: bool, - ) -> Result>, subxt::Error> + ) -> Result< + Option>, + subxt::Error, + > where Keypair: subxt::tx::Signer, { @@ -248,7 +259,7 @@ impl StorageProviderClientExt for crate::runtime::client::Client { account_keypair: &Keypair, faults: Vec, wait_for_finalization: bool, - ) -> Result>, subxt::Error> + ) -> Result>, subxt::Error> where Keypair: subxt::tx::Signer, { @@ -273,7 +284,7 @@ impl StorageProviderClientExt for crate::runtime::client::Client { account_keypair: &Keypair, recoveries: Vec, wait_for_finalization: bool, - ) -> Result>, subxt::Error> + ) -> Result>, subxt::Error> where Keypair: subxt::tx::Signer, { @@ -297,7 +308,10 @@ impl StorageProviderClientExt for crate::runtime::client::Client { account_keypair: &Keypair, terminations: Vec, wait_for_finalization: bool, - ) -> Result>, subxt::Error> + ) -> Result< + Option>, + subxt::Error, + > where Keypair: subxt::tx::Signer, { diff --git a/cli/polka-storage/storagext/src/runtime/client.rs b/cli/polka-storage/storagext/src/runtime/client.rs index 05efe3a2..61e397a6 100644 --- a/cli/polka-storage/storagext/src/runtime/client.rs +++ b/cli/polka-storage/storagext/src/runtime/client.rs @@ -2,54 +2,37 @@ use std::{sync::Arc, time::Duration, vec::Vec}; use hex::ToHex; use subxt::{ - ext::subxt_core::error::{CustomError, Error as SxtCoreError, ExtrinsicParamsError}, - utils::AccountId32, + blocks::ExtrinsicEvents, + error::BlockError, + ext::subxt_core::error::{CustomError, ExtrinsicParamsError}, OnlineClient, }; -use tokio::{select, sync::RwLock}; +use tokio::sync::RwLock; use tokio_util::sync::CancellationToken; -use crate::{ - runtime::{market::events as MarketEvents, storage_provider::events as SpEvents}, - PolkaStorageConfig, -}; +use crate::PolkaStorageConfig; type HashOf = ::Hash; pub type HashOfPsc = HashOf; -type ParaEvents = Arc, subxt::events::EventDetails)>>>; +type ParaEvents = Arc, ExtrinsicEvents)>>>; type ParaErrors = Arc>>>; -/// Helper type for [`Client::traced_submission`] successful results. +/// This definition defines one single, successful event of an extrinsic execution. For example, +/// one published deal, or one settled deal. #[derive(Debug)] -pub struct SubmissionResult { +pub struct ExtrinsicEvent { /// Submission block hash. - pub hash: Vec, - /// Resulting extrinsic's events. - pub event: Vec, -} - -impl SubmissionResult { - /// New type pattern with empty vectors. - pub fn new() -> Self { - Self { - hash: Vec::new(), - event: Vec::new(), - } - } - - // Like any len() method. - pub fn len(&self) -> usize { - debug_assert_eq!(self.hash.len(), self.event.len()); - self.hash.len() - } + pub hash: Hash, + /// Resulting extrinsic's event. + pub event: Event, + /// Resulting extrinsic's event-variant. + pub variant: Variant, } -impl Default for SubmissionResult { - fn default() -> Self { - Self::new() - } -} +/// Helper type for [`Client::traced_submission`] successful results. +pub type SubmissionResult = + Result>, Box>; /// Client to interact with a pallet extrinsics. /// You can call any extrinsic via [`Client::traced_submission`]. @@ -100,22 +83,21 @@ impl Client { /// /// Equivalent to performing [`OnlineClient::sign_and_submit_then_watch_default`], /// followed by [`TxInBlock::wait_for_finalized`] and [`TxInBlock::wait_for_success`]. - pub(crate) async fn traced_submission( + pub(crate) async fn traced_submission( &self, call: &Call, account_keypair: &Keypair, wait_for_finalization: bool, - n_events: usize, - ) -> Result>, subxt::Error> + expected_results: usize, + ) -> Result>, subxt::Error> where Call: subxt::tx::Payload + std::fmt::Debug, Keypair: subxt::tx::Signer, - Event: subxt::events::StaticEvent + std::fmt::Debug, - EventFilterProvider: EventFilterType, + Event: scale_decode::DecodeAsType + std::fmt::Display, + Variant: subxt::events::StaticEvent, { let para_events: ParaEvents = Arc::new(RwLock::new(Vec::new())); let para_errors: ParaErrors = Arc::new(RwLock::new(Vec::new())); - let account_id = AccountId32::from(account_keypair.account_id()); let cancel_token = CancellationToken::new(); let p_api = self.client.clone(); @@ -141,22 +123,17 @@ impl Client { if !wait_for_finalization { return Ok(None); } + let extrinsic_hash = submission_progress.extrinsic_hash(); tracing::trace!( - extrinsic_hash = submission_progress.extrinsic_hash().encode_hex::(), - "waiting for finalization" + "waiting for finalization {}", + extrinsic_hash.encode_hex::(), ); - static EVENT_META_PROVIDER: EventFilterProvider = EventFilterProvider; - let submission_result = wait_for_para_event::( + let submission_result = wait_for_para_event::( para_events.clone(), para_errors.clone(), - >::pallet_name(&EVENT_META_PROVIDER), - >::event_name(&EVENT_META_PROVIDER), - >::filter_fn( - &EVENT_META_PROVIDER, - account_id, - ), - n_events, + extrinsic_hash, + expected_results, ) .await?; cancel_token.cancel(); @@ -177,54 +154,56 @@ impl From> for Client { /// Methods iterates through the given stack of collected events from the listener and compares for /// a given expected event type, for example `pallet_market::Event::BalanceAdded`. If the event has /// been found it will be returned. -async fn wait_for_para_event( - events: ParaEvents, - _errors: ParaErrors, - pallet: &'static str, - variant: &'static str, - predicate: impl Fn(&E) -> bool, - n_events: usize, -) -> Result, E>, subxt::Error> +async fn wait_for_para_event( + event_stack: ParaEvents, + _error_stack: ParaErrors, + extrinsic_hash: HashOf, + expected_results: usize, +) -> Result, E, V>, subxt::Error> where C: subxt::Config + Clone + std::fmt::Debug, - E: subxt::events::StaticEvent + std::fmt::Debug, + E: scale_decode::DecodeAsType + std::fmt::Display, + V: subxt::events::StaticEvent, { - let mut result = SubmissionResult::, E>::new(); + let mut catched_events = Vec::, E, V>>::new(); loop { // Check for new events from a finalised block. - let mut events = events.write().await; - if let Some(entry) = events - .iter() - .find(|&e| e.2.pallet_name() == pallet && e.2.variant_name() == variant) - { - let event_variant = entry - .2 - .as_event::() - .map_err(|e| subxt::Error::Other(format!("{entry:?}: {e:?}")))? - .ok_or(subxt::Error::Other(format!( - "{entry:?}: inner Option::None" - )))?; - if !predicate(&event_variant) { - continue; - } - let entry = entry.clone(); - events.retain(|e| e.0 > entry.0); - tracing::trace!( - "Found related event {}::{} on block {}", - pallet, - variant, - entry.0 - ); - result.hash.push(entry.1); - result.event.push(event_variant); - if result.len() == n_events { - return Ok(result); + let mut events_lock = event_stack.write().await; + while let Some((hash, ex_events)) = events_lock.pop() { + if ex_events.extrinsic_hash() == extrinsic_hash { + // Currently, it is assumed only one event to be contained, because only one event + // will be emitted in case of a successful extrinsoc. + if let Some(entry) = ex_events.iter().find(|_| true) { + let entry = entry?; + let event = entry + .as_root_event::() + .map_err(|e| subxt::Error::Other(format!("{entry:?}: {e:?}")))?; + let variant = entry + .as_event::() + .map_err(|e| subxt::Error::Other(format!("{entry:?}: {e:?}")))? + .ok_or(subxt::Error::Other(format!( + "{entry:?}: inner option error" + )))?; + tracing::trace!( + "Found related event to extrinsic with hash {:?}", + extrinsic_hash + ); + catched_events.push(ExtrinsicEvent::, E, V> { + hash, + event, + variant, + }); + if catched_events.len() == expected_results { + return Ok(Ok(catched_events)); + } + } } } - drop(events); + drop(events_lock); // Check for new collected custom errors (extrinsic errors). + // Check if one error is sufficient for compound exeuctions (i.e. multiple sectors). // TODO(@neutrinoks,25.10.24): Implement error filtering and test it. // let mut errors = errors.write().await; // while let Some(error) = errors.pop() { @@ -255,7 +234,7 @@ where let mut blocks_sub = api.blocks().subscribe_finalized().await?; loop { - let block = select! { + let block = tokio::select! { _ = token.cancelled() => { break } @@ -263,23 +242,21 @@ where if let Some(block) = block { block? } else { - continue + return Err(subxt::Error::Block(BlockError::NotFound("blocks_sub::next() returned None".to_string()))) } } }; - let hash = block.hash(); + let block_hash = block.hash(); - for event in block.events().await?.iter() { - match event { - Ok(event) => { - events - .write() - .await - .push((block.number().into(), hash, event.clone())); + for extrinsic in block.extrinsics().await?.iter() { + match extrinsic { + Ok(extrinsic) => { + let ex_events = extrinsic.events().await?; + events.write().await.push((block_hash, ex_events)); } Err(error) => { - if let SxtCoreError::ExtrinsicParams(ExtrinsicParamsError::Custom( + if let subxt::Error::ExtrinsicParams(ExtrinsicParamsError::Custom( boxed_custom_err, )) = error { @@ -293,92 +270,3 @@ where tracing::trace!("stopped event-listener"); Ok(()) } - -// TODO(@neutrinoks,25.10.24): Check whether we can search for that event/extrinsic by comparing -// the hash or bytes instead, to avoid all the following definitions. -/// There is `subxt::events::StaticEvent` which provides the pallet's name and its event's name. On -/// top we need individualized filter methods to check whether the event in focus is exactly ours -/// (consider a situation with multiple events of same type but from different users). This trait -/// extends the existing `StaticEvent` by that filter function which can be specified here -/// individually to adapt to polka-storage needs. -pub trait EventFilterType { - fn pallet_name(&self) -> &str { - ::PALLET - } - - fn event_name(&self) -> &str { - ::EVENT - } - - fn filter_fn(&self, acc: AccountId32) -> impl Fn(&Event) -> bool; -} - -/// A default implementation of `EventFilterType` that implements every `Event` variant in -/// polka-storage. -pub struct EventFilterProvider; - -impl EventFilterType for EventFilterProvider { - fn filter_fn(&self, acc: AccountId32) -> impl Fn(&MarketEvents::BalanceAdded) -> bool { - move |e: &MarketEvents::BalanceAdded| e.who == acc - } -} - -impl EventFilterType for EventFilterProvider { - fn filter_fn(&self, acc: AccountId32) -> impl Fn(&MarketEvents::BalanceWithdrawn) -> bool { - move |e: &MarketEvents::BalanceWithdrawn| e.who == acc - } -} - -impl EventFilterType for EventFilterProvider { - fn filter_fn(&self, _: AccountId32) -> impl Fn(&MarketEvents::DealsSettled) -> bool { - move |_: &MarketEvents::DealsSettled| true - } -} - -impl EventFilterType for EventFilterProvider { - fn filter_fn(&self, _: AccountId32) -> impl Fn(&MarketEvents::DealPublished) -> bool { - move |_: &MarketEvents::DealPublished| true - } -} - -impl EventFilterType for EventFilterProvider { - fn filter_fn(&self, acc: AccountId32) -> impl Fn(&SpEvents::StorageProviderRegistered) -> bool { - move |e: &SpEvents::StorageProviderRegistered| e.owner == acc - } -} - -impl EventFilterType for EventFilterProvider { - fn filter_fn(&self, acc: AccountId32) -> impl Fn(&SpEvents::SectorsPreCommitted) -> bool { - move |e: &SpEvents::SectorsPreCommitted| e.owner == acc - } -} - -impl EventFilterType for EventFilterProvider { - fn filter_fn(&self, acc: AccountId32) -> impl Fn(&SpEvents::SectorsProven) -> bool { - move |e: &SpEvents::SectorsProven| e.owner == acc - } -} - -impl EventFilterType for EventFilterProvider { - fn filter_fn(&self, acc: AccountId32) -> impl Fn(&SpEvents::ValidPoStSubmitted) -> bool { - move |e: &SpEvents::ValidPoStSubmitted| e.owner == acc - } -} - -impl EventFilterType for EventFilterProvider { - fn filter_fn(&self, acc: AccountId32) -> impl Fn(&SpEvents::FaultsDeclared) -> bool { - move |e: &SpEvents::FaultsDeclared| e.owner == acc - } -} - -impl EventFilterType for EventFilterProvider { - fn filter_fn(&self, acc: AccountId32) -> impl Fn(&SpEvents::FaultsRecovered) -> bool { - move |e: &SpEvents::FaultsRecovered| e.owner == acc - } -} - -impl EventFilterType for EventFilterProvider { - fn filter_fn(&self, acc: AccountId32) -> impl Fn(&SpEvents::SectorsTerminated) -> bool { - move |e: &SpEvents::SectorsTerminated| e.owner == acc - } -}