From 6200fec38750f029b750a43eb1fddff878193d1f Mon Sep 17 00:00:00 2001 From: Kerwin <37063904+zhuangchong@users.noreply.github.com> Date: Tue, 12 Dec 2023 09:59:11 +0800 Subject: [PATCH] [cdc] kafka adds sasl and other parameters to create consumers (#2484) --- .../paimon/flink/action/cdc/kafka/KafkaActionUtils.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java index 8b98c8c0695f..c9ad0e6155e9 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java @@ -241,6 +241,15 @@ public static DataFormat getDataFormat(Configuration kafkaConfig) { public static MessageQueueSchemaUtils.ConsumerWrapper getKafkaEarliestConsumer( Configuration kafkaConfig) { Properties props = new Properties(); + + for (Map.Entry entry : kafkaConfig.toMap().entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if (key.startsWith(PROPERTIES_PREFIX)) { + props.put(key.substring(PROPERTIES_PREFIX.length()), value); + } + } + props.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.get(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS));