Skip to content

Commit

Permalink
finish revert indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
baichuan3 committed Sep 12, 2024
1 parent ac83d75 commit a89b928
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 40 deletions.
42 changes: 22 additions & 20 deletions crates/rooch-indexer/src/actor/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::actor::messages::{
IndexerApplyObjectStatesMessage, IndexerDeleteAnyObjectStatesMessage, IndexerEventsMessage,
IndexerPersistOrUpdateAnyObjectStatesMessage, IndexerRevertStatesMessage, IndexerStatesMessage,
IndexerPersistOrUpdateAnyObjectStatesMessage, IndexerRevertMessage, IndexerStatesMessage,
IndexerTransactionMessage, UpdateIndexerMessage,
};
use crate::store::traits::IndexerStoreTrait;
Expand All @@ -14,7 +14,10 @@ use coerce::actor::{context::ActorContext, message::Handler, Actor};
use moveos_types::moveos_std::object::ObjectMeta;
use moveos_types::transaction::MoveAction;
use rooch_types::indexer::event::IndexerEvent;
use rooch_types::indexer::state::{handle_object_change, handle_revert_object_change, IndexerObjectStateChangeSet, IndexerObjectStatesIndexGenerator, ObjectStateType};
use rooch_types::indexer::state::{
handle_object_change, handle_revert_object_change, IndexerObjectStateChangeSet,
IndexerObjectStatesIndexGenerator, ObjectStateType,
};
use rooch_types::indexer::transaction::IndexerTransaction;

pub struct IndexerActor {
Expand Down Expand Up @@ -230,22 +233,19 @@ impl Handler<IndexerApplyObjectStatesMessage> for IndexerActor {
}

#[async_trait]
impl Handler<IndexerRevertStatesMessage> for IndexerActor {
async fn handle(
&mut self,
msg: IndexerApplyObjectStatesMessage,
_ctx: &mut ActorContext,
) -> Result<()> {
let IndexerRevertStatesMessage {
impl Handler<IndexerRevertMessage> for IndexerActor {
async fn handle(&mut self, msg: IndexerRevertMessage, _ctx: &mut ActorContext) -> Result<()> {
let IndexerRevertMessage {
revert_tx_order,
revert_ledger_tx,
revert_execution_info,
// revert_ledger_tx,
// revert_execution_info,
revert_state_change_set,
root,
object_mapping,
} = msg;

self.root = root;
let tx_order = ledger_transaction.sequence_info.tx_order;
// let tx_order = ledger_transaction.sequence_info.tx_order;

// 1. revert indexer transaction
self.indexer_store
Expand All @@ -259,25 +259,27 @@ impl Handler<IndexerRevertStatesMessage> for IndexerActor {
let mut state_index_generator = IndexerObjectStatesIndexGenerator::default();
let mut indexer_object_state_change_set = IndexerObjectStateChangeSet::default();

// set genesis tx_order and state_index_generator for new indexer revert
let tx_order: u64 = 0;
let last_state_index = self
.query_last_state_index_by_tx_order(tx_order, state_type.clone())
.await?;
let mut state_index_generator = last_state_index.map_or(0, |x| x + 1);
// // set genesis tx_order and state_index_generator for new indexer revert
// let tx_order: u64 = 0;
// let last_state_index = self
// .query_last_state_index_by_tx_order(tx_order, state_type.clone())
// .await?;
// let mut state_index_generator = last_state_index.map_or(0, |x| x + 1);

// let object_mapping = HashMap::<ObjectID, ObjectMeta>::new();
for (_feild_key, object_change) in revert_state_change_set.state_change_set.changes {
let _ = handle_revert_object_change(
&mut state_index_generator,
tx_order,
revert_tx_order,
&mut indexer_object_state_change_set,
object_change,
&object_mapping,
)?;
}
self.indexer_store
.apply_object_states(indexer_object_state_change_set)?;

self.indexer_store.revert_states(object_state_change_set)?;
// self.indexer_store.revert_states(object_state_change_set)?;
Ok(())
}
}
10 changes: 6 additions & 4 deletions crates/rooch-indexer/src/actor/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use rooch_types::indexer::state::{
use rooch_types::indexer::transaction::{IndexerTransaction, TransactionFilter};
use rooch_types::transaction::LedgerTransaction;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

/// Indexer write Message
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -150,14 +151,15 @@ impl Message for IndexerApplyObjectStatesMessage {
}

#[derive(Debug, Serialize, Deserialize)]
pub struct IndexerRevertStatesMessage {
pub struct IndexerRevertMessage {
pub revert_tx_order: u64,
pub revert_ledger_tx: LedgerTransaction,
pub revert_execution_info: TransactionExecutionInfo,
// pub revert_ledger_tx: LedgerTransaction,
// pub revert_execution_info: TransactionExecutionInfo,
pub revert_state_change_set: StateChangeSetExt,
pub root: ObjectMeta,
pub object_mapping: HashMap<ObjectID, ObjectMeta>,
}

impl Message for IndexerRevertStatesMessage {
impl Message for IndexerRevertMessage {
type Result = Result<()>;
}
31 changes: 27 additions & 4 deletions crates/rooch-indexer/src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@
use crate::actor::indexer::IndexerActor;
use crate::actor::messages::{
IndexerApplyObjectStatesMessage, IndexerDeleteAnyObjectStatesMessage, IndexerEventsMessage,
IndexerPersistOrUpdateAnyObjectStatesMessage, IndexerStatesMessage, IndexerTransactionMessage,
QueryIndexerEventsMessage, QueryIndexerObjectIdsMessage, QueryIndexerTransactionsMessage,
QueryLastStateIndexByTxOrderMessage, UpdateIndexerMessage,
IndexerPersistOrUpdateAnyObjectStatesMessage, IndexerRevertMessage, IndexerStatesMessage,
IndexerTransactionMessage, QueryIndexerEventsMessage, QueryIndexerObjectIdsMessage,
QueryIndexerTransactionsMessage, QueryLastStateIndexByTxOrderMessage, UpdateIndexerMessage,
};
use crate::actor::reader_indexer::IndexerReaderActor;
use anyhow::{Ok, Result};
use coerce::actor::ActorRef;
use moveos_types::moveos_std::event::Event;
use moveos_types::moveos_std::object::{ObjectID, ObjectMeta};
use moveos_types::moveos_std::tx_context::TxContext;
use moveos_types::state::StateChangeSet;
use moveos_types::state::{StateChangeSet, StateChangeSetExt};
use moveos_types::transaction::{MoveAction, TransactionExecutionInfo, VerifiedMoveOSTransaction};
use rooch_types::indexer::event::{EventFilter, IndexerEvent, IndexerEventID};
use rooch_types::indexer::state::{
Expand All @@ -23,6 +23,7 @@ use rooch_types::indexer::state::{
};
use rooch_types::indexer::transaction::{IndexerTransaction, TransactionFilter};
use rooch_types::transaction::LedgerTransaction;
use std::collections::HashMap;

#[derive(Clone)]
pub struct IndexerProxy {
Expand Down Expand Up @@ -209,4 +210,26 @@ impl IndexerProxy {
})
.await?
}

pub async fn revert_indexer(
&self,
revert_tx_order: u64,
// revert_ledger_tx: LedgerTransaction,
// revert_execution_info: TransactionExecutionInfo,
revert_state_change_set: StateChangeSetExt,
root: ObjectMeta,
object_mapping: HashMap<ObjectID, ObjectMeta>,
) -> Result<()> {
self.actor
.notify(IndexerRevertMessage {
revert_tx_order,
// revert_ledger_tx,
// revert_execution_info,
revert_state_change_set,
root,
object_mapping,
})
.await?;
Ok(())
}
}
4 changes: 2 additions & 2 deletions crates/rooch-indexer/src/store/sqlite_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ impl SqliteIndexerStore {
.start_timer();
let mut connection = get_sqlite_pool_connection(&self.connection_pool)?;

let tx_orders = tx_orders.into_iter().map(|v| v as i64).collect();
let tx_orders: Vec<_> = tx_orders.into_iter().map(|v| v as i64).collect();
diesel::delete(
transactions::table.filter(transactions::tx_order.eq_any(tx_orders.as_slice())),
)
Expand Down Expand Up @@ -414,7 +414,7 @@ impl SqliteIndexerStore {
.start_timer();
let mut connection = get_sqlite_pool_connection(&self.connection_pool)?;

let tx_orders = tx_orders.into_iter().map(|v| v as i64).collect();
let tx_orders: Vec<_> = tx_orders.into_iter().map(|v| v as i64).collect();
diesel::delete(events::table.filter(events::tx_order.eq_any(tx_orders.as_slice())))
.execute(&mut connection)
.map_err(|e| IndexerError::SQLiteWriteError(e.to_string()))
Expand Down
1 change: 0 additions & 1 deletion crates/rooch-indexer/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// SPDX-License-Identifier: Apache-2.0

use crate::errors::IndexerError;
use crate::{INDEXER_EVENTS_TABLE_NAME, INDEXER_TRANSACTIONS_TABLE_NAME};
use rooch_types::indexer::event::IndexerEvent;
use rooch_types::indexer::state::{IndexerObjectState, IndexerObjectStateChangeSet};
use rooch_types::indexer::transaction::IndexerTransaction;
Expand Down
64 changes: 59 additions & 5 deletions crates/rooch-types/src/indexer/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use moveos_types::state::{MoveStructType, MoveType, ObjectChange, StateChangeSet
use once_cell::sync::Lazy;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

pub static UTXO_TYPE_TAG: Lazy<TypeTag> = Lazy::new(UTXO::type_tag);

Expand Down Expand Up @@ -194,9 +195,11 @@ pub fn handle_object_change(

pub fn handle_revert_object_change(
state_index_generator: &mut IndexerObjectStatesIndexGenerator,
// revert_state_index_generator: &mut IndexerObjectStatesIndexGenerator,
tx_order: u64,
indexer_object_state_change_set: &mut IndexerObjectStateChangeSet,
object_change: ObjectChange,
object_mapping: &HashMap<ObjectID, ObjectMeta>,
) -> Result<()> {
let ObjectChange {
metadata,
Expand All @@ -214,14 +217,29 @@ pub fn handle_revert_object_change(
if let Some(op) = value {
match op {
Op::Modify(_value) => {
let state = IndexerObjectState::new(metadata.clone(), tx_order, state_index);
indexer_object_state_change_set.update_object_states(state);
// Keep the tx_order and state index consistent before reverting
if let Some(previous_object_meta) = object_mapping.get(&object_id) {
let state = IndexerObjectState::new(
previous_object_meta.clone(),
tx_order,
state_index,
);
indexer_object_state_change_set.update_object_states(state);
}
// let state = IndexerObjectState::new(metadata.clone(), tx_order, state_index);
// indexer_object_state_change_set.update_object_states(state);
}
Op::Delete => {
// indexer_object_state_change_set.remove_object_states(object_id, &object_type);

let state = IndexerObjectState::new(metadata.clone(), tx_order, state_index);
indexer_object_state_change_set.new_object_states(state);
// Use the reverted tx_order and state index as the deleted restored tx_order and tx_order
if let Some(previous_object_meta) = object_mapping.get(&object_id) {
let state = IndexerObjectState::new(
previous_object_meta.clone(),
tx_order,
state_index,
);
indexer_object_state_change_set.new_object_states(state);
}
}
Op::New(_value) => {
// let state = IndexerObjectState::new(metadata.clone(), tx_order, state_index);
Expand All @@ -243,11 +261,47 @@ pub fn handle_revert_object_change(
tx_order,
indexer_object_state_change_set,
change,
object_mapping,
)?;
}
Ok(())
}

pub fn collect_revert_object_change_ids(
object_change: ObjectChange,
object_ids: &mut Vec<ObjectID>,
) -> Result<()> {
let ObjectChange {
metadata,
value,
fields,
} = object_change;
let object_id = metadata.id.clone();
let object_type = metadata.object_type.clone();

// Do not index dynamic field object
if is_dynamic_field_type(&object_type) {
return Ok(());
}
if let Some(op) = value {
match op {
Op::Modify(_value) => {
object_ids.push(object_id);
}
Op::Delete => {
object_ids.push(object_id);
}
Op::New(_value) => {}
}
} else {
}

for (_key, change) in fields {
collect_revert_object_change_ids(change, object_ids)?;
}
Ok(())
}

#[derive(
Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize, Deserialize, JsonSchema, Default,
)]
Expand Down
Loading

0 comments on commit a89b928

Please sign in to comment.