From ccae50636d54e85161b9405bee103cb4ad72099c Mon Sep 17 00:00:00 2001 From: canonbrother Date: Thu, 28 Nov 2024 22:55:54 +0800 Subject: [PATCH 01/31] refix prices db --- lib/ain-ocean/src/api/common.rs | 27 +++++++++++++++++++++++- lib/ain-ocean/src/api/prices.rs | 32 ++++++++++++++--------------- lib/ain-ocean/src/error.rs | 6 ++++++ lib/ain-ocean/src/indexer/oracle.rs | 24 ++++++++++++++-------- 4 files changed, 63 insertions(+), 26 deletions(-) diff --git a/lib/ain-ocean/src/api/common.rs b/lib/ain-ocean/src/api/common.rs index 64d79f0d9f..5482bff46d 100644 --- a/lib/ain-ocean/src/api/common.rs +++ b/lib/ain-ocean/src/api/common.rs @@ -11,9 +11,10 @@ use super::query::PaginationQuery; use crate::{ error::{ Error::ToArrayError, InvalidAmountSnafu, InvalidFixedIntervalPriceSnafu, - InvalidPoolPairSymbolSnafu, InvalidTokenCurrencySnafu, + InvalidPriceTickerSortKeySnafu, InvalidPoolPairSymbolSnafu, InvalidTokenCurrencySnafu, }, hex_encoder::as_sha256, + model::PriceTickerId, network::Network, Result, }; @@ -128,6 +129,30 @@ pub fn parse_query_height_txid(item: &str) -> Result<(u32, Txid)> { Ok((height, txid)) } +pub fn parse_price_ticker_sort(item: &str) -> Result { + let mut parts = item.split('-'); + let count_height_token = parts.next().context(InvalidPriceTickerSortKeySnafu { item })?; + let encoded_count = &count_height_token[..8]; + let encoded_height = &count_height_token[8..16]; + let token = &count_height_token[16..]; + let token = token.to_string(); + + let count: [u8; 4] = hex::decode(encoded_count)? + .try_into() + .map_err(|_| ToArrayError)?; + + let height: [u8; 4] = hex::decode(encoded_height)? + .try_into() + .map_err(|_| ToArrayError)?; + + let currency = parts + .next() + .context(InvalidTokenCurrencySnafu { item })? + .to_string(); + + Ok((count, height, token, currency)) +} + #[must_use] pub fn format_number(v: Decimal) -> String { if v == dec!(0) { diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index d320e3b07d..df3e79e47c 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -1,4 +1,4 @@ -use std::{collections::HashSet, str::FromStr, sync::Arc}; +use std::{str::FromStr, sync::Arc}; use ain_dftx::{Currency, Token, Weightage, COIN}; use ain_macros::ocean_endpoint; @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize}; use serde_with::skip_serializing_none; use super::{ - common::parse_token_currency, + common::{parse_token_currency, parse_price_ticker_sort}, oracle::OraclePriceFeedResponse, query::PaginationQuery, response::{ApiPagedResponse, Response}, @@ -119,28 +119,26 @@ async fn list_prices( Query(query): Query, Extension(ctx): Extension>, ) -> Result> { - let mut set: HashSet<(Token, Currency)> = HashSet::new(); + let next = query + .next + .map(|item| { + let id = parse_price_ticker_sort(&item)?; + Ok::<([u8; 4], [u8; 4], Token, Currency), Error>(id) + }) + .transpose()?; let prices = ctx .services .price_ticker .by_id - .list(None, SortOrder::Descending)? - .flat_map(|item| { + .list(next.clone(), SortOrder::Descending)? + .take(query.size + usize::from(next.clone().is_some())) + .skip(usize::from(next.is_some())) + .map(|item| { let ((_, _, token, currency), v) = item?; - let has_key = set.contains(&(token.clone(), currency.clone())); - if !has_key { - set.insert((token.clone(), currency.clone())); - Ok::, Error>(Some(PriceTickerResponse::from(( - (token, currency), - v, - )))) - } else { - Ok(None) - } + Ok(PriceTickerResponse::from(((token, currency), v))) }) - .flatten() - .collect::>(); + .collect::>>()?; Ok(ApiPagedResponse::of(prices, query.size, |price| { price.sort.to_string() diff --git a/lib/ain-ocean/src/error.rs b/lib/ain-ocean/src/error.rs index 2e32aca456..b4d5bda2e9 100644 --- a/lib/ain-ocean/src/error.rs +++ b/lib/ain-ocean/src/error.rs @@ -212,6 +212,12 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + #[snafu(display("Invalid price ticker sort key: {}", item))] + InvalidPriceTickerSortKey { + item: String, + #[snafu(implicit)] + location: Location, + }, #[snafu(display("Invalid amount format: {}", item))] InvalidAmount { item: String, diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index c9fb315fd3..589313c842 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -359,11 +359,8 @@ fn index_set_oracle_data( context: &Context, pairs: &HashSet<(Token, Currency)>, ) -> Result<()> { - let oracle_repo = &services.oracle_price_aggregated; - for pair in pairs { let price_aggregated = map_price_aggregated(services, context, pair)?; - let Some(price_aggregated) = price_aggregated else { continue; }; @@ -377,15 +374,26 @@ fn index_set_oracle_data( price_aggregated.block.median_time.to_be_bytes(), price_aggregated.block.height.to_be_bytes(), ); - oracle_repo.by_id.put(&id, &price_aggregated)?; - + services.oracle_price_aggregated.by_id.put(&id, &price_aggregated)?; + let price_repo = &services.price_ticker; let id = ( price_aggregated.aggregated.oracles.total.to_be_bytes(), price_aggregated.block.height.to_be_bytes(), - token, - currency, + token.clone(), + currency.clone(), ); - services.price_ticker.by_id.put( + let prev_price = price_repo + .by_id + .list(Some(id.clone()), SortOrder::Descending)? + .find(|item| match item { + Ok(((_, _, t, c), _)) => t == &token && c == ¤cy, + _ => true + }) + .transpose()?; + if let Some((k, _)) = prev_price { + price_repo.by_id.delete(&k)? + } + price_repo.by_id.put( &id, &PriceTicker { price: price_aggregated, From 583ca3f168b4a2e2ab9381281e781e6322043be6 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Thu, 28 Nov 2024 22:57:02 +0800 Subject: [PATCH 02/31] filter_map as set --- lib/ain-ocean/src/indexer/loan_token.rs | 26 +++++++++---------------- 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/lib/ain-ocean/src/indexer/loan_token.rs b/lib/ain-ocean/src/indexer/loan_token.rs index d05d2ecd9a..1182d4c36c 100644 --- a/lib/ain-ocean/src/indexer/loan_token.rs +++ b/lib/ain-ocean/src/indexer/loan_token.rs @@ -87,18 +87,14 @@ pub fn index_active_price(services: &Arc, block: &BlockContext) -> Res _ => 120, }; if block.height % block_interval == 0 { - let mut set: HashSet<(Token, Currency)> = HashSet::new(); let pairs = services .price_ticker .by_id .list(None, SortOrder::Descending)? - .flat_map(|item| { - let ((_, _, token, currency), _) = item?; - set.insert((token, currency)); - Ok::, Error>(set.clone()) + .filter_map(|item| { + item.ok().map(|((_, _, token, currency), _)| (token, currency)) }) - .next() - .unwrap_or(set); + .collect::>(); for (token, currency) in pairs { perform_active_price_tick(services, (token, currency), block)?; @@ -143,26 +139,22 @@ pub fn invalidate_active_price(services: &Arc, block: &BlockContext) - _ => 120, }; if block.height % block_interval == 0 { - let mut set: HashSet<(Token, Currency)> = HashSet::new(); let pairs = services .price_ticker .by_id .list(None, SortOrder::Descending)? - .flat_map(|item| { - let ((_, _, token, currency), _) = item?; - set.insert((token, currency)); - Ok::, Error>(set.clone()) + .filter_map(|item| { + item.ok().map(|((_, _, token, currency), _)| (token, currency)) }) - .next() - .unwrap_or(set); + .collect::>(); // convert to vector to reverse the hashset is required - let mut vec = Vec::new(); + let mut rev_pairs = Vec::new(); for pair in pairs { - vec.insert(0, pair); + rev_pairs.insert(0, pair); } - for (token, currency) in vec { + for (token, currency) in rev_pairs { services.oracle_price_active.by_id.delete(&( token, currency, From 186dbcf15a9229fdabbdf1a737942b06def25f42 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Thu, 28 Nov 2024 23:07:04 +0800 Subject: [PATCH 03/31] fmt_rs --- lib/ain-ocean/src/api/common.rs | 6 ++++-- lib/ain-ocean/src/api/prices.rs | 2 +- lib/ain-ocean/src/api/stats/cache.rs | 5 +++-- lib/ain-ocean/src/indexer/loan_token.rs | 7 ++++--- lib/ain-ocean/src/indexer/oracle.rs | 7 +++++-- 5 files changed, 17 insertions(+), 10 deletions(-) diff --git a/lib/ain-ocean/src/api/common.rs b/lib/ain-ocean/src/api/common.rs index 5482bff46d..1b70b257ff 100644 --- a/lib/ain-ocean/src/api/common.rs +++ b/lib/ain-ocean/src/api/common.rs @@ -11,7 +11,7 @@ use super::query::PaginationQuery; use crate::{ error::{ Error::ToArrayError, InvalidAmountSnafu, InvalidFixedIntervalPriceSnafu, - InvalidPriceTickerSortKeySnafu, InvalidPoolPairSymbolSnafu, InvalidTokenCurrencySnafu, + InvalidPoolPairSymbolSnafu, InvalidPriceTickerSortKeySnafu, InvalidTokenCurrencySnafu, }, hex_encoder::as_sha256, model::PriceTickerId, @@ -131,7 +131,9 @@ pub fn parse_query_height_txid(item: &str) -> Result<(u32, Txid)> { pub fn parse_price_ticker_sort(item: &str) -> Result { let mut parts = item.split('-'); - let count_height_token = parts.next().context(InvalidPriceTickerSortKeySnafu { item })?; + let count_height_token = parts + .next() + .context(InvalidPriceTickerSortKeySnafu { item })?; let encoded_count = &count_height_token[..8]; let encoded_height = &count_height_token[8..16]; let token = &count_height_token[16..]; diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index df3e79e47c..2986bc042f 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize}; use serde_with::skip_serializing_none; use super::{ - common::{parse_token_currency, parse_price_ticker_sort}, + common::{parse_price_ticker_sort, parse_token_currency}, oracle::OraclePriceFeedResponse, query::PaginationQuery, response::{ApiPagedResponse, Response}, diff --git a/lib/ain-ocean/src/api/stats/cache.rs b/lib/ain-ocean/src/api/stats/cache.rs index cdc60a00b8..9e5d853bbb 100644 --- a/lib/ain-ocean/src/api/stats/cache.rs +++ b/lib/ain-ocean/src/api/stats/cache.rs @@ -27,7 +27,7 @@ use crate::{ stats::get_block_reward_distribution, AppContext, }, - error::{DecimalConversionSnafu, Error, OtherSnafu}, + error::{DecimalConversionSnafu, OtherSnafu}, model::MasternodeStatsData, storage::{RepositoryOps, SortOrder}, Result, @@ -111,7 +111,8 @@ pub async fn get_count(ctx: &Arc) -> Result { .by_id .list(None, SortOrder::Descending)? .filter_map(|item| { - item.ok().map(|((_, _, token, currency), _)| (token, currency)) + item.ok() + .map(|((_, _, token, currency), _)| (token, currency)) }) .collect::>(); diff --git a/lib/ain-ocean/src/indexer/loan_token.rs b/lib/ain-ocean/src/indexer/loan_token.rs index 1182d4c36c..46fc56be97 100644 --- a/lib/ain-ocean/src/indexer/loan_token.rs +++ b/lib/ain-ocean/src/indexer/loan_token.rs @@ -6,7 +6,6 @@ use rust_decimal::{prelude::Zero, Decimal}; use rust_decimal_macros::dec; use crate::{ - error::Error, indexer::{Context, Index, Result}, model::{BlockContext, OraclePriceActive, OraclePriceActiveNext, OraclePriceAggregated}, network::Network, @@ -92,7 +91,8 @@ pub fn index_active_price(services: &Arc, block: &BlockContext) -> Res .by_id .list(None, SortOrder::Descending)? .filter_map(|item| { - item.ok().map(|((_, _, token, currency), _)| (token, currency)) + item.ok() + .map(|((_, _, token, currency), _)| (token, currency)) }) .collect::>(); @@ -144,7 +144,8 @@ pub fn invalidate_active_price(services: &Arc, block: &BlockContext) - .by_id .list(None, SortOrder::Descending)? .filter_map(|item| { - item.ok().map(|((_, _, token, currency), _)| (token, currency)) + item.ok() + .map(|((_, _, token, currency), _)| (token, currency)) }) .collect::>(); diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index 589313c842..351af57461 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -374,7 +374,10 @@ fn index_set_oracle_data( price_aggregated.block.median_time.to_be_bytes(), price_aggregated.block.height.to_be_bytes(), ); - services.oracle_price_aggregated.by_id.put(&id, &price_aggregated)?; + services + .oracle_price_aggregated + .by_id + .put(&id, &price_aggregated)?; let price_repo = &services.price_ticker; let id = ( price_aggregated.aggregated.oracles.total.to_be_bytes(), @@ -387,7 +390,7 @@ fn index_set_oracle_data( .list(Some(id.clone()), SortOrder::Descending)? .find(|item| match item { Ok(((_, _, t, c), _)) => t == &token && c == ¤cy, - _ => true + _ => true, }) .transpose()?; if let Some((k, _)) = prev_price { From 813cbcefa131c2fbffdb7a81ce3d01af72076286 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Fri, 29 Nov 2024 14:49:55 +0800 Subject: [PATCH 04/31] note --- lib/ain-ocean/src/indexer/oracle.rs | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index 351af57461..15a9ee920a 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -385,16 +385,20 @@ fn index_set_oracle_data( token.clone(), currency.clone(), ); - let prev_price = price_repo - .by_id - .list(Some(id.clone()), SortOrder::Descending)? - .find(|item| match item { - Ok(((_, _, t, c), _)) => t == &token && c == ¤cy, - _ => true, - }) - .transpose()?; - if let Some((k, _)) = prev_price { - price_repo.by_id.delete(&k)? + // NOTE(canonbrother): rocksdb sort by key by default + // temp solution: clean up extra data to allow limit by `token-currency` but sort by `count-height-token-currency` + { + let prev_price = price_repo + .by_id + .list(Some(id.clone()), SortOrder::Descending)? + .find(|item| match item { + Ok(((_, _, t, c), _)) => t == &token && c == ¤cy, + _ => true, + }) + .transpose()?; + if let Some((k, _)) = prev_price { + price_repo.by_id.delete(&k)? + } } price_repo.by_id.put( &id, From 850fa5a5c94dfdf0ab4fa3fec90a85d6b77ea82f Mon Sep 17 00:00:00 2001 From: canonbrother Date: Thu, 5 Dec 2024 00:53:13 +0800 Subject: [PATCH 05/31] filter_map to get sub_root --- lib/ain-ocean/src/api/prices.rs | 13 +++++++++---- lib/ain-ocean/src/indexer/oracle.rs | 19 ++----------------- lib/ain-ocean/src/model/price_ticker.rs | 3 ++- 3 files changed, 13 insertions(+), 22 deletions(-) diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index 2986bc042f..4e371f1af0 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -1,4 +1,4 @@ -use std::{str::FromStr, sync::Arc}; +use std::{collections::HashSet, str::FromStr, sync::Arc}; use ain_dftx::{Currency, Token, Weightage, COIN}; use ain_macros::ocean_endpoint; @@ -127,6 +127,7 @@ async fn list_prices( }) .transpose()?; + let mut seen = HashSet::new(); let prices = ctx .services .price_ticker @@ -134,9 +135,13 @@ async fn list_prices( .list(next.clone(), SortOrder::Descending)? .take(query.size + usize::from(next.clone().is_some())) .skip(usize::from(next.is_some())) - .map(|item| { - let ((_, _, token, currency), v) = item?; - Ok(PriceTickerResponse::from(((token, currency), v))) + .filter_map(|item| { + let ((_, _, token, currency), v) = item.ok()?; + if seen.contains(&(token.clone(), currency.clone())) { + return None; + } + seen.insert((token.clone(), currency.clone())); + Some(Ok(PriceTickerResponse::from(((token, currency), v)))) }) .collect::>>()?; diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index 15a9ee920a..10debc9814 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -378,29 +378,14 @@ fn index_set_oracle_data( .oracle_price_aggregated .by_id .put(&id, &price_aggregated)?; - let price_repo = &services.price_ticker; + let id = ( price_aggregated.aggregated.oracles.total.to_be_bytes(), price_aggregated.block.height.to_be_bytes(), token.clone(), currency.clone(), ); - // NOTE(canonbrother): rocksdb sort by key by default - // temp solution: clean up extra data to allow limit by `token-currency` but sort by `count-height-token-currency` - { - let prev_price = price_repo - .by_id - .list(Some(id.clone()), SortOrder::Descending)? - .find(|item| match item { - Ok(((_, _, t, c), _)) => t == &token && c == ¤cy, - _ => true, - }) - .transpose()?; - if let Some((k, _)) = prev_price { - price_repo.by_id.delete(&k)? - } - } - price_repo.by_id.put( + services.price_ticker.by_id.put( &id, &PriceTicker { price: price_aggregated, diff --git a/lib/ain-ocean/src/model/price_ticker.rs b/lib/ain-ocean/src/model/price_ticker.rs index 158fb96157..713c058b53 100644 --- a/lib/ain-ocean/src/model/price_ticker.rs +++ b/lib/ain-ocean/src/model/price_ticker.rs @@ -1,8 +1,9 @@ +use ain_dftx::{Currency, Token}; use serde::{Deserialize, Serialize}; use super::oracle_price_aggregated::OraclePriceAggregated; -pub type PriceTickerId = ([u8; 4], [u8; 4], String, String); // total-height-token-currency +pub type PriceTickerId = ([u8; 4], [u8; 4], Token, Currency); // total-height-token-currency #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] From ea3f3c6fdded490bfb83e276e13ef30c6e15b400 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Thu, 5 Dec 2024 01:08:54 +0800 Subject: [PATCH 06/31] rm clone --- lib/ain-ocean/src/indexer/oracle.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index 10debc9814..2b6765bc0a 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -382,8 +382,8 @@ fn index_set_oracle_data( let id = ( price_aggregated.aggregated.oracles.total.to_be_bytes(), price_aggregated.block.height.to_be_bytes(), - token.clone(), - currency.clone(), + token, + currency, ); services.price_ticker.by_id.put( &id, From 746fdb7d207614a11f01d503cb5ec601c8cea3d5 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Thu, 5 Dec 2024 15:31:44 +0800 Subject: [PATCH 07/31] clean --- lib/ain-ocean/src/indexer/oracle.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index 2b6765bc0a..8fc2eccfed 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -270,9 +270,9 @@ fn map_price_aggregated( pair: &(Token, Currency), ) -> Result> { let (token, currency) = pair; - let oracle_repo = &services.oracle_token_currency; - let oracles = oracle_repo + let oracles = services + .oracle_token_currency .by_id .list( Some(( From 0f652e3d2f19354c5097473e6c982ac274903ffa Mon Sep 17 00:00:00 2001 From: canonbrother Date: Fri, 6 Dec 2024 09:45:14 +0800 Subject: [PATCH 08/31] only store latest data in db --- lib/ain-ocean/src/api/prices.rs | 17 +++++------------ lib/ain-ocean/src/indexer/oracle.rs | 18 +++++++++++++++++- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index 4e371f1af0..31bb6eb8f2 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -1,4 +1,4 @@ -use std::{collections::HashSet, str::FromStr, sync::Arc}; +use std::{str::FromStr, sync::Arc}; use ain_dftx::{Currency, Token, Weightage, COIN}; use ain_macros::ocean_endpoint; @@ -127,21 +127,14 @@ async fn list_prices( }) .transpose()?; - let mut seen = HashSet::new(); let prices = ctx .services .price_ticker .by_id - .list(next.clone(), SortOrder::Descending)? - .take(query.size + usize::from(next.clone().is_some())) - .skip(usize::from(next.is_some())) - .filter_map(|item| { - let ((_, _, token, currency), v) = item.ok()?; - if seen.contains(&(token.clone(), currency.clone())) { - return None; - } - seen.insert((token.clone(), currency.clone())); - Some(Ok(PriceTickerResponse::from(((token, currency), v)))) + .list(next, SortOrder::Descending)? + .map(|item| { + let ((_, _, token, currency), v) = item?; + Ok(PriceTickerResponse::from(((token, currency), v))) }) .collect::>>()?; diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index 8fc2eccfed..a62ce614c7 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -379,13 +379,29 @@ fn index_set_oracle_data( .by_id .put(&id, &price_aggregated)?; + let price_repo = &services.price_ticker.by_id; + let prev = price_repo + .list( + Some(([0xffu8; 4], [0xffu8; 4], token.clone(), currency.clone())), + SortOrder::Descending, + )? + .find(|item| match item { + Ok((k, _)) => k.2 == token.clone() && k.3 == currency.clone(), + _ => true, + }) + .transpose()?; + + if let Some((k, _)) = prev { + price_repo.delete(&k)?; + } + let id = ( price_aggregated.aggregated.oracles.total.to_be_bytes(), price_aggregated.block.height.to_be_bytes(), token, currency, ); - services.price_ticker.by_id.put( + price_repo.put( &id, &PriceTicker { price: price_aggregated, From baac91cc46d5cb56e39a418a0794cc5aa5a082a5 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Mon, 9 Dec 2024 16:15:48 +0800 Subject: [PATCH 09/31] add PriceTickerKey --- lib/ain-ocean/src/indexer/oracle.rs | 32 ++++++++++--------------- lib/ain-ocean/src/lib.rs | 2 ++ lib/ain-ocean/src/model/price_ticker.rs | 1 + lib/ain-ocean/src/storage/mod.rs | 11 ++++++++- 4 files changed, 26 insertions(+), 20 deletions(-) diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index a62ce614c7..a6c0cfb522 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -379,34 +379,28 @@ fn index_set_oracle_data( .by_id .put(&id, &price_aggregated)?; - let price_repo = &services.price_ticker.by_id; - let prev = price_repo - .list( - Some(([0xffu8; 4], [0xffu8; 4], token.clone(), currency.clone())), - SortOrder::Descending, - )? - .find(|item| match item { - Ok((k, _)) => k.2 == token.clone() && k.3 == currency.clone(), - _ => true, - }) - .transpose()?; - - if let Some((k, _)) = prev { - price_repo.delete(&k)?; + let price_repo = &services.price_ticker; + let sort_key = price_repo.by_key.get(&(token.clone(), currency.clone()))?; + if let Some(sort_key) = sort_key { + price_repo.by_id.delete(&sort_key)?; } - let id = ( + let new_sort_key = ( price_aggregated.aggregated.oracles.total.to_be_bytes(), price_aggregated.block.height.to_be_bytes(), - token, - currency, + token.clone(), + currency.clone(), ); - price_repo.put( - &id, + price_repo.by_id.put( + &new_sort_key, &PriceTicker { price: price_aggregated, }, )?; + price_repo.by_key.put( + &(token, currency), + &new_sort_key + )?; } Ok(()) } diff --git a/lib/ain-ocean/src/lib.rs b/lib/ain-ocean/src/lib.rs index bde84b9312..c72b6a9f67 100644 --- a/lib/ain-ocean/src/lib.rs +++ b/lib/ain-ocean/src/lib.rs @@ -93,6 +93,7 @@ pub struct OracleHistoryService { pub struct PriceTickerService { by_id: PriceTicker, + by_key: PriceTickerKey, } pub struct ScriptActivityService { @@ -195,6 +196,7 @@ impl Services { }, price_ticker: PriceTickerService { by_id: PriceTicker::new(Arc::clone(&store)), + by_key: PriceTickerKey::new(Arc::clone(&store)), }, script_activity: ScriptActivityService { by_id: ScriptActivity::new(Arc::clone(&store)), diff --git a/lib/ain-ocean/src/model/price_ticker.rs b/lib/ain-ocean/src/model/price_ticker.rs index 713c058b53..e2d7ea8c46 100644 --- a/lib/ain-ocean/src/model/price_ticker.rs +++ b/lib/ain-ocean/src/model/price_ticker.rs @@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize}; use super::oracle_price_aggregated::OraclePriceAggregated; pub type PriceTickerId = ([u8; 4], [u8; 4], Token, Currency); // total-height-token-currency +pub type PriceTickerKey = (Token, Currency); #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] diff --git a/lib/ain-ocean/src/storage/mod.rs b/lib/ain-ocean/src/storage/mod.rs index 1f373022c4..2159eda832 100644 --- a/lib/ain-ocean/src/storage/mod.rs +++ b/lib/ain-ocean/src/storage/mod.rs @@ -372,6 +372,14 @@ define_table! { } } +define_table! { + #[derive(Debug)] + pub struct PriceTickerKey { + key_type = model::PriceTickerKey, + value_type = model::PriceTickerId, + } +} + define_table! { #[derive(Debug)] pub struct RawBlock { @@ -508,7 +516,7 @@ define_table! { } } -pub const COLUMN_NAMES: [&str; 27] = [ +pub const COLUMN_NAMES: [&str; 28] = [ Block::NAME, BlockByHeight::NAME, MasternodeStats::NAME, @@ -525,6 +533,7 @@ pub const COLUMN_NAMES: [&str; 27] = [ PoolSwapAggregatedKey::NAME, PoolSwap::NAME, PriceTicker::NAME, + PriceTickerKey::NAME, RawBlock::NAME, ScriptActivity::NAME, ScriptAggregation::NAME, From 7421bf9b23d951de025ea9fcbfa92762db64d43f Mon Sep 17 00:00:00 2001 From: canonbrother Date: Mon, 9 Dec 2024 22:53:14 +0800 Subject: [PATCH 10/31] fmt_rs --- lib/ain-ocean/src/indexer/oracle.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index a6c0cfb522..7af1e81774 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -397,10 +397,7 @@ fn index_set_oracle_data( price: price_aggregated, }, )?; - price_repo.by_key.put( - &(token, currency), - &new_sort_key - )?; + price_repo.by_key.put(&(token, currency), &new_sort_key)?; } Ok(()) } From f3080d6a6da35a141bb348bfd89628d98345f134 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Tue, 10 Dec 2024 22:19:21 +0800 Subject: [PATCH 11/31] fix oracle interval type --- lib/ain-ocean/src/api/prices.rs | 8 ++-- lib/ain-ocean/src/indexer/oracle.rs | 33 ++++++++------ .../model/oracle_price_aggregated_interval.rs | 43 +++++++++++++++---- 3 files changed, 58 insertions(+), 26 deletions(-) diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index 31bb6eb8f2..8c7e88fbda 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -358,12 +358,13 @@ async fn get_feed_with_interval( let (token, currency) = parse_token_currency(&key)?; let interval = interval.parse::()?; - let interval_type = match interval { + let interval = match interval { 900 => OracleIntervalSeconds::FifteenMinutes, 3600 => OracleIntervalSeconds::OneHour, 86400 => OracleIntervalSeconds::OneDay, _ => return Err(From::from("Invalid oracle interval")), }; + let interval = interval as u32; let next = query .next @@ -374,7 +375,7 @@ async fn get_feed_with_interval( .transpose()? .unwrap_or([0xffu8; 4]); - let id = (token.clone(), currency.clone(), interval_type.clone(), next); + let id = (token.clone(), currency.clone(), interval.to_string(), next); let items = ctx .services @@ -384,13 +385,14 @@ async fn get_feed_with_interval( .take(query.size) .take_while(|item| match item { Ok(((t, c, i, _), _)) => { - t == &token.clone() && c == ¤cy.clone() && i == &interval_type.clone() + t == &token.clone() && c == ¤cy.clone() && i == &interval.to_string() } _ => true, }) .flatten() .collect::>(); + let interval = interval as i64; let mut prices = Vec::new(); for (id, item) in items { let start = item.block.median_time - (item.block.median_time % interval); diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index 7af1e81774..75bdbaa149 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -27,10 +27,10 @@ use crate::{ Services, }; -pub const AGGREGATED_INTERVALS: [OracleIntervalSeconds; 3] = [ - OracleIntervalSeconds::FifteenMinutes, - OracleIntervalSeconds::OneDay, - OracleIntervalSeconds::OneHour, +pub const AGGREGATED_INTERVALS: [u32; 3] = [ + OracleIntervalSeconds::FifteenMinutes as u32, + OracleIntervalSeconds::OneDay as u32, + OracleIntervalSeconds::OneHour as u32, ]; impl Index for AppointOracle { @@ -478,7 +478,7 @@ impl Index for SetOracleData { token, currency, &aggregated, - &interval, + interval, )?; } @@ -521,9 +521,14 @@ fn start_new_bucket( token: Token, currency: Currency, aggregated: &OraclePriceAggregated, - interval: OracleIntervalSeconds, + interval: u32, ) -> Result<()> { - let id = (token, currency, interval, block.height.to_be_bytes()); + let id = ( + token, + currency, + interval.to_string(), + block.height.to_be_bytes(), + ); services.oracle_price_aggregated_interval.by_id.put( &id, &OraclePriceAggregatedInterval { @@ -549,7 +554,7 @@ pub fn index_interval_mapper( token: Token, currency: Currency, aggregated: &OraclePriceAggregated, - interval: OracleIntervalSeconds, + interval: u32, ) -> Result<()> { let repo = &services.oracle_price_aggregated_interval; let previous = repo @@ -558,14 +563,14 @@ pub fn index_interval_mapper( Some(( token.clone(), currency.clone(), - interval.clone(), + interval.to_string(), [0xffu8; 4], )), SortOrder::Descending, )? .take_while(|item| match item { Ok(((t, c, i, _), _)) => { - t == &token.clone() && c == ¤cy.clone() && i == &interval.clone() + t == &token.clone() && c == ¤cy.clone() && i == &interval.to_string() } _ => true, }) @@ -576,7 +581,7 @@ pub fn index_interval_mapper( return start_new_bucket(services, block, token, currency, aggregated, interval); }; - if block.median_time - aggregated.block.median_time > interval.clone() as i64 { + if block.median_time - aggregated.block.median_time > interval as i64 { return start_new_bucket(services, block, token, currency, aggregated, interval); }; @@ -591,7 +596,7 @@ pub fn invalidate_oracle_interval( token: &str, currency: &str, aggregated: &OraclePriceAggregated, - interval: &OracleIntervalSeconds, + interval: u32, ) -> Result<()> { let repo = &services.oracle_price_aggregated_interval; let previous = repo @@ -600,7 +605,7 @@ pub fn invalidate_oracle_interval( Some(( token.to_string(), currency.to_string(), - interval.clone(), + interval.to_string(), [0xffu8; 4], )), SortOrder::Descending, @@ -612,7 +617,7 @@ pub fn invalidate_oracle_interval( return Err(Error::NotFoundIndex { action: IndexAction::Invalidate, r#type: "Invalidate oracle price aggregated interval".to_string(), - id: format!("{}-{}-{:?}", token, currency, interval), + id: format!("{}-{}-{:?}", token, currency, interval.to_string()), }); }; diff --git a/lib/ain-ocean/src/model/oracle_price_aggregated_interval.rs b/lib/ain-ocean/src/model/oracle_price_aggregated_interval.rs index 443e4e7e5a..aaa7f70be2 100644 --- a/lib/ain-ocean/src/model/oracle_price_aggregated_interval.rs +++ b/lib/ain-ocean/src/model/oracle_price_aggregated_interval.rs @@ -3,17 +3,42 @@ use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; use super::BlockContext; -pub type OraclePriceAggregatedIntervalId = (Token, Currency, OracleIntervalSeconds, [u8; 4]); //token-currency-interval-height +pub type OraclePriceAggregatedIntervalId = (Token, Currency, String, [u8; 4]); //token-currency-interval-height -pub const FIFTEEN_MINUTES: isize = 15 * 60; -pub const ONE_HOUR: isize = 60 * 60; -pub const ONE_DAY: isize = 24 * 60 * 60; - -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq)] pub enum OracleIntervalSeconds { - FifteenMinutes = FIFTEEN_MINUTES, - OneHour = ONE_HOUR, - OneDay = ONE_DAY, + FifteenMinutes = 900, + OneHour = 3600, + OneDay = 86400, +} + +impl Serialize for OracleIntervalSeconds { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + match self { + Self::FifteenMinutes => serializer.serialize_str("900"), + Self::OneHour => serializer.serialize_str("3600"), + Self::OneDay => serializer.serialize_str("86400"), + } + } +} + +impl<'a> Deserialize<'a> for OracleIntervalSeconds { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'a>, + { + let s = String::deserialize(deserializer).unwrap(); + if s == *"900" { + Ok(Self::FifteenMinutes) + } else if s == *"3600" { + Ok(Self::OneHour) + } else { + Ok(Self::OneDay) + } + } } #[derive(Serialize, Deserialize, Debug, Clone)] From cac09f339c91c5131449cd0e5d7d66eebd7c47ec Mon Sep 17 00:00:00 2001 From: canonbrother Date: Thu, 12 Dec 2024 16:59:52 +0800 Subject: [PATCH 12/31] rm debug --- lib/ain-ocean/src/api/prices.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index 8c7e88fbda..783573724b 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -399,8 +399,8 @@ async fn get_feed_with_interval( let height = u32::from_be_bytes(id.3); let price = OraclePriceAggregatedIntervalResponse { - id: format!("{}-{}-{:?}-{}", id.0, id.1, id.2, height), - key: format!("{}-{}-{:?}", id.0, id.1, id.2), + id: format!("{}-{}-{}-{}", id.0, id.1, id.2, height), + key: format!("{}-{}-{}", id.0, id.1, id.2), sort: format!( "{}{}", hex::encode(item.block.median_time.to_be_bytes()), From ad59ab7b05f570621b497f5cbac1d58cee4c3509 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Thu, 12 Dec 2024 16:58:46 +0800 Subject: [PATCH 13/31] take(1) to next --- lib/ain-ocean/src/api/address.rs | 13 +++--- lib/ain-ocean/src/api/pool_pair/service.rs | 16 +++---- lib/ain-ocean/src/indexer/mod.rs | 12 ++--- lib/ain-ocean/src/indexer/poolswap.rs | 52 +++++++--------------- 4 files changed, 36 insertions(+), 57 deletions(-) diff --git a/lib/ain-ocean/src/api/address.rs b/lib/ain-ocean/src/api/address.rs index be6ee17749..2ab5a2f8ad 100644 --- a/lib/ain-ocean/src/api/address.rs +++ b/lib/ain-ocean/src/api/address.rs @@ -174,19 +174,18 @@ fn get_latest_aggregation( .script_aggregation .by_id .list(Some((hid, [0xffu8; 4])), SortOrder::Descending)? - .take(1) .take_while(|item| match item { Ok(((v, _), _)) => v == &hid, _ => true, }) + .next() + .transpose()? .map(|item| { - let (_, v) = item?; - let res = v.into(); - Ok(res) - }) - .collect::>>()?; + let (_, v) = item; + v.into() + }); - Ok(latest.first().cloned()) + Ok(latest) } #[ocean_endpoint] diff --git a/lib/ain-ocean/src/api/pool_pair/service.rs b/lib/ain-ocean/src/api/pool_pair/service.rs index c12d3780a4..d45b1ff93f 100644 --- a/lib/ain-ocean/src/api/pool_pair/service.rs +++ b/lib/ain-ocean/src/api/pool_pair/service.rs @@ -594,22 +594,22 @@ fn call_dftx(ctx: &Arc, txid: Txid) -> Result> { .transaction .vout_by_id .list(Some((txid, 0)), SortOrder::Ascending)? - .take(1) .take_while(|item| match item { Ok((_, vout)) => vout.txid == txid, _ => true, }) + .next() + .transpose()? .map(|item| { - let (_, v) = item?; - Ok(v) - }) - .collect::>>()?; + let (_, v) = item; + v + }); - if vout.is_empty() { + let Some(vout) = vout else { return Ok(None); - } + }; - let bytes = &vout[0].script.hex; + let bytes = &vout.script.hex; if bytes.len() > 6 && bytes[0] == 0x6a && bytes[1] <= 0x4e { let offset = 1 + match bytes[1] { 0x4c => 2, diff --git a/lib/ain-ocean/src/indexer/mod.rs b/lib/ain-ocean/src/indexer/mod.rs index a572323dcf..da9d98e34f 100644 --- a/lib/ain-ocean/src/indexer/mod.rs +++ b/lib/ain-ocean/src/indexer/mod.rs @@ -327,18 +327,18 @@ fn index_script(services: &Arc, ctx: &Context, txs: &[Transaction]) -> let latest = repo .by_id .list(Some((aggregation.hid, [0xffu8; 4])), SortOrder::Descending)? - .take(1) .take_while(|item| match item { Ok(((hid, _), _)) => &aggregation.hid == hid, _ => true, }) + .next() + .transpose()? .map(|item| { - let (_, v) = item?; - Ok(v) - }) - .collect::>>()?; + let (_, v) = item; + v + }); - if let Some(latest) = latest.first().cloned() { + if let Some(latest) = latest { aggregation.statistic.tx_in_count += latest.statistic.tx_in_count; aggregation.statistic.tx_out_count += latest.statistic.tx_out_count; diff --git a/lib/ain-ocean/src/indexer/poolswap.rs b/lib/ain-ocean/src/indexer/poolswap.rs index e9ed559528..3ae7514185 100644 --- a/lib/ain-ocean/src/indexer/poolswap.rs +++ b/lib/ain-ocean/src/indexer/poolswap.rs @@ -2,7 +2,6 @@ use std::{str::FromStr, sync::Arc}; use ain_cpp_imports::PoolPairCreationHeight; use ain_dftx::{pool::*, COIN}; -use bitcoin::Txid; use log::trace; use parking_lot::RwLock; use rust_decimal::Decimal; @@ -37,33 +36,24 @@ fn index_swap_aggregated( pool_id: u32, from_token_id: u64, from_amount: i64, - txid: Txid, ) -> Result<()> { for interval in AGGREGATED_INTERVALS { let repo = &services.pool_swap_aggregated; - let prevs = repo + let prev = repo .by_key .list(Some((pool_id, interval, i64::MAX)), SortOrder::Descending)? - .take(1) .take_while(|item| match item { Ok((k, _)) => k.0 == pool_id && k.1 == interval, _ => true, }) - .flatten() - .collect::>(); + .next() + .transpose()?; - if prevs.is_empty() { - log::error!( - "index swap {txid}: Unable to find {pool_id}-{interval} for Aggregate Indexing" - ); - continue; - } - - let Some((_, id)) = prevs.first() else { + let Some((_, id)) = prev else { continue; }; - let aggregated = repo.by_id.get(id)?; + let aggregated = repo.by_id.get(&id)?; let Some(mut aggregated) = aggregated else { continue; @@ -86,7 +76,7 @@ fn index_swap_aggregated( .amounts .insert(from_token_id, format!("{aggregated_amount:.8}")); - repo.by_id.put(id, &aggregated)?; + repo.by_id.put(&id, &aggregated)?; } Ok(()) @@ -97,33 +87,24 @@ fn invalidate_swap_aggregated( pool_id: u32, from_token_id: u64, from_amount: i64, - txid: Txid, ) -> Result<()> { for interval in AGGREGATED_INTERVALS.into_iter().rev() { let repo = &services.pool_swap_aggregated; - let prevs = repo + let prev = repo .by_key .list(Some((pool_id, interval, i64::MAX)), SortOrder::Descending)? - .take(1) .take_while(|item| match item { Ok((k, _)) => k.0 == pool_id && k.1 == interval, _ => true, }) - .flatten() - .collect::>(); + .next() + .transpose()?; - if prevs.is_empty() { - log::error!( - "invalidate swap {txid}: Unable to find {pool_id}-{interval} for Aggregate Indexing" - ); - continue; - } - - let Some((_, id)) = prevs.first() else { + let Some((_, id)) = prev else { continue; }; - let aggregated = repo.by_id.get(id)?; + let aggregated = repo.by_id.get(&id)?; let Some(mut aggregated) = aggregated else { continue; @@ -146,7 +127,7 @@ fn invalidate_swap_aggregated( .amounts .insert(from_token_id, format!("{aggregated_amount:.8}")); - repo.by_id.put(id, &aggregated)?; + repo.by_id.put(&id, &aggregated)?; } Ok(()) @@ -300,7 +281,7 @@ impl Index for PoolSwap { .by_id .put(&(pool_id, ctx.block.height, idx), &swap)?; - index_swap_aggregated(services, pool_id, from_token_id, from_amount, txid)?; + index_swap_aggregated(services, pool_id, from_token_id, from_amount)?; Ok(()) } @@ -323,7 +304,7 @@ impl Index for PoolSwap { .delete(&(pool_id, ctx.block.height, ctx.tx_idx))?; tx_result::invalidate(services, &txid)?; - invalidate_swap_aggregated(services, pool_id, from_token_id, from_amount, txid)?; + invalidate_swap_aggregated(services, pool_id, from_token_id, from_amount)?; Ok(()) } @@ -390,7 +371,7 @@ impl Index for CompositeSwap { .by_id .put(&(pool_id, ctx.block.height, ctx.tx_idx), &swap)?; - index_swap_aggregated(services, pool_id, from_token_id, from_amount, txid)?; + index_swap_aggregated(services, pool_id, from_token_id, from_amount)?; } Ok(()) @@ -400,7 +381,6 @@ impl Index for CompositeSwap { trace!("[ComposoteSwap] Invalidating..."); let from_token_id = self.pool_swap.from_token_id.0; let from_amount = self.pool_swap.from_amount; - let txid = ctx.tx.txid; for pool in self.pools.iter().rev() { let pool_id = pool.id.0 as u32; services @@ -408,7 +388,7 @@ impl Index for CompositeSwap { .by_id .delete(&(pool_id, ctx.block.height, ctx.tx_idx))?; - invalidate_swap_aggregated(services, pool_id, from_token_id, from_amount, txid)?; + invalidate_swap_aggregated(services, pool_id, from_token_id, from_amount)?; } tx_result::invalidate(services, &ctx.tx.txid) } From ffa2aa248c030cc8a11aafb81aaa2d9edbf98817 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Thu, 12 Dec 2024 17:31:13 +0800 Subject: [PATCH 14/31] missing grab prefix - take_while --- lib/ain-ocean/src/api/loan.rs | 6 +++++- lib/ain-ocean/src/indexer/loan_token.rs | 8 ++++++++ lib/ain-ocean/src/indexer/oracle.rs | 14 +++++++++++++- 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/lib/ain-ocean/src/api/loan.rs b/lib/ain-ocean/src/api/loan.rs index fb67b4b63f..d48f2f2b9c 100644 --- a/lib/ain-ocean/src/api/loan.rs +++ b/lib/ain-ocean/src/api/loan.rs @@ -271,7 +271,11 @@ async fn list_loan_token( .services .oracle_price_active .by_id - .list(Some((token, currency, [0xffu8; 4])), SortOrder::Descending)? + .list(Some((token.clone(), currency.clone(), [0xffu8; 4])), SortOrder::Descending)? + .take_while(|item| match item { + Ok((k, _)) => k.0 == token && k.1 == currency, + _ => true, + }) .next() .map(|item| { let (_, v) = item?; diff --git a/lib/ain-ocean/src/indexer/loan_token.rs b/lib/ain-ocean/src/indexer/loan_token.rs index 46fc56be97..86ead3b1e9 100644 --- a/lib/ain-ocean/src/indexer/loan_token.rs +++ b/lib/ain-ocean/src/indexer/loan_token.rs @@ -183,6 +183,10 @@ pub fn perform_active_price_tick( .oracle_price_aggregated .by_id .list(Some(id.clone()), SortOrder::Descending)? + .take_while(|item| match item { + Ok((k, _)) => k.0 == id.0 && k.1 == id.1, + _ => true, + }) .next() .transpose()?; @@ -195,6 +199,10 @@ pub fn perform_active_price_tick( let prev = repo .by_id .list(Some(id.clone()), SortOrder::Descending)? + .take_while(|item| match item { + Ok((k, _)) => k.0 == id.0 && k.1 == id.1, + _ => true, + }) .next() .transpose()?; diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index 75bdbaa149..b6aa005fb5 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -302,9 +302,13 @@ fn map_price_aggregated( .oracle_price_feed .by_id .list( - Some((id.0, id.1, id.2, [0xffu8; 4], base_id)), + Some((id.0.clone(), id.1.clone(), id.2, [0xffu8; 4], base_id)), SortOrder::Descending, )? + .take_while(|item| match item { + Ok((k, _)) => k.0 == id.0 && k.1 == id.1 && k.2 == id.2, + _ => true, + }) .next() .transpose()?; @@ -610,6 +614,10 @@ pub fn invalidate_oracle_interval( )), SortOrder::Descending, )? + .take_while(|item| match item { + Ok((k, _)) => k.0 == token && k.1 == currency && k.2 == interval.to_string(), + _ => true, + }) .next() .transpose()?; @@ -755,6 +763,10 @@ fn get_previous_oracle( .oracle_history .by_id .list(Some((oracle_id, u32::MAX)), SortOrder::Descending)? + .take_while(|item| match item { + Ok((k, _)) => k.0 == oracle_id, + _ => true, + }) .next() .transpose()?; From b6c50cd8fbadcfbce5aa02b4b05e28bbbd143976 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Sat, 14 Dec 2024 14:29:34 +0800 Subject: [PATCH 15/31] fmt_rs --- lib/ain-ocean/src/api/loan.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/ain-ocean/src/api/loan.rs b/lib/ain-ocean/src/api/loan.rs index d48f2f2b9c..905c491eae 100644 --- a/lib/ain-ocean/src/api/loan.rs +++ b/lib/ain-ocean/src/api/loan.rs @@ -271,7 +271,10 @@ async fn list_loan_token( .services .oracle_price_active .by_id - .list(Some((token.clone(), currency.clone(), [0xffu8; 4])), SortOrder::Descending)? + .list( + Some((token.clone(), currency.clone(), [0xffu8; 4])), + SortOrder::Descending, + )? .take_while(|item| match item { Ok((k, _)) => k.0 == token && k.1 == currency, _ => true, From 29246ff3a6f04d162433f51407114df1f2999be5 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Mon, 16 Dec 2024 15:43:08 +0800 Subject: [PATCH 16/31] rm unuse --- lib/ain-ocean/src/indexer/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/ain-ocean/src/indexer/mod.rs b/lib/ain-ocean/src/indexer/mod.rs index da9d98e34f..550a526440 100644 --- a/lib/ain-ocean/src/indexer/mod.rs +++ b/lib/ain-ocean/src/indexer/mod.rs @@ -354,8 +354,6 @@ fn index_script(services: &Arc, ctx: &Context, txs: &[Transaction]) -> &(aggregation.hid, ctx.block.height.to_be_bytes()), &aggregation, )?; - - record.insert(aggregation.hid, aggregation); } log_elapsed(start, format!("Indexed script {:x}", ctx.tx.txid)); From de33a9b62d017c8e12b5df4192871f6f8b0f340f Mon Sep 17 00:00:00 2001 From: canonbrother Date: Mon, 16 Dec 2024 16:38:32 +0800 Subject: [PATCH 17/31] fix oracle interval index --- lib/ain-ocean/src/indexer/oracle.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index b6aa005fb5..0c6cc37d08 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -581,16 +581,14 @@ pub fn index_interval_mapper( .next() .transpose()?; - let Some(previous) = previous else { + if previous.is_none() || block.median_time - aggregated.block.median_time > interval as i64 { return start_new_bucket(services, block, token, currency, aggregated, interval); }; - if block.median_time - aggregated.block.median_time > interval as i64 { - return start_new_bucket(services, block, token, currency, aggregated, interval); + if let Some(previous) = previous { + forward_aggregate(services, previous, aggregated)?; }; - forward_aggregate(services, previous, aggregated)?; - Ok(()) } From 583c635d9f95a6d43a2db10a4c66aa1a2f8083e2 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Tue, 17 Dec 2024 17:42:44 +0800 Subject: [PATCH 18/31] fix oracle interval index 2 --- lib/ain-ocean/src/indexer/oracle.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index 0c6cc37d08..e9138f298b 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -581,11 +581,14 @@ pub fn index_interval_mapper( .next() .transpose()?; - if previous.is_none() || block.median_time - aggregated.block.median_time > interval as i64 { + if previous.is_none() { return start_new_bucket(services, block, token, currency, aggregated, interval); }; if let Some(previous) = previous { + if block.median_time - previous.1.block.median_time > interval as i64 { + return start_new_bucket(services, block, token, currency, aggregated, interval); + }; forward_aggregate(services, previous, aggregated)?; }; From 4c582530283a58a15f057497e45791d0f082ff1d Mon Sep 17 00:00:00 2001 From: canonbrother Date: Wed, 18 Dec 2024 08:14:26 +0800 Subject: [PATCH 19/31] missing dash --- lib/ain-ocean/src/api/pool_pair/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/ain-ocean/src/api/pool_pair/mod.rs b/lib/ain-ocean/src/api/pool_pair/mod.rs index 045e035cf1..73a535b7c3 100644 --- a/lib/ain-ocean/src/api/pool_pair/mod.rs +++ b/lib/ain-ocean/src/api/pool_pair/mod.rs @@ -79,7 +79,7 @@ impl PoolSwapVerboseResponse { Self { id: format!("{}-{}", v.pool_id, v.txid), sort: format!( - "{}{}", + "{}-{}", hex::encode(v.block.height.to_be_bytes()), hex::encode(v.txno.to_be_bytes()), ), From ada8b8c3f05b532fcd70997d4a0d4bf9b10921ba Mon Sep 17 00:00:00 2001 From: canonbrother Date: Wed, 18 Dec 2024 16:48:06 +0800 Subject: [PATCH 20/31] fix missing script_agg --- lib/ain-ocean/src/indexer/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/ain-ocean/src/indexer/mod.rs b/lib/ain-ocean/src/indexer/mod.rs index 550a526440..628821b62e 100644 --- a/lib/ain-ocean/src/indexer/mod.rs +++ b/lib/ain-ocean/src/indexer/mod.rs @@ -292,7 +292,7 @@ fn index_script(services: &Arc, ctx: &Context, txs: &[Transaction]) -> let Some(vout) = find_tx_vout(services, &vin, txs)? else { if is_skipped_tx(&vin.txid) { - return Ok(()); + continue; }; return Err(Error::NotFoundIndex { @@ -312,7 +312,7 @@ fn index_script(services: &Arc, ctx: &Context, txs: &[Transaction]) -> index_script_unspent_vout(services, vout, ctx)?; if vout.script_pub_key.hex.starts_with(&[0x6a]) { - return Ok(()); + continue; } index_script_activity_vout(services, vout, ctx)?; @@ -322,7 +322,7 @@ fn index_script(services: &Arc, ctx: &Context, txs: &[Transaction]) -> } // index_script_aggregation - for (_, mut aggregation) in record.clone() { + for (_, mut aggregation) in record { let repo = &services.script_aggregation; let latest = repo .by_id From f446bf3bd64380d02c41b2cbe287088eaf1923aa Mon Sep 17 00:00:00 2001 From: canonbrother Date: Thu, 19 Dec 2024 13:39:02 +0800 Subject: [PATCH 21/31] fix OraclePriceAggregatedIntervalResponse --- lib/ain-ocean/src/api/prices.rs | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index 783573724b..cf66cc7790 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -23,7 +23,7 @@ use crate::{ error::{ApiError, Error}, model::{ BlockContext, OracleIntervalSeconds, OraclePriceActive, - OraclePriceAggregatedIntervalAggregated, PriceTicker, + OraclePriceAggregatedIntervalAggregatedOracles, PriceTicker, }, storage::{RepositoryOps, SortOrder}, Result, @@ -330,8 +330,20 @@ pub struct OraclePriceAggregatedIntervalResponse { pub sort: String, // medianTime-height pub token: Token, pub currency: Currency, - pub aggregated: OraclePriceAggregatedIntervalAggregated, + pub aggregated: OraclePriceAggregatedIntervalAggregatedResponse, pub block: BlockContext, + +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct OraclePriceAggregatedIntervalAggregatedResponse { + #[serde(with = "rust_decimal::serde::str")] + pub amount: Decimal, + #[serde(with = "rust_decimal::serde::str")] + pub weightage: Decimal, + pub count: i32, + pub oracles: OraclePriceAggregatedIntervalAggregatedOracles, /** * Aggregated interval time range in seconds. * - Interval that aggregated in seconds @@ -408,18 +420,18 @@ async fn get_feed_with_interval( ), token: token.clone(), currency: currency.clone(), - aggregated: OraclePriceAggregatedIntervalAggregated { + aggregated: OraclePriceAggregatedIntervalAggregatedResponse { amount: item.aggregated.amount, weightage: item.aggregated.weightage, oracles: item.aggregated.oracles, count: item.aggregated.count, + time: OraclePriceAggregatedIntervalTime { + interval, + start, + end: start + interval, + }, }, block: item.block, - time: OraclePriceAggregatedIntervalTime { - interval, - start, - end: start + interval, - }, }; prices.push(price); } From 9af4ce8c710c85bbfccc5263183aa399f700ce6f Mon Sep 17 00:00:00 2001 From: canonbrother Date: Thu, 19 Dec 2024 13:47:47 +0800 Subject: [PATCH 22/31] fix OraclePriceAggregatedIntervalResponse sort --- lib/ain-ocean/src/api/prices.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index cf66cc7790..322221cd9f 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -414,9 +414,7 @@ async fn get_feed_with_interval( id: format!("{}-{}-{}-{}", id.0, id.1, id.2, height), key: format!("{}-{}-{}", id.0, id.1, id.2), sort: format!( - "{}{}", - hex::encode(item.block.median_time.to_be_bytes()), - hex::encode(item.block.height.to_be_bytes()), + "{}", hex::encode(item.block.height.to_be_bytes().to_string()), ), token: token.clone(), currency: currency.clone(), From 24a71453bb7c5899239a38197a063e4a9955d7a4 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Thu, 19 Dec 2024 13:49:08 +0800 Subject: [PATCH 23/31] fix OraclePriceAggregatedIntervalResponse sort --- lib/ain-ocean/src/api/prices.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index 322221cd9f..7e32a264c4 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -413,9 +413,7 @@ async fn get_feed_with_interval( let price = OraclePriceAggregatedIntervalResponse { id: format!("{}-{}-{}-{}", id.0, id.1, id.2, height), key: format!("{}-{}-{}", id.0, id.1, id.2), - sort: format!( - "{}", hex::encode(item.block.height.to_be_bytes().to_string()), - ), + sort: hex::encode(item.block.height.to_be_bytes()).to_string(), token: token.clone(), currency: currency.clone(), aggregated: OraclePriceAggregatedIntervalAggregatedResponse { From be0e724728f78f52b340a37445971e4d10980e97 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Thu, 19 Dec 2024 14:01:39 +0800 Subject: [PATCH 24/31] fix OraclePriceAggregatedIntervalResponse sort --- lib/ain-ocean/src/api/prices.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index 7e32a264c4..5192ead881 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -332,7 +332,6 @@ pub struct OraclePriceAggregatedIntervalResponse { pub currency: Currency, pub aggregated: OraclePriceAggregatedIntervalAggregatedResponse, pub block: BlockContext, - } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -381,7 +380,9 @@ async fn get_feed_with_interval( let next = query .next .map(|q| { - let height = q.parse::()?.to_be_bytes(); + let height = hex::decode(q)? + .try_into() + .map_err(|_| Error::ToArrayError)?; Ok::<[u8; 4], Error>(height) }) .transpose()? From 96f101e4079982ee28f7f86cbe51b6a8fd85c11c Mon Sep 17 00:00:00 2001 From: canonbrother Date: Thu, 19 Dec 2024 14:11:12 +0800 Subject: [PATCH 25/31] skip --- lib/ain-ocean/src/indexer/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/ain-ocean/src/indexer/mod.rs b/lib/ain-ocean/src/indexer/mod.rs index 628821b62e..d72250fb57 100644 --- a/lib/ain-ocean/src/indexer/mod.rs +++ b/lib/ain-ocean/src/indexer/mod.rs @@ -381,7 +381,7 @@ fn invalidate_script(services: &Arc, ctx: &Context, txs: &[Transaction let Some(vout) = find_tx_vout(services, &vin, txs)? else { if is_skipped_tx(&vin.txid) { - return Ok(()); + continue; }; return Err(Error::NotFoundIndex { From 57b07bfdc88dbc6b86f2008c0879b04b2b304da8 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Thu, 19 Dec 2024 18:42:58 +0800 Subject: [PATCH 26/31] decimal --- lib/ain-ocean/src/api/prices.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index 5192ead881..7b55a01165 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -213,7 +213,7 @@ async fn get_feed( token: token.clone(), currency: currency.clone(), aggregated: OraclePriceAggregatedAggregatedResponse { - amount: format!("{:.8}", v.aggregated.amount), + amount: format!("{:.8}", v.aggregated.amount / Decimal::from(COIN)), weightage: v.aggregated.weightage.to_i32().unwrap_or_default(), oracles: OraclePriceActiveNextOraclesResponse { active: v.aggregated.oracles.active.to_i32().unwrap_or_default(), From 82d7bb6b23d8ecb590931e69a6680339afb638c3 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Thu, 19 Dec 2024 20:46:22 +0800 Subject: [PATCH 27/31] update get_price api --- lib/ain-ocean/src/api/prices.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index 7b55a01165..7d2c6a22a6 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -150,16 +150,15 @@ async fn get_price( ) -> Result>> { let (token, currency) = parse_token_currency(&key)?; + let sort_key = price_repo.by_key.get(&(token.clone(), currency.clone()))?; + let Some(sort_key) = sort_key else { + return Ok(Response::new(None)); + }; let price_ticker = ctx .services .price_ticker .by_id - .list( - Some(([0xffu8; 4], [0xffu8; 4], token.clone(), currency.clone())), - SortOrder::Descending, - )? - .next() - .transpose()?; + .get(&sort_key)?; let Some((_, price_ticker)) = price_ticker else { return Ok(Response::new(None)); From d5ded9560fa5a7c8428ee4d1cd23d8f7e881eef7 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Thu, 19 Dec 2024 20:49:00 +0800 Subject: [PATCH 28/31] update get_price api --- lib/ain-ocean/src/api/prices.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index 7d2c6a22a6..2d4999900f 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -150,15 +150,12 @@ async fn get_price( ) -> Result>> { let (token, currency) = parse_token_currency(&key)?; + let price_repo = &ctx.services.price_ticker; let sort_key = price_repo.by_key.get(&(token.clone(), currency.clone()))?; let Some(sort_key) = sort_key else { return Ok(Response::new(None)); }; - let price_ticker = ctx - .services - .price_ticker - .by_id - .get(&sort_key)?; + let price_ticker = price_repo.by_id.get(&sort_key)?; let Some((_, price_ticker)) = price_ticker else { return Ok(Response::new(None)); From 9934c5b70b0d2e94ae9be2aaf7af6db667e87757 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Thu, 19 Dec 2024 20:54:11 +0800 Subject: [PATCH 29/31] update get_price api --- lib/ain-ocean/src/api/prices.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/ain-ocean/src/api/prices.rs b/lib/ain-ocean/src/api/prices.rs index 2d4999900f..e1dd0ebf28 100644 --- a/lib/ain-ocean/src/api/prices.rs +++ b/lib/ain-ocean/src/api/prices.rs @@ -157,7 +157,7 @@ async fn get_price( }; let price_ticker = price_repo.by_id.get(&sort_key)?; - let Some((_, price_ticker)) = price_ticker else { + let Some(price_ticker) = price_ticker else { return Ok(Response::new(None)); }; From cff077f266d6bce5846edc10ccaf941b9cde53dc Mon Sep 17 00:00:00 2001 From: canonbrother Date: Thu, 19 Dec 2024 21:09:19 +0800 Subject: [PATCH 30/31] rename --- lib/ain-ocean/src/indexer/oracle.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index e9138f298b..1c521a2d7a 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -270,6 +270,7 @@ fn map_price_aggregated( pair: &(Token, Currency), ) -> Result> { let (token, currency) = pair; + let max_txid = Txid::from_byte_array([0xffu8; 32]); let oracles = services .oracle_token_currency @@ -278,7 +279,7 @@ fn map_price_aggregated( Some(( token.clone(), currency.clone(), - Txid::from_byte_array([0xffu8; 32]), + max_txid, )), SortOrder::Descending, )? @@ -290,7 +291,6 @@ fn map_price_aggregated( let mut aggregated_count = Decimal::zero(); let mut aggregated_weightage = Decimal::zero(); - let base_id = Txid::from_byte_array([0xffu8; 32]); let oracles_len = oracles.len(); for (id, oracle) in oracles { if oracle.weightage == 0 { @@ -302,7 +302,7 @@ fn map_price_aggregated( .oracle_price_feed .by_id .list( - Some((id.0.clone(), id.1.clone(), id.2, [0xffu8; 4], base_id)), + Some((id.0.clone(), id.1.clone(), id.2, [0xffu8; 4], max_txid)), SortOrder::Descending, )? .take_while(|item| match item { From 7515ded16d5264d9666f7c4d714c9950a5044026 Mon Sep 17 00:00:00 2001 From: canonbrother Date: Thu, 19 Dec 2024 21:10:00 +0800 Subject: [PATCH 31/31] fmt --- lib/ain-ocean/src/indexer/oracle.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/lib/ain-ocean/src/indexer/oracle.rs b/lib/ain-ocean/src/indexer/oracle.rs index 1c521a2d7a..4874095dce 100644 --- a/lib/ain-ocean/src/indexer/oracle.rs +++ b/lib/ain-ocean/src/indexer/oracle.rs @@ -276,11 +276,7 @@ fn map_price_aggregated( .oracle_token_currency .by_id .list( - Some(( - token.clone(), - currency.clone(), - max_txid, - )), + Some((token.clone(), currency.clone(), max_txid)), SortOrder::Descending, )? .take_while(|item| matches!(item, Ok((k, _)) if &k.0 == token && &k.1 == currency))