From f38f0d703d949c590da07c54e880962284ca15e7 Mon Sep 17 00:00:00 2001
From: rkrishn7
Date: Tue, 5 Mar 2024 16:38:29 -0800
Subject: [PATCH 1/5] feat: allow specifying id for kafka sources

---
 src/kiwi/src/config.rs         | 58 +++++++++++++++++++++++++---------
 src/kiwi/src/connection.rs     |  4 +++
 src/kiwi/src/source/counter.rs |  4 +++
 src/kiwi/src/source/kafka.rs   | 32 ++++++++++++-------
 src/kiwi/src/source/mod.rs     |  2 ++
 5 files changed, 74 insertions(+), 26 deletions(-)

diff --git a/src/kiwi/src/config.rs b/src/kiwi/src/config.rs
index 24c4a26..a0f59d9 100644
--- a/src/kiwi/src/config.rs
+++ b/src/kiwi/src/config.rs
@@ ... @@ pub struct Config {
 #[serde(rename_all = "lowercase")]
 pub enum SourceType {
     Kafka {
+        id: Option<SourceId>,
         topic: String,
     },
     Counter {
-        id: String,
+        id: SourceId,
         min: u64,
         #[serde(default)]
         max: Option<u64>,
@@ ... @@ pub enum SourceType {
     },
 }
 
+impl SourceType {
+    pub fn id(&self) -> &SourceId {
+        match self {
+            SourceType::Kafka { id, topic } => id.as_ref().unwrap_or_else(|| topic),
+            SourceType::Counter { id, .. } => id,
+        }
+    }
+}
+
 #[derive(Debug, Clone, Deserialize)]
 pub struct Kafka {
     #[serde(default = "Kafka::default_group_prefix")]
@@ ... @@ impl
         let mut seen = HashSet::new();
 
         for typ in config.sources.iter() {
-            let incoming = match typ {
-                SourceType::Kafka { topic } => topic,
-                SourceType::Counter { id, .. } => id,
-            };
+            let id_incoming = typ.id();
 
-            if !seen.insert(incoming) {
+            if !seen.insert(id_incoming) {
                 return Err(anyhow::anyhow!(
                     "Found duplicate source ID in configuration: {}",
-                    incoming
+                    id_incoming
                 ));
             }
 
-            match sources.entry(incoming.clone()) {
+            match sources.entry(id_incoming.clone()) {
                 std::collections::btree_map::Entry::Occupied(_) => {
                     // Source already exists
                     continue;
@@ ... @@ impl
                 std::collections::btree_map::Entry::Vacant(entry) => {
                     // Build and add source
                     let source = match typ {
-                        SourceType::Kafka { topic } => {
+                        SourceType::Kafka { topic, .. } => {
                             if let Some(kafka_config) = config.kafka.as_ref() {
                                 ::build_source(
+                                    typ.id().clone(),
                                     topic.clone(),
                                     &kafka_config.bootstrap_servers,
                                     &kafka_config.group_id_prefix,
@@ ... @@ impl
         }
 
         sources.retain(|id, _| {
-            if !config.sources.iter().any(|typ| match typ {
-                SourceType::Kafka { topic } => topic == id,
-                SourceType::Counter { id: incoming, .. } => incoming == id,
-            }) {
+            if !config.sources.iter().any(|typ| typ.id() == id) {
                 tracing::info!("Removing source due to configuration change: {}", id);
                 false
             } else {
@@ ... @@ mod tests {
         assert!(config.sources.len() == 2);
 
         assert!(
-            matches!(config.sources[0].clone(), SourceType::Kafka { topic } if topic == "test")
+            matches!(config.sources[0].clone(), SourceType::Kafka { topic, .. } if topic == "test")
         );
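+        // With no explicit `id` configured, a Kafka source's ID falls back to its topic name.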
+        assert_eq!(config.sources[0].id(), "test");
 
         assert!(matches!(
             config.sources[1].clone(),
@@ ... @@ mod tests {
                 max,
             } if id == "test" && min == 0 && interval_ms == 100 && lazy && max == Some(100)
         ));
+
+        let config = "
+        sources:
+          - type: kafka
+            topic: topic1
+            id: test
+        server:
+          address: '127.0.0.1:8000'
+        ";
+
+        let config = Config::from_str(config).unwrap();
+        assert_eq!(config.sources[0].id(), "test");
     }
 
     #[test]
@@ ... @@ mod tests {
         ) -> &Option<Sender<SourceMetadata>> {
             &None
         }
+
+        fn as_any(&self) -> &dyn std::any::Any {
+            self
+        }
     }
 
     struct TestWasmHook;
@@ ... @@ mod tests {
     impl KafkaSourceBuilder for TestSourceBuilder {
         fn build_source(
-            topic: SourceId,
+            _id: SourceId,
+            topic: String,
             _bootstrap_servers: &[String],
             _group_id_prefix: &str,
         ) -> Result<Box<dyn Source + Send + Sync>, anyhow::Error> {
@@ ... @@ mod tests {
                 },
                 SourceType::Kafka {
                     topic: "test".into(),
+                    id: None,
                 },
             ],
             hooks: None,
@@ ... @@ mod tests {
         let config = Config {
             sources: vec![SourceType::Kafka {
                 topic: "test".into(),
+                id: None,
             }],
             hooks: None,
             server: Server {
@@ ... @@ mod tests {
         let config = Config {
             sources: vec![SourceType::Kafka {
                 topic: "test".into(),
+                id: None,
             }],
             hooks: None,
             server: Server {
@@ ... @@ mod tests {
         sources.lock().unwrap().insert(
             "topic1".into(),
             ::build_source(
+                "topic1".into(),
                 "topic1".into(),
                 &["localhost:9092".into()],
                 "kiwi-",
@@ ... @@ mod tests {
         sources.lock().unwrap().insert(
             "topic1".into(),
             ::build_source(
+                "topic1".into(),
                 "topic1".into(),
                 &["localhost:9092".into()],
                 "kiwi-",

diff --git a/src/kiwi/src/connection.rs b/src/kiwi/src/connection.rs
index 8a14b01..1f8bcce 100644
--- a/src/kiwi/src/connection.rs
+++ b/src/kiwi/src/connection.rs
@@ ... @@ mod tests {
     fn metadata_tx(&self) -> &Option<Sender<SourceMetadata>> {
         &None
     }
+
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
 }
 
 #[derive(Debug, Clone)]

diff --git a/src/kiwi/src/source/counter.rs b/src/kiwi/src/source/counter.rs
index f81cd65..4ad40c8 100644
--- a/src/kiwi/src/source/counter.rs
+++ b/src/kiwi/src/source/counter.rs
@@ ... @@ impl Source for CounterSource {
     fn metadata_tx(&self) -> &Option<Sender<SourceMetadata>> {
         &None
     }
+
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
 }
 
 pub struct CounterTask {

diff --git a/src/kiwi/src/source/kafka.rs b/src/kiwi/src/source/kafka.rs
index 4949d81..2bd7ed5 100644
--- a/src/kiwi/src/source/kafka.rs
+++ b/src/kiwi/src/source/kafka.rs
@@ ... @@ impl PartitionConsumer {
 type ShutdownTrigger = oneshot::Sender<()>;
 
 pub struct KafkaTopicSource {
+    id: SourceId,
     topic: String,
     // Map of partition ID -> shutdown trigger
     _partition_consumers: Arc<Mutex<HashMap<i32, ShutdownTrigger>>>,
@@ ... @@ impl Source for KafkaTopicSource {
     }
 
     fn source_id(&self) -> &SourceId {
-        &self.topic
+        &self.id
     }
 
     fn metadata_tx(&self) -> &Option<Sender<SourceMetadata>> {
         &self.metadata_tx
     }
+
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
 }
 
 impl KafkaTopicSource {
     pub fn new(
+        id: SourceId,
         topic: String,
         bootstrap_servers: &[String],
         group_id_prefix: &str,
@@ ... @@ impl KafkaTopicSource {
         let weak_tasks = Arc::downgrade(&consumer_tasks);
 
         let result = Self {
+            id,
             topic: topic.clone(),
             _partition_consumers: consumer_tasks,
             tx: tx.clone(),
@@ ... @@ fn fetch_partition_metadata(
 pub fn start_partition_discovery(
     bootstrap_servers: &[String],
-    topic_sources: Arc<Mutex<BTreeMap<SourceId, Box<dyn Source + Send + Sync>>>>,
+    sources: Arc<Mutex<BTreeMap<SourceId, Box<dyn Source + Send + Sync>>>>,
     poll_interval: Duration,
 ) -> anyhow::Result<()> {
     let client = create_metadata_client(bootstrap_servers)?;
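 
+    // Discovery loop: periodically snapshot the Kafka-backed sources (found by
+    // downcasting through `as_any`), fetch fresh partition metadata for each
+    // topic, and forward it to the owning source's `metadata_tx` channel.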
     std::thread::spawn(move || loop {
         std::thread::sleep(poll_interval);
 
-        let topics = topic_sources
+        let kafka_sources = sources
             .lock()
             .expect("poisoned lock")
-            .keys()
-            .cloned()
+            .iter()
+            .filter_map(|(_, source)| {
+                source
+                    .as_any()
+                    .downcast_ref::<KafkaTopicSource>()
+                    .map(|source| (source.id.clone(), source.topic.clone()))
+            })
             .collect::<Vec<_>>();
 
-        for topic in topics.iter() {
+        for (id, topic) in kafka_sources.iter() {
             match fetch_partition_metadata(topic.as_str(), &client) {
                 Ok(metadata) => {
-                    if let Some(source) = topic_sources
-                        .lock()
-                        .expect("poisoned lock")
-                        .get(topic.as_str())
-                    {
+                    if let Some(source) = sources.lock().expect("poisoned lock").get(id) {
                         for partition_metadata in metadata {
                             let metadata_tx = source.metadata_tx();
@@ ... @@
 pub trait KafkaSourceBuilder {
     fn build_source(
+        id: SourceId,
         topic: String,
         bootstrap_servers: &[String],
         group_id_prefix: &str,
     ) -> anyhow::Result<Box<dyn Source + Send + Sync>> {
         Ok(Box::new(KafkaTopicSource::new(
+            id,
             topic,
             bootstrap_servers,
             group_id_prefix,

diff --git a/src/kiwi/src/source/mod.rs b/src/kiwi/src/source/mod.rs
index 7ddd782..bd9205d 100644
--- a/src/kiwi/src/source/mod.rs
+++ b/src/kiwi/src/source/mod.rs
@@ ... @@ pub trait Source {
     fn source_id(&self) -> &SourceId;
 
     fn metadata_tx(&self) -> &Option<Sender<SourceMetadata>>;
+
+    fn as_any(&self) -> &dyn std::any::Any;
 }
 
 pub type SourceId = String;

From 8a4890c7037a2605e950d8c785cf8bada8508812 Mon Sep 17 00:00:00 2001
From: rkrishn7
Date: Tue, 5 Mar 2024 16:40:54 -0800
Subject: [PATCH 2/5] update configuration doc

---
 doc/CONFIGURATION.md | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/doc/CONFIGURATION.md b/doc/CONFIGURATION.md
index 134d877..e261d5f 100644
--- a/doc/CONFIGURATION.md
+++ b/doc/CONFIGURATION.md
@@ ... @@ hooks:
 
 sources:
   - type: kafka
+    # The source ID for this Kafka source. The source ID is used as a unique identifier, and thus must be
+    # distinct from other source IDs, regardless of type.
+    #
+    ## Optional (defaults to `topic`)
+    id: my-kafka-source
+
+    # The topic name for this Kafka source. The source ID defaults to the topic name.
     #
     ## Required
     topic: 'my-topic'
 
   - type: counter
+    # The source ID for this counter source. The source ID is used as a unique identifier, and thus must be
+    # distinct from other source IDs, regardless of type.
+    #
     ## Required
     id: counter1
 
     # The interval at which the counter source emits events
     #
     ## Required

From d806fe175f2e3ce461dd080fe91a0c5dd053677d Mon Sep 17 00:00:00 2001
From: rkrishn7
Date: Tue, 5 Mar 2024 16:48:05 -0800
Subject: [PATCH 3/5] fix clippy warnings

---
 src/kiwi/src/config.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/kiwi/src/config.rs b/src/kiwi/src/config.rs
index a0f59d9..d72d51d 100644
--- a/src/kiwi/src/config.rs
+++ b/src/kiwi/src/config.rs
@@ ... @@ pub enum SourceType {
 impl SourceType {
     pub fn id(&self) -> &SourceId {
         match self {
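+            // `unwrap_or` is enough here; clippy flags the closure form
+            // (`unnecessary_lazy_evaluations`) since `topic` needs no lazy evaluation.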
-            SourceType::Kafka { id, topic } => id.as_ref().unwrap_or_else(|| topic),
+            SourceType::Kafka { id, topic } => id.as_ref().unwrap_or(topic),
             SourceType::Counter { id, .. } => id,
         }
     }

From bc1c8f1e551ee3291b0df6ac20625142643c60bb Mon Sep 17 00:00:00 2001
From: rkrishn7
Date: Tue, 5 Mar 2024 17:50:28 -0800
Subject: [PATCH 4/5] include source id in kafka source result

---
 src/kiwi/src/connection.rs   |  1 +
 src/kiwi/src/protocol.rs     |  2 +-
 src/kiwi/src/source/kafka.rs | 41 ++++++++++++++++--------------------
 src/kiwi/src/subscription.rs | 14 ++++++++++++
 4 files changed, 34 insertions(+), 24 deletions(-)

diff --git a/src/kiwi/src/connection.rs b/src/kiwi/src/connection.rs
index 1f8bcce..1a6e101 100644
--- a/src/kiwi/src/connection.rs
+++ b/src/kiwi/src/connection.rs
@@ ... @@ mod tests {
     fn test_kafka_source_result() -> SourceResult {
         SourceResult::Kafka(crate::source::kafka::KafkaSourceResult {
+            id: "test".to_string(),
             key: None,
             payload: None,
             topic: "test".to_string(),

diff --git a/src/kiwi/src/protocol.rs b/src/kiwi/src/protocol.rs
index 6d832ed..a878c06 100644
--- a/src/kiwi/src/protocol.rs
+++ b/src/kiwi/src/protocol.rs
@@ ... @@ impl From<source::SourceResult> for SourceResult {
             source::SourceResult::Kafka(kafka) => Self::Kafka {
                 key: kafka.key,
                 payload: kafka.payload,
-                source_id: kafka.topic,
+                source_id: kafka.id,
                 partition: kafka.partition,
                 offset: kafka.offset,
                 timestamp: kafka.timestamp,

diff --git a/src/kiwi/src/source/kafka.rs b/src/kiwi/src/source/kafka.rs
index 2bd7ed5..5e0c668 100644
--- a/src/kiwi/src/source/kafka.rs
+++ b/src/kiwi/src/source/kafka.rs
@@ ... @@
 use maplit::btreemap;
 use rdkafka::client::{Client, DefaultClientContext};
 use rdkafka::{
     consumer::{Consumer, StreamConsumer},
-    message::OwnedMessage,
     ClientConfig,
 };
 use rdkafka::{Message, TopicPartitionList};
@@ ... @@ use super::{Source, SourceId, SourceMessage, SourceMetadata, SourceResult, Subscri
 
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct KafkaSourceResult {
+    /// Source ID
+    pub id: SourceId,
     /// Event key
     pub key: Option<Vec<u8>>,
     /// Event payload
     pub payload: Option<Vec<u8>>,
-    /// Source ID this event was produced from
+    /// Topic this event was produced from
     pub topic: String,
     /// Timestamp at which the message was produced
     pub timestamp: Option<i64>,
@@ ... @@ pub struct KafkaSourceMetadata {
 }
 
 pub struct PartitionConsumer {
+    source_id: SourceId,
     consumer: StreamConsumer,
     shutdown_rx: Fuse<oneshot::Receiver<()>>,
     tx: Sender<SourceMessage>,
 }
 
 impl PartitionConsumer {
     pub fn new<'a>(
+        source_id: SourceId,
         topic: &'a str,
         partition: i32,
         offset: rdkafka::Offset,
@@ ... @@ impl PartitionConsumer {
         ))?;
 
         Ok(Self {
+            source_id,
             consumer,
             shutdown_rx,
             tx,
@@ ... @@ impl PartitionConsumer {
                         // An error here does not mean future calls will fail, since new subscribers
                         // may be created. If there are no subscribers, we simply discard the message
                         // and move on
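+                        // Construct the result inline so it can carry the configured
+                        // source ID alongside the message's Kafka metadata.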
-                        let _ = self.tx.send(SourceMessage::Result(owned_message.into()));
+                        let _ = self.tx.send(SourceMessage::Result(SourceResult::Kafka(KafkaSourceResult {
+                            id: self.source_id.clone(),
+                            key: owned_message.key().map(|k| k.to_owned()),
+                            payload: owned_message.payload().map(|p| p.to_owned()),
+                            topic: owned_message.topic().to_string(),
+                            timestamp: owned_message.timestamp().to_millis(),
+                            partition: owned_message.partition(),
+                            offset: owned_message.offset(),
+                        })));
                     }
                 };
             },
@@ ... @@ impl KafkaTopicSource {
         let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>();
 
         let partition_consumer = PartitionConsumer::new(
+            id.clone(),
             topic.as_str(),
             partition_metadata.partition,
             rdkafka::Offset::Offset(partition_metadata.hi_watermark),
@@ ... @@ impl KafkaTopicSource {
         let weak_tasks = Arc::downgrade(&consumer_tasks);
 
         let result = Self {
-            id,
+            id: id.clone(),
             topic: topic.clone(),
             _partition_consumers: consumer_tasks,
             tx: tx.clone(),
@@ ... @@ impl KafkaTopicSource {
                         oneshot::channel::<()>();
 
                     match PartitionConsumer::new(
+                        id.clone(),
                         topic.as_str(),
                         partition,
                         rdkafka::Offset::Offset(hi_watermark),
@@ ... @@ pub trait KafkaSourceBuilder {
     }
 }
 
-impl From<OwnedMessage> for SourceResult {
-    fn from(value: OwnedMessage) -> Self {
-        Self::Kafka(value.into())
-    }
-}
-
-impl From<OwnedMessage> for KafkaSourceResult {
-    fn from(value: OwnedMessage) -> Self {
-        Self {
-            key: value.key().map(|k| k.to_owned()),
-            payload: value.payload().map(|p| p.to_owned()),
-            topic: value.topic().to_string(),
-            timestamp: value.timestamp().to_millis(),
-            partition: value.partition(),
-            offset: value.offset(),
-        }
-    }
-}
-
 impl From<KafkaSourceResult> for hook::intercept::types::KafkaEventCtx {
     fn from(value: KafkaSourceResult) -> Self {
         Self {

diff --git a/src/kiwi/src/subscription.rs b/src/kiwi/src/subscription.rs
index dbdf2ea..040455c 100644
--- a/src/kiwi/src/subscription.rs
+++ b/src/kiwi/src/subscription.rs
@@ ... @@ mod tests {
         let mut stream = subscription.source_stream();
 
         let message = SourceMessage::Result(SourceResult::Kafka(KafkaSourceResult {
+            id: "test".into(),
             partition: 0,
             offset: 0,
             topic: "test".into(),
 
         for _ in 0..2 {
             let message = SourceMessage::Result(SourceResult::Kafka(KafkaSourceResult {
+                id: "test".into(),
                 partition: 0,
                 offset: 0,
                 topic: "test".into(),
 
         for _ in 0..2 {
             let message = SourceMessage::Result(SourceResult::Kafka(KafkaSourceResult {
+                id: "test".into(),
                 partition: 0,
                 offset: 0,
                 topic: "test".into(),
 
         for _ in 0..5 {
             let message = SourceMessage::Result(SourceResult::Kafka(KafkaSourceResult {
+                id: "test".into(),
                 partition: 0,
                 offset: 0,
                 topic: "test".into(),
 
         for _ in 0..5 {
             let message = SourceMessage::Result(SourceResult::Kafka(KafkaSourceResult {
+                id: "test".into(),
                 partition: 0,
                 offset: 0,
                 topic: "test".into(),
 
         // Pass another message through the stream to trigger a subsequent poll
         let message = SourceMessage::Result(SourceResult::Kafka(KafkaSourceResult {
+            id: "test".into(),
             partition: 0,
             offset: -1,
             topic: "test".into(),
 
         for _ in 0..5 {
             let message = SourceMessage::Result(SourceResult::Kafka(KafkaSourceResult {
+                id: "test".into(),
                 partition: 0,
                 offset: 0,
                 topic: "test".into(),
 
         // Pass another message through the stream to trigger a subsequent poll
         let message =
SourceMessage::Result(SourceResult::Kafka(KafkaSourceResult { + id: "test".into(), partition: 0, offset: -1, topic: "test".into(), @@ -488,6 +496,7 @@ mod tests { for _ in 0..5 { let message = SourceMessage::Result(SourceResult::Kafka(KafkaSourceResult { + id: "test".into(), partition: 0, offset: 0, topic: "test".into(), @@ -516,6 +525,7 @@ mod tests { pull.add_requests(1); let message = SourceMessage::Result(SourceResult::Kafka(KafkaSourceResult { + id: "test".into(), partition: 0, offset: 0, topic: "test".into(), @@ -552,6 +562,7 @@ mod tests { for _ in 0..3 { let message = SourceMessage::Result(SourceResult::Kafka(KafkaSourceResult { + id: "test".into(), partition: 0, offset: 0, topic: "test".into(), @@ -578,6 +589,7 @@ mod tests { // Pass another message through the stream to trigger a subsequent poll let message = SourceMessage::Result(SourceResult::Kafka(KafkaSourceResult { + id: "test".into(), partition: 0, offset: -1, topic: "test".into(), @@ -618,6 +630,7 @@ mod tests { for _ in 0..6 { let message = SourceMessage::Result(SourceResult::Kafka(KafkaSourceResult { + id: "test".into(), partition: 0, offset: 0, topic: "test".into(), @@ -635,6 +648,7 @@ mod tests { )); let message = SourceMessage::Result(SourceResult::Kafka(KafkaSourceResult { + id: "test".into(), partition: 0, offset: 0, topic: "test".into(), From 847b8afae29fe1aae4eb198026d17b04e0de7568 Mon Sep 17 00:00:00 2001 From: rkrishn7 Date: Tue, 5 Mar 2024 17:51:06 -0800 Subject: [PATCH 5/5] add test --- .circleci/config.yml | 1 + src/kiwi/tests/kafka.rs | 109 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 109 insertions(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 776dc11..3d38202 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -50,3 +50,4 @@ workflows: test: - "test_kafka_source" - "test_closes_subscription_on_changed_metadata" + - "test_prioritizes_source_id" diff --git a/src/kiwi/tests/kafka.rs b/src/kiwi/tests/kafka.rs index 6a0f08f..40ebebb 100644 --- a/src/kiwi/tests/kafka.rs +++ b/src/kiwi/tests/kafka.rs @@ -177,7 +177,7 @@ server: let admin_client = create_admin_client(); create_topic("topic1", &admin_client).await; - let mut proc = start_kiwi(config.path().to_str().unwrap())?; + let mut proc: process::Child = start_kiwi(config.path().to_str().unwrap())?; tokio::time::sleep(Duration::from_secs(2)).await; @@ -242,3 +242,110 @@ server: Ok(()) } + +#[tokio::test] +async fn test_prioritizes_source_id() -> anyhow::Result<()> { + let config = create_temp_config( + r#" +sources: + - type: kafka + id: my-kafka-source + topic: topic1 + +kafka: + partition_discovery_enabled: false + bootstrap_servers: + - 'kafka:19092' +server: + address: '127.0.0.1:8000' + "#, + ) + .unwrap(); + + let admin_client = create_admin_client(); + create_topic("topic1", &admin_client).await; + + let mut proc: process::Child = start_kiwi(config.path().to_str().unwrap())?; + + tokio::time::sleep(Duration::from_secs(2)).await; + + let (ws_stream, _) = connect_async("ws://127.0.0.1:8000") + .await + .expect("Failed to connect"); + + let (mut write, mut read) = ws_stream.split(); + + let cmd = Command::Subscribe { + source_id: "topic1".into(), + mode: SubscriptionMode::Push, + }; + + write + .send(tungstenite::protocol::Message::Text( + serde_json::to_string(&cmd).unwrap(), + )) + .await?; + + let resp = read.next().await.expect("Expected response")?; + + let resp: Message = serde_json::from_str(&resp.to_text().unwrap())?; + + match resp { + 
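        // With an explicit `id` configured, the bare topic name is no longer a
        // registered source ID, so subscribing by topic must fail.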
Message::CommandResponse(CommandResponse::SubscribeError { source_id, .. }) => { + assert_eq!(source_id, "topic1".to_string()); + } + _ => panic!("Expected subscribe error"), + } + + let cmd = Command::Subscribe { + source_id: "my-kafka-source".into(), + mode: SubscriptionMode::Push, + }; + + write + .send(tungstenite::protocol::Message::Text( + serde_json::to_string(&cmd).unwrap(), + )) + .await?; + + let resp = read.next().await.expect("Expected response")?; + + let resp: Message = serde_json::from_str(&resp.to_text().unwrap())?; + + match resp { + Message::CommandResponse(CommandResponse::SubscribeOk { source_id }) => { + assert_eq!(source_id, "my-kafka-source".to_string()); + } + _ => panic!("Expected subscribe ok"), + } + + let producer: FutureProducer = rdkafka::config::ClientConfig::new() + .set("bootstrap.servers", "kafka:19092") + .set("message.timeout.ms", "5000") + .create() + .expect("Producer creation error"); + + let record = FutureRecord::to("topic1").payload("test").key("test"); + + producer + .send(record, Duration::from_secs(0)) + .await + .expect("Failed to enqueue"); + + let msg = read.next().await.unwrap().unwrap(); + let msg: Message = serde_json::from_str(&msg.to_text().unwrap()).unwrap(); + + match msg { + Message::Result(msg) => match msg { + kiwi::protocol::SourceResult::Kafka { source_id, .. } => { + assert_eq!(source_id.as_ref(), "my-kafka-source".to_string()); + } + _ => panic!("Expected Kafka message. Received {:?}", msg), + }, + m => panic!("Expected message. Received {:?}", m), + } + + let _ = proc.kill(); + + Ok(()) +}