Skip to content

Commit

Permalink
update to latest substreams-antelope
Browse files Browse the repository at this point in the history
fixes #33
  • Loading branch information
YaroShkvorets committed Feb 6, 2024
1 parent 06ac188 commit 07efbfa
Show file tree
Hide file tree
Showing 22 changed files with 285 additions and 168 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ members = [
prost = "0.11"
prost-types = "0.11"
substreams = "0.5"
substreams-antelope = "0.1"
substreams-antelope = "0.3.2"
substreams-ethereum = "0.9"
substreams-bitcoin = "1"
substreams-sink-kv = "0.1.2"
Expand Down
4 changes: 2 additions & 2 deletions accounts/src/maps.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use substreams::log;
use substreams::errors::Error;
use substreams_antelope::Block;
use substreams::log;
use substreams_antelope::pb::Block;

use crate::abi;
use crate::accounts;
Expand Down
14 changes: 7 additions & 7 deletions antelope.oracles/src/maps.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use substreams::log;
use substreams::errors::Error;
use substreams_antelope::Block;
use substreams::log;
use substreams_antelope::pb::Block;

use crate::abi;
use crate::antelope_oracles::*;
Expand Down Expand Up @@ -38,13 +38,13 @@ fn map_prices(_params: String, block: Block) -> Result<Prices, Error> {
acc_price: price.acc_price,
last_price: price.last_price,
avg_price: price.avg_price,
last_update: price.last_update
last_update: price.last_update,
});
}
Err(e) => {
log::debug!("error={:?}", e);
continue;
},
}
}
}
}
Expand Down Expand Up @@ -76,14 +76,14 @@ fn map_quotes(_params: String, block: Block) -> Result<Quotes, Error> {
median: datapoint.median,
owner: datapoint.owner.as_str().to_string(),
timestamp: datapoint.timestamp.as_str().to_string(),
value: datapoint.value
})
value: datapoint.value,
}),
});
}
Err(e) => {
log::debug!("error={:?}", e);
continue;
},
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion antelope.trxstats/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use substreams::errors::Error;
use substreams_antelope::Block;
use substreams_antelope::pb::Block;
use substreams_database_change::pb::database::DatabaseChanges;
use substreams_database_change::tables::Tables as DatabaseChangeTables;

Expand Down
2 changes: 1 addition & 1 deletion atomicmarket/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ crate-type = ["cdylib"]
[dependencies]
serde_json = "1"
serde = { version = "1", features = ["derive"] }
antelope = "0.0.11"
antelope = "0.1"
substreams = { workspace = true }
substreams-antelope = { workspace = true }
substreams-sink-prometheus = { workspace = true }
6 changes: 4 additions & 2 deletions atomicmarket/src/abi.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use substreams_antelope::ActionTrace;
use antelope::Asset;
use serde::{Deserialize, Serialize};
use substreams_antelope::pb::ActionTrace;

#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
Expand All @@ -22,7 +22,9 @@ pub struct NewSale {

pub fn parse_lognewsale(action_trace: &ActionTrace) -> Option<NewSale> {
let action = action_trace.action.as_ref()?;
if action.name != "lognewsale" { return None; }
if action.name != "lognewsale" {
return None;
}
let data: LogNewSale = serde_json::from_str(&action.json_data).unwrap();
Some(NewSale {
collection_name: data.collection_name,
Expand Down
10 changes: 7 additions & 3 deletions atomicmarket/src/maps.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use substreams::errors::Error;
use substreams_antelope::{Block, ActionTraces};
use substreams_antelope::pb::{ActionTraces, Block};

#[substreams::handlers::map]
fn map_actions(block: Block) -> Result<ActionTraces, Error> {
Expand All @@ -8,8 +8,12 @@ fn map_actions(block: Block) -> Result<ActionTraces, Error> {
for trx in block.all_transaction_traces() {
for trace in &trx.action_traces {
let action_trace = trace.action.as_ref().unwrap().clone();
if action_trace.account != "atomicmarket" { continue; }
if action_trace.account != trace.receiver { continue; }
if action_trace.account != "atomicmarket" {
continue;
}
if action_trace.account != trace.receiver {
continue;
}
action_traces.push(trace.clone());
}
}
Expand Down
10 changes: 5 additions & 5 deletions atomicmarket/src/sinks.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use substreams_antelope::ActionTraces;
use std::collections::HashMap;
use substreams_antelope::pb::ActionTraces;

use substreams::errors::Error;
use substreams_sink_prometheus::{PrometheusOperations, Counter};
use crate::abi;
use substreams::errors::Error;
use substreams_sink_prometheus::{Counter, PrometheusOperations};

#[substreams::handlers::map]
pub fn prom_out(action_traces: ActionTraces) -> Result<PrometheusOperations, Error> {
Expand All @@ -14,9 +14,9 @@ pub fn prom_out(action_traces: ActionTraces) -> Result<PrometheusOperations, Err
let labels = HashMap::from([("collection_name".to_string(), new_sale.collection_name)]);
let mut counter = Counter::from("newsale").with(labels);
prom_out.push(counter.add(new_sale.listen_price as f64));
},
}
None => (),
}
}
Ok(prom_out)
}
}
45 changes: 32 additions & 13 deletions common/src/maps.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use substreams::errors::Error;
use substreams_antelope::{Block, BlockHeader, ActionTraces, BlockRootMerkle, TransactionTraces, DbOps };
use substreams_antelope::pb::{ActionTraces, Block, BlockHeader, BlockRootMerkle, DbOps, TransactionTraces};

use crate::utils;

Expand All @@ -10,7 +10,7 @@ fn map_block_header(block: Block) -> Result<BlockHeader, Error> {

#[substreams::handlers::map]
fn map_blockroot_merkle(block: Block) -> Result<BlockRootMerkle, Error> {
Ok(block.blockroot_merkle.unwrap() )
Ok(block.blockroot_merkle.unwrap())
}

#[substreams::handlers::map]
Expand All @@ -30,13 +30,23 @@ fn map_transaction_traces(params: String, block: Block) -> Result<TransactionTra
let action = trace.action.as_ref().unwrap();

// filter by params
if !filter_contract.is_empty() && filter_contract.contains(&action.account) { has_contract = true; }
if !filter_action.is_empty() && filter_action.contains(&action.name) { has_action = true; }
if !filter_receiver.is_empty() && filter_receiver.contains(&trace.receiver) { has_receiver = true; }
if has_contract || has_action || has_receiver { break; }
if !filter_contract.is_empty() && filter_contract.contains(&action.account) {
has_contract = true;
}
if !filter_action.is_empty() && filter_action.contains(&action.name) {
has_action = true;
}
if !filter_receiver.is_empty() && filter_receiver.contains(&trace.receiver) {
has_receiver = true;
}
if has_contract || has_action || has_receiver {
break;
}
}
// don't include transaction
if !has_contract && !has_action && !has_receiver { continue; }
if !has_contract && !has_action && !has_receiver {
continue;
}
}
transaction_traces.push(trx.clone());
}
Expand All @@ -56,9 +66,15 @@ fn map_action_traces(params: String, block: Block) -> Result<ActionTraces, Error
let action = trace.action.as_ref().unwrap();

// filter by params
if !filter_contract.is_empty() && !filter_contract.contains(&action.account) { continue; }
if !filter_action.is_empty() && !filter_action.contains(&action.name) { continue; }
if !filter_receiver.is_empty() && !filter_receiver.contains(&trace.receiver) { continue; }
if !filter_contract.is_empty() && !filter_contract.contains(&action.account) {
continue;
}
if !filter_action.is_empty() && !filter_action.contains(&action.name) {
continue;
}
if !filter_receiver.is_empty() && !filter_receiver.contains(&trace.receiver) {
continue;
}

action_traces.push(trace.clone());
}
Expand All @@ -75,10 +91,13 @@ fn map_db_ops(params: String, block: Block) -> Result<DbOps, Error> {

for trx in block.all_transaction_traces() {
for db_op in &trx.db_ops {

// filter by params
if !filter_contract.is_empty() && !filter_contract.contains(&db_op.code) { continue; }
if !filter_table.is_empty() && !filter_table.contains(&db_op.table_name) { continue; }
if !filter_contract.is_empty() && !filter_contract.contains(&db_op.code) {
continue;
}
if !filter_table.is_empty() && !filter_table.contains(&db_op.table_name) {
continue;
}

db_ops.push(db_op.clone());
}
Expand Down
53 changes: 30 additions & 23 deletions deferred/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use sinkfiles::Lines;
use substreams::errors::Error;
use substreams_antelope::{Block, d_trx_op};
use substreams_antelope::pb::{d_trx_op, Block};


#[path = "pb/substreams.sink.files.v1.rs"]
#[allow(dead_code)]
pub mod sinkfiles;
#[path = "pb/antelope.eosio.deferred.v1.rs"]
#[allow(dead_code)]
pub mod eosio_deferred;
#[path = "pb/substreams.sink.files.v1.rs"]
#[allow(dead_code)]
pub mod sinkfiles;
pub use self::eosio_deferred::*;

fn get_op_str(op: d_trx_op::Operation) -> String {
Expand All @@ -32,7 +31,11 @@ fn map_deferred(block: Block) -> Result<Transactions, Error> {
for dtrx in &trx.dtrx_ops {
let dtrx = dtrx.clone();
let actions = dtrx.transaction.unwrap().transaction.unwrap().actions;
let action = if actions.len() != 0 { actions.get(0).unwrap().clone() } else { substreams_antelope::Action { ..Default::default() } };
let action = if actions.len() != 0 {
actions.get(0).unwrap().clone()
} else {
substreams_antelope::pb::Action { ..Default::default() }
};
res.transactions.push(Transaction {
parent_trx_id: trx.id.clone(),
trx_id: dtrx.transaction_id,
Expand All @@ -43,7 +46,7 @@ fn map_deferred(block: Block) -> Result<Transactions, Error> {
json_data: action.json_data,
block_num: trx.block_num,
timestamp: block.header.as_ref().unwrap().timestamp.clone(),
producer: producer.to_string()
producer: producer.to_string(),
});
}
}
Expand All @@ -52,21 +55,25 @@ fn map_deferred(block: Block) -> Result<Transactions, Error> {

#[substreams::handlers::map]
fn csv_out(transactions: Transactions) -> Result<Lines, substreams::errors::Error> {

Ok(Lines {
lines: transactions.transactions.into_iter().map(|t| {
format!("{},{},{},{},{},{},\"{}\",{},{},{}",
t.trx_id,
t.parent_trx_id,
t.op,
t.sender,
t.account,
t.action,
t.json_data.replace("\"", "'"),
t.block_num,
t.timestamp.unwrap().to_string(),
t.producer
)
}).collect()
lines: transactions
.transactions
.into_iter()
.map(|t| {
format!(
"{},{},{},{},{},{},\"{}\",{},{},{}",
t.trx_id,
t.parent_trx_id,
t.op,
t.sender,
t.account,
t.action,
t.json_data.replace("\"", "'"),
t.block_num,
t.timestamp.unwrap().to_string(),
t.producer
)
})
.collect(),
})
}
}
33 changes: 23 additions & 10 deletions eosio.cpu/src/maps.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use antelope::Symbol;
use substreams::errors::Error;
use substreams_antelope::Block;
use substreams::log;
use substreams_antelope::pb::Block;

use crate::abi;
use crate::eosio_cpu::*;
Expand All @@ -26,7 +26,6 @@ fn map_transfers(params: String, block: Block) -> Result<TransferEvents, Error>
let mut transaction_count: u32 = 0;

for trx in block.all_transaction_traces() {

let cpu_usage = trx.receipt.as_ref().unwrap().cpu_usage_micro_seconds as u32;
let net_usage = trx.net_usage as u32;
let producer = block.header.as_ref().unwrap().producer.clone();
Expand All @@ -35,8 +34,12 @@ fn map_transfers(params: String, block: Block) -> Result<TransferEvents, Error>
for trace in &trx.action_traces {
let action_trace = trace.action.as_ref().unwrap();

if action_trace.account != trace.receiver { continue; }
if action_trace.name != "transfer" { continue; }
if action_trace.account != trace.receiver {
continue;
}
if action_trace.name != "transfer" {
continue;
}

match abi::Transfer::try_from(action_trace.json_data.as_str()) {
Ok(data) => {
Expand All @@ -47,11 +50,21 @@ fn map_transfers(params: String, block: Block) -> Result<TransferEvents, Error>
let contract = action_trace.account.clone();

// filter by params
if !filter_from.is_empty() && !filter_from.contains(&data.from) { continue; }
if !filter_to.is_empty() && !filter_to.contains(&data.to) { continue; }
if !filter_symcode.is_empty() && !filter_symcode.contains(&symcode) { continue; }
if !filter_contract.is_empty() && !filter_contract.contains(&contract) { continue; }
if !filter_to_or_from.is_empty() && !(filter_to_or_from.contains(&data.to) || filter_to_or_from.contains(&data.from)) { continue; }
if !filter_from.is_empty() && !filter_from.contains(&data.from) {
continue;
}
if !filter_to.is_empty() && !filter_to.contains(&data.to) {
continue;
}
if !filter_symcode.is_empty() && !filter_symcode.contains(&symcode) {
continue;
}
if !filter_contract.is_empty() && !filter_contract.contains(&contract) {
continue;
}
if !filter_to_or_from.is_empty() && !(filter_to_or_from.contains(&data.to) || filter_to_or_from.contains(&data.from)) {
continue;
}
//if filter_quantity_lt.is_some() && !(quantity.amount < filter_quantity_lt.unwrap()) { continue; }
//if filter_quantity_gt.is_some() && !(quantity.amount > filter_quantity_gt.unwrap()) { continue; }
//if filter_quantity_lte.is_some() && !(quantity.amount <= filter_quantity_lte.unwrap()) { continue; }
Expand Down Expand Up @@ -84,7 +97,7 @@ fn map_transfers(params: String, block: Block) -> Result<TransferEvents, Error>
producer: producer.to_string(),
cpu_usage: cpu_usage,
net_usage: net_usage,
tx_count: transaction_count
tx_count: transaction_count,
});
}
Err(_) => continue,
Expand Down
Loading

0 comments on commit 07efbfa

Please sign in to comment.