Skip to content

Commit

Permalink
[cdc] Extract method in KafkaActionUtils
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Dec 12, 2023
1 parent 6200fec commit 49a51d3
Showing 1 changed file with 14 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,7 @@ public static KafkaSource<String> buildKafkaSource(Configuration kafkaConfig) {
.setTopics(topics)
.setValueOnlyDeserializer(new SimpleStringSchema())
.setGroupId(kafkaPropertiesGroupId(kafkaConfig));
Properties properties = new Properties();
for (Map.Entry<String, String> entry : kafkaConfig.toMap().entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
if (key.startsWith(PROPERTIES_PREFIX)) {
properties.put(key.substring(PROPERTIES_PREFIX.length()), value);
}
}
Properties properties = createKafkaProperties(kafkaConfig);

StartupMode startupMode =
fromOption(kafkaConfig.get(KafkaConnectorOptions.SCAN_STARTUP_MODE));
Expand Down Expand Up @@ -240,15 +233,7 @@ public static DataFormat getDataFormat(Configuration kafkaConfig) {

public static MessageQueueSchemaUtils.ConsumerWrapper getKafkaEarliestConsumer(
Configuration kafkaConfig) {
Properties props = new Properties();

for (Map.Entry<String, String> entry : kafkaConfig.toMap().entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
if (key.startsWith(PROPERTIES_PREFIX)) {
props.put(key.substring(PROPERTIES_PREFIX.length()), value);
}
}
Properties props = createKafkaProperties(kafkaConfig);

props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
Expand Down Expand Up @@ -282,6 +267,18 @@ public static MessageQueueSchemaUtils.ConsumerWrapper getKafkaEarliestConsumer(
return new KafkaConsumerWrapper(consumer, topic);
}

private static Properties createKafkaProperties(Configuration kafkaConfig) {
Properties props = new Properties();
for (Map.Entry<String, String> entry : kafkaConfig.toMap().entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
if (key.startsWith(PROPERTIES_PREFIX)) {
props.put(key.substring(PROPERTIES_PREFIX.length()), value);
}
}
return props;
}

private static class KafkaConsumerWrapper implements MessageQueueSchemaUtils.ConsumerWrapper {

private final KafkaConsumer<String, String> consumer;
Expand Down

0 comments on commit 49a51d3

Please sign in to comment.