Skip to content

Commit

Permalink
[cdc][refactor] Refactor records writing in Kafka CDC tests
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Apr 1, 2024
1 parent 85f1cfd commit 1a74d61
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 636 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase;
import org.apache.paimon.utils.StringUtils;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.util.DockerImageVersions;
Expand All @@ -33,11 +33,15 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
Expand All @@ -50,10 +54,9 @@
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -67,8 +70,6 @@
import java.util.concurrent.Future;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;

/** Base test class for Kafka synchronization. */
public abstract class KafkaActionITCaseBase extends CdcActionITCaseBase {

Expand All @@ -78,7 +79,11 @@ public abstract class KafkaActionITCaseBase extends CdcActionITCaseBase {

private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
private static final Network NETWORK = Network.newNetwork();
private static final int zkTimeoutMills = 30000;
private static final int ZK_TIMEOUT_MILLIS = 30000;

protected static KafkaProducer<String, String> KAFKA_PRODUCER;
private static KafkaConsumer<String, String> KAFKA_CONSUMER;
private static AdminClient ADMIN_CLIENT;

// Timer for scheduling logging task if the test hangs
private final Timer loggingTimer = new Timer("Debug Logging Timer");
Expand All @@ -104,6 +109,42 @@ protected void doStart() {
// test run
.withEnv("KAFKA_LOG_RETENTION_MS", "-1");

@BeforeAll
public static void beforeAll() {
// create KafkaProducer
Properties producerProperties = getStandardProps();
producerProperties.setProperty("retries", "0");
producerProperties.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getCanonicalName());
producerProperties.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getCanonicalName());
KAFKA_PRODUCER = new KafkaProducer<>(producerProperties);

// create KafkaConsumer
Properties consumerProperties = getStandardProps();
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-tests-debugging");
consumerProperties.setProperty(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getCanonicalName());
consumerProperties.setProperty(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getCanonicalName());
KAFKA_CONSUMER = new KafkaConsumer<>(consumerProperties);

// create AdminClient
ADMIN_CLIENT = AdminClient.create(getStandardProps());
}

@AfterAll
public static void afterAll() {
// Close Kafka objects
KAFKA_PRODUCER.close();
KAFKA_CONSUMER.close();
ADMIN_CLIENT.close();
}

@BeforeEach
public void setup() {
// Probe Kafka broker status per 30 seconds
Expand All @@ -130,8 +171,7 @@ public void after() throws Exception {
}

private void deleteTopics() throws ExecutionException, InterruptedException {
final AdminClient adminClient = AdminClient.create(getStandardProps());
adminClient.deleteTopics(adminClient.listTopics().names().get()).all().get();
ADMIN_CLIENT.deleteTopics(ADMIN_CLIENT.listTopics().names().get()).all().get();
}

// ------------------------ For Debug Logging Purpose ----------------------------------
Expand All @@ -156,29 +196,19 @@ private void cancelTimeoutLogger() {
}

private Map<String, TopicDescription> describeExternalTopics() {
try (final AdminClient adminClient = AdminClient.create(getStandardProps())) {
try {
final List<String> topics =
adminClient.listTopics().listings().get().stream()
ADMIN_CLIENT.listTopics().listings().get().stream()
.filter(listing -> !listing.isInternal())
.map(TopicListing::name)
.collect(Collectors.toList());

return adminClient.describeTopics(topics).all().get();
return ADMIN_CLIENT.describeTopics(topics).all().get();
} catch (Exception e) {
throw new RuntimeException("Failed to list Kafka topics", e);
}
}

private void logTopicPartitionStatus(Map<String, TopicDescription> topicDescriptions) {
final Properties properties = getStandardProps();
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-tests-debugging");
properties.setProperty(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getCanonicalName());
properties.setProperty(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getCanonicalName());
final KafkaConsumer<?, ?> consumer = new KafkaConsumer<String, String>(properties);
List<TopicPartition> partitions = new ArrayList<>();
topicDescriptions.forEach(
(topic, description) ->
Expand All @@ -189,8 +219,9 @@ private void logTopicPartitionStatus(Map<String, TopicDescription> topicDescript
partitions.add(
new TopicPartition(
topic, tpInfo.partition()))));
final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(partitions);
final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
final Map<TopicPartition, Long> beginningOffsets =
KAFKA_CONSUMER.beginningOffsets(partitions);
final Map<TopicPartition, Long> endOffsets = KAFKA_CONSUMER.endOffsets(partitions);
partitions.forEach(
partition ->
LOG.info(
Expand All @@ -207,8 +238,8 @@ public static Properties getStandardProps() {
standardProps.put("enable.auto.commit", false);
standardProps.put("auto.offset.reset", "earliest");
standardProps.put("max.partition.fetch.bytes", 256);
standardProps.put("zookeeper.session.timeout.ms", zkTimeoutMills);
standardProps.put("zookeeper.connection.timeout.ms", zkTimeoutMills);
standardProps.put("zookeeper.session.timeout.ms", ZK_TIMEOUT_MILLIS);
standardProps.put("zookeeper.connection.timeout.ms", ZK_TIMEOUT_MILLIS);
standardProps.put("default.api.timeout.ms", "120000");
return standardProps;
}
Expand Down Expand Up @@ -253,11 +284,12 @@ public KafkaSyncDatabaseActionBuilder(Map<String, String> kafkaConfig) {
}
}

public void createTestTopic(String topic, int numPartitions, int replicationFactor) {
protected void createTestTopic(String topic, int numPartitions, int replicationFactor) {
Map<String, Object> properties = new HashMap<>();
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
try (AdminClient admin = AdminClient.create(properties)) {
admin.createTopics(
try {
ADMIN_CLIENT
.createTopics(
Collections.singletonList(
new NewTopic(topic, numPartitions, (short) replicationFactor)))
.all()
Expand All @@ -271,42 +303,41 @@ public void createTestTopic(String topic, int numPartitions, int replicationFact
}
}

public static List<String> readLines(String resource) throws IOException {
final URL url =
KafkaCanalSyncTableActionITCase.class.getClassLoader().getResource(resource);
assertThat(url).isNotNull();
java.nio.file.Path path = new File(url.getFile()).toPath();
return Files.readAllLines(path);
protected void writeRecordsToKafka(String topic, String resourceDirFormat, Object... args)
throws Exception {
writeRecordsToKafka(topic, resourceDirFormat, false, args);
}

void writeRecordsToKafka(String topic, List<String> lines) throws Exception {
writeRecordsToKafka(topic, lines, false);
protected void writeRecordsToKafka(
String topic, String resourceDirFormat, boolean wait, Object... args) throws Exception {
URL url =
KafkaCanalSyncTableActionITCase.class
.getClassLoader()
.getResource(String.format(resourceDirFormat, args));
Files.readAllLines(Paths.get(url.toURI())).stream()
.filter(this::isRecordLine)
.forEach(r -> send(topic, r, wait));
}

void writeRecordsToKafka(String topic, List<String> lines, boolean wait) throws Exception {
Properties producerProperties = getStandardProps();
producerProperties.setProperty("retries", "0");
producerProperties.put(
"key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProperties.put(
"value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer kafkaProducer = new KafkaProducer(producerProperties);
for (int i = 0; i < lines.size(); i++) {
private boolean isRecordLine(String line) {
try {
objectMapper.readTree(line);
return !StringUtils.isEmpty(line);
} catch (JsonProcessingException e) {
return false;
}
}

private void send(String topic, String record, boolean wait) {
Future<RecordMetadata> sendFuture =
KAFKA_PRODUCER.send(new ProducerRecord<>(topic, record));
if (wait) {
try {
JsonNode jsonNode = objectMapper.readTree(lines.get(i));
if (!StringUtils.isEmpty(lines.get(i))) {
Future<RecordMetadata> sendFuture =
kafkaProducer.send(new ProducerRecord<>(topic, lines.get(i)));
if (wait) {
sendFuture.get();
}
}
} catch (Exception e) {
// ignore
sendFuture.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}

kafkaProducer.close();
}

/** Kafka container extension for junit5. */
Expand Down
Loading

0 comments on commit 1a74d61

Please sign in to comment.