From a89b928b2c148b8938a54b6206146b0ffce005bb Mon Sep 17 00:00:00 2001 From: baichuan3 Date: Thu, 12 Sep 2024 11:54:58 +0800 Subject: [PATCH] finish revert indexer --- crates/rooch-indexer/src/actor/indexer.rs | 42 +++++----- crates/rooch-indexer/src/actor/messages.rs | 10 ++- crates/rooch-indexer/src/proxy/mod.rs | 31 ++++++- .../rooch-indexer/src/store/sqlite_store.rs | 4 +- crates/rooch-indexer/src/store/traits.rs | 1 - crates/rooch-types/src/indexer/state.rs | 64 ++++++++++++-- .../src/commands/db/commands/rollback.rs | 84 ++++++++++++++++++- 7 files changed, 196 insertions(+), 40 deletions(-) diff --git a/crates/rooch-indexer/src/actor/indexer.rs b/crates/rooch-indexer/src/actor/indexer.rs index c218f631b2..af015db52c 100644 --- a/crates/rooch-indexer/src/actor/indexer.rs +++ b/crates/rooch-indexer/src/actor/indexer.rs @@ -3,7 +3,7 @@ use crate::actor::messages::{ IndexerApplyObjectStatesMessage, IndexerDeleteAnyObjectStatesMessage, IndexerEventsMessage, - IndexerPersistOrUpdateAnyObjectStatesMessage, IndexerRevertStatesMessage, IndexerStatesMessage, + IndexerPersistOrUpdateAnyObjectStatesMessage, IndexerRevertMessage, IndexerStatesMessage, IndexerTransactionMessage, UpdateIndexerMessage, }; use crate::store::traits::IndexerStoreTrait; @@ -14,7 +14,10 @@ use coerce::actor::{context::ActorContext, message::Handler, Actor}; use moveos_types::moveos_std::object::ObjectMeta; use moveos_types::transaction::MoveAction; use rooch_types::indexer::event::IndexerEvent; -use rooch_types::indexer::state::{handle_object_change, handle_revert_object_change, IndexerObjectStateChangeSet, IndexerObjectStatesIndexGenerator, ObjectStateType}; +use rooch_types::indexer::state::{ + handle_object_change, handle_revert_object_change, IndexerObjectStateChangeSet, + IndexerObjectStatesIndexGenerator, ObjectStateType, +}; use rooch_types::indexer::transaction::IndexerTransaction; pub struct IndexerActor { @@ -230,22 +233,19 @@ impl Handler for IndexerActor { } #[async_trait] -impl Handler for IndexerActor { - async fn handle( - &mut self, - msg: IndexerApplyObjectStatesMessage, - _ctx: &mut ActorContext, - ) -> Result<()> { - let IndexerRevertStatesMessage { +impl Handler for IndexerActor { + async fn handle(&mut self, msg: IndexerRevertMessage, _ctx: &mut ActorContext) -> Result<()> { + let IndexerRevertMessage { revert_tx_order, - revert_ledger_tx, - revert_execution_info, + // revert_ledger_tx, + // revert_execution_info, revert_state_change_set, root, + object_mapping, } = msg; self.root = root; - let tx_order = ledger_transaction.sequence_info.tx_order; + // let tx_order = ledger_transaction.sequence_info.tx_order; // 1. revert indexer transaction self.indexer_store @@ -259,25 +259,27 @@ impl Handler for IndexerActor { let mut state_index_generator = IndexerObjectStatesIndexGenerator::default(); let mut indexer_object_state_change_set = IndexerObjectStateChangeSet::default(); - // set genesis tx_order and state_index_generator for new indexer revert - let tx_order: u64 = 0; - let last_state_index = self - .query_last_state_index_by_tx_order(tx_order, state_type.clone()) - .await?; - let mut state_index_generator = last_state_index.map_or(0, |x| x + 1); + // // set genesis tx_order and state_index_generator for new indexer revert + // let tx_order: u64 = 0; + // let last_state_index = self + // .query_last_state_index_by_tx_order(tx_order, state_type.clone()) + // .await?; + // let mut state_index_generator = last_state_index.map_or(0, |x| x + 1); + // let object_mapping = HashMap::::new(); for (_feild_key, object_change) in revert_state_change_set.state_change_set.changes { let _ = handle_revert_object_change( &mut state_index_generator, - tx_order, + revert_tx_order, &mut indexer_object_state_change_set, object_change, + &object_mapping, )?; } self.indexer_store .apply_object_states(indexer_object_state_change_set)?; - self.indexer_store.revert_states(object_state_change_set)?; + // self.indexer_store.revert_states(object_state_change_set)?; Ok(()) } } diff --git a/crates/rooch-indexer/src/actor/messages.rs b/crates/rooch-indexer/src/actor/messages.rs index beb2dcdf07..677e18fdbe 100644 --- a/crates/rooch-indexer/src/actor/messages.rs +++ b/crates/rooch-indexer/src/actor/messages.rs @@ -16,6 +16,7 @@ use rooch_types::indexer::state::{ use rooch_types::indexer::transaction::{IndexerTransaction, TransactionFilter}; use rooch_types::transaction::LedgerTransaction; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; /// Indexer write Message #[derive(Debug, Clone)] @@ -150,14 +151,15 @@ impl Message for IndexerApplyObjectStatesMessage { } #[derive(Debug, Serialize, Deserialize)] -pub struct IndexerRevertStatesMessage { +pub struct IndexerRevertMessage { pub revert_tx_order: u64, - pub revert_ledger_tx: LedgerTransaction, - pub revert_execution_info: TransactionExecutionInfo, + // pub revert_ledger_tx: LedgerTransaction, + // pub revert_execution_info: TransactionExecutionInfo, pub revert_state_change_set: StateChangeSetExt, pub root: ObjectMeta, + pub object_mapping: HashMap, } -impl Message for IndexerRevertStatesMessage { +impl Message for IndexerRevertMessage { type Result = Result<()>; } diff --git a/crates/rooch-indexer/src/proxy/mod.rs b/crates/rooch-indexer/src/proxy/mod.rs index f157d1a71b..084ffd032b 100644 --- a/crates/rooch-indexer/src/proxy/mod.rs +++ b/crates/rooch-indexer/src/proxy/mod.rs @@ -4,9 +4,9 @@ use crate::actor::indexer::IndexerActor; use crate::actor::messages::{ IndexerApplyObjectStatesMessage, IndexerDeleteAnyObjectStatesMessage, IndexerEventsMessage, - IndexerPersistOrUpdateAnyObjectStatesMessage, IndexerStatesMessage, IndexerTransactionMessage, - QueryIndexerEventsMessage, QueryIndexerObjectIdsMessage, QueryIndexerTransactionsMessage, - QueryLastStateIndexByTxOrderMessage, UpdateIndexerMessage, + IndexerPersistOrUpdateAnyObjectStatesMessage, IndexerRevertMessage, IndexerStatesMessage, + IndexerTransactionMessage, QueryIndexerEventsMessage, QueryIndexerObjectIdsMessage, + QueryIndexerTransactionsMessage, QueryLastStateIndexByTxOrderMessage, UpdateIndexerMessage, }; use crate::actor::reader_indexer::IndexerReaderActor; use anyhow::{Ok, Result}; @@ -14,7 +14,7 @@ use coerce::actor::ActorRef; use moveos_types::moveos_std::event::Event; use moveos_types::moveos_std::object::{ObjectID, ObjectMeta}; use moveos_types::moveos_std::tx_context::TxContext; -use moveos_types::state::StateChangeSet; +use moveos_types::state::{StateChangeSet, StateChangeSetExt}; use moveos_types::transaction::{MoveAction, TransactionExecutionInfo, VerifiedMoveOSTransaction}; use rooch_types::indexer::event::{EventFilter, IndexerEvent, IndexerEventID}; use rooch_types::indexer::state::{ @@ -23,6 +23,7 @@ use rooch_types::indexer::state::{ }; use rooch_types::indexer::transaction::{IndexerTransaction, TransactionFilter}; use rooch_types::transaction::LedgerTransaction; +use std::collections::HashMap; #[derive(Clone)] pub struct IndexerProxy { @@ -209,4 +210,26 @@ impl IndexerProxy { }) .await? } + + pub async fn revert_indexer( + &self, + revert_tx_order: u64, + // revert_ledger_tx: LedgerTransaction, + // revert_execution_info: TransactionExecutionInfo, + revert_state_change_set: StateChangeSetExt, + root: ObjectMeta, + object_mapping: HashMap, + ) -> Result<()> { + self.actor + .notify(IndexerRevertMessage { + revert_tx_order, + // revert_ledger_tx, + // revert_execution_info, + revert_state_change_set, + root, + object_mapping, + }) + .await?; + Ok(()) + } } diff --git a/crates/rooch-indexer/src/store/sqlite_store.rs b/crates/rooch-indexer/src/store/sqlite_store.rs index a5939afb8f..836e6e0efb 100644 --- a/crates/rooch-indexer/src/store/sqlite_store.rs +++ b/crates/rooch-indexer/src/store/sqlite_store.rs @@ -360,7 +360,7 @@ impl SqliteIndexerStore { .start_timer(); let mut connection = get_sqlite_pool_connection(&self.connection_pool)?; - let tx_orders = tx_orders.into_iter().map(|v| v as i64).collect(); + let tx_orders: Vec<_> = tx_orders.into_iter().map(|v| v as i64).collect(); diesel::delete( transactions::table.filter(transactions::tx_order.eq_any(tx_orders.as_slice())), ) @@ -414,7 +414,7 @@ impl SqliteIndexerStore { .start_timer(); let mut connection = get_sqlite_pool_connection(&self.connection_pool)?; - let tx_orders = tx_orders.into_iter().map(|v| v as i64).collect(); + let tx_orders: Vec<_> = tx_orders.into_iter().map(|v| v as i64).collect(); diesel::delete(events::table.filter(events::tx_order.eq_any(tx_orders.as_slice()))) .execute(&mut connection) .map_err(|e| IndexerError::SQLiteWriteError(e.to_string())) diff --git a/crates/rooch-indexer/src/store/traits.rs b/crates/rooch-indexer/src/store/traits.rs index 33b95c8bc5..b6b908a0d8 100644 --- a/crates/rooch-indexer/src/store/traits.rs +++ b/crates/rooch-indexer/src/store/traits.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 use crate::errors::IndexerError; -use crate::{INDEXER_EVENTS_TABLE_NAME, INDEXER_TRANSACTIONS_TABLE_NAME}; use rooch_types::indexer::event::IndexerEvent; use rooch_types::indexer::state::{IndexerObjectState, IndexerObjectStateChangeSet}; use rooch_types::indexer::transaction::IndexerTransaction; diff --git a/crates/rooch-types/src/indexer/state.rs b/crates/rooch-types/src/indexer/state.rs index d858af4780..5992be4a6e 100644 --- a/crates/rooch-types/src/indexer/state.rs +++ b/crates/rooch-types/src/indexer/state.rs @@ -14,6 +14,7 @@ use moveos_types::state::{MoveStructType, MoveType, ObjectChange, StateChangeSet use once_cell::sync::Lazy; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; pub static UTXO_TYPE_TAG: Lazy = Lazy::new(UTXO::type_tag); @@ -194,9 +195,11 @@ pub fn handle_object_change( pub fn handle_revert_object_change( state_index_generator: &mut IndexerObjectStatesIndexGenerator, + // revert_state_index_generator: &mut IndexerObjectStatesIndexGenerator, tx_order: u64, indexer_object_state_change_set: &mut IndexerObjectStateChangeSet, object_change: ObjectChange, + object_mapping: &HashMap, ) -> Result<()> { let ObjectChange { metadata, @@ -214,14 +217,29 @@ pub fn handle_revert_object_change( if let Some(op) = value { match op { Op::Modify(_value) => { - let state = IndexerObjectState::new(metadata.clone(), tx_order, state_index); - indexer_object_state_change_set.update_object_states(state); + // Keep the tx_order and state index consistent before reverting + if let Some(previous_object_meta) = object_mapping.get(&object_id) { + let state = IndexerObjectState::new( + previous_object_meta.clone(), + tx_order, + state_index, + ); + indexer_object_state_change_set.update_object_states(state); + } + // let state = IndexerObjectState::new(metadata.clone(), tx_order, state_index); + // indexer_object_state_change_set.update_object_states(state); } Op::Delete => { // indexer_object_state_change_set.remove_object_states(object_id, &object_type); - - let state = IndexerObjectState::new(metadata.clone(), tx_order, state_index); - indexer_object_state_change_set.new_object_states(state); + // Use the reverted tx_order and state index as the deleted restored tx_order and tx_order + if let Some(previous_object_meta) = object_mapping.get(&object_id) { + let state = IndexerObjectState::new( + previous_object_meta.clone(), + tx_order, + state_index, + ); + indexer_object_state_change_set.new_object_states(state); + } } Op::New(_value) => { // let state = IndexerObjectState::new(metadata.clone(), tx_order, state_index); @@ -243,11 +261,47 @@ pub fn handle_revert_object_change( tx_order, indexer_object_state_change_set, change, + object_mapping, )?; } Ok(()) } +pub fn collect_revert_object_change_ids( + object_change: ObjectChange, + object_ids: &mut Vec, +) -> Result<()> { + let ObjectChange { + metadata, + value, + fields, + } = object_change; + let object_id = metadata.id.clone(); + let object_type = metadata.object_type.clone(); + + // Do not index dynamic field object + if is_dynamic_field_type(&object_type) { + return Ok(()); + } + if let Some(op) = value { + match op { + Op::Modify(_value) => { + object_ids.push(object_id); + } + Op::Delete => { + object_ids.push(object_id); + } + Op::New(_value) => {} + } + } else { + } + + for (_key, change) in fields { + collect_revert_object_change_ids(change, object_ids)?; + } + Ok(()) +} + #[derive( Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize, Deserialize, JsonSchema, Default, )] diff --git a/crates/rooch/src/commands/db/commands/rollback.rs b/crates/rooch/src/commands/db/commands/rollback.rs index af447aadce..353b8c714a 100644 --- a/crates/rooch/src/commands/db/commands/rollback.rs +++ b/crates/rooch/src/commands/db/commands/rollback.rs @@ -1,22 +1,28 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 -use anyhow::Error; +use anyhow::{anyhow, Error}; use clap::Parser; use metrics::RegistryService; use moveos_store::config_store::ConfigStore; use moveos_store::transaction_store::TransactionStore as TxExecutionInfoStore; -use moveos_store::MoveOSStore; +use moveos_types::access_path::AccessPath; use moveos_types::moveos_std::object::ObjectMeta; use moveos_types::startup_info; +use moveos_types::state_resolver::{RootObjectResolver, StateReader}; use rooch_config::{RoochOpt, R_OPT_NET_HELP}; use rooch_db::RoochDB; use rooch_genesis::RoochGenesis; +use rooch_indexer::store::traits::IndexerStoreTrait; use rooch_store::state_store::StateStore; -use rooch_store::RoochStore; use rooch_types::error::{RoochError, RoochResult}; +use rooch_types::indexer::state::{ + collect_revert_object_change_ids, handle_revert_object_change, IndexerObjectStateChangeSet, + IndexerObjectStatesIndexGenerator, +}; use rooch_types::rooch_network::RoochChainID; use rooch_types::sequencer::SequencerInfo; +use std::collections::HashMap; use std::path::PathBuf; use std::time::SystemTime; @@ -152,9 +158,79 @@ impl RollbackCommand { } // revert the indexer - if state_change_set_ext_opt.is_some() { + let previous_tx_order = if tx_order > 0 { tx_order - 1 } else { 0 }; + let previous_state_change_set_ext_opt = rooch_db + .rooch_store + .get_state_change_set(previous_tx_order)?; + if previous_state_change_set_ext_opt.is_some() && state_change_set_ext_opt.is_some() { + // let previoud_state_root = previous_state_change_set_ext_opt.unwrap().state_change_set.state_root; + let previous_state_change_set_ext = previous_state_change_set_ext_opt.unwrap(); let state_change_set_ext = state_change_set_ext_opt.unwrap(); + + let mut object_ids = vec![]; + for (_feild_key, object_change) in + state_change_set_ext.state_change_set.changes.clone() + { + let _ = collect_revert_object_change_ids(object_change, &mut object_ids)?; + } + + let root = ObjectMeta::root_metadata( + previous_state_change_set_ext.state_change_set.state_root, + previous_state_change_set_ext.state_change_set.global_size, + ); + let resolver = RootObjectResolver::new(root, &rooch_db.moveos_store); + let object_mapping = resolver + .get_states(AccessPath::objects(object_ids))? + .into_iter() + .flatten() + .map(|v| (v.metadata.id.clone(), v.metadata)) + .collect::>(); + // let + // rooch_db.indexer_store. + + // 1. revert indexer transaction + rooch_db + .indexer_store + .delete_transactions(vec![tx_order]) + .map_err(|e| anyhow!(format!("Revert indexer transactions error: {:?}", e)))?; + + // 2. revert indexer event + rooch_db + .indexer_store + .delete_events(vec![tx_order]) + .map_err(|e| anyhow!(format!("Revert indexer events error: {:?}", e)))?; + + // 3. revert indexer full object state, including object_states, utxos and inscriptions + // indexer object state index generator + let mut state_index_generator = IndexerObjectStatesIndexGenerator::default(); + let mut indexer_object_state_change_set = IndexerObjectStateChangeSet::default(); + + // // set genesis tx_order and state_index_generator for new indexer revert + // let tx_order: u64 = 0; + // let last_state_index = self + // .query_last_state_index_by_tx_order(tx_order, state_type.clone()) + // .await?; + // let mut state_index_generator = last_state_index.map_or(0, |x| x + 1); + + // let object_mapping = HashMap::::new(); + for (_feild_key, object_change) in state_change_set_ext.state_change_set.changes { + let _ = handle_revert_object_change( + &mut state_index_generator, + tx_order, + &mut indexer_object_state_change_set, + object_change, + &object_mapping, + )?; + } + rooch_db + .indexer_store + .apply_object_states(indexer_object_state_change_set) + .map_err(|e| anyhow!(format!("Revert indexer states error: {:?}", e)))?; } + println!( + "revert indexer succ, tx_hash: {:?}, tx_order {}", + tx_hash, tx_order + ); } let rollback_sequencer_info = SequencerInfo {