Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ocean: fixes #3128

Merged
merged 31 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
ccae506
refix prices db
canonbrother Nov 28, 2024
583ca3f
filter_map as set
canonbrother Nov 28, 2024
186dbcf
fmt_rs
canonbrother Nov 28, 2024
813cbce
note
canonbrother Nov 29, 2024
850fa5a
filter_map to get sub_root
canonbrother Dec 4, 2024
ea3f3c6
rm clone
canonbrother Dec 4, 2024
746fdb7
clean
canonbrother Dec 5, 2024
0f652e3
only store latest data in db
canonbrother Dec 6, 2024
baac91c
add PriceTickerKey
canonbrother Dec 9, 2024
7421bf9
fmt_rs
canonbrother Dec 9, 2024
f3080d6
fix oracle interval type
canonbrother Dec 10, 2024
cac09f3
rm debug
canonbrother Dec 12, 2024
ad59ab7
take(1) to next
canonbrother Dec 12, 2024
ffa2aa2
missing grab prefix - take_while
canonbrother Dec 12, 2024
b6c50cd
fmt_rs
canonbrother Dec 14, 2024
29246ff
rm unuse
canonbrother Dec 16, 2024
de33a9b
fix oracle interval index
canonbrother Dec 16, 2024
583c635
fix oracle interval index 2
canonbrother Dec 17, 2024
4c58253
missing dash
canonbrother Dec 18, 2024
ada8b8c
fix missing script_agg
canonbrother Dec 18, 2024
f446bf3
fix OraclePriceAggregatedIntervalResponse
canonbrother Dec 19, 2024
9af4ce8
fix OraclePriceAggregatedIntervalResponse sort
canonbrother Dec 19, 2024
24a7145
fix OraclePriceAggregatedIntervalResponse sort
canonbrother Dec 19, 2024
be0e724
fix OraclePriceAggregatedIntervalResponse sort
canonbrother Dec 19, 2024
96f101e
skip
canonbrother Dec 19, 2024
57b07bf
decimal
canonbrother Dec 19, 2024
82d7bb6
update get_price api
canonbrother Dec 19, 2024
d5ded95
update get_price api
canonbrother Dec 19, 2024
9934c5b
update get_price api
canonbrother Dec 19, 2024
cff077f
rename
canonbrother Dec 19, 2024
7515ded
fmt
canonbrother Dec 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions lib/ain-ocean/src/api/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,19 +174,18 @@ fn get_latest_aggregation(
.script_aggregation
.by_id
.list(Some((hid, [0xffu8; 4])), SortOrder::Descending)?
.take(1)
.take_while(|item| match item {
Ok(((v, _), _)) => v == &hid,
_ => true,
})
.next()
.transpose()?
.map(|item| {
let (_, v) = item?;
let res = v.into();
Ok(res)
})
.collect::<Result<Vec<_>>>()?;
let (_, v) = item;
v.into()
});

Ok(latest.first().cloned())
Ok(latest)
}

#[ocean_endpoint]
Expand Down
29 changes: 28 additions & 1 deletion lib/ain-ocean/src/api/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ use super::query::PaginationQuery;
use crate::{
error::{
Error::ToArrayError, InvalidAmountSnafu, InvalidFixedIntervalPriceSnafu,
InvalidPoolPairSymbolSnafu, InvalidTokenCurrencySnafu,
InvalidPoolPairSymbolSnafu, InvalidPriceTickerSortKeySnafu, InvalidTokenCurrencySnafu,
},
hex_encoder::as_sha256,
model::PriceTickerId,
network::Network,
Result,
};
Expand Down Expand Up @@ -128,6 +129,32 @@ pub fn parse_query_height_txid(item: &str) -> Result<(u32, Txid)> {
Ok((height, txid))
}

pub fn parse_price_ticker_sort(item: &str) -> Result<PriceTickerId> {
let mut parts = item.split('-');
let count_height_token = parts
.next()
.context(InvalidPriceTickerSortKeySnafu { item })?;
let encoded_count = &count_height_token[..8];
let encoded_height = &count_height_token[8..16];
let token = &count_height_token[16..];
let token = token.to_string();

let count: [u8; 4] = hex::decode(encoded_count)?
.try_into()
.map_err(|_| ToArrayError)?;

let height: [u8; 4] = hex::decode(encoded_height)?
.try_into()
.map_err(|_| ToArrayError)?;

let currency = parts
.next()
.context(InvalidTokenCurrencySnafu { item })?
.to_string();

Ok((count, height, token, currency))
}

#[must_use]
pub fn format_number(v: Decimal) -> String {
if v == dec!(0) {
Expand Down
9 changes: 8 additions & 1 deletion lib/ain-ocean/src/api/loan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,14 @@ async fn list_loan_token(
.services
.oracle_price_active
.by_id
.list(Some((token, currency, [0xffu8; 4])), SortOrder::Descending)?
.list(
Some((token.clone(), currency.clone(), [0xffu8; 4])),
SortOrder::Descending,
)?
.take_while(|item| match item {
Ok((k, _)) => k.0 == token && k.1 == currency,
_ => true,
})
.next()
.map(|item| {
let (_, v) = item?;
Expand Down
2 changes: 1 addition & 1 deletion lib/ain-ocean/src/api/pool_pair/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl PoolSwapVerboseResponse {
Self {
id: format!("{}-{}", v.pool_id, v.txid),
sort: format!(
"{}{}",
"{}-{}",
hex::encode(v.block.height.to_be_bytes()),
hex::encode(v.txno.to_be_bytes()),
),
Expand Down
16 changes: 8 additions & 8 deletions lib/ain-ocean/src/api/pool_pair/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,22 +594,22 @@ fn call_dftx(ctx: &Arc<AppContext>, txid: Txid) -> Result<Option<DfTx>> {
.transaction
.vout_by_id
.list(Some((txid, 0)), SortOrder::Ascending)?
.take(1)
.take_while(|item| match item {
Ok((_, vout)) => vout.txid == txid,
_ => true,
})
.next()
.transpose()?
.map(|item| {
let (_, v) = item?;
Ok(v)
})
.collect::<Result<Vec<_>>>()?;
let (_, v) = item;
v
});

if vout.is_empty() {
let Some(vout) = vout else {
return Ok(None);
}
};

let bytes = &vout[0].script.hex;
let bytes = &vout.script.hex;
if bytes.len() > 6 && bytes[0] == 0x6a && bytes[1] <= 0x4e {
let offset = 1 + match bytes[1] {
0x4c => 2,
Expand Down
99 changes: 51 additions & 48 deletions lib/ain-ocean/src/api/prices.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashSet, str::FromStr, sync::Arc};
use std::{str::FromStr, sync::Arc};

use ain_dftx::{Currency, Token, Weightage, COIN};
use ain_macros::ocean_endpoint;
Expand All @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize};
use serde_with::skip_serializing_none;

use super::{
common::parse_token_currency,
common::{parse_price_ticker_sort, parse_token_currency},
oracle::OraclePriceFeedResponse,
query::PaginationQuery,
response::{ApiPagedResponse, Response},
Expand All @@ -23,7 +23,7 @@ use crate::{
error::{ApiError, Error},
model::{
BlockContext, OracleIntervalSeconds, OraclePriceActive,
OraclePriceAggregatedIntervalAggregated, PriceTicker,
OraclePriceAggregatedIntervalAggregatedOracles, PriceTicker,
},
storage::{RepositoryOps, SortOrder},
Result,
Expand Down Expand Up @@ -119,28 +119,24 @@ async fn list_prices(
Query(query): Query<PaginationQuery>,
Extension(ctx): Extension<Arc<AppContext>>,
) -> Result<ApiPagedResponse<PriceTickerResponse>> {
let mut set: HashSet<(Token, Currency)> = HashSet::new();
let next = query
.next
.map(|item| {
let id = parse_price_ticker_sort(&item)?;
Ok::<([u8; 4], [u8; 4], Token, Currency), Error>(id)
})
.transpose()?;

let prices = ctx
.services
.price_ticker
.by_id
.list(None, SortOrder::Descending)?
.flat_map(|item| {
.list(next, SortOrder::Descending)?
.map(|item| {
let ((_, _, token, currency), v) = item?;
let has_key = set.contains(&(token.clone(), currency.clone()));
if !has_key {
set.insert((token.clone(), currency.clone()));
Ok::<Option<PriceTickerResponse>, Error>(Some(PriceTickerResponse::from((
(token, currency),
v,
))))
} else {
Ok(None)
}
Ok(PriceTickerResponse::from(((token, currency), v)))
})
.flatten()
.collect::<Vec<_>>();
.collect::<Result<Vec<_>>>()?;

Ok(ApiPagedResponse::of(prices, query.size, |price| {
price.sort.to_string()
Expand All @@ -154,18 +150,14 @@ async fn get_price(
) -> Result<Response<Option<PriceTickerResponse>>> {
let (token, currency) = parse_token_currency(&key)?;

let price_ticker = ctx
.services
.price_ticker
.by_id
.list(
Some(([0xffu8; 4], [0xffu8; 4], token.clone(), currency.clone())),
SortOrder::Descending,
)?
.next()
.transpose()?;
let price_repo = &ctx.services.price_ticker;
let sort_key = price_repo.by_key.get(&(token.clone(), currency.clone()))?;
let Some(sort_key) = sort_key else {
return Ok(Response::new(None));
};
let price_ticker = price_repo.by_id.get(&sort_key)?;

let Some((_, price_ticker)) = price_ticker else {
let Some(price_ticker) = price_ticker else {
return Ok(Response::new(None));
};

Expand Down Expand Up @@ -217,7 +209,7 @@ async fn get_feed(
token: token.clone(),
currency: currency.clone(),
aggregated: OraclePriceAggregatedAggregatedResponse {
amount: format!("{:.8}", v.aggregated.amount),
amount: format!("{:.8}", v.aggregated.amount / Decimal::from(COIN)),
weightage: v.aggregated.weightage.to_i32().unwrap_or_default(),
oracles: OraclePriceActiveNextOraclesResponse {
active: v.aggregated.oracles.active.to_i32().unwrap_or_default(),
Expand Down Expand Up @@ -334,8 +326,19 @@ pub struct OraclePriceAggregatedIntervalResponse {
pub sort: String, // medianTime-height
pub token: Token,
pub currency: Currency,
pub aggregated: OraclePriceAggregatedIntervalAggregated,
pub aggregated: OraclePriceAggregatedIntervalAggregatedResponse,
pub block: BlockContext,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct OraclePriceAggregatedIntervalAggregatedResponse {
#[serde(with = "rust_decimal::serde::str")]
pub amount: Decimal,
#[serde(with = "rust_decimal::serde::str")]
pub weightage: Decimal,
pub count: i32,
pub oracles: OraclePriceAggregatedIntervalAggregatedOracles,
/**
* Aggregated interval time range in seconds.
* - Interval that aggregated in seconds
Expand All @@ -362,23 +365,26 @@ async fn get_feed_with_interval(
let (token, currency) = parse_token_currency(&key)?;
let interval = interval.parse::<i64>()?;

let interval_type = match interval {
let interval = match interval {
900 => OracleIntervalSeconds::FifteenMinutes,
3600 => OracleIntervalSeconds::OneHour,
86400 => OracleIntervalSeconds::OneDay,
_ => return Err(From::from("Invalid oracle interval")),
};
let interval = interval as u32;

let next = query
.next
.map(|q| {
let height = q.parse::<u32>()?.to_be_bytes();
let height = hex::decode(q)?
.try_into()
.map_err(|_| Error::ToArrayError)?;
Ok::<[u8; 4], Error>(height)
})
.transpose()?
.unwrap_or([0xffu8; 4]);

let id = (token.clone(), currency.clone(), interval_type.clone(), next);
let id = (token.clone(), currency.clone(), interval.to_string(), next);

let items = ctx
.services
Expand All @@ -388,40 +394,37 @@ async fn get_feed_with_interval(
.take(query.size)
.take_while(|item| match item {
Ok(((t, c, i, _), _)) => {
t == &token.clone() && c == &currency.clone() && i == &interval_type.clone()
t == &token.clone() && c == &currency.clone() && i == &interval.to_string()
}
_ => true,
})
.flatten()
.collect::<Vec<_>>();

let interval = interval as i64;
let mut prices = Vec::new();
for (id, item) in items {
let start = item.block.median_time - (item.block.median_time % interval);
let height = u32::from_be_bytes(id.3);

let price = OraclePriceAggregatedIntervalResponse {
id: format!("{}-{}-{:?}-{}", id.0, id.1, id.2, height),
key: format!("{}-{}-{:?}", id.0, id.1, id.2),
sort: format!(
"{}{}",
hex::encode(item.block.median_time.to_be_bytes()),
hex::encode(item.block.height.to_be_bytes()),
),
id: format!("{}-{}-{}-{}", id.0, id.1, id.2, height),
key: format!("{}-{}-{}", id.0, id.1, id.2),
sort: hex::encode(item.block.height.to_be_bytes()).to_string(),
token: token.clone(),
currency: currency.clone(),
aggregated: OraclePriceAggregatedIntervalAggregated {
aggregated: OraclePriceAggregatedIntervalAggregatedResponse {
amount: item.aggregated.amount,
weightage: item.aggregated.weightage,
oracles: item.aggregated.oracles,
count: item.aggregated.count,
time: OraclePriceAggregatedIntervalTime {
interval,
start,
end: start + interval,
},
},
block: item.block,
time: OraclePriceAggregatedIntervalTime {
interval,
start,
end: start + interval,
},
};
prices.push(price);
}
Expand Down
5 changes: 3 additions & 2 deletions lib/ain-ocean/src/api/stats/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
stats::get_block_reward_distribution,
AppContext,
},
error::{DecimalConversionSnafu, Error, OtherSnafu},
error::{DecimalConversionSnafu, OtherSnafu},
model::MasternodeStatsData,
storage::{RepositoryOps, SortOrder},
Result,
Expand Down Expand Up @@ -111,7 +111,8 @@ pub async fn get_count(ctx: &Arc<AppContext>) -> Result<Count> {
.by_id
.list(None, SortOrder::Descending)?
.filter_map(|item| {
item.ok().map(|((_, _, token, currency), _)| (token, currency))
item.ok()
.map(|((_, _, token, currency), _)| (token, currency))
})
.collect::<HashSet<(Token, Currency)>>();

Expand Down
6 changes: 6 additions & 0 deletions lib/ain-ocean/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid price ticker sort key: {}", item))]
InvalidPriceTickerSortKey {
item: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid amount format: {}", item))]
InvalidAmount {
item: String,
Expand Down
Loading
Loading