diff --git a/Cargo.lock b/Cargo.lock index d1a57df14..84666a962 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3250,6 +3250,7 @@ version = "0.2.0" dependencies = [ "anyhow", "async-channel", + "async-once-cell", "async-trait", "fixed 1.11.0 (git+https://github.com/blockworks-foundation/fixed.git?branch=v1.11.0-borsh0_10-mango)", "futures 0.3.28", diff --git a/lib/client/src/account_fetcher_trait.rs b/lib/client/src/account_fetcher_trait.rs index 2545767f7..8d4a66db6 100644 --- a/lib/client/src/account_fetcher_trait.rs +++ b/lib/client/src/account_fetcher_trait.rs @@ -1,6 +1,6 @@ +use mango_feeds_connector::account_fetchers::{AccountFetcherFeeds, CachedAccountFetcher, RpcAccountFetcher}; use solana_sdk::account::AccountSharedData; use solana_sdk::pubkey::Pubkey; -use crate::account_fetchers::{AccountFetcherFeeds, CachedAccountFetcher, RpcAccountFetcher}; #[async_trait::async_trait] diff --git a/lib/client/src/account_fetchers.rs b/lib/client/src/account_fetchers.rs deleted file mode 100644 index d16c3004a..000000000 --- a/lib/client/src/account_fetchers.rs +++ /dev/null @@ -1,250 +0,0 @@ -// TODO move to feeds - -use std::collections::HashMap; -use std::sync::Arc; -use std::sync::Mutex; - -use async_once_cell::unpin::Lazy; - -use anyhow::{Context, Error}; - -use anchor_client::ClientError; -use anchor_lang::AccountDeserialize; - -use solana_client::nonblocking::rpc_client::RpcClient as RpcClientAsync; -use solana_sdk::account::{AccountSharedData, ReadableAccount}; -use solana_sdk::pubkey::Pubkey; -use crate::AccountFetcher; - - -#[async_trait::async_trait] -pub trait AccountFetcherFeeds: Sync + Send { - async fn feeds_fetch_raw_account(&self, address: &Pubkey) -> anyhow::Result; - async fn feeds_fetch_program_accounts( - &self, - program: &Pubkey, - discriminator: [u8; 8], - ) -> anyhow::Result>; -} - - -pub struct RpcAccountFetcher { - pub rpc: RpcClientAsync, -} - -#[async_trait::async_trait] -impl AccountFetcherFeeds for RpcAccountFetcher { - async fn feeds_fetch_raw_account(&self, address: &Pubkey) -> anyhow::Result { - let sdfs: Result = self.rpc - .get_account_with_commitment(address, self.rpc.commitment()) - .await - .with_context(|| format!("fetch account {}", *address))? - .value - .ok_or(ClientError::AccountNotFound) - .with_context(|| format!("fetch account {}", *address)) - .map(Into::into); - sdfs - } - - async fn feeds_fetch_program_accounts( - &self, - program: &Pubkey, - discriminator: [u8; 8], - ) -> anyhow::Result> { - use solana_account_decoder::UiAccountEncoding; - use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}; - use solana_client::rpc_filter::{Memcmp, RpcFilterType}; - let config = RpcProgramAccountsConfig { - filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes( - 0, - discriminator.to_vec(), - ))]), - account_config: RpcAccountInfoConfig { - encoding: Some(UiAccountEncoding::Base64), - commitment: Some(self.rpc.commitment()), - ..RpcAccountInfoConfig::default() - }, - with_context: Some(true), - }; - let accs = self - .rpc - .get_program_accounts_with_config(program, config) - .await?; - // convert Account -> AccountSharedData - Ok(accs - .into_iter() - .map(|(pk, acc)| (pk, acc.into())) - .collect::>()) - } -} - -struct CoalescedAsyncJob { - jobs: HashMap>>, -} - -impl Default for CoalescedAsyncJob { - fn default() -> Self { - Self { - jobs: Default::default(), - } - } -} - -impl CoalescedAsyncJob { - /// Either returns the job for `key` or registers a new job for it - fn run_coalesced + Send + 'static>( - &mut self, - key: Key, - fut: F, - ) -> Arc> { - self.jobs - .entry(key) - .or_insert_with(|| Arc::new(Lazy::new(Box::pin(fut)))) - .clone() - } - - fn remove(&mut self, key: &Key) { - self.jobs.remove(key); - } -} - -#[derive(Default)] -struct AccountCache { - accounts: HashMap, - keys_for_program_and_discriminator: HashMap<(Pubkey, [u8; 8]), Vec>, - - account_jobs: CoalescedAsyncJob>, - program_accounts_jobs: - CoalescedAsyncJob<(Pubkey, [u8; 8]), anyhow::Result>>, -} - -impl AccountCache { - fn clear(&mut self) { - self.accounts.clear(); - self.keys_for_program_and_discriminator.clear(); - } -} - -pub struct CachedAccountFetcher { - fetcher: Arc, - cache: Arc>, -} - -impl Clone for CachedAccountFetcher { - fn clone(&self) -> Self { - Self { - fetcher: self.fetcher.clone(), - cache: self.cache.clone(), - } - } -} - -impl CachedAccountFetcher { - pub fn new(fetcher: Arc) -> Self { - Self { - fetcher, - cache: Arc::new(Mutex::new(AccountCache::default())), - } - } - - pub fn clear_cache(&self) { - let mut cache = self.cache.lock().unwrap(); - cache.clear(); - } -} - -#[async_trait::async_trait] -impl AccountFetcherFeeds for CachedAccountFetcher { - async fn feeds_fetch_raw_account(&self, address: &Pubkey) -> anyhow::Result { - let fetch_job = { - let mut cache = self.cache.lock().unwrap(); - if let Some(acc) = cache.accounts.get(address) { - return Ok(acc.clone()); - } - - // Start or fetch a reference to the fetch + cache update job - let self_copy = self.clone(); - let address_copy = address.clone(); - cache.account_jobs.run_coalesced(*address, async move { - let result = self_copy.fetcher.feeds_fetch_raw_account(&address_copy).await; - let mut cache = self_copy.cache.lock().unwrap(); - - // remove the job from the job list, so it can be redone if it errored - cache.account_jobs.remove(&address_copy); - - // store a successful fetch - if let Ok(account) = result.as_ref() { - cache.accounts.insert(address_copy, account.clone()); - } - result - }) - }; - - match fetch_job.get().await { - Ok(v) => Ok(v.clone()), - // Can't clone the stored error, so need to stringize it - Err(err) => Err(anyhow::format_err!( - "fetch error in CachedAccountFetcher: {:?}", - err - )), - } - } - - async fn feeds_fetch_program_accounts( - &self, - program: &Pubkey, - discriminator: [u8; 8], - ) -> anyhow::Result> { - let cache_key = (*program, discriminator); - let fetch_job = { - let mut cache = self.cache.lock().unwrap(); - if let Some(accounts) = cache.keys_for_program_and_discriminator.get(&cache_key) { - return Ok(accounts - .iter() - .map(|pk| (*pk, cache.accounts.get(&pk).unwrap().clone())) - .collect::>()); - } - - let self_copy = self.clone(); - let program_copy = program.clone(); - cache - .program_accounts_jobs - .run_coalesced(cache_key.clone(), async move { - let result = self_copy - .fetcher - .feeds_fetch_program_accounts(&program_copy, discriminator) - .await; - let mut cache = self_copy.cache.lock().unwrap(); - cache.program_accounts_jobs.remove(&cache_key); - if let Ok(accounts) = result.as_ref() { - cache - .keys_for_program_and_discriminator - .insert(cache_key, accounts.iter().map(|(pk, _)| *pk).collect()); - for (pk, acc) in accounts.iter() { - cache.accounts.insert(*pk, acc.clone()); - } - } - result - }) - }; - - match fetch_job.get().await { - Ok(v) => Ok(v.clone()), - // Can't clone the stored error, so need to stringize it - Err(err) => Err(anyhow::format_err!( - "fetch error in CachedAccountFetcher: {:?}", - err - )), - } - } -} - - -#[test] -fn errors() { - let foo = ClientError::AccountNotFound; - - let err: anyhow::Error = foo.into(); - - -} \ No newline at end of file diff --git a/lib/client/src/chain_data.rs b/lib/client/src/chain_data.rs index 5dd0ea72d..19c31a5b9 100644 --- a/lib/client/src/chain_data.rs +++ b/lib/client/src/chain_data.rs @@ -1,2 +1,3 @@ pub use mango_feeds_connector::feeds_chain_data_fetcher::*; pub use mango_feeds_connector::chain_data::*; +pub use mango_feeds_connector::account_fetcher::*; diff --git a/lib/client/src/client.rs b/lib/client/src/client.rs index 4e54e0d89..61f920aab 100644 --- a/lib/client/src/client.rs +++ b/lib/client/src/client.rs @@ -33,12 +33,12 @@ use solana_sdk::signer::keypair; use solana_sdk::transaction::TransactionError; use crate::account_fetcher_trait::*; -use crate::account_fetchers::*; use crate::context::MangoGroupContext; use crate::gpa::{fetch_anchor_account, fetch_mango_accounts}; use crate::{jupiter, util}; use anyhow::Context; +use mango_feeds_connector::account_fetchers::{CachedAccountFetcher, RpcAccountFetcher}; use solana_sdk::account::ReadableAccount; use solana_sdk::instruction::{AccountMeta, Instruction}; use solana_sdk::signature::{Keypair, Signature}; diff --git a/lib/client/src/lib.rs b/lib/client/src/lib.rs index 37ceed8d4..1f72cfda1 100644 --- a/lib/client/src/lib.rs +++ b/lib/client/src/lib.rs @@ -4,7 +4,6 @@ pub use context::*; pub use util::*; mod account_fetcher_trait; -mod account_fetchers; mod account_fetcher_utils; pub mod account_update_stream; pub mod chain_data;