diff --git a/README.md b/README.md index 9d59056..877c216 100644 --- a/README.md +++ b/README.md @@ -18,17 +18,9 @@ $ make gui graph TD; map_events[map: map_events]; sf.antelope.type.v1.Block[source: sf.antelope.type.v1.Block] --> map_events; - map_accounts[map: map_accounts]; - sf.antelope.type.v1.Block[source: sf.antelope.type.v1.Block] --> map_accounts; - map_stat[map: map_stat]; - sf.antelope.type.v1.Block[source: sf.antelope.type.v1.Block] --> map_stat; graph_out[map: graph_out]; - map_accounts --> graph_out; - map_stat --> graph_out; map_events --> graph_out; ch_out[map: ch_out]; - map_accounts --> ch_out; - map_stat --> ch_out; map_events --> ch_out; ``` @@ -45,39 +37,21 @@ Initial block: 0 Kind: map Input: source: sf.antelope.type.v1.Block Output Type: proto:antelope.eosio.token.v1.Events -Hash: c85c7f8fad2a0d03984c00ee15d1ded54bfa700e - -Name: map_accounts -Initial block: 0 -Kind: map -Input: source: sf.antelope.type.v1.Block -Output Type: proto:antelope.eosio.token.v1.Accounts -Hash: e85688fa74de76ee66c793b77190025baa242b4a - -Name: map_stat -Initial block: 0 -Kind: map -Input: source: sf.antelope.type.v1.Block -Output Type: proto:antelope.eosio.token.v1.Stats -Hash: c3ba86b0f3f4fdb79e7e51d82fc114df45abc4f9 +Hash: 8dd2fd12c9cb5ad0bb6f32f3b9defcce90090715 Name: graph_out Initial block: 0 Kind: map -Input: map: map_accounts -Input: map: map_stat Input: map: map_events Output Type: proto:sf.substreams.sink.entity.v1.EntityChanges -Hash: 64faa1889da48de19fea8d3b68595f844bba32e9 +Hash: b34452f9c92c173f7a9976534b622ad093dad242 Name: ch_out Initial block: 0 Kind: map -Input: map: map_accounts -Input: map: map_stat Input: map: map_events Output Type: proto:sf.substreams.sink.database.v1.DatabaseChanges -Hash: f4573a7e43387bde4d3572bb649dc315e64f3913 +Hash: 1515b91545f2d2812ac27fbe5f556dc117d57e20 Sink config: ---- diff --git a/proto/v1/eosio.token.proto b/proto/v1/eosio.token.proto index e64fea8..1528382 100644 --- a/proto/v1/eosio.token.proto +++ b/proto/v1/eosio.token.proto @@ -4,68 +4,13 @@ package antelope.eosio.token.v1; import "google/protobuf/timestamp.proto"; -message Accounts { - repeated Account changes = 1; -} - -message Account { - // trace information - string trx_id = 1; - uint32 action_index = 2; - - // contract & scope - string contract = 3; - string symcode = 4; - - // data payload - string account = 5; - string balance = 6; - int64 balance_delta = 7; - - // extras - uint32 precision = 10; - int64 amount = 11; - double value = 12; - - // block information - uint64 block_num = 13; - google.protobuf.Timestamp timestamp = 14; -} - -message Stats { - repeated Stat changes = 1; -} - -message Stat { - // trace information - string trx_id = 1; - uint32 action_index = 2; - - // contract & scope - string contract = 3; - string symcode = 4; - - // data payload - string issuer = 5; - string max_supply = 6; - string supply = 7; - int64 supply_delta = 8; - - // extras - uint32 precision = 10; - int64 amount = 11; - double value = 12; - - // block information - uint64 block_num = 13; - google.protobuf.Timestamp timestamp = 14; -} - message Events { repeated Transfer transfers = 1; repeated Issue issues = 2; repeated Retire retires = 3; repeated Create creates = 4; + repeated BalanceChange balance_changes = 5; + repeated SupplyChange supply_changes = 6; } message Transfer { @@ -163,4 +108,53 @@ message Create { // block information uint64 block_num = 11; google.protobuf.Timestamp timestamp = 12; -} \ No newline at end of file +} + +message BalanceChange { + // trace information + string trx_id = 1; + uint32 action_index = 2; + + // contract & scope + string contract = 3; + string symcode = 4; + + // data payload + string account = 5; + string balance = 6; + int64 balance_delta = 7; + + // extras + uint32 precision = 10; + int64 amount = 11; + double value = 12; + + // block information + uint64 block_num = 13; + google.protobuf.Timestamp timestamp = 14; +} + +message SupplyChange { + // trace information + string trx_id = 1; + uint32 action_index = 2; + + // contract & scope + string contract = 3; + string symcode = 4; + + // data payload + string issuer = 5; + string max_supply = 6; + string supply = 7; + int64 supply_delta = 8; + + // extras + uint32 precision = 10; + int64 amount = 11; + double value = 12; + + // block information + uint64 block_num = 13; + google.protobuf.Timestamp timestamp = 14; +} diff --git a/src/maps.rs b/src/maps.rs index 82e8cba..a9d2117 100644 --- a/src/maps.rs +++ b/src/maps.rs @@ -7,149 +7,6 @@ use crate::abi; use crate::eosio_token::*; use crate::utils; -#[substreams::handlers::map] -fn map_accounts(block: Block) -> Result { - - let changes = block.transaction_traces().flat_map(|trx| { - trx.db_ops.iter().filter_map(|db_op| { - if db_op.table_name != "accounts" { - return None; - } - - let old_data = decode::(&db_op.old_data_json).ok(); - let new_data = decode::(&db_op.new_data_json).ok(); - - let old_balance = old_data.as_ref().and_then(|data| match data.balance.parse::() { - Ok(asset) => Some(asset), - Err(e) => { - log::info!("Error parsing old balance asset in trx {}: {:?}", trx.id, e); - None - } - }); - let new_balance = new_data.as_ref().and_then(|data| match data.balance.parse::() { - Ok(asset) => Some(asset), - Err(e) => { - log::info!("Error parsing new balance asset in trx {}: {:?}", trx.id, e); - None - } - }); - - if old_balance.is_none() && new_balance.is_none() { - return None; - } - - let raw_primary_key = Name::from(db_op.primary_key.as_str()).value; - let symcode = SymbolCode::from(raw_primary_key); - let precision = new_balance.unwrap_or_else(|| old_balance.unwrap()).symbol.precision(); - let sym = Symbol::from_precision(symcode, precision); - let balance = new_balance.unwrap_or_else(|| Asset::from_amount(0, sym)); - let balance_delta = balance.amount - old_balance.unwrap_or_else(|| Asset::from_amount(0, sym)).amount; - - Some(Account { - // trace information - trx_id: trx.id.clone(), - action_index: db_op.action_index, - - // contract & scope - contract: db_op.code.clone(), - symcode: symcode.to_string(), - - // payload - account: db_op.scope.clone(), - balance: balance.to_string(), - balance_delta, - - // extras - precision: precision.into(), - amount: balance.amount, - value: utils::to_value(&balance), - - block_num: block.number as u64, - timestamp: block.header.as_ref().unwrap().timestamp.clone(), - }) - }) - }).collect(); - - Ok(Accounts { changes }) -} - -#[substreams::handlers::map] -fn map_stat(block: Block) -> Result { - let changes = block.transaction_traces().flat_map(|trx| { - trx.db_ops.iter().filter_map(|db_op| { - if db_op.table_name != "stat" { - return None; - } - - let raw_primary_key = match db_op.primary_key.parse::() { - Ok(name) => name.value, - Err(e) => { - log::info!("Error parsing primary key as name in trx {}: {:?}", trx.id, e); - return None; - } - }; - - let old_data = decode::(&db_op.old_data_json).ok(); - let new_data = decode::(&db_op.new_data_json).ok(); - - let old_supply = old_data.as_ref().and_then(|data| match data.supply.parse::() { - Ok(asset) => Some(asset), - Err(e) => { - log::info!("Error parsing old supply asset in trx {}: {:?}", trx.id, e); - None - } - }); - - let new_supply = new_data.as_ref().and_then(|data| match data.supply.parse::() { - Ok(asset) => Some(asset), - Err(e) => { - log::info!("Error parsing new supply asset in trx {}: {:?}", trx.id, e); - None - } - }); - - if old_supply.is_none() && new_supply.is_none() { - return None; - } - - let symcode = SymbolCode::from(raw_primary_key); - let precision = new_supply.unwrap_or_else(|| old_supply.unwrap()).symbol.precision(); - let sym = Symbol::from_precision(symcode, precision); - let supply = new_supply.unwrap_or_else(|| Asset::from_amount(0, sym)); - let supply_delta = supply.amount - old_supply.unwrap_or_else(|| Asset::from_amount(0, sym)).amount; - - let data = new_data.unwrap_or_else(|| old_data.unwrap()); - - Some(Stat { - // trace information - trx_id: trx.id.clone(), - action_index: db_op.action_index, - - // contract & scope - contract: db_op.code.clone(), - symcode: symcode.to_string(), - - // payload - issuer: data.issuer, - max_supply: data.max_supply, - supply: supply.to_string(), - supply_delta, - - // extras - precision: precision.into(), - amount: supply.amount, - value: utils::to_value(&supply), - - block_num: block.number as u64, - timestamp: block.header.as_ref().unwrap().timestamp.clone(), - }) - }) - }) - .collect(); - - Ok(Stats { changes }) -} - #[substreams::handlers::map] fn map_events(block: Block) -> Result { @@ -278,7 +135,129 @@ fn map_events(block: Block) -> Result { }) .collect(); + let balance_changes = block.transaction_traces().flat_map(|trx| { + trx.db_ops.iter().filter_map(|db_op| { + if db_op.table_name != "accounts" { + return None; + } + + let old_data = decode::(&db_op.old_data_json).ok(); + let new_data = decode::(&db_op.new_data_json).ok(); + + let old_balance = old_data.as_ref().and_then(|data| match data.balance.parse::() { + Ok(asset) => Some(asset), + Err(e) => { + log::info!("Error parsing old balance asset in trx {}: {:?}", trx.id, e); + None + } + }); + let new_balance = new_data.as_ref().and_then(|data| match data.balance.parse::() { + Ok(asset) => Some(asset), + Err(e) => { + log::info!("Error parsing new balance asset in trx {}: {:?}", trx.id, e); + None + } + }); + + if old_balance.is_none() && new_balance.is_none() { + return None; + } + + let raw_primary_key = Name::from(db_op.primary_key.as_str()).value; + let symcode = SymbolCode::from(raw_primary_key); + let precision = new_balance.unwrap_or_else(|| old_balance.unwrap()).symbol.precision(); + let sym = Symbol::from_precision(symcode, precision); + let balance = new_balance.unwrap_or_else(|| Asset::from_amount(0, sym)); + let balance_delta = balance.amount - old_balance.unwrap_or_else(|| Asset::from_amount(0, sym)).amount; + + Some(BalanceChange { + // trace information + trx_id: trx.id.clone(), + action_index: db_op.action_index, + + // contract & scope + contract: db_op.code.clone(), + symcode: symcode.to_string(), + + // payload + account: db_op.scope.clone(), + balance: balance.to_string(), + balance_delta, + + // extras + precision: precision.into(), + amount: balance.amount, + value: utils::to_value(&balance), + + block_num: block.number as u64, + timestamp: block.header.as_ref().unwrap().timestamp.clone(), + }) + }) + }).collect(); + + let supply_changes = block.transaction_traces().flat_map(|trx| { + trx.db_ops.iter().filter_map(|db_op| { + if db_op.table_name != "stat" { + return None; + } + + let old_data = decode::(&db_op.old_data_json).ok(); + let new_data = decode::(&db_op.new_data_json).ok(); + + let old_supply = old_data.as_ref().and_then(|data| match data.supply.parse::() { + Ok(asset) => Some(asset), + Err(e) => { + log::info!("Error parsing old supply asset in trx {}: {:?}", trx.id, e); + None + } + }); + + let new_supply = new_data.as_ref().and_then(|data| match data.supply.parse::() { + Ok(asset) => Some(asset), + Err(e) => { + log::info!("Error parsing new supply asset in trx {}: {:?}", trx.id, e); + None + } + }); + + if old_supply.is_none() && new_supply.is_none() { + return None; + } + + let symcode = SymbolCode::from(Name::from(db_op.primary_key.as_str()).value); + let precision = new_supply.unwrap_or_else(|| old_supply.unwrap()).symbol.precision(); + let sym = Symbol::from_precision(symcode, precision); + let supply = new_supply.unwrap_or_else(|| Asset::from_amount(0, sym)); + let supply_delta = supply.amount - old_supply.unwrap_or_else(|| Asset::from_amount(0, sym)).amount; + + let data = new_data.unwrap_or_else(|| old_data.unwrap()); + + Some(SupplyChange { + // trace information + trx_id: trx.id.clone(), + action_index: db_op.action_index, + + // contract & scope + contract: db_op.code.clone(), + symcode: symcode.to_string(), + + // payload + issuer: data.issuer, + max_supply: data.max_supply, + supply: supply.to_string(), + supply_delta, + + // extras + precision: precision.into(), + amount: supply.amount, + value: utils::to_value(&supply), + + block_num: block.number as u64, + timestamp: block.header.as_ref().unwrap().timestamp.clone(), + }) + }) + }).collect(); - Ok(Events { transfers, issues, retires, creates }) + Ok(Events { transfers, issues, retires, creates, balance_changes, supply_changes }) } diff --git a/src/pb/antelope.eosio.token.v1.rs b/src/pb/antelope.eosio.token.v1.rs index 96ce449..6b1631a 100644 --- a/src/pb/antelope.eosio.token.v1.rs +++ b/src/pb/antelope.eosio.token.v1.rs @@ -1,86 +1,6 @@ // @generated #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct Accounts { - #[prost(message, repeated, tag="1")] - pub changes: ::prost::alloc::vec::Vec, -} -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Account { - /// trace information - #[prost(string, tag="1")] - pub trx_id: ::prost::alloc::string::String, - #[prost(uint32, tag="2")] - pub action_index: u32, - /// contract & scope - #[prost(string, tag="3")] - pub contract: ::prost::alloc::string::String, - #[prost(string, tag="4")] - pub symcode: ::prost::alloc::string::String, - /// data payload - #[prost(string, tag="5")] - pub account: ::prost::alloc::string::String, - #[prost(string, tag="6")] - pub balance: ::prost::alloc::string::String, - #[prost(int64, tag="7")] - pub balance_delta: i64, - /// extras - #[prost(uint32, tag="10")] - pub precision: u32, - #[prost(int64, tag="11")] - pub amount: i64, - #[prost(double, tag="12")] - pub value: f64, - /// block information - #[prost(uint64, tag="13")] - pub block_num: u64, - #[prost(message, optional, tag="14")] - pub timestamp: ::core::option::Option<::prost_types::Timestamp>, -} -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Stats { - #[prost(message, repeated, tag="1")] - pub changes: ::prost::alloc::vec::Vec, -} -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Stat { - /// trace information - #[prost(string, tag="1")] - pub trx_id: ::prost::alloc::string::String, - #[prost(uint32, tag="2")] - pub action_index: u32, - /// contract & scope - #[prost(string, tag="3")] - pub contract: ::prost::alloc::string::String, - #[prost(string, tag="4")] - pub symcode: ::prost::alloc::string::String, - /// data payload - #[prost(string, tag="5")] - pub issuer: ::prost::alloc::string::String, - #[prost(string, tag="6")] - pub max_supply: ::prost::alloc::string::String, - #[prost(string, tag="7")] - pub supply: ::prost::alloc::string::String, - #[prost(int64, tag="8")] - pub supply_delta: i64, - /// extras - #[prost(uint32, tag="10")] - pub precision: u32, - #[prost(int64, tag="11")] - pub amount: i64, - #[prost(double, tag="12")] - pub value: f64, - /// block information - #[prost(uint64, tag="13")] - pub block_num: u64, - #[prost(message, optional, tag="14")] - pub timestamp: ::core::option::Option<::prost_types::Timestamp>, -} -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] pub struct Events { #[prost(message, repeated, tag="1")] pub transfers: ::prost::alloc::vec::Vec, @@ -90,6 +10,10 @@ pub struct Events { pub retires: ::prost::alloc::vec::Vec, #[prost(message, repeated, tag="4")] pub creates: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag="5")] + pub balance_changes: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag="6")] + pub supply_changes: ::prost::alloc::vec::Vec, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -225,4 +149,72 @@ pub struct Create { #[prost(message, optional, tag="12")] pub timestamp: ::core::option::Option<::prost_types::Timestamp>, } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BalanceChange { + /// trace information + #[prost(string, tag="1")] + pub trx_id: ::prost::alloc::string::String, + #[prost(uint32, tag="2")] + pub action_index: u32, + /// contract & scope + #[prost(string, tag="3")] + pub contract: ::prost::alloc::string::String, + #[prost(string, tag="4")] + pub symcode: ::prost::alloc::string::String, + /// data payload + #[prost(string, tag="5")] + pub account: ::prost::alloc::string::String, + #[prost(string, tag="6")] + pub balance: ::prost::alloc::string::String, + #[prost(int64, tag="7")] + pub balance_delta: i64, + /// extras + #[prost(uint32, tag="10")] + pub precision: u32, + #[prost(int64, tag="11")] + pub amount: i64, + #[prost(double, tag="12")] + pub value: f64, + /// block information + #[prost(uint64, tag="13")] + pub block_num: u64, + #[prost(message, optional, tag="14")] + pub timestamp: ::core::option::Option<::prost_types::Timestamp>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SupplyChange { + /// trace information + #[prost(string, tag="1")] + pub trx_id: ::prost::alloc::string::String, + #[prost(uint32, tag="2")] + pub action_index: u32, + /// contract & scope + #[prost(string, tag="3")] + pub contract: ::prost::alloc::string::String, + #[prost(string, tag="4")] + pub symcode: ::prost::alloc::string::String, + /// data payload + #[prost(string, tag="5")] + pub issuer: ::prost::alloc::string::String, + #[prost(string, tag="6")] + pub max_supply: ::prost::alloc::string::String, + #[prost(string, tag="7")] + pub supply: ::prost::alloc::string::String, + #[prost(int64, tag="8")] + pub supply_delta: i64, + /// extras + #[prost(uint32, tag="10")] + pub precision: u32, + #[prost(int64, tag="11")] + pub amount: i64, + #[prost(double, tag="12")] + pub value: f64, + /// block information + #[prost(uint64, tag="13")] + pub block_num: u64, + #[prost(message, optional, tag="14")] + pub timestamp: ::core::option::Option<::prost_types::Timestamp>, +} // @@protoc_insertion_point(module) diff --git a/src/sinks.rs b/src/sinks.rs index 2d0d679..93f9a4e 100644 --- a/src/sinks.rs +++ b/src/sinks.rs @@ -3,18 +3,16 @@ use substreams::errors::Error; use substreams_database_change::pb::database::{table_change, DatabaseChanges}; use substreams_entity_change::pb::entity::EntityChanges; -use crate::eosio_token::{Accounts, Stats, Events}; +use crate::eosio_token::Events; use crate::utils::to_key; #[substreams::handlers::map] pub fn graph_out( - map_accounts: Accounts, - map_stats: Stats, map_events: Events, ) -> Result { let mut tables = substreams_entity_change::tables::Tables::new(); - for account in map_accounts.changes { + for account in map_events.balance_changes { let key = to_key(&account.trx_id, account.action_index); tables .create_row("accounts", key) @@ -34,7 +32,7 @@ pub fn graph_out( .set("value", account.value.to_string()); } - for stat in map_stats.changes { + for stat in map_events.supply_changes { let key = to_key(&stat.trx_id, stat.action_index); tables .create_row("stats", key) @@ -141,13 +139,11 @@ pub fn graph_out( #[substreams::handlers::map] pub fn ch_out( - map_accounts: Accounts, - map_stats: Stats, map_events: Events, ) -> Result { let mut tables = DatabaseChanges::default(); - for account in map_accounts.changes { + for account in map_events.balance_changes { let keys = HashMap::from([ ("account".to_string(), account.account.to_string()), ("block_num".to_string(), account.block_num.to_string()), @@ -167,7 +163,7 @@ pub fn ch_out( .change("timestamp", ("", account.timestamp.unwrap().to_string().as_str())); } - for stat in map_stats.changes { + for stat in map_events.supply_changes { let keys = HashMap::from([ ("contract".to_string(), stat.contract.to_string()), ("block_num".to_string(), stat.block_num.to_string()), diff --git a/substreams.yaml b/substreams.yaml index 7f2b0b9..aa67b67 100644 --- a/substreams.yaml +++ b/substreams.yaml @@ -29,25 +29,9 @@ modules: output: type: proto:antelope.eosio.token.v1.Events - - name: map_accounts - kind: map - inputs: - - source: sf.antelope.type.v1.Block - output: - type: proto:antelope.eosio.token.v1.Accounts - - - name: map_stat - kind: map - inputs: - - source: sf.antelope.type.v1.Block - output: - type: proto:antelope.eosio.token.v1.Stats - - name: graph_out kind: map inputs: - - map: map_accounts - - map: map_stat - map: map_events output: type: proto:sf.substreams.sink.entity.v1.EntityChanges @@ -55,8 +39,6 @@ modules: - name: ch_out kind: map inputs: - - map: map_accounts - - map: map_stat - map: map_events output: type: proto:sf.substreams.sink.database.v1.DatabaseChanges