diff --git a/relay-kafka/src/producer/mod.rs b/relay-kafka/src/producer/mod.rs index c4c137faa6..b08131f494 100644 --- a/relay-kafka/src/producer/mod.rs +++ b/relay-kafka/src/producer/mod.rs @@ -186,6 +186,23 @@ impl KafkaClient { })?; producer.send(key, headers, variant, payload) } + + /// Flush all messages. + pub fn flush(&self, timeout: Duration) { + let start = Instant::now(); + for (topic, producer) in &self.producers { + if let Err(e) = producer + .producer + .flush(timeout.saturating_sub(start.elapsed())) + { + relay_log::error!( + error = &e as &dyn std::error::Error, + tags.topic = ?topic, + "error while flushing kafka topic" + ); + } + } + } } /// Helper structure responsible for building the actual [`KafkaClient`]. diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 1af723ed68..2c03d76d10 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -6,7 +6,7 @@ use std::borrow::Cow; use std::collections::BTreeMap; use std::error::Error; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use bytes::Bytes; use relay_base_schema::data_category::DataCategory; @@ -22,7 +22,7 @@ use relay_metrics::{ }; use relay_quotas::Scoping; use relay_statsd::metric; -use relay_system::{Addr, FromMessage, Interface, NoResponse, Service}; +use relay_system::{Addr, Controller, FromMessage, Interface, NoResponse, Service, Shutdown}; use serde::{Deserialize, Serialize}; use serde_json::value::RawValue; use serde_json::Deserializer; @@ -164,6 +164,10 @@ impl StoreService { }) } + fn flush(&self, timeout: Duration) { + self.producer.client.flush(timeout); + } + fn handle_store_envelope(&self, message: StoreEnvelope) { let StoreEnvelope { envelope: mut managed, @@ -1047,14 +1051,27 @@ impl Service for StoreService { fn spawn_handler(self, mut rx: relay_system::Receiver) { let this = Arc::new(self); + let mut shutdown = Controller::shutdown_handle(); + tokio::spawn(async move { relay_log::info!("store forwarder started"); - while let Some(message) = rx.recv().await { - let service = Arc::clone(&this); - this.workers - .spawn(move || service.handle_message(message)) - .await; + loop { + tokio::select! { + Some(message) = rx.recv() => { + let service = Arc::clone(&this); + this.workers + .spawn(move || service.handle_message(message)) + .await; + }, + Shutdown{ timeout: Some(timeout) } = shutdown.notified() => { + let service = Arc::clone(&this); + this.workers.spawn(move || { + service.flush(timeout); + }).await; + }, + else => break, + } } relay_log::info!("store forwarder stopped");