Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support source ids for kafka sources #91

Merged
merged 5 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,4 @@ workflows:
test:
- "test_kafka_source"
- "test_closes_subscription_on_changed_metadata"
- "test_prioritizes_source_id"
9 changes: 8 additions & 1 deletion doc/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,25 @@ hooks:
sources:
- type: kafka

# The source ID for this counter source. The source ID is used as a unique identifier, thus must be
# distinct from other source IDs, regardless of type.
#
## Optional (defaults to `topic`)
id: my-kafka-source

# The topic name for this Kafka source. The source ID defaults to the topic name.
#
## Required
topic: 'my-topic'

- type: counter

# The source ID for this counter source. The source ID is used as a unique identifier, thus must be
# distinct from other source IDs, regardless of type.
#
## Required

id: counter1

# The interval at which the counter source emits events
#
## Required
Expand Down
58 changes: 43 additions & 15 deletions src/kiwi/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@
#[serde(rename_all = "lowercase")]
pub enum SourceType {
Kafka {
id: Option<SourceId>,
topic: String,
},
Counter {
id: String,
id: SourceId,
min: u64,
#[serde(default)]
max: Option<u64>,
Expand All @@ -49,6 +50,15 @@
},
}

impl SourceType {
pub fn id(&self) -> &SourceId {
match self {
SourceType::Kafka { id, topic } => id.as_ref().unwrap_or(topic),
SourceType::Counter { id, .. } => id,
}
}
}

#[derive(Debug, Clone, Deserialize)]
pub struct Kafka {
#[serde(default = "Kafka::default_group_prefix")]
Expand Down Expand Up @@ -315,29 +325,27 @@
let mut seen = HashSet::new();

for typ in config.sources.iter() {
let incoming = match typ {
SourceType::Kafka { topic } => topic,
SourceType::Counter { id, .. } => id,
};
let id_incoming = typ.id();

if !seen.insert(incoming) {
if !seen.insert(id_incoming) {
return Err(anyhow::anyhow!(
"Found duplicate source ID in configuration: {}",
incoming
id_incoming
));
}

match sources.entry(incoming.clone()) {
match sources.entry(id_incoming.clone()) {
std::collections::btree_map::Entry::Occupied(_) => {
// Source already exists
continue;
}
std::collections::btree_map::Entry::Vacant(entry) => {
// Build and add source
let source = match typ {
SourceType::Kafka { topic } => {
SourceType::Kafka { topic, .. } => {
if let Some(kafka_config) = config.kafka.as_ref() {
<B as KafkaSourceBuilder>::build_source(
typ.id().clone(),
topic.clone(),
&kafka_config.bootstrap_servers,
&kafka_config.group_id_prefix,
Expand Down Expand Up @@ -370,10 +378,7 @@
}

sources.retain(|id, _| {
if !config.sources.iter().any(|typ| match typ {
SourceType::Kafka { topic } => topic == id,
SourceType::Counter { id: incoming, .. } => incoming == id,
}) {
if !config.sources.iter().any(|typ| typ.id() == id) {
tracing::info!("Removing source due to configuration change: {}", id);
false
} else {
Expand Down Expand Up @@ -458,8 +463,9 @@

assert!(config.sources.len() == 2);
assert!(
matches!(config.sources[0].clone(), SourceType::Kafka { topic } if topic == "test")
matches!(config.sources[0].clone(), SourceType::Kafka { topic, .. } if topic == "test")
);
assert_eq!(config.sources[0].id(), "test");

assert!(matches!(
config.sources[1].clone(),
Expand Down Expand Up @@ -496,6 +502,18 @@
max,
} if id == "test" && min == 0 && interval_ms == 100 && lazy && max == Some(100)
));

let config = "
sources:
- type: kafka
topic: topic1
id: test
server:
address: '127.0.0.1:8000'
";

let config = Config::from_str(config).unwrap();
assert_eq!(config.sources[0].id(), "test");
}

#[test]
Expand Down Expand Up @@ -554,6 +572,10 @@
) -> &Option<tokio::sync::mpsc::UnboundedSender<crate::source::SourceMetadata>> {
&None
}

fn as_any(&self) -> &dyn std::any::Any {
self
}

Check warning on line 578 in src/kiwi/src/config.rs

View check run for this annotation

Codecov / codecov/patch

src/kiwi/src/config.rs#L576-L578

Added lines #L576 - L578 were not covered by tests
}

struct TestWasmHook;
Expand Down Expand Up @@ -591,7 +613,8 @@

impl KafkaSourceBuilder for TestSourceBuilder {
fn build_source(
topic: SourceId,
_id: SourceId,
topic: String,
_bootstrap_servers: &[String],
_group_id_prefix: &str,
) -> Result<Box<dyn Source + Send + Sync>, anyhow::Error> {
Expand Down Expand Up @@ -619,6 +642,7 @@
},
SourceType::Kafka {
topic: "test".into(),
id: None,
},
],
hooks: None,
Expand All @@ -644,6 +668,7 @@
let config = Config {
sources: vec![SourceType::Kafka {
topic: "test".into(),
id: None,
}],
hooks: None,
server: Server {
Expand All @@ -658,6 +683,7 @@
let config = Config {
sources: vec![SourceType::Kafka {
topic: "test".into(),
id: None,
}],
hooks: None,
server: Server {
Expand Down Expand Up @@ -726,6 +752,7 @@
sources.lock().unwrap().insert(
"topic1".into(),
<TestSourceBuilder as KafkaSourceBuilder>::build_source(
"topic1".into(),
"topic1".into(),
&["localhost:9092".into()],
"kiwi-",
Expand Down Expand Up @@ -774,6 +801,7 @@
sources.lock().unwrap().insert(
"topic1".into(),
<TestSourceBuilder as KafkaSourceBuilder>::build_source(
"topic1".into(),
"topic1".into(),
&["localhost:9092".into()],
"kiwi-",
Expand Down
5 changes: 5 additions & 0 deletions src/kiwi/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,10 @@
fn metadata_tx(&self) -> &Option<tokio::sync::mpsc::UnboundedSender<SourceMetadata>> {
&None
}

fn as_any(&self) -> &dyn std::any::Any {
self
}

Check warning on line 363 in src/kiwi/src/connection.rs

View check run for this annotation

Codecov / codecov/patch

src/kiwi/src/connection.rs#L361-L363

Added lines #L361 - L363 were not covered by tests
}

#[derive(Debug, Clone)]
Expand All @@ -375,6 +379,7 @@

fn test_kafka_source_result() -> SourceResult {
SourceResult::Kafka(crate::source::kafka::KafkaSourceResult {
id: "test".to_string(),
key: None,
payload: None,
topic: "test".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion src/kiwi/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl From<source::SourceResult> for SourceResult {
source::SourceResult::Kafka(kafka) => Self::Kafka {
key: kafka.key,
payload: kafka.payload,
source_id: kafka.topic,
source_id: kafka.id,
partition: kafka.partition,
offset: kafka.offset,
timestamp: kafka.timestamp,
Expand Down
4 changes: 4 additions & 0 deletions src/kiwi/src/source/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@
fn metadata_tx(&self) -> &Option<tokio::sync::mpsc::UnboundedSender<SourceMetadata>> {
&None
}

fn as_any(&self) -> &dyn std::any::Any {
self
}

Check warning on line 103 in src/kiwi/src/source/counter.rs

View check run for this annotation

Codecov / codecov/patch

src/kiwi/src/source/counter.rs#L101-L103

Added lines #L101 - L103 were not covered by tests
}

pub struct CounterTask {
Expand Down
Loading
Loading