Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Prevent repeated calls to failed exchanges #264

Merged
merged 7 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 133 additions & 93 deletions src/xrc/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,88 +33,105 @@ const STABLECOIN_BASES: &[&str] = &[DAI, USDC];
const MIN_NUM_RATES_FOR_PRIVILEGED_CANISTERS: usize =
if cfg!(feature = "ipv4-support") { 3 } else { 2 };

#[derive(Clone, Debug)]
struct QueriedExchangeRateWithFailedExchanges {
queried_exchange_rate: QueriedExchangeRate,
failed_exchanges: Vec<Exchange>,
}

#[async_trait]
trait CallExchanges {
async fn get_cryptocurrency_usdt_rate(
&self,
exchanges: &[&Exchange],
asset: &Asset,
timestamp: u64,
) -> Result<QueriedExchangeRate, CallExchangeError>;
) -> Result<QueriedExchangeRateWithFailedExchanges, CallExchangeError>;

async fn get_stablecoin_rates(
&self,
exchanges: &[&Exchange],
symbols: &[&str],
timestamp: u64,
) -> Vec<Result<QueriedExchangeRate, CallExchangeError>>;
) -> Vec<Result<QueriedExchangeRateWithFailedExchanges, CallExchangeError>>;
}

struct CallExchangesImpl;

#[async_trait]
impl CallExchanges for CallExchangesImpl {
async fn get_cryptocurrency_usdt_rate(
&self,
asset: &Asset,
timestamp: u64,
) -> Result<QueriedExchangeRate, CallExchangeError> {
let futures = EXCHANGES.iter().filter_map(|exchange| {
if !exchange.is_available() {
return None;
}

Some(call_exchange(
exchange,
CallExchangeArgs {
timestamp,
quote_asset: usdt_asset(),
base_asset: asset.clone(),
},
))
});
let results = join_all(futures).await;

let mut rates = vec![];
let mut errors = vec![];
for result in results {
match result {
Ok(rate) => rates.push(rate),
Err(err) => {
ic_cdk::println!(
"{} Timestamp: {}, Asset: {:?}, Error: {}",
LOG_PREFIX,
timestamp,
asset,
err,
);
errors.push(err);
}
}
}

if rates.is_empty() {
return Err(CallExchangeError::NoRatesFound);
}

Ok(QueriedExchangeRate::new(
asset.clone(),
usdt_asset(),
timestamp,
&rates,
rates.len() + errors.len(),
rates.len(),
None,
))
&self,
exchanges: &[&Exchange],
asset: &Asset,
timestamp: u64,
) -> Result<QueriedExchangeRateWithFailedExchanges, CallExchangeError> {
let futures = exchanges.iter().map(|exchange| {
call_exchange(
exchange,
CallExchangeArgs {
timestamp,
quote_asset: usdt_asset(),
base_asset: asset.clone(),
},
)
});
let results = join_all(futures).await;

let mut rates = vec![];
let mut failed_exchanges = vec![];
for result in results {
match result {
Ok(rate) => rates.push(rate),
Err(err) => {
ic_cdk::println!(
"{} Timestamp: {}, Asset: {:?}, Error: {}",
LOG_PREFIX,
timestamp,
asset,
err,
);

if let CallExchangeError::Http { exchange, error: _} = err {
if let Some(exchange) = exchanges.iter().find(|e| e.to_string() == exchange) {
failed_exchanges.push((*exchange).clone());
}
else {
ic_cdk::println!("{} Exchange not found for failed exchanges: {} @ {}", LOG_PREFIX, exchange, timestamp);
}
}
}
}
}

if rates.is_empty() {
return Err(CallExchangeError::NoRatesFound);
}

Ok(QueriedExchangeRateWithFailedExchanges {
queried_exchange_rate: QueriedExchangeRate::new(
asset.clone(),
usdt_asset(),
timestamp,
&rates,
exchanges.len(),
rates.len(),
None,
),
failed_exchanges,
})
}

async fn get_stablecoin_rates(
&self,
exchanges: &[&Exchange],
symbols: &[&str],
timestamp: u64,
) -> Vec<Result<QueriedExchangeRate, CallExchangeError>> {
) -> Vec<Result<QueriedExchangeRateWithFailedExchanges, CallExchangeError>> {
join_all(
symbols
.iter()
.map(|symbol| get_stablecoin_rate(symbol, timestamp)),
.map(|symbol| get_stablecoin_rate(exchanges, symbol, timestamp)),
)
.await
}
Expand All @@ -136,6 +153,11 @@ pub fn usd_asset() -> Asset {
}
}

/// Helper function to get a list of available exchanges.
fn get_available_exchanges() -> Vec<&'static Exchange> {
EXCHANGES.iter().filter(|e| e.is_available()).collect::<Vec<_>>()
}

/// This function retrieves the requested rate from the exchanges. The median rate of all collected
/// rates is used as the exchange rate and a set of metadata is returned giving information on
/// how the rate was retrieved.
Expand Down Expand Up @@ -489,10 +511,11 @@ fn charge_cycles(
async fn handle_cryptocurrency_pair(
env: &impl Environment,
call_exchanges_impl: &impl CallExchanges,
request: &GetExchangeRateRequest,
request: &GetExchangeRateRequest
) -> Result<QueriedExchangeRate, ExchangeRateError> {
let requested_timestamp = get_normalized_timestamp(env, request);

let mut failed_exchanges = vec![];
let mut exchanges = get_available_exchanges();
let caller = env.caller();
let (maybe_base_rate, maybe_quote_rate) = with_cache_mut(|cache| {
(
Expand Down Expand Up @@ -544,37 +567,41 @@ async fn handle_cryptocurrency_pair(
let base_rate = match maybe_base_rate {
Some(base_rate) => base_rate,
None => {
let base_rate = call_exchanges_impl
let response = call_exchanges_impl
.get_cryptocurrency_usdt_rate(
&exchanges,
&request.base_asset,
requested_timestamp.value,
)
.await
.map_err(|_| ExchangeRateError::CryptoBaseAssetNotFound)?;
with_cache_mut(|cache| {
cache.insert(&base_rate);
cache.insert(&response.queried_exchange_rate);
});
base_rate
failed_exchanges.extend(response.failed_exchanges);
response.queried_exchange_rate
}
};
exchanges.retain(|exchange| !failed_exchanges.contains(exchange));

let quote_rate = match maybe_quote_rate {
Some(quote_rate) => quote_rate,
None => {
let quote_rate = call_exchanges_impl
let response = call_exchanges_impl
.get_cryptocurrency_usdt_rate(
&exchanges,
&request.quote_asset,
requested_timestamp.value,
)
.await
.map_err(|_| ExchangeRateError::CryptoQuoteAssetNotFound)?;
with_cache_mut(|cache| {
cache.insert(&quote_rate);
cache.insert(&response.queried_exchange_rate);
});
quote_rate
failed_exchanges.extend(response.failed_exchanges);
response.queried_exchange_rate
}
};

(base_rate / quote_rate).validate()
}),
)
Expand All @@ -588,6 +615,8 @@ async fn handle_crypto_base_fiat_quote_pair(
) -> Result<QueriedExchangeRate, ExchangeRateError> {
let requested_timestamp = get_normalized_timestamp(env, request);
let caller = env.caller();
let mut failed_exchanges_list = vec![];
let mut exchanges = get_available_exchanges();
let maybe_crypto_base_rate = with_cache_mut(|cache| {
get_rate_from_cache(
cache,
Expand All @@ -596,7 +625,6 @@ async fn handle_crypto_base_fiat_quote_pair(
requested_timestamp.value,
)
});

let mut num_rates_needed: usize = 0;
if maybe_crypto_base_rate.is_none() {
num_rates_needed = num_rates_needed.saturating_add(1);
Expand Down Expand Up @@ -657,17 +685,21 @@ async fn handle_crypto_base_fiat_quote_pair(
// Retrieve the missing stablecoin results. For each rate retrieved, cache it and add it to the
// stablecoin rates vector.
let stablecoin_results = call_exchanges_impl
.get_stablecoin_rates(&missed_stablecoin_symbols, requested_timestamp.value)
.get_stablecoin_rates(&exchanges, &missed_stablecoin_symbols, requested_timestamp.value)
.await;

stablecoin_results
.iter()
.into_iter()
.zip(missed_stablecoin_symbols)
.for_each(|(result, symbol)| match result {
Ok(rate) => {
stablecoin_rates.push(rate.clone());
Ok(QueriedExchangeRateWithFailedExchanges {
failed_exchanges,
queried_exchange_rate,
}) => {
failed_exchanges_list.extend(failed_exchanges);
stablecoin_rates.push(queried_exchange_rate.clone());
with_cache_mut(|cache| {
cache.insert(rate);
cache.insert(&queried_exchange_rate);
});
}
Err(error) => {
Expand All @@ -681,27 +713,29 @@ async fn handle_crypto_base_fiat_quote_pair(
}
});

exchanges.retain(|exchange| !failed_exchanges_list.contains(exchange));
let crypto_base_rate = match maybe_crypto_base_rate {
Some(base_rate) => base_rate,
None => {
let base_rate = call_exchanges_impl
let response = call_exchanges_impl
.get_cryptocurrency_usdt_rate(
&exchanges,
&request.base_asset,
requested_timestamp.value,
)
.await
.map_err(|_| ExchangeRateError::CryptoBaseAssetNotFound)?;
with_cache_mut(|cache| {
cache.insert(&base_rate);
cache.insert(&response.queried_exchange_rate);
});
base_rate
failed_exchanges_list.extend(response.failed_exchanges);
response.queried_exchange_rate
}
};

let stablecoin_rate = stablecoin::get_stablecoin_rate(&stablecoin_rates, &usd_asset())
.map_err(ExchangeRateError::from)?;
let crypto_usd_base_rate = crypto_base_rate * stablecoin_rate;

(crypto_usd_base_rate / forex_rate).validate()
}),
)
Expand Down Expand Up @@ -736,15 +770,12 @@ fn handle_fiat_pair(
}

async fn get_stablecoin_rate(
exchanges: &[&Exchange],
symbol: &str,
timestamp: u64,
) -> Result<QueriedExchangeRate, CallExchangeError> {
) -> Result<QueriedExchangeRateWithFailedExchanges, CallExchangeError> {
let mut futures = vec![];
EXCHANGES.iter().for_each(|exchange| {
if !cfg!(feature = "ipv4-support") && !exchange.supports_ipv6() {
return;
}

exchanges.iter().for_each(|exchange| {
THLO marked this conversation as resolved.
Show resolved Hide resolved
let maybe_pair = exchange
.supported_stablecoin_pairs()
.iter()
Expand All @@ -769,8 +800,7 @@ async fn get_stablecoin_rate(
let results = join_all(futures).await;

let mut rates = vec![];
let mut errors = vec![];

let mut failed_exchanges = vec![];
for result in results {
match result {
Ok(rate) => rates.push(rate),
Expand All @@ -782,7 +812,14 @@ async fn get_stablecoin_rate(
timestamp,
error
);
errors.push(error);
if let CallExchangeError::Http { exchange, error: _} = error {
if let Some(exchange) = exchanges.iter().find(|e| e.to_string() == exchange) {
failed_exchanges.push((*exchange).clone());
}
else {
ic_cdk::println!("{} Exchange not found for failed exchanges: {} @ {}", LOG_PREFIX, exchange, timestamp);
}
}
}
}
}
Expand All @@ -791,18 +828,21 @@ async fn get_stablecoin_rate(
return Err(CallExchangeError::NoRatesFound);
}

Ok(QueriedExchangeRate::new(
Asset {
Ok(QueriedExchangeRateWithFailedExchanges {
queried_exchange_rate: QueriedExchangeRate::new(
Asset {
symbol: symbol.to_string(),
class: AssetClass::Cryptocurrency,
},
usdt_asset(),
timestamp,
&rates,
rates.len() + errors.len(),
rates.len(),
None,
))
},
usdt_asset(),
timestamp,
&rates,
exchanges.len(),
rates.len(),
None,
),
failed_exchanges,
})
}

async fn call_exchange_for_stablecoin(
Expand Down
Loading
Loading