Skip to content

Commit

Permalink
fix: various cleanup and logs (#51)
Browse files Browse the repository at this point in the history
  • Loading branch information
stringhandler authored Sep 19, 2024
1 parent 38ee3bd commit d596c48
Show file tree
Hide file tree
Showing 14 changed files with 168 additions and 143 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ libp2p = { version = "0.53.2", features = [
"kad",
"relay",
"dcutr",
"autonat"
"autonat",
] }
tokio = { version = "1.38.0", features = ["full"] }
thiserror = "1.0"
Expand All @@ -56,6 +56,7 @@ serde_json = "1.0.122"
hickory-resolver = { version = "*", features = ["dns-over-rustls"] }
convert_case = "0.6.0"
lazy_static = "1.5.0"
num-format = "*"

[package.metadata.cargo-machete]
ignored = ["log4rs"]
2 changes: 1 addition & 1 deletion src/cli/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub struct StartArgs {
pub external_address: Option<String>,

/// (Optional) Address of the Tari base node.
#[arg(long, value_name = "base-node-address", default_value = "http://127.0.0.1:18142")]
#[arg(long, value_name = "base-node-address", default_value = "http://127.0.0.1:18182")]
pub base_node_address: String,

/// (Optional) seed peers.
Expand Down
5 changes: 5 additions & 0 deletions src/cli/commands/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
use std::{env, sync::Arc};

use libp2p::identity::Keypair;
use log::info;
use tari_common::{configuration::Network, initialize_logging};
use tari_core::{
consensus::ConsensusManager,
proof_of_work::{randomx_factory::RandomXFactory, PowAlgorithm},
};
use tari_shutdown::ShutdownSignal;
use tari_utilities::hex::Hex;

use crate::{
cli::args::{Cli, StartArgs},
Expand Down Expand Up @@ -89,6 +91,9 @@ pub async fn server(
let randomx_factory = RandomXFactory::new(1);
let consensus_manager = ConsensusManager::builder(Network::get_current_or_user_setting_or_default()).build()?;
let genesis_block_hash = *consensus_manager.get_genesis_block().hash();

info!(target: "p2pool::server", "Consensus manager initialized with network: {}, and genesis hash {}", Network::get_current_or_user_setting_or_default(),
genesis_block_hash.to_hex());
let block_validation_params = Arc::new(BlockValidationParams::new(
randomx_factory,
consensus_manager,
Expand Down
2 changes: 1 addition & 1 deletion src/server/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub struct Config {
impl Default for Config {
fn default() -> Self {
Self {
base_node_address: String::from("http://127.0.0.1:18142"),
base_node_address: String::from("http://127.0.0.1:18182"),
p2p_port: 0, // bind to any free port
grpc_port: 18145, // to possibly not collide with any other ports
idle_connection_timeout: Duration::from_secs(30),
Expand Down
12 changes: 6 additions & 6 deletions src/server/grpc/base_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ use minotari_app_grpc::{
},
};
use tari_shutdown::ShutdownSignal;
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use tonic::{transport::Channel, Request, Response, Status, Streaming};

use crate::server::grpc::{error::Error, util};
Expand All @@ -72,7 +72,7 @@ const GET_DIFFICULTY_PAGE_SIZE: usize = 1_000;
#[macro_export]
macro_rules! proxy_simple_result {
($self:ident, $call:ident, $request:ident) => {
match $self.client.lock().await.$call($request.into_inner()).await {
match $self.client.write().await.$call($request.into_inner()).await {
Ok(resp) => Ok(resp),
Err(error) => {
error!("Error while calling {:?} on base node: {:?}", stringify!($call), error);
Expand All @@ -86,7 +86,7 @@ macro_rules! proxy_stream_result {
($self:ident, $call:ident, $request:ident, $page_size:ident) => {
streaming_response(
String::from(stringify!($call)),
$self.client.lock().await.$call($request.into_inner()).await,
$self.client.write().await.$call($request.into_inner()).await,
$page_size,
)
.await
Expand All @@ -95,7 +95,7 @@ macro_rules! proxy_stream_result {
($self:ident, $call:ident, $request:ident, $page_size:expr) => {
streaming_response(
String::from(stringify!($call)),
$self.client.lock().await.$call($request.into_inner()).await,
$self.client.write().await.$call($request.into_inner()).await,
$page_size,
)
.await
Expand Down Expand Up @@ -133,13 +133,13 @@ where
/// Base node gRPC service that proxies all the requests to base node when miner calls them.
/// This makes sure that any extra call towards base node is served.
pub struct TariBaseNodeGrpc {
client: Arc<Mutex<BaseNodeClient<Channel>>>,
client: Arc<RwLock<BaseNodeClient<Channel>>>,
}

impl TariBaseNodeGrpc {
pub async fn new(base_node_address: String, shutdown_signal: ShutdownSignal) -> Result<Self, Error> {
Ok(Self {
client: Arc::new(Mutex::new(
client: Arc::new(RwLock::new(
util::connect_base_node(base_node_address, shutdown_signal).await?,
)),
})
Expand Down
72 changes: 44 additions & 28 deletions src/server/grpc/p2pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use minotari_app_grpc::tari_rpc::{
SubmitBlockRequest,
SubmitBlockResponse,
};
use num_format::{Locale, ToFormattedString};
use tari_common::configuration::Network;
use tari_common_types::types::FixedHash;
use tari_core::{
Expand All @@ -24,7 +25,7 @@ use tari_core::{
};
use tari_shutdown::ShutdownSignal;
use tari_utilities::hex::Hex;
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use tonic::{Request, Response, Status};

use crate::{
Expand Down Expand Up @@ -65,11 +66,11 @@ pub fn min_difficulty(pow: PowAlgorithm) -> Result<u64, Error> {
}

/// P2Pool specific gRPC service to provide `get_new_block` and `submit_block` functionalities.
pub struct ShaP2PoolGrpc<S>
pub(crate) struct ShaP2PoolGrpc<S>
where S: ShareChain
{
/// Base node client
client: Arc<Mutex<BaseNodeClient<tonic::transport::Channel>>>,
client: Arc<RwLock<BaseNodeClient<tonic::transport::Channel>>>,
/// P2P service client
p2p_client: p2p::ServiceClient,
/// SHA-3 share chain
Expand All @@ -80,8 +81,8 @@ where S: ShareChain
stats_store: Arc<StatsStore>,
/// Block validation params to be used when checking block difficulty.
block_validation_params: BlockValidationParams,
block_height_difficulty_cache: Arc<Mutex<HashMap<u64, u64>>>,
stats_max_difficulty_since_last_success: Arc<Mutex<u64>>,
block_height_difficulty_cache: Arc<RwLock<HashMap<u64, u64>>>,
stats_max_difficulty_since_last_success: Arc<RwLock<u64>>,
}

impl<S> ShaP2PoolGrpc<S>
Expand All @@ -99,7 +100,7 @@ where S: ShareChain
genesis_block_hash: FixedHash,
) -> Result<Self, Error> {
Ok(Self {
client: Arc::new(Mutex::new(
client: Arc::new(RwLock::new(
util::connect_base_node(base_node_address, shutdown_signal).await?,
)),
p2p_client,
Expand All @@ -111,14 +112,14 @@ where S: ShareChain
consensus_manager,
genesis_block_hash,
),
block_height_difficulty_cache: Arc::new(Mutex::new(HashMap::new())),
stats_max_difficulty_since_last_success: Arc::new(Mutex::new(0)),
block_height_difficulty_cache: Arc::new(RwLock::new(HashMap::new())),
stats_max_difficulty_since_last_success: Arc::new(RwLock::new(0)),
})
}

/// Submits a new block to share chain and broadcasts to the p2p network.
pub async fn submit_share_chain_block(&self, block: &Block) -> Result<(), Status> {
let pow_algo = block.original_block_header().pow.pow_algo;
let pow_algo = block.original_block_header.pow.pow_algo;
let share_chain = match pow_algo {
PowAlgorithm::RandomX => self.share_chain_random_x.clone(),
PowAlgorithm::Sha3x => self.share_chain_sha3x.clone(),
Expand All @@ -128,7 +129,7 @@ where S: ShareChain
self.stats_store
.inc(&algo_stat_key(pow_algo, MINER_STAT_ACCEPTED_BLOCKS_COUNT), 1)
.await;
info!(target: LOG_TARGET, "Broadcast new block: {:?}", block.hash().to_hex());
info!(target: LOG_TARGET, "Broadcast new block: {:?}", block.hash.to_hex());
self.p2p_client
.broadcast_block(block)
.await
Expand Down Expand Up @@ -172,7 +173,13 @@ where S: ShareChain
algo: Some(grpc_block_header_pow.clone()),
max_weight: 0,
};
let template_response = self.client.lock().await.get_new_block_template(req).await?.into_inner();
let template_response = self
.client
.write()
.await
.get_new_block_template(req)
.await?
.into_inner();
let miner_data = template_response
.miner_data
.ok_or_else(|| Status::internal("missing miner data"))?;
Expand All @@ -187,7 +194,7 @@ where S: ShareChain

let mut response = self
.client
.lock()
.write()
.await
.get_new_block_template_with_coinbases(GetNewBlockTemplateWithCoinbasesRequest {
algo: Some(grpc_block_header_pow),
Expand All @@ -206,7 +213,7 @@ where S: ShareChain
if let Some(header) = &response.block {
let height = header.header.as_ref().map(|h| h.height).unwrap_or(0);
self.block_height_difficulty_cache
.lock()
.write()
.await
.insert(height, miner_data.target_difficulty);
}
Expand Down Expand Up @@ -269,7 +276,7 @@ where S: ShareChain
.await
.map_err(|error| Status::internal(error.to_string()))?;

let origin_block_header = &block.original_block_header().clone();
let origin_block_header = &&block.original_block_header.clone();

// Check block's difficulty compared to the latest network one to increase the probability
// to get the block accepted (and also a block with lower difficulty than latest one is invalid anyway).
Expand All @@ -285,6 +292,7 @@ where S: ShareChain
)
.map_err(|error| Status::internal(error.to_string()))?,
};
info!("Submitted block difficulty: {}", request_block_difficulty);
// TODO: Cache this so that we don't ask each time. If we have a block we should not
// waste time before submitting it, or we might lose a share
// let mut network_difficulty_stream = self
Expand All @@ -307,23 +315,30 @@ where S: ShareChain
// network_difficulty_matches = true;
// }
// }
let network_difficulty_matches = match self
let network_difficulty = *self
.block_height_difficulty_cache
.lock()
.read()
.await
.get(&(origin_block_header.height))
{
Some(difficulty) => request_block_difficulty.as_u64() >= *difficulty,
None => false,
};
let mut max_difficulty = self.stats_max_difficulty_since_last_success.lock().await;
.unwrap_or(&0);
let network_difficulty_matches = request_block_difficulty.as_u64() >= network_difficulty;
let mut max_difficulty = self.stats_max_difficulty_since_last_success.write().await;
if *max_difficulty < request_block_difficulty.as_u64() {
*max_difficulty = request_block_difficulty.as_u64();
}
info!("Max difficulty: {}", max_difficulty);
info!(
"Max difficulty: {}. Network difficulty {}. Accepted {}",
max_difficulty.to_formatted_string(&Locale::en),
network_difficulty.to_formatted_string(&Locale::en),
self.stats_store
.get(&algo_stat_key(pow_algo, P2POOL_STAT_ACCEPTED_BLOCKS_COUNT))
.await
.to_formatted_string(&Locale::en)
);
block.achieved_difficulty = request_block_difficulty;

if !network_difficulty_matches {
block.set_sent_to_main_chain(false);
block.sent_to_main_chain = false;
// Don't error if we can't submit it.
match self.submit_share_chain_block(&block).await {
Ok(_) => {
Expand All @@ -335,7 +350,7 @@ where S: ShareChain
},
};
return Ok(Response::new(SubmitBlockResponse {
block_hash: block.hash().to_vec(),
block_hash: block.hash.to_vec(),
}));
}

Expand All @@ -344,16 +359,17 @@ where S: ShareChain
info!("🔗 Submitting block {} to base node...", origin_block_header.hash());

let grpc_request = Request::from_parts(metadata, extensions, grpc_request_payload);
match self.client.lock().await.submit_block(grpc_request).await {
match self.client.write().await.submit_block(grpc_request).await {
Ok(resp) => {
*max_difficulty = 0;
self.stats_store
.inc(&algo_stat_key(pow_algo, P2POOL_STAT_ACCEPTED_BLOCKS_COUNT), 1)
.await;
info!(
"💰 New matching block found and sent to network! Block hash: {}",
origin_block_header.hash()
);
block.set_sent_to_main_chain(true);
block.sent_to_main_chain = true;
self.submit_share_chain_block(&block).await?;
Ok(resp)
},
Expand All @@ -365,10 +381,10 @@ where S: ShareChain
self.stats_store
.inc(&algo_stat_key(pow_algo, P2POOL_STAT_REJECTED_BLOCKS_COUNT), 1)
.await;
block.set_sent_to_main_chain(false);
block.sent_to_main_chain = false;
self.submit_share_chain_block(&block).await?;
Ok(Response::new(SubmitBlockResponse {
block_hash: block.hash().to_vec(),
block_hash: block.hash.to_vec(),
}))
},
}
Expand Down
1 change: 1 addition & 0 deletions src/server/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ where S: ShareChain
.route("/miners", get(handlers::handle_miners_with_shares))
.route("/health", get(health::handle_health))
.route("/version", get(version::handle_version))
.route("/chain", get(handlers::handle_chain))
.with_state(AppState {
share_chain_sha3x: self.share_chain_sha3x.clone(),
share_chain_random_x: self.share_chain_random_x.clone(),
Expand Down
Loading

0 comments on commit d596c48

Please sign in to comment.