Skip to content

Commit

Permalink
[test] Remove the loggingTimer to fix the CDC build error
Browse files Browse the repository at this point in the history
  • Loading branch information
Tan-JiaLiang committed Dec 30, 2024
1 parent c0023f0 commit ac127e0
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,17 @@
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
Expand All @@ -60,17 +56,12 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

/** Base test class for Kafka synchronization. */
public abstract class KafkaActionITCaseBase extends CdcActionITCaseBase {
Expand All @@ -88,9 +79,6 @@ public abstract class KafkaActionITCaseBase extends CdcActionITCaseBase {
private static KafkaConsumer<String, String> kafkaConsumer;
private static AdminClient adminClient;

// Timer for scheduling logging task if the test hangs
private final Timer loggingTimer = new Timer("Debug Logging Timer");

@RegisterExtension
@Order(1)
public static final KafkaContainerExtension KAFKA_CONTAINER =
Expand Down Expand Up @@ -164,27 +152,9 @@ public static void afterAll() {
adminClient.close();
}

@BeforeEach
public void setup() {
// Probe Kafka broker status per 30 seconds
scheduleTimeoutLogger(
Duration.ofSeconds(30),
() -> {
// List all non-internal topics
final Map<String, TopicDescription> topicDescriptions =
describeExternalTopics();
LOG.info("Current existing topics: {}", topicDescriptions.keySet());

// Log status of topics
logTopicPartitionStatus(topicDescriptions);
});
}

@AfterEach
public void after() throws Exception {
super.after();
// Cancel timer for debug logging
cancelTimeoutLogger();
// Delete topics for avoid reusing topics of Kafka cluster
deleteTopics();
}
Expand All @@ -193,64 +163,6 @@ private void deleteTopics() throws ExecutionException, InterruptedException {
adminClient.deleteTopics(adminClient.listTopics().names().get()).all().get();
}

// ------------------------ For Debug Logging Purpose ----------------------------------

private void scheduleTimeoutLogger(Duration period, Runnable loggingAction) {
TimerTask timeoutLoggerTask =
new TimerTask() {
@Override
public void run() {
try {
loggingAction.run();
} catch (Exception e) {
throw new RuntimeException("Failed to execute logging action", e);
}
}
};
loggingTimer.schedule(timeoutLoggerTask, 0L, period.toMillis());
}

private void cancelTimeoutLogger() {
loggingTimer.cancel();
}

private Map<String, TopicDescription> describeExternalTopics() {
try {
final List<String> topics =
adminClient.listTopics().listings().get().stream()
.filter(listing -> !listing.isInternal())
.map(TopicListing::name)
.collect(Collectors.toList());
return adminClient.describeTopics(topics).allTopicNames().get();
} catch (Exception e) {
throw new RuntimeException("Failed to list Kafka topics", e);
}
}

private synchronized void logTopicPartitionStatus(
Map<String, TopicDescription> topicDescriptions) {
List<TopicPartition> partitions = new ArrayList<>();
topicDescriptions.forEach(
(topic, description) ->
description
.partitions()
.forEach(
tpInfo ->
partitions.add(
new TopicPartition(
topic, tpInfo.partition()))));
final Map<TopicPartition, Long> beginningOffsets =
kafkaConsumer.beginningOffsets(partitions);
final Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(partitions);
partitions.forEach(
partition ->
LOG.info(
"TopicPartition \"{}\": starting offset: {}, stopping offset: {}",
partition,
beginningOffsets.get(partition),
endOffsets.get(partition)));
}

public static Properties getStandardProps() {
Properties standardProps = new Properties();
standardProps.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ public class KafkaDebeziumAvroSyncTableActionITCase extends KafkaActionITCaseBas

@BeforeEach
public void setup() {
super.setup();
// Init avro serializer for kafka key/value
Map<String, Object> props = new HashMap<>();
props.put(SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl());
Expand Down

0 comments on commit ac127e0

Please sign in to comment.