Skip to content

Commit

Permalink
Remove subxt (#460)
Browse files Browse the repository at this point in the history
* Remove subxt

Removes ~20 crates from our Cargo.lock.

Removes downloading the metadata and enables removing the getMetadata RPC route
(relevant to #379).

Moves forward #337.

Done now due to distinctions in the subxt 0.32 API surface which make it
justifiable to not update.

* fmt, update due to deny triggering on a yanked crate

* Correct the handling of substrate_block_notifier now that it's ephemeral, not long-lived

* Correct URL in tests/coordinator from ws to http
  • Loading branch information
kayabaNerve authored Nov 28, 2023
1 parent 571195b commit 695d1f0
Show file tree
Hide file tree
Showing 30 changed files with 475 additions and 720 deletions.
439 changes: 54 additions & 385 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions coordinator/src/cosign_evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl<D: Db> CosignEvaluator<D> {
}

async fn update_stakes(&self) -> Result<(), SeraiError> {
let serai = self.serai.as_of(self.serai.latest_block_hash().await?);
let serai = self.serai.as_of_latest_finalized_block().await?;

let mut stakes = HashMap::new();
for network in NETWORKS {
Expand Down Expand Up @@ -112,13 +112,13 @@ impl<D: Db> CosignEvaluator<D> {
}

// If this an old cosign (older than a day), drop it
let latest_block = self.serai.latest_block().await?;
let latest_block = self.serai.latest_finalized_block().await?;
if (cosign.block_number + (24 * 60 * 60 / 6)) < latest_block.number() {
log::debug!("received old cosign supposedly signed by {:?}", cosign.network);
return Ok(());
}

let Some(block) = self.serai.block_by_number(cosign.block_number).await? else {
let Some(block) = self.serai.finalized_block_by_number(cosign.block_number).await? else {
log::warn!("received cosign with a block number which doesn't map to a block");
return Ok(());
};
Expand Down
4 changes: 2 additions & 2 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1222,8 +1222,8 @@ async fn main() {

let serai = || async {
loop {
let Ok(serai) = Serai::new(&format!(
"ws://{}:9944",
let Ok(serai) = Serai::new(format!(
"http://{}:9944",
serai_env::var("SERAI_HOSTNAME").expect("Serai hostname wasn't provided")
))
.await
Expand Down
56 changes: 39 additions & 17 deletions coordinator/src/substrate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use serai_db::DbTxn;

use processor_messages::SubstrateContext;

use futures::stream::StreamExt;
use tokio::{sync::mpsc, time::sleep};

use crate::{
Expand Down Expand Up @@ -81,7 +80,7 @@ async fn handle_new_set<D: Db>(
assert_eq!(block.number(), 0);
// Use the next block's time
loop {
let Ok(Some(res)) = serai.block_by_number(1).await else {
let Ok(Some(res)) = serai.finalized_block_by_number(1).await else {
sleep(Duration::from_secs(5)).await;
continue;
};
Expand Down Expand Up @@ -340,7 +339,7 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
next_block: &mut u64,
) -> Result<(), SeraiError> {
// Check if there's been a new Substrate block
let latest_number = serai.latest_block().await?.number();
let latest_number = serai.latest_finalized_block().await?.number();

// TODO: If this block directly builds off a cosigned block *and* doesn't contain events, mark
// cosigned,
Expand Down Expand Up @@ -369,7 +368,7 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
None => {
let serai = serai.as_of(
serai
.block_by_number(block)
.finalized_block_by_number(block)
.await?
.expect("couldn't get block which should've been finalized")
.hash(),
Expand Down Expand Up @@ -432,7 +431,7 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
skipped_block.map(|skipped_block| skipped_block + COSIGN_DISTANCE);
for block in (last_intended_to_cosign_block + 1) ..= latest_number {
let actual_block = serai
.block_by_number(block)
.finalized_block_by_number(block)
.await?
.expect("couldn't get block which should've been finalized");
SeraiBlockNumber::set(&mut txn, actual_block.hash(), &block);
Expand Down Expand Up @@ -535,7 +534,7 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
processors,
serai,
serai
.block_by_number(b)
.finalized_block_by_number(b)
.await?
.expect("couldn't get block before the latest finalized block"),
)
Expand All @@ -561,6 +560,7 @@ pub async fn scan_task<D: Db, Pro: Processors>(
let mut db = SubstrateDb::new(db);
let mut next_substrate_block = db.next_block();

/*
let new_substrate_block_notifier = {
let serai = &serai;
move || async move {
Expand All @@ -575,31 +575,55 @@ pub async fn scan_task<D: Db, Pro: Processors>(
}
}
};
let mut substrate_block_notifier = new_substrate_block_notifier().await;
*/
// TODO: Restore the above subscription-based system
let new_substrate_block_notifier = {
let serai = &serai;
move |next_substrate_block| async move {
loop {
match serai.latest_finalized_block().await {
Ok(latest) => {
if latest.header().number >= next_substrate_block {
return latest;
} else {
sleep(Duration::from_secs(3)).await;
}
}
Err(e) => {
log::error!("couldn't communicate with serai node: {e}");
sleep(Duration::from_secs(5)).await;
}
}
}
}
};

loop {
// await the next block, yet if our notifier had an error, re-create it
{
let Ok(next_block) =
tokio::time::timeout(Duration::from_secs(60), substrate_block_notifier.next()).await
let Ok(_) = tokio::time::timeout(
Duration::from_secs(60),
new_substrate_block_notifier(next_substrate_block),
)
.await
else {
// Timed out, which may be because Serai isn't finalizing or may be some issue with the
// notifier
if serai.latest_block().await.map(|block| block.number()).ok() ==
if serai.latest_finalized_block().await.map(|block| block.number()).ok() ==
Some(next_substrate_block.saturating_sub(1))
{
log::info!("serai hasn't finalized a block in the last 60s...");
} else {
substrate_block_notifier = new_substrate_block_notifier().await;
}
continue;
};

/*
// next_block is a Option<Result>
if next_block.and_then(Result::ok).is_none() {
substrate_block_notifier = new_substrate_block_notifier().await;
substrate_block_notifier = new_substrate_block_notifier(next_substrate_block);
continue;
}
*/
}

match handle_new_blocks(
Expand Down Expand Up @@ -632,12 +656,10 @@ pub(crate) async fn get_expected_next_batch(serai: &Serai, network: NetworkId) -
}
first = false;

let Ok(latest_block) = serai.latest_block().await else {
let Ok(serai) = serai.as_of_latest_finalized_block().await else {
continue;
};
let Ok(last) =
serai.as_of(latest_block.hash()).in_instructions().last_batch_for_network(network).await
else {
let Ok(last) = serai.in_instructions().last_batch_for_network(network).await else {
continue;
};
break if let Some(last) = last { last + 1 } else { 0 };
Expand Down
15 changes: 12 additions & 3 deletions coordinator/src/tests/tributary/dkg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,13 +321,22 @@ async fn dkg_test() {
async move {
// Version, Pallet, Call, Network, Key Pair, Signature
let expected_len = 1 + 1 + 1 + 1 + 32 + 1 + key_pair.1.len() + 64;
assert_eq!(tx.len(), expected_len);
// It's length prefixed
assert_eq!(tx.len(), 2 + expected_len);
let expected_len = u16::try_from(expected_len).unwrap();

// Check the encoded length
// This is the compact encoding from SCALE, specifically the two-byte length encoding case
let bottom_six = expected_len & 0b111111;
let upper_eight = expected_len >> 6;
assert_eq!(u8::try_from((bottom_six << 2) | 1).unwrap(), tx[0]);
assert_eq!(u8::try_from(upper_eight).unwrap(), tx[1]);

// Version
assert_eq!(tx[0], 4);
assert_eq!(tx[2], 4);

// Call
let tx = serai_client::runtime::RuntimeCall::decode(&mut &tx[1 ..]).unwrap();
let tx = serai_client::runtime::RuntimeCall::decode(&mut &tx[3 ..]).unwrap();
match tx {
serai_client::runtime::RuntimeCall::ValidatorSets(
serai_client::runtime::validator_sets::Call::set_keys {
Expand Down
2 changes: 1 addition & 1 deletion coordinator/src/tributary/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ pub(crate) async fn scan_tributaries_task<
// creation
// TODO2: Differentiate connection errors from invariants
Err(e) => {
if let Ok(serai) = serai.with_current_latest_block().await {
if let Ok(serai) = serai.as_of_latest_finalized_block().await {
let serai = serai.validator_sets();
// Check if this failed because the keys were already set by someone
// else
Expand Down
10 changes: 4 additions & 6 deletions substrate/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,15 @@ rustdoc-args = ["--cfg", "docsrs"]
zeroize = "^1.5"
thiserror = { version = "1", optional = true }

futures = "0.3"

hex = "0.4"
scale = { package = "parity-scale-codec", version = "3" }
scale-info = { version = "2", optional = true }
serde = { version = "1", features = ["derive"] }
serde_json = "1"

sp-core = { git = "https://github.com/serai-dex/substrate" }
sp-runtime = { git = "https://github.com/serai-dex/substrate" }
serai-runtime = { path = "../runtime", version = "0.1" }

subxt = { version = "0.29", default-features = false, features = ["jsonrpsee-ws"], optional = true }
simple-request = { path = "../../common/request", version = "0.1" }

bitcoin = { version = "0.31", optional = true }

Expand All @@ -50,7 +48,7 @@ dockertest = "0.4"
serai-docker-tests = { path = "../../tests/docker" }

[features]
serai = ["thiserror", "scale-info", "subxt"]
serai = ["thiserror"]

networks = []
bitcoin = ["networks", "dep:bitcoin"]
Expand Down
48 changes: 31 additions & 17 deletions substrate/client/src/serai/coins.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use scale::Encode;

use serai_runtime::{
primitives::{SeraiAddress, SubstrateAmount, Amount, Coin, Balance},
coins, Coins, Runtime,
primitives::{SeraiAddress, Amount, Coin, Balance},
coins, Runtime,
};
pub use coins::primitives;
use primitives::OutInstructionWithBalance;

use crate::{TemporalSerai, SeraiError, scale_value};
use crate::{TemporalSerai, SeraiError};

const PALLET: &str = "Coins";

Expand All @@ -19,39 +21,51 @@ impl<'a> SeraiCoins<'a> {
}

pub async fn mint_events(&self) -> Result<Vec<CoinsEvent>, SeraiError> {
self.0.events::<Coins, _>(|event| matches!(event, CoinsEvent::Mint { .. })).await
self
.0
.events(|event| {
if let serai_runtime::RuntimeEvent::Coins(event) = event {
Some(event).filter(|event| matches!(event, CoinsEvent::Mint { .. }))
} else {
None
}
})
.await
}

pub async fn burn_with_instruction_events(&self) -> Result<Vec<CoinsEvent>, SeraiError> {
self.0.events::<Coins, _>(|event| matches!(event, CoinsEvent::BurnWithInstruction { .. })).await
self
.0
.events(|event| {
if let serai_runtime::RuntimeEvent::Coins(event) = event {
Some(event).filter(|event| matches!(event, CoinsEvent::BurnWithInstruction { .. }))
} else {
None
}
})
.await
}

pub async fn coin_supply(&self, coin: Coin) -> Result<Amount, SeraiError> {
Ok(Amount(
self
.0
.storage::<SubstrateAmount>(PALLET, "Supply", Some(vec![scale_value(coin)]))
.await?
.unwrap_or(0),
))
Ok(self.0.storage(PALLET, "Supply", coin).await?.unwrap_or(Amount(0)))
}

pub async fn coin_balance(
&self,
coin: Coin,
address: SeraiAddress,
) -> Result<Amount, SeraiError> {
Ok(Amount(
Ok(
self
.0
.storage::<SubstrateAmount>(
.storage(
PALLET,
"Balances",
Some(vec![scale_value(address), scale_value(coin)]),
(sp_core::hashing::blake2_128(&address.encode()), &address.0, coin),
)
.await?
.unwrap_or(0),
))
.unwrap_or(Amount(0)),
)
}

pub fn transfer(to: SeraiAddress, balance: Balance) -> serai_runtime::RuntimeCall {
Expand Down
17 changes: 14 additions & 3 deletions substrate/client/src/serai/dex.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use sp_core::bounded_vec::BoundedVec;
use serai_runtime::{
primitives::{SeraiAddress, Amount, Coin},
dex, Dex, Runtime,
dex, Runtime,
};

use crate::{SeraiError, TemporalSerai};
Expand All @@ -11,8 +11,19 @@ pub type DexEvent = dex::Event<Runtime>;
#[derive(Clone, Copy)]
pub struct SeraiDex<'a>(pub(crate) TemporalSerai<'a>);
impl<'a> SeraiDex<'a> {
pub async fn all_events(&self) -> Result<Vec<DexEvent>, SeraiError> {
self.0.events::<Dex, _>(|_| true).await
pub async fn events(&self) -> Result<Vec<DexEvent>, SeraiError> {
self
.0
.events(
|event| {
if let serai_runtime::RuntimeEvent::Dex(event) = event {
Some(event)
} else {
None
}
},
)
.await
}

pub fn add_liquidity(
Expand Down
16 changes: 11 additions & 5 deletions substrate/client/src/serai/in_instructions.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use serai_runtime::{in_instructions, InInstructions, Runtime};
use serai_runtime::{in_instructions, Runtime};
pub use in_instructions::primitives;
use primitives::SignedBatch;

use crate::{
primitives::{BlockHash, NetworkId},
SeraiError, Serai, TemporalSerai, scale_value,
SeraiError, Serai, TemporalSerai,
};

pub type InInstructionsEvent = in_instructions::Event<Runtime>;
Expand All @@ -22,20 +22,26 @@ impl<'a> SeraiInInstructions<'a> {
&self,
network: NetworkId,
) -> Result<Option<BlockHash>, SeraiError> {
self.0.storage(PALLET, "LatestNetworkBlock", Some(vec![scale_value(network)])).await
self.0.storage(PALLET, "LatestNetworkBlock", network).await
}

pub async fn last_batch_for_network(
&self,
network: NetworkId,
) -> Result<Option<u32>, SeraiError> {
self.0.storage(PALLET, "LastBatch", Some(vec![scale_value(network)])).await
self.0.storage(PALLET, "LastBatch", network).await
}

pub async fn batch_events(&self) -> Result<Vec<InInstructionsEvent>, SeraiError> {
self
.0
.events::<InInstructions, _>(|event| matches!(event, InInstructionsEvent::Batch { .. }))
.events(|event| {
if let serai_runtime::RuntimeEvent::InInstructions(event) = event {
Some(event).filter(|event| matches!(event, InInstructionsEvent::Batch { .. }))
} else {
None
}
})
.await
}

Expand Down
Loading

0 comments on commit 695d1f0

Please sign in to comment.