From b6469e781371b3f056600e91d57d9204b8eb6caa Mon Sep 17 00:00:00 2001 From: Denis Carriere Date: Sat, 9 Nov 2024 22:26:59 +0100 Subject: [PATCH] Add token & update supply changes & balance_changes --- Makefile | 14 ++-- proto/v1/eosio.token.proto | 8 +++ src/balance_changes.rs | 106 ++++++++++++++++-------------- src/pb/antelope.eosio.token.v1.rs | 16 +++++ src/supply_changes.rs | 103 ++++++++++++++--------------- src/utils.rs | 34 ++++++++-- 6 files changed, 169 insertions(+), 112 deletions(-) diff --git a/Makefile b/Makefile index fc99540..379b2f8 100644 --- a/Makefile +++ b/Makefile @@ -25,10 +25,14 @@ graph: info: substreams info -.PHONY: run -run: - substreams run -e eos.substreams.pinax.network:443 graph_out -s -10000 - .PHONY: gui gui: - substreams gui -e eos.substreams.pinax.network:443 graph_out -s -10000 + substreams gui substreams.yaml -e eos.substreams.pinax.network:443 map_events -s -1000 -t 0 + +.PHONY: parquet +parquet: + substreams-sink-files run eos.substreams.pinax.network:443 substreams.yaml map_events '.' 2: --encoder parquet + +.PHONY: schema +schema: + substreams-sink-files tools parquet schema substreams.yaml map_events \ No newline at end of file diff --git a/proto/v1/eosio.token.proto b/proto/v1/eosio.token.proto index e9df70e..06df0d7 100644 --- a/proto/v1/eosio.token.proto +++ b/proto/v1/eosio.token.proto @@ -142,6 +142,10 @@ message BalanceChange { google.protobuf.Timestamp timestamp = 14; string block_hash = 15; string block_date = 16; + + // token + string token = 17; // ExtendedSymbol + string operation = 18; // db_op::Operation } message SupplyChange { @@ -169,4 +173,8 @@ message SupplyChange { google.protobuf.Timestamp timestamp = 14; string block_hash = 15; string block_date = 16; + + // token + string token = 17; // ExtendedSymbol + string operation = 18; // db_op::Operation } diff --git a/src/balance_changes.rs b/src/balance_changes.rs index 3956a0e..1c58e78 100644 --- a/src/balance_changes.rs +++ b/src/balance_changes.rs @@ -1,10 +1,9 @@ -use crate::{abi, BalanceChange}; -use antelope::{Asset, Name, Symbol, SymbolCode}; +use crate::BalanceChange; +use antelope::{Asset, ExtendedSymbol, Name}; use substreams::{log, pb::substreams::Clock}; -use substreams_antelope::decoder::decode; use substreams_antelope::Block; -use crate::utils; +use crate::utils::{self, parse_json_asset}; pub fn collect_balance_changes(clock: &Clock, block: &Block) -> Vec { block @@ -15,54 +14,54 @@ pub fn collect_balance_changes(clock: &Clock, block: &Block) -> Vec(&db_op.old_data_json).ok(); - let new_data = decode::(&db_op.new_data_json).ok(); - - let old_balance = - old_data - .as_ref() - .and_then(|data| match data.balance.parse::() { - Ok(asset) => Some(asset), - Err(e) => { - log::info!( - "Error parsing old balance asset in trx {}: {:?}", - trx.id, - e - ); - None - } - }); - let new_balance = - new_data - .as_ref() - .and_then(|data| match data.balance.parse::() { - Ok(asset) => Some(asset), - Err(e) => { - log::info!( - "Error parsing new balance asset in trx {}: {:?}", - trx.id, - e - ); - None - } - }); + // decoded + let old_balance = parse_json_asset(&db_op.old_data_json, "balance"); + let new_balance = parse_json_asset(&db_op.new_data_json, "balance"); + // no valid Accounts if old_balance.is_none() && new_balance.is_none() { return None; } - let raw_primary_key = Name::from(db_op.primary_key.as_str()).value; - let symcode = SymbolCode::from(raw_primary_key); - let precision = new_balance - .unwrap_or_else(|| old_balance.unwrap()) - .symbol - .precision(); - let sym = Symbol::from_precision(symcode, precision); - let balance = new_balance.unwrap_or_else(|| Asset::from_amount(0, sym)); - let balance_delta = balance.amount - - old_balance - .unwrap_or_else(|| Asset::from_amount(0, sym)) - .amount; + // token contract & account + let contract = Name::from(db_op.code.as_str()); + let account = Name::from(db_op.scope.as_str()); + + // ignore invalid contract or account + if contract.value == 0 || account.value == 0 { + log::info!( + "Invalid contract or account in trx {}: contract: {}, account: {}", + trx.id, + contract, + account + ); + return None; + } + + // ignore mismatched balances + if old_balance.is_some() && new_balance.is_some() { + if old_balance.unwrap().symbol != new_balance.unwrap().symbol { + log::info!( + "Mismatched balance in trx {}: old_balance: {:?}, new_balance: {:?}", + trx.id, + old_balance, + new_balance + ); + return None; + } + } + + // fields derived from old_balance or new_balance + let sym = old_balance + .or(new_balance) + .as_ref() + .expect("missing old_balance or new_balance") + .symbol; + let token = ExtendedSymbol::from_extended(sym, contract); + let zero = Asset::from_amount(0, sym); + let balance = new_balance.as_ref().unwrap_or(&zero); + let old_balance = old_balance.as_ref().unwrap_or(&zero); + let balance_delta = balance.amount - old_balance.amount; Some(BalanceChange { // trace information @@ -70,23 +69,28 @@ pub fn collect_balance_changes(clock: &Clock, block: &Block) -> Vec Vec { block @@ -14,57 +13,52 @@ pub fn collect_supply_changes(clock: &Clock, block: &Block) -> Vec if db_op.table_name != "stat" { return None; } + // token contract + let contract = Name::from(db_op.code.as_str()); - let old_data = decode::(&db_op.old_data_json).ok(); - let new_data = decode::(&db_op.new_data_json).ok(); - - let old_supply = - old_data - .as_ref() - .and_then(|data| match data.supply.parse::() { - Ok(asset) => Some(asset), - Err(e) => { - log::info!( - "Error parsing old supply asset in trx {}: {:?}", - trx.id, - e - ); - None - } - }); + // ignore invalid contract or account + if contract.value == 0 { + log::info!("Invalid contract in trx {}: contract: {}", trx.id, contract,); + return None; + } - let new_supply = - new_data - .as_ref() - .and_then(|data| match data.supply.parse::() { - Ok(asset) => Some(asset), - Err(e) => { - log::info!( - "Error parsing new supply asset in trx {}: {:?}", - trx.id, - e - ); - None - } - }); + // parse Assets + let old_supply = parse_json_asset(&db_op.old_data_json, "supply"); + let new_supply = parse_json_asset(&db_op.new_data_json, "supply"); + let new_max_supply = parse_json_asset(&db_op.new_data_json, "max_supply"); + let new_issuer = parse_json_name(&db_op.new_data_json, "issuer"); + // no valid Assets if old_supply.is_none() && new_supply.is_none() { return None; } - let symcode = SymbolCode::from(Name::from(db_op.primary_key.as_str()).value); - let precision = new_supply - .unwrap_or_else(|| old_supply.unwrap()) - .symbol - .precision(); - let sym = Symbol::from_precision(symcode, precision); - let supply = new_supply.unwrap_or_else(|| Asset::from_amount(0, sym)); - let supply_delta = supply.amount - - old_supply - .unwrap_or_else(|| Asset::from_amount(0, sym)) - .amount; + // ignore mismatched supply + if old_supply.is_some() && new_supply.is_some() { + if old_supply.unwrap().symbol != new_supply.unwrap().symbol { + log::info!( + "Mismatched supply in trx {}: old_supply: {:?}, new_supply: {:?}", + trx.id, + old_supply, + new_supply + ); + return None; + } + } - let data = new_data.unwrap_or_else(|| old_data.unwrap()); + // fields derived from old_balance or new_balance + let sym = old_supply + .or(new_supply) + .as_ref() + .expect("missing old_supply or new_supply") + .symbol; + let token = ExtendedSymbol::from_extended(sym, contract); + let zero = Asset::from_amount(0, sym); + let old_supply = old_supply.as_ref().unwrap_or(&zero); + let supply = new_supply.as_ref().unwrap_or(&zero); + let supply_delta = supply.amount - old_supply.amount; + let max_supply = new_max_supply.as_ref().unwrap_or(&zero); + let issuer = new_issuer.unwrap_or(Name::new()); Some(SupplyChange { // trace information @@ -72,24 +66,29 @@ pub fn collect_supply_changes(clock: &Clock, block: &Block) -> Vec action_index: db_op.action_index, // contract & scope - contract: db_op.code.clone(), - symcode: symcode.to_string(), + contract: contract.to_string(), + symcode: sym.code().to_string(), // payload - issuer: data.issuer, - max_supply: data.max_supply, + issuer: issuer.to_string(), + max_supply: max_supply.to_string(), supply: supply.to_string(), supply_delta, // extras - precision: precision.into(), + precision: sym.precision().into(), amount: supply.amount, value: utils::to_value(&supply), + // block block_num: clock.number, timestamp: clock.timestamp, block_hash: clock.id.clone(), block_date: utils::to_date(&clock), + + // token (ex: "4,EOS@eosio.token") + token: token.to_string(), + operation: db_op.operation().as_str_name().to_string(), }) }) }) diff --git a/src/utils.rs b/src/utils.rs index 084d0e3..9712d86 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,4 +1,4 @@ -use antelope::Asset; +use antelope::{Asset, Name}; use substreams::pb::substreams::Clock; pub fn to_value(quantity: &Asset) -> f64 { @@ -22,6 +22,32 @@ pub fn to_date(clock: &Clock) -> String { .to_string() } -// pub fn to_key(trx_id: &str, action_index: u32) -> String { -// format!("{}-{}", trx_id, action_index) -// } +pub fn parse_json_asset(data_json: &str, key: &str) -> Option { + let v = serde_json::from_str::(data_json); + match v { + Ok(data) => { + let value_str = data[key].as_str().unwrap_or(""); + let value = value_str.parse::(); + match value { + Ok(asset) => Some(asset), + Err(_e) => None, + } + } + Err(_e) => None, + } +} + +pub fn parse_json_name(data_json: &str, key: &str) -> Option { + let v = serde_json::from_str::(data_json); + match v { + Ok(data) => { + let value_str = data[key].as_str().unwrap_or(""); + let value = value_str.parse::(); + match value { + Ok(name) => Some(name), + Err(_e) => None, + } + } + Err(_e) => None, + } +}