diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java index c9ad0e6155e9..99628907842f 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java @@ -76,14 +76,7 @@ public static KafkaSource buildKafkaSource(Configuration kafkaConfig) { .setTopics(topics) .setValueOnlyDeserializer(new SimpleStringSchema()) .setGroupId(kafkaPropertiesGroupId(kafkaConfig)); - Properties properties = new Properties(); - for (Map.Entry entry : kafkaConfig.toMap().entrySet()) { - String key = entry.getKey(); - String value = entry.getValue(); - if (key.startsWith(PROPERTIES_PREFIX)) { - properties.put(key.substring(PROPERTIES_PREFIX.length()), value); - } - } + Properties properties = createKafkaProperties(kafkaConfig); StartupMode startupMode = fromOption(kafkaConfig.get(KafkaConnectorOptions.SCAN_STARTUP_MODE)); @@ -240,15 +233,7 @@ public static DataFormat getDataFormat(Configuration kafkaConfig) { public static MessageQueueSchemaUtils.ConsumerWrapper getKafkaEarliestConsumer( Configuration kafkaConfig) { - Properties props = new Properties(); - - for (Map.Entry entry : kafkaConfig.toMap().entrySet()) { - String key = entry.getKey(); - String value = entry.getValue(); - if (key.startsWith(PROPERTIES_PREFIX)) { - props.put(key.substring(PROPERTIES_PREFIX.length()), value); - } - } + Properties props = createKafkaProperties(kafkaConfig); props.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, @@ -282,6 +267,18 @@ public static MessageQueueSchemaUtils.ConsumerWrapper getKafkaEarliestConsumer( return new KafkaConsumerWrapper(consumer, topic); } + private static Properties createKafkaProperties(Configuration kafkaConfig) { + Properties props = new Properties(); + for (Map.Entry entry : kafkaConfig.toMap().entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if (key.startsWith(PROPERTIES_PREFIX)) { + props.put(key.substring(PROPERTIES_PREFIX.length()), value); + } + } + return props; + } + private static class KafkaConsumerWrapper implements MessageQueueSchemaUtils.ConsumerWrapper { private final KafkaConsumer consumer;