Skip to content

Commit

Permalink
Merge pull request #39 from nervina-labs/db-diff
Browse files Browse the repository at this point in the history
Compare rocksdb and mysql to reduce smt building time
  • Loading branch information
duanyytop authored Jun 2, 2022
2 parents a9df959 + 6b01772 commit b255f8f
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 21 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "cota-registry-aggregator"
version = "0.4.5"
version = "0.4.6"
edition = "2018"

[dependencies]
Expand Down
17 changes: 9 additions & 8 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::schema::check_infos::dsl::block_number;
use crate::schema::check_infos::dsl::check_infos;
use crate::utils::parse_bytes_n;
use crate::POOL;
use cota_smt::smt::H256;
use diesel::r2d2::{self, ConnectionManager, Pool};
use diesel::*;
use jsonrpc_http_server::jsonrpc_core::serde_json::from_str;
Expand All @@ -22,13 +23,13 @@ pub fn init_connection_pool() -> SqlConnectionPool {
r2d2::Pool::builder().max_size(max).build(manager).unwrap()
}

pub fn get_registered_lock_hashes() -> Result<Vec<[u8; 32]>, Error> {
pub fn get_registered_lock_hashes() -> Result<Vec<H256>, Error> {
let conn = &POOL.clone().get().expect("Mysql pool connection error");
const PAGE_SIZE: i64 = 1000;
let mut lock_hashes: Vec<[u8; 32]> = Vec::new();
let mut leaves: Vec<H256> = Vec::new();
let mut page: i64 = 0;
loop {
let lock_hashes_page = register_cota_kv_pairs
let leaves_page = register_cota_kv_pairs
.select(lock_hash)
.limit(PAGE_SIZE)
.offset(PAGE_SIZE * page)
Expand All @@ -40,14 +41,14 @@ pub fn get_registered_lock_hashes() -> Result<Vec<[u8; 32]>, Error> {
},
|registries| Ok(parse_registry_cota_nft(registries)),
)?;
let length = lock_hashes_page.len();
lock_hashes.extend(lock_hashes_page);
let length = leaves_page.len();
leaves.extend(leaves_page);
if length < (PAGE_SIZE as usize) {
break;
}
page += 1;
}
Ok(lock_hashes)
Ok(leaves)
}

pub fn check_lock_hashes_registered(lock_hashes: Vec<[u8; 32]>) -> Result<(bool, u64), Error> {
Expand Down Expand Up @@ -79,9 +80,9 @@ pub fn get_syncer_tip_block_number() -> Result<u64, Error> {
})
}

fn parse_registry_cota_nft(registries: Vec<String>) -> Vec<[u8; 32]> {
fn parse_registry_cota_nft(registries: Vec<String>) -> Vec<H256> {
registries
.into_iter()
.map(|registry| parse_bytes_n(registry).unwrap())
.map(|registry| H256::from(parse_bytes_n::<32>(registry).unwrap()))
.collect()
}
2 changes: 1 addition & 1 deletion src/smt/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::db::check_lock_hashes_registered;
use crate::error::Error;
use crate::indexer::index::get_registry_smt_root;
use crate::smt::db::db::RocksDB;
use crate::smt::smt::{generate_history_smt, init_smt, RootSaver};
use crate::smt::smt::{generate_history_smt, init_smt, Extension};
use crate::smt::transaction::store_transaction::StoreTransaction;
use cota_smt::common::{Byte32, BytesBuilder};
use cota_smt::molecule::prelude::*;
Expand Down
36 changes: 26 additions & 10 deletions src/smt/smt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ use crate::smt::transaction::store_transaction::StoreTransaction;
use chrono::prelude::*;
use cota_smt::smt::{Blake2bHasher, H256};
use log::debug;
use sparse_merkle_tree::traits::Store;
use sparse_merkle_tree::SparseMerkleTree;

pub type CotaSMT<'a> = SparseMerkleTree<Blake2bHasher, H256, SMTStore<'a>>;

pub trait RootSaver {
pub trait Extension {
fn save_root_and_leaves(&self, leaves: Vec<(H256, H256)>) -> Result<(), Error>;
fn is_non_existent(&self, leaf_key: &H256) -> bool;
}

impl<'a> RootSaver for CotaSMT<'a> {
impl<'a> Extension for CotaSMT<'a> {
fn save_root_and_leaves(&self, leaves: Vec<(H256, H256)>) -> Result<(), Error> {
self.store()
.save_root(self.root())
Expand All @@ -25,6 +27,13 @@ impl<'a> RootSaver for CotaSMT<'a> {
debug!("Save latest smt root: {:?} and leaves", self.root());
Ok(())
}

fn is_non_existent(&self, leaf_key: &H256) -> bool {
if let Ok(result) = self.store().get_leaf(leaf_key) {
return result.is_none();
}
true
}
}

pub fn init_smt<'a>(transaction: &'a StoreTransaction) -> Result<CotaSMT<'a>, Error> {
Expand Down Expand Up @@ -69,14 +78,21 @@ pub fn generate_history_smt<'a>(

fn generate_mysql_smt<'a>(smt: &mut CotaSMT<'a>) -> Result<(), Error> {
let start_time = Local::now().timestamp_millis();
let registered_lock_hashes = get_registered_lock_hashes()?;
if !registered_lock_hashes.is_empty() {
for lock_hash in registered_lock_hashes {
let key: H256 = H256::from(lock_hash);
let value: H256 = H256::from([255u8; 32]);
smt.update(key, value).expect("SMT update leave error");
}
}
let registered_lock_hashes: Vec<H256> = get_registered_lock_hashes()?;
let leaves = if smt.root() == &H256::zero() {
registered_lock_hashes
.into_iter()
.map(|key| (key, H256::from([255u8; 32])))
.collect()
} else {
debug!("Compare rocksdb and mysql to get diff leaves");
registered_lock_hashes
.into_iter()
.filter(|key| smt.is_non_existent(&key))
.map(|key| (key, H256::from([255u8; 32])))
.collect()
};
smt.update_all(leaves).expect("SMT update leave error");
let diff_time = (Local::now().timestamp_millis() - start_time) as f64 / 1000f64;
debug!("Push registry history leaves to smt: {}s", diff_time);
Ok(())
Expand Down

0 comments on commit b255f8f

Please sign in to comment.