feat: add restore on lmdb
stringhandler committed Dec 23, 2024
1 parent 01e3f40 commit 35ebf4c
Showing 8 changed files with 157 additions and 42 deletions.
1 change: 1 addition & 0 deletions src/cli/commands/util.rs
@@ -118,6 +118,7 @@ pub async fn server(
}
config_builder.with_base_node_address(args.base_node_address.clone());

config_builder.with_block_cache_file(env::current_dir()?.join("block_cache"));
let config = config_builder.build();
let randomx_factory = RandomXFactory::new(1);
let consensus_manager = ConsensusManager::builder(Network::get_current_or_user_setting_or_default()).build()?;
7 changes: 7 additions & 0 deletions src/server/config.rs
@@ -23,6 +23,7 @@ pub struct Config {
pub max_relay_circuits_per_peer: Option<usize>,
pub block_time: u64,
pub share_window: u64,
pub block_cache_file: PathBuf,
}

impl Default for Config {
@@ -41,6 +42,7 @@ impl Default for Config {
max_relay_circuits_per_peer: None,
block_time: 20,
share_window: 2160,
block_cache_file: PathBuf::from("block_cache"),
}
}
}
@@ -196,6 +198,11 @@ impl ConfigBuilder {
self
}

pub fn with_block_cache_file(&mut self, config: PathBuf) -> &mut Self {
self.config.block_cache_file = config;
self
}

pub fn build(&self) -> Config {
self.config.clone()
}
27 changes: 11 additions & 16 deletions src/server/p2p/network.rs
@@ -350,6 +350,7 @@ where S: ShareChain
stats_broadcast_client: StatsBroadcastClient,
) -> Result<Self, Error> {
let swarm = setup::new_swarm(config).await?;
info!(target: LOG_TARGET, "Swarm created. Our id: {}", swarm.local_peer_id());

// client related channels
let (broadcast_block_tx, broadcast_block_rx) = broadcast::channel::<NotifyNewTipBlock>(100);
@@ -846,21 +847,15 @@ where S: ShareChain
.await
.best_peers_to_share(num_peers, &request.known_peer_ids);
let my_best_peers: Vec<_> = my_best_peers.into_iter().map(|p| p.peer_info).collect();
if self
.swarm
.behaviour_mut()
.direct_peer_exchange
.send_response(
channel,
Ok(DirectPeerInfoResponse {
peer_id: local_peer_id.to_base58(),
info,
best_peers: my_best_peers,
}),
)
.is_err()
{
error!(target: LOG_TARGET, squad = &self.config.squad; "Failed to send peer info response");
if let Err(e) = self.swarm.behaviour_mut().direct_peer_exchange.send_response(
channel,
Ok(DirectPeerInfoResponse {
peer_id: local_peer_id.to_base58(),
info,
best_peers: my_best_peers,
}),
) {
error!(target: LOG_TARGET, squad = &self.config.squad; "Failed to send peer info response to {:?}: {:?}", source_peer, e);
}
} else {
error!(target: LOG_TARGET, squad = &self.config.squad; "Failed to create peer info");
@@ -2255,7 +2250,7 @@ where S: ShareChain
num_dialed += 1;
// We can only do 30 connections
// after 30 it starts cancelling dials
if num_dialed > 5 {
if num_dialed > 80 {
break;
}
}
6 changes: 3 additions & 3 deletions src/server/p2p/setup.rs
@@ -151,14 +151,14 @@ pub(crate) async fn new_swarm(config: &config::Config) -> Result<Swarm<ServerNet
StreamProtocol::new(SHARE_CHAIN_SYNC_REQ_RESP_PROTOCOL),
request_response::ProtocolSupport::Full,
)],
request_response::Config::default().with_request_timeout(Duration::from_secs(10)), // 10 is the default
request_response::Config::default().with_request_timeout(Duration::from_secs(30)), // 10 is the default
),
direct_peer_exchange: cbor::Behaviour::<DirectPeerInfoRequest, Result<DirectPeerInfoResponse, String>>::new(
[(
StreamProtocol::new(DIRECT_PEER_EXCHANGE_REQ_RESP_PROTOCOL),
request_response::ProtocolSupport::Full,
)],
request_response::Config::default().with_request_timeout(Duration::from_secs(10)), // 10 is the default
request_response::Config::default().with_request_timeout(Duration::from_secs(60)), // 10 is the default
),
catch_up_sync: cbor::Behaviour::<CatchUpSyncRequest, Result<CatchUpSyncResponse, String>>::new(
[(
@@ -180,7 +180,7 @@
dcutr: dcutr::Behaviour::new(key_pair.public().to_peer_id()),
autonat: autonat::Behaviour::new(key_pair.public().to_peer_id(), Default::default()),
connection_limits: connection_limits::Behaviour::new(ConnectionLimits::default().with_max_established_incoming(config.max_incoming_connections).with_max_established_outgoing(config.max_outgoing_connections)),
ping: ping::Behaviour::new(ping::Config::default())
ping: ping::Behaviour::new(ping::Config::default().with_timeout(Duration::from_secs(30))),
})
})
?
2 changes: 2 additions & 0 deletions src/sharechain/error.rs
@@ -45,6 +45,8 @@ pub enum ShareChainError {
UncleParentNotInMainChain,
#[error("Block does not have correct total work accumulated")]
BlockTotalWorkMismatch,
#[error("Other: {0}")]
Anyhow(#[from] anyhow::Error),
}

#[derive(Error, Debug)]
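The new Anyhow variant, via #[from], derives a From<anyhow::Error> impl for ShareChainError, which is what lets the restore path in in_memory.rs use `?` on anyhow errors inside functions returning Result<_, ShareChainError>. A minimal sketch of the pattern (the helper function is illustrative, not from the commit):

fn cache_exists(path: &std::path::Path) -> Result<bool, ShareChainError> {
    // map_err yields an anyhow::Error; `?` then converts it into
    // ShareChainError::Anyhow through the derived From impl.
    let exists = std::fs::exists(path).map_err(|e| anyhow::anyhow!("cache check failed: {}", e))?;
    Ok(exists)
}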
61 changes: 56 additions & 5 deletions src/sharechain/in_memory.rs
@@ -1,8 +1,9 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::{cmp, collections::HashMap, str::FromStr, sync::Arc};
use std::{cmp, collections::HashMap, fs, str::FromStr, sync::Arc};

use anyhow::anyhow;
use async_trait::async_trait;
use log::*;
use minotari_app_grpc::tari_rpc::NewBlockCoinbase;
@@ -74,14 +75,64 @@ impl InMemoryShareChain {
return Err(ShareChainError::MissingBlockValidationParams);
}

Ok(Self {
p2_chain: Arc::new(RwLock::new(P2Chain::new_empty(
let data_path = config.block_cache_file.join(pow_algo.to_string());

let mut p2chain = None;
if fs::exists(&data_path).map_err(|e| anyhow!("block cache file errored when checking exists: {}", e))? {
let bkp_file = config
.block_cache_file
.as_path()
.parent()
.ok_or_else(|| anyhow!("Block cache file has no parent"))?
.join("block_cache_backup")
.join(pow_algo.to_string());
info!(target: LOG_TARGET, "Found old block cache file, renaming from {:?} to {:?}", data_path.as_path(), &bkp_file);

// First remove the old backup file
let _ = fs::remove_dir_all(bkp_file.as_path())
.inspect_err(|e| error!(target: LOG_TARGET, "Could not remove old block cache file:{:?}", e));
fs::create_dir_all(bkp_file.parent().unwrap())
.map_err(|e| anyhow::anyhow!("Could not create block cache backup directory:{:?}", e))?;
fs::rename(data_path.as_path(), bkp_file.as_path())
.map_err(|e| anyhow::anyhow!("Could not rename file to old file:{:?}", e))?;
let old = LmdbBlockStorage::new_from_path(bkp_file.as_path());
let new = LmdbBlockStorage::new_from_path(&data_path);
match P2Chain::try_load(
pow_algo,
config.share_window * 2,
config.share_window,
config.block_time,
LmdbBlockStorage::new_from_temp_dir(),
))),
old,
new,
) {
Ok(p) => {
let _unused =
stat_client.send_chain_changed(pow_algo, p.get_height(), p.get_max_chain_length() as u64);

p2chain = Some(p);
},
Err(e) => error!(target: LOG_TARGET, "Could not load chain from file: {}", e),
};

// fs::remove_dir_all(bkp_file.as_path())
// .map_err(|e| anyhow::anyhow!("Could not remove old block cache file:{:?}", e))?;
}

if p2chain.is_none() {
let block_cache = LmdbBlockStorage::new_from_path(&data_path);
p2chain = Some(P2Chain::new_empty(
pow_algo,
config.share_window * 2,
config.share_window,
config.block_time,
block_cache,
));
}

let p2chain = p2chain.unwrap();

Ok(Self {
p2_chain: Arc::new(RwLock::new(p2chain)),
pow_algo,
block_validation_params,
consensus_manager,
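The hunk above is the core of the feature: when an on-disk cache exists, it is renamed into a backup directory and its blocks are replayed into a fresh LMDB store via P2Chain::try_load, falling back to an empty chain when nothing is on disk or the reload fails. A condensed sketch of that flow, assuming the LmdbBlockStorage and P2Chain types from this commit (stats reporting, logging, and error handling simplified; not the literal implementation):

use std::{fs, path::Path};

fn load_or_create_chain(
    data_path: &Path, // <block_cache_file>/<pow_algo>
    bkp_path: &Path,  // <parent>/block_cache_backup/<pow_algo>
    pow_algo: PowAlgorithm,
    share_window: u64,
    block_time: u64,
) -> P2Chain<LmdbBlockStorage> {
    if data_path.exists() {
        // Move the old cache aside, then replay it block by block into a
        // fresh store so every restored block re-enters through the normal
        // add-to-chain path.
        let _ = fs::remove_dir_all(bkp_path);
        fs::create_dir_all(bkp_path.parent().expect("backup path has a parent")).expect("create backup dir");
        fs::rename(data_path, bkp_path).expect("move cache to backup");
        let old = LmdbBlockStorage::new_from_path(bkp_path);
        let new = LmdbBlockStorage::new_from_path(data_path);
        match P2Chain::try_load(pow_algo, share_window * 2, share_window, block_time, old, new) {
            Ok(chain) => return chain,
            Err(e) => eprintln!("could not reload chain, starting empty: {}", e),
        }
    }
    // Nothing on disk (or the reload failed): start from an empty chain.
    P2Chain::new_empty(
        pow_algo,
        share_window * 2,
        share_window,
        block_time,
        LmdbBlockStorage::new_from_path(data_path),
    )
}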
45 changes: 44 additions & 1 deletion src/sharechain/lmdb_block_storage.rs
@@ -23,9 +23,12 @@

use std::{
fs,
path::Path,
sync::{Arc, RwLock},
};

use anyhow::{anyhow, Error};
use log::info;
use rkv::{
backend::{BackendInfo, Lmdb, LmdbEnvironment},
Manager,
@@ -35,17 +38,19 @@ use rkv::{
};
use tari_common_types::types::BlockHash;
use tari_utilities::ByteArray;
use tempfile::Builder;

use super::P2Block;
use crate::server::p2p::messages::{deserialize_message, serialize_message};

const LOG_TARGET: &str = "tari::p2pool::sharechain::lmdb_block_storage";
pub(crate) struct LmdbBlockStorage {
file_handle: Arc<RwLock<Rkv<LmdbEnvironment>>>,
}

impl LmdbBlockStorage {
#[cfg(test)]
pub fn new_from_temp_dir() -> Self {
use tempfile::Builder;
let root = Builder::new().prefix("p2pool").tempdir().unwrap();
fs::create_dir_all(root.path()).unwrap();
let path = root.path();
@@ -54,6 +59,18 @@ impl LmdbBlockStorage {

Self { file_handle }
}

pub fn new_from_path(path: &Path) -> Self {
info!(target: LOG_TARGET, "Using block storage at {:?}", path);
if !fs::exists(path).expect("could not get file") {
fs::create_dir_all(path).unwrap();
// fs::File::create(path).unwrap();
}
let mut manager = Manager::<LmdbEnvironment>::singleton().write().unwrap();
let file_handle = manager.get_or_create(path, Rkv::new::<Lmdb>).unwrap();

Self { file_handle }
}
}

impl BlockCache for LmdbBlockStorage {
@@ -117,6 +134,27 @@ impl BlockCache for LmdbBlockStorage {
}
}
}

fn all_blocks(&self) -> Result<Vec<Arc<P2Block>>, Error> {
let env = self.file_handle.read().expect("reader");
let store = env.open_single("block_cache", StoreOptions::create()).unwrap();
let reader = env.read().expect("reader");
let mut res = vec![];
let iter = store.iter_start(&reader)?;
for r in iter {
let (_k, v) = r?;
match v {
rkv::Value::Blob(b) => {
let block = Arc::new(deserialize_message(b).unwrap());
res.push(block);
},
_ => {
return Err(anyhow!("Invalid block in storage"));
},
}
}
Ok(res)
}
}

fn resize_db(env: &Rkv<LmdbEnvironment>) {
@@ -129,6 +167,7 @@ fn resize_db(env: &Rkv<LmdbEnvironment>) {
pub trait BlockCache {
fn get(&self, hash: &BlockHash) -> Option<Arc<P2Block>>;
fn insert(&self, hash: BlockHash, block: Arc<P2Block>);
fn all_blocks(&self) -> Result<Vec<Arc<P2Block>>, Error>;
}

#[cfg(test)]
@@ -157,6 +196,10 @@ pub mod test {
fn insert(&self, hash: BlockHash, block: Arc<P2Block>) {
self.blocks.write().unwrap().insert(hash, block);
}

fn all_blocks(&self) -> Result<Vec<Arc<P2Block>>, Error> {
Ok(self.blocks.read().unwrap().values().cloned().collect())
}
}

#[test]
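all_blocks walks the entire LMDB store with rkv's cursor API; this is what makes the replay in in_memory.rs possible. A standalone sketch of the same iteration pattern against rkv, using raw byte payloads instead of P2Block so it is independent of this crate's types:

use std::path::Path;

use rkv::{
    backend::{Lmdb, LmdbEnvironment},
    Manager, Rkv, StoreOptions, Value,
};

fn dump_values(path: &Path) -> anyhow::Result<Vec<Vec<u8>>> {
    // Same environment setup as new_from_path above.
    let mut manager = Manager::<LmdbEnvironment>::singleton().write().unwrap();
    let env_arc = manager.get_or_create(path, Rkv::new::<Lmdb>)?;
    let env = env_arc.read().unwrap();
    let store = env.open_single("block_cache", StoreOptions::create())?;
    let reader = env.read()?;
    let mut out = Vec::new();
    for entry in store.iter_start(&reader)? {
        let (_key, value) = entry?;
        // Values are stored as serialized blobs; anything else is unexpected.
        if let Value::Blob(bytes) = value {
            out.push(bytes.to_vec());
        }
    }
    Ok(out)
}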
50 changes: 33 additions & 17 deletions src/sharechain/p2chain.rs
@@ -137,6 +137,39 @@ pub struct P2Chain<T: BlockCache> {
}

impl<T: BlockCache> P2Chain<T> {
pub fn new_empty(algo: PowAlgorithm, total_size: u64, share_window: u64, block_time: u64, block_cache: T) -> Self {
let levels = HashMap::new();
let lwma =
LinearWeightedMovingAverage::new(DIFFICULTY_ADJUSTMENT_WINDOW, block_time).expect("Failed to create LWMA");
Self {
algo,
block_cache: Arc::new(block_cache),
block_time,
cached_shares: None,
levels,
total_size,
share_window,
current_tip: 0,
lwma,
}
}

pub fn try_load(
algo: PowAlgorithm,
total_size: u64,
share_window: u64,
block_time: u64,
from_block_cache: T,
new_block_cache: T,
) -> Result<Self, ShareChainError> {
let mut new_chain = Self::new_empty(algo, total_size, share_window, block_time, new_block_cache);
for block in from_block_cache.all_blocks()? {
info!(target: LOG_TARGET, "Loading block {}({:x}{:x}{:x}{:x}) into chain", block.height, block.hash[0], block.hash[1], block.hash[2], block.hash[3]);
new_chain.add_block_to_chain(block)?;
}
Ok(new_chain)
}

pub fn total_accumulated_tip_difficulty(&self) -> AccumulatedDifficulty {
match self.get_tip() {
Some(tip) => tip
@@ -180,23 +213,6 @@ impl<T: BlockCache> P2Chain<T> {
level.get(&level.chain_block())
}

pub fn new_empty(algo: PowAlgorithm, total_size: u64, share_window: u64, block_time: u64, block_cache: T) -> Self {
let levels = HashMap::new();
let lwma =
LinearWeightedMovingAverage::new(DIFFICULTY_ADJUSTMENT_WINDOW, block_time).expect("Failed to create LWMA");
Self {
algo,
block_cache: Arc::new(block_cache),
block_time,
cached_shares: None,
levels,
total_size,
share_window,
current_tip: 0,
lwma,
}
}

fn cleanup_chain(&mut self) -> Result<(), ShareChainError> {
let mut current_chain_length = self.levels.len() as u64;
// let see if we are the limit for the current chain
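Note that try_load pushes every stored block back through add_block_to_chain, so a restored chain rebuilds its levels, LWMA difficulty window, and current tip the same way live-synced blocks do, rather than trusting the on-disk ordering; blocks that fail to re-add surface as the error logged in in_memory.rs. A minimal call under the default config values (a sketch; old_cache and new_cache stand for any two BlockCache implementations):

let chain = P2Chain::try_load(
    pow_algo,   // the algorithm this share chain tracks
    2160 * 2,   // total_size: twice the share window, as in in_memory.rs
    2160,       // share_window, per Config::default()
    20,         // block_time in seconds, per Config::default()
    old_cache,  // source cache whose blocks are replayed
    new_cache,  // destination cache receiving the reloaded blocks
)?;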
