diff --git a/nft_ingester/src/program_transformers/bubblegum/db.rs b/nft_ingester/src/program_transformers/bubblegum/db.rs index cb4368c3e..4ecc78dba 100644 --- a/nft_ingester/src/program_transformers/bubblegum/db.rs +++ b/nft_ingester/src/program_transformers/bubblegum/db.rs @@ -87,7 +87,7 @@ where .to_owned(), ) .build(DbBackend::Postgres); - query.sql = format!("{} WHERE excluded.seq > cl_items.seq", query.sql); + query.sql = format!("{} WHERE excluded.seq >= cl_items.seq", query.sql); txn.execute(query) .await .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; diff --git a/ops/src/bubblegum/audit.rs b/ops/src/bubblegum/audit.rs index 671aceed0..8b5247348 100644 --- a/ops/src/bubblegum/audit.rs +++ b/ops/src/bubblegum/audit.rs @@ -1,13 +1,16 @@ use super::rpc::{Rpc, SolanaRpcArgs}; use anyhow::Result; +use borsh::BorshSerialize; use clap::Parser; use das_core::{connect_db, MetricsArgs, PoolArgs}; -use digital_asset_types::dao::cl_audits_v2; use futures::future; +use log::debug; +use std::{path::PathBuf, str::FromStr}; -use sea_orm::{CursorTrait, EntityTrait, SqlxPostgresConnector}; -use solana_sdk::signature::Signature; +use digital_asset_types::dao::{cl_audits_v2, sea_orm_active_enums::Instruction}; +use sea_orm::{ColumnTrait, CursorTrait, EntityTrait, QueryFilter, SqlxPostgresConnector}; +use solana_sdk::{pubkey::Pubkey, signature::Signature}; use solana_transaction_status::EncodedConfirmedTransactionWithStatusMeta; use tokio::io::{stdout, AsyncWriteExt}; @@ -28,6 +31,15 @@ pub struct Args { #[arg(long, env, default_value = "10000")] pub batch_size: u64, + + #[arg(long, env)] + pub only_trees: Option>, + + #[arg(long, env, default_value = "false")] + pub fix: bool, + + #[arg(long, env)] + pub log_path: Option, } pub async fn run(config: Args) -> Result<()> { @@ -37,10 +49,42 @@ pub async fn run(config: Args) -> Result<()> { let mut output = stdout(); let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); - let mut after = None; + let mut after: Option = None; + + if let Some(log_path) = config.log_path { + after = match std::fs::read_to_string(log_path) { + Ok(content) => content + .lines() + .last() + .map(|last_entry| last_entry.parse().ok()) + .flatten(), + Err(_) => None, + }; + } loop { - let mut query = cl_audits_v2::Entity::find().cursor_by(cl_audits_v2::Column::Id); + let mut query = cl_audits_v2::Entity::find(); + + if let Some(only_trees) = &config.only_trees { + let pubkeys = only_trees + .into_iter() + .map(|address| { + Pubkey::from_str(&address) + .map_err(|e| anyhow::anyhow!(e.to_string()))? + .try_to_vec() + .map_err(|e| anyhow::anyhow!(e.to_string())) + }) + .collect::>, anyhow::Error>>()?; + + let pubkeys = pubkeys + .into_iter() + .map(|pubkey| pubkey.try_to_vec()) + .collect::, std::io::Error>>()?; + + query = query.filter(cl_audits_v2::Column::Tree.is_in(pubkeys)); + } + let mut query = query.cursor_by(cl_audits_v2::Column::Id); + let mut query = query.first(config.batch_size); if let Some(after) = after { @@ -57,11 +101,24 @@ pub async fn run(config: Args) -> Result<()> { let transactions = future::join_all(transactions).await; - for (signature, transaction) in transactions.into_iter().flatten() { - if let Some(meta) = transaction.transaction.meta { + for response in transactions.into_iter().flatten() { + if let Some(meta) = response.transaction.transaction.meta { if meta.err.is_some() { + if config.fix { + match response.entry.instruction { + Instruction::Transfer => { + let model: cl_audits_v2::ActiveModel = + response.entry.clone().into(); + + cl_audits_v2::Entity::delete(model).exec(&conn).await?; + } + _ => { + debug!("Unhandled instruction: {:?}", response.entry.instruction); + } + } + } output - .write_all(format!("{}\n", signature).as_bytes()) + .write_all(format!("{}\n", response.entry.id).as_bytes()) .await?; output.flush().await?; @@ -79,13 +136,27 @@ pub async fn run(config: Args) -> Result<()> { Ok(()) } +struct FetchTransactionResponse { + pub entry: cl_audits_v2::Model, + pub transaction: EncodedConfirmedTransactionWithStatusMeta, +} + +impl FetchTransactionResponse { + fn new( + entry: cl_audits_v2::Model, + transaction: EncodedConfirmedTransactionWithStatusMeta, + ) -> Self { + Self { entry, transaction } + } +} + async fn fetch_transaction( entry: cl_audits_v2::Model, solana_rpc: Rpc, -) -> Result<(Signature, EncodedConfirmedTransactionWithStatusMeta)> { +) -> Result { let signature = Signature::try_from(entry.tx.as_ref())?; let transaction = solana_rpc.get_transaction(&signature).await?; - Ok((signature, transaction)) + Ok(FetchTransactionResponse::new(entry, transaction)) }