From 119ec42940cf6cac2dc48fe9e43450a03f4b8a45 Mon Sep 17 00:00:00 2001 From: monster <60029759+MonsterChenzhuo@users.noreply.github.com> Date: Mon, 18 Sep 2023 18:17:09 +0800 Subject: [PATCH] [flink] Fix kafka CDC Ingestion multi-topic exception (#2030) --- .../paimon/flink/action/cdc/kafka/KafkaActionUtils.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java index 02a3b0640a60..c249db8eecd0 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java @@ -89,8 +89,14 @@ static Schema buildPaimonSchema( static KafkaSource buildKafkaSource(Configuration kafkaConfig) { validateKafkaConfig(kafkaConfig); KafkaSourceBuilder kafkaSourceBuilder = KafkaSource.builder(); + + List topics = + kafkaConfig.get(KafkaConnectorOptions.TOPIC).stream() + .flatMap(topic -> Arrays.stream(topic.split(","))) + .collect(Collectors.toList()); + kafkaSourceBuilder - .setTopics(kafkaConfig.get(KafkaConnectorOptions.TOPIC)) + .setTopics(topics) .setValueOnlyDeserializer(new SimpleStringSchema()) .setGroupId(kafkaPropertiesGroupId(kafkaConfig)); Properties properties = new Properties();