Skip to content

Commit

Permalink
Work around gMA and grpc account subscription limits
Browse files Browse the repository at this point in the history
  • Loading branch information
ckamm committed Apr 25, 2024
1 parent 5e72a9e commit 8e465c4
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 26 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ clap = { version = "3.1.8", features = ["derive", "env"] }

tokio = { version = "1", features = ["full"] }
tokio-tungstenite = "0.17"
tokio-stream = { version = "0.1.9"}
rustls = "0.20.8"

warp = "0.3"
Expand Down
1 change: 1 addition & 0 deletions connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ solana-sdk = { workspace = true }
solana-logger = { workspace = true }

tokio = { workspace = true }
tokio-stream = { workspace = true }
rustls = { workspace = true }

serde = { workspace = true }
Expand Down
93 changes: 71 additions & 22 deletions connector/src/grpc_plugin_source.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use futures::stream::once;
use futures::stream::{self, once};
use itertools::Itertools;
use jsonrpc_core::futures::StreamExt;

use solana_account_decoder::UiAccount;
use solana_sdk::{account::Account, pubkey::Pubkey};

use futures::{future, future::FutureExt};
use tokio_stream::StreamMap;
use yellowstone_grpc_proto::tonic::{
metadata::MetadataValue,
transport::{Certificate, Channel, ClientTlsConfig, Identity},
Expand All @@ -29,6 +31,10 @@ use crate::{
};
use crate::{EntityFilter, FilterConfig};

/// Maximum number of account filters placed into a single gRPC `SubscribeRequest`.
/// The account filter map is split into chunks of this size and each chunk is
/// subscribed to on its own stream.
const MAX_GRPC_ACCOUNT_SUBSCRIPTIONS: usize = 100;
/// Maximum number of account ids passed to one `get_snapshot_gma` call when
/// fetching the snapshot for `EntityFilter::FilterByAccountIds`.
// NOTE(review): presumably mirrors the RPC getMultipleAccounts per-request limit - confirm.
const MAX_GMA_ACCOUNTS: usize = 100;
/// Upper bound on how many chunked `get_snapshot_gma` requests are in flight
/// at the same time (used as the `buffer_unordered` limit).
const MAX_PARALLEL_GMA_REQUESTS: usize = 4;

struct SnapshotData {
slot: u64,
accounts: Vec<(String, Option<UiAccount>)>,
Expand Down Expand Up @@ -122,21 +128,46 @@ async fn feed_data_geyser(
},
);

let request = SubscribeRequest {
accounts,
blocks,
blocks_meta,
entry: Default::default(),
commitment: None,
slots,
transactions,
accounts_data_slice: vec![],
ping: None,
};
info!("Going to send request: {:?}", request);
let mut subscriptions = StreamMap::new();

{
let request = SubscribeRequest {
accounts: Default::default(),
blocks,
blocks_meta,
entry: Default::default(),
commitment: None,
slots,
transactions,
accounts_data_slice: vec![],
ping: None,
};
let response = client.subscribe(once(async move { request })).await?;
subscriptions.insert(usize::MAX, response.into_inner());
}

let response = client.subscribe(once(async move { request })).await?;
let mut update_stream = response.into_inner();
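// a single subscribe request may carry at most MAX_GRPC_ACCOUNT_SUBSCRIPTIONS account filters,
// so split the accounts into chunks and open one subscription stream per chunk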
let account_chunks = accounts
.into_iter()
.chunks(MAX_GRPC_ACCOUNT_SUBSCRIPTIONS)
.into_iter()
.map(|chunk| chunk.collect::<HashMap<String, SubscribeRequestFilterAccounts>>())
.collect_vec();
for (i, accounts) in account_chunks.into_iter().enumerate() {
let request = SubscribeRequest {
accounts,
blocks: Default::default(),
blocks_meta: Default::default(),
entry: Default::default(),
commitment: None,
slots: Default::default(),
transactions: Default::default(),
accounts_data_slice: vec![],
ping: None,
};
let response = client.subscribe(once(async move { request })).await?;
subscriptions.insert(i, response.into_inner());
}

// We can't get a snapshot immediately since the finalized snapshot would be for a
// slot in the past and we'd be missing intermediate updates.
Expand Down Expand Up @@ -169,7 +200,7 @@ async fn feed_data_geyser(
// which will have "finalized" commitment.
let mut rooted_to_finalized_slots = 30;

let mut snapshot_gma = future::Fuse::terminated();
let (snapshot_gma_sender, mut snapshot_gma_receiver) = tokio::sync::mpsc::unbounded_channel();
let mut snapshot_gpa = future::Fuse::terminated();

// The plugin sends a ping every 5s or so
Expand Down Expand Up @@ -197,9 +228,13 @@ async fn feed_data_geyser(

loop {
tokio::select! {
update = update_stream.next() => {
update = subscriptions.next() => {
let Some(data) = update
else {
anyhow::bail!("geyser plugin has closed the stream");
};
use subscribe_update::UpdateOneof;
let mut update = update.ok_or(anyhow::anyhow!("geyser plugin has closed the stream"))??;
let mut update = data.1?;
match update.update_oneof.as_mut().expect("invalid grpc") {
UpdateOneof::Slot(slot_update) => {
let status = slot_update.status;
Expand All @@ -219,8 +254,18 @@ async fn feed_data_geyser(
snapshot_needed = false;
match &filter_config.entity_filter {
EntityFilter::FilterByAccountIds(account_ids) => {
let account_ids_typed = account_ids.iter().map(Pubkey::to_string).collect();
snapshot_gma = tokio::spawn(get_snapshot_gma(snapshot_rpc_http_url.clone(), account_ids_typed)).fuse();
let account_ids_typed = account_ids.iter().map(Pubkey::to_string).collect_vec();
let sender = snapshot_gma_sender.clone();
let snapshot_rpc_http_url = snapshot_rpc_http_url.clone();
tokio::spawn(async move {
stream::iter(account_ids_typed).chunks(MAX_GMA_ACCOUNTS).map(|keys| async {
let snapshot = get_snapshot_gma(&snapshot_rpc_http_url, keys).await;
sender.send(snapshot).unwrap();
}).buffer_unordered(MAX_PARALLEL_GMA_REQUESTS)
// just consume, we already sent the data into the channel
.for_each(|_| async {})
.await
});
},
EntityFilter::FilterByProgramId(program_id) => {
snapshot_gpa = tokio::spawn(get_snapshot_gpa(snapshot_rpc_http_url.clone(), program_id.to_string())).fuse();
Expand Down Expand Up @@ -277,8 +322,12 @@ async fn feed_data_geyser(
}
sender.send(Message::GrpcUpdate(update)).await.expect("send success");
},
snapshot = &mut snapshot_gma => {
let snapshot = snapshot??;
snapshot_message = snapshot_gma_receiver.recv() => {
let Some(snapshot_result) = snapshot_message
else {
anyhow::bail!("snapshot channel closed");
};
let snapshot = snapshot_result?;
info!("snapshot is for slot {}, first full slot was {}", snapshot.slot, first_full_slot);
if snapshot.slot >= first_full_slot {
sender.send(Message::Snapshot(SnapshotData {
Expand Down
4 changes: 2 additions & 2 deletions connector/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ pub async fn get_snapshot_gpa(
}

pub async fn get_snapshot_gma(
rpc_http_url: String,
rpc_http_url: &str,
ids: Vec<String>,
) -> anyhow::Result<SnapshotMultipleAccounts> {
let rpc_client = http::connect::<AccountsDataClient>(&rpc_http_url)
let rpc_client = http::connect::<AccountsDataClient>(rpc_http_url)
.await
.map_err_anyhow()?;

Expand Down
3 changes: 1 addition & 2 deletions connector/src/websocket_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ async fn feed_data_by_accounts(
// including the first time
if last_snapshot + SNAPSHOT_REFRESH_INTERVAL <= Instant::now() {
let snapshot_rpc_http_url = config.snapshot.rpc_http_url.clone();
let response =
get_snapshot_gma(snapshot_rpc_http_url.clone(), account_ids.clone()).await;
let response = get_snapshot_gma(&snapshot_rpc_http_url, account_ids.clone()).await;
let snapshot = response.context("gma snapshot response").map_err_anyhow();
match snapshot {
Ok(SnapshotMultipleAccounts {
Expand Down

0 comments on commit 8e465c4

Please sign in to comment.