From dc2542c06e1acbcb88e945a690d1fd64ea66276e Mon Sep 17 00:00:00 2001 From: YaroShkvorets Date: Tue, 9 Apr 2024 09:25:19 -0400 Subject: [PATCH 1/4] refactor/add events --- Cargo.lock | 2 +- Cargo.toml | 2 +- README.md | 58 +++++++------- proto/v1/eosio.token.proto | 86 +++++++++++++++++++-- src/maps.rs | 121 ++++++++++++++++++++++++++---- src/pb/antelope.eosio.token.v1.rs | 117 +++++++++++++++++++++++++++-- src/sinks.rs | 80 +++++++++++++++++--- substreams.yaml | 12 +-- 8 files changed, 402 insertions(+), 76 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 92e4659..3130243 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -22,7 +22,7 @@ dependencies = [ [[package]] name = "antelope_tokens" -version = "0.3.8" +version = "0.4.0" dependencies = [ "antelope", "prost 0.11.9", diff --git a/Cargo.toml b/Cargo.toml index b29db09..13c6a7a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "antelope_tokens" -version = "0.3.8" +version = "0.4.0" authors = [ "Denis ", "Yaro ", diff --git a/README.md b/README.md index f27009e..9d59056 100644 --- a/README.md +++ b/README.md @@ -16,8 +16,8 @@ $ make gui ```mermaid graph TD; - map_transfers[map: map_transfers]; - sf.antelope.type.v1.Block[source: sf.antelope.type.v1.Block] --> map_transfers; + map_events[map: map_events]; + sf.antelope.type.v1.Block[source: sf.antelope.type.v1.Block] --> map_events; map_accounts[map: map_accounts]; sf.antelope.type.v1.Block[source: sf.antelope.type.v1.Block] --> map_accounts; map_stat[map: map_stat]; @@ -25,75 +25,75 @@ graph TD; graph_out[map: graph_out]; map_accounts --> graph_out; map_stat --> graph_out; - map_transfers --> graph_out; + map_events --> graph_out; ch_out[map: ch_out]; map_accounts --> ch_out; map_stat --> ch_out; - map_transfers --> ch_out; + map_events --> ch_out; ``` ### Modules ```yaml Package name: antelope_tokens -Version: v0.3.8 +Version: v0.4.0 Doc: Antelope `eosio.token` based action traces & database operations. Modules: - ---- -Name: map_transfers +---- +Name: map_events Initial block: 0 Kind: map Input: source: sf.antelope.type.v1.Block -Output Type: proto:antelope.eosio.token.v1.TransferEvents -Hash: 68580bd87f70567d5a794b7ed2c42829563c17a6 +Output Type: proto:antelope.eosio.token.v1.Events +Hash: c85c7f8fad2a0d03984c00ee15d1ded54bfa700e Name: map_accounts Initial block: 0 Kind: map Input: source: sf.antelope.type.v1.Block Output Type: proto:antelope.eosio.token.v1.Accounts -Hash: dc10dd69bc5bc0ae08a87724995a97e728158dbd +Hash: e85688fa74de76ee66c793b77190025baa242b4a Name: map_stat Initial block: 0 Kind: map Input: source: sf.antelope.type.v1.Block Output Type: proto:antelope.eosio.token.v1.Stats -Hash: 8124e7464c489fe27ee13e4f1ed80abb4c8b6763 +Hash: c3ba86b0f3f4fdb79e7e51d82fc114df45abc4f9 Name: graph_out Initial block: 0 Kind: map Input: map: map_accounts Input: map: map_stat -Input: map: map_transfers +Input: map: map_events Output Type: proto:sf.substreams.sink.entity.v1.EntityChanges -Hash: d4b1a6dc23e5467da5e613ed76366cd73a43fade +Hash: 64faa1889da48de19fea8d3b68595f844bba32e9 Name: ch_out Initial block: 0 Kind: map Input: map: map_accounts Input: map: map_stat -Input: map: map_transfers +Input: map: map_events Output Type: proto:sf.substreams.sink.database.v1.DatabaseChanges -Hash: 6f3621429ae1087b55ba3753a1d0cd7cb632948d +Hash: f4573a7e43387bde4d3572bb649dc315e64f3913 Sink config: - ---- +---- type: sf.substreams.sink.sql.v1.Service configs: - - schema: (6376 bytes) MD5SUM: 6c54c43f4c19f465e51cf36c415bc9f6 [LOADED_FILE] - - dbt_config: - - files: (empty) [ZIPPED_FOLDER] - - run_interval_seconds: 0 - - enabled: false - - wire_protocol_access: false - - hasura_frontend: - - enabled: false - - postgraphile_frontend: - - enabled: false - - pgweb_frontend: - - enabled: false - - engine: 2 +- schema: (6814 bytes) MD5SUM: adf98a1becc37604e5f14ce2ed6a1629 [LOADED_FILE] +- dbt_config: + - files: (empty) [ZIPPED_FOLDER] + - run_interval_seconds: 0 + - enabled: false +- wire_protocol_access: false +- hasura_frontend: + - enabled: false +- postgraphile_frontend: + - enabled: false +- pgweb_frontend: + - enabled: false +- engine: 2 ``` diff --git a/proto/v1/eosio.token.proto b/proto/v1/eosio.token.proto index a188ef5..e64fea8 100644 --- a/proto/v1/eosio.token.proto +++ b/proto/v1/eosio.token.proto @@ -5,7 +5,7 @@ package antelope.eosio.token.v1; import "google/protobuf/timestamp.proto"; message Accounts { - repeated Account items = 1; + repeated Account changes = 1; } message Account { @@ -33,7 +33,7 @@ message Account { } message Stats { - repeated Stat items = 1; + repeated Stat changes = 1; } message Stat { @@ -61,18 +61,20 @@ message Stat { google.protobuf.Timestamp timestamp = 14; } -message TransferEvents { - repeated TransferEvent items = 1; +message Events { + repeated Transfer transfers = 1; + repeated Issue issues = 2; + repeated Retire retires = 3; + repeated Create creates = 4; } -message TransferEvent { +message Transfer { // trace information string trx_id = 1; uint32 action_index = 2; // contract & scope string contract = 3; - string action = 4; string symcode = 5; // data payload @@ -90,3 +92,75 @@ message TransferEvent { uint64 block_num = 13; google.protobuf.Timestamp timestamp = 14; } + +message Issue { + // trace information + string trx_id = 1; + uint32 action_index = 2; + + // contract & scope + string contract = 3; + string symcode = 4; + + // data payload + string issuer = 5; + string to = 6; + string quantity = 7; + string memo = 8; + + // extras + uint32 precision = 9; + int64 amount = 10; + double value = 11; + + // block information + uint64 block_num = 12; + google.protobuf.Timestamp timestamp = 13; +} + +message Retire { + // trace information + string trx_id = 1; + uint32 action_index = 2; + + // contract & scope + string contract = 3; + string symcode = 5; + + // data payload + string from = 6; + string quantity = 7; + string memo = 8; + + // extras + uint32 precision = 9; + int64 amount = 10; + double value = 11; + + // block information + uint64 block_num = 12; + google.protobuf.Timestamp timestamp = 13; +} + +message Create { + // trace information + string trx_id = 1; + uint32 action_index = 2; + + // contract & scope + string contract = 3; + string symcode = 5; + + // data payload + string issuer = 6; + string maximum_supply = 7; + + // extras + uint32 precision = 8; + int64 amount = 9; + double value = 10; + + // block information + uint64 block_num = 11; + google.protobuf.Timestamp timestamp = 12; +} \ No newline at end of file diff --git a/src/maps.rs b/src/maps.rs index 6495a80..82e8cba 100644 --- a/src/maps.rs +++ b/src/maps.rs @@ -10,7 +10,7 @@ use crate::utils; #[substreams::handlers::map] fn map_accounts(block: Block) -> Result { - let items = block.transaction_traces().flat_map(|trx| { + let changes = block.transaction_traces().flat_map(|trx| { trx.db_ops.iter().filter_map(|db_op| { if db_op.table_name != "accounts" { return None; @@ -70,12 +70,12 @@ fn map_accounts(block: Block) -> Result { }) }).collect(); - Ok(Accounts { items }) + Ok(Accounts { changes }) } #[substreams::handlers::map] fn map_stat(block: Block) -> Result { - let items = block.transaction_traces().flat_map(|trx| { + let changes = block.transaction_traces().flat_map(|trx| { trx.db_ops.iter().filter_map(|db_op| { if db_op.table_name != "stat" { return None; @@ -147,13 +147,13 @@ fn map_stat(block: Block) -> Result { }) .collect(); - Ok(Stats { items }) + Ok(Stats { changes }) } #[substreams::handlers::map] -fn map_transfers(block: Block) -> Result { +fn map_events(block: Block) -> Result { - let items = block.actions::(&[]).filter_map(|(action, action_trace, trx)| { + let transfers = block.actions::(&[]).filter_map(|(action, action_trace, trx)| { let quantity = match action.quantity.parse::() { Ok(asset) => asset, @@ -162,25 +162,21 @@ fn map_transfers(block: Block) -> Result { return None; } }; - let symcode = quantity.symbol.code().to_string(); - let precision = quantity.symbol.precision().into(); - let amount = quantity.amount; - Some(TransferEvent { + Some(Transfer { trx_id: trx.id.clone(), action_index: action_trace.action_ordinal, contract: action_trace.action.as_ref().unwrap().account.clone(), - action: action_trace.action.as_ref().unwrap().name.clone(), - symcode, + symcode: quantity.symbol.code().to_string(), from: action.from, to: action.to, quantity: action.quantity, memo: action.memo, - precision, - amount, + precision: quantity.symbol.precision().into(), + amount: quantity.amount, value: utils::to_value(&quantity), block_num: block.number as u64, @@ -189,5 +185,100 @@ fn map_transfers(block: Block) -> Result { }) .collect(); - Ok(TransferEvents { items }) + let issues = block.actions::(&[]).filter_map(|(action, action_trace, trx)| { + + let quantity = match action.quantity.parse::() { + Ok(asset) => asset, + Err(e) => { + log::info!("Error parsing issue asset in trx {}: {:?}", trx.id, e); + return None; + } + }; + + Some(Issue { + trx_id: trx.id.clone(), + action_index: action_trace.action_ordinal, + + contract: action_trace.action.as_ref().unwrap().account.clone(), + symcode: quantity.symbol.code().to_string(), + + issuer: action_trace.receiver.clone(), + to: action.to, + quantity: action.quantity, + memo: action.memo, + + precision: quantity.symbol.precision().into(), + amount: quantity.amount, + value: utils::to_value(&quantity), + + block_num: block.number as u64, + timestamp: block.header.as_ref().unwrap().timestamp.clone(), + }) + }) + .collect(); + + let retires = block.actions::(&[]).filter_map(|(action, action_trace, trx)| { + + let quantity = match action.quantity.parse::() { + Ok(asset) => asset, + Err(e) => { + log::info!("Error parsing retire asset in trx {}: {:?}", trx.id, e); + return None; + } + }; + + Some(Retire { + trx_id: trx.id.clone(), + action_index: action_trace.action_ordinal, + + contract: action_trace.action.as_ref().unwrap().account.clone(), + symcode: quantity.symbol.code().to_string(), + + from: action_trace.receiver.clone(), + quantity: action.quantity, + memo: action.memo, + + precision: quantity.symbol.precision().into(), + amount: quantity.amount, + value: utils::to_value(&quantity), + + block_num: block.number as u64, + timestamp: block.header.as_ref().unwrap().timestamp.clone(), + }) + }) + .collect(); + + let creates = block.actions::(&[]).filter_map(|(action, action_trace, trx)| { + + let maximum_supply = match action.maximum_supply.parse::() { + Ok(asset) => asset, + Err(e) => { + log::info!("Error parsing create max supply asset in trx {}: {:?}", trx.id, e); + return None; + } + }; + + Some(Create { + trx_id: trx.id.clone(), + action_index: action_trace.action_ordinal, + + contract: action_trace.action.as_ref().unwrap().account.clone(), + symcode: maximum_supply.symbol.code().to_string(), + + issuer: action_trace.receiver.clone(), + maximum_supply: action.maximum_supply, + + precision: maximum_supply.symbol.precision().into(), + amount: maximum_supply.amount, + value: utils::to_value(&maximum_supply), + + block_num: block.number as u64, + timestamp: block.header.as_ref().unwrap().timestamp.clone(), + }) + }) + .collect(); + + + + Ok(Events { transfers, issues, retires, creates }) } diff --git a/src/pb/antelope.eosio.token.v1.rs b/src/pb/antelope.eosio.token.v1.rs index bc09ac4..96ce449 100644 --- a/src/pb/antelope.eosio.token.v1.rs +++ b/src/pb/antelope.eosio.token.v1.rs @@ -3,7 +3,7 @@ #[derive(Clone, PartialEq, ::prost::Message)] pub struct Accounts { #[prost(message, repeated, tag="1")] - pub items: ::prost::alloc::vec::Vec, + pub changes: ::prost::alloc::vec::Vec, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -42,7 +42,7 @@ pub struct Account { #[derive(Clone, PartialEq, ::prost::Message)] pub struct Stats { #[prost(message, repeated, tag="1")] - pub items: ::prost::alloc::vec::Vec, + pub changes: ::prost::alloc::vec::Vec, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -81,13 +81,19 @@ pub struct Stat { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct TransferEvents { +pub struct Events { #[prost(message, repeated, tag="1")] - pub items: ::prost::alloc::vec::Vec, + pub transfers: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag="2")] + pub issues: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag="3")] + pub retires: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag="4")] + pub creates: ::prost::alloc::vec::Vec, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct TransferEvent { +pub struct Transfer { /// trace information #[prost(string, tag="1")] pub trx_id: ::prost::alloc::string::String, @@ -96,8 +102,6 @@ pub struct TransferEvent { /// contract & scope #[prost(string, tag="3")] pub contract: ::prost::alloc::string::String, - #[prost(string, tag="4")] - pub action: ::prost::alloc::string::String, #[prost(string, tag="5")] pub symcode: ::prost::alloc::string::String, /// data payload @@ -122,4 +126,103 @@ pub struct TransferEvent { #[prost(message, optional, tag="14")] pub timestamp: ::core::option::Option<::prost_types::Timestamp>, } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Issue { + /// trace information + #[prost(string, tag="1")] + pub trx_id: ::prost::alloc::string::String, + #[prost(uint32, tag="2")] + pub action_index: u32, + /// contract & scope + #[prost(string, tag="3")] + pub contract: ::prost::alloc::string::String, + #[prost(string, tag="4")] + pub symcode: ::prost::alloc::string::String, + /// data payload + #[prost(string, tag="5")] + pub issuer: ::prost::alloc::string::String, + #[prost(string, tag="6")] + pub to: ::prost::alloc::string::String, + #[prost(string, tag="7")] + pub quantity: ::prost::alloc::string::String, + #[prost(string, tag="8")] + pub memo: ::prost::alloc::string::String, + /// extras + #[prost(uint32, tag="9")] + pub precision: u32, + #[prost(int64, tag="10")] + pub amount: i64, + #[prost(double, tag="11")] + pub value: f64, + /// block information + #[prost(uint64, tag="12")] + pub block_num: u64, + #[prost(message, optional, tag="13")] + pub timestamp: ::core::option::Option<::prost_types::Timestamp>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Retire { + /// trace information + #[prost(string, tag="1")] + pub trx_id: ::prost::alloc::string::String, + #[prost(uint32, tag="2")] + pub action_index: u32, + /// contract & scope + #[prost(string, tag="3")] + pub contract: ::prost::alloc::string::String, + #[prost(string, tag="5")] + pub symcode: ::prost::alloc::string::String, + /// data payload + #[prost(string, tag="6")] + pub from: ::prost::alloc::string::String, + #[prost(string, tag="7")] + pub quantity: ::prost::alloc::string::String, + #[prost(string, tag="8")] + pub memo: ::prost::alloc::string::String, + /// extras + #[prost(uint32, tag="9")] + pub precision: u32, + #[prost(int64, tag="10")] + pub amount: i64, + #[prost(double, tag="11")] + pub value: f64, + /// block information + #[prost(uint64, tag="12")] + pub block_num: u64, + #[prost(message, optional, tag="13")] + pub timestamp: ::core::option::Option<::prost_types::Timestamp>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Create { + /// trace information + #[prost(string, tag="1")] + pub trx_id: ::prost::alloc::string::String, + #[prost(uint32, tag="2")] + pub action_index: u32, + /// contract & scope + #[prost(string, tag="3")] + pub contract: ::prost::alloc::string::String, + #[prost(string, tag="5")] + pub symcode: ::prost::alloc::string::String, + /// data payload + #[prost(string, tag="6")] + pub issuer: ::prost::alloc::string::String, + #[prost(string, tag="7")] + pub maximum_supply: ::prost::alloc::string::String, + /// extras + #[prost(uint32, tag="8")] + pub precision: u32, + #[prost(int64, tag="9")] + pub amount: i64, + #[prost(double, tag="10")] + pub value: f64, + /// block information + #[prost(uint64, tag="11")] + pub block_num: u64, + #[prost(message, optional, tag="12")] + pub timestamp: ::core::option::Option<::prost_types::Timestamp>, +} // @@protoc_insertion_point(module) diff --git a/src/sinks.rs b/src/sinks.rs index 45227c0..2d0d679 100644 --- a/src/sinks.rs +++ b/src/sinks.rs @@ -3,18 +3,18 @@ use substreams::errors::Error; use substreams_database_change::pb::database::{table_change, DatabaseChanges}; use substreams_entity_change::pb::entity::EntityChanges; -use crate::eosio_token::{Accounts, Stats, TransferEvents}; +use crate::eosio_token::{Accounts, Stats, Events}; use crate::utils::to_key; #[substreams::handlers::map] pub fn graph_out( map_accounts: Accounts, map_stats: Stats, - map_transfers: TransferEvents, + map_events: Events, ) -> Result { let mut tables = substreams_entity_change::tables::Tables::new(); - for account in map_accounts.items { + for account in map_accounts.changes { let key = to_key(&account.trx_id, account.action_index); tables .create_row("accounts", key) @@ -34,7 +34,7 @@ pub fn graph_out( .set("value", account.value.to_string()); } - for stat in map_stats.items { + for stat in map_stats.changes { let key = to_key(&stat.trx_id, stat.action_index); tables .create_row("stats", key) @@ -55,7 +55,7 @@ pub fn graph_out( .set("value", stat.value.to_string()); } - for transfer in map_transfers.items { + for transfer in map_events.transfers { let key = to_key(&transfer.trx_id, transfer.action_index); tables .create_row("transfers", key) @@ -64,7 +64,6 @@ pub fn graph_out( .set("action_index", transfer.action_index.to_string()) // contract & scope .set("contract", transfer.contract.to_string()) - .set("action", transfer.action.to_string()) .set("symcode", transfer.symcode.to_string()) // data payload .set("from", transfer.from.to_string()) @@ -77,6 +76,66 @@ pub fn graph_out( .set("value", transfer.value.to_string()); } + for issue in map_events.issues { + let key = to_key(&issue.trx_id, issue.action_index); + tables + .create_row("issues", key) + // transaction + .set("trx_id", issue.trx_id.to_string()) + .set("action_index", issue.action_index.to_string()) + // contract & scope + .set("contract", issue.contract.to_string()) + .set("symcode", issue.symcode.to_string()) + // data payload + .set("issuer", issue.issuer.to_string()) + .set("to", issue.to.to_string()) + .set("memo", issue.memo.to_string()) + .set("quantity", issue.quantity.to_string()) + // extras + .set("amount", issue.amount.to_string()) + .set("precision", issue.precision.to_string()) + .set("value", issue.value.to_string()); + } + + for retire in map_events.retires { + let key = to_key(&retire.trx_id, retire.action_index); + tables + .create_row("retires", key) + // transaction + .set("trx_id", retire.trx_id.to_string()) + .set("action_index", retire.action_index.to_string()) + // contract & scope + .set("contract", retire.contract.to_string()) + .set("symcode", retire.symcode.to_string()) + // data payload + .set("quantity", retire.quantity.to_string()) + .set("from", retire.from.to_string()) + .set("memo", retire.memo.to_string()) + // extras + .set("amount", retire.amount.to_string()) + .set("precision", retire.precision.to_string()) + .set("value", retire.value.to_string()); + } + + for create in map_events.creates { + let key = to_key(&create.trx_id, create.action_index); + tables + .create_row("creates", key) + // transaction + .set("trx_id", create.trx_id.to_string()) + .set("action_index", create.action_index.to_string()) + // contract & scope + .set("contract", create.contract.to_string()) + .set("symcode", create.symcode.to_string()) + // data payload + .set("issuer", create.issuer.to_string()) + .set("maximum_supply", create.maximum_supply.to_string()) + // extras + .set("amount", create.amount.to_string()) + .set("precision", create.precision.to_string()) + .set("value", create.value.to_string()); + } + Ok(tables.to_entity_changes()) } @@ -84,11 +143,11 @@ pub fn graph_out( pub fn ch_out( map_accounts: Accounts, map_stats: Stats, - map_transfers: TransferEvents, + map_events: Events, ) -> Result { let mut tables = DatabaseChanges::default(); - for account in map_accounts.items { + for account in map_accounts.changes { let keys = HashMap::from([ ("account".to_string(), account.account.to_string()), ("block_num".to_string(), account.block_num.to_string()), @@ -108,7 +167,7 @@ pub fn ch_out( .change("timestamp", ("", account.timestamp.unwrap().to_string().as_str())); } - for stat in map_stats.items { + for stat in map_stats.changes { let keys = HashMap::from([ ("contract".to_string(), stat.contract.to_string()), ("block_num".to_string(), stat.block_num.to_string()), @@ -129,7 +188,7 @@ pub fn ch_out( .change("timestamp", ("", stat.timestamp.unwrap().to_string().as_str())); } - for transfer in map_transfers.items { + for transfer in map_events.transfers { let keys = HashMap::from([ ("trx_id".to_string(), transfer.trx_id), ("action_index".to_string(), transfer.action_index.to_string()), @@ -138,7 +197,6 @@ pub fn ch_out( tables .push_change_composite("transfer_events", keys, 0, table_change::Operation::Create) .change("contract", ("", transfer.contract.to_string().as_str())) - .change("action", ("", transfer.action.to_string().as_str())) .change("symcode", ("", transfer.symcode.to_string().as_str())) .change("from", ("", transfer.from.to_string().as_str())) .change("to", ("", transfer.to.to_string().as_str())) diff --git a/substreams.yaml b/substreams.yaml index 7217c3d..7f2b0b9 100644 --- a/substreams.yaml +++ b/substreams.yaml @@ -1,13 +1,13 @@ specVersion: v0.1.0 package: name: antelope_tokens - version: v0.3.9 + version: v0.4.0 url: https://github.com/pinax-network/substreams-antelope-tokens doc: Antelope `eosio.token` based action traces & database operations. imports: entities: https://github.com/streamingfast/substreams-sink-entity-changes/releases/download/v1.3.1/substreams-sink-entity-changes-v1.3.1.spkg - database_change: https://github.com/streamingfast/substreams-database-change/releases/download/v1.0.0/substreams-database-change-v1.0.0.spkg + database_change: https://github.com/streamingfast/substreams-database-change/releases/download/v1.3.1/substreams-database-change-v1.3.1.spkg sql: https://github.com/streamingfast/substreams-sink-sql/releases/download/protodefs-v1.0.3/substreams-sink-sql-protodefs-v1.0.3.spkg binaries: @@ -22,12 +22,12 @@ protobuf: - ./proto/v1 modules: - - name: map_transfers + - name: map_events kind: map inputs: - source: sf.antelope.type.v1.Block output: - type: proto:antelope.eosio.token.v1.TransferEvents + type: proto:antelope.eosio.token.v1.Events - name: map_accounts kind: map @@ -48,7 +48,7 @@ modules: inputs: - map: map_accounts - map: map_stat - - map: map_transfers + - map: map_events output: type: proto:sf.substreams.sink.entity.v1.EntityChanges @@ -57,7 +57,7 @@ modules: inputs: - map: map_accounts - map: map_stat - - map: map_transfers + - map: map_events output: type: proto:sf.substreams.sink.database.v1.DatabaseChanges From 5ef48f28eac163aa2f3dd9fc514af09c40b4901e Mon Sep 17 00:00:00 2001 From: YaroShkvorets Date: Tue, 9 Apr 2024 17:50:09 -0400 Subject: [PATCH 2/4] move all events into map_events --- README.md | 32 +--- proto/v1/eosio.token.proto | 110 ++++++------ src/maps.rs | 267 ++++++++++++++---------------- src/pb/antelope.eosio.token.v1.rs | 152 ++++++++--------- src/sinks.rs | 14 +- substreams.yaml | 18 -- 6 files changed, 255 insertions(+), 338 deletions(-) diff --git a/README.md b/README.md index 9d59056..877c216 100644 --- a/README.md +++ b/README.md @@ -18,17 +18,9 @@ $ make gui graph TD; map_events[map: map_events]; sf.antelope.type.v1.Block[source: sf.antelope.type.v1.Block] --> map_events; - map_accounts[map: map_accounts]; - sf.antelope.type.v1.Block[source: sf.antelope.type.v1.Block] --> map_accounts; - map_stat[map: map_stat]; - sf.antelope.type.v1.Block[source: sf.antelope.type.v1.Block] --> map_stat; graph_out[map: graph_out]; - map_accounts --> graph_out; - map_stat --> graph_out; map_events --> graph_out; ch_out[map: ch_out]; - map_accounts --> ch_out; - map_stat --> ch_out; map_events --> ch_out; ``` @@ -45,39 +37,21 @@ Initial block: 0 Kind: map Input: source: sf.antelope.type.v1.Block Output Type: proto:antelope.eosio.token.v1.Events -Hash: c85c7f8fad2a0d03984c00ee15d1ded54bfa700e - -Name: map_accounts -Initial block: 0 -Kind: map -Input: source: sf.antelope.type.v1.Block -Output Type: proto:antelope.eosio.token.v1.Accounts -Hash: e85688fa74de76ee66c793b77190025baa242b4a - -Name: map_stat -Initial block: 0 -Kind: map -Input: source: sf.antelope.type.v1.Block -Output Type: proto:antelope.eosio.token.v1.Stats -Hash: c3ba86b0f3f4fdb79e7e51d82fc114df45abc4f9 +Hash: 8dd2fd12c9cb5ad0bb6f32f3b9defcce90090715 Name: graph_out Initial block: 0 Kind: map -Input: map: map_accounts -Input: map: map_stat Input: map: map_events Output Type: proto:sf.substreams.sink.entity.v1.EntityChanges -Hash: 64faa1889da48de19fea8d3b68595f844bba32e9 +Hash: b34452f9c92c173f7a9976534b622ad093dad242 Name: ch_out Initial block: 0 Kind: map -Input: map: map_accounts -Input: map: map_stat Input: map: map_events Output Type: proto:sf.substreams.sink.database.v1.DatabaseChanges -Hash: f4573a7e43387bde4d3572bb649dc315e64f3913 +Hash: 1515b91545f2d2812ac27fbe5f556dc117d57e20 Sink config: ---- diff --git a/proto/v1/eosio.token.proto b/proto/v1/eosio.token.proto index e64fea8..1528382 100644 --- a/proto/v1/eosio.token.proto +++ b/proto/v1/eosio.token.proto @@ -4,68 +4,13 @@ package antelope.eosio.token.v1; import "google/protobuf/timestamp.proto"; -message Accounts { - repeated Account changes = 1; -} - -message Account { - // trace information - string trx_id = 1; - uint32 action_index = 2; - - // contract & scope - string contract = 3; - string symcode = 4; - - // data payload - string account = 5; - string balance = 6; - int64 balance_delta = 7; - - // extras - uint32 precision = 10; - int64 amount = 11; - double value = 12; - - // block information - uint64 block_num = 13; - google.protobuf.Timestamp timestamp = 14; -} - -message Stats { - repeated Stat changes = 1; -} - -message Stat { - // trace information - string trx_id = 1; - uint32 action_index = 2; - - // contract & scope - string contract = 3; - string symcode = 4; - - // data payload - string issuer = 5; - string max_supply = 6; - string supply = 7; - int64 supply_delta = 8; - - // extras - uint32 precision = 10; - int64 amount = 11; - double value = 12; - - // block information - uint64 block_num = 13; - google.protobuf.Timestamp timestamp = 14; -} - message Events { repeated Transfer transfers = 1; repeated Issue issues = 2; repeated Retire retires = 3; repeated Create creates = 4; + repeated BalanceChange balance_changes = 5; + repeated SupplyChange supply_changes = 6; } message Transfer { @@ -163,4 +108,53 @@ message Create { // block information uint64 block_num = 11; google.protobuf.Timestamp timestamp = 12; -} \ No newline at end of file +} + +message BalanceChange { + // trace information + string trx_id = 1; + uint32 action_index = 2; + + // contract & scope + string contract = 3; + string symcode = 4; + + // data payload + string account = 5; + string balance = 6; + int64 balance_delta = 7; + + // extras + uint32 precision = 10; + int64 amount = 11; + double value = 12; + + // block information + uint64 block_num = 13; + google.protobuf.Timestamp timestamp = 14; +} + +message SupplyChange { + // trace information + string trx_id = 1; + uint32 action_index = 2; + + // contract & scope + string contract = 3; + string symcode = 4; + + // data payload + string issuer = 5; + string max_supply = 6; + string supply = 7; + int64 supply_delta = 8; + + // extras + uint32 precision = 10; + int64 amount = 11; + double value = 12; + + // block information + uint64 block_num = 13; + google.protobuf.Timestamp timestamp = 14; +} diff --git a/src/maps.rs b/src/maps.rs index 82e8cba..a9d2117 100644 --- a/src/maps.rs +++ b/src/maps.rs @@ -7,149 +7,6 @@ use crate::abi; use crate::eosio_token::*; use crate::utils; -#[substreams::handlers::map] -fn map_accounts(block: Block) -> Result { - - let changes = block.transaction_traces().flat_map(|trx| { - trx.db_ops.iter().filter_map(|db_op| { - if db_op.table_name != "accounts" { - return None; - } - - let old_data = decode::(&db_op.old_data_json).ok(); - let new_data = decode::(&db_op.new_data_json).ok(); - - let old_balance = old_data.as_ref().and_then(|data| match data.balance.parse::() { - Ok(asset) => Some(asset), - Err(e) => { - log::info!("Error parsing old balance asset in trx {}: {:?}", trx.id, e); - None - } - }); - let new_balance = new_data.as_ref().and_then(|data| match data.balance.parse::() { - Ok(asset) => Some(asset), - Err(e) => { - log::info!("Error parsing new balance asset in trx {}: {:?}", trx.id, e); - None - } - }); - - if old_balance.is_none() && new_balance.is_none() { - return None; - } - - let raw_primary_key = Name::from(db_op.primary_key.as_str()).value; - let symcode = SymbolCode::from(raw_primary_key); - let precision = new_balance.unwrap_or_else(|| old_balance.unwrap()).symbol.precision(); - let sym = Symbol::from_precision(symcode, precision); - let balance = new_balance.unwrap_or_else(|| Asset::from_amount(0, sym)); - let balance_delta = balance.amount - old_balance.unwrap_or_else(|| Asset::from_amount(0, sym)).amount; - - Some(Account { - // trace information - trx_id: trx.id.clone(), - action_index: db_op.action_index, - - // contract & scope - contract: db_op.code.clone(), - symcode: symcode.to_string(), - - // payload - account: db_op.scope.clone(), - balance: balance.to_string(), - balance_delta, - - // extras - precision: precision.into(), - amount: balance.amount, - value: utils::to_value(&balance), - - block_num: block.number as u64, - timestamp: block.header.as_ref().unwrap().timestamp.clone(), - }) - }) - }).collect(); - - Ok(Accounts { changes }) -} - -#[substreams::handlers::map] -fn map_stat(block: Block) -> Result { - let changes = block.transaction_traces().flat_map(|trx| { - trx.db_ops.iter().filter_map(|db_op| { - if db_op.table_name != "stat" { - return None; - } - - let raw_primary_key = match db_op.primary_key.parse::() { - Ok(name) => name.value, - Err(e) => { - log::info!("Error parsing primary key as name in trx {}: {:?}", trx.id, e); - return None; - } - }; - - let old_data = decode::(&db_op.old_data_json).ok(); - let new_data = decode::(&db_op.new_data_json).ok(); - - let old_supply = old_data.as_ref().and_then(|data| match data.supply.parse::() { - Ok(asset) => Some(asset), - Err(e) => { - log::info!("Error parsing old supply asset in trx {}: {:?}", trx.id, e); - None - } - }); - - let new_supply = new_data.as_ref().and_then(|data| match data.supply.parse::() { - Ok(asset) => Some(asset), - Err(e) => { - log::info!("Error parsing new supply asset in trx {}: {:?}", trx.id, e); - None - } - }); - - if old_supply.is_none() && new_supply.is_none() { - return None; - } - - let symcode = SymbolCode::from(raw_primary_key); - let precision = new_supply.unwrap_or_else(|| old_supply.unwrap()).symbol.precision(); - let sym = Symbol::from_precision(symcode, precision); - let supply = new_supply.unwrap_or_else(|| Asset::from_amount(0, sym)); - let supply_delta = supply.amount - old_supply.unwrap_or_else(|| Asset::from_amount(0, sym)).amount; - - let data = new_data.unwrap_or_else(|| old_data.unwrap()); - - Some(Stat { - // trace information - trx_id: trx.id.clone(), - action_index: db_op.action_index, - - // contract & scope - contract: db_op.code.clone(), - symcode: symcode.to_string(), - - // payload - issuer: data.issuer, - max_supply: data.max_supply, - supply: supply.to_string(), - supply_delta, - - // extras - precision: precision.into(), - amount: supply.amount, - value: utils::to_value(&supply), - - block_num: block.number as u64, - timestamp: block.header.as_ref().unwrap().timestamp.clone(), - }) - }) - }) - .collect(); - - Ok(Stats { changes }) -} - #[substreams::handlers::map] fn map_events(block: Block) -> Result { @@ -278,7 +135,129 @@ fn map_events(block: Block) -> Result { }) .collect(); + let balance_changes = block.transaction_traces().flat_map(|trx| { + trx.db_ops.iter().filter_map(|db_op| { + if db_op.table_name != "accounts" { + return None; + } + + let old_data = decode::(&db_op.old_data_json).ok(); + let new_data = decode::(&db_op.new_data_json).ok(); + + let old_balance = old_data.as_ref().and_then(|data| match data.balance.parse::() { + Ok(asset) => Some(asset), + Err(e) => { + log::info!("Error parsing old balance asset in trx {}: {:?}", trx.id, e); + None + } + }); + let new_balance = new_data.as_ref().and_then(|data| match data.balance.parse::() { + Ok(asset) => Some(asset), + Err(e) => { + log::info!("Error parsing new balance asset in trx {}: {:?}", trx.id, e); + None + } + }); + + if old_balance.is_none() && new_balance.is_none() { + return None; + } + + let raw_primary_key = Name::from(db_op.primary_key.as_str()).value; + let symcode = SymbolCode::from(raw_primary_key); + let precision = new_balance.unwrap_or_else(|| old_balance.unwrap()).symbol.precision(); + let sym = Symbol::from_precision(symcode, precision); + let balance = new_balance.unwrap_or_else(|| Asset::from_amount(0, sym)); + let balance_delta = balance.amount - old_balance.unwrap_or_else(|| Asset::from_amount(0, sym)).amount; + + Some(BalanceChange { + // trace information + trx_id: trx.id.clone(), + action_index: db_op.action_index, + + // contract & scope + contract: db_op.code.clone(), + symcode: symcode.to_string(), + + // payload + account: db_op.scope.clone(), + balance: balance.to_string(), + balance_delta, + + // extras + precision: precision.into(), + amount: balance.amount, + value: utils::to_value(&balance), + + block_num: block.number as u64, + timestamp: block.header.as_ref().unwrap().timestamp.clone(), + }) + }) + }).collect(); + + let supply_changes = block.transaction_traces().flat_map(|trx| { + trx.db_ops.iter().filter_map(|db_op| { + if db_op.table_name != "stat" { + return None; + } + + let old_data = decode::(&db_op.old_data_json).ok(); + let new_data = decode::(&db_op.new_data_json).ok(); + + let old_supply = old_data.as_ref().and_then(|data| match data.supply.parse::() { + Ok(asset) => Some(asset), + Err(e) => { + log::info!("Error parsing old supply asset in trx {}: {:?}", trx.id, e); + None + } + }); + + let new_supply = new_data.as_ref().and_then(|data| match data.supply.parse::() { + Ok(asset) => Some(asset), + Err(e) => { + log::info!("Error parsing new supply asset in trx {}: {:?}", trx.id, e); + None + } + }); + + if old_supply.is_none() && new_supply.is_none() { + return None; + } + + let symcode = SymbolCode::from(Name::from(db_op.primary_key.as_str()).value); + let precision = new_supply.unwrap_or_else(|| old_supply.unwrap()).symbol.precision(); + let sym = Symbol::from_precision(symcode, precision); + let supply = new_supply.unwrap_or_else(|| Asset::from_amount(0, sym)); + let supply_delta = supply.amount - old_supply.unwrap_or_else(|| Asset::from_amount(0, sym)).amount; + + let data = new_data.unwrap_or_else(|| old_data.unwrap()); + + Some(SupplyChange { + // trace information + trx_id: trx.id.clone(), + action_index: db_op.action_index, + + // contract & scope + contract: db_op.code.clone(), + symcode: symcode.to_string(), + + // payload + issuer: data.issuer, + max_supply: data.max_supply, + supply: supply.to_string(), + supply_delta, + + // extras + precision: precision.into(), + amount: supply.amount, + value: utils::to_value(&supply), + + block_num: block.number as u64, + timestamp: block.header.as_ref().unwrap().timestamp.clone(), + }) + }) + }).collect(); - Ok(Events { transfers, issues, retires, creates }) + Ok(Events { transfers, issues, retires, creates, balance_changes, supply_changes }) } diff --git a/src/pb/antelope.eosio.token.v1.rs b/src/pb/antelope.eosio.token.v1.rs index 96ce449..6b1631a 100644 --- a/src/pb/antelope.eosio.token.v1.rs +++ b/src/pb/antelope.eosio.token.v1.rs @@ -1,86 +1,6 @@ // @generated #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct Accounts { - #[prost(message, repeated, tag="1")] - pub changes: ::prost::alloc::vec::Vec, -} -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Account { - /// trace information - #[prost(string, tag="1")] - pub trx_id: ::prost::alloc::string::String, - #[prost(uint32, tag="2")] - pub action_index: u32, - /// contract & scope - #[prost(string, tag="3")] - pub contract: ::prost::alloc::string::String, - #[prost(string, tag="4")] - pub symcode: ::prost::alloc::string::String, - /// data payload - #[prost(string, tag="5")] - pub account: ::prost::alloc::string::String, - #[prost(string, tag="6")] - pub balance: ::prost::alloc::string::String, - #[prost(int64, tag="7")] - pub balance_delta: i64, - /// extras - #[prost(uint32, tag="10")] - pub precision: u32, - #[prost(int64, tag="11")] - pub amount: i64, - #[prost(double, tag="12")] - pub value: f64, - /// block information - #[prost(uint64, tag="13")] - pub block_num: u64, - #[prost(message, optional, tag="14")] - pub timestamp: ::core::option::Option<::prost_types::Timestamp>, -} -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Stats { - #[prost(message, repeated, tag="1")] - pub changes: ::prost::alloc::vec::Vec, -} -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Stat { - /// trace information - #[prost(string, tag="1")] - pub trx_id: ::prost::alloc::string::String, - #[prost(uint32, tag="2")] - pub action_index: u32, - /// contract & scope - #[prost(string, tag="3")] - pub contract: ::prost::alloc::string::String, - #[prost(string, tag="4")] - pub symcode: ::prost::alloc::string::String, - /// data payload - #[prost(string, tag="5")] - pub issuer: ::prost::alloc::string::String, - #[prost(string, tag="6")] - pub max_supply: ::prost::alloc::string::String, - #[prost(string, tag="7")] - pub supply: ::prost::alloc::string::String, - #[prost(int64, tag="8")] - pub supply_delta: i64, - /// extras - #[prost(uint32, tag="10")] - pub precision: u32, - #[prost(int64, tag="11")] - pub amount: i64, - #[prost(double, tag="12")] - pub value: f64, - /// block information - #[prost(uint64, tag="13")] - pub block_num: u64, - #[prost(message, optional, tag="14")] - pub timestamp: ::core::option::Option<::prost_types::Timestamp>, -} -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] pub struct Events { #[prost(message, repeated, tag="1")] pub transfers: ::prost::alloc::vec::Vec, @@ -90,6 +10,10 @@ pub struct Events { pub retires: ::prost::alloc::vec::Vec, #[prost(message, repeated, tag="4")] pub creates: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag="5")] + pub balance_changes: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag="6")] + pub supply_changes: ::prost::alloc::vec::Vec, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -225,4 +149,72 @@ pub struct Create { #[prost(message, optional, tag="12")] pub timestamp: ::core::option::Option<::prost_types::Timestamp>, } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BalanceChange { + /// trace information + #[prost(string, tag="1")] + pub trx_id: ::prost::alloc::string::String, + #[prost(uint32, tag="2")] + pub action_index: u32, + /// contract & scope + #[prost(string, tag="3")] + pub contract: ::prost::alloc::string::String, + #[prost(string, tag="4")] + pub symcode: ::prost::alloc::string::String, + /// data payload + #[prost(string, tag="5")] + pub account: ::prost::alloc::string::String, + #[prost(string, tag="6")] + pub balance: ::prost::alloc::string::String, + #[prost(int64, tag="7")] + pub balance_delta: i64, + /// extras + #[prost(uint32, tag="10")] + pub precision: u32, + #[prost(int64, tag="11")] + pub amount: i64, + #[prost(double, tag="12")] + pub value: f64, + /// block information + #[prost(uint64, tag="13")] + pub block_num: u64, + #[prost(message, optional, tag="14")] + pub timestamp: ::core::option::Option<::prost_types::Timestamp>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SupplyChange { + /// trace information + #[prost(string, tag="1")] + pub trx_id: ::prost::alloc::string::String, + #[prost(uint32, tag="2")] + pub action_index: u32, + /// contract & scope + #[prost(string, tag="3")] + pub contract: ::prost::alloc::string::String, + #[prost(string, tag="4")] + pub symcode: ::prost::alloc::string::String, + /// data payload + #[prost(string, tag="5")] + pub issuer: ::prost::alloc::string::String, + #[prost(string, tag="6")] + pub max_supply: ::prost::alloc::string::String, + #[prost(string, tag="7")] + pub supply: ::prost::alloc::string::String, + #[prost(int64, tag="8")] + pub supply_delta: i64, + /// extras + #[prost(uint32, tag="10")] + pub precision: u32, + #[prost(int64, tag="11")] + pub amount: i64, + #[prost(double, tag="12")] + pub value: f64, + /// block information + #[prost(uint64, tag="13")] + pub block_num: u64, + #[prost(message, optional, tag="14")] + pub timestamp: ::core::option::Option<::prost_types::Timestamp>, +} // @@protoc_insertion_point(module) diff --git a/src/sinks.rs b/src/sinks.rs index 2d0d679..93f9a4e 100644 --- a/src/sinks.rs +++ b/src/sinks.rs @@ -3,18 +3,16 @@ use substreams::errors::Error; use substreams_database_change::pb::database::{table_change, DatabaseChanges}; use substreams_entity_change::pb::entity::EntityChanges; -use crate::eosio_token::{Accounts, Stats, Events}; +use crate::eosio_token::Events; use crate::utils::to_key; #[substreams::handlers::map] pub fn graph_out( - map_accounts: Accounts, - map_stats: Stats, map_events: Events, ) -> Result { let mut tables = substreams_entity_change::tables::Tables::new(); - for account in map_accounts.changes { + for account in map_events.balance_changes { let key = to_key(&account.trx_id, account.action_index); tables .create_row("accounts", key) @@ -34,7 +32,7 @@ pub fn graph_out( .set("value", account.value.to_string()); } - for stat in map_stats.changes { + for stat in map_events.supply_changes { let key = to_key(&stat.trx_id, stat.action_index); tables .create_row("stats", key) @@ -141,13 +139,11 @@ pub fn graph_out( #[substreams::handlers::map] pub fn ch_out( - map_accounts: Accounts, - map_stats: Stats, map_events: Events, ) -> Result { let mut tables = DatabaseChanges::default(); - for account in map_accounts.changes { + for account in map_events.balance_changes { let keys = HashMap::from([ ("account".to_string(), account.account.to_string()), ("block_num".to_string(), account.block_num.to_string()), @@ -167,7 +163,7 @@ pub fn ch_out( .change("timestamp", ("", account.timestamp.unwrap().to_string().as_str())); } - for stat in map_stats.changes { + for stat in map_events.supply_changes { let keys = HashMap::from([ ("contract".to_string(), stat.contract.to_string()), ("block_num".to_string(), stat.block_num.to_string()), diff --git a/substreams.yaml b/substreams.yaml index 7f2b0b9..aa67b67 100644 --- a/substreams.yaml +++ b/substreams.yaml @@ -29,25 +29,9 @@ modules: output: type: proto:antelope.eosio.token.v1.Events - - name: map_accounts - kind: map - inputs: - - source: sf.antelope.type.v1.Block - output: - type: proto:antelope.eosio.token.v1.Accounts - - - name: map_stat - kind: map - inputs: - - source: sf.antelope.type.v1.Block - output: - type: proto:antelope.eosio.token.v1.Stats - - name: graph_out kind: map inputs: - - map: map_accounts - - map: map_stat - map: map_events output: type: proto:sf.substreams.sink.entity.v1.EntityChanges @@ -55,8 +39,6 @@ modules: - name: ch_out kind: map inputs: - - map: map_accounts - - map: map_stat - map: map_events output: type: proto:sf.substreams.sink.database.v1.DatabaseChanges From db8d2e1d05db0bb1c1c16b5d345dc028ce677b48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frederik=20Sch=C3=B6ll?= Date: Wed, 10 Apr 2024 13:35:39 +0200 Subject: [PATCH 3/4] update schema.sql and ch_out --- schema.sql | 98 +++++++++++++++++++++++++++++++++++++++++++--------- src/sinks.rs | 64 ++++++++++++++++++++++++++++++++-- 2 files changed, 144 insertions(+), 18 deletions(-) diff --git a/schema.sql b/schema.sql index 1835549..3adb80e 100644 --- a/schema.sql +++ b/schema.sql @@ -25,7 +25,6 @@ CREATE TABLE IF NOT EXISTS transfer_events action_index UInt32, -- contract & scope -- contract String, - action String, symcode String, -- data payload -- from String, @@ -44,9 +43,9 @@ CREATE TABLE IF NOT EXISTS transfer_events PRIMARY KEY (trx_id, action_index) ORDER BY (trx_id, action_index); --- The table to store all account balance changes. This uses the account and block_num as first primary keys so we can --- use this table to lookup the account balance from a certain block number. -CREATE TABLE IF NOT EXISTS account_events +-- The table to store all account balance changes from the database operations. This uses the account and block_num as +-- first primary keys so we can use this table to lookup the account balance from a certain block number. +CREATE TABLE IF NOT EXISTS balance_change_events ( trx_id String, action_index UInt32, @@ -69,9 +68,9 @@ CREATE TABLE IF NOT EXISTS account_events PRIMARY KEY (account, block_num, trx_id, action_index) ORDER BY (account, block_num, trx_id, action_index); --- The table to store all token supply changes. This uses the account and block_num as first primary keys so we can --- use this table to lookup token supplies from a certain block number. -CREATE TABLE IF NOT EXISTS token_supply_events +-- The table to store all token supply changes from the database operations. This uses the account and block_num as +-- first primary keys so we can use this table to lookup token supplies from a certain block number. +CREATE TABLE IF NOT EXISTS supply_change_events ( trx_id String, action_index UInt32, @@ -95,12 +94,83 @@ CREATE TABLE IF NOT EXISTS token_supply_events PRIMARY KEY (contract, block_num, trx_id, action_index) ORDER BY (contract, block_num, trx_id, action_index); +-- Table to contain all 'eosio.token:issue' transactions +CREATE TABLE IF NOT EXISTS issue_events +( + trx_id String, + action_index UInt32, + -- contract & scope -- + contract String, + symcode String, + -- data payload -- + issuer String, + to String, + quantity String, + memo String, + -- extras -- + precision UInt32, + amount Int64, + value Float64, + -- meta -- + block_num UInt64, + timestamp DateTime +) + ENGINE = ReplacingMergeTree() + PRIMARY KEY (contract, symcode, to, amount, trx_id, action_index) + ORDER BY (contract, symcode, to, amount, trx_id, action_index); + +-- Table to contain all 'eosio.token:retire' transactions -- +CREATE TABLE IF NOT EXISTS retire_events +( + trx_id String, + action_index UInt32, + -- contract & scope -- + contract String, + symcode String, + -- data payload -- + from String, + quantity String, + memo String, + -- extras -- + precision UInt32, + amount Int64, + value Float64, + -- meta -- + block_num UInt64, + timestamp DateTime +) + ENGINE = ReplacingMergeTree() + PRIMARY KEY (contract, symcode, amount, trx_id, action_index) + ORDER BY (contract, symcode, amount, trx_id, action_index); + +-- Table to contain all 'eosio.token:create' transactions +CREATE TABLE IF NOT EXISTS create_events +( + trx_id String, + action_index UInt32, + -- contract & scope -- + contract String, + symcode String, + -- data payload -- + issuer String, + maximum_supply String, + -- extras -- + precision UInt32, + amount Int64, + value Float64, + -- meta -- + block_num UInt64, + timestamp DateTime +) + ENGINE = ReplacingMergeTree() + PRIMARY KEY (contract, symcode, trx_id, action_index) + ORDER BY (contract, symcode, trx_id, action_index); ----------------------------------------------- -- Tables to store the extracted information -- ----------------------------------------------- --- Table to store up to date balances per account and token -- +-- Table to store up to date balances per account and token CREATE TABLE IF NOT EXISTS account_balances ( account String, @@ -120,7 +190,7 @@ CREATE TABLE IF NOT EXISTS account_balances PRIMARY KEY (account, contract, symcode) ORDER BY (account, contract, symcode); --- Table to store up to date token supplies -- +-- Table to store up to date token supplies CREATE TABLE IF NOT EXISTS token_supplies ( contract String, @@ -148,7 +218,6 @@ CREATE TABLE IF NOT EXISTS transfers_from action_index UInt32, contract String, - action String, symcode String, from String, @@ -167,14 +236,13 @@ CREATE TABLE IF NOT EXISTS transfers_from PRIMARY KEY (from, to, trx_id, action_index) ORDER BY (from, to, trx_id, action_index); --- Table to store token transfers primarily indexed by the 'to' field -- +-- Table to store token transfers primarily indexed by the 'to' field CREATE TABLE IF NOT EXISTS transfers_to ( trx_id String, action_index UInt32, contract String, - action String, symcode String, from String, @@ -209,7 +277,7 @@ SELECT account, value, block_num AS updated_at_block_num, timestamp AS updated_at_timestamp -FROM account_events; +FROM balance_change_events; CREATE MATERIALIZED VIEW token_supplies_mv TO token_supplies @@ -224,7 +292,7 @@ SELECT contract, value, block_num AS updated_at_block_num, timestamp AS updated_at_timestamp -FROM token_supply_events; +FROM supply_change_events; CREATE MATERIALIZED VIEW transfers_from_mv TO transfers_from @@ -232,7 +300,6 @@ AS SELECT trx_id, action_index, contract, - action, symcode, from, to, @@ -251,7 +318,6 @@ AS SELECT trx_id, action_index, contract, - action, symcode, from, to, diff --git a/src/sinks.rs b/src/sinks.rs index 93f9a4e..d78cd6c 100644 --- a/src/sinks.rs +++ b/src/sinks.rs @@ -152,7 +152,7 @@ pub fn ch_out( ]); tables - .push_change_composite("account_events", keys, 0, table_change::Operation::Create) + .push_change_composite("balance_change_events", keys, 0, table_change::Operation::Create) .change("contract", ("", account.contract.to_string().as_str())) .change("symcode", ("", account.symcode.to_string().as_str())) .change("balance", ("", account.balance.to_string().as_str())) @@ -172,7 +172,7 @@ pub fn ch_out( ]); tables - .push_change_composite("token_supply_events", keys, 0, table_change::Operation::Create) + .push_change_composite("supply_change_events", keys, 0, table_change::Operation::Create) .change("symcode", ("", stat.symcode.to_string().as_str())) .change("issuer", ("", stat.issuer.to_string().as_str())) .change("max_supply", ("", stat.max_supply.to_string().as_str())) @@ -205,5 +205,65 @@ pub fn ch_out( .change("timestamp", ("", transfer.timestamp.unwrap().to_string().as_str())); } + for issue in map_events.issues { + let keys = HashMap::from([ + ("contract".to_string(), issue.contract), + ("symcode".to_string(), issue.symcode), + ("to".to_string(), issue.to), + ("amount".to_string(), issue.amount.to_string()), + ("trx_id".to_string(), issue.trx_id), + ("action_index".to_string(), issue.action_index.to_string()), + ]); + + tables + .push_change_composite("issue_events", keys, 0, table_change::Operation::Create) + .change("issuer", ("", issue.issuer.to_string().as_str())) + .change("quantity", ("", issue.quantity.to_string().as_str())) + .change("memo", ("", issue.memo.to_string().as_str())) + .change("precision", ("", issue.precision.to_string().as_str())) + .change("value", ("", issue.value.to_string().as_str())) + .change("block_num", ("", issue.block_num.to_string().as_str())) + .change("timestamp", ("", issue.timestamp.unwrap().to_string().as_str())); + } + + for retire in map_events.retires { + let keys = HashMap::from([ + ("contract".to_string(), retire.contract), + ("symcode".to_string(), retire.symcode), + ("amount".to_string(), retire.amount.to_string()), + ("trx_id".to_string(), retire.trx_id), + ("action_index".to_string(), retire.action_index.to_string()), + ]); + + tables + .push_change_composite("retire_events", keys, 0, table_change::Operation::Create) + .change("from", ("", retire.from.to_string().as_str())) + .change("quantity", ("", retire.quantity.to_string().as_str())) + .change("memo", ("", retire.memo.to_string().as_str())) + .change("precision", ("", retire.precision.to_string().as_str())) + .change("value", ("", retire.value.to_string().as_str())) + .change("block_num", ("", retire.block_num.to_string().as_str())) + .change("timestamp", ("", retire.timestamp.unwrap().to_string().as_str())); + } + + for create in map_events.creates { + let keys = HashMap::from([ + ("contract".to_string(), create.contract), + ("symcode".to_string(), create.symcode), + ("trx_id".to_string(), create.trx_id), + ("action_index".to_string(), create.action_index.to_string()), + ]); + + tables + .push_change_composite("create_events", keys, 0, table_change::Operation::Create) + .change("issuer", ("", create.issuer.to_string().as_str())) + .change("maximum_supply", ("", create.maximum_supply.to_string().as_str())) + .change("precision", ("", create.precision.to_string().as_str())) + .change("amount", ("", create.amount.to_string().as_str())) + .change("value", ("", create.value.to_string().as_str())) + .change("block_num", ("", create.block_num.to_string().as_str())) + .change("timestamp", ("", create.timestamp.unwrap().to_string().as_str())); + } + Ok(tables) } From 881e9ea60ac08d6824892c2b1bc2d7ac9f2765de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frederik=20Sch=C3=B6ll?= Date: Wed, 10 Apr 2024 14:28:46 +0200 Subject: [PATCH 4/4] add transfers table indexed by block_num --- schema.sql | 47 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 45 insertions(+), 2 deletions(-) diff --git a/schema.sql b/schema.sql index 3adb80e..1df1b8d 100644 --- a/schema.sql +++ b/schema.sql @@ -232,7 +232,7 @@ CREATE TABLE IF NOT EXISTS transfers_from block_num UInt64, timestamp DateTime ) - ENGINE = ReplacingMergeTree(block_num) + ENGINE = ReplacingMergeTree() PRIMARY KEY (from, to, trx_id, action_index) ORDER BY (from, to, trx_id, action_index); @@ -257,10 +257,35 @@ CREATE TABLE IF NOT EXISTS transfers_to block_num UInt64, timestamp DateTime ) - ENGINE = ReplacingMergeTree(block_num) + ENGINE = ReplacingMergeTree() PRIMARY KEY (to, from, trx_id, action_index) ORDER BY (to, from, trx_id, action_index); +-- Table to store token transfers primarily indexed by the 'block_num' field +CREATE TABLE IF NOT EXISTS transfers_block_num +( + trx_id String, + action_index UInt32, + + contract String, + symcode String, + + from String, + to String, + quantity String, + memo String, + + precision UInt32, + amount Int64, + value Float64, + + block_num UInt64, + timestamp DateTime +) + ENGINE = ReplacingMergeTree() + PRIMARY KEY (block_num, trx_id, action_index) + ORDER BY (block_num, trx_id, action_index); + --------------------------------------------------------- -- Materialized views to populate the extracted tables -- --------------------------------------------------------- @@ -315,6 +340,24 @@ FROM transfer_events; CREATE MATERIALIZED VIEW transfers_to_mv TO transfers_to AS +SELECT trx_id, + action_index, + contract, + symcode, + from, + to, + quantity, + memo, + precision, + amount, + value, + block_num, + timestamp +FROM transfer_events; + +CREATE MATERIALIZED VIEW transfers_block_num_mv + TO transfers_block_num +AS SELECT trx_id, action_index, contract,