Skip to content

Commit

Permalink
First round of updates for influx measurements. Split address activit…
Browse files Browse the repository at this point in the history
…y response by type.
  • Loading branch information
Alex Coats committed Feb 21, 2024
1 parent fe7c2da commit 2e25952
Show file tree
Hide file tree
Showing 21 changed files with 810 additions and 187 deletions.
40 changes: 40 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 8 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ dotenvy = { version = "0.15", default-features = false }
eyre = { version = "0.6", default-features = false, features = [ "track-caller", "auto-install" ] }
futures = { version = "0.3", default-features = false }
hex = { version = "0.4", default-features = false }
humantime = { version = "2.1.0", default-features = false }
humantime = { version = "2.1", default-features = false }
humantime-serde = { version = "1.1", default-features = false }
iota-crypto = { version = "0.23", default-features = false, features = [ "blake2b", "ed25519", "slip10", "bip39-en", "random", "zeroize" ] }
iota-sdk = { git = "https://github.com/iotaledger/iota-sdk", branch = "2.0", default-features = false, features = [ "std", "serde" ] }
Expand All @@ -55,20 +55,21 @@ uuid = { version = "1.3", default-features = false, features = [ "v4" ] }
# Optional
chrono = { version = "0.4", default-features = false, features = [ "std" ], optional = true }
influxdb = { version = "0.7", default-features = false, features = [ "use-serde", "reqwest-client-rustls", "derive" ], optional = true }
rayon = { version = "1.8", default-features = false }

# API
auth-helper = { version = "0.3", default-features = false, optional = true }
axum = { version = "0.7.4", default-features = false, features = [ "http1", "json", "query", "original-uri", "tokio", "macros" ], optional = true }
axum-extra = { version = "*", default-features = false, features = [ "typed-header" ] }
axum = { version = "0.7", default-features = false, features = [ "http1", "json", "query", "original-uri", "tokio", "macros" ], optional = true }
axum-extra = { version = "0.9", default-features = false, features = [ "typed-header" ] }
ed25519-zebra = { version = "4.0", default-features = false, features = [ "std", "pkcs8", "pem" ], optional = true }
hyper = { version = "1.1.0", default-features = false, features = [ "server" ], optional = true }
hyper-util = { version = "0.1.3", default-features = false }
hyper-util = { version = "0.1", default-features = false }
rand = { version = "0.8", default-features = false, features = [ "std" ], optional = true }
regex = { version = "1.7", default-features = false, features = [ "std" ], optional = true }
rust-argon2 = { version = "2.0.0", default-features = false, optional = true }
regex = { version = "1.8.4", default-features = false, features = [ "std" ], optional = true }
rust-argon2 = { version = "2.0", default-features = false, optional = true }
serde_urlencoded = { version = "0.7", default-features = false, optional = true }
tower = { version = "0.4", default-features = false, optional = true }
tower-http = { version = "0.5.1", default-features = false, features = [ "cors", "catch-panic", "trace" ], optional = true }
tower-http = { version = "0.5", default-features = false, features = [ "cors", "catch-panic", "trace" ], optional = true }
zeroize = { version = "1.5", default-features = false, features = [ "std", "zeroize_derive" ], optional = true }

# INX
Expand Down
96 changes: 87 additions & 9 deletions src/analytics/influx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ use iota_sdk::types::block::protocol::ProtocolParameters;

use super::{
ledger::{
AddressActivityMeasurement, AddressBalanceMeasurement, BaseTokenActivityMeasurement, LedgerOutputMeasurement,
LedgerSizeMeasurement, OutputActivityMeasurement, TransactionSizeMeasurement, UnlockConditionMeasurement,
AddressActivityMeasurement, AddressBalanceMeasurement, BaseTokenActivityMeasurement, FeaturesMeasurement,
LedgerOutputMeasurement, LedgerSizeMeasurement, OutputActivityMeasurement, TransactionSizeMeasurement,
UnlockConditionMeasurement,
},
tangle::{BlockActivityMeasurement, SlotSizeMeasurement},
tangle::{BlockActivityMeasurement, BlockIssuerMeasurement, ManaActivityMeasurement, SlotSizeMeasurement},
AnalyticsInterval, PerInterval, PerSlot,
};
use crate::db::influxdb::InfluxDb;
Expand Down Expand Up @@ -112,11 +113,42 @@ impl Measurement for AddressBalanceMeasurement {
const NAME: &'static str = "iota_addresses";

fn add_fields(&self, query: WriteQuery) -> WriteQuery {
let mut query = query.add_field("address_with_balance_count", self.address_with_balance_count as u64);
let mut query = query
.add_field(
"ed25519_address_with_balance_count",
self.ed25519_address_with_balance_count as u64,
)
.add_field(
"account_address_with_balance_count",
self.account_address_with_balance_count as u64,
)
.add_field(
"nft_address_with_balance_count",
self.nft_address_with_balance_count as u64,
)
.add_field(
"anchor_address_with_balance_count",
self.anchor_address_with_balance_count as u64,
)
.add_field(
"implicit_account_address_with_balance_count",
self.implicit_address_with_balance_count as u64,
);
for (index, stat) in self.token_distribution.iter().enumerate() {
query = query
.add_field(format!("address_count_{index}"), stat.address_count)
.add_field(format!("total_amount_{index}"), stat.total_amount);
.add_field(format!("ed25519_address_count_{index}"), stat.ed25519_count as u64)
.add_field(format!("ed25519_total_amount_{index}"), stat.ed25519_amount)
.add_field(format!("account_address_count_{index}"), stat.account_count as u64)
.add_field(format!("account_total_amount_{index}"), stat.account_amount)
.add_field(format!("nft_address_count_{index}"), stat.nft_count as u64)
.add_field(format!("nft_total_amount_{index}"), stat.nft_amount)
.add_field(format!("anchor_address_count_{index}"), stat.anchor_count as u64)
.add_field(format!("anchor_total_amount_{index}"), stat.anchor_amount)
.add_field(
format!("implicit_account_address_count_{index}"),
stat.implicit_count as u64,
)
.add_field(format!("implicit_account_total_amount_{index}"), stat.implicit_amount);
}
query
}
Expand Down Expand Up @@ -158,11 +190,35 @@ impl Measurement for BlockActivityMeasurement {
}
}

impl Measurement for BlockIssuerMeasurement {
const NAME: &'static str = "iota_block_issuer_activity";

fn add_fields(&self, query: WriteQuery) -> WriteQuery {
query.add_field("active_issuer_count", self.active_issuer_count as u64)
}
}

impl Measurement for ManaActivityMeasurement {
const NAME: &'static str = "iota_mana_activity";

fn add_fields(&self, query: WriteQuery) -> WriteQuery {
query
.add_field("rewards_claimed", self.rewards_claimed)
.add_field("mana_burned", self.mana_burned)
.add_field("bic_burned", self.bic_burned)
}
}

impl Measurement for AddressActivityMeasurement {
const NAME: &'static str = "iota_active_addresses";

fn add_fields(&self, query: WriteQuery) -> WriteQuery {
query.add_field("count", self.count as u64)
query
.add_field("ed25519_count", self.ed25519_count as u64)
.add_field("account_count", self.account_count as u64)
.add_field("nft_count", self.nft_count as u64)
.add_field("anchor_count", self.anchor_count as u64)
.add_field("implicit_account_count", self.implicit_count as u64)
}
}

Expand Down Expand Up @@ -203,8 +259,9 @@ impl Measurement for LedgerOutputMeasurement {
query
.add_field("basic_count", self.basic.count as u64)
.add_field("basic_amount", self.basic.amount)
.add_field("account_count", self.account.count as u64)
.add_field("account_amount", self.account.amount)
.add_field("account_count", self.account.count_and_amount.count as u64)
.add_field("account_amount", self.account.count_and_amount.amount)
.add_field("block_issuer_accounts", self.account.block_issuers_count as u64)
.add_field("anchor_count", self.anchor.count as u64)
.add_field("anchor_amount", self.anchor.amount)
.add_field("foundry_count", self.foundry.count as u64)
Expand Down Expand Up @@ -251,6 +308,10 @@ impl Measurement for OutputActivityMeasurement {
fn add_fields(&self, query: WriteQuery) -> WriteQuery {
query
.add_field("account_created_count", self.account.created_count as u64)
.add_field(
"account_block_issuer_key_rotated_count",
self.account.block_issuer_key_rotated as u64,
)
.add_field("account_destroyed_count", self.account.destroyed_count as u64)
.add_field("anchor_created_count", self.anchor.created_count as u64)
.add_field("anchor_state_changed_count", self.anchor.state_changed_count as u64)
Expand All @@ -266,7 +327,10 @@ impl Measurement for OutputActivityMeasurement {
.add_field("foundry_transferred_count", self.foundry.transferred_count as u64)
.add_field("foundry_destroyed_count", self.foundry.destroyed_count as u64)
.add_field("delegation_created_count", self.delegation.created_count as u64)
.add_field("delegation_delayed_count", self.delegation.delayed_count as u64)
.add_field("delegation_destroyed_count", self.delegation.destroyed_count as u64)
.add_field("native_token_minted_count", self.native_token.minted_count as u64)
.add_field("native_token_melted_count", self.native_token.melted_count as u64)
}
}

Expand Down Expand Up @@ -297,6 +361,20 @@ impl Measurement for UnlockConditionMeasurement {
}
}

impl Measurement for FeaturesMeasurement {
const NAME: &'static str = "iota_features";

fn add_fields(&self, query: WriteQuery) -> WriteQuery {
query
.add_field("native_tokens_count", self.native_tokens.count as u64)
.add_field("native_tokens_amount", self.native_tokens.amount)
.add_field("block_issuer_key_count", self.block_issuer.count as u64)
.add_field("block_issuer_key_amount", self.block_issuer.amount)
.add_field("staking_count", self.staking.count as u64)
.add_field("staking_amount", self.staking.amount)
}
}

impl InfluxDb {
/// Writes a [`Measurement`] to the InfluxDB database.
pub(super) async fn insert_measurement(&self, measurement: impl PrepareQuery) -> Result<(), influxdb::Error> {
Expand Down
69 changes: 59 additions & 10 deletions src/analytics/ledger/active_addresses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@

use std::collections::HashSet;

use iota_sdk::types::block::address::{Bech32Address, ToBech32Ext};
use iota_sdk::types::block::{
address::{AccountAddress, Address, AnchorAddress, Ed25519Address, ImplicitAccountCreationAddress, NftAddress},
payload::SignedTransactionPayload,
};

use crate::{
analytics::{Analytics, AnalyticsContext, AnalyticsInterval, IntervalAnalytics},
Expand All @@ -13,14 +16,22 @@ use crate::{

#[derive(Debug, Default)]
pub(crate) struct AddressActivityMeasurement {
pub(crate) count: usize,
pub(crate) ed25519_count: usize,
pub(crate) account_count: usize,
pub(crate) nft_count: usize,
pub(crate) anchor_count: usize,
pub(crate) implicit_count: usize,
}

/// Computes the number of addresses that were active during a given time interval.
#[allow(missing_docs)]
#[derive(Debug, Default)]
pub(crate) struct AddressActivityAnalytics {
addresses: HashSet<Bech32Address>,
ed25519_addresses: HashSet<Ed25519Address>,
account_addresses: HashSet<AccountAddress>,
nft_addresses: HashSet<NftAddress>,
anchor_addresses: HashSet<AnchorAddress>,
implicit_addresses: HashSet<ImplicitAccountCreationAddress>,
}

#[async_trait::async_trait]
Expand All @@ -33,35 +44,73 @@ impl IntervalAnalytics for AddressActivityMeasurement {
interval: AnalyticsInterval,
db: &MongoDb,
) -> eyre::Result<Self::Measurement> {
let count = db
let res = db
.collection::<OutputCollection>()
.get_address_activity_count_in_range(start_date, interval.end_date(&start_date))
.await?;
Ok(AddressActivityMeasurement { count })
Ok(AddressActivityMeasurement {
ed25519_count: res.ed25519_count,
account_count: res.account_count,
nft_count: res.nft_count,
anchor_count: res.anchor_count,
implicit_count: res.implicit_count,
})
}
}

impl Analytics for AddressActivityAnalytics {
type Measurement = AddressActivityMeasurement;

fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], ctx: &dyn AnalyticsContext) {
let hrp = ctx.protocol_parameters().bech32_hrp();
fn handle_transaction(
&mut self,
_payload: &SignedTransactionPayload,
consumed: &[LedgerSpent],
created: &[LedgerOutput],
_ctx: &dyn AnalyticsContext,
) {
for output in consumed {
if let Some(a) = output.address() {
self.addresses.insert(a.clone().to_bech32(hrp));
self.add_address(a);
}
}

for output in created {
if let Some(a) = output.address() {
self.addresses.insert(a.clone().to_bech32(hrp));
self.add_address(a);
}
}
}

fn take_measurement(&mut self, _ctx: &dyn AnalyticsContext) -> Self::Measurement {
AddressActivityMeasurement {
count: std::mem::take(self).addresses.len(),
ed25519_count: std::mem::take(&mut self.ed25519_addresses).len(),
account_count: std::mem::take(&mut self.account_addresses).len(),
nft_count: std::mem::take(&mut self.nft_addresses).len(),
anchor_count: std::mem::take(&mut self.anchor_addresses).len(),
implicit_count: std::mem::take(&mut self.implicit_addresses).len(),
}
}
}

impl AddressActivityAnalytics {
fn add_address(&mut self, address: &Address) {
match address {
Address::Ed25519(a) => {
self.ed25519_addresses.insert(*a);
}
Address::Account(a) => {
self.account_addresses.insert(*a);
}
Address::Nft(a) => {
self.nft_addresses.insert(*a);
}
Address::Anchor(a) => {
self.anchor_addresses.insert(*a);
}
Address::ImplicitAccountCreation(a) => {
self.implicit_addresses.insert(*a);
}
_ => (),
}
}
}
Loading

0 comments on commit 2e25952

Please sign in to comment.