From 32ba3d99f90015e6663fa061b23bbf90dbe3b0ea Mon Sep 17 00:00:00 2001 From: Hansie Odendaal Date: Thu, 5 Dec 2024 17:41:17 +0200 Subject: [PATCH 1/3] Base node monitoring tool --- .../minotari_node/src/commands/command/mod.rs | 5 + .../commands/command/test_peer_liveness.rs | 215 ++++++++++++++++++ .../core/src/transactions/aggregated_body.rs | 8 +- 3 files changed, 226 insertions(+), 2 deletions(-) create mode 100644 applications/minotari_node/src/commands/command/test_peer_liveness.rs diff --git a/applications/minotari_node/src/commands/command/mod.rs b/applications/minotari_node/src/commands/command/mod.rs index fa11545b02..032f6fdd22 100644 --- a/applications/minotari_node/src/commands/command/mod.rs +++ b/applications/minotari_node/src/commands/command/mod.rs @@ -51,6 +51,7 @@ mod rewind_blockchain; mod search_kernel; mod search_utxo; mod status; +mod test_peer_liveness; mod unban_all_peers; mod version; mod watch_command; @@ -118,6 +119,7 @@ pub enum Command { ResetOfflinePeers(reset_offline_peers::Args), RewindBlockchain(rewind_blockchain::Args), AddPeer(add_peer::ArgsAddPeer), + TestPeerLiveness(test_peer_liveness::ArgsTestPeerLiveness), BanPeer(ban_peer::ArgsBan), UnbanPeer(ban_peer::ArgsUnban), UnbanAllPeers(unban_all_peers::Args), @@ -239,6 +241,8 @@ impl CommandContext { Command::CreateTlsCerts(_) | Command::Quit(_) | Command::Exit(_) => 30, + // This test can potentially take a longer time and should be allowed to run longer + Command::TestPeerLiveness(_) => 240, // These commands involve intense blockchain db operations and needs a lot of time to complete Command::CheckDb(_) | Command::PeriodStats(_) | Command::RewindBlockchain(_) => 600, }; @@ -272,6 +276,7 @@ impl HandleCommand for CommandContext { Command::GetChainMetadata(args) => self.handle_command(args).await, Command::GetDbStats(args) => self.handle_command(args).await, Command::GetPeer(args) => self.handle_command(args).await, + Command::TestPeerLiveness(args) => 
self.handle_command(args).await, Command::GetStateInfo(args) => self.handle_command(args).await, Command::GetNetworkStats(args) => self.handle_command(args).await, Command::ListPeers(args) => self.handle_command(args).await, diff --git a/applications/minotari_node/src/commands/command/test_peer_liveness.rs b/applications/minotari_node/src/commands/command/test_peer_liveness.rs new file mode 100644 index 0000000000..d64545f660 --- /dev/null +++ b/applications/minotari_node/src/commands/command/test_peer_liveness.rs @@ -0,0 +1,215 @@ +// Copyright 2022, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::{fs, fs::OpenOptions, io::Write, path::PathBuf, process, time::Instant}; + +use anyhow::Error; +use async_trait::async_trait; +use chrono::Local; +use clap::Parser; +use minotari_app_utilities::utilities::UniPublicKey; +use tari_comms::{ + multiaddr::Multiaddr, + net_address::{MultiaddressesWithStats, PeerAddressSource}, + peer_manager::{NodeId, Peer, PeerFeatures, PeerFlags}, +}; +use tari_p2p::services::liveness::LivenessEvent; +use tokio::{sync::watch, task}; + +use super::{CommandContext, HandleCommand}; + +/// Tests the liveness of a peer +#[derive(Debug, Parser)] +pub struct ArgsTestPeerLiveness { + /// The public key of the peer to be tested + public_key: UniPublicKey, + /// The address of the peer to be tested + address: Multiaddr, + /// Auto exit the base node after test + exit: Option, + /// Write the responsiveness result to file - results will be written to + /// 'peer_liveness_test.csv' + output_to_file: Option, + /// Start with a new log file + refresh_file: Option, + /// Optional output directory (otherwise current directory will be used) + output_directory: Option, +} + +#[derive(Debug, Eq, PartialEq, Clone, Copy)] +enum PingResult { + Initial, + Success, + Fail, +} + +#[async_trait] +impl HandleCommand for CommandContext { + async fn handle_command(&mut self, args: ArgsTestPeerLiveness) -> Result<(), Error> { + println!("\nTesting peer liveness...\n"); + let peer_manager = self.comms.peer_manager(); + + let public_key = 
args.public_key.into(); + if *self.comms.node_identity().public_key() == public_key { + return Err(Error::msg("Self liveness test not supported")); + } + let node_id = NodeId::from_public_key(&public_key); + let node_id_clone = node_id.clone(); + let public_key_clone = public_key.clone(); + + // Remove the peer from the peer manager (not the peer db) + let _res = peer_manager.delete_peer(&node_id).await; + + // Create a new peer with the given address, if the peer exists, this will merge the given address + let peer = Peer::new( + public_key.clone(), + node_id.clone(), + MultiaddressesWithStats::from_addresses_with_source(vec![args.address], &PeerAddressSource::Config), + PeerFlags::empty(), + PeerFeatures::COMMUNICATION_NODE, + vec![], + String::new(), + ); + peer_manager.add_peer(peer).await?; + + let (tx, mut rx) = watch::channel(PingResult::Initial); + + // Attempt to dial and ping the peer + let start = Instant::now(); + for _ in 0..5 { + if self.dial_peer(node_id.clone()).await.is_ok() { + println!("🏓 Peer ({}, {}) dialed successfully", node_id, public_key); + let mut liveness_events = self.liveness.get_event_stream(); + let mut liveness = self.liveness.clone(); + task::spawn(async move { + if let Ok(nonce) = liveness.send_ping(node_id.clone()).await { + println!("🏓 Pinging peer ({}, {}) with nonce {} ...", node_id, public_key, nonce); + for _ in 0..5 { + match liveness_events.recv().await { + Ok(event) => { + if let LivenessEvent::ReceivedPong(pong) = &*event { + if pong.node_id == node_id && pong.nonce == nonce { + println!( + "🏓️ Pong: peer ({}, {}) responded with nonce {}, round-trip-time is \ + {:.2?}!", + pong.node_id, + public_key, + pong.nonce, + pong.latency.unwrap_or_default() + ); + let _ = tx.send(PingResult::Success); + return; + } + } + }, + Err(e) => { + println!("🏓 Ping peer ({}, {}) gave error: {}", node_id, public_key, e); + }, + } + } + let _ = tx.send(PingResult::Fail); + } + }); + // Break if the dial was successful + break; + } else { + 
tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } + } + + // Wait for the liveness test to complete + loop { + tokio::select! { + _ = rx.changed() => { + let test_time = start.elapsed(); + let responsive = *rx.borrow(); + println!("\nWhen rx.changed(): {:?}\n", responsive); + if responsive == PingResult::Success { + println!("✅ Peer ({}, {}) is responsive", node_id_clone, public_key_clone); + } else { + println!("❌ Peer ({}, {}) is unresponsive", node_id_clone, public_key_clone); + } + + if let Some(true) = args.output_to_file { + let test_result = if responsive == PingResult::Success { "PASS" } else { "FAIL" }; + let now = Local::now(); + let date_time = now.format("%Y-%m-%d %H:%M:%S").to_string(); + + let file_name = "peer_liveness_test.csv"; + let file_path = if let Some(path) = args.output_directory.clone() { + if let Ok(true) = fs::exists(&path) { + path.join(file_name) + } else if fs::create_dir_all(&path).is_ok() { + path.join(file_name) + } else { + PathBuf::from(file_name) + } + } else { + PathBuf::from(file_name) + }; + + if let Some(true) = args.refresh_file { + let _ = fs::remove_file(&file_path); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + let write_header = !file_path.exists(); + if let Ok(mut file) = OpenOptions::new().append(true).create(true).open(file_path.clone()) { + let mut file_content = String::new(); + if write_header { + file_content.push_str("Public Key,Date Time,Result\n"); + } + file_content.push_str( + &format!("{},{},{},{:.2?}\n",date_time, public_key_clone, test_result, test_time) + ); + match writeln!(file, "{}", file_content) { + Ok(_) => { + println!("📝 Test result written to file: {}", file_path.display()); + }, + Err(e) => { + println!("❌ Error writing test result to file: {}", e); + }, + } + } + + } + println!(); + + if let Some(true) = args.exit { + println!("The liveness test is complete and base node will now exit\n"); + self.shutdown.trigger(); + 
tokio::time::sleep(std::time::Duration::from_secs(1)).await; + match responsive { + PingResult::Success => process::exit(0), + _ => process::exit(1), + } + } + + break; + }, + + _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}, + } + } + + Ok(()) + } +} diff --git a/base_layer/core/src/transactions/aggregated_body.rs b/base_layer/core/src/transactions/aggregated_body.rs index c6c31498b0..838d2e0a3a 100644 --- a/base_layer/core/src/transactions/aggregated_body.rs +++ b/base_layer/core/src/transactions/aggregated_body.rs @@ -19,16 +19,20 @@ // SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#[cfg(feature = "base_node")] +use std::convert::TryFrom; use std::{ cmp::max, - convert::TryFrom, fmt::{Display, Error, Formatter}, }; use borsh::{BorshDeserialize, BorshSerialize}; use log::*; use serde::{Deserialize, Serialize}; -use tari_common_types::types::{ComAndPubSignature, Commitment, FixedHash, PrivateKey}; +#[cfg(feature = "base_node")] +use tari_common_types::types::FixedHash; +use tari_common_types::types::{ComAndPubSignature, Commitment, PrivateKey}; use tari_crypto::commitment::HomomorphicCommitmentFactory; #[cfg(feature = "base_node")] use tari_mmr::pruned_hashset::PrunedHashSet; From 2b01340e302f889add611b91c6c43c5443481d28 Mon Sep 17 00:00:00 2001 From: Hansie Odendaal Date: Mon, 9 Dec 2024 08:10:19 +0200 Subject: [PATCH 2/3] clippy --- .../minotari_node/src/commands/command/test_peer_liveness.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/applications/minotari_node/src/commands/command/test_peer_liveness.rs b/applications/minotari_node/src/commands/command/test_peer_liveness.rs index d64545f660..a9029c7b8e 100644 --- 
a/applications/minotari_node/src/commands/command/test_peer_liveness.rs +++ b/applications/minotari_node/src/commands/command/test_peer_liveness.rs @@ -62,6 +62,7 @@ enum PingResult { Fail, } +#[allow(clippy::too_many_lines)] #[async_trait] impl HandleCommand for CommandContext { async fn handle_command(&mut self, args: ArgsTestPeerLiveness) -> Result<(), Error> { @@ -168,7 +169,7 @@ impl HandleCommand for CommandContext { }; if let Some(true) = args.refresh_file { - let _ = fs::remove_file(&file_path); + let _unused = fs::remove_file(&file_path); tokio::time::sleep(std::time::Duration::from_secs(1)).await; } let write_header = !file_path.exists(); @@ -178,7 +179,7 @@ impl HandleCommand for CommandContext { file_content.push_str("Public Key,Date Time,Result\n"); } file_content.push_str( - &format!("{},{},{},{:.2?}\n",date_time, public_key_clone, test_result, test_time) + &format!("{},{},{},{:.2?}",date_time, public_key_clone, test_result, test_time) ); match writeln!(file, "{}", file_content) { Ok(_) => { From b12eaea07aaa1c4ee333c4618aa66acbcb920e76 Mon Sep 17 00:00:00 2001 From: Hansie Odendaal Date: Mon, 9 Dec 2024 09:55:22 +0200 Subject: [PATCH 3/3] review comments --- .../commands/command/test_peer_liveness.rs | 116 +++++++++++------- 1 file changed, 73 insertions(+), 43 deletions(-) diff --git a/applications/minotari_node/src/commands/command/test_peer_liveness.rs b/applications/minotari_node/src/commands/command/test_peer_liveness.rs index a9029c7b8e..d3bdf7f622 100644 --- a/applications/minotari_node/src/commands/command/test_peer_liveness.rs +++ b/applications/minotari_node/src/commands/command/test_peer_liveness.rs @@ -20,13 +20,21 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
-use std::{fs, fs::OpenOptions, io::Write, path::PathBuf, process, time::Instant}; +use std::{ + fs, + fs::OpenOptions, + io::Write, + path::PathBuf, + process, + time::{Duration, Instant}, +}; use anyhow::Error; use async_trait::async_trait; use chrono::Local; use clap::Parser; use minotari_app_utilities::utilities::UniPublicKey; +use tari_common_types::types::PublicKey; use tari_comms::{ multiaddr::Multiaddr, net_address::{MultiaddressesWithStats, PeerAddressSource}, @@ -62,7 +70,6 @@ enum PingResult { Fail, } -#[allow(clippy::too_many_lines)] #[async_trait] impl HandleCommand for CommandContext { async fn handle_command(&mut self, args: ArgsTestPeerLiveness) -> Result<(), Error> { @@ -76,6 +83,7 @@ impl HandleCommand for CommandContext { let node_id = NodeId::from_public_key(&public_key); let node_id_clone = node_id.clone(); let public_key_clone = public_key.clone(); + let address_clone = args.address.clone(); // Remove the peer from the peer manager (not the peer db) let _res = peer_manager.delete_peer(&node_id).await; @@ -141,7 +149,7 @@ impl HandleCommand for CommandContext { loop { tokio::select! 
{ _ = rx.changed() => { - let test_time = start.elapsed(); + let test_duration = start.elapsed(); let responsive = *rx.borrow(); println!("\nWhen rx.changed(): {:?}\n", responsive); if responsive == PingResult::Success { @@ -151,46 +159,14 @@ impl HandleCommand for CommandContext { } if let Some(true) = args.output_to_file { - let test_result = if responsive == PingResult::Success { "PASS" } else { "FAIL" }; - let now = Local::now(); - let date_time = now.format("%Y-%m-%d %H:%M:%S").to_string(); - - let file_name = "peer_liveness_test.csv"; - let file_path = if let Some(path) = args.output_directory.clone() { - if let Ok(true) = fs::exists(&path) { - path.join(file_name) - } else if fs::create_dir_all(&path).is_ok() { - path.join(file_name) - } else { - PathBuf::from(file_name) - } - } else { - PathBuf::from(file_name) - }; - - if let Some(true) = args.refresh_file { - let _unused = fs::remove_file(&file_path); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - let write_header = !file_path.exists(); - if let Ok(mut file) = OpenOptions::new().append(true).create(true).open(file_path.clone()) { - let mut file_content = String::new(); - if write_header { - file_content.push_str("Public Key,Date Time,Result\n"); - } - file_content.push_str( - &format!("{},{},{},{:.2?}",date_time, public_key_clone, test_result, test_time) - ); - match writeln!(file, "{}", file_content) { - Ok(_) => { - println!("📝 Test result written to file: {}", file_path.display()); - }, - Err(e) => { - println!("❌ Error writing test result to file: {}", e); - }, - } - } - + print_to_file( + responsive, + args.output_directory, + args.refresh_file, + public_key_clone, + address_clone, + test_duration + ).await; } println!(); @@ -214,3 +190,57 @@ impl HandleCommand for CommandContext { Ok(()) } } + +async fn print_to_file( + responsive: PingResult, + output_directory: Option, + refresh_file: Option, + public_key: PublicKey, + address: Multiaddr, + test_duration: Duration, +) { + let 
test_result = if responsive == PingResult::Success { + "PASS" + } else { + "FAIL" + }; + let now = Local::now(); + let date_time = now.format("%Y-%m-%d %H:%M:%S").to_string(); + + let file_name = "peer_liveness_test.csv"; + let file_path = if let Some(path) = output_directory.clone() { + if let Ok(true) = fs::exists(&path) { + path.join(file_name) + } else if fs::create_dir_all(&path).is_ok() { + path.join(file_name) + } else { + PathBuf::from(file_name) + } + } else { + PathBuf::from(file_name) + }; + + if let Some(true) = refresh_file { + let _unused = fs::remove_file(&file_path); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + let write_header = !file_path.exists(); + if let Ok(mut file) = OpenOptions::new().append(true).create(true).open(file_path.clone()) { + let mut file_content = String::new(); + if write_header { + file_content.push_str("Date Time,Public Key,Address,Result,Test Duration\n"); + } + file_content.push_str(&format!( + "{},{},{},{},{:.2?}", + date_time, public_key, address, test_result, test_duration + )); + match writeln!(file, "{}", file_content) { + Ok(_) => { + println!("📝 Test result written to file: {}", file_path.display()); + }, + Err(e) => { + println!("❌ Error writing test result to file: {}", e); + }, + } + } +}