From 75c0f179dfcef0b16efef576599f78232b54927a Mon Sep 17 00:00:00 2001 From: canonbrother Date: Tue, 24 Dec 2024 20:23:54 +0800 Subject: [PATCH] Ocean: fixes (#3128) * refix prices db * filter_map as set * fmt_rs * note * filter_map to get sub_root * rm clone * clean * only store latest data in db * add PriceTickerKey * fmt_rs * fix oracle interval type * rm debug * take(1) to next * missing grab prefix - take_while * fmt_rs * rm unuse * fix oracle interval index * fix oracle interval index 2 * missing dash * fix missing script_agg * fix OraclePriceAggregatedIntervalResponse * fix OraclePriceAggregatedIntervalResponse sort * fix OraclePriceAggregatedIntervalResponse sort * fix OraclePriceAggregatedIntervalResponse sort * skip * decimal * update get_price api * update get_price api * update get_price api * rename * fmt --- lib/ain-ocean/src/api/address.rs | 13 ++- lib/ain-ocean/src/api/common.rs | 29 +++++- lib/ain-ocean/src/api/loan.rs | 9 +- lib/ain-ocean/src/api/pool_pair/mod.rs | 2 +- lib/ain-ocean/src/api/pool_pair/service.rs | 16 +-- lib/ain-ocean/src/api/prices.rs | 99 ++++++++++--------- lib/ain-ocean/src/api/stats/cache.rs | 5 +- lib/ain-ocean/src/error.rs | 6 ++ lib/ain-ocean/src/indexer/loan_token.rs | 37 +++---- lib/ain-ocean/src/indexer/mod.rs | 22 ++--- lib/ain-ocean/src/indexer/oracle.rs | 93 ++++++++++------- lib/ain-ocean/src/indexer/poolswap.rs | 52 +++------- lib/ain-ocean/src/lib.rs | 2 + .../model/oracle_price_aggregated_interval.rs | 43 ++++++-- lib/ain-ocean/src/model/price_ticker.rs | 4 +- lib/ain-ocean/src/storage/mod.rs | 11 ++- 16 files changed, 262 insertions(+), 181 deletions(-) diff --git a/lib/ain-ocean/src/api/address.rs b/lib/ain-ocean/src/api/address.rs index be6ee17749..2ab5a2f8ad 100644 --- a/lib/ain-ocean/src/api/address.rs +++ b/lib/ain-ocean/src/api/address.rs @@ -174,19 +174,18 @@ fn get_latest_aggregation( .script_aggregation .by_id .list(Some((hid, [0xffu8; 4])), SortOrder::Descending)? - .take(1) .take_while(|item| match item { Ok(((v, _), _)) => v == &hid, _ => true, }) + .next() + .transpose()? .map(|item| { - let (_, v) = item?; - let res = v.into(); - Ok(res) - }) - .collect::>>()?; + let (_, v) = item; + v.into() + }); - Ok(latest.first().cloned()) + Ok(latest) } #[ocean_endpoint] diff --git a/lib/ain-ocean/src/api/common.rs b/lib/ain-ocean/src/api/common.rs index 64d79f0d9f..1b70b257ff 100644 --- a/lib/ain-ocean/src/api/common.rs +++ b/lib/ain-ocean/src/api/common.rs @@ -11,9 +11,10 @@ use super::query::PaginationQuery; use crate::{ error::{ Error::ToArrayError, InvalidAmountSnafu, InvalidFixedIntervalPriceSnafu, - InvalidPoolPairSymbolSnafu, InvalidTokenCurrencySnafu, + InvalidPoolPairSymbolSnafu, InvalidPriceTickerSortKeySnafu, InvalidTokenCurrencySnafu, }, hex_encoder::as_sha256, + model::PriceTickerId, network::Network, Result, }; @@ -128,6 +129,32 @@ pub fn parse_query_height_txid(item: &str) -> Result<(u32, Txid)> { Ok((height, txid)) } +pub fn parse_price_ticker_sort(item: &str) -> Result { + let mut parts = item.split('-'); + let count_height_token = parts + .next() + .context(InvalidPriceTickerSortKeySnafu { item })?; + let encoded_count = &count_height_token[..8]; + let encoded_height = &count_height_token[8..16]; + let token = &count_height_token[16..]; + let token = token.to_string(); + + let count: [u8; 4] = hex::decode(encoded_count)? + .try_into() + .map_err(|_| ToArrayError)?; + + let height: [u8; 4] = hex::decode(encoded_height)? + .try_into() + .map_err(|_| ToArrayError)?; + + let currency = parts + .next() + .context(InvalidTokenCurrencySnafu { item })? + .to_string(); + + Ok((count, height, token, currency)) +} + #[must_use] pub fn format_number(v: Decimal) -> String { if v == dec!(0) { diff --git a/lib/ain-ocean/src/api/loan.rs b/lib/ain-ocean/src/api/loan.rs index fb67b4b63f..905c491eae 100644 --- a/lib/ain-ocean/src/api/loan.rs +++ b/lib/ain-ocean/src/api/loan.rs @@ -271,7 +271,14 @@ async fn list_loan_token( .services .oracle_price_active .by_id - .list(Some((token, currency, [0xffu8; 4])), SortOrder::Descending)? + .list( + Some((token.clone(), currency.clone(), [0xffu8; 4])), + SortOrder::Descending, + )? + .take_while(|item| match item { + Ok((k, _)) => k.0 == token && k.1 == currency, + _ => true, + }) .next() .map(|item| { let (_, v) = item?; diff --git a/lib/ain-ocean/src/api/pool_pair/mod.rs b/lib/ain-ocean/src/api/pool_pair/mod.rs index 045e035cf1..73a535b7c3 100644 --- a/lib/ain-ocean/src/api/pool_pair/mod.rs +++ b/lib/ain-ocean/src/api/pool_pair/mod.rs @@ -79,7 +79,7 @@ impl PoolSwapVerboseResponse { Self { id: format!("{}-{}", v.pool_id, v.txid), sort: format!( - "{}{}", + "{}-{}", hex::encode(v.block.height.to_be_bytes()), hex::encode(v.txno.to_be_bytes()), ), diff --git a/lib/ain-ocean/src/api/pool_pair/service.rs b/lib/ain-ocean/src/api/pool_pair/service.rs index c12d3780a4..d45b1ff93f 100644 --- a/lib/ain-ocean/src/api/pool_pair/service.rs +++ b/lib/ain-ocean/src/api/pool_pair/service.rs @@ -594,22 +594,22 @@ fn call_dftx(ctx: &Arc, txid: Txid) -> Result> { .transaction .vout_by_id .list(Some((txid, 0)), SortOrder::Ascending)? - .take(1) .take_while(|item| match item { Ok((_, vout)) => vout.txid == txid, _ => true, }) + .next() + .transpose()? .map(|item| { - let (_, v) = item?; - Ok(v) - }) - .collect::>>()?; + let (_, v) = item; + v + }); - if vout.is_empty() { + let Some(vout) = vout else { return Ok(None); - } + }; - let bytes = &vout[0].script.hex; + let bytes = &vout.script.hex; if bytes.len() > 6 && bytes[0] == 0x6a && bytes[1] <= 0x4e { let offset = 1 + match bytes[1] { 0x4c => 2, diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index d320e3b07d..e1dd0ebf28 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -1,4 +1,4 @@ -use std::{collections::HashSet, str::FromStr, sync::Arc}; +use std::{str::FromStr, sync::Arc}; use ain_dftx::{Currency, Token, Weightage, COIN}; use ain_macros::ocean_endpoint; @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize}; use serde_with::skip_serializing_none; use super::{ - common::parse_token_currency, + common::{parse_price_ticker_sort, parse_token_currency}, oracle::OraclePriceFeedResponse, query::PaginationQuery, response::{ApiPagedResponse, Response}, @@ -23,7 +23,7 @@ use crate::{ error::{ApiError, Error}, model::{ BlockContext, OracleIntervalSeconds, OraclePriceActive, - OraclePriceAggregatedIntervalAggregated, PriceTicker, + OraclePriceAggregatedIntervalAggregatedOracles, PriceTicker, }, storage::{RepositoryOps, SortOrder}, Result, @@ -119,28 +119,24 @@ async fn list_prices( Query(query): Query, Extension(ctx): Extension>, ) -> Result> { - let mut set: HashSet<(Token, Currency)> = HashSet::new(); + let next = query + .next + .map(|item| { + let id = parse_price_ticker_sort(&item)?; + Ok::<([u8; 4], [u8; 4], Token, Currency), Error>(id) + }) + .transpose()?; let prices = ctx .services .price_ticker .by_id - .list(None, SortOrder::Descending)? - .flat_map(|item| { + .list(next, SortOrder::Descending)? + .map(|item| { let ((_, _, token, currency), v) = item?; - let has_key = set.contains(&(token.clone(), currency.clone())); - if !has_key { - set.insert((token.clone(), currency.clone())); - Ok::, Error>(Some(PriceTickerResponse::from(( - (token, currency), - v, - )))) - } else { - Ok(None) - } + Ok(PriceTickerResponse::from(((token, currency), v))) }) - .flatten() - .collect::>(); + .collect::>>()?; Ok(ApiPagedResponse::of(prices, query.size, |price| { price.sort.to_string() @@ -154,18 +150,14 @@ async fn get_price( ) -> Result>> { let (token, currency) = parse_token_currency(&key)?; - let price_ticker = ctx - .services - .price_ticker - .by_id - .list( - Some(([0xffu8; 4], [0xffu8; 4], token.clone(), currency.clone())), - SortOrder::Descending, - )? - .next() - .transpose()?; + let price_repo = &ctx.services.price_ticker; + let sort_key = price_repo.by_key.get(&(token.clone(), currency.clone()))?; + let Some(sort_key) = sort_key else { + return Ok(Response::new(None)); + }; + let price_ticker = price_repo.by_id.get(&sort_key)?; - let Some((_, price_ticker)) = price_ticker else { + let Some(price_ticker) = price_ticker else { return Ok(Response::new(None)); }; @@ -217,7 +209,7 @@ async fn get_feed( token: token.clone(), currency: currency.clone(), aggregated: OraclePriceAggregatedAggregatedResponse { - amount: format!("{:.8}", v.aggregated.amount), + amount: format!("{:.8}", v.aggregated.amount / Decimal::from(COIN)), weightage: v.aggregated.weightage.to_i32().unwrap_or_default(), oracles: OraclePriceActiveNextOraclesResponse { active: v.aggregated.oracles.active.to_i32().unwrap_or_default(), @@ -334,8 +326,19 @@ pub struct OraclePriceAggregatedIntervalResponse { pub sort: String, // medianTime-height pub token: Token, pub currency: Currency, - pub aggregated: OraclePriceAggregatedIntervalAggregated, + pub aggregated: OraclePriceAggregatedIntervalAggregatedResponse, pub block: BlockContext, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct OraclePriceAggregatedIntervalAggregatedResponse { + #[serde(with = "rust_decimal::serde::str")] + pub amount: Decimal, + #[serde(with = "rust_decimal::serde::str")] + pub weightage: Decimal, + pub count: i32, + pub oracles: OraclePriceAggregatedIntervalAggregatedOracles, /** * Aggregated interval time range in seconds. * - Interval that aggregated in seconds @@ -362,23 +365,26 @@ async fn get_feed_with_interval( let (token, currency) = parse_token_currency(&key)?; let interval = interval.parse::()?; - let interval_type = match interval { + let interval = match interval { 900 => OracleIntervalSeconds::FifteenMinutes, 3600 => OracleIntervalSeconds::OneHour, 86400 => OracleIntervalSeconds::OneDay, _ => return Err(From::from("Invalid oracle interval")), }; + let interval = interval as u32; let next = query .next .map(|q| { - let height = q.parse::()?.to_be_bytes(); + let height = hex::decode(q)? + .try_into() + .map_err(|_| Error::ToArrayError)?; Ok::<[u8; 4], Error>(height) }) .transpose()? .unwrap_or([0xffu8; 4]); - let id = (token.clone(), currency.clone(), interval_type.clone(), next); + let id = (token.clone(), currency.clone(), interval.to_string(), next); let items = ctx .services @@ -388,40 +394,37 @@ async fn get_feed_with_interval( .take(query.size) .take_while(|item| match item { Ok(((t, c, i, _), _)) => { - t == &token.clone() && c == ¤cy.clone() && i == &interval_type.clone() + t == &token.clone() && c == ¤cy.clone() && i == &interval.to_string() } _ => true, }) .flatten() .collect::>(); + let interval = interval as i64; let mut prices = Vec::new(); for (id, item) in items { let start = item.block.median_time - (item.block.median_time % interval); let height = u32::from_be_bytes(id.3); let price = OraclePriceAggregatedIntervalResponse { - id: format!("{}-{}-{:?}-{}", id.0, id.1, id.2, height), - key: format!("{}-{}-{:?}", id.0, id.1, id.2), - sort: format!( - "{}{}", - hex::encode(item.block.median_time.to_be_bytes()), - hex::encode(item.block.height.to_be_bytes()), - ), + id: format!("{}-{}-{}-{}", id.0, id.1, id.2, height), + key: format!("{}-{}-{}", id.0, id.1, id.2), + sort: hex::encode(item.block.height.to_be_bytes()).to_string(), token: token.clone(), currency: currency.clone(), - aggregated: OraclePriceAggregatedIntervalAggregated { + aggregated: OraclePriceAggregatedIntervalAggregatedResponse { amount: item.aggregated.amount, weightage: item.aggregated.weightage, oracles: item.aggregated.oracles, count: item.aggregated.count, + time: OraclePriceAggregatedIntervalTime { + interval, + start, + end: start + interval, + }, }, block: item.block, - time: OraclePriceAggregatedIntervalTime { - interval, - start, - end: start + interval, - }, }; prices.push(price); } diff --git a/lib/ain-ocean/src/api/stats/cache.rs b/lib/ain-ocean/src/api/stats/cache.rs index cdc60a00b8..9e5d853bbb 100644 --- a/lib/ain-ocean/src/api/stats/cache.rs +++ b/lib/ain-ocean/src/api/stats/cache.rs @@ -27,7 +27,7 @@ use crate::{ stats::get_block_reward_distribution, AppContext, }, - error::{DecimalConversionSnafu, Error, OtherSnafu}, + error::{DecimalConversionSnafu, OtherSnafu}, model::MasternodeStatsData, storage::{RepositoryOps, SortOrder}, Result, @@ -111,7 +111,8 @@ pub async fn get_count(ctx: &Arc) -> Result { .by_id .list(None, SortOrder::Descending)? .filter_map(|item| { - item.ok().map(|((_, _, token, currency), _)| (token, currency)) + item.ok() + .map(|((_, _, token, currency), _)| (token, currency)) }) .collect::>(); diff --git a/lib/ain-ocean/src/error.rs b/lib/ain-ocean/src/error.rs index 2e32aca456..b4d5bda2e9 100644 --- a/lib/ain-ocean/src/error.rs +++ b/lib/ain-ocean/src/error.rs @@ -212,6 +212,12 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + #[snafu(display("Invalid price ticker sort key: {}", item))] + InvalidPriceTickerSortKey { + item: String, + #[snafu(implicit)] + location: Location, + }, #[snafu(display("Invalid amount format: {}", item))] InvalidAmount { item: String, diff --git a/lib/ain-ocean/src/indexer/loan_token.rs b/lib/ain-ocean/src/indexer/loan_token.rs index d05d2ecd9a..86ead3b1e9 100644 --- a/lib/ain-ocean/src/indexer/loan_token.rs +++ b/lib/ain-ocean/src/indexer/loan_token.rs @@ -6,7 +6,6 @@ use rust_decimal::{prelude::Zero, Decimal}; use rust_decimal_macros::dec; use crate::{ - error::Error, indexer::{Context, Index, Result}, model::{BlockContext, OraclePriceActive, OraclePriceActiveNext, OraclePriceAggregated}, network::Network, @@ -87,18 +86,15 @@ pub fn index_active_price(services: &Arc, block: &BlockContext) -> Res _ => 120, }; if block.height % block_interval == 0 { - let mut set: HashSet<(Token, Currency)> = HashSet::new(); let pairs = services .price_ticker .by_id .list(None, SortOrder::Descending)? - .flat_map(|item| { - let ((_, _, token, currency), _) = item?; - set.insert((token, currency)); - Ok::, Error>(set.clone()) + .filter_map(|item| { + item.ok() + .map(|((_, _, token, currency), _)| (token, currency)) }) - .next() - .unwrap_or(set); + .collect::>(); for (token, currency) in pairs { perform_active_price_tick(services, (token, currency), block)?; @@ -143,26 +139,23 @@ pub fn invalidate_active_price(services: &Arc, block: &BlockContext) - _ => 120, }; if block.height % block_interval == 0 { - let mut set: HashSet<(Token, Currency)> = HashSet::new(); let pairs = services .price_ticker .by_id .list(None, SortOrder::Descending)? - .flat_map(|item| { - let ((_, _, token, currency), _) = item?; - set.insert((token, currency)); - Ok::, Error>(set.clone()) + .filter_map(|item| { + item.ok() + .map(|((_, _, token, currency), _)| (token, currency)) }) - .next() - .unwrap_or(set); + .collect::>(); // convert to vector to reverse the hashset is required - let mut vec = Vec::new(); + let mut rev_pairs = Vec::new(); for pair in pairs { - vec.insert(0, pair); + rev_pairs.insert(0, pair); } - for (token, currency) in vec { + for (token, currency) in rev_pairs { services.oracle_price_active.by_id.delete(&( token, currency, @@ -190,6 +183,10 @@ pub fn perform_active_price_tick( .oracle_price_aggregated .by_id .list(Some(id.clone()), SortOrder::Descending)? + .take_while(|item| match item { + Ok((k, _)) => k.0 == id.0 && k.1 == id.1, + _ => true, + }) .next() .transpose()?; @@ -202,6 +199,10 @@ pub fn perform_active_price_tick( let prev = repo .by_id .list(Some(id.clone()), SortOrder::Descending)? + .take_while(|item| match item { + Ok((k, _)) => k.0 == id.0 && k.1 == id.1, + _ => true, + }) .next() .transpose()?; diff --git a/lib/ain-ocean/src/indexer/mod.rs b/lib/ain-ocean/src/indexer/mod.rs index a572323dcf..d72250fb57 100644 --- a/lib/ain-ocean/src/indexer/mod.rs +++ b/lib/ain-ocean/src/indexer/mod.rs @@ -292,7 +292,7 @@ fn index_script(services: &Arc, ctx: &Context, txs: &[Transaction]) -> let Some(vout) = find_tx_vout(services, &vin, txs)? else { if is_skipped_tx(&vin.txid) { - return Ok(()); + continue; }; return Err(Error::NotFoundIndex { @@ -312,7 +312,7 @@ fn index_script(services: &Arc, ctx: &Context, txs: &[Transaction]) -> index_script_unspent_vout(services, vout, ctx)?; if vout.script_pub_key.hex.starts_with(&[0x6a]) { - return Ok(()); + continue; } index_script_activity_vout(services, vout, ctx)?; @@ -322,23 +322,23 @@ fn index_script(services: &Arc, ctx: &Context, txs: &[Transaction]) -> } // index_script_aggregation - for (_, mut aggregation) in record.clone() { + for (_, mut aggregation) in record { let repo = &services.script_aggregation; let latest = repo .by_id .list(Some((aggregation.hid, [0xffu8; 4])), SortOrder::Descending)? - .take(1) .take_while(|item| match item { Ok(((hid, _), _)) => &aggregation.hid == hid, _ => true, }) + .next() + .transpose()? .map(|item| { - let (_, v) = item?; - Ok(v) - }) - .collect::>>()?; + let (_, v) = item; + v + }); - if let Some(latest) = latest.first().cloned() { + if let Some(latest) = latest { aggregation.statistic.tx_in_count += latest.statistic.tx_in_count; aggregation.statistic.tx_out_count += latest.statistic.tx_out_count; @@ -354,8 +354,6 @@ fn index_script(services: &Arc, ctx: &Context, txs: &[Transaction]) -> &(aggregation.hid, ctx.block.height.to_be_bytes()), &aggregation, )?; - - record.insert(aggregation.hid, aggregation); } log_elapsed(start, format!("Indexed script {:x}", ctx.tx.txid)); @@ -383,7 +381,7 @@ fn invalidate_script(services: &Arc, ctx: &Context, txs: &[Transaction let Some(vout) = find_tx_vout(services, &vin, txs)? else { if is_skipped_tx(&vin.txid) { - return Ok(()); + continue; }; return Err(Error::NotFoundIndex { diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index c9fb315fd3..4874095dce 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -27,10 +27,10 @@ use crate::{ Services, }; -pub const AGGREGATED_INTERVALS: [OracleIntervalSeconds; 3] = [ - OracleIntervalSeconds::FifteenMinutes, - OracleIntervalSeconds::OneDay, - OracleIntervalSeconds::OneHour, +pub const AGGREGATED_INTERVALS: [u32; 3] = [ + OracleIntervalSeconds::FifteenMinutes as u32, + OracleIntervalSeconds::OneDay as u32, + OracleIntervalSeconds::OneHour as u32, ]; impl Index for AppointOracle { @@ -270,16 +270,13 @@ fn map_price_aggregated( pair: &(Token, Currency), ) -> Result> { let (token, currency) = pair; - let oracle_repo = &services.oracle_token_currency; + let max_txid = Txid::from_byte_array([0xffu8; 32]); - let oracles = oracle_repo + let oracles = services + .oracle_token_currency .by_id .list( - Some(( - token.clone(), - currency.clone(), - Txid::from_byte_array([0xffu8; 32]), - )), + Some((token.clone(), currency.clone(), max_txid)), SortOrder::Descending, )? .take_while(|item| matches!(item, Ok((k, _)) if &k.0 == token && &k.1 == currency)) @@ -290,7 +287,6 @@ fn map_price_aggregated( let mut aggregated_count = Decimal::zero(); let mut aggregated_weightage = Decimal::zero(); - let base_id = Txid::from_byte_array([0xffu8; 32]); let oracles_len = oracles.len(); for (id, oracle) in oracles { if oracle.weightage == 0 { @@ -302,9 +298,13 @@ fn map_price_aggregated( .oracle_price_feed .by_id .list( - Some((id.0, id.1, id.2, [0xffu8; 4], base_id)), + Some((id.0.clone(), id.1.clone(), id.2, [0xffu8; 4], max_txid)), SortOrder::Descending, )? + .take_while(|item| match item { + Ok((k, _)) => k.0 == id.0 && k.1 == id.1 && k.2 == id.2, + _ => true, + }) .next() .transpose()?; @@ -359,11 +359,8 @@ fn index_set_oracle_data( context: &Context, pairs: &HashSet<(Token, Currency)>, ) -> Result<()> { - let oracle_repo = &services.oracle_price_aggregated; - for pair in pairs { let price_aggregated = map_price_aggregated(services, context, pair)?; - let Some(price_aggregated) = price_aggregated else { continue; }; @@ -377,20 +374,30 @@ fn index_set_oracle_data( price_aggregated.block.median_time.to_be_bytes(), price_aggregated.block.height.to_be_bytes(), ); - oracle_repo.by_id.put(&id, &price_aggregated)?; + services + .oracle_price_aggregated + .by_id + .put(&id, &price_aggregated)?; - let id = ( + let price_repo = &services.price_ticker; + let sort_key = price_repo.by_key.get(&(token.clone(), currency.clone()))?; + if let Some(sort_key) = sort_key { + price_repo.by_id.delete(&sort_key)?; + } + + let new_sort_key = ( price_aggregated.aggregated.oracles.total.to_be_bytes(), price_aggregated.block.height.to_be_bytes(), - token, - currency, + token.clone(), + currency.clone(), ); - services.price_ticker.by_id.put( - &id, + price_repo.by_id.put( + &new_sort_key, &PriceTicker { price: price_aggregated, }, )?; + price_repo.by_key.put(&(token, currency), &new_sort_key)?; } Ok(()) } @@ -471,7 +478,7 @@ impl Index for SetOracleData { token, currency, &aggregated, - &interval, + interval, )?; } @@ -514,9 +521,14 @@ fn start_new_bucket( token: Token, currency: Currency, aggregated: &OraclePriceAggregated, - interval: OracleIntervalSeconds, + interval: u32, ) -> Result<()> { - let id = (token, currency, interval, block.height.to_be_bytes()); + let id = ( + token, + currency, + interval.to_string(), + block.height.to_be_bytes(), + ); services.oracle_price_aggregated_interval.by_id.put( &id, &OraclePriceAggregatedInterval { @@ -542,7 +554,7 @@ pub fn index_interval_mapper( token: Token, currency: Currency, aggregated: &OraclePriceAggregated, - interval: OracleIntervalSeconds, + interval: u32, ) -> Result<()> { let repo = &services.oracle_price_aggregated_interval; let previous = repo @@ -551,30 +563,31 @@ pub fn index_interval_mapper( Some(( token.clone(), currency.clone(), - interval.clone(), + interval.to_string(), [0xffu8; 4], )), SortOrder::Descending, )? .take_while(|item| match item { Ok(((t, c, i, _), _)) => { - t == &token.clone() && c == ¤cy.clone() && i == &interval.clone() + t == &token.clone() && c == ¤cy.clone() && i == &interval.to_string() } _ => true, }) .next() .transpose()?; - let Some(previous) = previous else { + if previous.is_none() { return start_new_bucket(services, block, token, currency, aggregated, interval); }; - if block.median_time - aggregated.block.median_time > interval.clone() as i64 { - return start_new_bucket(services, block, token, currency, aggregated, interval); + if let Some(previous) = previous { + if block.median_time - previous.1.block.median_time > interval as i64 { + return start_new_bucket(services, block, token, currency, aggregated, interval); + }; + forward_aggregate(services, previous, aggregated)?; }; - forward_aggregate(services, previous, aggregated)?; - Ok(()) } @@ -584,7 +597,7 @@ pub fn invalidate_oracle_interval( token: &str, currency: &str, aggregated: &OraclePriceAggregated, - interval: &OracleIntervalSeconds, + interval: u32, ) -> Result<()> { let repo = &services.oracle_price_aggregated_interval; let previous = repo @@ -593,11 +606,15 @@ pub fn invalidate_oracle_interval( Some(( token.to_string(), currency.to_string(), - interval.clone(), + interval.to_string(), [0xffu8; 4], )), SortOrder::Descending, )? + .take_while(|item| match item { + Ok((k, _)) => k.0 == token && k.1 == currency && k.2 == interval.to_string(), + _ => true, + }) .next() .transpose()?; @@ -605,7 +622,7 @@ pub fn invalidate_oracle_interval( return Err(Error::NotFoundIndex { action: IndexAction::Invalidate, r#type: "Invalidate oracle price aggregated interval".to_string(), - id: format!("{}-{}-{:?}", token, currency, interval), + id: format!("{}-{}-{:?}", token, currency, interval.to_string()), }); }; @@ -743,6 +760,10 @@ fn get_previous_oracle( .oracle_history .by_id .list(Some((oracle_id, u32::MAX)), SortOrder::Descending)? + .take_while(|item| match item { + Ok((k, _)) => k.0 == oracle_id, + _ => true, + }) .next() .transpose()?; diff --git a/lib/ain-ocean/src/indexer/poolswap.rs b/lib/ain-ocean/src/indexer/poolswap.rs index e9ed559528..3ae7514185 100644 --- a/lib/ain-ocean/src/indexer/poolswap.rs +++ b/lib/ain-ocean/src/indexer/poolswap.rs @@ -2,7 +2,6 @@ use std::{str::FromStr, sync::Arc}; use ain_cpp_imports::PoolPairCreationHeight; use ain_dftx::{pool::*, COIN}; -use bitcoin::Txid; use log::trace; use parking_lot::RwLock; use rust_decimal::Decimal; @@ -37,33 +36,24 @@ fn index_swap_aggregated( pool_id: u32, from_token_id: u64, from_amount: i64, - txid: Txid, ) -> Result<()> { for interval in AGGREGATED_INTERVALS { let repo = &services.pool_swap_aggregated; - let prevs = repo + let prev = repo .by_key .list(Some((pool_id, interval, i64::MAX)), SortOrder::Descending)? - .take(1) .take_while(|item| match item { Ok((k, _)) => k.0 == pool_id && k.1 == interval, _ => true, }) - .flatten() - .collect::>(); + .next() + .transpose()?; - if prevs.is_empty() { - log::error!( - "index swap {txid}: Unable to find {pool_id}-{interval} for Aggregate Indexing" - ); - continue; - } - - let Some((_, id)) = prevs.first() else { + let Some((_, id)) = prev else { continue; }; - let aggregated = repo.by_id.get(id)?; + let aggregated = repo.by_id.get(&id)?; let Some(mut aggregated) = aggregated else { continue; @@ -86,7 +76,7 @@ fn index_swap_aggregated( .amounts .insert(from_token_id, format!("{aggregated_amount:.8}")); - repo.by_id.put(id, &aggregated)?; + repo.by_id.put(&id, &aggregated)?; } Ok(()) @@ -97,33 +87,24 @@ fn invalidate_swap_aggregated( pool_id: u32, from_token_id: u64, from_amount: i64, - txid: Txid, ) -> Result<()> { for interval in AGGREGATED_INTERVALS.into_iter().rev() { let repo = &services.pool_swap_aggregated; - let prevs = repo + let prev = repo .by_key .list(Some((pool_id, interval, i64::MAX)), SortOrder::Descending)? - .take(1) .take_while(|item| match item { Ok((k, _)) => k.0 == pool_id && k.1 == interval, _ => true, }) - .flatten() - .collect::>(); + .next() + .transpose()?; - if prevs.is_empty() { - log::error!( - "invalidate swap {txid}: Unable to find {pool_id}-{interval} for Aggregate Indexing" - ); - continue; - } - - let Some((_, id)) = prevs.first() else { + let Some((_, id)) = prev else { continue; }; - let aggregated = repo.by_id.get(id)?; + let aggregated = repo.by_id.get(&id)?; let Some(mut aggregated) = aggregated else { continue; @@ -146,7 +127,7 @@ fn invalidate_swap_aggregated( .amounts .insert(from_token_id, format!("{aggregated_amount:.8}")); - repo.by_id.put(id, &aggregated)?; + repo.by_id.put(&id, &aggregated)?; } Ok(()) @@ -300,7 +281,7 @@ impl Index for PoolSwap { .by_id .put(&(pool_id, ctx.block.height, idx), &swap)?; - index_swap_aggregated(services, pool_id, from_token_id, from_amount, txid)?; + index_swap_aggregated(services, pool_id, from_token_id, from_amount)?; Ok(()) } @@ -323,7 +304,7 @@ impl Index for PoolSwap { .delete(&(pool_id, ctx.block.height, ctx.tx_idx))?; tx_result::invalidate(services, &txid)?; - invalidate_swap_aggregated(services, pool_id, from_token_id, from_amount, txid)?; + invalidate_swap_aggregated(services, pool_id, from_token_id, from_amount)?; Ok(()) } @@ -390,7 +371,7 @@ impl Index for CompositeSwap { .by_id .put(&(pool_id, ctx.block.height, ctx.tx_idx), &swap)?; - index_swap_aggregated(services, pool_id, from_token_id, from_amount, txid)?; + index_swap_aggregated(services, pool_id, from_token_id, from_amount)?; } Ok(()) @@ -400,7 +381,6 @@ impl Index for CompositeSwap { trace!("[ComposoteSwap] Invalidating..."); let from_token_id = self.pool_swap.from_token_id.0; let from_amount = self.pool_swap.from_amount; - let txid = ctx.tx.txid; for pool in self.pools.iter().rev() { let pool_id = pool.id.0 as u32; services @@ -408,7 +388,7 @@ impl Index for CompositeSwap { .by_id .delete(&(pool_id, ctx.block.height, ctx.tx_idx))?; - invalidate_swap_aggregated(services, pool_id, from_token_id, from_amount, txid)?; + invalidate_swap_aggregated(services, pool_id, from_token_id, from_amount)?; } tx_result::invalidate(services, &ctx.tx.txid) } diff --git a/lib/ain-ocean/src/lib.rs b/lib/ain-ocean/src/lib.rs index bde84b9312..c72b6a9f67 100644 --- a/lib/ain-ocean/src/lib.rs +++ b/lib/ain-ocean/src/lib.rs @@ -93,6 +93,7 @@ pub struct OracleHistoryService { pub struct PriceTickerService { by_id: PriceTicker, + by_key: PriceTickerKey, } pub struct ScriptActivityService { @@ -195,6 +196,7 @@ impl Services { }, price_ticker: PriceTickerService { by_id: PriceTicker::new(Arc::clone(&store)), + by_key: PriceTickerKey::new(Arc::clone(&store)), }, script_activity: ScriptActivityService { by_id: ScriptActivity::new(Arc::clone(&store)), diff --git a/lib/ain-ocean/src/model/oracle_price_aggregated_interval.rs b/lib/ain-ocean/src/model/oracle_price_aggregated_interval.rs index 443e4e7e5a..aaa7f70be2 100644 --- a/lib/ain-ocean/src/model/oracle_price_aggregated_interval.rs +++ b/lib/ain-ocean/src/model/oracle_price_aggregated_interval.rs @@ -3,17 +3,42 @@ use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; use super::BlockContext; -pub type OraclePriceAggregatedIntervalId = (Token, Currency, OracleIntervalSeconds, [u8; 4]); //token-currency-interval-height +pub type OraclePriceAggregatedIntervalId = (Token, Currency, String, [u8; 4]); //token-currency-interval-height -pub const FIFTEEN_MINUTES: isize = 15 * 60; -pub const ONE_HOUR: isize = 60 * 60; -pub const ONE_DAY: isize = 24 * 60 * 60; - -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq)] pub enum OracleIntervalSeconds { - FifteenMinutes = FIFTEEN_MINUTES, - OneHour = ONE_HOUR, - OneDay = ONE_DAY, + FifteenMinutes = 900, + OneHour = 3600, + OneDay = 86400, +} + +impl Serialize for OracleIntervalSeconds { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + match self { + Self::FifteenMinutes => serializer.serialize_str("900"), + Self::OneHour => serializer.serialize_str("3600"), + Self::OneDay => serializer.serialize_str("86400"), + } + } +} + +impl<'a> Deserialize<'a> for OracleIntervalSeconds { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'a>, + { + let s = String::deserialize(deserializer).unwrap(); + if s == *"900" { + Ok(Self::FifteenMinutes) + } else if s == *"3600" { + Ok(Self::OneHour) + } else { + Ok(Self::OneDay) + } + } } #[derive(Serialize, Deserialize, Debug, Clone)] diff --git a/lib/ain-ocean/src/model/price_ticker.rs b/lib/ain-ocean/src/model/price_ticker.rs index 158fb96157..e2d7ea8c46 100644 --- a/lib/ain-ocean/src/model/price_ticker.rs +++ b/lib/ain-ocean/src/model/price_ticker.rs @@ -1,8 +1,10 @@ +use ain_dftx::{Currency, Token}; use serde::{Deserialize, Serialize}; use super::oracle_price_aggregated::OraclePriceAggregated; -pub type PriceTickerId = ([u8; 4], [u8; 4], String, String); // total-height-token-currency +pub type PriceTickerId = ([u8; 4], [u8; 4], Token, Currency); // total-height-token-currency +pub type PriceTickerKey = (Token, Currency); #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] diff --git a/lib/ain-ocean/src/storage/mod.rs b/lib/ain-ocean/src/storage/mod.rs index 1f373022c4..2159eda832 100644 --- a/lib/ain-ocean/src/storage/mod.rs +++ b/lib/ain-ocean/src/storage/mod.rs @@ -372,6 +372,14 @@ define_table! { } } +define_table! { + #[derive(Debug)] + pub struct PriceTickerKey { + key_type = model::PriceTickerKey, + value_type = model::PriceTickerId, + } +} + define_table! { #[derive(Debug)] pub struct RawBlock { @@ -508,7 +516,7 @@ define_table! { } } -pub const COLUMN_NAMES: [&str; 27] = [ +pub const COLUMN_NAMES: [&str; 28] = [ Block::NAME, BlockByHeight::NAME, MasternodeStats::NAME, @@ -525,6 +533,7 @@ pub const COLUMN_NAMES: [&str; 27] = [ PoolSwapAggregatedKey::NAME, PoolSwap::NAME, PriceTicker::NAME, + PriceTickerKey::NAME, RawBlock::NAME, ScriptActivity::NAME, ScriptAggregation::NAME,