Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(test): Fixes a bug in the sending_transactions_using_lightwalletd test #9052

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion zebrad/src/components/mempool/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ pub enum TransactionDownloadVerifyError {
#[error("transaction download / verification was cancelled")]
Cancelled,

#[error("transaction did not pass consensus validation")]
#[error("transaction did not pass consensus validation, error: {0}")]
Invalid(#[from] zebra_consensus::error::TransactionError),
}

Expand Down
191 changes: 138 additions & 53 deletions zebrad/tests/common/lightwalletd/send_transaction_test.rs
Copy link
Contributor Author

@arya2 arya2 Nov 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test could also fail due to transactions having unpaid actions (which are allowed by default in blocks but rejected by default in the mempool).

I'm looking into why it failed here: https://github.com/ZcashFoundation/zebra/actions/runs/11979091682/job/33400891720?pr=9052#step:15:844 where that transaction did pay the conventional fee.

The log also indicates that lightwalletd can't parse errors returned from Zebra's sendrawtransaction method, seemingly because it's expecting a colon-separated string but is receiving JSON.

Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@
//! already been seen in a block.

use std::{cmp::min, sync::Arc, time::Duration};
use tower::BoxError;

use color_eyre::eyre::Result;
use color_eyre::eyre::{eyre, Result};

use zebra_chain::{
parameters::Network::{self, *},
block::Block,
parameters::Network::*,
serialization::ZcashSerialize,
transaction::{self, Transaction},
};
use zebra_node_services::rpc_client::RpcRequestClient;
use zebra_rpc::queue::CHANNEL_AND_QUEUE_CAPACITY;
use zebra_test::prelude::TestChild;
use zebrad::components::mempool::downloads::MAX_INBOUND_CONCURRENCY;

use crate::common::{
Expand All @@ -34,10 +38,14 @@ use crate::common::{
lightwalletd::{
can_spawn_lightwalletd_for_rpc, spawn_lightwalletd_for_rpc,
sync::wait_for_zebrad_and_lightwalletd_sync,
wallet_grpc::{self, connect_to_lightwalletd, Empty, Exclude},
wallet_grpc::{
self, compact_tx_streamer_client::CompactTxStreamerClient, connect_to_lightwalletd,
Empty, Exclude,
},
},
regtest::MiningRpcMethods,
sync::LARGE_CHECKPOINT_TIMEOUT,
test_type::TestType::{self, *},
test_type::TestType::*,
};

/// The maximum number of transactions we want to send in the test.
Expand Down Expand Up @@ -85,11 +93,19 @@ pub async fn run() -> Result<()> {
"running gRPC send transaction test using lightwalletd & zebrad",
);

let transactions =
load_transactions_from_future_blocks(network.clone(), test_type, test_name).await?;
let mut count = 0;
let blocks: Vec<Block> =
get_future_blocks(&network, test_type, test_name, MAX_NUM_FUTURE_BLOCKS)
.await?
.into_iter()
.take_while(|block| {
count += block.transactions.len() - 1;
count <= max_sent_transactions()
})
.collect();

tracing::info!(
transaction_count = ?transactions.len(),
blocks_count = ?blocks.len(),
partial_sync_path = ?zebrad_state_path,
"got transactions to send, spawning isolated zebrad...",
);
Expand All @@ -113,6 +129,8 @@ pub async fn run() -> Result<()> {

let zebra_rpc_address = zebra_rpc_address.expect("lightwalletd test must have RPC port");

let zebrad_rpc_client = RpcRequestClient::new(zebra_rpc_address);

tracing::info!(
?test_type,
?zebra_rpc_address,
Expand Down Expand Up @@ -149,6 +167,7 @@ pub async fn run() -> Result<()> {
?lightwalletd_rpc_port,
"connecting gRPC client to lightwalletd...",
);
zebrad.wait_for_stdout_line(None);

let mut rpc_client = connect_to_lightwalletd(lightwalletd_rpc_port).await?;

Expand All @@ -163,14 +182,73 @@ pub async fn run() -> Result<()> {
.get_mempool_tx(Exclude { txid: vec![] })
.await?
.into_inner();
zebrad.wait_for_stdout_line(None);

let mut has_tx_with_shielded_elements = false;
let mut counter = 0;

for block in blocks {
zebrad.wait_for_stdout_line(None);

let (zebrad_child, has_shielded_elements, count) = send_transactions_from_block(
zebrad,
&mut rpc_client,
&zebrad_rpc_client,
block.clone(),
)
.await?;

zebrad = zebrad_child;
zebrad.wait_for_stdout_line(None);

has_tx_with_shielded_elements |= has_shielded_elements;
counter += count;

zebrad_rpc_client.submit_block(block).await?;
}

// GetMempoolTx: make sure at least one of the transactions were inserted into the mempool.
//
// TODO: Update `load_transactions_from_future_blocks()` to return block height offsets and,
// only check if a transaction from the first block has shielded elements
assert!(
!has_tx_with_shielded_elements || counter >= 1,
"failed to read v4+ transactions with shielded elements \
from future blocks in mempool via lightwalletd"
);

Ok(())
}

/// Sends non-coinbase transactions from a block to the mempool, verifies that the transactions
/// can be found in the mempool via lightwalletd, and commits the block to Zebra's chainstate.
///
/// Returns the zebrad test child that's handling the RPC requests.

#[tracing::instrument(skip_all)]
async fn send_transactions_from_block(
mut zebrad: TestChild<tempfile::TempDir>,
rpc_client: &mut CompactTxStreamerClient<tonic::transport::Channel>,
zebrad_rpc_client: &RpcRequestClient,
block: Block,
) -> Result<(TestChild<tempfile::TempDir>, bool, usize)> {
// Lightwalletd won't call `get_raw_mempool` again until 2 seconds after the last call:
// <https://github.com/zcash/lightwalletd/blob/master/frontend/service.go#L482>
//
// So we need to wait much longer than that here.
let sleep_until_lwd_last_mempool_refresh =
tokio::time::sleep(std::time::Duration::from_secs(4));

let transactions: Vec<_> = block
.transactions
.iter()
.filter(|tx| !tx.is_coinbase())
.collect();

if !transactions.is_empty() {
return Ok((zebrad, false, 0));
}

let transaction_hashes: Vec<transaction::Hash> =
transactions.iter().map(|tx| tx.hash()).collect();

Expand All @@ -181,7 +259,7 @@ pub async fn run() -> Result<()> {
);

let mut has_tx_with_shielded_elements = false;
for transaction in transactions {
for &transaction in &transactions {
let transaction_hash = transaction.hash();

// See <https://github.com/zcash/lightwalletd/blob/master/parser/transaction.go#L367>
Expand All @@ -194,28 +272,46 @@ pub async fn run() -> Result<()> {
};

tracing::info!(?transaction_hash, "sending transaction...");
// TODO: This may consume expected lines, try increasing the tracing buffer size instead if a full stdout pipe is causing panics.
zebrad.wait_for_stdout_line(None);

let request = prepare_send_transaction_request(transaction.clone());

match rpc_client.send_transaction(request).await {
Ok(response) => assert_eq!(response.into_inner(), expected_response),
Err(err) => {
tracing::warn!(?err, "failed to send transaction");
zebrad_rpc_client
.send_transaction(transaction)
.await
.map_err(|e| eyre!(e))?;
}
};
}

let request = prepare_send_transaction_request(transaction);

let response = rpc_client.send_transaction(request).await?.into_inner();

assert_eq!(response, expected_response);
if transactions.len() >= 10 {
// Check if some transaction is sent to mempool,
// Fails if there are only coinbase transactions in the first 50 future blocks
tracing::info!("waiting for mempool to verify some transactions...");
zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?;
} else {
zebrad.wait_for_stdout_line(None);
}

// Check if some transaction is sent to mempool,
// Fails if there are only coinbase transactions in the first 50 future blocks
tracing::info!("waiting for mempool to verify some transactions...");
zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?;
// Wait for more transactions to verify, `GetMempoolTx` only returns txs where tx.HasShieldedElements()
// <https://github.com/zcash/lightwalletd/blob/master/frontend/service.go#L537>
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
zebrad.wait_for_stdout_line(None);

sleep_until_lwd_last_mempool_refresh.await;
zebrad.wait_for_stdout_line(None);

tracing::info!("calling GetMempoolTx gRPC to fetch transactions...");
let mut transactions_stream = rpc_client
.get_mempool_tx(Exclude { txid: vec![] })
.await?
.into_inner();
zebrad.wait_for_stdout_line(None);

// Sometimes lightwalletd doesn't check the mempool, and waits for the next block instead.
// If that happens, we skip the rest of the test.
Expand All @@ -233,12 +329,14 @@ pub async fn run() -> Result<()> {

if tx_log.is_err() {
tracing::info!("lightwalletd didn't query the mempool, skipping mempool contents checks");
return Ok(());
return Ok((zebrad, has_tx_with_shielded_elements, 0));
}

tracing::info!("checking the mempool contains some of the sent transactions...");
let mut counter = 0;
while let Some(tx) = transactions_stream.message().await? {
zebrad.wait_for_stdout_line(None);

let hash: [u8; 32] = tx.hash.clone().try_into().expect("hash is correct length");
let hash = transaction::Hash::from_bytes_in_display_order(&hash);

Expand All @@ -250,52 +348,21 @@ pub async fn run() -> Result<()> {

counter += 1;
}

// GetMempoolTx: make sure at least one of the transactions were inserted into the mempool.
//
// TODO: Update `load_transactions_from_future_blocks()` to return block height offsets and,
// only check if a transaction from the first block has shielded elements
assert!(
!has_tx_with_shielded_elements || counter >= 1,
"failed to read v4+ transactions with shielded elements from future blocks in mempool via lightwalletd"
);
zebrad.wait_for_stdout_line(None);

// TODO: GetMempoolStream: make sure at least one of the transactions were inserted into the mempool.
tracing::info!("calling GetMempoolStream gRPC to fetch transactions...");
let mut transaction_stream = rpc_client.get_mempool_stream(Empty {}).await?.into_inner();
zebrad.wait_for_stdout_line(None);

let mut _counter = 0;
while let Some(_tx) = transaction_stream.message().await? {
// TODO: check tx.data or tx.height here?
_counter += 1;
zebrad.wait_for_stdout_line(None);
}

Ok(())
}

/// Loads transactions from a few block(s) after the chain tip of the cached state.
///
/// Returns a list of non-coinbase transactions from blocks that have not been finalized to disk
/// in the `ZEBRA_CACHED_STATE_DIR`.
///
/// ## Panics
///
/// If the provided `test_type` doesn't need an rpc server and cached state
#[tracing::instrument]
async fn load_transactions_from_future_blocks(
network: Network,
test_type: TestType,
test_name: &str,
) -> Result<Vec<Arc<Transaction>>> {
let transactions = get_future_blocks(&network, test_type, test_name, MAX_NUM_FUTURE_BLOCKS)
.await?
.into_iter()
.flat_map(|block| block.transactions)
.filter(|transaction| !transaction.is_coinbase())
.take(max_sent_transactions())
.collect();

Ok(transactions)
Ok((zebrad, has_tx_with_shielded_elements, counter))
}

/// Prepare a request to send to lightwalletd that contains a transaction to be sent.
Expand All @@ -307,3 +374,21 @@ fn prepare_send_transaction_request(transaction: Arc<Transaction>) -> wallet_grp
height: 0,
}
}

trait SendTransactionMethod {
async fn send_transaction(
&self,
transaction: &Arc<Transaction>,
) -> Result<zebra_rpc::methods::SentTransactionHash, BoxError>;
}

impl SendTransactionMethod for RpcRequestClient {
async fn send_transaction(
&self,
transaction: &Arc<Transaction>,
) -> Result<zebra_rpc::methods::SentTransactionHash, BoxError> {
let tx_data = hex::encode(transaction.zcash_serialize_to_vec()?);
self.json_result_from_call("sendrawtransaction", format!(r#"["{tx_data}"]"#))
.await
}
}
2 changes: 1 addition & 1 deletion zebrad/tests/common/lightwalletd/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub fn wait_for_zebrad_and_lightwalletd_sync<
wait_for_zebrad_mempool: bool,
wait_for_zebrad_tip: bool,
) -> Result<(TestChild<TempDir>, TestChild<P>)> {
let is_zebrad_finished = AtomicBool::new(false);
let is_zebrad_finished = AtomicBool::new(!wait_for_zebrad_tip);
let is_lightwalletd_finished = AtomicBool::new(false);

let is_zebrad_finished = &is_zebrad_finished;
Expand Down
Loading