Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(api): consider expiration return addresses for ledger updates #1314

Merged
merged 17 commits into from
Jan 24, 2024
12 changes: 6 additions & 6 deletions documentation/api/api-explorer.yml
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,13 @@ components:
totalBalance:
type: string
description: >-
The total value held in unspent outputs owned by the given address
(includes funds held in storage deposit).
sigLockedBalance:
The total value held in unspent outputs that is unlockable by the given address or currently timelocked.
Does not include funds held in storage deposit.
availableBalance:
DaughterOfMars marked this conversation as resolved.
Show resolved Hide resolved
type: string
description: >-
The sum of value held in unspent outputs owned by the given address
that are signature locked ("trivially unlockable").
The total value held in unspent outputs that is immediately unlockable at ledgerIndex by the given address.
Does not include funds held in storage deposit.
ledgerIndex:
type: integer
description: The ledger index for which the balance calculation was performed.
Expand Down Expand Up @@ -585,7 +585,7 @@ components:
balance-example:
value:
totalBalance: 100000
sigLockedBalance: 99900
availableBalance: 99900
ledgerIndex: 500000
ledger-updates-address-example:
value:
Expand Down
4 changes: 2 additions & 2 deletions src/analytics/ledger/active_addresses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ impl IntervalAnalytics for AddressActivityMeasurement {
impl Analytics for AddressActivityAnalytics {
type Measurement = AddressActivityMeasurement;

fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], _ctx: &dyn AnalyticsContext) {
fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], ctx: &dyn AnalyticsContext) {
for output in consumed {
if let Some(a) = output.owning_address() {
self.addresses.insert(*a);
}
}

for output in created {
if let Some(a) = output.owning_address() {
if let Some(a) = output.output.owning_address(ctx.at().milestone_timestamp) {
self.addresses.insert(*a);
}
}
Expand Down
16 changes: 11 additions & 5 deletions src/analytics/ledger/address_balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
use std::collections::HashMap;

use super::*;
use crate::model::utxo::{Address, TokenAmount};
use crate::model::{
payload::milestone::MilestoneTimestamp,
utxo::{Address, TokenAmount},
};

#[derive(Debug)]
pub(crate) struct AddressBalanceMeasurement {
Expand All @@ -29,10 +32,13 @@ pub(crate) struct AddressBalancesAnalytics {

impl AddressBalancesAnalytics {
/// Initialize the analytics by reading the current ledger state.
pub(crate) fn init<'a>(unspent_outputs: impl IntoIterator<Item = &'a LedgerOutput>) -> Self {
pub(crate) fn init<'a>(
unspent_outputs: impl IntoIterator<Item = &'a LedgerOutput>,
milestone_timestamp: MilestoneTimestamp,
) -> Self {
let mut balances = HashMap::new();
for output in unspent_outputs {
if let Some(&a) = output.owning_address() {
if let Some(&a) = output.output.owning_address(milestone_timestamp) {
*balances.entry(a).or_default() += output.amount();
}
}
Expand All @@ -43,7 +49,7 @@ impl AddressBalancesAnalytics {
impl Analytics for AddressBalancesAnalytics {
type Measurement = AddressBalanceMeasurement;

fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], _ctx: &dyn AnalyticsContext) {
fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], ctx: &dyn AnalyticsContext) {
for output in consumed {
if let Some(a) = output.owning_address() {
// All inputs should be present in `addresses`. If not, we skip it's value.
Expand All @@ -57,7 +63,7 @@ impl Analytics for AddressBalancesAnalytics {
}

for output in created {
if let Some(&a) = output.owning_address() {
if let Some(&a) = output.output.owning_address(ctx.at().milestone_timestamp) {
// All inputs should be present in `addresses`. If not, we skip it's value.
*self.balances.entry(a).or_default() += output.amount();
}
Expand Down
4 changes: 2 additions & 2 deletions src/analytics/ledger/base_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ pub(crate) struct BaseTokenActivityMeasurement {
impl Analytics for BaseTokenActivityMeasurement {
type Measurement = Self;

fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], _ctx: &dyn AnalyticsContext) {
fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], ctx: &dyn AnalyticsContext) {
// The idea behind the following code is that we keep track of the deltas that are applied to each account that
// is represented by an address.
let mut balance_deltas: HashMap<&Address, i128> = HashMap::new();

// We first gather all tokens that have been moved to an individual address.
for output in created {
if let Some(address) = output.owning_address() {
if let Some(address) = output.output.owning_address(ctx.at().milestone_timestamp) {
*balance_deltas.entry(address).or_default() += output.amount().0 as i128;
}
}
Expand Down
12 changes: 8 additions & 4 deletions src/analytics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
model::{
ledger::{LedgerOutput, LedgerSpent},
metadata::LedgerInclusionState,
payload::{Payload, TransactionEssence},
payload::{milestone::MilestoneTimestamp, Payload, TransactionEssence},
protocol::ProtocolParameters,
tangle::{MilestoneIndex, MilestoneIndexTimestamp},
utxo::Input,
Expand Down Expand Up @@ -152,9 +152,12 @@ impl Analytic {
choice: &AnalyticsChoice,
protocol_params: &ProtocolParameters,
unspent_outputs: impl IntoIterator<Item = &'a LedgerOutput>,
milestone_timestamp: MilestoneTimestamp,
) -> Self {
Self(match choice {
AnalyticsChoice::AddressBalance => Box::new(AddressBalancesAnalytics::init(unspent_outputs)) as _,
AnalyticsChoice::AddressBalance => {
Box::new(AddressBalancesAnalytics::init(unspent_outputs, milestone_timestamp)) as _
}
AnalyticsChoice::BaseTokenActivity => Box::<BaseTokenActivityMeasurement>::default() as _,
AnalyticsChoice::BlockActivity => Box::<BlockActivityMeasurement>::default() as _,
AnalyticsChoice::ActiveAddresses => Box::<AddressActivityAnalytics>::default() as _,
Expand Down Expand Up @@ -396,7 +399,7 @@ mod test {
ledger::{LedgerOutput, LedgerSpent},
metadata::BlockMetadata,
node::NodeConfiguration,
payload::{MilestoneId, MilestonePayload},
payload::{milestone::MilestoneTimestamp, MilestoneId, MilestonePayload},
protocol::ProtocolParameters,
tangle::{MilestoneIndex, MilestoneIndexTimestamp},
},
Expand Down Expand Up @@ -444,10 +447,11 @@ mod test {
fn init<'a>(
protocol_params: ProtocolParameters,
unspent_outputs: impl IntoIterator<Item = &'a LedgerOutput> + Copy,
milestone_timestamp: MilestoneTimestamp,
) -> Self {
Self {
active_addresses: Default::default(),
address_balance: AddressBalancesAnalytics::init(unspent_outputs),
address_balance: AddressBalancesAnalytics::init(unspent_outputs, milestone_timestamp),
base_tokens: Default::default(),
ledger_outputs: LedgerOutputMeasurement::init(unspent_outputs),
ledger_size: LedgerSizeAnalytics::init(protocol_params, unspent_outputs),
Expand Down
2 changes: 1 addition & 1 deletion src/bin/inx-chronicle/api/explorer/responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl From<LedgerUpdateByMilestoneRecord> for LedgerUpdateByMilestoneDto {
#[serde(rename_all = "camelCase")]
pub struct BalanceResponse {
pub total_balance: String,
pub sig_locked_balance: String,
pub available_balance: String,
pub ledger_index: MilestoneIndex,
}

Expand Down
10 changes: 5 additions & 5 deletions src/bin/inx-chronicle/api/explorer/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,22 +155,22 @@ async fn ledger_updates_by_milestone(
}

async fn balance(database: Extension<MongoDb>, Path(address): Path<String>) -> ApiResult<BalanceResponse> {
let ledger_index = database
let ledger_ms = database
.collection::<MilestoneCollection>()
.get_ledger_index()
.get_newest_milestone()
.await?
.ok_or(MissingError::NoResults)?;
let address = Address::from_str(&address).map_err(RequestError::from)?;
let res = database
.collection::<OutputCollection>()
.get_address_balance(address, ledger_index)
.get_address_balance(address, ledger_ms)
.await?
.ok_or(MissingError::NoResults)?;

Ok(BalanceResponse {
total_balance: res.total_balance,
sig_locked_balance: res.sig_locked_balance,
ledger_index,
available_balance: res.available_balance,
ledger_index: ledger_ms.milestone_index,
})
}

Expand Down
9 changes: 8 additions & 1 deletion src/bin/inx-chronicle/cli/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,14 @@ pub async fn fill_analytics<I: 'static + InputSource + Clone>(

let analytics = analytics_choices
.iter()
.map(|choice| Analytic::init(choice, &milestone.protocol_params, &ledger_state))
.map(|choice| {
Analytic::init(
choice,
&milestone.protocol_params,
&ledger_state,
milestone.at.milestone_timestamp,
)
})
.collect::<Vec<_>>();
state = Some(AnalyticsState {
analytics,
Expand Down
9 changes: 8 additions & 1 deletion src/bin/inx-chronicle/inx/influx/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,14 @@ impl InxWorker {

let analytics = analytics_choices
.iter()
.map(|choice| Analytic::init(choice, &milestone.protocol_params, &ledger_state))
.map(|choice| {
Analytic::init(
choice,
&milestone.protocol_params,
&ledger_state,
milestone.at.milestone_timestamp,
)
})
.collect::<Vec<_>>();
*state = Some(AnalyticsState {
analytics,
Expand Down
122 changes: 122 additions & 0 deletions src/bin/inx-chronicle/migrations/migrate_2.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright 2023 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use async_trait::async_trait;
use chronicle::{
db::{
mongodb::collections::{LedgerUpdateCollection, OutputCollection},
MongoDb, MongoDbCollection, MongoDbCollectionExt,
},
model::{
ledger::{LedgerOutput, LedgerSpent, RentStructureBytes},
metadata::OutputMetadata,
utxo::{Output, OutputId},
},
};
use futures::{prelude::stream::TryStreamExt, StreamExt};
use mongodb::bson::doc;
use serde::Deserialize;
use tokio::{task::JoinSet, try_join};

use super::Migration;

const INSERT_BATCH_SIZE: usize = 1000;

pub struct Migrate;

#[async_trait]
impl Migration for Migrate {
const ID: usize = 2;
const APP_VERSION: &'static str = "1.0.0-rc.3";
const DATE: time::Date = time::macros::date!(2024 - 01 - 12);

async fn migrate(db: &MongoDb) -> eyre::Result<()> {
db.collection::<LedgerUpdateCollection>()
.collection()
.drop(None)
.await?;

let outputs_stream = db
.collection::<OutputCollection>()
.find::<OutputDocument>(doc! {}, None)
.await?;
let mut batched_stream = outputs_stream.try_chunks(INSERT_BATCH_SIZE);

let mut tasks = JoinSet::new();

while let Some(batch) = batched_stream.next().await {
let batch = batch?;
while tasks.len() >= 100 {
if let Some(res) = tasks.join_next().await {
res??;
}
}
let db = db.clone();
tasks.spawn(async move {
let consumed = batch.iter().filter_map(Option::<LedgerSpent>::from).collect::<Vec<_>>();
let created = batch.into_iter().map(LedgerOutput::from).collect::<Vec<_>>();
try_join! {
async {
db.collection::<LedgerUpdateCollection>()
.insert_unspent_ledger_updates(&created)
.await
},
async {
db.collection::<OutputCollection>().update_spent_outputs(&consumed).await
},
async {
db.collection::<LedgerUpdateCollection>().insert_spent_ledger_updates(&consumed).await
}
}
.and(Ok(()))
});
}

while let Some(res) = tasks.join_next().await {
res??;
}

Ok(())
}
}

#[derive(Deserialize)]
pub struct OutputDocument {
#[serde(rename = "_id")]
output_id: OutputId,
output: Output,
metadata: OutputMetadata,
details: OutputDetails,
}

#[derive(Deserialize)]
struct OutputDetails {
rent_structure: RentStructureBytes,
}

impl From<OutputDocument> for LedgerOutput {
fn from(value: OutputDocument) -> Self {
Self {
output_id: value.output_id,
block_id: value.metadata.block_id,
booked: value.metadata.booked,
output: value.output,
rent_structure: value.details.rent_structure,
}
}
}

impl From<&OutputDocument> for Option<LedgerSpent> {
fn from(value: &OutputDocument) -> Self {
value.metadata.spent_metadata.map(|spent_metadata| LedgerSpent {
spent_metadata,
output: LedgerOutput {
output_id: value.output_id,
block_id: value.metadata.block_id,
booked: value.metadata.booked,
output: value.output.clone(),
rent_structure: value.details.rent_structure,
},
})
}
}
4 changes: 3 additions & 1 deletion src/bin/inx-chronicle/migrations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@ use eyre::bail;

pub mod migrate_0;
pub mod migrate_1;
pub mod migrate_2;

pub type LatestMigration = migrate_1::Migrate;
pub type LatestMigration = migrate_2::Migrate;

/// The list of migrations, in order.
const MIGRATIONS: &[&'static dyn DynMigration] = &[
// In order to add a new migration, change the `LatestMigration` type above and add an entry at the bottom of this
// list.
&migrate_0::Migrate,
&migrate_1::Migrate,
&migrate_2::Migrate,
];

fn build_migrations(migrations: &[&'static dyn DynMigration]) -> HashMap<Option<usize>, &'static dyn DynMigration> {
Expand Down
Loading
Loading