diff --git a/Cargo.lock b/Cargo.lock index 5acee8c..e56f610 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -532,7 +532,7 @@ checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" [[package]] name = "cota-registry-aggregator" -version = "0.4.1" +version = "0.4.2" dependencies = [ "chrono", "ckb-jsonrpc-types", diff --git a/Cargo.toml b/Cargo.toml index 86adfc3..983e8dd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cota-registry-aggregator" -version = "0.4.1" +version = "0.4.2" edition = "2018" [dependencies] diff --git a/src/indexer/index.rs b/src/indexer/index.rs index 66f2bc5..0cd8eae 100644 --- a/src/indexer/index.rs +++ b/src/indexer/index.rs @@ -12,7 +12,7 @@ const MAINNET_REGISTRY_COTA_CODE_HASH: &str = "0x90ca618be6c15f5857d3cbd09f9f24ca6770af047ba9ee70989ec3b229419ac7"; const MAINNET_REGISTRY_COTA_ARGS: &str = "0x563631b49cee549f3585ab4dde5f9d590f507f1f"; -pub async fn get_registry_smt_root() -> Result>, Error> { +pub async fn get_registry_smt_root() -> Result, Error> { let ckb_indexer_url = env::var("CKB_INDEXER") .map_err(|_e| Error::CKBIndexerError("CKB_INDEXER must be set".to_owned()))?; @@ -50,7 +50,11 @@ pub async fn get_registry_smt_root() -> Result>, Error> { let cell_data = result.objects.first().unwrap().output_data.as_bytes(); match cell_data.len() { 1 => Ok(None), - 33 => Ok(Some(cell_data[1..].to_vec())), + 33 => { + let mut ret = [0u8; 32]; + ret.copy_from_slice(&cell_data[1..]); + Ok(Some(ret)) + } _ => Err(Error::CKBIndexerError( "Registry cell data length error".to_owned(), )), diff --git a/src/smt/entry.rs b/src/smt/entry.rs index b59e665..f329544 100644 --- a/src/smt/entry.rs +++ b/src/smt/entry.rs @@ -2,7 +2,7 @@ use crate::db::check_lock_hashes_registered; use crate::error::Error; use crate::indexer::index::get_registry_smt_root; use crate::smt::db::db::RocksDB; -use crate::smt::smt::{generate_history_smt, RootSaver}; +use crate::smt::smt::{generate_history_smt, init_smt, RootSaver}; use crate::smt::transaction::store_transaction::StoreTransaction; use cota_smt::common::{Byte32, BytesBuilder}; use cota_smt::molecule::prelude::*; @@ -12,11 +12,12 @@ use cota_smt::registry::{ use cota_smt::smt::H256; use lazy_static::lazy_static; use log::info; -use parking_lot::Mutex; +use parking_lot::{Condvar, Mutex}; use std::sync::Arc; lazy_static! { - static ref SMT_LOCK: Arc> = Arc::new(Mutex::new(())); + static ref SMT_LOCK: Arc<(Mutex, Condvar)> = + Arc::new((Mutex::new(false), Condvar::new())); } pub async fn generate_registry_smt( @@ -37,15 +38,16 @@ pub async fn generate_registry_smt( } let smt_root_opt = get_registry_smt_root().await?; - - let lock = SMT_LOCK.lock(); let transaction = StoreTransaction::new(db.transaction()); - let mut smt = generate_history_smt(&transaction, smt_root_opt)?; - smt.update_all(update_leaves.clone()) - .expect("SMT update leave error"); - smt.save_root_and_leaves(previous_leaves)?; - transaction.commit()?; - drop(lock); + let mut smt = init_smt(&transaction)?; + + with_lock(|| { + generate_history_smt(&mut smt, smt_root_opt)?; + smt.update_all(update_leaves.clone()) + .map_err(|e| Error::SMTError(e.to_string()))?; + smt.save_root_and_leaves(previous_leaves.clone())?; + transaction.commit() + })?; let root_hash = hex::encode(smt.root().as_slice()); info!("registry_smt_root_hash: {:?}", root_hash); @@ -85,3 +87,29 @@ pub async fn generate_registry_smt( Ok((root_hash, registry_entry)) } + +fn with_lock(mut operator: F) -> Result<(), Error> +where + F: FnMut() -> Result<(), Error>, +{ + let &(ref lock, ref cond) = &*Arc::clone(&SMT_LOCK); + { + let mut pending = lock.lock(); + while *pending { + cond.wait(&mut pending); + } + *pending = true; + } + let unlock = || { + let mut pending = lock.lock(); + *pending = false; + cond.notify_all(); + }; + + operator().map_err(|err| { + unlock(); + err + })?; + unlock(); + Ok(()) +} diff --git a/src/smt/smt.rs b/src/smt/smt.rs index d952b76..f29f16b 100644 --- a/src/smt/smt.rs +++ b/src/smt/smt.rs @@ -27,10 +27,7 @@ impl<'a> RootSaver for CotaSMT<'a> { } } -pub fn generate_history_smt<'a>( - transaction: &'a StoreTransaction, - smt_root_opt: Option>, -) -> Result, Error> { +pub fn init_smt<'a>(transaction: &'a StoreTransaction) -> Result, Error> { let smt_store = SMTStore::new( COLUMN_SMT_LEAF, COLUMN_SMT_BRANCH, @@ -43,8 +40,14 @@ pub fn generate_history_smt<'a>( .map_err(|_e| Error::SMTError("Get smt root".to_string()))? .unwrap_or_default(); debug!("rocksdb smt root: {:?}", root,); - let mut smt: CotaSMT = CotaSMT::new(root, smt_store); + Ok(CotaSMT::new(root, smt_store)) +} +pub fn generate_history_smt<'a>( + smt: &mut CotaSMT<'a>, + smt_root_opt: Option<[u8; 32]>, +) -> Result<(), Error> { + let root = *smt.root(); if root == H256::zero() { return generate_mysql_smt(smt); } @@ -52,19 +55,19 @@ pub fn generate_history_smt<'a>( if let Some(smt_root) = smt_root_opt { if smt_root.as_slice() == root.as_slice() { debug!("The smt leaves and root in rocksdb are right"); - return Ok(smt); + return Ok(()); } else { - smt = reset_smt_temp_leaves(smt)?; + reset_smt_temp_leaves(smt)?; if smt_root.as_slice() == smt.root().as_slice() { debug!("The smt leaves and root in rocksdb are right after reset"); - return Ok(smt); + return Ok(()); } } } generate_mysql_smt(smt) } -fn generate_mysql_smt<'a>(mut smt: CotaSMT<'a>) -> Result, Error> { +fn generate_mysql_smt<'a>(smt: &mut CotaSMT<'a>) -> Result<(), Error> { let start_time = Local::now().timestamp_millis(); let registered_lock_hashes = get_registered_lock_hashes()?; if !registered_lock_hashes.is_empty() { @@ -76,15 +79,15 @@ fn generate_mysql_smt<'a>(mut smt: CotaSMT<'a>) -> Result, Error> { } let diff_time = (Local::now().timestamp_millis() - start_time) as f64 / 1000f64; debug!("Push registry history leaves to smt: {}s", diff_time); - Ok(smt) + Ok(()) } -fn reset_smt_temp_leaves<'a>(mut smt: CotaSMT<'a>) -> Result, Error> { +fn reset_smt_temp_leaves<'a>(smt: &mut CotaSMT<'a>) -> Result<(), Error> { let leaves_opt = smt.store().get_leaves()?; if let Some(leaves) = leaves_opt { smt.update_all(leaves) .expect("SMT update temp leaves error"); } debug!("Reset temp leaves successfully"); - Ok(smt) + Ok(()) }