Skip to content

Commit

Permalink
[hotfix] Fix unstable Kafka CDC starup mode tests (#3125)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin authored Apr 1, 2024
1 parent efbf547 commit 2d1a106
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -63,6 +64,7 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -278,6 +280,10 @@ public static List<String> readLines(String resource) throws IOException {
}

void writeRecordsToKafka(String topic, List<String> lines) throws Exception {
writeRecordsToKafka(topic, lines, false);
}

void writeRecordsToKafka(String topic, List<String> lines, boolean wait) throws Exception {
Properties producerProperties = getStandardProps();
producerProperties.setProperty("retries", "0");
producerProperties.put(
Expand All @@ -289,7 +295,11 @@ void writeRecordsToKafka(String topic, List<String> lines) throws Exception {
try {
JsonNode jsonNode = objectMapper.readTree(lines.get(i));
if (!StringUtils.isEmpty(lines.get(i))) {
kafkaProducer.send(new ProducerRecord<>(topic, lines.get(i)));
Future<RecordMetadata> sendFuture =
kafkaProducer.send(new ProducerRecord<>(topic, lines.get(i)));
if (wait) {
sendFuture.get();
}
}
} catch (Exception e) {
// ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ public void testStarUpOptionLatest() throws Exception {
// ---------- Write the Canal json into Kafka -------------------
List<String> lines = readLines("kafka/canal/table/startupmode/canal-data-1.txt");
try {
writeRecordsToKafka(topic, lines);
writeRecordsToKafka(topic, lines, true);
} catch (Exception e) {
throw new Exception("Failed to write canal data to Kafka.", e);
}
Expand Down Expand Up @@ -764,7 +764,7 @@ public void testStarUpOptionTimestamp() throws Exception {
// ---------- Write the Canal json into Kafka -------------------
List<String> lines = readLines("kafka/canal/table/startupmode/canal-data-1.txt");
try {
writeRecordsToKafka(topic, lines);
writeRecordsToKafka(topic, lines, true);
} catch (Exception e) {
throw new Exception("Failed to write canal data to Kafka.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ protected void testStarUpOptionLatest(String format) throws Exception {
readLines(
String.format("kafka/%s/table/startupmode/%s-data-1.txt", format, format));
try {
writeRecordsToKafka(topic, lines);
writeRecordsToKafka(topic, lines, true);
} catch (Exception e) {
throw new Exception(String.format("Failed to write %s data to Kafka.", format), e);
}
Expand Down

0 comments on commit 2d1a106

Please sign in to comment.