Commit

Merge remote-tracking branch 'origin/development' into st-create-random-squads
stringhandler committed Jan 9, 2025
2 parents d2fcb01 + 0cece29 commit 2384ff1
Showing 9 changed files with 112 additions and 114 deletions.
1 change: 0 additions & 1 deletion src/main.rs
@@ -5,7 +5,6 @@ use std::{
     fs::File,
     io::Write,
     panic,
-    process,
     time::{SystemTime, UNIX_EPOCH},
 };

6 changes: 1 addition & 5 deletions src/server/grpc/p2pool.rs
@@ -162,8 +162,7 @@ where S: ShareChain
             .collect();
         new_blocks.append(&mut uncles);
         if new_tip.new_tip.is_some() {
-            let total_pow = share_chain.get_total_chain_pow().await;
-            let notify = NotifyNewTipBlock::new(self.local_peer_id, new_blocks, total_pow);
+            let notify = NotifyNewTipBlock::new(self.local_peer_id, new_blocks);
             let res = self
                 .p2p_client
                 .broadcast_block(notify)
@@ -235,14 +234,11 @@ where S: ShareChain
             .await
             .map_err(|error| Status::internal(format!("failed to get new tip block {error:?}")))?)
             .clone();
-        // dbg!(&new_tip_block.height, &new_tip_block.hash);
         let shares = share_chain
             .generate_shares(&new_tip_block, !synced_status)
             .await
             .map_err(|error| Status::internal(format!("failed to generate shares {error:?}")))?;

-        // dbg!(&shares);
-
         let mut response = self
             .client
             .write()

14 changes: 9 additions & 5 deletions src/server/p2p/messages.rs
@@ -229,7 +229,6 @@ pub struct NotifyNewTipBlock {
     pub version: u64,
     peer_id: PeerId,
     pub new_blocks: Vec<P2Block>,
-    pub total_accumulated_difficulty: u128,
     pub timestamp: u64,
 }

@@ -238,7 +237,6 @@ impl Display for NotifyNewTipBlock {
         writeln!(f, "--------- new tip notify Block -----------------")?;
         writeln!(f, "version: {}", self.version)?;
         writeln!(f, "from: {}", self.peer_id.to_base58())?;
-        writeln!(f, "total_accumulated_difficulty: {}", self.total_accumulated_difficulty)?;
         writeln!(f, "timestamp: {}", self.timestamp)?;
         writeln!(f, "--------- p2pblocks -----------------")?;
         for block in &self.new_blocks {
@@ -251,14 +249,20 @@ impl Display for NotifyNewTipBlock {
 impl_conversions!(NotifyNewTipBlock);

 impl NotifyNewTipBlock {
-    pub fn new(peer_id: PeerId, new_blocks: Vec<P2Block>, total_acculumted_difficulty: AccumulatedDifficulty) -> Self {
-        let total_acculumted_difficulty = total_acculumted_difficulty.as_u128();
+    pub fn total_proof_of_work(&self) -> AccumulatedDifficulty {
+        let max_block = self.new_blocks.iter().max_by_key(|x| x.height);
+        match max_block {
+            Some(block) => block.total_pow(),
+            None => AccumulatedDifficulty::min(),
+        }
+    }
+
+    pub fn new(peer_id: PeerId, new_blocks: Vec<P2Block>) -> Self {
         let timestamp = EpochTime::now().as_u64();
         Self {
             version: PROTOCOL_VERSION,
             peer_id,
             new_blocks,
-            total_accumulated_difficulty: total_acculumted_difficulty,
             timestamp,
         }
     }
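
Note: dropping `total_accumulated_difficulty` from the wire format means a peer can no longer claim an arbitrary difficulty in the notification header; receivers now derive it from the blocks themselves via `total_proof_of_work()`, which takes the cumulative work of the highest block in the notification. A minimal standalone sketch of that derivation, using hypothetical stand-in types rather than the crate's real `P2Block` and `AccumulatedDifficulty`:

    // Stand-ins for illustration only; the real code uses P2Block and
    // AccumulatedDifficulty from this crate.
    struct Block {
        height: u64,
        total_pow: u128, // cumulative chain work up to and including this block
    }

    fn total_proof_of_work(new_blocks: &[Block]) -> u128 {
        // Only the highest block matters, since it carries the chain's
        // cumulative work; an empty notification maps to the minimum (0 here).
        new_blocks
            .iter()
            .max_by_key(|b| b.height)
            .map(|b| b.total_pow)
            .unwrap_or(0)
    }

    fn main() {
        let blocks = vec![
            Block { height: 10, total_pow: 1_000 },
            Block { height: 12, total_pow: 1_250 },
        ];
        assert_eq!(total_proof_of_work(&blocks), 1_250);
    }
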
7 changes: 1 addition & 6 deletions src/server/p2p/network.rs
@@ -24,7 +24,6 @@ use hickory_resolver::{
 use itertools::Itertools;
 use libp2p::{
     autonat::{self, NatStatus, OutboundProbeEvent},
-    connection_limits::{self},
     dcutr,
     futures::StreamExt,
     gossipsub::{self, IdentTopic, Message, MessageAcceptance, PublishError},
@@ -629,7 +628,7 @@ where S: ShareChain

         let our_tip = share_chain.tip_height().await.unwrap_or(0);
         let our_pow = share_chain.get_total_chain_pow().await;
-        if payload.total_accumulated_difficulty < our_pow.as_u128() &&
+        if payload.total_proof_of_work() < our_pow &&
             payload
                 .new_blocks
                 .iter()
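
The check above now compares typed values on both sides instead of converting to raw `u128`. A small sketch of why that shape is safer, with a hypothetical newtype standing in for the real `AccumulatedDifficulty`: deriving `Ord` preserves the ordering while the compiler rejects accidental comparisons against unrelated integers.

    // Hypothetical stand-in for the crate's AccumulatedDifficulty newtype.
    #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)]
    struct AccumulatedDifficulty(u128);

    fn main() {
        let our_pow = AccumulatedDifficulty(2_000);
        let their_pow = AccumulatedDifficulty(1_500);
        // Same comparison as the diff, with no .as_u128() escape hatch:
        assert!(their_pow < our_pow);
    }
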
@@ -1980,10 +1979,8 @@ where S: ShareChain
         let mut lock = self.relay_store.write().await;
         // TODO: Do relays expire?
         // if lock.has_active_relay() {
-        //     // dbg!("Already have an active relay");
         //     return;
         // }
-        // dbg!("No, select a relay");
         lock.select_random_relay();
         if let Some(relay) = lock.selected_relay_mut() {
             let mut addresses = relay.addresses.clone();
@@ -2273,7 +2270,6 @@ where S: ShareChain

         let mut peers_to_dial = vec![];
         for record in store_write_lock.best_peers_to_dial(100) {
-            // dbg!(&record.peer_id);
             // Only dial seed peers if we have 0 connections
             if !self.swarm.is_connected(&record.peer_id)
                 && !store_write_lock.is_seed_peer(&record.peer_id) {
@@ -2304,7 +2300,6 @@
         }
         drop(store_write_lock);
         // for peer in peers_to_dial {
-        //     // dbg!("trying");
         //     self.initiate_direct_peer_exchange(&peer).await;
         // }
     }

1 change: 0 additions & 1 deletion src/server/p2p/setup.rs
@@ -8,7 +8,6 @@ use blake2::Blake2b;
 use digest::{consts::U32, generic_array::GenericArray, Digest};
 use libp2p::{
     autonat::{self},
-    connection_limits::{self, ConnectionLimits},
     dcutr,
     gossipsub::{self, Message, MessageId},
     identify,

89 changes: 25 additions & 64 deletions src/sharechain/in_memory.rs
@@ -89,7 +89,7 @@ impl InMemoryShareChain {
         info!(target: LOG_TARGET, "Found old block cache file, renaming from {:?} to {:?}", data_path.as_path(), &bkp_file);

         // First remove the old backup file
-        let _ = fs::remove_dir_all(bkp_file.as_path())
+        let _unused = fs::remove_dir_all(bkp_file.as_path())
             .inspect_err(|e| error!(target: LOG_TARGET, "Could not remove old block cache file:{:?}", e));
         fs::create_dir_all(bkp_file.parent().unwrap())
             .map_err(|e| anyhow::anyhow!("Could not create block cache backup directory:{:?}", e))?;
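
The `let _` to `let _unused` change above is behaviour-preserving for a `Result`: both forms silence the unused-result warning, and the value is dropped either way (immediately for `_`, which never binds, and at end of scope for a named binding). The named form mainly reads more deliberately and survives lints such as `clippy::let_underscore_must_use`. A tiny sketch of the distinction:

    fn fallible() -> Result<(), std::io::Error> {
        Ok(())
    }

    fn main() {
        let _ = fallible();       // `_` never binds: the Result is dropped on the spot
        let _unused = fallible(); // binds, then drops at the end of main
    }
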
@@ -313,7 +313,6 @@ impl InMemoryShareChain {
             }
         }
         let mut miners_to_shares = HashMap::new();
-
         let tip_level = match p2_chain.get_tip() {
             Some(tip_level) => tip_level,
             None => return Ok(miners_to_shares),
@@ -876,16 +875,28 @@ pub mod test {
         let consensus_manager = ConsensusManager::builder(Network::LocalNet).build().unwrap();
         let coinbase_extras = Arc::new(RwLock::new(HashMap::<String, Vec<u8>>::new()));
         let (stats_tx, _) = tokio::sync::broadcast::channel(1000);
-        let stats_broadcast_client = StatsBroadcastClient::new(stats_tx);
-        InMemoryShareChain::new(
-            Config::default(),
-            PowAlgorithm::Sha3x,
-            None,
+        let stat_client = StatsBroadcastClient::new(stats_tx);
+        let config = Config::default();
+        let pow_algo = PowAlgorithm::Sha3x;
+
+        let block_cache = LmdbBlockStorage::new_from_temp_dir();
+        let p2chain = P2Chain::new_empty(
+            pow_algo,
+            config.share_window * 2,
+            config.share_window,
+            config.block_time,
+            block_cache,
+        );
+
+        InMemoryShareChain {
+            p2_chain: Arc::new(RwLock::new(p2chain)),
+            pow_algo,
+            block_validation_params: None,
             consensus_manager,
             coinbase_extras,
-            stats_broadcast_client,
-        )
-        .unwrap()
+            stat_client,
+            config,
+        }
     }

     pub fn new_random_address() -> TariAddress {
@@ -897,19 +908,7 @@

     #[tokio::test]
     async fn equal_shares() {
-        let consensus_manager = ConsensusManager::builder(Network::LocalNet).build().unwrap();
-        let coinbase_extras = Arc::new(RwLock::new(HashMap::<String, Vec<u8>>::new()));
-        let (stats_tx, _) = tokio::sync::broadcast::channel(1000);
-        let stats_broadcast_client = StatsBroadcastClient::new(stats_tx);
-        let share_chain = InMemoryShareChain::new(
-            Config::default(),
-            PowAlgorithm::Sha3x,
-            None,
-            consensus_manager,
-            coinbase_extras,
-            stats_broadcast_client,
-        )
-        .unwrap();
+        let share_chain = new_chain();

         let mut timestamp = EpochTime::now();
         let mut prev_block = None;
@@ -929,7 +928,6 @@
             .unwrap();

         prev_block = Some(block.clone());
-
         share_chain.submit_block(block).await.unwrap();
     }

@@ -946,20 +944,8 @@

     #[tokio::test]
     async fn equal_share_same_participants() {
-        let consensus_manager = ConsensusManager::builder(Network::LocalNet).build().unwrap();
-        let coinbase_extras = Arc::new(RwLock::new(HashMap::<String, Vec<u8>>::new()));
-        let (stats_tx, _) = tokio::sync::broadcast::channel(1000);
         let static_coinbase_extra = Vec::new();
-        let stats_broadcast_client = StatsBroadcastClient::new(stats_tx);
-        let share_chain = InMemoryShareChain::new(
-            Config::default(),
-            PowAlgorithm::Sha3x,
-            None,
-            consensus_manager,
-            coinbase_extras,
-            stats_broadcast_client,
-        )
-        .unwrap();
+        let share_chain = new_chain();

         let mut timestamp = EpochTime::now();
         let mut prev_block = None;
Expand Down Expand Up @@ -1000,20 +986,8 @@ pub mod test {

#[tokio::test]
async fn equal_share_same_participants_with_uncles() {
let consensus_manager = ConsensusManager::builder(Network::LocalNet).build().unwrap();
let coinbase_extras = Arc::new(RwLock::new(HashMap::<String, Vec<u8>>::new()));
let (stats_tx, _) = tokio::sync::broadcast::channel(1000);
let stats_broadcast_client = StatsBroadcastClient::new(stats_tx);
let static_coinbase_extra = Vec::new();
let share_chain = InMemoryShareChain::new(
Config::default(),
PowAlgorithm::Sha3x,
None,
consensus_manager,
coinbase_extras,
stats_broadcast_client,
)
.unwrap();
let share_chain = new_chain();

let mut timestamp = EpochTime::now();
let mut prev_block = None;
Expand Down Expand Up @@ -1160,20 +1134,7 @@ pub mod test {

#[tokio::test]
async fn chain_start() {
let consensus_manager = ConsensusManager::builder(Network::LocalNet).build().unwrap();
let coinbase_extras = Arc::new(RwLock::new(HashMap::<String, Vec<u8>>::new()));
let (stats_tx, _) = tokio::sync::broadcast::channel(1000);
let stats_broadcast_client = StatsBroadcastClient::new(stats_tx);
let share_chain = InMemoryShareChain::new(
Config::default(),
PowAlgorithm::Sha3x,
None,
consensus_manager,
coinbase_extras,
stats_broadcast_client,
)
.unwrap();

let share_chain = new_chain();
let static_coinbase_extra = Vec::new();
let mut new_tip = share_chain
.generate_new_tip_block(&new_random_address(), static_coinbase_extra.clone())
Expand Down
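
The five test hunks above all collapse the same ~13 lines of setup into one `new_chain()` call, and the helper now builds `InMemoryShareChain` as a struct literal over a `P2Chain` backed by an `LmdbBlockStorage` temp dir, rather than going through `InMemoryShareChain::new`. A generic sketch of the test-helper pattern, with hypothetical stand-in types:

    // Stand-ins for illustration; the real helper returns InMemoryShareChain.
    struct Config {
        share_window: u64,
    }
    struct ShareChain {
        window_size: u64,
    }

    #[cfg(test)]
    fn new_chain() -> ShareChain {
        // Shared wiring lives here once, so each test body stays focused
        // on behaviour instead of construction details.
        let config = Config { share_window: 10 };
        ShareChain {
            window_size: config.share_window * 2,
        }
    }

    #[cfg(test)]
    mod tests {
        #[test]
        fn equal_shares() {
            let share_chain = super::new_chain();
            assert_eq!(share_chain.window_size, 20);
        }
    }
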
43 changes: 37 additions & 6 deletions src/sharechain/lmdb_block_storage.rs
@@ -28,7 +28,7 @@
 };

 use anyhow::{anyhow, Error};
-use log::info;
+use log::{error, info};
 use rkv::{
     backend::{BackendInfo, Lmdb, LmdbEnvironment},
     Manager,
@@ -50,8 +50,14 @@ pub(crate) struct LmdbBlockStorage {
 impl LmdbBlockStorage {
     #[cfg(test)]
     pub fn new_from_temp_dir() -> Self {
+        use rand::{distributions::Alphanumeric, Rng};
         use tempfile::Builder;
-        let root = Builder::new().prefix("p2pool").tempdir().unwrap();
+        let instance: String = rand::thread_rng()
+            .sample_iter(&Alphanumeric)
+            .take(7)
+            .map(char::from)
+            .collect();
+        let root = Builder::new().prefix("p2pool").suffix(&instance).tempdir().unwrap();
         fs::create_dir_all(root.path()).unwrap();
         let path = root.path();
         let mut manager = Manager::<LmdbEnvironment>::singleton().write().unwrap();
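
The random seven-character suffix gives each test run its own LMDB directory. That plausibly matters because rkv's `Manager` is a process-wide singleton that hands out one shared environment per canonical path, so two tests reusing an identically named temp dir could otherwise trip over each other (the motive is my reading of the change, not stated in the diff). A standalone sketch of the suffix generation, assuming the rand 0.8 API the diff itself imports:

    use rand::{distributions::Alphanumeric, Rng};

    fn unique_suffix(len: usize) -> String {
        rand::thread_rng()
            .sample_iter(&Alphanumeric) // rand 0.8: yields u8, hence char::from
            .take(len)
            .map(char::from)
            .collect()
    }

    fn main() {
        // Two draws virtually never collide, so concurrent tests get
        // distinct p2pool<suffix> directories.
        let (a, b) = (unique_suffix(7), unique_suffix(7));
        assert_ne!(a, b);
        println!("p2pool{a} vs p2pool{b}");
    }
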
@@ -93,6 +99,16 @@ impl BlockCache for LmdbBlockStorage {
         None
     }

+    fn delete(&self, hash: &BlockHash) {
+        let env = self.file_handle.read().expect("reader");
+        let store = env.open_single("block_cache", StoreOptions::create()).unwrap();
+        let mut writer = env.write().expect("writer");
+        store.delete(&mut writer, hash.as_bytes()).unwrap();
+        if let Err(e) = writer.commit() {
+            error!(target: LOG_TARGET, "Error deleting block from lmdb: {:?}", e);
+        }
+    }
+
     fn insert(&self, hash: BlockHash, block: Arc<P2Block>) {
         // Retry if the map is full
         // This weird pattern of setting a bool is so that the env is closed before resizing, otherwise
@@ -106,7 +122,6 @@
             // next_resize = false;
         }
         let store = env.open_single("block_cache", StoreOptions::create()).unwrap();
-        // dbg!(_retry);
         let mut writer = env.write().expect("writer");
         let block_blob = serialize_message(&block).unwrap();
         match store.put(&mut writer, hash.as_bytes(), &rkv::Value::Blob(&block_blob)) {
@@ -159,13 +174,13 @@

 fn resize_db(env: &Rkv<LmdbEnvironment>) {
     let size = env.info().map(|i| i.map_size()).unwrap_or(0);
-    // dbg!(size);
     // let new_size = (size as f64 * 1.2f64).ceil() as usize;
     let new_size = size * 2;
     env.set_map_size(new_size).unwrap();
 }
 pub trait BlockCache {
     fn get(&self, hash: &BlockHash) -> Option<Arc<P2Block>>;
+    fn delete(&self, hash: &BlockHash);
     fn insert(&self, hash: BlockHash, block: Arc<P2Block>);
     fn all_blocks(&self) -> Result<Vec<Arc<P2Block>>, Error>;
 }
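
`resize_db` keeps its geometric growth policy: doubling the map whenever a write hits a full map means the number of resizes grows logarithmically with data size, which is presumably why the 1.2x alternative stays commented out; each resize requires closing and reopening the environment, per the comment in `insert` above. A toy calculation of the difference, assuming a hypothetical 16 MiB starting map:

    fn steps(mut size: f64, factor: f64, target: f64) -> u32 {
        let mut n = 0;
        while size < target {
            size *= factor;
            n += 1;
        }
        n
    }

    fn main() {
        let start = 16.0 * 1024.0 * 1024.0; // 16 MiB
        let target = 1024.0 * 1024.0 * 1024.0; // 1 GiB
        // Doubling reaches 1 GiB in 6 resizes; 1.2x growth takes 23,
        // each one a close-and-reopen cycle for the environment.
        println!("2.0x: {} resizes", steps(start, 2.0, target));
        println!("1.2x: {} resizes", steps(start, 1.2, target));
    }
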
@@ -193,12 +208,16 @@ pub mod test {
             self.blocks.read().unwrap().get(hash).cloned()
         }

+        fn delete(&self, hash: &BlockHash) {
+            self.blocks.write().unwrap().remove(hash);
+        }
+
         fn insert(&self, hash: BlockHash, block: Arc<P2Block>) {
             self.blocks.write().unwrap().insert(hash, block);
         }

-        fn all_blocks(&self) -> Vec<Arc<P2Block>> {
-            self.blocks.read().unwrap().values().cloned().collect()
+        fn all_blocks(&self) -> Result<Vec<Arc<P2Block>>, Error> {
+            Ok(self.blocks.read().unwrap().values().cloned().collect())
         }
     }

@@ -211,4 +230,16 @@
         let retrieved_block = cache.get(&hash).unwrap();
         assert_eq!(block, retrieved_block);
     }
+
+    #[test]
+    fn test_deleting_blocks() {
+        let cache = LmdbBlockStorage::new_from_temp_dir();
+        let block = Arc::new(P2Block::default());
+        let hash = block.hash;
+        cache.insert(hash, block.clone());
+        let retrieved_block = cache.get(&hash).unwrap();
+        assert_eq!(block, retrieved_block);
+        cache.delete(&hash);
+        assert!(cache.get(&hash).is_none());
+    }
 }