diff --git a/cli/artifacts/metadata.scale b/cli/artifacts/metadata.scale index 04e06dc0..678385ac 100644 Binary files a/cli/artifacts/metadata.scale and b/cli/artifacts/metadata.scale differ diff --git a/cli/polka-storage-provider/server/src/rpc.rs b/cli/polka-storage-provider/server/src/rpc.rs index 0089a46b..f1eddda3 100644 --- a/cli/polka-storage-provider/server/src/rpc.rs +++ b/cli/polka-storage-provider/server/src/rpc.rs @@ -4,6 +4,7 @@ use jsonrpsee::server::Server; use polka_storage_provider_common::rpc::{RpcError, ServerInfo, StorageProviderRpcServer}; use primitives_commitment::commd::compute_unsealed_sector_commitment; use storagext::{ + runtime::{market::events as MarketEvents, ResultType}, types::market::{ClientDealProposal as SxtClientDealProposal, DealProposal as SxtDealProposal}, MarketClientExt, }; @@ -110,19 +111,12 @@ impl StorageProviderRpcServer for RpcServerState { // it just requires some API design let result = self .xt_client - .publish_signed_storage_deals(&self.xt_keypair, vec![deal]) - .await?; - - let published_deals = result - .events - .find::() - .collect::, _>>() - .map_err(|err| RpcError::internal_error(err, None))?; + .publish_signed_storage_deals(&self.xt_keypair, vec![deal], ResultType::Finalisation) + .await? + .expect("requested to return submission-result"); // We currently just support a single deal and if there's no published deals, // an error MUST've happened - debug_assert_eq!(published_deals.len(), 1); - let unsealed_dir = self.unsealed_piece_storage_dir.clone(); let sector_size = self.server_info.seal_proof.sector_size(); @@ -151,7 +145,7 @@ impl StorageProviderRpcServer for RpcServerState { tracing::info!("{:?}", comm_d); }); - Ok(published_deals[0].deal_id) + Ok(result.event.deal_id) } } diff --git a/cli/polka-storage/storagext-cli/src/cmd/market.rs b/cli/polka-storage/storagext-cli/src/cmd/market.rs index d5e74ede..04f3b56b 100644 --- a/cli/polka-storage/storagext-cli/src/cmd/market.rs +++ b/cli/polka-storage/storagext-cli/src/cmd/market.rs @@ -6,7 +6,7 @@ use primitives_proofs::DealId; use storagext::{ deser::DeserializablePath, multipair::{DebugPair, MultiPairSigner}, - runtime::SubmissionResult, + runtime::{market::events as MarketEvents, HashOfPsc, ResultType, SubmissionResult}, types::market::DealProposal as SxtDealProposal, MarketClientExt, PolkaStorageConfig, }; @@ -15,6 +15,7 @@ use subxt::ext::sp_core::{ }; use url::Url; +use super::display_submission_result; use crate::{missing_keypair_error, operation_takes_a_while, OutputFormat}; #[derive(Debug, Subcommand)] @@ -81,6 +82,7 @@ impl MarketCommand { account_keypair: Option, n_retries: u32, retry_interval: Duration, + result_type: ResultType, output_format: OutputFormat, ) -> Result<(), anyhow::Error> { let client = storagext::Client::new(node_rpc, n_retries, retry_interval).await?; @@ -110,7 +112,7 @@ impl MarketCommand { return Err(missing_keypair_error::().into()); }; else_ - .with_keypair(client, account_keypair, output_format) + .with_keypair(client, account_keypair, result_type, output_format) .await?; } }; @@ -122,6 +124,7 @@ impl MarketCommand { self, client: Client, account_keypair: MultiPairSigner, + result_type: ResultType, output_format: OutputFormat, ) -> Result<(), anyhow::Error> where @@ -129,19 +132,26 @@ impl MarketCommand { { operation_takes_a_while(); - let submission_result = match self { + match self { MarketCommand::AddBalance { amount } => { - Self::add_balance(client, account_keypair, amount).await? + let opt_result = + Self::add_balance(client, account_keypair, amount, result_type).await?; + display_submission_result::<_>(opt_result, output_format)?; } MarketCommand::SettleDealPayments { deal_ids } => { if deal_ids.is_empty() { bail!("No deals provided to settle"); } - Self::settle_deal_payments(client, account_keypair, deal_ids).await? + let opt_result = + Self::settle_deal_payments(client, account_keypair, deal_ids, result_type) + .await?; + display_submission_result::<_>(opt_result, output_format)?; } MarketCommand::WithdrawBalance { amount } => { - Self::withdraw_balance(client, account_keypair, amount).await? + let opt_result = + Self::withdraw_balance(client, account_keypair, amount, result_type).await?; + display_submission_result::<_>(opt_result, output_format)?; } MarketCommand::PublishStorageDeals { deals, @@ -156,34 +166,19 @@ impl MarketCommand { client_ed25519_key.map(DebugPair::into_inner) ) .expect("client is required to submit at least one key, this should've been handled by clap's ArgGroup"); - Self::publish_storage_deals(client, account_keypair, client_keypair, deals).await? + let opt_result = Self::publish_storage_deals( + client, + account_keypair, + client_keypair, + deals, + result_type, + ) + .await?; + display_submission_result::<_>(opt_result, output_format)?; } _unsigned => unreachable!("unsigned commands should have been previously handled"), }; - let hash = submission_result.hash; - // This monstrosity first converts incoming events into a "generic" (subxt generated) event, - // and then we extract only the Market events. We could probably extract this into a proper - // iterator but the effort to improvement ratio seems low (for 2 pallets at least). - let submission_results = submission_result - .events - .iter() - .flat_map(|event| { - event.map(|details| details.as_root_event::()) - }) - .filter_map(|event| match event { - Ok(storagext::runtime::Event::Market(e)) => Some(Ok(e)), - Err(err) => Some(Err(err)), - _ => None, - }); - for event in submission_results { - let event = event?; - let output = output_format.format(&event)?; - match output_format { - OutputFormat::Plain => println!("[{}] {}", hash, output), - OutputFormat::Json => println!("{}", output), - } - } Ok(()) } @@ -191,16 +186,21 @@ impl MarketCommand { client: Client, account_keypair: MultiPairSigner, amount: u128, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result>, subxt::Error> where Client: MarketClientExt, { - let submission_result = client.add_balance(&account_keypair, amount).await?; - tracing::debug!( - "[{}] Successfully added {} to Market Balance", - submission_result.hash, - amount - ); + let submission_result = client + .add_balance(&account_keypair, amount, result_type) + .await?; + if submission_result.is_some() { + tracing::debug!( + "[{}] Successfully added {} to Market Balance", + submission_result.as_ref().expect("never none").hash, + amount + ); + } Ok(submission_result) } @@ -210,7 +210,8 @@ impl MarketCommand { account_keypair: MultiPairSigner, client_keypair: MultiPairSigner, deals: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result>, subxt::Error> where Client: MarketClientExt, { @@ -219,12 +220,15 @@ impl MarketCommand { &account_keypair, &client_keypair, deals.into_iter().map(Into::into).collect(), + result_type, ) .await?; - tracing::debug!( - "[{}] Successfully published storage deals", - submission_result.hash - ); + if submission_result.is_some() { + tracing::debug!( + "[{}] Successfully published storage deals", + submission_result.as_ref().expect("never none").hash, + ); + } Ok(submission_result) } @@ -233,17 +237,20 @@ impl MarketCommand { client: Client, account_keypair: MultiPairSigner, deal_ids: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result>, subxt::Error> where Client: MarketClientExt, { let submission_result = client - .settle_deal_payments(&account_keypair, deal_ids) + .settle_deal_payments(&account_keypair, deal_ids, result_type) .await?; - tracing::debug!( - "[{}] Successfully settled deal payments", - submission_result.hash - ); + if submission_result.is_some() { + tracing::debug!( + "[{}] Successfully settled deal payments", + submission_result.as_ref().expect("never none").hash + ); + } Ok(submission_result) } @@ -252,16 +259,21 @@ impl MarketCommand { client: Client, account_keypair: MultiPairSigner, amount: u128, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result>, subxt::Error> where Client: MarketClientExt, { - let submission_result = client.withdraw_balance(&account_keypair, amount).await?; - tracing::debug!( - "[{}] Successfully withdrew {} from Market Balance", - submission_result.hash, - amount - ); + let submission_result = client + .withdraw_balance(&account_keypair, amount, result_type) + .await?; + if submission_result.is_some() { + tracing::debug!( + "[{}] Successfully withdrew {} from Market Balance", + submission_result.as_ref().expect("never none").hash, + amount + ); + } Ok(submission_result) } diff --git a/cli/polka-storage/storagext-cli/src/cmd/mod.rs b/cli/polka-storage/storagext-cli/src/cmd/mod.rs index c0c627bf..0856e4c0 100644 --- a/cli/polka-storage/storagext-cli/src/cmd/mod.rs +++ b/cli/polka-storage/storagext-cli/src/cmd/mod.rs @@ -1,3 +1,33 @@ pub mod market; pub mod storage_provider; pub mod system; + +use storagext::runtime::{HashOfPsc, SubmissionResult}; + +use crate::OutputFormat; + +pub(crate) fn display_submission_result( + opt_result: Option>, + _output_format: OutputFormat, +) -> Result<(), anyhow::Error> +where + Event: subxt::events::StaticEvent, +{ + if let Some(result) = opt_result { + // TODO(@neutrinoks,24.10.24): Check if we can return as root event instead to enable this + // display possibility again. + // let output = output_format.format(&result.event)?; + // match output_format { + // OutputFormat::Plain => println!("[{}] {}", result.hash, output), + // OutputFormat::Json => println!("{}", output), + // } + println!( + "[{}] {}::{}", + result.hash, + ::PALLET, + ::EVENT + ); + } + + Ok(()) +} diff --git a/cli/polka-storage/storagext-cli/src/cmd/storage_provider.rs b/cli/polka-storage/storagext-cli/src/cmd/storage_provider.rs index 2d2bfbe8..cf19609d 100644 --- a/cli/polka-storage/storagext-cli/src/cmd/storage_provider.rs +++ b/cli/polka-storage/storagext-cli/src/cmd/storage_provider.rs @@ -7,7 +7,7 @@ use storagext::{ multipair::MultiPairSigner, runtime::{ runtime_types::pallet_storage_provider::sector::ProveCommitSector as RuntimeProveCommitSector, - SubmissionResult, + storage_provider::events as SpEvents, HashOfPsc, ResultType, SubmissionResult, }, types::storage_provider::{ FaultDeclaration as SxtFaultDeclaration, ProveCommitSector as SxtProveCommitSector, @@ -16,10 +16,11 @@ use storagext::{ SubmitWindowedPoStParams as SxtSubmitWindowedPoStParams, TerminationDeclaration as SxtTerminationDeclaration, }, - PolkaStorageConfig, StorageProviderClientExt, + StorageProviderClientExt, }; use url::Url; +use super::display_submission_result; use crate::{missing_keypair_error, operation_takes_a_while, OutputFormat}; fn parse_post_proof(src: &str) -> Result { @@ -102,6 +103,7 @@ impl StorageProviderCommand { account_keypair: Option, n_retries: u32, retry_interval: Duration, + result_type: ResultType, output_format: OutputFormat, ) -> Result<(), anyhow::Error> { let client = storagext::Client::new(node_rpc, n_retries, retry_interval).await?; @@ -130,7 +132,7 @@ impl StorageProviderCommand { return Err(missing_keypair_error::().into()); }; else_ - .with_keypair(client, account_keypair, output_format) + .with_keypair(client, account_keypair, result_type, output_format) .await?; } }; @@ -142,6 +144,7 @@ impl StorageProviderCommand { self, client: Client, account_keypair: MultiPairSigner, + result_type: ResultType, output_format: OutputFormat, ) -> Result<(), anyhow::Error> where @@ -149,57 +152,65 @@ impl StorageProviderCommand { { operation_takes_a_while(); - let submission_result = match self { + match self { StorageProviderCommand::RegisterStorageProvider { peer_id, post_proof, } => { - Self::register_storage_provider(client, account_keypair, peer_id, post_proof) - .await? + let opt_result = Self::register_storage_provider( + client, + account_keypair, + peer_id, + post_proof, + result_type, + ) + .await?; + display_submission_result::<_>(opt_result, output_format)?; } StorageProviderCommand::PreCommit { pre_commit_sectors } => { - Self::pre_commit(client, account_keypair, pre_commit_sectors).await? + let opt_result = + Self::pre_commit(client, account_keypair, pre_commit_sectors, result_type) + .await?; + display_submission_result::<_>(opt_result, output_format)?; } StorageProviderCommand::ProveCommit { prove_commit_sectors, - } => Self::prove_commit(client, account_keypair, prove_commit_sectors).await?, + } => { + let opt_result = + Self::prove_commit(client, account_keypair, prove_commit_sectors, result_type) + .await?; + display_submission_result::<_>(opt_result, output_format)?; + } StorageProviderCommand::SubmitWindowedProofOfSpaceTime { windowed_post } => { - Self::submit_windowed_post(client, account_keypair, windowed_post).await? + let opt_result = + Self::submit_windowed_post(client, account_keypair, windowed_post, result_type) + .await?; + display_submission_result::<_>(opt_result, output_format)?; } StorageProviderCommand::DeclareFaults { faults } => { - Self::declare_faults(client, account_keypair, faults).await? + let opt_result = + Self::declare_faults(client, account_keypair, faults, result_type).await?; + display_submission_result::<_>(opt_result, output_format)?; } StorageProviderCommand::DeclareFaultsRecovered { recoveries } => { - Self::declare_faults_recovered(client, account_keypair, recoveries).await? + let opt_result = Self::declare_faults_recovered( + client, + account_keypair, + recoveries, + result_type, + ) + .await?; + display_submission_result::<_>(opt_result, output_format)?; } StorageProviderCommand::TerminateSectors { terminations } => { - Self::terminate_sectors(client, account_keypair, terminations).await? + let opt_result = + Self::terminate_sectors(client, account_keypair, terminations, result_type) + .await?; + display_submission_result::<_>(opt_result, output_format)?; } _unsigned => unreachable!("unsigned commands should have been previously handled"), - }; - - // This monstrosity first converts incoming events into a "generic" (subxt generated) event, - // and then we extract only the Market events. We could probably extract this into a proper - // iterator but the effort to improvement ratio seems low (for 2 pallets at least). - let submission_results = submission_result - .events - .iter() - .flat_map(|event| { - event.map(|details| details.as_root_event::()) - }) - .filter_map(|event| match event { - Ok(storagext::runtime::Event::StorageProvider(e)) => Some(Ok(e)), - Err(err) => Some(Err(err)), - _ => None, - }); - for event in submission_results { - let event = event?; - let output = output_format.format(&event)?; - match output_format { - OutputFormat::Plain => println!("[{}] {}", submission_result.hash, output), - OutputFormat::Json => println!("{}", output), - } } + Ok(()) } @@ -208,19 +219,25 @@ impl StorageProviderCommand { account_keypair: MultiPairSigner, peer_id: String, post_proof: RegisteredPoStProof, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result< + Option>, + subxt::Error, + > where Client: StorageProviderClientExt, { let submission_result = client - .register_storage_provider(&account_keypair, peer_id.clone(), post_proof) + .register_storage_provider(&account_keypair, peer_id.clone(), post_proof, result_type) .await?; - tracing::debug!( - "[{}] Successfully registered {}, seal: {:?} in Storage Provider Pallet", - submission_result.hash, - peer_id, - post_proof - ); + if submission_result.is_some() { + tracing::debug!( + "[{}] Successfully registered {}, seal: {:?} in Storage Provider Pallet", + submission_result.as_ref().expect("never none").hash, + peer_id, + post_proof + ); + } Ok(submission_result) } @@ -229,7 +246,8 @@ impl StorageProviderCommand { client: Client, account_keypair: MultiPairSigner, pre_commit_sectors: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result>, subxt::Error> where Client: StorageProviderClientExt, { @@ -240,13 +258,15 @@ impl StorageProviderCommand { .unzip(); let submission_result = client - .pre_commit_sectors(&account_keypair, pre_commit_sectors) + .pre_commit_sectors(&account_keypair, pre_commit_sectors, result_type) .await?; - tracing::debug!( - "[{}] Successfully pre-commited sectors {:?}.", - submission_result.hash, - sector_numbers - ); + if submission_result.is_some() { + tracing::debug!( + "[{}] Successfully pre-commited sectors {:?}.", + submission_result.as_ref().expect("never none").hash, + sector_numbers + ); + } Ok(submission_result) } @@ -255,7 +275,8 @@ impl StorageProviderCommand { client: Client, account_keypair: MultiPairSigner, prove_commit_sectors: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result>, subxt::Error> where Client: StorageProviderClientExt, { @@ -270,13 +291,15 @@ impl StorageProviderCommand { }) .unzip(); let submission_result = client - .prove_commit_sectors(&account_keypair, prove_commit_sectors) + .prove_commit_sectors(&account_keypair, prove_commit_sectors, result_type) .await?; - tracing::debug!( - "[{}] Successfully proven sector {:?}.", - submission_result.hash, - sector_numbers - ); + if submission_result.is_some() { + tracing::debug!( + "[{}] Successfully proven sector {:?}.", + submission_result.as_ref().expect("never none").hash, + sector_numbers + ); + } Ok(submission_result) } @@ -285,14 +308,20 @@ impl StorageProviderCommand { client: Client, account_keypair: MultiPairSigner, windowed_post: SxtSubmitWindowedPoStParams, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result>, subxt::Error> where Client: StorageProviderClientExt, { let submission_result = client - .submit_windowed_post(&account_keypair, windowed_post.into()) + .submit_windowed_post(&account_keypair, windowed_post.into(), result_type) .await?; - tracing::debug!("[{}] Successfully submitted proof.", submission_result.hash); + if submission_result.is_some() { + tracing::debug!( + "[{}] Successfully submitted proof.", + submission_result.as_ref().expect("never none").hash + ); + } Ok(submission_result) } @@ -301,12 +330,20 @@ impl StorageProviderCommand { client: Client, account_keypair: MultiPairSigner, faults: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result>, subxt::Error> where Client: StorageProviderClientExt, { - let submission_result = client.declare_faults(&account_keypair, faults).await?; - tracing::debug!("[{}] Successfully declared faults.", submission_result.hash); + let submission_result = client + .declare_faults(&account_keypair, faults, result_type) + .await?; + if submission_result.is_some() { + tracing::debug!( + "[{}] Successfully declared faults.", + submission_result.as_ref().expect("never none").hash + ); + } Ok(submission_result) } @@ -315,14 +352,20 @@ impl StorageProviderCommand { client: Client, account_keypair: MultiPairSigner, recoveries: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result>, subxt::Error> where Client: StorageProviderClientExt, { let submission_result = client - .declare_faults_recovered(&account_keypair, recoveries) + .declare_faults_recovered(&account_keypair, recoveries, result_type) .await?; - tracing::debug!("[{}] Successfully declared faults.", submission_result.hash); + if submission_result.is_some() { + tracing::debug!( + "[{}] Successfully declared faults.", + submission_result.as_ref().expect("never none").hash + ); + } Ok(submission_result) } @@ -331,17 +374,20 @@ impl StorageProviderCommand { client: Client, account_keypair: MultiPairSigner, terminations: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result>, subxt::Error> where Client: StorageProviderClientExt, { let submission_result = client - .terminate_sectors(&account_keypair, terminations) + .terminate_sectors(&account_keypair, terminations, result_type) .await?; - tracing::debug!( - "[{}] Successfully terminated sectors.", - submission_result.hash - ); + if submission_result.is_some() { + tracing::debug!( + "[{}] Successfully terminated sectors.", + submission_result.as_ref().expect("never none").hash, + ); + } Ok(submission_result) } diff --git a/cli/polka-storage/storagext-cli/src/main.rs b/cli/polka-storage/storagext-cli/src/main.rs index b512d357..47c3d704 100644 --- a/cli/polka-storage/storagext-cli/src/main.rs +++ b/cli/polka-storage/storagext-cli/src/main.rs @@ -6,7 +6,10 @@ use std::{fmt::Debug, time::Duration}; use clap::{ArgGroup, Parser, Subcommand}; use cmd::{market::MarketCommand, storage_provider::StorageProviderCommand, system::SystemCommand}; -use storagext::multipair::{DebugPair, MultiPairSigner}; +use storagext::{ + multipair::{DebugPair, MultiPairSigner}, + runtime::ResultType, +}; use subxt::ext::sp_core::{ ecdsa::Pair as ECDSAPair, ed25519::Pair as Ed25519Pair, sr25519::Pair as Sr25519Pair, }; @@ -77,6 +80,10 @@ struct Cli { #[arg(long, env, default_value = DEFAULT_RETRY_INTERVAL_MS, value_parser = parse_ms)] pub retry_interval: Duration, + /// The expected return type, i.e. none, hash, events, etc.. + #[arg(long, env, default_value_t = ResultType::Finalisation, value_parser = ResultType::value_parser)] + pub result_type: ResultType, + /// Output format. #[arg(long, env, value_parser = OutputFormat::value_parser, default_value_t = OutputFormat::Plain)] pub format: OutputFormat, @@ -101,6 +108,7 @@ impl SubCommand { account_keypair: Option, n_retries: u32, retry_interval: Duration, + result_type: ResultType, output_format: OutputFormat, ) -> Result<(), anyhow::Error> { match self { @@ -110,6 +118,7 @@ impl SubCommand { account_keypair, n_retries, retry_interval, + result_type, output_format, ) .await?; @@ -120,6 +129,7 @@ impl SubCommand { account_keypair, n_retries, retry_interval, + result_type, output_format, ) .await?; @@ -179,6 +189,7 @@ async fn main() -> Result<(), anyhow::Error> { multi_pair_signer, cli_arguments.n_retries, cli_arguments.retry_interval, + cli_arguments.result_type, cli_arguments.format, ) .await?; @@ -242,10 +253,10 @@ impl OutputFormat { pub fn format(&self, value: &T) -> Result where - T: std::fmt::Display + serde::Serialize, + T: std::fmt::Debug + serde::Serialize, { match self { - OutputFormat::Plain => Ok(value.to_string()), + OutputFormat::Plain => Ok(format!("{value:?}")), OutputFormat::Json => serde_json::to_string(value), } } diff --git a/cli/polka-storage/storagext/src/clients/market.rs b/cli/polka-storage/storagext/src/clients/market.rs index ee7b9e6e..c2b47eda 100644 --- a/cli/polka-storage/storagext/src/clients/market.rs +++ b/cli/polka-storage/storagext/src/clients/market.rs @@ -6,7 +6,8 @@ use subxt::{ext::sp_core::crypto::Ss58Codec, utils::Static}; use crate::{ runtime::{ self, - client::SubmissionResult, + client::{HashOfPsc, ResultType, SubmissionResult}, + market::events as MarketEvents, runtime_types::pallet_market::pallet::{ BalanceEntry, ClientDealProposal as RuntimeClientDealProposal, }, @@ -34,7 +35,13 @@ pub trait MarketClientExt { &self, account_keypair: &Keypair, amount: Currency, - ) -> impl Future, subxt::Error>> + result_type: ResultType, + ) -> impl Future< + Output = Result< + Option>, + subxt::Error, + >, + > where Keypair: subxt::tx::Signer; @@ -43,7 +50,13 @@ pub trait MarketClientExt { &self, account_keypair: &Keypair, amount: Currency, - ) -> impl Future, subxt::Error>> + result_type: ResultType, + ) -> impl Future< + Output = Result< + Option>, + subxt::Error, + >, + > where Keypair: subxt::tx::Signer; @@ -54,7 +67,13 @@ pub trait MarketClientExt { &self, account_keypair: &Keypair, deal_ids: Vec, - ) -> impl Future, subxt::Error>> + result_type: ResultType, + ) -> impl Future< + Output = Result< + Option>, + subxt::Error, + >, + > where Keypair: subxt::tx::Signer; @@ -66,7 +85,13 @@ pub trait MarketClientExt { account_keypair: &Keypair, client_keypair: &ClientKeypair, deals: Vec, - ) -> impl Future, subxt::Error>> + result_type: ResultType, + ) -> impl Future< + Output = Result< + Option>, + subxt::Error, + >, + > where Keypair: subxt::tx::Signer, ClientKeypair: subxt::tx::Signer; @@ -78,7 +103,13 @@ pub trait MarketClientExt { &self, account_keypair: &Keypair, deals: Vec, - ) -> impl Future, subxt::Error>> + result_type: ResultType, + ) -> impl Future< + Output = Result< + Option>, + subxt::Error, + >, + > where Keypair: subxt::tx::Signer; @@ -102,12 +133,14 @@ impl MarketClientExt for crate::runtime::client::Client { &self, account_keypair: &Keypair, amount: Currency, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result>, subxt::Error> where Keypair: subxt::tx::Signer, { let payload = runtime::tx().market().withdraw_balance(amount); - self.traced_submission(&payload, account_keypair).await + self.traced_submission(&payload, account_keypair, result_type) + .await } #[tracing::instrument( @@ -122,12 +155,14 @@ impl MarketClientExt for crate::runtime::client::Client { &self, account_keypair: &Keypair, amount: Currency, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result>, subxt::Error> where Keypair: subxt::tx::Signer, { let payload = runtime::tx().market().add_balance(amount); - self.traced_submission(&payload, account_keypair).await + self.traced_submission(&payload, account_keypair, result_type) + .await } #[tracing::instrument( @@ -142,7 +177,8 @@ impl MarketClientExt for crate::runtime::client::Client { &self, account_keypair: &Keypair, mut deal_ids: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result>, subxt::Error> where Keypair: subxt::tx::Signer, { @@ -160,7 +196,8 @@ impl MarketClientExt for crate::runtime::client::Client { .market() .settle_deal_payments(bounded_unbounded_deal_ids); - self.traced_submission(&payload, account_keypair).await + self.traced_submission(&payload, account_keypair, result_type) + .await } #[tracing::instrument( @@ -175,7 +212,8 @@ impl MarketClientExt for crate::runtime::client::Client { account_keypair: &Keypair, client_keypair: &ClientKeypair, mut deals: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result>, subxt::Error> where Keypair: subxt::tx::Signer, ClientKeypair: subxt::tx::Signer, @@ -202,7 +240,8 @@ impl MarketClientExt for crate::runtime::client::Client { .market() .publish_storage_deals(bounded_unbounded_deals); - self.traced_submission(&payload, account_keypair).await + self.traced_submission(&payload, account_keypair, result_type) + .await } #[tracing::instrument( @@ -216,7 +255,8 @@ impl MarketClientExt for crate::runtime::client::Client { &self, account_keypair: &Keypair, mut deals: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result>, subxt::Error> where Keypair: subxt::tx::Signer, { @@ -240,7 +280,8 @@ impl MarketClientExt for crate::runtime::client::Client { .market() .publish_storage_deals(bounded_unbounded_deals); - self.traced_submission(&payload, account_keypair).await + self.traced_submission(&payload, account_keypair, result_type) + .await } #[tracing::instrument( diff --git a/cli/polka-storage/storagext/src/clients/storage_provider.rs b/cli/polka-storage/storagext/src/clients/storage_provider.rs index 26ee45c5..d491392c 100644 --- a/cli/polka-storage/storagext/src/clients/storage_provider.rs +++ b/cli/polka-storage/storagext/src/clients/storage_provider.rs @@ -11,25 +11,32 @@ use crate::{ runtime::{ self, bounded_vec::IntoBoundedByteVec, - client::SubmissionResult, + client::{HashOfPsc, ResultType, SubmissionResult}, runtime_types::pallet_storage_provider::{ proofs::SubmitWindowedPoStParams, sector::ProveCommitSector, storage_provider::StorageProviderState, }, - storage_provider::calls::types::register_storage_provider::PeerId, + storage_provider::{calls::types::register_storage_provider::PeerId, events as SpEvents}, }, types::storage_provider::{ FaultDeclaration, RecoveryDeclaration, SectorPreCommitInfo, TerminationDeclaration, }, BlockNumber, Currency, PolkaStorageConfig, }; + pub trait StorageProviderClientExt { fn register_storage_provider( &self, account_keypair: &Keypair, peer_id: String, post_proof: RegisteredPoStProof, - ) -> impl Future, subxt::Error>> + result_type: ResultType, + ) -> impl Future< + Output = Result< + Option>, + subxt::Error, + >, + > where Keypair: subxt::tx::Signer; @@ -37,7 +44,13 @@ pub trait StorageProviderClientExt { &self, account_keypair: &Keypair, sectors: Vec, - ) -> impl Future, subxt::Error>> + result_type: ResultType, + ) -> impl Future< + Output = Result< + Option>, + subxt::Error, + >, + > where Keypair: subxt::tx::Signer; @@ -45,7 +58,10 @@ pub trait StorageProviderClientExt { &self, account_keypair: &Keypair, sectors: Vec, - ) -> impl Future, subxt::Error>> + result_type: ResultType, + ) -> impl Future< + Output = Result>, subxt::Error>, + > where Keypair: subxt::tx::Signer; @@ -53,7 +69,13 @@ pub trait StorageProviderClientExt { &self, account_keypair: &Keypair, windowed_post: SubmitWindowedPoStParams, - ) -> impl Future, subxt::Error>> + result_type: ResultType, + ) -> impl Future< + Output = Result< + Option>, + subxt::Error, + >, + > where Keypair: subxt::tx::Signer; @@ -61,7 +83,13 @@ pub trait StorageProviderClientExt { &self, account_keypair: &Keypair, faults: Vec, - ) -> impl Future, subxt::Error>> + result_type: ResultType, + ) -> impl Future< + Output = Result< + Option>, + subxt::Error, + >, + > where Keypair: subxt::tx::Signer; @@ -69,7 +97,13 @@ pub trait StorageProviderClientExt { &self, account_keypair: &Keypair, recoveries: Vec, - ) -> impl Future, subxt::Error>> + result_type: ResultType, + ) -> impl Future< + Output = Result< + Option>, + subxt::Error, + >, + > where Keypair: subxt::tx::Signer; @@ -77,7 +111,13 @@ pub trait StorageProviderClientExt { &self, account_keypair: &Keypair, terminations: Vec, - ) -> impl Future, subxt::Error>> + result_type: ResultType, + ) -> impl Future< + Output = Result< + Option>, + subxt::Error, + >, + > where Keypair: subxt::tx::Signer; @@ -106,7 +146,11 @@ impl StorageProviderClientExt for crate::runtime::client::Client { account_keypair: &Keypair, peer_id: String, post_proof: RegisteredPoStProof, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result< + Option>, + subxt::Error, + > where Keypair: subxt::tx::Signer, { @@ -114,7 +158,8 @@ impl StorageProviderClientExt for crate::runtime::client::Client { .storage_provider() .register_storage_provider(peer_id.into_bounded_byte_vec(), post_proof); - self.traced_submission(&payload, account_keypair).await + self.traced_submission(&payload, account_keypair, result_type) + .await } #[tracing::instrument( @@ -128,14 +173,16 @@ impl StorageProviderClientExt for crate::runtime::client::Client { &self, account_keypair: &Keypair, sectors: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result>, subxt::Error> where Keypair: subxt::tx::Signer, { let sectors = BoundedVec(sectors.into_iter().map(Into::into).collect()); let payload = runtime::tx().storage_provider().pre_commit_sectors(sectors); - self.traced_submission(&payload, account_keypair).await + self.traced_submission(&payload, account_keypair, result_type) + .await } #[tracing::instrument( @@ -149,7 +196,8 @@ impl StorageProviderClientExt for crate::runtime::client::Client { &self, account_keypair: &Keypair, sectors: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result>, subxt::Error> where Keypair: subxt::tx::Signer, { @@ -158,7 +206,8 @@ impl StorageProviderClientExt for crate::runtime::client::Client { .storage_provider() .prove_commit_sectors(sectors); - self.traced_submission(&payload, account_keypair).await + self.traced_submission(&payload, account_keypair, result_type) + .await } #[tracing::instrument( @@ -172,7 +221,8 @@ impl StorageProviderClientExt for crate::runtime::client::Client { &self, account_keypair: &Keypair, windowed_post: SubmitWindowedPoStParams, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result>, subxt::Error> where Keypair: subxt::tx::Signer, { @@ -180,7 +230,8 @@ impl StorageProviderClientExt for crate::runtime::client::Client { .storage_provider() .submit_windowed_post(windowed_post); - self.traced_submission(&payload, account_keypair).await + self.traced_submission(&payload, account_keypair, result_type) + .await } #[tracing::instrument( @@ -194,7 +245,8 @@ impl StorageProviderClientExt for crate::runtime::client::Client { &self, account_keypair: &Keypair, faults: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result>, subxt::Error> where Keypair: subxt::tx::Signer, { @@ -202,7 +254,8 @@ impl StorageProviderClientExt for crate::runtime::client::Client { .storage_provider() .declare_faults(faults.into()); - self.traced_submission(&payload, account_keypair).await + self.traced_submission(&payload, account_keypair, result_type) + .await } #[tracing::instrument( @@ -216,7 +269,8 @@ impl StorageProviderClientExt for crate::runtime::client::Client { &self, account_keypair: &Keypair, recoveries: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result>, subxt::Error> where Keypair: subxt::tx::Signer, { @@ -224,7 +278,8 @@ impl StorageProviderClientExt for crate::runtime::client::Client { .storage_provider() .declare_faults_recovered(recoveries.into()); - self.traced_submission(&payload, account_keypair).await + self.traced_submission(&payload, account_keypair, result_type) + .await } #[tracing::instrument(level = "debug", skip_all)] @@ -232,7 +287,8 @@ impl StorageProviderClientExt for crate::runtime::client::Client { &self, account_keypair: &Keypair, terminations: Vec, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result>, subxt::Error> where Keypair: subxt::tx::Signer, { @@ -240,7 +296,8 @@ impl StorageProviderClientExt for crate::runtime::client::Client { .storage_provider() .terminate_sectors(terminations.into()); - self.traced_submission(&payload, account_keypair).await + self.traced_submission(&payload, account_keypair, result_type) + .await } #[tracing::instrument(level = "debug", skip_all)] diff --git a/cli/polka-storage/storagext/src/lib.rs b/cli/polka-storage/storagext/src/lib.rs index d4da7aed..6db1c592 100644 --- a/cli/polka-storage/storagext/src/lib.rs +++ b/cli/polka-storage/storagext/src/lib.rs @@ -19,7 +19,7 @@ pub type Currency = u128; pub type BlockNumber = u64; /// Parachain configuration for subxt. -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum PolkaStorageConfig {} // Types are fully qualified ON PURPOSE! diff --git a/cli/polka-storage/storagext/src/runtime/client.rs b/cli/polka-storage/storagext/src/runtime/client.rs index 5cf101e7..8fbfa820 100644 --- a/cli/polka-storage/storagext/src/runtime/client.rs +++ b/cli/polka-storage/storagext/src/runtime/client.rs @@ -1,20 +1,52 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use hex::ToHex; -use subxt::{blocks::ExtrinsicEvents, OnlineClient}; +use subxt::{utils::AccountId32, OnlineClient}; +use tokio::sync::RwLock; -use crate::PolkaStorageConfig; +use crate::{ + runtime::{market::events as MarketEvents, storage_provider::events as SpEvents}, + PolkaStorageConfig, +}; + +type HashOf = ::Hash; +pub type HashOfPsc = HashOf; +type ParaEvents = Arc, subxt::events::EventDetails)>>>; /// Helper type for [`Client::traced_submission`] successful results. -pub struct SubmissionResult -where - Config: subxt::Config, -{ +#[derive(Debug)] +pub struct SubmissionResult { /// Submission block hash. - pub hash: Config::Hash, - + pub hash: Hash, /// Resulting extrinsic's events. - pub events: ExtrinsicEvents, + pub event: Event, +} + +// TODO: Since I changed the enum variants, I still haven't found a better name for that enum. +/// Selector type for the result type. +#[derive(Clone, Debug, Default, PartialEq)] +pub enum ResultType { + /// Send extrinsic and return (does not wait for finalisation). + SendOnly, + /// Will wait for finalisation and return parameters of success defined in `SubmissionResult`. + #[default] + Finalisation, +} + +impl ResultType { + pub fn value_parser(s: &str) -> Result { + match s.to_lowercase().as_str() { + "sendonly" => Ok(ResultType::SendOnly), + "finalisation" => Ok(ResultType::Finalisation), + format => Err(format!("unknown format: {}", format)), + } + } +} + +impl std::fmt::Display for ResultType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } } /// Client to interact with a pallet extrinsics. @@ -66,15 +98,19 @@ impl Client { /// /// Equivalent to performing [`OnlineClient::sign_and_submit_then_watch_default`], /// followed by [`TxInBlock::wait_for_finalized`] and [`TxInBlock::wait_for_success`]. - pub(crate) async fn traced_submission( + pub(crate) async fn traced_submission( &self, call: &Call, account_keypair: &Keypair, - ) -> Result, subxt::Error> + result_type: ResultType, + ) -> Result>, subxt::Error> where - Call: subxt::tx::Payload, + Call: subxt::tx::Payload + std::fmt::Debug, Keypair: subxt::tx::Signer, + Event: subxt::events::StaticEvent + std::fmt::Debug, + EventFilterProvider: EventFilterType, { + println!("call: {call:?}"); tracing::trace!("submitting extrinsic"); let submission_progress = self .client @@ -82,24 +118,43 @@ impl Client { .sign_and_submit_then_watch_default(call, account_keypair) .await?; + if result_type == ResultType::SendOnly { + return Ok(None); + } tracing::trace!( extrinsic_hash = submission_progress.extrinsic_hash().encode_hex::(), "waiting for finalization" ); - let finalized_xt = submission_progress.wait_for_finalized().await?; - let block_hash = finalized_xt.block_hash(); - tracing::trace!( - block_hash = block_hash.encode_hex::(), - "successfully submitted extrinsic" - ); - // finalized != successful - let xt_events = finalized_xt.wait_for_success().await?; + let para_events: ParaEvents = Arc::new(RwLock::new(Vec::new())); + let account_id = AccountId32::from(account_keypair.account_id()); + let listener_flag = Arc::new(RwLock::new(true)); + + let p_api = self.client.clone(); + let p_events = para_events.clone(); + let p_flag = listener_flag.clone(); + + let _ = tokio::spawn(async move { + para_watcher(p_api, p_flag, p_events).await; + }); + + static EVENT_META_PROVIDER: EventFilterProvider = EventFilterProvider; + let (hash, event) = wait_for_para_event::( + para_events.clone(), + >::pallet_name(&EVENT_META_PROVIDER), + >::event_name(&EVENT_META_PROVIDER), + >::filter_fn( + &EVENT_META_PROVIDER, + account_id, + ), + ) + .await; + { + let mut flag_guard = listener_flag.write().await; + *flag_guard = false; + } - Ok(SubmissionResult { - hash: block_hash, - events: xt_events, - }) + Ok(Some(SubmissionResult { hash, event })) } } @@ -108,3 +163,156 @@ impl From> for Client { Self { client } } } + +/// Methods iterates through the given stack of collected events from the listener and compares for +/// a given expected event type, for example `pallet_market::Event::BalanceAdded`. If the event has +/// been found it will be returned. +#[tracing::instrument(skip_all)] +async fn wait_for_para_event( + events: ParaEvents, + pallet: &'static str, + variant: &'static str, + predicate: impl Fn(&E) -> bool, +) -> (HashOf, E) +where + C: subxt::Config + Clone + std::fmt::Debug, + E: subxt::events::StaticEvent + std::fmt::Debug, +{ + loop { + let mut events = events.write().await; + if let Some(entry) = events.iter().find(|&e| { + e.2.pallet_name() == pallet + && e.2.variant_name() == variant + && predicate(&e.2.as_event::().unwrap().unwrap()) + }) { + let entry = entry.clone(); + events.retain(|e| e.0 > entry.0); + tracing::trace!( + "Found related event {}::{} on block {}", + pallet, + variant, + entry.0 + ); + return (entry.1, entry.2.as_event::().unwrap().unwrap()); + } + drop(events); + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + } +} + +/// Method listens to finalised blocks, collects all events and pushes them to a given stack. +async fn para_watcher( + api: OnlineClient, + flag: Arc>, + events: ParaEvents, +) where + ::Number: std::fmt::Display, +{ + tracing::trace!("start listening to events on finalised blocks"); + let mut blocks_sub = api.blocks().subscribe_finalized().await.unwrap(); + + while *flag.read().await { + while let Some(block) = blocks_sub.next().await { + let block = block.unwrap(); + let hash = block.hash(); + + for event in block.events().await.unwrap().iter() { + let event = event.unwrap(); + { + events + .write() + .await + .push((block.number().into(), hash, event.clone())); + } + } + } + } + tracing::trace!("stoped event-listener"); +} + +/// There is `subxt::events::StaticEvent` which provides the pallet's name and its event's name. On +/// top we need individualised filter methods to check whether the event in focus is exactly ours +/// (consider a situation with multiple events of same type but from different users). This trait +/// extens the existing `StaticEvent` by that filter function which can be specified here +/// individually to adapt to polka-storage needs. +pub trait EventFilterType { + fn pallet_name(&self) -> &str { + ::PALLET + } + + fn event_name(&self) -> &str { + ::EVENT + } + + fn filter_fn(&self, acc: AccountId32) -> impl Fn(&Event) -> bool; +} + +/// A default implementation of `EventFilterType` that implements every `Event` variant in +/// polka-storage. +pub struct EventFilterProvider; + +impl EventFilterType for EventFilterProvider { + fn filter_fn(&self, acc: AccountId32) -> impl Fn(&MarketEvents::BalanceAdded) -> bool { + move |e: &MarketEvents::BalanceAdded| e.who == acc + } +} + +impl EventFilterType for EventFilterProvider { + fn filter_fn(&self, acc: AccountId32) -> impl Fn(&MarketEvents::BalanceWithdrawn) -> bool { + move |e: &MarketEvents::BalanceWithdrawn| e.who == acc + } +} + +impl EventFilterType for EventFilterProvider { + fn filter_fn(&self, _: AccountId32) -> impl Fn(&MarketEvents::DealsSettled) -> bool { + move |_: &MarketEvents::DealsSettled| true + } +} + +impl EventFilterType for EventFilterProvider { + fn filter_fn(&self, _: AccountId32) -> impl Fn(&MarketEvents::DealPublished) -> bool { + move |_: &MarketEvents::DealPublished| true + } +} + +impl EventFilterType for EventFilterProvider { + fn filter_fn(&self, acc: AccountId32) -> impl Fn(&SpEvents::StorageProviderRegistered) -> bool { + move |e: &SpEvents::StorageProviderRegistered| e.owner == acc + } +} + +impl EventFilterType for EventFilterProvider { + fn filter_fn(&self, acc: AccountId32) -> impl Fn(&SpEvents::SectorsPreCommitted) -> bool { + move |e: &SpEvents::SectorsPreCommitted| e.owner == acc + } +} + +impl EventFilterType for EventFilterProvider { + fn filter_fn(&self, acc: AccountId32) -> impl Fn(&SpEvents::SectorsProven) -> bool { + move |e: &SpEvents::SectorsProven| e.owner == acc + } +} + +impl EventFilterType for EventFilterProvider { + fn filter_fn(&self, acc: AccountId32) -> impl Fn(&SpEvents::ValidPoStSubmitted) -> bool { + move |e: &SpEvents::ValidPoStSubmitted| e.owner == acc + } +} + +impl EventFilterType for EventFilterProvider { + fn filter_fn(&self, acc: AccountId32) -> impl Fn(&SpEvents::FaultsDeclared) -> bool { + move |e: &SpEvents::FaultsDeclared| e.owner == acc + } +} + +impl EventFilterType for EventFilterProvider { + fn filter_fn(&self, acc: AccountId32) -> impl Fn(&SpEvents::FaultsRecovered) -> bool { + move |e: &SpEvents::FaultsRecovered| e.owner == acc + } +} + +impl EventFilterType for EventFilterProvider { + fn filter_fn(&self, acc: AccountId32) -> impl Fn(&SpEvents::SectorsTerminated) -> bool { + move |e: &SpEvents::SectorsTerminated| e.owner == acc + } +} diff --git a/cli/polka-storage/storagext/src/runtime/mod.rs b/cli/polka-storage/storagext/src/runtime/mod.rs index 484a1d43..e5b10442 100644 --- a/cli/polka-storage/storagext/src/runtime/mod.rs +++ b/cli/polka-storage/storagext/src/runtime/mod.rs @@ -103,7 +103,7 @@ pub mod display; mod polka_storage_runtime {} // Using self keeps the import separate from the others -pub use client::SubmissionResult; +pub use client::{HashOfPsc, ResultType, SubmissionResult}; pub use self::polka_storage_runtime::*; #[cfg(test)]