Skip to content

Commit

Permalink
move balance_change & supply_change to file
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisCarriere committed Nov 9, 2024
1 parent 9166f27 commit fe1af9d
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 190 deletions.
94 changes: 94 additions & 0 deletions src/balance_changes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use crate::{abi, BalanceChange};
use antelope::{Asset, Name, Symbol, SymbolCode};
use substreams::{log, pb::substreams::Clock};
use substreams_antelope::decoder::decode;
use substreams_antelope::Block;

use crate::utils;

pub fn collect_balance_changes(clock: &Clock, block: &Block) -> Vec<BalanceChange> {
block
.transaction_traces()
.flat_map(|trx| {
trx.db_ops.iter().filter_map(|db_op| {
if db_op.table_name != "accounts" {
return None;
}

let old_data = decode::<abi::types::Account>(&db_op.old_data_json).ok();
let new_data = decode::<abi::types::Account>(&db_op.new_data_json).ok();

let old_balance =
old_data
.as_ref()
.and_then(|data| match data.balance.parse::<Asset>() {
Ok(asset) => Some(asset),
Err(e) => {
log::info!(
"Error parsing old balance asset in trx {}: {:?}",
trx.id,
e
);
None
}
});
let new_balance =
new_data
.as_ref()
.and_then(|data| match data.balance.parse::<Asset>() {
Ok(asset) => Some(asset),
Err(e) => {
log::info!(
"Error parsing new balance asset in trx {}: {:?}",
trx.id,
e
);
None
}
});

if old_balance.is_none() && new_balance.is_none() {
return None;
}

let raw_primary_key = Name::from(db_op.primary_key.as_str()).value;
let symcode = SymbolCode::from(raw_primary_key);
let precision = new_balance
.unwrap_or_else(|| old_balance.unwrap())
.symbol
.precision();
let sym = Symbol::from_precision(symcode, precision);
let balance = new_balance.unwrap_or_else(|| Asset::from_amount(0, sym));
let balance_delta = balance.amount
- old_balance
.unwrap_or_else(|| Asset::from_amount(0, sym))
.amount;

Some(BalanceChange {
// trace information
trx_id: trx.id.clone(),
action_index: db_op.action_index,

// contract & scope
contract: db_op.code.clone(),
symcode: symcode.to_string(),

// payload
account: db_op.scope.clone(),
balance: balance.to_string(),
balance_delta,

// extras
precision: precision.into(),
amount: balance.amount,
value: utils::to_value(&balance),

block_num: clock.number,
timestamp: clock.timestamp,
block_hash: clock.id.clone(),
block_date: utils::to_date(&clock),
})
})
})
.collect()
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ mod balance_changes;
mod maps;
mod pb;
mod sinks;
mod supply_changes;
mod utils;
197 changes: 7 additions & 190 deletions src/maps.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use antelope::{Asset, Name, Symbol, SymbolCode};
use antelope::Asset;
use substreams::errors::Error;
use substreams::log;
use substreams::pb::substreams::Clock;
use substreams_antelope::{decoder::decode, pb::Block};
use substreams_antelope::pb::Block;

use crate::abi;
use crate::balance_changes::collect_balance_changes;
use crate::eosio_token::*;
use crate::utils;
use crate::supply_changes::collect_supply_changes;
use crate::utils::{self, to_date};

#[substreams::handlers::map]
fn map_events(clock: Clock, block: Block) -> Result<Events, Error> {
Expand Down Expand Up @@ -151,197 +153,12 @@ fn map_events(clock: Clock, block: Block) -> Result<Events, Error> {
})
.collect();

let balance_changes = block
.transaction_traces()
.flat_map(|trx| {
trx.db_ops.iter().filter_map(|db_op| {
if db_op.table_name != "accounts" {
return None;
}

let old_data = decode::<abi::types::Account>(&db_op.old_data_json).ok();
let new_data = decode::<abi::types::Account>(&db_op.new_data_json).ok();

let old_balance =
old_data
.as_ref()
.and_then(|data| match data.balance.parse::<Asset>() {
Ok(asset) => Some(asset),
Err(e) => {
log::info!(
"Error parsing old balance asset in trx {}: {:?}",
trx.id,
e
);
None
}
});
let new_balance =
new_data
.as_ref()
.and_then(|data| match data.balance.parse::<Asset>() {
Ok(asset) => Some(asset),
Err(e) => {
log::info!(
"Error parsing new balance asset in trx {}: {:?}",
trx.id,
e
);
None
}
});

if old_balance.is_none() && new_balance.is_none() {
return None;
}

let raw_primary_key = Name::from(db_op.primary_key.as_str()).value;
let symcode = SymbolCode::from(raw_primary_key);
let precision = new_balance
.unwrap_or_else(|| old_balance.unwrap())
.symbol
.precision();
let sym = Symbol::from_precision(symcode, precision);
let balance = new_balance.unwrap_or_else(|| Asset::from_amount(0, sym));
let balance_delta = balance.amount
- old_balance
.unwrap_or_else(|| Asset::from_amount(0, sym))
.amount;

Some(BalanceChange {
// trace information
trx_id: trx.id.clone(),
action_index: db_op.action_index,

// contract & scope
contract: db_op.code.clone(),
symcode: symcode.to_string(),

// payload
account: db_op.scope.clone(),
balance: balance.to_string(),
balance_delta,

// extras
precision: precision.into(),
amount: balance.amount,
value: utils::to_value(&balance),

block_num: clock.number,
timestamp: clock.timestamp,
block_hash: clock.id.clone(),
block_date: to_date(&clock),
})
})
})
.collect();

let supply_changes = block
.transaction_traces()
.flat_map(|trx| {
trx.db_ops.iter().filter_map(|db_op| {
if db_op.table_name != "stat" {
return None;
}

let old_data = decode::<abi::types::CurrencyStats>(&db_op.old_data_json).ok();
let new_data = decode::<abi::types::CurrencyStats>(&db_op.new_data_json).ok();

let old_supply =
old_data
.as_ref()
.and_then(|data| match data.supply.parse::<Asset>() {
Ok(asset) => Some(asset),
Err(e) => {
log::info!(
"Error parsing old supply asset in trx {}: {:?}",
trx.id,
e
);
None
}
});

let new_supply =
new_data
.as_ref()
.and_then(|data| match data.supply.parse::<Asset>() {
Ok(asset) => Some(asset),
Err(e) => {
log::info!(
"Error parsing new supply asset in trx {}: {:?}",
trx.id,
e
);
None
}
});

if old_supply.is_none() && new_supply.is_none() {
return None;
}

let symcode = SymbolCode::from(Name::from(db_op.primary_key.as_str()).value);
let precision = new_supply
.unwrap_or_else(|| old_supply.unwrap())
.symbol
.precision();
let sym = Symbol::from_precision(symcode, precision);
let supply = new_supply.unwrap_or_else(|| Asset::from_amount(0, sym));
let supply_delta = supply.amount
- old_supply
.unwrap_or_else(|| Asset::from_amount(0, sym))
.amount;

let data = new_data.unwrap_or_else(|| old_data.unwrap());

Some(SupplyChange {
// trace information
trx_id: trx.id.clone(),
action_index: db_op.action_index,

// contract & scope
contract: db_op.code.clone(),
symcode: symcode.to_string(),

// payload
issuer: data.issuer,
max_supply: data.max_supply,
supply: supply.to_string(),
supply_delta,

// extras
precision: precision.into(),
amount: supply.amount,
value: utils::to_value(&supply),

block_num: clock.number,
timestamp: clock.timestamp,
block_hash: clock.id.clone(),
block_date: to_date(&clock),
})
})
})
.collect();

Ok(Events {
transfers,
issues,
retires,
creates,
balance_changes,
supply_changes,
balance_changes: collect_balance_changes(&clock, &block),
supply_changes: collect_supply_changes(&clock, &block),
})
}

// Clock to date string
// ex: Clock => 2015-07-30
pub fn to_date(clock: &Clock) -> String {
let timestamp = clock.timestamp.as_ref().expect("missing timestamp");
timestamp
.to_string()
.split('T')
.next()
.expect("missing date")
.to_string()
}
Loading

0 comments on commit fe1af9d

Please sign in to comment.