Skip to content

Commit

Permalink
Merge pull request #31 from nervina-labs/develop
Browse files Browse the repository at this point in the history
Release v0.4.2
  • Loading branch information
duanyytop authored May 9, 2022
2 parents a29ca3e + 2783eba commit c0402e5
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 27 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "cota-registry-aggregator"
version = "0.4.1"
version = "0.4.2"
edition = "2018"

[dependencies]
Expand Down
8 changes: 6 additions & 2 deletions src/indexer/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const MAINNET_REGISTRY_COTA_CODE_HASH: &str =
"0x90ca618be6c15f5857d3cbd09f9f24ca6770af047ba9ee70989ec3b229419ac7";
const MAINNET_REGISTRY_COTA_ARGS: &str = "0x563631b49cee549f3585ab4dde5f9d590f507f1f";

pub async fn get_registry_smt_root() -> Result<Option<Vec<u8>>, Error> {
pub async fn get_registry_smt_root() -> Result<Option<[u8; 32]>, Error> {
let ckb_indexer_url = env::var("CKB_INDEXER")
.map_err(|_e| Error::CKBIndexerError("CKB_INDEXER must be set".to_owned()))?;

Expand Down Expand Up @@ -50,7 +50,11 @@ pub async fn get_registry_smt_root() -> Result<Option<Vec<u8>>, Error> {
let cell_data = result.objects.first().unwrap().output_data.as_bytes();
match cell_data.len() {
1 => Ok(None),
33 => Ok(Some(cell_data[1..].to_vec())),
33 => {
let mut ret = [0u8; 32];
ret.copy_from_slice(&cell_data[1..]);
Ok(Some(ret))
}
_ => Err(Error::CKBIndexerError(
"Registry cell data length error".to_owned(),
)),
Expand Down
50 changes: 39 additions & 11 deletions src/smt/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::db::check_lock_hashes_registered;
use crate::error::Error;
use crate::indexer::index::get_registry_smt_root;
use crate::smt::db::db::RocksDB;
use crate::smt::smt::{generate_history_smt, RootSaver};
use crate::smt::smt::{generate_history_smt, init_smt, RootSaver};
use crate::smt::transaction::store_transaction::StoreTransaction;
use cota_smt::common::{Byte32, BytesBuilder};
use cota_smt::molecule::prelude::*;
Expand All @@ -12,11 +12,12 @@ use cota_smt::registry::{
use cota_smt::smt::H256;
use lazy_static::lazy_static;
use log::info;
use parking_lot::Mutex;
use parking_lot::{Condvar, Mutex};
use std::sync::Arc;

lazy_static! {
static ref SMT_LOCK: Arc<Mutex<()>> = Arc::new(Mutex::new(()));
static ref SMT_LOCK: Arc<(Mutex<bool>, Condvar)> =
Arc::new((Mutex::new(false), Condvar::new()));
}

pub async fn generate_registry_smt(
Expand All @@ -37,15 +38,16 @@ pub async fn generate_registry_smt(
}

let smt_root_opt = get_registry_smt_root().await?;

let lock = SMT_LOCK.lock();
let transaction = StoreTransaction::new(db.transaction());
let mut smt = generate_history_smt(&transaction, smt_root_opt)?;
smt.update_all(update_leaves.clone())
.expect("SMT update leave error");
smt.save_root_and_leaves(previous_leaves)?;
transaction.commit()?;
drop(lock);
let mut smt = init_smt(&transaction)?;

with_lock(|| {
generate_history_smt(&mut smt, smt_root_opt)?;
smt.update_all(update_leaves.clone())
.map_err(|e| Error::SMTError(e.to_string()))?;
smt.save_root_and_leaves(previous_leaves.clone())?;
transaction.commit()
})?;

let root_hash = hex::encode(smt.root().as_slice());
info!("registry_smt_root_hash: {:?}", root_hash);
Expand Down Expand Up @@ -85,3 +87,29 @@ pub async fn generate_registry_smt(

Ok((root_hash, registry_entry))
}

fn with_lock<F>(mut operator: F) -> Result<(), Error>
where
F: FnMut() -> Result<(), Error>,
{
let &(ref lock, ref cond) = &*Arc::clone(&SMT_LOCK);
{
let mut pending = lock.lock();
while *pending {
cond.wait(&mut pending);
}
*pending = true;
}
let unlock = || {
let mut pending = lock.lock();
*pending = false;
cond.notify_all();
};

operator().map_err(|err| {
unlock();
err
})?;
unlock();
Ok(())
}
27 changes: 15 additions & 12 deletions src/smt/smt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@ impl<'a> RootSaver for CotaSMT<'a> {
}
}

pub fn generate_history_smt<'a>(
transaction: &'a StoreTransaction,
smt_root_opt: Option<Vec<u8>>,
) -> Result<CotaSMT<'a>, Error> {
pub fn init_smt<'a>(transaction: &'a StoreTransaction) -> Result<CotaSMT<'a>, Error> {
let smt_store = SMTStore::new(
COLUMN_SMT_LEAF,
COLUMN_SMT_BRANCH,
Expand All @@ -43,28 +40,34 @@ pub fn generate_history_smt<'a>(
.map_err(|_e| Error::SMTError("Get smt root".to_string()))?
.unwrap_or_default();
debug!("rocksdb smt root: {:?}", root,);
let mut smt: CotaSMT = CotaSMT::new(root, smt_store);
Ok(CotaSMT::new(root, smt_store))
}

pub fn generate_history_smt<'a>(
smt: &mut CotaSMT<'a>,
smt_root_opt: Option<[u8; 32]>,
) -> Result<(), Error> {
let root = *smt.root();
if root == H256::zero() {
return generate_mysql_smt(smt);
}
debug!("registry cell smt root: {:?}", smt_root_opt,);
if let Some(smt_root) = smt_root_opt {
if smt_root.as_slice() == root.as_slice() {
debug!("The smt leaves and root in rocksdb are right");
return Ok(smt);
return Ok(());
} else {
smt = reset_smt_temp_leaves(smt)?;
reset_smt_temp_leaves(smt)?;
if smt_root.as_slice() == smt.root().as_slice() {
debug!("The smt leaves and root in rocksdb are right after reset");
return Ok(smt);
return Ok(());
}
}
}
generate_mysql_smt(smt)
}

fn generate_mysql_smt<'a>(mut smt: CotaSMT<'a>) -> Result<CotaSMT<'a>, Error> {
fn generate_mysql_smt<'a>(smt: &mut CotaSMT<'a>) -> Result<(), Error> {
let start_time = Local::now().timestamp_millis();
let registered_lock_hashes = get_registered_lock_hashes()?;
if !registered_lock_hashes.is_empty() {
Expand All @@ -76,15 +79,15 @@ fn generate_mysql_smt<'a>(mut smt: CotaSMT<'a>) -> Result<CotaSMT<'a>, Error> {
}
let diff_time = (Local::now().timestamp_millis() - start_time) as f64 / 1000f64;
debug!("Push registry history leaves to smt: {}s", diff_time);
Ok(smt)
Ok(())
}

fn reset_smt_temp_leaves<'a>(mut smt: CotaSMT<'a>) -> Result<CotaSMT<'a>, Error> {
fn reset_smt_temp_leaves<'a>(smt: &mut CotaSMT<'a>) -> Result<(), Error> {
let leaves_opt = smt.store().get_leaves()?;
if let Some(leaves) = leaves_opt {
smt.update_all(leaves)
.expect("SMT update temp leaves error");
}
debug!("Reset temp leaves successfully");
Ok(smt)
Ok(())
}

0 comments on commit c0402e5

Please sign in to comment.