From 2d1a106475b1423208e7cf1efc14d4034e2a7e0d Mon Sep 17 00:00:00 2001 From: yuzelin <33053040+yuzelin@users.noreply.github.com> Date: Mon, 1 Apr 2024 08:55:04 +0800 Subject: [PATCH] [hotfix] Fix unstable Kafka CDC starup mode tests (#3125) --- .../action/cdc/kafka/KafkaActionITCaseBase.java | 12 +++++++++++- .../cdc/kafka/KafkaCanalSyncTableActionITCase.java | 4 ++-- .../action/cdc/kafka/KafkaSyncTableActionITCase.java | 2 +- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java index 9db87a556f96..002280079c6d 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java @@ -34,6 +34,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.jupiter.api.AfterEach; @@ -63,6 +64,7 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; @@ -278,6 +280,10 @@ public static List readLines(String resource) throws IOException { } void writeRecordsToKafka(String topic, List lines) throws Exception { + writeRecordsToKafka(topic, lines, false); + } + + void writeRecordsToKafka(String topic, List lines, boolean wait) throws Exception { Properties producerProperties = getStandardProps(); producerProperties.setProperty("retries", "0"); producerProperties.put( @@ -289,7 +295,11 @@ void writeRecordsToKafka(String topic, List lines) throws Exception { try { JsonNode jsonNode = objectMapper.readTree(lines.get(i)); if (!StringUtils.isEmpty(lines.get(i))) { - kafkaProducer.send(new ProducerRecord<>(topic, lines.get(i))); + Future sendFuture = + kafkaProducer.send(new ProducerRecord<>(topic, lines.get(i))); + if (wait) { + sendFuture.get(); + } } } catch (Exception e) { // ignore diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java index 148f647e7998..40743fae67fb 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java @@ -719,7 +719,7 @@ public void testStarUpOptionLatest() throws Exception { // ---------- Write the Canal json into Kafka ------------------- List lines = readLines("kafka/canal/table/startupmode/canal-data-1.txt"); try { - writeRecordsToKafka(topic, lines); + writeRecordsToKafka(topic, lines, true); } catch (Exception e) { throw new Exception("Failed to write canal data to Kafka.", e); } @@ -764,7 +764,7 @@ public void testStarUpOptionTimestamp() throws Exception { // ---------- Write the Canal json into Kafka ------------------- List lines = readLines("kafka/canal/table/startupmode/canal-data-1.txt"); try { - writeRecordsToKafka(topic, lines); + writeRecordsToKafka(topic, lines, true); } catch (Exception e) { throw new Exception("Failed to write canal data to Kafka.", e); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java index f2f8fc246682..e13ab86e9469 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java @@ -278,7 +278,7 @@ protected void testStarUpOptionLatest(String format) throws Exception { readLines( String.format("kafka/%s/table/startupmode/%s-data-1.txt", format, format)); try { - writeRecordsToKafka(topic, lines); + writeRecordsToKafka(topic, lines, true); } catch (Exception e) { throw new Exception(String.format("Failed to write %s data to Kafka.", format), e); }