Skip to content

Commit

Permalink
[cdc] kafka adds sasl and other parameters to create consumers (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuangchong authored Dec 12, 2023
1 parent 117bcb0 commit 6200fec
Showing 1 changed file with 9 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,15 @@ public static DataFormat getDataFormat(Configuration kafkaConfig) {
public static MessageQueueSchemaUtils.ConsumerWrapper getKafkaEarliestConsumer(
Configuration kafkaConfig) {
Properties props = new Properties();

for (Map.Entry<String, String> entry : kafkaConfig.toMap().entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
if (key.startsWith(PROPERTIES_PREFIX)) {
props.put(key.substring(PROPERTIES_PREFIX.length()), value);
}
}

props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaConfig.get(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS));
Expand Down

0 comments on commit 6200fec

Please sign in to comment.