Skip to content

Commit

Permalink
added http status server and 2 basic stats to the new endpoint, hash …
Browse files Browse the repository at this point in the history
…rate calculation in progress
  • Loading branch information
ksrichard committed Jul 29, 2024
1 parent 1830604 commit 92d84d6
Show file tree
Hide file tree
Showing 18 changed files with 282 additions and 82 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ rand = "0.8.0"
dirs = "4.0.0"
log4rs = "1.3.0"
axum = "0.7.5"
itertools = "0.13.0"

[package.metadata.cargo-machete]
ignored = ["log4rs"]
19 changes: 14 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use clap::{
builder::{Styles, styling::AnsiColor},
Parser,
};
use log::LevelFilter;
use tari_common::initialize_logging;

use crate::sharechain::in_memory::InMemoryShareChain;
Expand All @@ -31,9 +30,7 @@ fn cli_styles() -> Styles {
#[command(styles = cli_styles())]
#[command(about = "⛏ Decentralized mining pool for Tari network ⛏", long_about = None)]
struct Cli {
/// Log level
#[arg(short, long, value_name = "log-level", default_value = Some("info"))]
log_level: LevelFilter,
base_dir: Option<PathBuf>,

/// (Optional) gRPC port to use.
#[arg(short, long, value_name = "grpc-port")]
Expand All @@ -43,6 +40,10 @@ struct Cli {
#[arg(short, long, value_name = "p2p-port")]
p2p_port: Option<u16>,

/// (Optional) stats server port to use.
#[arg(long, value_name = "stats-server-port")]
stats_server_port: Option<u16>,

/// (Optional) seed peers.
/// Any amount of seed peers can be added to join a p2pool network.
///
Expand Down Expand Up @@ -84,7 +85,11 @@ struct Cli {
#[arg(long, value_name = "mdns-disabled", default_value_t = false)]
mdns_disabled: bool,

base_dir: Option<PathBuf>,
/// Stats server disabled
///
/// If set, local stats HTTP server is disabled.
#[arg(long, value_name = "stats-server-disabled", default_value_t = false)]
stats_server_disabled: bool,
}

impl Cli {
Expand Down Expand Up @@ -123,6 +128,10 @@ async fn main() -> anyhow::Result<()> {
config_builder.with_private_key_folder(cli.private_key_folder.clone());
config_builder.with_mining_enabled(!cli.mining_disabled);
config_builder.with_mdns_enabled(!cli.mdns_disabled);
config_builder.with_stats_server_enabled(!cli.stats_server_disabled);
if let Some(stats_server_port) = cli.stats_server_port {
config_builder.with_stats_server_port(stats_server_port);
}

// server start
let config = config_builder.build();
Expand Down
13 changes: 13 additions & 0 deletions src/server/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::{path::PathBuf, time::Duration};

use crate::server::{p2p, p2p::peer_store::PeerStoreConfig};
use crate::server::http::stats;

/// Config is the server configuration struct.
#[derive(Clone)]
Expand All @@ -15,6 +16,7 @@ pub struct Config {
pub peer_store: PeerStoreConfig,
pub p2p_service: p2p::Config,
pub mining_enabled: bool,
pub stats_server: stats::server::Config,
}

impl Default for Config {
Expand All @@ -27,6 +29,7 @@ impl Default for Config {
peer_store: PeerStoreConfig::default(),
p2p_service: p2p::Config::default(),
mining_enabled: true,
stats_server: stats::server::Config::default(),
}
}
}
Expand Down Expand Up @@ -95,6 +98,16 @@ impl ConfigBuilder {
self
}

pub fn with_stats_server_enabled(&mut self, config: bool) -> &mut Self {
self.config.stats_server.enabled = config;
self
}

pub fn with_stats_server_port(&mut self, config: u16) -> &mut Self {
self.config.stats_server.port = config;
self
}

pub fn build(&self) -> Config {
self.config.clone()
}
Expand Down
40 changes: 20 additions & 20 deletions src/server/grpc/base_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,66 +109,48 @@ impl TariBaseNodeGrpc {

#[tonic::async_trait]
impl tari_rpc::base_node_server::BaseNode for TariBaseNodeGrpc {
type FetchMatchingUtxosStream = mpsc::Receiver<Result<tari_rpc::FetchMatchingUtxosResponse, Status>>;
type GetActiveValidatorNodesStream = mpsc::Receiver<Result<tari_rpc::GetActiveValidatorNodesResponse, Status>>;
type GetBlocksStream = mpsc::Receiver<Result<HistoricalBlock, Status>>;
type GetMempoolTransactionsStream = mpsc::Receiver<Result<tari_rpc::GetMempoolTransactionsResponse, Status>>;
type GetNetworkDifficultyStream = mpsc::Receiver<Result<tari_rpc::NetworkDifficultyResponse, Status>>;
type GetPeersStream = mpsc::Receiver<Result<tari_rpc::GetPeersResponse, Status>>;
type GetSideChainUtxosStream = mpsc::Receiver<Result<tari_rpc::GetSideChainUtxosResponse, Status>>;
type GetTemplateRegistrationsStream = mpsc::Receiver<Result<tari_rpc::GetTemplateRegistrationResponse, Status>>;
type GetTokensInCirculationStream = mpsc::Receiver<Result<ValueAtHeightResponse, Status>>;
type ListHeadersStream = mpsc::Receiver<Result<BlockHeaderResponse, Status>>;
type SearchKernelsStream = mpsc::Receiver<Result<HistoricalBlock, Status>>;
type SearchUtxosStream = mpsc::Receiver<Result<HistoricalBlock, Status>>;

async fn list_headers(
&self,
request: Request<ListHeadersRequest>,
) -> Result<Response<Self::ListHeadersStream>, Status> {
proxy_stream_result!(self, list_headers, request, LIST_HEADERS_PAGE_SIZE)
}

async fn get_header_by_hash(
&self,
request: Request<GetHeaderByHashRequest>,
) -> Result<Response<BlockHeaderResponse>, Status> {
proxy_simple_result!(self, get_header_by_hash, request)
}

type GetBlocksStream = mpsc::Receiver<Result<HistoricalBlock, Status>>;
async fn get_blocks(&self, request: Request<GetBlocksRequest>) -> Result<Response<Self::GetBlocksStream>, Status> {
proxy_stream_result!(self, get_blocks, request, GET_BLOCKS_PAGE_SIZE)
}

async fn get_block_timing(&self, request: Request<HeightRequest>) -> Result<Response<BlockTimingResponse>, Status> {
proxy_simple_result!(self, get_block_timing, request)
}

async fn get_constants(&self, request: Request<BlockHeight>) -> Result<Response<ConsensusConstants>, Status> {
proxy_simple_result!(self, get_constants, request)
}

async fn get_block_size(
&self,
request: Request<BlockGroupRequest>,
) -> Result<Response<BlockGroupResponse>, Status> {
proxy_simple_result!(self, get_block_size, request)
}

async fn get_block_fees(
&self,
request: Request<BlockGroupRequest>,
) -> Result<Response<BlockGroupResponse>, Status> {
proxy_simple_result!(self, get_block_fees, request)
}

async fn get_version(&self, request: Request<Empty>) -> Result<Response<StringValue>, Status> {
proxy_simple_result!(self, get_version, request)
}

async fn check_for_updates(&self, request: Request<Empty>) -> Result<Response<SoftwareUpdate>, Status> {
proxy_simple_result!(self, check_for_updates, request)
}
type GetTokensInCirculationStream = mpsc::Receiver<Result<ValueAtHeightResponse, Status>>;

async fn get_tokens_in_circulation(
&self,
Expand All @@ -182,6 +164,8 @@ impl tari_rpc::base_node_server::BaseNode for TariBaseNodeGrpc {
)
}

type GetNetworkDifficultyStream = mpsc::Receiver<Result<tari_rpc::NetworkDifficultyResponse, Status>>;

async fn get_network_difficulty(
&self,
request: Request<HeightRequest>,
Expand Down Expand Up @@ -251,31 +235,41 @@ impl tari_rpc::base_node_server::BaseNode for TariBaseNodeGrpc {
proxy_simple_result!(self, get_tip_info, request)
}

type SearchKernelsStream = mpsc::Receiver<Result<HistoricalBlock, Status>>;

async fn search_kernels(
&self,
request: Request<SearchKernelsRequest>,
) -> Result<Response<Self::SearchKernelsStream>, Status> {
proxy_stream_result!(self, search_kernels, request, GET_BLOCKS_PAGE_SIZE)
}

type SearchUtxosStream = mpsc::Receiver<Result<HistoricalBlock, Status>>;

async fn search_utxos(
&self,
request: Request<SearchUtxosRequest>,
) -> Result<Response<Self::SearchUtxosStream>, Status> {
proxy_stream_result!(self, search_utxos, request, GET_BLOCKS_PAGE_SIZE)
}

type FetchMatchingUtxosStream = mpsc::Receiver<Result<tari_rpc::FetchMatchingUtxosResponse, Status>>;

async fn fetch_matching_utxos(
&self,
request: Request<FetchMatchingUtxosRequest>,
) -> Result<Response<Self::FetchMatchingUtxosStream>, Status> {
proxy_stream_result!(self, fetch_matching_utxos, request, GET_BLOCKS_PAGE_SIZE)
}

type GetPeersStream = mpsc::Receiver<Result<tari_rpc::GetPeersResponse, Status>>;

async fn get_peers(&self, request: Request<GetPeersRequest>) -> Result<Response<Self::GetPeersStream>, Status> {
proxy_stream_result!(self, get_peers, request, GET_BLOCKS_PAGE_SIZE)
}

type GetMempoolTransactionsStream = mpsc::Receiver<Result<tari_rpc::GetMempoolTransactionsResponse, Status>>;

async fn get_mempool_transactions(
&self,
request: Request<GetMempoolTransactionsRequest>,
Expand Down Expand Up @@ -309,6 +303,8 @@ impl tari_rpc::base_node_server::BaseNode for TariBaseNodeGrpc {
proxy_simple_result!(self, get_mempool_stats, request)
}

type GetActiveValidatorNodesStream = mpsc::Receiver<Result<tari_rpc::GetActiveValidatorNodesResponse, Status>>;

async fn get_active_validator_nodes(
&self,
request: Request<GetActiveValidatorNodesRequest>,
Expand All @@ -323,13 +319,17 @@ impl tari_rpc::base_node_server::BaseNode for TariBaseNodeGrpc {
proxy_simple_result!(self, get_shard_key, request)
}

type GetTemplateRegistrationsStream = mpsc::Receiver<Result<tari_rpc::GetTemplateRegistrationResponse, Status>>;

async fn get_template_registrations(
&self,
request: Request<GetTemplateRegistrationsRequest>,
) -> Result<Response<Self::GetTemplateRegistrationsStream>, Status> {
proxy_stream_result!(self, get_template_registrations, request, 10)
}

type GetSideChainUtxosStream = mpsc::Receiver<Result<tari_rpc::GetSideChainUtxosResponse, Status>>;

async fn get_side_chain_utxos(
&self,
request: Request<GetSideChainUtxosRequest>,
Expand Down
24 changes: 12 additions & 12 deletions src/server/grpc/p2pool.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use log::{debug, info, warn};
use minotari_app_grpc::tari_rpc::{
base_node_client::BaseNodeClient, GetNewBlockRequest, GetNewBlockResponse, GetNewBlockTemplateWithCoinbasesRequest,
HeightRequest, NewBlockTemplateRequest, pow_algo::PowAlgos, PowAlgo, sha_p2_pool_server::ShaP2Pool,
base_node_client::BaseNodeClient, pow_algo::PowAlgos, sha_p2_pool_server::ShaP2Pool, GetNewBlockRequest,
GetNewBlockResponse, GetNewBlockTemplateWithCoinbasesRequest, HeightRequest, NewBlockTemplateRequest, PowAlgo,
SubmitBlockRequest, SubmitBlockResponse,
};
use tari_core::proof_of_work::sha3x_difficulty;
Expand All @@ -19,15 +19,15 @@ use crate::{
grpc::{error::Error, util},
p2p,
},
sharechain::{block::Block, SHARE_COUNT, ShareChain},
sharechain::{block::Block, ShareChain, SHARE_COUNT},
};

const LOG_TARGET: &str = "p2pool::server::grpc::p2pool";

/// P2Pool specific gRPC service to provide `get_new_block` and `submit_block` functionalities.
pub struct ShaP2PoolGrpc<S>
where
S: ShareChain + Send + Sync + 'static,
where
S: ShareChain,
{
/// Base node client
client: Arc<Mutex<BaseNodeClient<tonic::transport::Channel>>>,
Expand All @@ -40,8 +40,8 @@ pub struct ShaP2PoolGrpc<S>
}

impl<S> ShaP2PoolGrpc<S>
where
S: ShareChain + Send + Sync + 'static,
where
S: ShareChain,
{
pub async fn new(
base_node_address: String,
Expand Down Expand Up @@ -76,8 +76,8 @@ impl<S> ShaP2PoolGrpc<S>

#[tonic::async_trait]
impl<S> ShaP2Pool for ShaP2PoolGrpc<S>
where
S: ShareChain + Send + Sync + 'static,
where
S: ShareChain,
{
/// Returns a new block (that can be mined) which contains all the shares generated
/// from the current share chain as coinbase transactions.
Expand Down Expand Up @@ -196,14 +196,14 @@ impl<S> ShaP2Pool for ShaP2PoolGrpc<S>
block.set_sent_to_main_chain(true);
self.submit_share_chain_block(&block).await?;
Ok(resp)
}
},
Err(_) => {
block.set_sent_to_main_chain(false);
self.submit_share_chain_block(&block).await?;
Ok(Response::new(SubmitBlockResponse {
block_hash: block.hash().to_vec(),
}))
}
},
}
}
}
2 changes: 1 addition & 1 deletion src/server/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

pub mod stats;
pub mod stats;
32 changes: 29 additions & 3 deletions src/server/http/stats/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,37 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use axum::extract::State;
use axum::http::StatusCode;
use axum::Json;
use itertools::Itertools;
use log::error;

use crate::server::http::stats::models::Stats;
use crate::server::http::stats::server::AppState;

pub async fn handle_get_stats() -> (StatusCode, Json<Stats>) {
todo!()
}
const LOG_TARGET: &str = "p2pool::server::stats::get";

pub async fn handle_get_stats(State(state): State<AppState>) -> Result<Json<Stats>, StatusCode> {
let chain = state.share_chain.blocks(0).await.map_err(|error| {
error!(target: LOG_TARGET, "Failed to get blocks of share chain: {error:?}");
StatusCode::INTERNAL_SERVER_ERROR
})?;

// collect number of miners
let num_of_miners = chain.iter()
.map(|block| block.miner_wallet_address())
.filter(|addr_opt| addr_opt.is_some())
.map(|addr| addr.as_ref().unwrap().to_base58())
.unique()
.count();

// last won block
let last_block_won = chain.iter()
.filter(|block| block.sent_to_main_chain())
.last()
.cloned()
.map(|block| block.into());

Ok(Json(Stats { num_of_miners, last_block_won }))
}
Loading

0 comments on commit 92d84d6

Please sign in to comment.