diff --git a/connector/Cargo.toml b/connector/Cargo.toml index be926f2..abb5ed1 100644 --- a/connector/Cargo.toml +++ b/connector/Cargo.toml @@ -17,7 +17,7 @@ jsonrpc-core-client = { workspace = true } jsonrpc-derive = "18.0.0" jsonrpc-pubsub = "18.0.0" -solana-rpc = { workspace = true } +# note: avoid solana-rpc dependency solana-rpc-client = "1.17" solana-rpc-client-api = "1.17" solana-client = { workspace = true } diff --git a/connector/examples/call_gpa_gma_example.rs b/connector/examples/call_gpa_gma_example.rs index 90488dd..5b37a86 100644 --- a/connector/examples/call_gpa_gma_example.rs +++ b/connector/examples/call_gpa_gma_example.rs @@ -3,7 +3,6 @@ use clap::Parser; use jsonrpc_core_client::transports::http; -use mango_feeds_connector::GetProgramAccountsClient; use solana_account_decoder::UiAccountEncoding; use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}; use solana_client::rpc_response::OptionalContext; diff --git a/connector/examples/snapshot_example.rs b/connector/examples/snapshot_example.rs index 13db3b0..0ad3aa4 100644 --- a/connector/examples/snapshot_example.rs +++ b/connector/examples/snapshot_example.rs @@ -3,12 +3,12 @@ use clap::Parser; use jsonrpc_core_client::transports::http; -use mango_feeds_connector::GetProgramAccountsClient; use solana_account_decoder::UiAccountEncoding; use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}; use solana_client::rpc_response::OptionalContext; use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::pubkey::Pubkey; +use mango_feeds_connector::solana_rpc_minimal::rpc_accounts_scan::RpcAccountsScanClient; #[derive(Parser, Debug, Clone)] #[clap()] @@ -31,7 +31,7 @@ async fn main() -> anyhow::Result<()> { let rpc_http_url = cli.rpc_url; let program_id = cli.program_account; - let rpc_client_scan = http::connect::(&rpc_http_url) + let rpc_client_scan = http::connect::(&rpc_http_url) .await .unwrap(); diff --git a/connector/src/lib.rs b/connector/src/lib.rs index 713cf8d..614d822 100644 --- a/connector/src/lib.rs +++ b/connector/src/lib.rs @@ -4,7 +4,7 @@ pub mod grpc_plugin_source; pub mod metrics; pub mod snapshot; pub mod websocket_source; -mod solana_rpc_minimal; +pub mod solana_rpc_minimal; use itertools::Itertools; use std::str::FromStr; @@ -13,8 +13,6 @@ use { solana_sdk::{account::Account, pubkey::Pubkey}, }; -pub use solana_rpc::rpc::rpc_accounts_scan::AccountsScanClient as GetProgramAccountsClient; - pub use solana_sdk; trait AnyhowWrap { diff --git a/connector/src/snapshot.rs b/connector/src/snapshot.rs index 377ad2f..ab96c0c 100644 --- a/connector/src/snapshot.rs +++ b/connector/src/snapshot.rs @@ -5,11 +5,10 @@ use solana_client::{ rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, rpc_response::{OptionalContext, RpcKeyedAccount}, }; -use solana_rpc::rpc::rpc_accounts::AccountsDataClient; use solana_sdk::{commitment_config::CommitmentConfig, slot_history::Slot}; use crate::AnyhowWrap; -use crate::solana_rpc_minimal::{Rpc, RpcClient}; +use crate::solana_rpc_minimal::rpc_accounts_scan::RpcAccountsScanClient; /// gPA snapshot struct pub struct SnapshotProgramAccounts { @@ -28,7 +27,7 @@ pub async fn get_snapshot_gpa( rpc_http_url: String, program_id: String, ) -> anyhow::Result { - let rpc_client = http::connect::(&rpc_http_url) + let rpc_client = http::connect::(&rpc_http_url) .await .map_err_anyhow()?; @@ -69,7 +68,7 @@ pub async fn get_snapshot_gma( ) -> anyhow::Result { info!("Call patched get_snapshot_gma"); - let rpc_client = http::connect::(rpc_http_url) + let rpc_client = http::connect::(rpc_http_url) .await .map_err_anyhow()?; diff --git a/connector/src/solana_rpc_minimal.rs b/connector/src/solana_rpc_minimal.rs index 7c4fec7..d54879c 100644 --- a/connector/src/solana_rpc_minimal.rs +++ b/connector/src/solana_rpc_minimal.rs @@ -1,52 +1,138 @@ -use jsonrpc_core::Result; -use jsonrpc_derive::rpc; -use jsonrpc_pubsub::typed::Subscriber; -use solana_account_decoder::UiAccount; -use solana_rpc_client_api::config::RpcAccountInfoConfig; -use solana_rpc_client_api::response::Response as RpcResponse; -use jsonrpc_pubsub::SubscriptionId as PubSubSubscriptionId; - -/// this definition is derived from solana-rpc/rpc.rs -/// we want to avoid the heavy dependency to solana-rpc -/// the crate solana-rpc-client provides some client methods but do not expose the ```Context```we need -#[rpc] -pub trait Rpc { - type Metadata; - - #[rpc(meta, name = "getMultipleAccounts")] - fn get_multiple_accounts( - &self, - meta: Self::Metadata, - pubkey_strs: Vec, - config: Option, - ) -> Result>>>; - - // Get notification every time account data is changed - // Accepts pubkey parameter as base-58 encoded string - #[pubsub( - subscription = "accountNotification", - subscribe, - name = "accountSubscribe" - )] - fn account_subscribe( - &self, - meta: Self::Metadata, - subscriber: Subscriber>, - pubkey_str: String, - config: Option, - ); - - // Unsubscribe from account notification subscription. - #[pubsub( - subscription = "accountNotification", - unsubscribe, - name = "accountUnsubscribe" - )] - fn account_unsubscribe( - &self, - meta: Option, - id: PubSubSubscriptionId, - ) -> Result; + + +pub mod rpc_accounts_scan { + use std::sync::Arc; + use jsonrpc_core::Result; + use jsonrpc_derive::rpc; + use jsonrpc_pubsub::typed::Subscriber; + use solana_account_decoder::UiAccount; + use solana_rpc_client_api::config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}; + use solana_rpc_client_api::response::{Response as RpcResponse, RpcKeyedAccount, SlotUpdate, OptionalContext}; + use jsonrpc_pubsub::SubscriptionId as PubSubSubscriptionId; + + /// this definition is derived from solana-rpc/rpc.rs + /// we want to avoid the heavy dependency to solana-rpc + /// the crate solana-rpc-client provides some client methods but do not expose the ```Context```we need + /// + #[rpc] + pub trait RpcAccountsScan { + type Metadata; + + #[rpc(meta, name = "getProgramAccounts")] + fn get_program_accounts( + &self, + meta: Self::Metadata, + program_id_str: String, + config: Option, + ) -> Result>>; + + #[rpc(meta, name = "getMultipleAccounts")] + fn get_multiple_accounts( + &self, + meta: Self::Metadata, + pubkey_strs: Vec, + config: Option, + ) -> Result>>>; + + + } } +pub mod rpc_pubsub { + use std::sync::Arc; + use jsonrpc_core::Result; + use jsonrpc_derive::rpc; + use jsonrpc_pubsub::typed::Subscriber; + use solana_account_decoder::UiAccount; + use solana_rpc_client_api::config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}; + use solana_rpc_client_api::response::{Response as RpcResponse, RpcKeyedAccount, SlotUpdate, OptionalContext}; + use jsonrpc_pubsub::SubscriptionId as PubSubSubscriptionId; + + #[rpc] + pub trait RpcSolPubSub { + type Metadata; + + // Get notification every time account data is changed + // Accepts pubkey parameter as base-58 encoded string + #[pubsub( + subscription = "accountNotification", + subscribe, + name = "accountSubscribe" + )] + fn account_subscribe( + &self, + meta: Self::Metadata, + subscriber: Subscriber>, + pubkey_str: String, + config: Option, + ); + + // Unsubscribe from account notification subscription. + #[pubsub( + subscription = "accountNotification", + unsubscribe, + name = "accountUnsubscribe" + )] + fn account_unsubscribe( + &self, + meta: Option, + id: PubSubSubscriptionId, + ) -> jsonrpc_core::Result; + + + // Get notification every time account data owned by a particular program is changed + // Accepts pubkey parameter as base-58 encoded string + #[pubsub( + subscription = "programNotification", + subscribe, + name = "programSubscribe" + )] + fn program_subscribe( + &self, + meta: Self::Metadata, + subscriber: Subscriber>, + pubkey_str: String, + config: Option, + ); + + // Unsubscribe from account notification subscription. + #[pubsub( + subscription = "programNotification", + unsubscribe, + name = "programUnsubscribe" + )] + fn program_unsubscribe( + &self, + meta: Option, + id: PubSubSubscriptionId, + ) -> jsonrpc_core::Result; + + + // Get series of updates for all slots + #[pubsub( + subscription = "slotsUpdatesNotification", + subscribe, + name = "slotsUpdatesSubscribe" + )] + fn slots_updates_subscribe( + &self, + meta: Self::Metadata, + subscriber: Subscriber>, + ); + + // Unsubscribe from slots updates notification subscription. + #[pubsub( + subscription = "slotsUpdatesNotification", + unsubscribe, + name = "slotsUpdatesUnsubscribe" + )] + fn slots_updates_unsubscribe( + &self, + meta: Option, + id: PubSubSubscriptionId, + ) -> jsonrpc_core::Result; + + } + +} diff --git a/connector/src/websocket_source.rs b/connector/src/websocket_source.rs index 07f49f3..c0c1789 100644 --- a/connector/src/websocket_source.rs +++ b/connector/src/websocket_source.rs @@ -6,7 +6,6 @@ use solana_client::{ rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, rpc_response::{Response, RpcKeyedAccount}, }; -use solana_rpc::rpc_pubsub::RpcSolPubSubClient; use solana_sdk::{ account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey, slot_history::Slot, }; @@ -19,6 +18,7 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; +use solana_rpc_client::rpc_client::RpcClient; use tokio::time::timeout; use crate::snapshot::{ @@ -28,6 +28,7 @@ use crate::{ chain_data::SlotStatus, AccountWrite, AnyhowWrap, EntityFilter, FeedMetadata, FilterConfig, SlotUpdate, SourceConfig, }; +use crate::solana_rpc_minimal::rpc_pubsub::RpcSolPubSubClient; const SNAPSHOT_REFRESH_INTERVAL: Duration = Duration::from_secs(300); const WS_CONNECT_TIMEOUT: Duration = Duration::from_millis(5000);