Skip to content

Commit

Permalink
add all methods + pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Jul 29, 2024
1 parent 3c34750 commit 2b55a1e
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 61 deletions.
2 changes: 1 addition & 1 deletion connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jsonrpc-core-client = { workspace = true }
jsonrpc-derive = "18.0.0"
jsonrpc-pubsub = "18.0.0"

solana-rpc = { workspace = true }
# note: avoid solana-rpc dependency
solana-rpc-client = "1.17"
solana-rpc-client-api = "1.17"
solana-client = { workspace = true }
Expand Down
1 change: 0 additions & 1 deletion connector/examples/call_gpa_gma_example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use clap::Parser;

use jsonrpc_core_client::transports::http;
use mango_feeds_connector::GetProgramAccountsClient;
use solana_account_decoder::UiAccountEncoding;
use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
use solana_client::rpc_response::OptionalContext;
Expand Down
4 changes: 2 additions & 2 deletions connector/examples/snapshot_example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
use clap::Parser;

use jsonrpc_core_client::transports::http;
use mango_feeds_connector::GetProgramAccountsClient;
use solana_account_decoder::UiAccountEncoding;
use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
use solana_client::rpc_response::OptionalContext;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
use mango_feeds_connector::solana_rpc_minimal::rpc_accounts_scan::RpcAccountsScanClient;

#[derive(Parser, Debug, Clone)]
#[clap()]
Expand All @@ -31,7 +31,7 @@ async fn main() -> anyhow::Result<()> {
let rpc_http_url = cli.rpc_url;
let program_id = cli.program_account;

let rpc_client_scan = http::connect::<GetProgramAccountsClient>(&rpc_http_url)
let rpc_client_scan = http::connect::<RpcAccountsScanClient>(&rpc_http_url)
.await
.unwrap();

Expand Down
4 changes: 1 addition & 3 deletions connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub mod grpc_plugin_source;
pub mod metrics;
pub mod snapshot;
pub mod websocket_source;
mod solana_rpc_minimal;
pub mod solana_rpc_minimal;

use itertools::Itertools;
use std::str::FromStr;
Expand All @@ -13,8 +13,6 @@ use {
solana_sdk::{account::Account, pubkey::Pubkey},
};

pub use solana_rpc::rpc::rpc_accounts_scan::AccountsScanClient as GetProgramAccountsClient;

pub use solana_sdk;

trait AnyhowWrap {
Expand Down
7 changes: 3 additions & 4 deletions connector/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ use solana_client::{
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_response::{OptionalContext, RpcKeyedAccount},
};
use solana_rpc::rpc::rpc_accounts::AccountsDataClient;
use solana_sdk::{commitment_config::CommitmentConfig, slot_history::Slot};

use crate::AnyhowWrap;
use crate::solana_rpc_minimal::{Rpc, RpcClient};
use crate::solana_rpc_minimal::rpc_accounts_scan::RpcAccountsScanClient;

/// gPA snapshot struct
pub struct SnapshotProgramAccounts {
Expand All @@ -28,7 +27,7 @@ pub async fn get_snapshot_gpa(
rpc_http_url: String,
program_id: String,
) -> anyhow::Result<SnapshotProgramAccounts> {
let rpc_client = http::connect::<crate::GetProgramAccountsClient>(&rpc_http_url)
let rpc_client = http::connect::<RpcAccountsScanClient>(&rpc_http_url)
.await
.map_err_anyhow()?;

Expand Down Expand Up @@ -69,7 +68,7 @@ pub async fn get_snapshot_gma(
) -> anyhow::Result<SnapshotMultipleAccounts> {

info!("Call patched get_snapshot_gma");
let rpc_client = http::connect::<RpcClient>(rpc_http_url)
let rpc_client = http::connect::<RpcAccountsScanClient>(rpc_http_url)
.await
.map_err_anyhow()?;

Expand Down
184 changes: 135 additions & 49 deletions connector/src/solana_rpc_minimal.rs
Original file line number Diff line number Diff line change
@@ -1,52 +1,138 @@
use jsonrpc_core::Result;
use jsonrpc_derive::rpc;
use jsonrpc_pubsub::typed::Subscriber;
use solana_account_decoder::UiAccount;
use solana_rpc_client_api::config::RpcAccountInfoConfig;
use solana_rpc_client_api::response::Response as RpcResponse;
use jsonrpc_pubsub::SubscriptionId as PubSubSubscriptionId;

/// this definition is derived from solana-rpc/rpc.rs
/// we want to avoid the heavy dependency to solana-rpc
/// the crate solana-rpc-client provides some client methods but do not expose the ```Context```we need
#[rpc]
pub trait Rpc {
type Metadata;

#[rpc(meta, name = "getMultipleAccounts")]
fn get_multiple_accounts(
&self,
meta: Self::Metadata,
pubkey_strs: Vec<String>,
config: Option<RpcAccountInfoConfig>,
) -> Result<RpcResponse<Vec<Option<UiAccount>>>>;

// Get notification every time account data is changed
// Accepts pubkey parameter as base-58 encoded string
#[pubsub(
subscription = "accountNotification",
subscribe,
name = "accountSubscribe"
)]
fn account_subscribe(
&self,
meta: Self::Metadata,
subscriber: Subscriber<RpcResponse<UiAccount>>,
pubkey_str: String,
config: Option<RpcAccountInfoConfig>,
);

// Unsubscribe from account notification subscription.
#[pubsub(
subscription = "accountNotification",
unsubscribe,
name = "accountUnsubscribe"
)]
fn account_unsubscribe(
&self,
meta: Option<Self::Metadata>,
id: PubSubSubscriptionId,
) -> Result<bool>;


pub mod rpc_accounts_scan {
use std::sync::Arc;
use jsonrpc_core::Result;
use jsonrpc_derive::rpc;
use jsonrpc_pubsub::typed::Subscriber;
use solana_account_decoder::UiAccount;
use solana_rpc_client_api::config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
use solana_rpc_client_api::response::{Response as RpcResponse, RpcKeyedAccount, SlotUpdate, OptionalContext};
use jsonrpc_pubsub::SubscriptionId as PubSubSubscriptionId;

/// this definition is derived from solana-rpc/rpc.rs
/// we want to avoid the heavy dependency to solana-rpc
/// the crate solana-rpc-client provides some client methods but do not expose the ```Context```we need
///
#[rpc]
pub trait RpcAccountsScan {
type Metadata;

#[rpc(meta, name = "getProgramAccounts")]
fn get_program_accounts(
&self,
meta: Self::Metadata,
program_id_str: String,
config: Option<RpcProgramAccountsConfig>,
) -> Result<OptionalContext<Vec<RpcKeyedAccount>>>;

#[rpc(meta, name = "getMultipleAccounts")]
fn get_multiple_accounts(
&self,
meta: Self::Metadata,
pubkey_strs: Vec<String>,
config: Option<RpcAccountInfoConfig>,
) -> Result<RpcResponse<Vec<Option<UiAccount>>>>;


}

}

pub mod rpc_pubsub {
use std::sync::Arc;
use jsonrpc_core::Result;
use jsonrpc_derive::rpc;
use jsonrpc_pubsub::typed::Subscriber;
use solana_account_decoder::UiAccount;
use solana_rpc_client_api::config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
use solana_rpc_client_api::response::{Response as RpcResponse, RpcKeyedAccount, SlotUpdate, OptionalContext};
use jsonrpc_pubsub::SubscriptionId as PubSubSubscriptionId;

#[rpc]
pub trait RpcSolPubSub {
type Metadata;

// Get notification every time account data is changed
// Accepts pubkey parameter as base-58 encoded string
#[pubsub(
subscription = "accountNotification",
subscribe,
name = "accountSubscribe"
)]
fn account_subscribe(
&self,
meta: Self::Metadata,
subscriber: Subscriber<RpcResponse<UiAccount>>,
pubkey_str: String,
config: Option<RpcAccountInfoConfig>,
);

// Unsubscribe from account notification subscription.
#[pubsub(
subscription = "accountNotification",
unsubscribe,
name = "accountUnsubscribe"
)]
fn account_unsubscribe(
&self,
meta: Option<Self::Metadata>,
id: PubSubSubscriptionId,
) -> jsonrpc_core::Result<bool>;


// Get notification every time account data owned by a particular program is changed
// Accepts pubkey parameter as base-58 encoded string
#[pubsub(
subscription = "programNotification",
subscribe,
name = "programSubscribe"
)]
fn program_subscribe(
&self,
meta: Self::Metadata,
subscriber: Subscriber<RpcResponse<RpcKeyedAccount>>,
pubkey_str: String,
config: Option<RpcProgramAccountsConfig>,
);

// Unsubscribe from account notification subscription.
#[pubsub(
subscription = "programNotification",
unsubscribe,
name = "programUnsubscribe"
)]
fn program_unsubscribe(
&self,
meta: Option<Self::Metadata>,
id: PubSubSubscriptionId,
) -> jsonrpc_core::Result<bool>;


// Get series of updates for all slots
#[pubsub(
subscription = "slotsUpdatesNotification",
subscribe,
name = "slotsUpdatesSubscribe"
)]
fn slots_updates_subscribe(
&self,
meta: Self::Metadata,
subscriber: Subscriber<Arc<SlotUpdate>>,
);

// Unsubscribe from slots updates notification subscription.
#[pubsub(
subscription = "slotsUpdatesNotification",
unsubscribe,
name = "slotsUpdatesUnsubscribe"
)]
fn slots_updates_unsubscribe(
&self,
meta: Option<Self::Metadata>,
id: PubSubSubscriptionId,
) -> jsonrpc_core::Result<bool>;

}

}
3 changes: 2 additions & 1 deletion connector/src/websocket_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use solana_client::{
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_response::{Response, RpcKeyedAccount},
};
use solana_rpc::rpc_pubsub::RpcSolPubSubClient;
use solana_sdk::{
account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey, slot_history::Slot,
};
Expand All @@ -19,6 +18,7 @@ use std::{
sync::Arc,
time::{Duration, Instant},
};
use solana_rpc_client::rpc_client::RpcClient;
use tokio::time::timeout;

use crate::snapshot::{
Expand All @@ -28,6 +28,7 @@ use crate::{
chain_data::SlotStatus, AccountWrite, AnyhowWrap, EntityFilter, FeedMetadata, FilterConfig,
SlotUpdate, SourceConfig,
};
use crate::solana_rpc_minimal::rpc_pubsub::RpcSolPubSubClient;

const SNAPSHOT_REFRESH_INTERVAL: Duration = Duration::from_secs(300);
const WS_CONNECT_TIMEOUT: Duration = Duration::from_millis(5000);
Expand Down

0 comments on commit 2b55a1e

Please sign in to comment.