From 9e7ebd1d16b78cd3c08ccad50508e82a2b6035c8 Mon Sep 17 00:00:00 2001 From: Serge FARNY Date: Mon, 6 May 2024 21:09:55 +0200 Subject: [PATCH] grpc: add a metdata feed to indicate missing account, snapshot start and end --- .../examples/combined_example_consumer.rs | 1 + connector/examples/geyser_example_consumer.rs | 1 + connector/src/grpc_plugin_source.rs | 28 +++++++++++++++++-- connector/src/lib.rs | 6 ++++ 4 files changed, 34 insertions(+), 2 deletions(-) diff --git a/connector/examples/combined_example_consumer.rs b/connector/examples/combined_example_consumer.rs index ef390c7..c332eb0 100644 --- a/connector/examples/combined_example_consumer.rs +++ b/connector/examples/combined_example_consumer.rs @@ -98,6 +98,7 @@ async fn main() -> anyhow::Result<()> { &config, &filter_config, account_write_queue_sender, + None, slot_queue_sender, metrics_tx.clone(), exit.clone(), diff --git a/connector/examples/geyser_example_consumer.rs b/connector/examples/geyser_example_consumer.rs index ef390c7..c332eb0 100644 --- a/connector/examples/geyser_example_consumer.rs +++ b/connector/examples/geyser_example_consumer.rs @@ -98,6 +98,7 @@ async fn main() -> anyhow::Result<()> { &config, &filter_config, account_write_queue_sender, + None, slot_queue_sender, metrics_tx.clone(), exit.clone(), diff --git a/connector/src/grpc_plugin_source.rs b/connector/src/grpc_plugin_source.rs index 309558e..564e94b 100644 --- a/connector/src/grpc_plugin_source.rs +++ b/connector/src/grpc_plugin_source.rs @@ -27,7 +27,8 @@ use crate::snapshot::{get_snapshot_gma, get_snapshot_gpa}; use crate::{ chain_data::SlotStatus, metrics::{MetricType, Metrics}, - AccountWrite, GrpcSourceConfig, SlotUpdate, SnapshotSourceConfig, SourceConfig, TlsConfig, + AccountWrite, FeedMetadata, GrpcSourceConfig, SlotUpdate, SnapshotSourceConfig, SourceConfig, + TlsConfig, }; use crate::{EntityFilter, FilterConfig}; @@ -415,6 +416,7 @@ pub async fn process_events( config: &SourceConfig, filter_config: &FilterConfig, account_write_queue_sender: async_channel::Sender, + metdata_write_queue_sender: Option>, slot_queue_sender: async_channel::Sender, metrics_sender: Metrics, exit: Arc, @@ -504,6 +506,14 @@ pub async fn process_events( let mut metric_snapshot_account_writes = metrics_sender.register_u64("grpc_snapshot_account_writes".into(), MetricType::Counter); + let metadata_sender = |msg| { + if let Some(sender) = &metdata_write_queue_sender { + sender.send_blocking(msg) + } else { + Ok(()) + } + }; + loop { if exit.load(Ordering::Relaxed) { warn!("shutting down grpc_plugin_source..."); @@ -593,6 +603,10 @@ pub async fn process_events( Message::Snapshot(update) => { metric_snapshots.increment(); info!("processing snapshot..."); + if let Err(e) = metadata_sender(FeedMetadata::SnapshotStart) { + warn!("failed to send feed matadata event: {}", e); + } + for account in update.accounts.iter() { metric_snapshot_account_writes.increment(); metric_account_queue.set(account_write_queue_sender.len() as u64); @@ -607,10 +621,20 @@ pub async fn process_events( .await .expect("send success"); } - (key, None) => warn!("account not found {}", key), + (key, None) => { + warn!("account not found {}", key); + let pubkey = Pubkey::from_str(key).unwrap(); + if let Err(e) = metadata_sender(FeedMetadata::InvalidAccount(pubkey)) { + warn!("failed to send feed matadata event: {}", e); + } + } } } + info!("processing snapshot done"); + if let Err(e) = metadata_sender(FeedMetadata::SnapshotEnd) { + warn!("failed to send feed matadata event: {}", e); + } } } } diff --git a/connector/src/lib.rs b/connector/src/lib.rs index 62e2cde..1d3c1ce 100644 --- a/connector/src/lib.rs +++ b/connector/src/lib.rs @@ -28,6 +28,12 @@ impl AnyhowWrap for Result { } } +pub enum FeedMetadata { + InvalidAccount(Pubkey), + SnapshotStart, + SnapshotEnd, +} + #[derive(Clone, PartialEq, Debug)] pub struct AccountWrite { pub pubkey: Pubkey,