Skip to content

Commit

Permalink
feat(ops): optionally fix transfer failures by deleting the cl_audits…
Browse files Browse the repository at this point in the history
…_v2. requires rerun of the backfiller. if log-path passed pick back up. switch to saving cl_audits_v2 id as the entry of the audit to allow for picking back work.
  • Loading branch information
kespinola committed Feb 26, 2024
1 parent beca08a commit 60b6da2
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 11 deletions.
2 changes: 1 addition & 1 deletion nft_ingester/src/program_transformers/bubblegum/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ where
.to_owned(),
)
.build(DbBackend::Postgres);
query.sql = format!("{} WHERE excluded.seq > cl_items.seq", query.sql);
query.sql = format!("{} WHERE excluded.seq >= cl_items.seq", query.sql);
txn.execute(query)
.await
.map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?;
Expand Down
91 changes: 81 additions & 10 deletions ops/src/bubblegum/audit.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use super::rpc::{Rpc, SolanaRpcArgs};
use anyhow::Result;

use borsh::BorshSerialize;
use clap::Parser;
use das_core::{connect_db, MetricsArgs, PoolArgs};
use digital_asset_types::dao::cl_audits_v2;
use futures::future;
use log::debug;
use std::{path::PathBuf, str::FromStr};

use sea_orm::{CursorTrait, EntityTrait, SqlxPostgresConnector};
use solana_sdk::signature::Signature;
use digital_asset_types::dao::{cl_audits_v2, sea_orm_active_enums::Instruction};
use sea_orm::{ColumnTrait, CursorTrait, EntityTrait, QueryFilter, SqlxPostgresConnector};
use solana_sdk::{pubkey::Pubkey, signature::Signature};
use solana_transaction_status::EncodedConfirmedTransactionWithStatusMeta;

use tokio::io::{stdout, AsyncWriteExt};
Expand All @@ -28,6 +31,15 @@ pub struct Args {

#[arg(long, env, default_value = "10000")]
pub batch_size: u64,

#[arg(long, env)]
pub only_trees: Option<Vec<String>>,

#[arg(long, env, default_value = "false")]
pub fix: bool,

#[arg(long, env)]
pub log_path: Option<PathBuf>,
}

pub async fn run(config: Args) -> Result<()> {
Expand All @@ -37,10 +49,42 @@ pub async fn run(config: Args) -> Result<()> {

let mut output = stdout();
let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
let mut after = None;
let mut after: Option<i64> = None;

if let Some(log_path) = config.log_path {
after = match std::fs::read_to_string(log_path) {
Ok(content) => content
.lines()
.last()
.map(|last_entry| last_entry.parse().ok())
.flatten(),
Err(_) => None,
};
}

loop {
let mut query = cl_audits_v2::Entity::find().cursor_by(cl_audits_v2::Column::Id);
let mut query = cl_audits_v2::Entity::find();

if let Some(only_trees) = &config.only_trees {
let pubkeys = only_trees
.into_iter()
.map(|address| {
Pubkey::from_str(&address)
.map_err(|e| anyhow::anyhow!(e.to_string()))?
.try_to_vec()
.map_err(|e| anyhow::anyhow!(e.to_string()))
})
.collect::<Result<Vec<Vec<u8>>, anyhow::Error>>()?;

let pubkeys = pubkeys
.into_iter()
.map(|pubkey| pubkey.try_to_vec())
.collect::<Result<Vec<_>, std::io::Error>>()?;

query = query.filter(cl_audits_v2::Column::Tree.is_in(pubkeys));
}
let mut query = query.cursor_by(cl_audits_v2::Column::Id);

let mut query = query.first(config.batch_size);

if let Some(after) = after {
Expand All @@ -57,11 +101,24 @@ pub async fn run(config: Args) -> Result<()> {

let transactions = future::join_all(transactions).await;

for (signature, transaction) in transactions.into_iter().flatten() {
if let Some(meta) = transaction.transaction.meta {
for response in transactions.into_iter().flatten() {
if let Some(meta) = response.transaction.transaction.meta {
if meta.err.is_some() {
if config.fix {
match response.entry.instruction {
Instruction::Transfer => {
let model: cl_audits_v2::ActiveModel =
response.entry.clone().into();

cl_audits_v2::Entity::delete(model).exec(&conn).await?;
}
_ => {
debug!("Unhandled instruction: {:?}", response.entry.instruction);
}
}
}
output
.write_all(format!("{}\n", signature).as_bytes())
.write_all(format!("{}\n", response.entry.id).as_bytes())
.await?;

output.flush().await?;
Expand All @@ -79,13 +136,27 @@ pub async fn run(config: Args) -> Result<()> {
Ok(())
}

struct FetchTransactionResponse {
pub entry: cl_audits_v2::Model,
pub transaction: EncodedConfirmedTransactionWithStatusMeta,
}

impl FetchTransactionResponse {
fn new(
entry: cl_audits_v2::Model,
transaction: EncodedConfirmedTransactionWithStatusMeta,
) -> Self {
Self { entry, transaction }
}
}

async fn fetch_transaction(
entry: cl_audits_v2::Model,
solana_rpc: Rpc,
) -> Result<(Signature, EncodedConfirmedTransactionWithStatusMeta)> {
) -> Result<FetchTransactionResponse> {
let signature = Signature::try_from(entry.tx.as_ref())?;

let transaction = solana_rpc.get_transaction(&signature).await?;

Ok((signature, transaction))
Ok(FetchTransactionResponse::new(entry, transaction))
}

0 comments on commit 60b6da2

Please sign in to comment.