Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add subscribe flag and event listener #448

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
674 changes: 331 additions & 343 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ rand = { version = "0.8.5", default-features = false }
rand_chacha = { version = "0.3.1", default-features = false }
rand_xorshift = "0.3"
rocksdb = { version = "0.21" }
scale-decode = { version = "0.13.1", default-features = false }
scale-encode = { version = "0.7.1", default-features = false }
neutrinoks marked this conversation as resolved.
Show resolved Hide resolved
scale-info = { version = "2.11.1", default-features = false }
serde = { version = "1.0.197", default-features = false }
serde-big-array = { version = "0.3.2" }
Expand All @@ -111,9 +113,6 @@ tracing-subscriber = "0.3.18"
url = "2.5.0"
uuid = "1.8.0"

scale-decode = { version = "0.13.1", default-features = false }
scale-encode = { version = "0.7.1", default-features = false }

# Testing
rstest = { version = "0.22.0" }

Expand Down
Binary file modified cli/artifacts/metadata.scale
Binary file not shown.
28 changes: 11 additions & 17 deletions cli/polka-storage-provider/server/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,26 +108,20 @@ impl StorageProviderRpcServer for RpcServerState {
));
}

// TODO(@jmg-duarte,#428,04/10/2024):
// There's a small bug here, currently, xt_client waits for a "full extrisic submission"
// meaning that it will wait until the block where it is included in is finalized
// however, due to https://github.com/paritytech/subxt/issues/1668 it may wrongly fail.
// Fixing this requires the xt_client not wait for the finalization, it's not hard to do
// it just requires some API design
let result = self
let submission_result = self
.xt_client
.publish_signed_storage_deals(&self.xt_keypair, vec![deal])
.await?;

let published_deals = result
.events
.find::<storagext::runtime::market::events::DealPublished>()
.collect::<Result<Vec<_>, _>>()
.map_err(|err| RpcError::internal_error(err, None))?;
.publish_signed_storage_deals(&self.xt_keypair, vec![deal], true)
.await?
.expect("requested to return submission-result");
neutrinoks marked this conversation as resolved.
Show resolved Hide resolved
let events = if let Ok(events) = submission_result {
events
} else {
return Err(RpcError::internal_error("pallet returned an error", None));
};

// We currently just support a single deal and if there's no published deals,
// an error MUST've happened
debug_assert_eq!(published_deals.len(), 1);
neutrinoks marked this conversation as resolved.
Show resolved Hide resolved
debug_assert_eq!(events.len(), 1);

let unsealed_dir = self.unsealed_piece_storage_dir.clone();
let sealed_dir = self.sealed_piece_storage_dir.clone();
Expand Down Expand Up @@ -173,7 +167,7 @@ impl StorageProviderRpcServer for RpcServerState {
tracing::info!("{:?}", precommit_result);
});

Ok(published_deals[0].deal_id)
Ok(events[0].variant.deal_id)
}
}

Expand Down
157 changes: 93 additions & 64 deletions cli/polka-storage/storagext-cli/src/cmd/market.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use primitives_proofs::DealId;
use storagext::{
deser::DeserializablePath,
multipair::{DebugPair, MultiPairSigner},
runtime::SubmissionResult,
runtime::{market::events as MktEvents, HashOfPsc, SubmissionResult},
types::market::DealProposal as SxtDealProposal,
MarketClientExt, PolkaStorageConfig,
};
Expand All @@ -15,6 +15,7 @@ use subxt::ext::sp_core::{
};
use url::Url;

use super::display_submission_result;
use crate::{missing_keypair_error, operation_takes_a_while, OutputFormat};

#[derive(Debug, Subcommand)]
Expand Down Expand Up @@ -81,6 +82,7 @@ impl MarketCommand {
account_keypair: Option<MultiPairSigner>,
n_retries: u32,
retry_interval: Duration,
wait_for_finalization: bool,
output_format: OutputFormat,
) -> Result<(), anyhow::Error> {
let client = storagext::Client::new(node_rpc, n_retries, retry_interval).await?;
Expand Down Expand Up @@ -110,7 +112,12 @@ impl MarketCommand {
return Err(missing_keypair_error::<Self>().into());
};
else_
.with_keypair(client, account_keypair, output_format)
.with_keypair(
client,
account_keypair,
wait_for_finalization,
output_format,
)
.await?;
}
};
Expand All @@ -122,26 +129,42 @@ impl MarketCommand {
self,
client: Client,
account_keypair: MultiPairSigner,
wait_for_finalization: bool,
output_format: OutputFormat,
) -> Result<(), anyhow::Error>
where
Client: MarketClientExt,
{
operation_takes_a_while();
if wait_for_finalization {
operation_takes_a_while();
}

let submission_result = match self {
match self {
MarketCommand::AddBalance { amount } => {
Self::add_balance(client, account_keypair, amount).await?
let opt_result =
Self::add_balance(client, account_keypair, amount, wait_for_finalization)
.await?;
display_submission_result::<_>(opt_result, output_format)?;
}
MarketCommand::SettleDealPayments { deal_ids } => {
if deal_ids.is_empty() {
bail!("No deals provided to settle");
}

Self::settle_deal_payments(client, account_keypair, deal_ids).await?
let opt_result = Self::settle_deal_payments(
client,
account_keypair,
deal_ids,
wait_for_finalization,
)
.await?;
display_submission_result::<_>(opt_result, output_format)?;
}
MarketCommand::WithdrawBalance { amount } => {
Self::withdraw_balance(client, account_keypair, amount).await?
let opt_result =
Self::withdraw_balance(client, account_keypair, amount, wait_for_finalization)
.await?;
display_submission_result::<_>(opt_result, output_format)?;
}
MarketCommand::PublishStorageDeals {
deals,
Expand All @@ -156,113 +179,119 @@ impl MarketCommand {
client_ed25519_key.map(DebugPair::into_inner)
)
.expect("client is required to submit at least one key, this should've been handled by clap's ArgGroup");
Self::publish_storage_deals(client, account_keypair, client_keypair, deals).await?
let opt_result = Self::publish_storage_deals(
client,
account_keypair,
client_keypair,
deals,
wait_for_finalization,
)
.await?;
display_submission_result::<_>(opt_result, output_format)?;
}
_unsigned => unreachable!("unsigned commands should have been previously handled"),
};

let hash = submission_result.hash;
// This monstrosity first converts incoming events into a "generic" (subxt generated) event,
// and then we extract only the Market events. We could probably extract this into a proper
// iterator but the effort to improvement ratio seems low (for 2 pallets at least).
let submission_results = submission_result
.events
.iter()
.flat_map(|event| {
event.map(|details| details.as_root_event::<storagext::runtime::Event>())
})
.filter_map(|event| match event {
Ok(storagext::runtime::Event::Market(e)) => Some(Ok(e)),
Err(err) => Some(Err(err)),
_ => None,
});
for event in submission_results {
let event = event?;
let output = output_format.format(&event)?;
match output_format {
OutputFormat::Plain => println!("[{}] {}", hash, output),
OutputFormat::Json => println!("{}", output),
}
}
Ok(())
}

async fn add_balance<Client>(
client: Client,
account_keypair: MultiPairSigner,
amount: u128,
) -> Result<SubmissionResult<PolkaStorageConfig>, subxt::Error>
wait_for_finalization: bool,
) -> Result<Option<SubmissionResult<HashOfPsc, MktEvents::BalanceAdded>>, subxt::Error>
where
Client: MarketClientExt,
{
let submission_result = client.add_balance(&account_keypair, amount).await?;
tracing::debug!(
"[{}] Successfully added {} to Market Balance",
submission_result.hash,
amount
);

Ok(submission_result)
let submission_result = client
.add_balance(&account_keypair, amount, wait_for_finalization)
.await?;
Ok(submission_result.inspect(|sr| {
let _ = sr.as_ref().inspect(|events| {
tracing::trace!(
"[{}] Successfully added {} to Market Balance",
events[0].hash,
amount
)
});
}))
}

async fn publish_storage_deals<Client>(
client: Client,
account_keypair: MultiPairSigner,
client_keypair: MultiPairSigner,
deals: Vec<SxtDealProposal>,
) -> Result<SubmissionResult<PolkaStorageConfig>, subxt::Error>
wait_for_finalization: bool,
) -> Result<Option<SubmissionResult<HashOfPsc, MktEvents::DealPublished>>, subxt::Error>
where
Client: MarketClientExt,
{
let n_deals = deals.len();
let submission_result = client
.publish_storage_deals(
&account_keypair,
&client_keypair,
deals.into_iter().map(Into::into).collect(),
wait_for_finalization,
)
.await?;
tracing::debug!(
"[{}] Successfully published storage deals",
submission_result.hash
);

Ok(submission_result)
Ok(submission_result.inspect(|sr| {
let _ = sr.as_ref().inspect(|events| {
tracing::trace!(
"[{}] Successfully published {} storage deals",
events[0].hash,
n_deals
)
});
}))
}

async fn settle_deal_payments<Client>(
client: Client,
account_keypair: MultiPairSigner,
deal_ids: Vec<u64>,
) -> Result<SubmissionResult<PolkaStorageConfig>, subxt::Error>
wait_for_finalization: bool,
) -> Result<Option<SubmissionResult<HashOfPsc, MktEvents::DealsSettled>>, subxt::Error>
where
Client: MarketClientExt,
{
let n_deal_ids = deal_ids.len();
let submission_result = client
.settle_deal_payments(&account_keypair, deal_ids)
.settle_deal_payments(&account_keypair, deal_ids, wait_for_finalization)
.await?;
tracing::debug!(
"[{}] Successfully settled deal payments",
submission_result.hash
);

Ok(submission_result)
Ok(submission_result.inspect(|sr| {
let _ = sr.as_ref().inspect(|events| {
tracing::trace!(
"[{}] Successfully settled {} deal payments",
events[0].hash,
n_deal_ids
)
});
}))
}

async fn withdraw_balance<Client>(
client: Client,
account_keypair: MultiPairSigner,
amount: u128,
) -> Result<SubmissionResult<PolkaStorageConfig>, subxt::Error>
wait_for_finalization: bool,
) -> Result<Option<SubmissionResult<HashOfPsc, MktEvents::BalanceWithdrawn>>, subxt::Error>
where
Client: MarketClientExt,
{
let submission_result = client.withdraw_balance(&account_keypair, amount).await?;
tracing::debug!(
"[{}] Successfully withdrew {} from Market Balance",
submission_result.hash,
amount
);

Ok(submission_result)
let submission_result = client
.withdraw_balance(&account_keypair, amount, wait_for_finalization)
.await?;
Ok(submission_result.inspect(|sr| {
let _ = sr.as_ref().inspect(|events| {
tracing::trace!(
"[{}] Successfully withdrew {} from Market Balance",
events[0].hash,
amount
)
});
}))
}
}
48 changes: 48 additions & 0 deletions cli/polka-storage/storagext-cli/src/cmd/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,51 @@
pub mod market;
pub mod storage_provider;
pub mod system;

use storagext::runtime::{HashOfPsc, SubmissionResult};

use crate::OutputFormat;

pub(crate) fn display_submission_result<Variant: std::fmt::Debug>(
opt_result: Option<SubmissionResult<HashOfPsc, Variant>>,
output_format: OutputFormat,
) -> Result<(), anyhow::Error> {
if opt_result.is_none() {
return Ok(());
}
let result = opt_result.expect("expect some, checked before");

match result {
Ok(events) => {
events
.iter()
.for_each(|e| println!("[{}] {:?}", e.hash, e.variant));
match &events[0].event {
neutrinoks marked this conversation as resolved.
Show resolved Hide resolved
storagext::runtime::Event::Market(e) => {
display::<_>(events[0].hash, e, output_format)?
}
storagext::runtime::Event::StorageProvider(e) => {
display::<_>(events[0].hash, e, output_format)?
}
_ => return Ok(()),
}
}
Err(_) => {
println!("Extrinsic failed: {}!", std::any::type_name::<Variant>());
}
}

Ok(())
}

fn display<E>(hash: HashOfPsc, event: E, output_format: OutputFormat) -> Result<(), anyhow::Error>
where
E: std::fmt::Display + serde::Serialize,
{
let output = output_format.format(&event)?;
match output_format {
OutputFormat::Plain => println!("[{}] {}", hash, output),
OutputFormat::Json => println!("{}", output),
}
Ok(())
}
Loading