Skip to content

Commit

Permalink
draft implements indexer revert
Browse files Browse the repository at this point in the history
  • Loading branch information
baichuan3 committed Sep 11, 2024
1 parent db0483b commit b499c6f
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 25 deletions.
62 changes: 56 additions & 6 deletions crates/rooch-indexer/src/actor/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

use crate::actor::messages::{
IndexerApplyObjectStatesMessage, IndexerDeleteAnyObjectStatesMessage, IndexerEventsMessage,
IndexerPersistOrUpdateAnyObjectStatesMessage, IndexerStatesMessage, IndexerTransactionMessage,
UpdateIndexerMessage,
IndexerPersistOrUpdateAnyObjectStatesMessage, IndexerRevertStatesMessage, IndexerStatesMessage,
IndexerTransactionMessage, UpdateIndexerMessage,
};
use crate::store::traits::IndexerStoreTrait;
use crate::IndexerStore;
Expand All @@ -14,10 +14,7 @@ use coerce::actor::{context::ActorContext, message::Handler, Actor};
use moveos_types::moveos_std::object::ObjectMeta;
use moveos_types::transaction::MoveAction;
use rooch_types::indexer::event::IndexerEvent;
use rooch_types::indexer::state::{
handle_object_change, IndexerObjectStateChangeSet, IndexerObjectStatesIndexGenerator,
ObjectStateType,
};
use rooch_types::indexer::state::{handle_object_change, handle_revert_object_change, IndexerObjectStateChangeSet, IndexerObjectStatesIndexGenerator, ObjectStateType};
use rooch_types::indexer::transaction::IndexerTransaction;

pub struct IndexerActor {
Expand Down Expand Up @@ -231,3 +228,56 @@ impl Handler<IndexerApplyObjectStatesMessage> for IndexerActor {
Ok(())
}
}

#[async_trait]
impl Handler<IndexerRevertStatesMessage> for IndexerActor {
async fn handle(
&mut self,
msg: IndexerApplyObjectStatesMessage,
_ctx: &mut ActorContext,
) -> Result<()> {
let IndexerRevertStatesMessage {
revert_tx_order,
revert_ledger_tx,
revert_execution_info,
revert_state_change_set,
root,
} = msg;

self.root = root;
let tx_order = ledger_transaction.sequence_info.tx_order;

// 1. revert indexer transaction
self.indexer_store
.delete_transactions(vec![revert_tx_order])?;

// 2. revert indexer event
self.indexer_store.delete_events(vec![revert_tx_order])?;

// 3. revert indexer full object state, including object_states, utxos and inscriptions
// indexer object state index generator
let mut state_index_generator = IndexerObjectStatesIndexGenerator::default();
let mut indexer_object_state_change_set = IndexerObjectStateChangeSet::default();

// set genesis tx_order and state_index_generator for new indexer revert
let tx_order: u64 = 0;
let last_state_index = self
.query_last_state_index_by_tx_order(tx_order, state_type.clone())
.await?;
let mut state_index_generator = last_state_index.map_or(0, |x| x + 1);

for (_feild_key, object_change) in revert_state_change_set.state_change_set.changes {
let _ = handle_revert_object_change(
&mut state_index_generator,
tx_order,
&mut indexer_object_state_change_set,
object_change,
)?;
}
self.indexer_store
.apply_object_states(indexer_object_state_change_set)?;

self.indexer_store.revert_states(object_state_change_set)?;
Ok(())
}
}
15 changes: 14 additions & 1 deletion crates/rooch-indexer/src/actor/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use coerce::actor::message::Message;
use moveos_types::moveos_std::event::Event;
use moveos_types::moveos_std::object::{ObjectID, ObjectMeta};
use moveos_types::moveos_std::tx_context::TxContext;
use moveos_types::state::StateChangeSet;
use moveos_types::state::{StateChangeSet, StateChangeSetExt};
use moveos_types::transaction::{MoveAction, TransactionExecutionInfo, VerifiedMoveOSTransaction};
use rooch_types::indexer::event::{EventFilter, IndexerEvent, IndexerEventID};
use rooch_types::indexer::state::{
Expand Down Expand Up @@ -148,3 +148,16 @@ pub struct IndexerApplyObjectStatesMessage {
impl Message for IndexerApplyObjectStatesMessage {
type Result = Result<()>;
}

#[derive(Debug, Serialize, Deserialize)]
pub struct IndexerRevertStatesMessage {
pub revert_tx_order: u64,
pub revert_ledger_tx: LedgerTransaction,
pub revert_execution_info: TransactionExecutionInfo,
pub revert_state_change_set: StateChangeSetExt,
pub root: ObjectMeta,
}

impl Message for IndexerRevertStatesMessage {
type Result = Result<()>;
}
10 changes: 10 additions & 0 deletions crates/rooch-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,16 @@ impl IndexerStoreTrait for IndexerStore {
self.get_sqlite_store(INDEXER_EVENTS_TABLE_NAME)?
.persist_events(events)
}

fn delete_transactions(&self, tx_orders: Vec<u64>) -> Result<(), IndexerError> {
self.get_sqlite_store(INDEXER_TRANSACTIONS_TABLE_NAME)?
.delete_transactions(tx_orders)
}

fn delete_events(&self, tx_orders: Vec<u64>) -> Result<(), IndexerError> {
self.get_sqlite_store(INDEXER_EVENTS_TABLE_NAME)?
.delete_events(tx_orders)
}
}

impl IndexerStore {
Expand Down
50 changes: 50 additions & 0 deletions crates/rooch-indexer/src/store/sqlite_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,32 @@ impl SqliteIndexerStore {
Ok(())
}

#[named]
pub fn delete_transactions(&self, tx_orders: Vec<u64>) -> Result<(), IndexerError> {
if tx_orders.is_empty() {
return Ok(());
}

let fn_name = function_name!();
let _timer = self
.db_metrics
.indexer_store_metrics
.indexer_persist_or_update_or_delete_latency_seconds
.with_label_values(&[fn_name])
.start_timer();
let mut connection = get_sqlite_pool_connection(&self.connection_pool)?;

let tx_orders = tx_orders.into_iter().map(|v| v as i64).collect();
diesel::delete(
transactions::table.filter(transactions::tx_order.eq_any(tx_orders.as_slice())),
)
.execute(&mut connection)
.map_err(|e| IndexerError::SQLiteWriteError(e.to_string()))
.context("Failed to delete transactions to SQLiteDB")?;

Ok(())
}

#[named]
pub fn persist_events(&self, events: Vec<IndexerEvent>) -> Result<(), IndexerError> {
if events.is_empty() {
Expand Down Expand Up @@ -372,4 +398,28 @@ impl SqliteIndexerStore {

Ok(())
}

#[named]
pub fn delete_events(&self, tx_orders: Vec<u64>) -> Result<(), IndexerError> {
if tx_orders.is_empty() {
return Ok(());
}

let fn_name = function_name!();
let _timer = self
.db_metrics
.indexer_store_metrics
.indexer_persist_or_update_or_delete_latency_seconds
.with_label_values(&[fn_name])
.start_timer();
let mut connection = get_sqlite_pool_connection(&self.connection_pool)?;

let tx_orders = tx_orders.into_iter().map(|v| v as i64).collect();
diesel::delete(events::table.filter(events::tx_order.eq_any(tx_orders.as_slice())))
.execute(&mut connection)
.map_err(|e| IndexerError::SQLiteWriteError(e.to_string()))
.context("Failed to delete events to SQLiteDB")?;

Ok(())
}
}
5 changes: 5 additions & 0 deletions crates/rooch-indexer/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::errors::IndexerError;
use crate::{INDEXER_EVENTS_TABLE_NAME, INDEXER_TRANSACTIONS_TABLE_NAME};
use rooch_types::indexer::event::IndexerEvent;
use rooch_types::indexer::state::{IndexerObjectState, IndexerObjectStateChangeSet};
use rooch_types::indexer::transaction::IndexerTransaction;
Expand Down Expand Up @@ -39,4 +40,8 @@ pub trait IndexerStoreTrait: Send + Sync {
) -> Result<(), IndexerError>;

fn persist_events(&self, events: Vec<IndexerEvent>) -> Result<(), IndexerError>;

fn delete_transactions(&self, tx_orders: Vec<u64>) -> anyhow::Result<(), IndexerError>;

fn delete_events(&self, tx_orders: Vec<u64>) -> anyhow::Result<(), IndexerError>;
}
56 changes: 56 additions & 0 deletions crates/rooch-types/src/indexer/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,62 @@ pub fn handle_object_change(
Ok(())
}

pub fn handle_revert_object_change(
state_index_generator: &mut IndexerObjectStatesIndexGenerator,
tx_order: u64,
indexer_object_state_change_set: &mut IndexerObjectStateChangeSet,
object_change: ObjectChange,
) -> Result<()> {
let ObjectChange {
metadata,
value,
fields,
} = object_change;
let object_id = metadata.id.clone();
let object_type = metadata.object_type.clone();
let state_index = state_index_generator.get(&object_type);

// Do not index dynamic field object
if is_dynamic_field_type(&object_type) {
return Ok(());
}
if let Some(op) = value {
match op {
Op::Modify(_value) => {
let state = IndexerObjectState::new(metadata.clone(), tx_order, state_index);
indexer_object_state_change_set.update_object_states(state);
}
Op::Delete => {
// indexer_object_state_change_set.remove_object_states(object_id, &object_type);

let state = IndexerObjectState::new(metadata.clone(), tx_order, state_index);
indexer_object_state_change_set.new_object_states(state);
}
Op::New(_value) => {
// let state = IndexerObjectState::new(metadata.clone(), tx_order, state_index);
// indexer_object_state_change_set.new_object_states(state);

indexer_object_state_change_set.remove_object_states(object_id, &object_type);
}
}
} else {
//If value is not changed, do nothing.
// let state = IndexerObjectState::new(metadata.clone(), tx_order, state_index);
// indexer_object_state_change_set.update_object_states(state);
}

state_index_generator.incr(&object_type);
for (_key, change) in fields {
handle_revert_object_change(
state_index_generator,
tx_order,
indexer_object_state_change_set,
change,
)?;
}
Ok(())
}

#[derive(
Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize, Deserialize, JsonSchema, Default,
)]
Expand Down
Loading

0 comments on commit b499c6f

Please sign in to comment.