polish
yuzelin committed Mar 30, 2024
1 parent c3b3211 commit 863decd
Showing 2 changed files with 33 additions and 23 deletions.
@@ -38,7 +38,9 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.AfterAllCallback;
 import org.junit.jupiter.api.extension.BeforeAllCallback;
@@ -70,15 +72,16 @@
 public abstract class KafkaActionITCaseBase extends CdcActionITCaseBase {
 
     private final ObjectMapper objectMapper = new ObjectMapper();
-    protected KafkaProducer<String, String> kafkaProducer;
-    private KafkaConsumer<String, String> kafkaConsumer;
-    private AdminClient adminClient;
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaActionITCaseBase.class);
 
     private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
     private static final Network NETWORK = Network.newNetwork();
-    private static final int zkTimeoutMills = 30000;
+    private static final int ZK_TIMEOUT_MILLIS = 30000;
+
+    protected static KafkaProducer<String, String> KAFKA_PRODUCER;
+    private static KafkaConsumer<String, String> KAFKA_CONSUMER;
+    private static AdminClient ADMIN_CLIENT;
 
     // Timer for scheduling logging task if the test hangs
     private final Timer loggingTimer = new Timer("Debug Logging Timer");
@@ -104,8 +107,8 @@ protected void doStart() {
                     // test run
                     .withEnv("KAFKA_LOG_RETENTION_MS", "-1");
 
-    @BeforeEach
-    public void setup() {
+    @BeforeAll
+    public static void beforeAll() {
         // create KafkaProducer
         Properties producerProperties = getStandardProps();
         producerProperties.setProperty("retries", "0");
@@ -115,7 +118,7 @@ public void setup() {
         producerProperties.put(
                 ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                 StringSerializer.class.getCanonicalName());
-        kafkaProducer = new KafkaProducer<>(producerProperties);
+        KAFKA_PRODUCER = new KafkaProducer<>(producerProperties);
 
         // create KafkaConsumer
         Properties consumerProperties = getStandardProps();
@@ -126,11 +129,22 @@
         consumerProperties.setProperty(
                 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                 StringDeserializer.class.getCanonicalName());
-        kafkaConsumer = new KafkaConsumer<>(consumerProperties);
+        KAFKA_CONSUMER = new KafkaConsumer<>(consumerProperties);
 
         // create AdminClient
-        adminClient = AdminClient.create(getStandardProps());
+        ADMIN_CLIENT = AdminClient.create(getStandardProps());
     }
 
+    @AfterAll
+    public static void afterAll() {
+        // Close Kafka objects
+        KAFKA_PRODUCER.close();
+        KAFKA_CONSUMER.close();
+        ADMIN_CLIENT.close();
+    }
+
+    @BeforeEach
+    public void setup() {
         // Probe Kafka broker status every 30 seconds
         scheduleTimeoutLogger(
                 Duration.ofSeconds(30),
@@ -152,14 +166,10 @@ public void after() throws Exception {
         cancelTimeoutLogger();
         // Delete topics to avoid reusing topics of the Kafka cluster
         deleteTopics();
-        // Close Kafka objects
-        kafkaProducer.close();
-        kafkaConsumer.close();
-        adminClient.close();
     }
 
     private void deleteTopics() throws ExecutionException, InterruptedException {
-        adminClient.deleteTopics(adminClient.listTopics().names().get()).all().get();
+        ADMIN_CLIENT.deleteTopics(ADMIN_CLIENT.listTopics().names().get()).all().get();
     }
 
     // ------------------------ For Debug Logging Purpose ----------------------------------
@@ -186,11 +196,11 @@ private void cancelTimeoutLogger() {
     private Map<String, TopicDescription> describeExternalTopics() {
         try {
             final List<String> topics =
-                    adminClient.listTopics().listings().get().stream()
+                    ADMIN_CLIENT.listTopics().listings().get().stream()
                             .filter(listing -> !listing.isInternal())
                             .map(TopicListing::name)
                             .collect(Collectors.toList());
-            return adminClient.describeTopics(topics).all().get();
+            return ADMIN_CLIENT.describeTopics(topics).all().get();
         } catch (Exception e) {
             throw new RuntimeException("Failed to list Kafka topics", e);
         }
@@ -208,8 +218,8 @@ private void logTopicPartitionStatus(Map<String, TopicDescription> topicDescript
                                         new TopicPartition(
                                                 topic, tpInfo.partition()))));
         final Map<TopicPartition, Long> beginningOffsets =
-                kafkaConsumer.beginningOffsets(partitions);
-        final Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(partitions);
+                KAFKA_CONSUMER.beginningOffsets(partitions);
+        final Map<TopicPartition, Long> endOffsets = KAFKA_CONSUMER.endOffsets(partitions);
         partitions.forEach(
                 partition ->
                         LOG.info(
@@ -226,8 +236,8 @@ public static Properties getStandardProps() {
         standardProps.put("enable.auto.commit", false);
         standardProps.put("auto.offset.reset", "earliest");
         standardProps.put("max.partition.fetch.bytes", 256);
-        standardProps.put("zookeeper.session.timeout.ms", zkTimeoutMills);
-        standardProps.put("zookeeper.connection.timeout.ms", zkTimeoutMills);
+        standardProps.put("zookeeper.session.timeout.ms", ZK_TIMEOUT_MILLIS);
+        standardProps.put("zookeeper.connection.timeout.ms", ZK_TIMEOUT_MILLIS);
         standardProps.put("default.api.timeout.ms", "120000");
         return standardProps;
     }
@@ -276,7 +286,7 @@ protected void createTestTopic(String topic, int numPartitions, int replicationF
         Map<String, Object> properties = new HashMap<>();
         properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
         try {
-            adminClient
+            ADMIN_CLIENT
                     .createTopics(
                             Collections.singletonList(
                                     new NewTopic(topic, numPartitions, (short) replicationFactor)))
@@ -300,7 +310,7 @@ protected void writeRecordsToKafka(String topic, String resourceDirFormat, Objec
         Files.readAllLines(Paths.get(url.toURI())).stream()
                 .filter(this::isRecordLine)
                 .map(s -> new ProducerRecord<>(topic, (String) null, s))
-                .forEach(kafkaProducer::send);
+                .forEach(KAFKA_PRODUCER::send);
     }
 
     private boolean isRecordLine(String line) {
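The change in this first file moves the Kafka client objects from a per-test to a per-class lifecycle: JUnit 5 invokes @BeforeAll and @AfterAll once per test class, so a single producer, consumer, and admin client are now shared by every test in the class instead of being rebuilt and torn down around each one. Below is a minimal standalone sketch of that lifecycle pattern; the class, topic, and field names and the broker address are illustrative assumptions, not code from this commit.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

class SharedProducerLifecycleTest {

    // Shared by all tests in the class, mirroring KAFKA_PRODUCER above.
    private static KafkaProducer<String, String> producer;

    @BeforeAll
    static void beforeAll() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", StringSerializer.class.getCanonicalName());
        props.put("value.serializer", StringSerializer.class.getCanonicalName());
        producer = new KafkaProducer<>(props); // built once, not per test
    }

    @AfterAll
    static void afterAll() {
        producer.close(); // released once, after the last test
    }

    @Test
    void sendsOneRecord() {
        producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
    }
}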
@@ -124,7 +124,7 @@ public void testMessageWithNullValue() throws Exception {
 
         writeRecordsToKafka(topic, "kafka/debezium/table/nullvalue/debezium-data-1.txt");
         // write null value
-        kafkaProducer.send(new ProducerRecord<>(topic, null));
+        KAFKA_PRODUCER.send(new ProducerRecord<>(topic, null));
         writeRecordsToKafka(topic, "kafka/debezium/table/nullvalue/debezium-data-2.txt");
 
         Map<String, String> kafkaConfig = getBasicKafkaConfig();
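The second file's hunk only swaps the field reference, but the line it touches is worth a note: new ProducerRecord<>(topic, null) uses the two-argument (topic, value) constructor, so the record is sent with a null value, which is exactly the case testMessageWithNullValue exercises. A standalone illustration of sending such a record follows; the broker address and names are assumptions, not code from this commit.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class NullValueRecord {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", StringSerializer.class.getCanonicalName());
        props.put("value.serializer", StringSerializer.class.getCanonicalName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // ProducerRecord(topic, value) with a null value: consumers see
            // record.value() == null; on compacted topics this is a tombstone.
            producer.send(new ProducerRecord<>("demo-topic", null));
        }
    }
}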
