From f9295b0d9fbe877dc7ce08ec9dcc5c53a5c8598a Mon Sep 17 00:00:00 2001 From: Serge FARNY Date: Tue, 7 May 2024 09:19:41 +0200 Subject: [PATCH] websocket: add a metdata feed to indicate missing account, snapshot start and end --- .../examples/websocket_example_consumer.rs | 1 + connector/src/websocket_source.rs | 24 ++++++++++++++++--- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/connector/examples/websocket_example_consumer.rs b/connector/examples/websocket_example_consumer.rs index cde15b5..138427e 100644 --- a/connector/examples/websocket_example_consumer.rs +++ b/connector/examples/websocket_example_consumer.rs @@ -80,6 +80,7 @@ async fn main() -> anyhow::Result<()> { &config, &filter_config, account_write_queue_sender, + None, slot_queue_sender, ) .await; diff --git a/connector/src/websocket_source.rs b/connector/src/websocket_source.rs index 986df98..07f49f3 100644 --- a/connector/src/websocket_source.rs +++ b/connector/src/websocket_source.rs @@ -25,8 +25,8 @@ use crate::snapshot::{ get_snapshot_gma, get_snapshot_gpa, SnapshotMultipleAccounts, SnapshotProgramAccounts, }; use crate::{ - chain_data::SlotStatus, AccountWrite, AnyhowWrap, EntityFilter, FilterConfig, SlotUpdate, - SourceConfig, + chain_data::SlotStatus, AccountWrite, AnyhowWrap, EntityFilter, FeedMetadata, FilterConfig, + SlotUpdate, SourceConfig, }; const SNAPSHOT_REFRESH_INTERVAL: Duration = Duration::from_secs(300); @@ -294,6 +294,7 @@ pub async fn process_events( config: &SourceConfig, filter_config: &FilterConfig, account_write_queue_sender: async_channel::Sender, + metdata_write_queue_sender: Option>, slot_queue_sender: async_channel::Sender, ) { // Subscribe to program account updates websocket @@ -325,6 +326,13 @@ pub async fn process_events( // The thread that pulls updates and forwards them to postgres // + let metadata_sender = |msg| { + if let Some(sender) = &metdata_write_queue_sender { + sender.send_blocking(msg) + } else { + Ok(()) + } + }; // consume websocket updates from rust channels loop { let update = update_receiver.recv().await.unwrap(); @@ -342,9 +350,13 @@ pub async fn process_events( } WebsocketMessage::SnapshotUpdate((slot, accounts)) => { trace!("snapshot update {slot}"); + if let Err(e) = metadata_sender(FeedMetadata::SnapshotStart) { + warn!("failed to send feed matadata event: {}", e); + } + for (pubkey, account) in accounts { + let pubkey = Pubkey::from_str(&pubkey).unwrap(); if let Some(account) = account { - let pubkey = Pubkey::from_str(&pubkey).unwrap(); account_write_queue_sender .send(AccountWrite::from( pubkey, @@ -354,8 +366,14 @@ pub async fn process_events( )) .await .expect("send success"); + } else if let Err(e) = metadata_sender(FeedMetadata::InvalidAccount(pubkey)) { + warn!("failed to send feed matadata event: {}", e); } } + + if let Err(e) = metadata_sender(FeedMetadata::SnapshotEnd) { + warn!("failed to send feed matadata event: {}", e); + } } WebsocketMessage::SlotUpdate(update) => { trace!("slot update");