Skip to content

Commit

Permalink
Ocean: fixes (#3128)
Browse files Browse the repository at this point in the history
* refix prices db

* filter_map as set

* fmt_rs

* note

* filter_map to get sub_root

* rm clone

* clean

* only store latest data in db

* add PriceTickerKey

* fmt_rs

* fix oracle interval type

* rm debug

* take(1) to next

* missing grab prefix - take_while

* fmt_rs

* rm unuse

* fix oracle interval index

* fix oracle interval index 2

* missing dash

* fix missing script_agg

* fix OraclePriceAggregatedIntervalResponse

* fix OraclePriceAggregatedIntervalResponse sort

* fix OraclePriceAggregatedIntervalResponse sort

* fix OraclePriceAggregatedIntervalResponse sort

* skip

* decimal

* update get_price api

* update get_price api

* update get_price api

* rename

* fmt
  • Loading branch information
canonbrother authored Dec 24, 2024
1 parent 997a654 commit 75c0f17
Show file tree
Hide file tree
Showing 16 changed files with 262 additions and 181 deletions.
13 changes: 6 additions & 7 deletions lib/ain-ocean/src/api/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,19 +174,18 @@ fn get_latest_aggregation(
.script_aggregation
.by_id
.list(Some((hid, [0xffu8; 4])), SortOrder::Descending)?
.take(1)
.take_while(|item| match item {
Ok(((v, _), _)) => v == &hid,
_ => true,
})
.next()
.transpose()?
.map(|item| {
let (_, v) = item?;
let res = v.into();
Ok(res)
})
.collect::<Result<Vec<_>>>()?;
let (_, v) = item;
v.into()
});

Ok(latest.first().cloned())
Ok(latest)
}

#[ocean_endpoint]
Expand Down
29 changes: 28 additions & 1 deletion lib/ain-ocean/src/api/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ use super::query::PaginationQuery;
use crate::{
error::{
Error::ToArrayError, InvalidAmountSnafu, InvalidFixedIntervalPriceSnafu,
InvalidPoolPairSymbolSnafu, InvalidTokenCurrencySnafu,
InvalidPoolPairSymbolSnafu, InvalidPriceTickerSortKeySnafu, InvalidTokenCurrencySnafu,
},
hex_encoder::as_sha256,
model::PriceTickerId,
network::Network,
Result,
};
Expand Down Expand Up @@ -128,6 +129,32 @@ pub fn parse_query_height_txid(item: &str) -> Result<(u32, Txid)> {
Ok((height, txid))
}

pub fn parse_price_ticker_sort(item: &str) -> Result<PriceTickerId> {
let mut parts = item.split('-');
let count_height_token = parts
.next()
.context(InvalidPriceTickerSortKeySnafu { item })?;
let encoded_count = &count_height_token[..8];
let encoded_height = &count_height_token[8..16];
let token = &count_height_token[16..];
let token = token.to_string();

let count: [u8; 4] = hex::decode(encoded_count)?
.try_into()
.map_err(|_| ToArrayError)?;

let height: [u8; 4] = hex::decode(encoded_height)?
.try_into()
.map_err(|_| ToArrayError)?;

let currency = parts
.next()
.context(InvalidTokenCurrencySnafu { item })?
.to_string();

Ok((count, height, token, currency))
}

#[must_use]
pub fn format_number(v: Decimal) -> String {
if v == dec!(0) {
Expand Down
9 changes: 8 additions & 1 deletion lib/ain-ocean/src/api/loan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,14 @@ async fn list_loan_token(
.services
.oracle_price_active
.by_id
.list(Some((token, currency, [0xffu8; 4])), SortOrder::Descending)?
.list(
Some((token.clone(), currency.clone(), [0xffu8; 4])),
SortOrder::Descending,
)?
.take_while(|item| match item {
Ok((k, _)) => k.0 == token && k.1 == currency,
_ => true,
})
.next()
.map(|item| {
let (_, v) = item?;
Expand Down
2 changes: 1 addition & 1 deletion lib/ain-ocean/src/api/pool_pair/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl PoolSwapVerboseResponse {
Self {
id: format!("{}-{}", v.pool_id, v.txid),
sort: format!(
"{}{}",
"{}-{}",
hex::encode(v.block.height.to_be_bytes()),
hex::encode(v.txno.to_be_bytes()),
),
Expand Down
16 changes: 8 additions & 8 deletions lib/ain-ocean/src/api/pool_pair/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,22 +594,22 @@ fn call_dftx(ctx: &Arc<AppContext>, txid: Txid) -> Result<Option<DfTx>> {
.transaction
.vout_by_id
.list(Some((txid, 0)), SortOrder::Ascending)?
.take(1)
.take_while(|item| match item {
Ok((_, vout)) => vout.txid == txid,
_ => true,
})
.next()
.transpose()?
.map(|item| {
let (_, v) = item?;
Ok(v)
})
.collect::<Result<Vec<_>>>()?;
let (_, v) = item;
v
});

if vout.is_empty() {
let Some(vout) = vout else {
return Ok(None);
}
};

let bytes = &vout[0].script.hex;
let bytes = &vout.script.hex;
if bytes.len() > 6 && bytes[0] == 0x6a && bytes[1] <= 0x4e {
let offset = 1 + match bytes[1] {
0x4c => 2,
Expand Down
99 changes: 51 additions & 48 deletions lib/ain-ocean/src/api/prices.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashSet, str::FromStr, sync::Arc};
use std::{str::FromStr, sync::Arc};

use ain_dftx::{Currency, Token, Weightage, COIN};
use ain_macros::ocean_endpoint;
Expand All @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize};
use serde_with::skip_serializing_none;

use super::{
common::parse_token_currency,
common::{parse_price_ticker_sort, parse_token_currency},
oracle::OraclePriceFeedResponse,
query::PaginationQuery,
response::{ApiPagedResponse, Response},
Expand All @@ -23,7 +23,7 @@ use crate::{
error::{ApiError, Error},
model::{
BlockContext, OracleIntervalSeconds, OraclePriceActive,
OraclePriceAggregatedIntervalAggregated, PriceTicker,
OraclePriceAggregatedIntervalAggregatedOracles, PriceTicker,
},
storage::{RepositoryOps, SortOrder},
Result,
Expand Down Expand Up @@ -119,28 +119,24 @@ async fn list_prices(
Query(query): Query<PaginationQuery>,
Extension(ctx): Extension<Arc<AppContext>>,
) -> Result<ApiPagedResponse<PriceTickerResponse>> {
let mut set: HashSet<(Token, Currency)> = HashSet::new();
let next = query
.next
.map(|item| {
let id = parse_price_ticker_sort(&item)?;
Ok::<([u8; 4], [u8; 4], Token, Currency), Error>(id)
})
.transpose()?;

let prices = ctx
.services
.price_ticker
.by_id
.list(None, SortOrder::Descending)?
.flat_map(|item| {
.list(next, SortOrder::Descending)?
.map(|item| {
let ((_, _, token, currency), v) = item?;
let has_key = set.contains(&(token.clone(), currency.clone()));
if !has_key {
set.insert((token.clone(), currency.clone()));
Ok::<Option<PriceTickerResponse>, Error>(Some(PriceTickerResponse::from((
(token, currency),
v,
))))
} else {
Ok(None)
}
Ok(PriceTickerResponse::from(((token, currency), v)))
})
.flatten()
.collect::<Vec<_>>();
.collect::<Result<Vec<_>>>()?;

Ok(ApiPagedResponse::of(prices, query.size, |price| {
price.sort.to_string()
Expand All @@ -154,18 +150,14 @@ async fn get_price(
) -> Result<Response<Option<PriceTickerResponse>>> {
let (token, currency) = parse_token_currency(&key)?;

let price_ticker = ctx
.services
.price_ticker
.by_id
.list(
Some(([0xffu8; 4], [0xffu8; 4], token.clone(), currency.clone())),
SortOrder::Descending,
)?
.next()
.transpose()?;
let price_repo = &ctx.services.price_ticker;
let sort_key = price_repo.by_key.get(&(token.clone(), currency.clone()))?;
let Some(sort_key) = sort_key else {
return Ok(Response::new(None));
};
let price_ticker = price_repo.by_id.get(&sort_key)?;

let Some((_, price_ticker)) = price_ticker else {
let Some(price_ticker) = price_ticker else {
return Ok(Response::new(None));
};

Expand Down Expand Up @@ -217,7 +209,7 @@ async fn get_feed(
token: token.clone(),
currency: currency.clone(),
aggregated: OraclePriceAggregatedAggregatedResponse {
amount: format!("{:.8}", v.aggregated.amount),
amount: format!("{:.8}", v.aggregated.amount / Decimal::from(COIN)),
weightage: v.aggregated.weightage.to_i32().unwrap_or_default(),
oracles: OraclePriceActiveNextOraclesResponse {
active: v.aggregated.oracles.active.to_i32().unwrap_or_default(),
Expand Down Expand Up @@ -334,8 +326,19 @@ pub struct OraclePriceAggregatedIntervalResponse {
pub sort: String, // medianTime-height
pub token: Token,
pub currency: Currency,
pub aggregated: OraclePriceAggregatedIntervalAggregated,
pub aggregated: OraclePriceAggregatedIntervalAggregatedResponse,
pub block: BlockContext,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct OraclePriceAggregatedIntervalAggregatedResponse {
#[serde(with = "rust_decimal::serde::str")]
pub amount: Decimal,
#[serde(with = "rust_decimal::serde::str")]
pub weightage: Decimal,
pub count: i32,
pub oracles: OraclePriceAggregatedIntervalAggregatedOracles,
/**
* Aggregated interval time range in seconds.
* - Interval that aggregated in seconds
Expand All @@ -362,23 +365,26 @@ async fn get_feed_with_interval(
let (token, currency) = parse_token_currency(&key)?;
let interval = interval.parse::<i64>()?;

let interval_type = match interval {
let interval = match interval {
900 => OracleIntervalSeconds::FifteenMinutes,
3600 => OracleIntervalSeconds::OneHour,
86400 => OracleIntervalSeconds::OneDay,
_ => return Err(From::from("Invalid oracle interval")),
};
let interval = interval as u32;

let next = query
.next
.map(|q| {
let height = q.parse::<u32>()?.to_be_bytes();
let height = hex::decode(q)?
.try_into()
.map_err(|_| Error::ToArrayError)?;
Ok::<[u8; 4], Error>(height)
})
.transpose()?
.unwrap_or([0xffu8; 4]);

let id = (token.clone(), currency.clone(), interval_type.clone(), next);
let id = (token.clone(), currency.clone(), interval.to_string(), next);

let items = ctx
.services
Expand All @@ -388,40 +394,37 @@ async fn get_feed_with_interval(
.take(query.size)
.take_while(|item| match item {
Ok(((t, c, i, _), _)) => {
t == &token.clone() && c == &currency.clone() && i == &interval_type.clone()
t == &token.clone() && c == &currency.clone() && i == &interval.to_string()
}
_ => true,
})
.flatten()
.collect::<Vec<_>>();

let interval = interval as i64;
let mut prices = Vec::new();
for (id, item) in items {
let start = item.block.median_time - (item.block.median_time % interval);
let height = u32::from_be_bytes(id.3);

let price = OraclePriceAggregatedIntervalResponse {
id: format!("{}-{}-{:?}-{}", id.0, id.1, id.2, height),
key: format!("{}-{}-{:?}", id.0, id.1, id.2),
sort: format!(
"{}{}",
hex::encode(item.block.median_time.to_be_bytes()),
hex::encode(item.block.height.to_be_bytes()),
),
id: format!("{}-{}-{}-{}", id.0, id.1, id.2, height),
key: format!("{}-{}-{}", id.0, id.1, id.2),
sort: hex::encode(item.block.height.to_be_bytes()).to_string(),
token: token.clone(),
currency: currency.clone(),
aggregated: OraclePriceAggregatedIntervalAggregated {
aggregated: OraclePriceAggregatedIntervalAggregatedResponse {
amount: item.aggregated.amount,
weightage: item.aggregated.weightage,
oracles: item.aggregated.oracles,
count: item.aggregated.count,
time: OraclePriceAggregatedIntervalTime {
interval,
start,
end: start + interval,
},
},
block: item.block,
time: OraclePriceAggregatedIntervalTime {
interval,
start,
end: start + interval,
},
};
prices.push(price);
}
Expand Down
5 changes: 3 additions & 2 deletions lib/ain-ocean/src/api/stats/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
stats::get_block_reward_distribution,
AppContext,
},
error::{DecimalConversionSnafu, Error, OtherSnafu},
error::{DecimalConversionSnafu, OtherSnafu},
model::MasternodeStatsData,
storage::{RepositoryOps, SortOrder},
Result,
Expand Down Expand Up @@ -111,7 +111,8 @@ pub async fn get_count(ctx: &Arc<AppContext>) -> Result<Count> {
.by_id
.list(None, SortOrder::Descending)?
.filter_map(|item| {
item.ok().map(|((_, _, token, currency), _)| (token, currency))
item.ok()
.map(|((_, _, token, currency), _)| (token, currency))
})
.collect::<HashSet<(Token, Currency)>>();

Expand Down
6 changes: 6 additions & 0 deletions lib/ain-ocean/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid price ticker sort key: {}", item))]
InvalidPriceTickerSortKey {
item: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid amount format: {}", item))]
InvalidAmount {
item: String,
Expand Down
Loading

0 comments on commit 75c0f17

Please sign in to comment.