Skip to content

Commit

Permalink
[flink] Fix kafka CDC Ingestion multi-topic exception (#2030)
Browse files Browse the repository at this point in the history
  • Loading branch information
MonsterChenzhuo authored Sep 18, 2023
1 parent a280ec6 commit 119ec42
Showing 1 changed file with 7 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,14 @@ static Schema buildPaimonSchema(
static KafkaSource<String> buildKafkaSource(Configuration kafkaConfig) {
validateKafkaConfig(kafkaConfig);
KafkaSourceBuilder<String> kafkaSourceBuilder = KafkaSource.builder();

List<String> topics =
kafkaConfig.get(KafkaConnectorOptions.TOPIC).stream()
.flatMap(topic -> Arrays.stream(topic.split(",")))
.collect(Collectors.toList());

kafkaSourceBuilder
.setTopics(kafkaConfig.get(KafkaConnectorOptions.TOPIC))
.setTopics(topics)
.setValueOnlyDeserializer(new SimpleStringSchema())
.setGroupId(kafkaPropertiesGroupId(kafkaConfig));
Properties properties = new Properties();
Expand Down

0 comments on commit 119ec42

Please sign in to comment.