Skip to content

Commit

Permalink
feat: more lmdb optimizations
Browse files Browse the repository at this point in the history
  • Loading branch information
stringhandler committed Dec 20, 2024
1 parent be1abd4 commit 64a8a38
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 55 deletions.
2 changes: 1 addition & 1 deletion src/server/http/stats/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use axum::{
use log::{error, info};
use serde::Serialize;
use tari_core::proof_of_work::PowAlgorithm;
use tari_utilities::{epoch_time::EpochTime, hex::Hex};
use tari_utilities::{encoding::Base58, epoch_time::EpochTime, hex::Hex};
use tokio::sync::oneshot;

use super::MAX_ACCEPTABLE_HTTP_TIMEOUT;
Expand Down
2 changes: 1 addition & 1 deletion src/server/p2p/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use tari_common::configuration::Network;
use tari_common_types::types::FixedHash;
use tari_core::proof_of_work::PowAlgorithm;
use tari_shutdown::ShutdownSignal;
use tari_utilities::{epoch_time::EpochTime, hex::Hex};
use tari_utilities::{encoding::Base58, epoch_time::EpochTime, hex::Hex};
use tokio::{
select,
sync::{
Expand Down
3 changes: 2 additions & 1 deletion src/sharechain/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tari_core::{
PowAlgorithm,
},
};
use tari_utilities::{epoch_time::EpochTime, hex::Hex};
use tari_utilities::{encoding::Base58, epoch_time::EpochTime, hex::Hex};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

use super::{
Expand Down Expand Up @@ -76,6 +76,7 @@ impl InMemoryShareChain {

Ok(Self {
p2_chain: Arc::new(RwLock::new(P2Chain::new_empty(
pow_algo,
config.share_window * 2,
config.share_window,
config.block_time,
Expand Down
92 changes: 63 additions & 29 deletions src/sharechain/p2chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ use std::{

use itertools::Itertools;
use log::*;
use minotari_app_grpc::tari_rpc::PowAlgo;
use tari_common_types::types::FixedHash;
use tari_core::proof_of_work::{lwma_diff::LinearWeightedMovingAverage, AccumulatedDifficulty};
use tari_core::proof_of_work::{lwma_diff::LinearWeightedMovingAverage, AccumulatedDifficulty, PowAlgorithm};
use tari_utilities::hex::Hex;

use super::lmdb_block_storage::BlockCache;
use super::{lmdb_block_storage::BlockCache, p2chain_level::P2BlockHeader};
use crate::sharechain::{
error::ShareChainError,
in_memory::MAX_UNCLE_AGE,
Expand Down Expand Up @@ -125,6 +126,7 @@ impl Display for ChainAddResult {
}

pub struct P2Chain<T: BlockCache> {
pub algo: PowAlgorithm,
pub block_time: u64,
block_cache: Arc<T>,
pub cached_shares: Option<HashMap<String, (u64, Vec<u8>)>>,
Expand Down Expand Up @@ -164,6 +166,11 @@ impl<T: BlockCache> P2Chain<T> {
level.get(hash)
}

pub fn get_block_header_at_height(&self, height: u64, hash: &FixedHash) -> Option<P2BlockHeader> {
let level = self.level_at_height(height)?;
level.get_header(hash)
}

pub fn block_exists(&self, height: u64, hash: &FixedHash) -> bool {
self.level_at_height(height).map_or(false, |level| level.contains(hash))
}
Expand All @@ -174,11 +181,12 @@ impl<T: BlockCache> P2Chain<T> {
level.get(&level.chain_block())
}

pub fn new_empty(total_size: u64, share_window: u64, block_time: u64, block_cache: T) -> Self {
pub fn new_empty(algo: PowAlgorithm, total_size: u64, share_window: u64, block_time: u64, block_cache: T) -> Self {
let levels = HashMap::new();
let lwma =
LinearWeightedMovingAverage::new(DIFFICULTY_ADJUSTMENT_WINDOW, block_time).expect("Failed to create LWMA");
Self {
algo,
block_cache: Arc::new(block_cache),
block_time,
cached_shares: None,
Expand Down Expand Up @@ -259,22 +267,26 @@ impl<T: BlockCache> P2Chain<T> {
trace!(target: LOG_TARGET, "Trying to verify new block to add: {}:{}", new_block_height, &hash.to_hex()[0..8]);
// we should validate what we can if a block is invalid, we should delete it.
let mut new_tip = ChainAddResult::default();
let block = self
.get_block_at_height(new_block_height, &hash)
.ok_or(ShareChainError::BlockNotFound)?
.clone();
let algo = block.original_header.pow.pow_algo;
let block_prev_hash = self
.get_parent_of(new_block_height, &hash)
.ok_or(ShareChainError::BlockNotFound)?;
// let block = self
// .get_block_at_height(new_block_height, &hash)
// .ok_or(ShareChainError::BlockNotFound)?
// .clone();
// let algo = block.original_header.pow.pow_algo;
// do we know of the parent
// we should not check the chain start for parents
if block.height != 0 {
if !self.block_exists(new_block_height.saturating_sub(1), &block.prev_hash) {
if new_block_height != 0 {
if !self.block_exists(new_block_height.saturating_sub(1), &block_prev_hash) {
// we dont know the parent
new_tip
.missing_blocks
.insert(block.prev_hash, new_block_height.saturating_sub(1));
.insert(block_prev_hash, new_block_height.saturating_sub(1));
}
// now lets check the uncles
for uncle in &block.uncles {

for uncle in &self.get_uncles(new_block_height, &hash) {
if let Some(uncle_parent_hash) = self.get_parent_of(uncle.0, &uncle.1) {
if !self.block_exists(uncle.0.saturating_sub(1), &uncle_parent_hash) {
new_tip
Expand All @@ -295,10 +307,12 @@ impl<T: BlockCache> P2Chain<T> {
self.verify_block(hash, new_block_height)?;
// we have to reload the block to check if verified is set to true now
let block = self
.get_block_at_height(new_block_height, &hash)
.get_block_header_at_height(new_block_height, &hash)
.ok_or(ShareChainError::BlockNotFound)?
.clone();

let algo = self.algo;

// edge case for chain start
if self.get_tip().is_none() && new_block_height == 0 {
self.set_new_tip(new_block_height, hash)?;
Expand Down Expand Up @@ -349,18 +363,21 @@ impl<T: BlockCache> P2Chain<T> {
if current_counting_block.height == 0 {
break;
}
if let Some(parent) = self.get_parent_block(&current_counting_block) {
if let Some(parent) = self.get_block_header_at_height(
current_counting_block.height.saturating_sub(1),
&current_counting_block.prev_hash,
) {
if !parent.verified {
all_blocks_verified = false;
// so this block is unverified, we cannot count it but lets see if it just misses some blocks so
// we can ask for them
if self.get_parent_block(&parent).is_none() {
if !self.block_exists(parent.height.saturating_sub(1), &parent.prev_hash) {
new_tip
.missing_blocks
.insert(parent.prev_hash, parent.height.saturating_sub(1));
}
for uncle in &parent.uncles {
if self.get_block_at_height(uncle.0, &uncle.1).is_none() {
if !self.block_exists(uncle.0, &uncle.1) {
new_tip.missing_blocks.insert(uncle.1, uncle.0);
}
}
Expand All @@ -385,7 +402,12 @@ impl<T: BlockCache> P2Chain<T> {
break;
}
// we can unwrap as we now the parent exists
current_counting_block = self.get_parent_block(&current_counting_block).unwrap().clone();
current_counting_block = self
.get_block_header_at_height(
current_counting_block.height.saturating_sub(1),
&current_counting_block.prev_hash,
)
.ok_or(ShareChainError::BlockNotFound)?;
}

if !all_blocks_verified {
Expand All @@ -397,13 +419,13 @@ impl<T: BlockCache> P2Chain<T> {
let next_level_data = self.calculate_next_level_data(new_block_height, hash);
return Ok((new_tip, next_level_data));
}
if block.total_pow() > self.total_accumulated_tip_difficulty() {
if block.total_pow > self.total_accumulated_tip_difficulty() {
new_tip.set_new_tip(hash, new_block_height);
// we need to reorg the chain
// lets start by resetting the lwma
self.lwma = LinearWeightedMovingAverage::new(DIFFICULTY_ADJUSTMENT_WINDOW, self.block_time)
.expect("Failed to create LWMA");
self.lwma.add_front(block.timestamp, block.target_difficulty());
self.lwma.add_front(block.timestamp, block.target_difficulty);
let chain_height = self
.level_at_height(block.height)
.ok_or(ShareChainError::BlockLevelNotFound)?;
Expand All @@ -426,13 +448,13 @@ impl<T: BlockCache> P2Chain<T> {
let parent_level = self.level_at_height(current_block.height.saturating_sub(1)).unwrap();
if current_block.prev_hash != parent_level.chain_block() {
// safety check
let nextblock = parent_level.get(&current_block.prev_hash);
let nextblock = parent_level.get_header(&current_block.prev_hash);
if nextblock.is_none() {
error!(target: LOG_TARGET, "FATAL: Reorging (block in chain) failed because parent block was not found and chain data is corrupted.");
panic!(
"FATAL: Reorging (block in chain) failed because parent block was not found and chain \
data is corrupted. current_block: {:?}, current tip: {:?}",
current_block,
current_block.height,
self.get_tip().map(|t| t.height())
);
}
Expand All @@ -441,25 +463,25 @@ impl<T: BlockCache> P2Chain<T> {
mut_parent_level.set_chain_block(current_block.prev_hash);
current_block = nextblock.unwrap().clone();
self.lwma
.add_front(current_block.timestamp, current_block.target_difficulty());
.add_front(current_block.timestamp, current_block.target_difficulty);
} else if !self.lwma.is_full() {
// we still need more blocks to fill up the lwma
let nextblock = parent_level.get(&current_block.prev_hash);
let nextblock = parent_level.get_header(&current_block.prev_hash);
if nextblock.is_none() {
error!(target: LOG_TARGET, "FATAL: Reorging (block not in chain) failed because parent block was not found and chain data is corrupted.");
panic!(
"FATAL: Could not calculate LMWA while reorging (block in chain) failed because \
parent block was not found and chain data is corrupted. current_block: {:?}, current \
tip: {:?}",
current_block,
current_block.height,
self.get_tip().map(|t| t.height())
);
}

current_block = nextblock.unwrap().clone();

self.lwma
.add_front(current_block.timestamp, current_block.target_difficulty());
.add_front(current_block.timestamp, current_block.target_difficulty);
} else {
break;
}
Expand Down Expand Up @@ -494,16 +516,23 @@ impl<T: BlockCache> P2Chain<T> {
next_level_data
}

// this assumes it has no missing parents
fn verify_block(&mut self, hash: FixedHash, height: u64) -> Result<(), ShareChainError> {
fn is_verified(&self, hash: &FixedHash, height: u64) -> Result<bool, ShareChainError> {
let level = self
.level_at_height(height)
.ok_or(ShareChainError::BlockLevelNotFound)?;
let block = level.get(&hash).ok_or(ShareChainError::BlockNotFound)?;
if block.verified {
Ok(level.is_verified(hash))
}

// this assumes it has no missing parents
fn verify_block(&mut self, hash: FixedHash, height: u64) -> Result<(), ShareChainError> {
if self.is_verified(&hash, height)? {
return Ok(());
}
let verified = true;
let level = self
.level_at_height(height)
.ok_or(ShareChainError::BlockLevelNotFound)?;
let block = level.get(&hash).ok_or(ShareChainError::BlockNotFound)?;

// lets check the total accumulated difficulty
let mut total_work = AccumulatedDifficulty::from_u128(u128::from(block.target_difficulty().as_u64()))
Expand Down Expand Up @@ -592,6 +621,11 @@ impl<T: BlockCache> P2Chain<T> {
level.get_prev_hash(hash)
}

pub fn get_uncles(&self, height: u64, hash: &FixedHash) -> Vec<(u64, FixedHash)> {
let level = self.level_at_height(height).unwrap();
level.get_uncles(hash)
}

pub fn get_parent_block(&self, block: &P2Block) -> Option<Arc<P2Block>> {
let parent_height = match block.height.checked_sub(1) {
Some(height) => height,
Expand Down
Loading

0 comments on commit 64a8a38

Please sign in to comment.