From 35ebf4cf1d40d645faadc865a179684b68cd4568 Mon Sep 17 00:00:00 2001 From: stringhandler Date: Mon, 23 Dec 2024 22:05:36 +0200 Subject: [PATCH] feat: add restore on lmdb --- src/cli/commands/util.rs | 1 + src/server/config.rs | 7 ++++ src/server/p2p/network.rs | 27 +++++------- src/server/p2p/setup.rs | 6 +-- src/sharechain/error.rs | 2 + src/sharechain/in_memory.rs | 61 +++++++++++++++++++++++++--- src/sharechain/lmdb_block_storage.rs | 45 +++++++++++++++++++- src/sharechain/p2chain.rs | 50 +++++++++++++++-------- 8 files changed, 157 insertions(+), 42 deletions(-) diff --git a/src/cli/commands/util.rs b/src/cli/commands/util.rs index 3a7a40a..a2c62cd 100644 --- a/src/cli/commands/util.rs +++ b/src/cli/commands/util.rs @@ -118,6 +118,7 @@ pub async fn server( } config_builder.with_base_node_address(args.base_node_address.clone()); + config_builder.with_block_cache_file(env::current_dir()?.join("block_cache")); let config = config_builder.build(); let randomx_factory = RandomXFactory::new(1); let consensus_manager = ConsensusManager::builder(Network::get_current_or_user_setting_or_default()).build()?; diff --git a/src/server/config.rs b/src/server/config.rs index ccc5dc1..9610002 100644 --- a/src/server/config.rs +++ b/src/server/config.rs @@ -23,6 +23,7 @@ pub struct Config { pub max_relay_circuits_per_peer: Option, pub block_time: u64, pub share_window: u64, + pub block_cache_file: PathBuf, } impl Default for Config { @@ -41,6 +42,7 @@ impl Default for Config { max_relay_circuits_per_peer: None, block_time: 20, share_window: 2160, + block_cache_file: PathBuf::from("block_cache"), } } } @@ -196,6 +198,11 @@ impl ConfigBuilder { self } + pub fn with_block_cache_file(&mut self, config: PathBuf) -> &mut Self { + self.config.block_cache_file = config; + self + } + pub fn build(&self) -> Config { self.config.clone() } diff --git a/src/server/p2p/network.rs b/src/server/p2p/network.rs index ec3ff4d..f3d95c8 100644 --- a/src/server/p2p/network.rs +++ b/src/server/p2p/network.rs @@ -350,6 +350,7 @@ where S: ShareChain stats_broadcast_client: StatsBroadcastClient, ) -> Result { let swarm = setup::new_swarm(config).await?; + info!(target: LOG_TARGET, "Swarm created. Our id: {}", swarm.local_peer_id()); // client related channels let (broadcast_block_tx, broadcast_block_rx) = broadcast::channel::(100); @@ -846,21 +847,15 @@ where S: ShareChain .await .best_peers_to_share(num_peers, &request.known_peer_ids); let my_best_peers: Vec<_> = my_best_peers.into_iter().map(|p| p.peer_info).collect(); - if self - .swarm - .behaviour_mut() - .direct_peer_exchange - .send_response( - channel, - Ok(DirectPeerInfoResponse { - peer_id: local_peer_id.to_base58(), - info, - best_peers: my_best_peers, - }), - ) - .is_err() - { - error!(target: LOG_TARGET, squad = &self.config.squad; "Failed to send peer info response"); + if let Err(e) = self.swarm.behaviour_mut().direct_peer_exchange.send_response( + channel, + Ok(DirectPeerInfoResponse { + peer_id: local_peer_id.to_base58(), + info, + best_peers: my_best_peers, + }), + ) { + error!(target: LOG_TARGET, squad = &self.config.squad; "Failed to send peer info response to {:?}: {:?}", source_peer, e); } } else { error!(target: LOG_TARGET, squad = &self.config.squad; "Failed to create peer info"); @@ -2255,7 +2250,7 @@ where S: ShareChain num_dialed += 1; // We can only do 30 connections // after 30 it starts cancelling dials - if num_dialed > 5 { + if num_dialed > 80 { break; } } diff --git a/src/server/p2p/setup.rs b/src/server/p2p/setup.rs index 3fd676a..950112d 100644 --- a/src/server/p2p/setup.rs +++ b/src/server/p2p/setup.rs @@ -151,14 +151,14 @@ pub(crate) async fn new_swarm(config: &config::Config) -> Result>::new( [( StreamProtocol::new(DIRECT_PEER_EXCHANGE_REQ_RESP_PROTOCOL), request_response::ProtocolSupport::Full, )], - request_response::Config::default().with_request_timeout(Duration::from_secs(10)), // 10 is the default + request_response::Config::default().with_request_timeout(Duration::from_secs(60)), // 10 is the default ), catch_up_sync: cbor::Behaviour::>::new( [( @@ -180,7 +180,7 @@ pub(crate) async fn new_swarm(config: &config::Config) -> Result { + let _unused = + stat_client.send_chain_changed(pow_algo, p.get_height(), p.get_max_chain_length() as u64); + + p2chain = Some(p); + }, + Err(e) => error!(target: LOG_TARGET, "Could not load chain from file: {}", e), + }; + + // fs::remove_dir_all(bkp_file.as_path()) + // .map_err(|e| anyhow::anyhow!("Could not remove old block cache file:{:?}", e))?; + } + + if p2chain.is_none() { + let block_cache = LmdbBlockStorage::new_from_path(&data_path); + p2chain = Some(P2Chain::new_empty( + pow_algo, + config.share_window * 2, + config.share_window, + config.block_time, + block_cache, + )); + } + + let p2chain = p2chain.unwrap(); + + Ok(Self { + p2_chain: Arc::new(RwLock::new(p2chain)), pow_algo, block_validation_params, consensus_manager, diff --git a/src/sharechain/lmdb_block_storage.rs b/src/sharechain/lmdb_block_storage.rs index a177c14..6455341 100644 --- a/src/sharechain/lmdb_block_storage.rs +++ b/src/sharechain/lmdb_block_storage.rs @@ -23,9 +23,12 @@ use std::{ fs, + path::Path, sync::{Arc, RwLock}, }; +use anyhow::{anyhow, Error}; +use log::info; use rkv::{ backend::{BackendInfo, Lmdb, LmdbEnvironment}, Manager, @@ -35,17 +38,19 @@ use rkv::{ }; use tari_common_types::types::BlockHash; use tari_utilities::ByteArray; -use tempfile::Builder; use super::P2Block; use crate::server::p2p::messages::{deserialize_message, serialize_message}; +const LOG_TARGET: &str = "tari::p2pool::sharechain::lmdb_block_storage"; pub(crate) struct LmdbBlockStorage { file_handle: Arc>>, } impl LmdbBlockStorage { + #[cfg(test)] pub fn new_from_temp_dir() -> Self { + use tempfile::Builder; let root = Builder::new().prefix("p2pool").tempdir().unwrap(); fs::create_dir_all(root.path()).unwrap(); let path = root.path(); @@ -54,6 +59,18 @@ impl LmdbBlockStorage { Self { file_handle } } + + pub fn new_from_path(path: &Path) -> Self { + info!(target: LOG_TARGET, "Using block storage at {:?}", path); + if !fs::exists(path).expect("could not get file") { + fs::create_dir_all(path).unwrap(); + // fs::File::create(path).unwrap(); + } + let mut manager = Manager::::singleton().write().unwrap(); + let file_handle = manager.get_or_create(path, Rkv::new::).unwrap(); + + Self { file_handle } + } } impl BlockCache for LmdbBlockStorage { @@ -117,6 +134,27 @@ impl BlockCache for LmdbBlockStorage { } } } + + fn all_blocks(&self) -> Result>, Error> { + let env = self.file_handle.read().expect("reader"); + let store = env.open_single("block_cache", StoreOptions::create()).unwrap(); + let reader = env.read().expect("reader"); + let mut res = vec![]; + let iter = store.iter_start(&reader)?; + for r in iter { + let (_k, v) = r?; + match v { + rkv::Value::Blob(b) => { + let block = Arc::new(deserialize_message(b).unwrap()); + res.push(block); + }, + _ => { + return Err(anyhow!("Invalid block in storage")); + }, + } + } + Ok(res) + } } fn resize_db(env: &Rkv) { @@ -129,6 +167,7 @@ fn resize_db(env: &Rkv) { pub trait BlockCache { fn get(&self, hash: &BlockHash) -> Option>; fn insert(&self, hash: BlockHash, block: Arc); + fn all_blocks(&self) -> Result>, Error>; } #[cfg(test)] @@ -157,6 +196,10 @@ pub mod test { fn insert(&self, hash: BlockHash, block: Arc) { self.blocks.write().unwrap().insert(hash, block); } + + fn all_blocks(&self) -> Vec> { + self.blocks.read().unwrap().values().cloned().collect() + } } #[test] diff --git a/src/sharechain/p2chain.rs b/src/sharechain/p2chain.rs index eff0f43..864624a 100644 --- a/src/sharechain/p2chain.rs +++ b/src/sharechain/p2chain.rs @@ -137,6 +137,39 @@ pub struct P2Chain { } impl P2Chain { + pub fn new_empty(algo: PowAlgorithm, total_size: u64, share_window: u64, block_time: u64, block_cache: T) -> Self { + let levels = HashMap::new(); + let lwma = + LinearWeightedMovingAverage::new(DIFFICULTY_ADJUSTMENT_WINDOW, block_time).expect("Failed to create LWMA"); + Self { + algo, + block_cache: Arc::new(block_cache), + block_time, + cached_shares: None, + levels, + total_size, + share_window, + current_tip: 0, + lwma, + } + } + + pub fn try_load( + algo: PowAlgorithm, + total_size: u64, + share_window: u64, + block_time: u64, + from_block_cache: T, + new_block_cache: T, + ) -> Result { + let mut new_chain = Self::new_empty(algo, total_size, share_window, block_time, new_block_cache); + for block in from_block_cache.all_blocks()? { + info!(target: LOG_TARGET, "Loading block {}({:x}{:x}{:x}{:x}) into chain", block.height, block.hash[0], block.hash[1], block.hash[2], block.hash[3]); + new_chain.add_block_to_chain(block)?; + } + Ok(new_chain) + } + pub fn total_accumulated_tip_difficulty(&self) -> AccumulatedDifficulty { match self.get_tip() { Some(tip) => tip @@ -180,23 +213,6 @@ impl P2Chain { level.get(&level.chain_block()) } - pub fn new_empty(algo: PowAlgorithm, total_size: u64, share_window: u64, block_time: u64, block_cache: T) -> Self { - let levels = HashMap::new(); - let lwma = - LinearWeightedMovingAverage::new(DIFFICULTY_ADJUSTMENT_WINDOW, block_time).expect("Failed to create LWMA"); - Self { - algo, - block_cache: Arc::new(block_cache), - block_time, - cached_shares: None, - levels, - total_size, - share_window, - current_tip: 0, - lwma, - } - } - fn cleanup_chain(&mut self) -> Result<(), ShareChainError> { let mut current_chain_length = self.levels.len() as u64; // let see if we are the limit for the current chain