From 5c2781e86be8efacab52c93a0bc2ee662ca56ec8 Mon Sep 17 00:00:00 2001 From: Hansie Odendaal <39146854+hansieodendaal@users.noreply.github.com> Date: Fri, 27 Oct 2023 15:34:11 +0200 Subject: [PATCH] feat: ban bad block-sync peers (#5871) Description --- - Added a check to ban a misbehaving peer after block sync when not supplying any or all of the blocks corresponding to the accumulated difficulty they claimed they had. - Added a check in the RPC block-sync server method not to try and supply blocks if it does not have both blocks corresponding to the start and end hash in its chain. - Moved all block sync RPC errors to the short ban category from the no-ban category. - Added happy path and ban integration-level unit tests for block sync. Motivation and Context --- The new unit tests that were added highlighted some issues where sync peers are not banned for their bad behaviour. How Has This Been Tested? --- Added new integration-level unit tests. What process can a PR reviewer use to test or verify this change? --- - Code walk through. - Review and run unit tests. 
Breaking Changes --- - [x] None - [ ] Requires data directory on base node to be deleted - [ ] Requires hard fork - [ ] Other - Please specify --------- Co-authored-by: SW van Heerden --- .../comms_interface/inbound_handlers.rs | 2 +- .../src/base_node/sync/block_sync/error.rs | 10 +- .../base_node/sync/block_sync/synchronizer.rs | 13 +- .../core/src/base_node/sync/rpc/service.rs | 10 +- .../core/src/base_node/sync/rpc/tests.rs | 5 +- base_layer/core/src/chain_storage/async_db.rs | 2 +- .../src/chain_storage/blockchain_database.rs | 35 ++-- .../core/src/chain_storage/db_transaction.rs | 20 +- .../core/src/chain_storage/lmdb_db/lmdb_db.rs | 12 +- base_layer/core/src/common/mod.rs | 2 +- base_layer/core/tests/helpers/sync.rs | 85 ++++++--- base_layer/core/tests/tests/block_sync.rs | 176 ++++++++++++++++++ base_layer/core/tests/tests/header_sync.rs | 21 ++- base_layer/core/tests/tests/mod.rs | 1 + base_layer/core/tests/tests/node_service.rs | 15 +- 15 files changed, 327 insertions(+), 82 deletions(-) create mode 100644 base_layer/core/tests/tests/block_sync.rs diff --git a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs index 4a21cbcac4..b85aa393ea 100644 --- a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs +++ b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs @@ -522,7 +522,7 @@ where B: BlockchainBackend + 'static } async fn check_exists_and_not_bad_block(&self, block: FixedHash) -> Result { - if self.blockchain_db.block_exists(block).await? { + if self.blockchain_db.chain_block_or_orphan_block_exists(block).await? 
{ debug!( target: LOG_TARGET, "Block with hash `{}` already stored", diff --git a/base_layer/core/src/base_node/sync/block_sync/error.rs b/base_layer/core/src/base_node/sync/block_sync/error.rs index ad224fdf19..8f44e8a3e4 100644 --- a/base_layer/core/src/base_node/sync/block_sync/error.rs +++ b/base_layer/core/src/base_node/sync/block_sync/error.rs @@ -69,6 +69,8 @@ pub enum BlockSyncError { SyncRoundFailed, #[error("Could not find peer info")] PeerNotFound, + #[error("Peer did not supply all the blocks they claimed they had: {0}")] + PeerDidNotSupplyAllClaimedBlocks(String), } impl BlockSyncError { @@ -93,6 +95,7 @@ impl BlockSyncError { BlockSyncError::FixedHashSizeError(_) => "FixedHashSizeError", BlockSyncError::SyncRoundFailed => "SyncRoundFailed", BlockSyncError::PeerNotFound => "PeerNotFound", + BlockSyncError::PeerDidNotSupplyAllClaimedBlocks(_) => "PeerDidNotSupplyAllClaimedBlocks", } } } @@ -102,8 +105,6 @@ impl BlockSyncError { match self { // no ban BlockSyncError::AsyncTaskFailed(_) | - BlockSyncError::RpcError(_) | - BlockSyncError::RpcRequestError(_) | BlockSyncError::ChainStorageError(_) | BlockSyncError::ConnectivityError(_) | BlockSyncError::NoMoreSyncPeers(_) | @@ -113,7 +114,10 @@ impl BlockSyncError { BlockSyncError::SyncRoundFailed => None, // short ban - err @ BlockSyncError::MaxLatencyExceeded { .. } => Some(BanReason { + err @ BlockSyncError::MaxLatencyExceeded { .. 
} | + err @ BlockSyncError::PeerDidNotSupplyAllClaimedBlocks(_) | + err @ BlockSyncError::RpcError(_) | + err @ BlockSyncError::RpcRequestError(_) => Some(BanReason { reason: format!("{}", err), ban_duration: short_ban, }), diff --git a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs index a67568a8e2..d61f8f8220 100644 --- a/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs +++ b/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs @@ -128,7 +128,9 @@ impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> { warn!(target: LOG_TARGET, "{} ({})", err, sync_round); continue; }, - Err(err) => return Err(err), + Err(err) => { + return Err(err); + }, } } } @@ -407,6 +409,15 @@ impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> { last_sync_timer = Instant::now(); } + let accumulated_difficulty = self.db.get_chain_metadata().await?.accumulated_difficulty(); + if accumulated_difficulty < sync_peer.claimed_chain_metadata().accumulated_difficulty() { + return Err(BlockSyncError::PeerDidNotSupplyAllClaimedBlocks(format!( + "Their claimed difficulty: {}, our local difficulty after block sync: {}", + sync_peer.claimed_chain_metadata().accumulated_difficulty(), + accumulated_difficulty + ))); + } + if let Some(block) = current_block { self.hooks.call_on_complete_hooks(block, best_height); } diff --git a/base_layer/core/src/base_node/sync/rpc/service.rs b/base_layer/core/src/base_node/sync/rpc/service.rs index e3ea5fa9c8..6e96df4b8b 100644 --- a/base_layer/core/src/base_node/sync/rpc/service.rs +++ b/base_layer/core/src/base_node/sync/rpc/service.rs @@ -120,6 +120,9 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ .start_hash .try_into() .map_err(|_| RpcStatus::bad_request(&"Malformed starting hash received".to_string()))?; + if db.fetch_block_by_hash(hash, true).await.is_err() { + return Err(RpcStatus::not_found("Requested start block sync hash was not 
found")); + } let start_header = db .fetch_header_by_block_hash(hash) .await @@ -137,13 +140,14 @@ impl BaseNodeSyncService for BaseNodeSyncRpcServ ))); } - if start_height > metadata.height_of_longest_chain() { - return Ok(Streaming::empty()); - } let hash = message .end_hash .try_into() .map_err(|_| RpcStatus::bad_request(&"Malformed end hash received".to_string()))?; + if db.fetch_block_by_hash(hash, true).await.is_err() { + return Err(RpcStatus::not_found("Requested end block sync hash was not found")); + } + let end_header = db .fetch_header_by_block_hash(hash) .await diff --git a/base_layer/core/src/base_node/sync/rpc/tests.rs b/base_layer/core/src/base_node/sync/rpc/tests.rs index 8c6c0e861a..a791eb9e6c 100644 --- a/base_layer/core/src/base_node/sync/rpc/tests.rs +++ b/base_layer/core/src/base_node/sync/rpc/tests.rs @@ -75,7 +75,7 @@ mod sync_blocks { } #[tokio::test] - async fn it_sends_an_empty_response() { + async fn it_sends_bad_request_on_bad_response() { let (service, db, rpc_request_mock, _tmp) = setup(); let (_, chain) = create_main_chain(&db, block_specs!(["A->GB"])).await; @@ -86,8 +86,7 @@ mod sync_blocks { end_hash: block.hash().to_vec(), }; let req = rpc_request_mock.request_with_context(Default::default(), msg); - let mut streaming = service.sync_blocks(req).await.unwrap(); - assert!(streaming.next().await.is_none()); + assert!(service.sync_blocks(req).await.is_err()); } #[tokio::test] diff --git a/base_layer/core/src/chain_storage/async_db.rs b/base_layer/core/src/chain_storage/async_db.rs index 3cf0fe4025..f78e6e8f93 100644 --- a/base_layer/core/src/chain_storage/async_db.rs +++ b/base_layer/core/src/chain_storage/async_db.rs @@ -221,7 +221,7 @@ impl AsyncBlockchainDb { make_async_fn!(cleanup_all_orphans() -> (), "cleanup_all_orphans"); - make_async_fn!(block_exists(block_hash: BlockHash) -> bool, "block_exists"); + make_async_fn!(chain_block_or_orphan_block_exists(block_hash: BlockHash) -> bool, "block_exists"); 
make_async_fn!(bad_block_exists(block_hash: BlockHash) -> bool, "bad_block_exists"); diff --git a/base_layer/core/src/chain_storage/blockchain_database.rs b/base_layer/core/src/chain_storage/blockchain_database.rs index 1af0bbf972..0d917d8267 100644 --- a/base_layer/core/src/chain_storage/blockchain_database.rs +++ b/base_layer/core/src/chain_storage/blockchain_database.rs @@ -251,7 +251,7 @@ where B: BlockchainBackend txn.set_horizon_data(kernel_sum, utxo_sum); blockchain_db.write(txn)?; blockchain_db.store_pruning_horizon(config.pruning_horizon)?; - } else if !blockchain_db.block_exists(genesis_block.accumulated_data().hash)? { + } else if !blockchain_db.chain_block_or_orphan_block_exists(genesis_block.accumulated_data().hash)? { // Check the genesis block in the DB. error!( target: LOG_TARGET, @@ -917,7 +917,7 @@ where B: BlockchainBackend after_lock - before_lock, ); - if db.contains(&DbKey::BlockHash(block_hash))? { + if db.contains(&DbKey::HeaderHash(block_hash))? { return Ok(BlockAddResult::BlockExists); } if db.bad_block_exists(block_hash)? { @@ -1083,9 +1083,10 @@ where B: BlockchainBackend } /// Returns true if this block exists in the chain, or is orphaned. - pub fn block_exists(&self, hash: BlockHash) -> Result { + pub fn chain_block_or_orphan_block_exists(&self, hash: BlockHash) -> Result { let db = self.db_read_access()?; - Ok(db.contains(&DbKey::BlockHash(hash))? || db.contains(&DbKey::OrphanBlock(hash))?) + // we need to check if the block accumulated data exists, and the header might exist without a body + Ok(db.fetch_block_accumulated_data(&hash)?.is_some() || db.contains(&DbKey::OrphanBlock(hash))?) } /// Returns true if this block exists in the chain, or is orphaned. 
@@ -1404,7 +1405,7 @@ pub fn calculate_validator_node_mr(validator_nodes: &[(PublicKey, [u8; 32])]) -> } pub fn fetch_header(db: &T, block_num: u64) -> Result { - fetch!(db, block_num, BlockHeader) + fetch!(db, block_num, HeaderHeight) } pub fn fetch_headers( @@ -1422,8 +1423,8 @@ pub fn fetch_headers( #[allow(clippy::cast_possible_truncation)] let mut headers = Vec::with_capacity((end_inclusive - start) as usize); for h in start..=end_inclusive { - match db.fetch(&DbKey::BlockHeader(h))? { - Some(DbValue::BlockHeader(header)) => { + match db.fetch(&DbKey::HeaderHeight(h))? { + Some(DbValue::HeaderHeight(header)) => { headers.push(*header); }, Some(_) => unreachable!(), @@ -1476,7 +1477,7 @@ fn fetch_header_by_block_hash( db: &T, hash: BlockHash, ) -> Result, ChainStorageError> { - try_fetch!(db, hash, BlockHash) + try_fetch!(db, hash, HeaderHash) } fn fetch_orphan(db: &T, hash: BlockHash) -> Result { @@ -2367,7 +2368,7 @@ fn get_orphan_link_main_chain( // If this hash is part of the main chain, we're done - since curr_hash has already been set to the previous // hash, the chain Vec does not include the fork block in common with both chains - if db.contains(&DbKey::BlockHash(curr_hash))? { + if db.contains(&DbKey::HeaderHash(curr_hash))? 
{ break; } } @@ -2893,7 +2894,7 @@ mod test { // Check 2b was added let access = test.db_write_access(); let block = orphan_chain_b.get("2b").unwrap().clone(); - assert!(access.contains(&DbKey::BlockHash(*block.hash())).unwrap()); + assert!(access.contains(&DbKey::HeaderHash(*block.hash())).unwrap()); // Check 7d is the tip let block = orphan_chain_d.get("7d").unwrap().clone(); @@ -2902,7 +2903,7 @@ mod test { let metadata = access.fetch_chain_metadata().unwrap(); assert_eq!(metadata.best_block(), block.hash()); assert_eq!(metadata.height_of_longest_chain(), block.height()); - assert!(access.contains(&DbKey::BlockHash(*block.hash())).unwrap()); + assert!(access.contains(&DbKey::HeaderHash(*block.hash())).unwrap()); let mut all_blocks = main_chain .into_iter() @@ -2916,8 +2917,8 @@ mod test { for (height, name) in expected_chain.iter().enumerate() { let expected_block = all_blocks.get(*name).unwrap(); unpack_enum!( - DbValue::BlockHeader(found_block) = - access.fetch(&DbKey::BlockHeader(height as u64)).unwrap().unwrap() + DbValue::HeaderHeight(found_block) = + access.fetch(&DbKey::HeaderHeight(height as u64)).unwrap().unwrap() ); assert_eq!(*found_block, *expected_block.header()); } @@ -2988,7 +2989,7 @@ mod test { // Check 2b was added let access = test.db_write_access(); let block = orphan_chain_b.get("2b").unwrap().clone(); - assert!(access.contains(&DbKey::BlockHash(*block.hash())).unwrap()); + assert!(access.contains(&DbKey::HeaderHash(*block.hash())).unwrap()); // Check 12b is the tip let block = orphan_chain_b.get("12b").unwrap().clone(); @@ -2997,7 +2998,7 @@ mod test { let metadata = access.fetch_chain_metadata().unwrap(); assert_eq!(metadata.best_block(), block.hash()); assert_eq!(metadata.height_of_longest_chain(), block.height()); - assert!(access.contains(&DbKey::BlockHash(*block.hash())).unwrap()); + assert!(access.contains(&DbKey::HeaderHash(*block.hash())).unwrap()); let mut all_blocks = main_chain.into_iter().chain(orphan_chain_b).collect::>(); 
all_blocks.insert("GB".to_string(), genesis); @@ -3008,8 +3009,8 @@ mod test { for (height, name) in expected_chain.iter().enumerate() { let expected_block = all_blocks.get(*name).unwrap(); unpack_enum!( - DbValue::BlockHeader(found_block) = - access.fetch(&DbKey::BlockHeader(height as u64)).unwrap().unwrap() + DbValue::HeaderHeight(found_block) = + access.fetch(&DbKey::HeaderHeight(height as u64)).unwrap().unwrap() ); assert_eq!(*found_block, *expected_block.header()); } diff --git a/base_layer/core/src/chain_storage/db_transaction.rs b/base_layer/core/src/chain_storage/db_transaction.rs index b3c47e1783..03403b61d5 100644 --- a/base_layer/core/src/chain_storage/db_transaction.rs +++ b/base_layer/core/src/chain_storage/db_transaction.rs @@ -464,16 +464,16 @@ impl fmt::Display for WriteOperation { #[derive(Debug, Clone, PartialEq, Eq)] pub enum DbKey { - BlockHeader(u64), - BlockHash(BlockHash), + HeaderHeight(u64), + HeaderHash(BlockHash), OrphanBlock(HashOutput), } impl DbKey { pub fn to_value_not_found_error(&self) -> ChainStorageError { let (entity, field, value) = match self { - DbKey::BlockHeader(v) => ("BlockHeader", "Height", v.to_string()), - DbKey::BlockHash(v) => ("Block", "Hash", v.to_hex()), + DbKey::HeaderHeight(v) => ("BlockHeader", "Height", v.to_string()), + DbKey::HeaderHash(v) => ("Header", "Hash", v.to_hex()), DbKey::OrphanBlock(v) => ("Orphan", "Hash", v.to_hex()), }; ChainStorageError::ValueNotFound { entity, field, value } @@ -482,16 +482,16 @@ impl DbKey { #[derive(Debug)] pub enum DbValue { - BlockHeader(Box), - BlockHash(Box), + HeaderHeight(Box), + HeaderHash(Box), OrphanBlock(Box), } impl Display for DbValue { fn fmt(&self, f: &mut Formatter) -> Result<(), Error> { match self { - DbValue::BlockHeader(_) => f.write_str("Block header"), - DbValue::BlockHash(_) => f.write_str("Block hash"), + DbValue::HeaderHeight(_) => f.write_str("Header by height"), + DbValue::HeaderHash(_) => f.write_str("Header by hash"), DbValue::OrphanBlock(_) => 
f.write_str("Orphan block"), } } @@ -500,8 +500,8 @@ impl Display for DbValue { impl Display for DbKey { fn fmt(&self, f: &mut Formatter) -> Result<(), Error> { match self { - DbKey::BlockHeader(v) => f.write_str(&format!("Block header (#{})", v)), - DbKey::BlockHash(v) => f.write_str(&format!("Block hash (#{})", v.to_hex())), + DbKey::HeaderHeight(v) => f.write_str(&format!("Header height (#{})", v)), + DbKey::HeaderHash(v) => f.write_str(&format!("Header hash (#{})", v.to_hex())), DbKey::OrphanBlock(v) => f.write_str(&format!("Orphan block hash ({})", v.to_hex())), } } diff --git a/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs b/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs index 51fe57c1f1..d823c87938 100644 --- a/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs +++ b/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs @@ -1806,11 +1806,11 @@ impl BlockchainBackend for LMDBDatabase { fn fetch(&self, key: &DbKey) -> Result, ChainStorageError> { let txn = self.read_transaction()?; let res = match key { - DbKey::BlockHeader(k) => { + DbKey::HeaderHeight(k) => { let val: Option = lmdb_get(&txn, &self.headers_db, k)?; - val.map(|val| DbValue::BlockHeader(Box::new(val))) + val.map(|val| DbValue::HeaderHeight(Box::new(val))) }, - DbKey::BlockHash(hash) => { + DbKey::HeaderHash(hash) => { let k: Option = self.fetch_height_from_hash(&txn, hash)?; match k { Some(k) => { @@ -1821,7 +1821,7 @@ impl BlockchainBackend for LMDBDatabase { k ); let val: Option = lmdb_get(&txn, &self.headers_db, &k)?; - val.map(|val| DbValue::BlockHash(Box::new(val))) + val.map(|val| DbValue::HeaderHash(Box::new(val))) }, None => { trace!( @@ -1843,8 +1843,8 @@ impl BlockchainBackend for LMDBDatabase { fn contains(&self, key: &DbKey) -> Result { let txn = self.read_transaction()?; Ok(match key { - DbKey::BlockHeader(k) => lmdb_exists(&txn, &self.headers_db, k)?, - DbKey::BlockHash(h) => lmdb_exists(&txn, &self.block_hashes_db, h.deref())?, + DbKey::HeaderHeight(k) => 
lmdb_exists(&txn, &self.headers_db, k)?, + DbKey::HeaderHash(h) => lmdb_exists(&txn, &self.block_hashes_db, h.deref())?, DbKey::OrphanBlock(k) => lmdb_exists(&txn, &self.orphans_db, k.deref())?, }) } diff --git a/base_layer/core/src/common/mod.rs b/base_layer/core/src/common/mod.rs index b9a2d30dc2..921f8790eb 100644 --- a/base_layer/core/src/common/mod.rs +++ b/base_layer/core/src/common/mod.rs @@ -42,7 +42,7 @@ pub type ConfidentialOutputHasher = DomainSeparatedConsensusHasher BlockSync { + BlockSync::from(vec![SyncPeer::from(PeerChainMetadata::new( + peer_node_interfaces.node_identity.node_id().clone(), + peer_node_interfaces.blockchain_db.get_chain_metadata().unwrap(), + None, + ))]) +} + +pub async fn sync_blocks_execute( + state_machine: &mut BaseNodeStateMachine, + block_sync: &mut BlockSync, +) -> StateEvent { + block_sync.next_event(state_machine).await +} + pub async fn create_network_with_local_and_peer_nodes() -> ( BaseNodeStateMachine, NodeInterfaces, @@ -140,53 +155,77 @@ pub async fn create_network_with_local_and_peer_nodes() -> ( #[allow(dead_code)] #[derive(Debug)] pub enum WhatToDelete { + BlocksAndHeaders, Blocks, Headers, } +// Private helper function to setup a delete a block transaction. +// Note: This private function will panic if the index is out of bounds - caller function's responsibility. 
+fn delete_block(txn: &mut DbTransaction, node: &NodeInterfaces, blocks: &[ChainBlock], index: usize) { + txn.delete_block(*blocks[index].hash()); + txn.delete_orphan(*blocks[index].hash()); + txn.set_best_block( + blocks[index + 1].height(), + blocks[index + 1].accumulated_data().hash, + blocks[index + 1].accumulated_data().total_accumulated_difficulty, + *node.blockchain_db.get_chain_metadata().unwrap().best_block(), + blocks[index + 1].to_chain_header().timestamp(), + ); +} + // Delete blocks and headers in reverse order; the first block in the slice wil not be deleted pub fn delete_some_blocks_and_headers( blocks_with_anchor: &[ChainBlock], instruction: WhatToDelete, node: &NodeInterfaces, - set_best_block: Option, ) { if blocks_with_anchor.is_empty() || blocks_with_anchor.len() < 2 { panic!("blocks must have at least 2 elements"); } - let set_best_block = set_best_block.unwrap_or(false); let mut blocks: Vec<_> = blocks_with_anchor.to_vec(); blocks.reverse(); for i in 0..blocks.len() - 1 { let mut txn = DbTransaction::new(); match instruction { + WhatToDelete::BlocksAndHeaders => { + delete_block(&mut txn, node, &blocks, i); + txn.delete_header(blocks[i].height()); + }, WhatToDelete::Blocks => { - txn.delete_block(*blocks[i].hash()); - txn.delete_orphan(*blocks[i].hash()); - if set_best_block { - txn.set_best_block( - blocks[i + 1].height(), - blocks[i + 1].accumulated_data().hash, - blocks[i + 1].accumulated_data().total_accumulated_difficulty, - *node.blockchain_db.get_chain_metadata().unwrap().best_block(), - blocks[i + 1].to_chain_header().timestamp(), - ); - } + delete_block(&mut txn, node, &blocks, i); }, WhatToDelete::Headers => { txn.delete_header(blocks[i].height()); }, } node.blockchain_db.write(txn).unwrap(); - // Note: Something is funny here... the block is deleted but the block exists in the db. This should be - // investigated and fixed as it will enhance the tests. 
If we uncomment the following assertion, the - // tests depending on this function will fail. - // match instruction { - // WhatToDelete::Blocks => { - // assert!(!node.blockchain_db.block_exists(*blocks[i].hash()).unwrap()); - // } - // WhatToDelete::Headers => {} - // } + match instruction { + WhatToDelete::BlocksAndHeaders => { + assert!(!node + .blockchain_db + .chain_block_or_orphan_block_exists(*blocks[i].hash()) + .unwrap()); + assert!(node + .blockchain_db + .fetch_header_by_block_hash(*blocks[i].hash()) + .unwrap() + .is_none()); + }, + WhatToDelete::Blocks => { + assert!(!node + .blockchain_db + .chain_block_or_orphan_block_exists(*blocks[i].hash()) + .unwrap()); + }, + WhatToDelete::Headers => { + assert!(node + .blockchain_db + .fetch_header_by_block_hash(*blocks[i].hash()) + .unwrap() + .is_none()); + }, + } } } diff --git a/base_layer/core/tests/tests/block_sync.rs b/base_layer/core/tests/tests/block_sync.rs new file mode 100644 index 0000000000..9011a4b276 --- /dev/null +++ b/base_layer/core/tests/tests/block_sync.rs @@ -0,0 +1,176 @@ +// Copyright 2022. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. 
+// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use tari_core::base_node::state_machine_service::states::StateEvent; + +use crate::helpers::{sync, sync::WhatToDelete}; + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_block_sync_happy_path() { + // env_logger::init(); // Set `$env:RUST_LOG = "trace"` + + // Create the network with Alice node and Bob node + let (mut alice_state_machine, alice_node, bob_node, initial_block, consensus_manager, key_manager) = + sync::create_network_with_local_and_peer_nodes().await; + + // Add some block to Bob's chain + let _bob_blocks = + sync::create_and_add_some_blocks(&bob_node, &initial_block, 5, &consensus_manager, &key_manager, &[3; 5]).await; + assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 5); + + // Alice attempts header sync + let mut header_sync = sync::initialize_sync_headers_with_ping_pong_data(&alice_node, &bob_node); + let event = sync::sync_headers_execute(&mut alice_state_machine, &mut header_sync).await; + match event.clone() { + StateEvent::HeadersSynchronized(..) 
=> { + // Good, headers are synced + }, + _ => panic!("Expected HeadersSynchronized event"), + } + + // Alice attempts block sync + println!(); + assert_eq!(alice_node.blockchain_db.get_height().unwrap(), 0); + let mut block_sync = sync::initialize_sync_blocks(&bob_node); + let event = sync::sync_blocks_execute(&mut alice_state_machine, &mut block_sync).await; + match event { + StateEvent::BlocksSynchronized => { + // Good, blocks are synced + }, + _ => panic!("Expected BlocksSynchronized event"), + } + assert_eq!(alice_node.blockchain_db.get_height().unwrap(), 5); + + // Alice attempts block sync again + println!(); + let mut block_sync = sync::initialize_sync_blocks(&bob_node); + let event = sync::sync_blocks_execute(&mut alice_state_machine, &mut block_sync).await; + match event { + StateEvent::BlocksSynchronized => { + // Good, blocks are synced + }, + _ => panic!("Expected BlocksSynchronized event"), + } + assert_eq!(alice_node.blockchain_db.get_height().unwrap(), 5); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_block_sync_peer_supplies_no_blocks_with_ban() { + // env_logger::init(); // Set `$env:RUST_LOG = "trace"` + + // Create the network with Alice node and Bob node + let (mut alice_state_machine, alice_node, bob_node, initial_block, consensus_manager, key_manager) = + sync::create_network_with_local_and_peer_nodes().await; + + // Add some block to Bob's chain + let blocks = sync::create_and_add_some_blocks( + &bob_node, + &initial_block, + 10, + &consensus_manager, + &key_manager, + &[3; 10], + ) + .await; + assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 10); + // Add blocks to Alice's chain + sync::add_some_existing_blocks(&blocks[1..=5], &alice_node); + assert_eq!(alice_node.blockchain_db.get_height().unwrap(), 5); + + // Alice attempts header sync + let mut header_sync = sync::initialize_sync_headers_with_ping_pong_data(&alice_node, &bob_node); + let event = sync::sync_headers_execute(&mut 
alice_state_machine, &mut header_sync).await; + match event.clone() { + StateEvent::HeadersSynchronized(..) => { + // Good, headers are synced + }, + _ => panic!("Expected HeadersSynchronized event"), + } + + // Alice attempts block sync, Bob will not send any blocks and be banned + println!(); + let mut block_sync = sync::initialize_sync_blocks(&bob_node); + sync::delete_some_blocks_and_headers(&blocks[5..=10], WhatToDelete::Blocks, &bob_node); + assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 5); + let event = sync::sync_blocks_execute(&mut alice_state_machine, &mut block_sync).await; + match event { + StateEvent::BlockSyncFailed => { + // Good, Bob is banned. + }, + _ => panic!("Expected BlockSyncFailed event"), + } + assert_eq!(alice_node.blockchain_db.get_height().unwrap(), 5); + + // Bob will be banned + assert!(sync::wait_for_is_peer_banned(&alice_node, bob_node.node_identity.node_id(), 1).await); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_block_sync_peer_supplies_not_all_blocks_with_ban() { + // env_logger::init(); // Set `$env:RUST_LOG = "trace"` + + // Create the network with Alice node and Bob node + let (mut alice_state_machine, alice_node, bob_node, initial_block, consensus_manager, key_manager) = + sync::create_network_with_local_and_peer_nodes().await; + + // Add some block to Bob's chain + let blocks = sync::create_and_add_some_blocks( + &bob_node, + &initial_block, + 10, + &consensus_manager, + &key_manager, + &[3; 10], + ) + .await; + assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 10); + // Add blocks to Alice's chain + sync::add_some_existing_blocks(&blocks[1..=5], &alice_node); + assert_eq!(alice_node.blockchain_db.get_height().unwrap(), 5); + + // Alice attempts header sync + let mut header_sync = sync::initialize_sync_headers_with_ping_pong_data(&alice_node, &bob_node); + let event = sync::sync_headers_execute(&mut alice_state_machine, &mut header_sync).await; + match event.clone() { + 
StateEvent::HeadersSynchronized(..) => { + // Good, headers are synced + }, + _ => panic!("Expected HeadersSynchronized event"), + } + + // Alice attempts block sync, Bob will not send all blocks and be banned + println!(); + let mut block_sync = sync::initialize_sync_blocks(&bob_node); + sync::delete_some_blocks_and_headers(&blocks[8..=10], WhatToDelete::Blocks, &bob_node); + assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 8); + let event = sync::sync_blocks_execute(&mut alice_state_machine, &mut block_sync).await; + match event { + StateEvent::BlockSyncFailed => { + // Good, Bob is banned. + }, + _ => panic!("Expected BlockSyncFailed event"), + } + assert_eq!(alice_node.blockchain_db.get_height().unwrap(), 5); + + // Bob will be banned + assert!(sync::wait_for_is_peer_banned(&alice_node, bob_node.node_identity.node_id(), 1).await); +} diff --git a/base_layer/core/tests/tests/header_sync.rs b/base_layer/core/tests/tests/header_sync.rs index 63715e640f..5745f24125 100644 --- a/base_layer/core/tests/tests/header_sync.rs +++ b/base_layer/core/tests/tests/header_sync.rs @@ -189,14 +189,14 @@ async fn test_header_sync_uneven_headers_and_blocks_happy_path() { &[3; 10], ) .await; - sync::delete_some_blocks_and_headers(&blocks[5..=10], WhatToDelete::Blocks, &bob_node, Some(true)); - sync::delete_some_blocks_and_headers(&blocks[7..=10], WhatToDelete::Headers, &bob_node, None); + sync::delete_some_blocks_and_headers(&blocks[5..=10], WhatToDelete::Blocks, &bob_node); + sync::delete_some_blocks_and_headers(&blocks[7..=10], WhatToDelete::Headers, &bob_node); assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 5); assert_eq!(bob_node.blockchain_db.fetch_last_header().unwrap().height, 7); // Add blocks and headers to Alice's chain, with more headers than blocks sync::add_some_existing_blocks(&blocks[1..=10], &alice_node); - sync::delete_some_blocks_and_headers(&blocks[2..=10], WhatToDelete::Blocks, &alice_node, Some(true)); + 
sync::delete_some_blocks_and_headers(&blocks[2..=10], WhatToDelete::Blocks, &alice_node); assert_eq!(alice_node.blockchain_db.get_height().unwrap(), 2); assert_eq!(alice_node.blockchain_db.fetch_last_header().unwrap().height, 10); @@ -237,14 +237,14 @@ async fn test_header_sync_uneven_headers_and_blocks_peer_lies_about_pow_no_ban() &[3; 10], ) .await; - sync::delete_some_blocks_and_headers(&blocks[5..=10], WhatToDelete::Blocks, &bob_node, Some(true)); - sync::delete_some_blocks_and_headers(&blocks[7..=10], WhatToDelete::Headers, &bob_node, None); + sync::delete_some_blocks_and_headers(&blocks[5..=10], WhatToDelete::Blocks, &bob_node); + sync::delete_some_blocks_and_headers(&blocks[7..=10], WhatToDelete::Headers, &bob_node); assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 5); assert_eq!(bob_node.blockchain_db.fetch_last_header().unwrap().height, 7); // Add blocks and headers to Alice's chain, with more headers than blocks sync::add_some_existing_blocks(&blocks[1..=10], &alice_node); - sync::delete_some_blocks_and_headers(&blocks[2..=10], WhatToDelete::Blocks, &alice_node, Some(true)); + sync::delete_some_blocks_and_headers(&blocks[2..=10], WhatToDelete::Blocks, &alice_node); assert_eq!(alice_node.blockchain_db.get_height().unwrap(), 2); assert_eq!(alice_node.blockchain_db.fetch_last_header().unwrap().height, 10); @@ -252,7 +252,9 @@ async fn test_header_sync_uneven_headers_and_blocks_peer_lies_about_pow_no_ban() // Note: This behaviour is undetected! 
let mut header_sync = sync::initialize_sync_headers_with_ping_pong_data(&alice_node, &bob_node); // Remove blocks from Bpb's chain so his claimed metadata is better than what it actually is - sync::delete_some_blocks_and_headers(&blocks[4..=5], WhatToDelete::Blocks, &bob_node, Some(true)); + sync::delete_some_blocks_and_headers(&blocks[4..=5], WhatToDelete::Blocks, &bob_node); + assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 4); + assert_eq!(bob_node.blockchain_db.fetch_last_header().unwrap().height, 7); assert!( header_sync.clone().into_sync_peers()[0] .claimed_chain_metadata() @@ -302,8 +304,7 @@ async fn test_header_sync_even_headers_and_blocks_peer_lies_about_pow_with_ban() // Alice attempts header sync, but Bob will not supply any blocks let mut header_sync = sync::initialize_sync_headers_with_ping_pong_data(&alice_node, &bob_node); // Remove blocks and headers from Bpb's chain so his claimed metadata is better than what it actually is - sync::delete_some_blocks_and_headers(&blocks[3..=6], WhatToDelete::Blocks, &bob_node, Some(true)); - sync::delete_some_blocks_and_headers(&blocks[3..=6], WhatToDelete::Headers, &bob_node, None); + sync::delete_some_blocks_and_headers(&blocks[3..=6], WhatToDelete::BlocksAndHeaders, &bob_node); assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 3); assert_eq!(bob_node.blockchain_db.fetch_last_header().unwrap().height, 3); assert!( @@ -349,7 +350,7 @@ async fn test_header_sync_even_headers_and_blocks_peer_metadata_improve_with_reo // Alice attempts header sync, but Bob's ping-pong data will be outdated when header sync is executed let mut header_sync = sync::initialize_sync_headers_with_ping_pong_data(&alice_node, &bob_node); // Bob's chain will reorg with improved metadata - sync::delete_some_blocks_and_headers(&blocks[4..=6], WhatToDelete::Blocks, &bob_node, Some(true)); + sync::delete_some_blocks_and_headers(&blocks[4..=6], WhatToDelete::Blocks, &bob_node); let _blocks = 
sync::create_and_add_some_blocks(&bob_node, &blocks[4], 3, &consensus_manager, &key_manager, &[3; 3]).await; assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 7); diff --git a/base_layer/core/tests/tests/mod.rs b/base_layer/core/tests/tests/mod.rs index 42f811db0f..e36b646680 100644 --- a/base_layer/core/tests/tests/mod.rs +++ b/base_layer/core/tests/tests/mod.rs @@ -24,6 +24,7 @@ use tari_core::{blocks::ChainBlock, chain_storage::BlockAddResult}; mod async_db; mod base_node_rpc; +mod block_sync; mod block_validation; mod header_sync; mod mempool; diff --git a/base_layer/core/tests/tests/node_service.rs b/base_layer/core/tests/tests/node_service.rs index ed483e5f75..435c0fb758 100644 --- a/base_layer/core/tests/tests/node_service.rs +++ b/base_layer/core/tests/tests/node_service.rs @@ -459,9 +459,18 @@ async fn propagate_and_forward_invalid_block() { } assert!(has_banned); - assert!(!bob_node.blockchain_db.block_exists(*block1_hash).unwrap()); - assert!(!carol_node.blockchain_db.block_exists(*block1_hash).unwrap()); - assert!(!dan_node.blockchain_db.block_exists(*block1_hash).unwrap()); + assert!(!bob_node + .blockchain_db + .chain_block_or_orphan_block_exists(*block1_hash) + .unwrap()); + assert!(!carol_node + .blockchain_db + .chain_block_or_orphan_block_exists(*block1_hash) + .unwrap()); + assert!(!dan_node + .blockchain_db + .chain_block_or_orphan_block_exists(*block1_hash) + .unwrap()); alice_node.shutdown().await; bob_node.shutdown().await;