Update project for Spring Boot 3.3 and modern Java
Also drops the millisecond-based config support, as it has not been mentioned in the README for many years.

Due to the breaking change, bumping the major version.
fatso83 committed Aug 9, 2024
1 parent 1673605 commit 7fcacde
Showing 6 changed files with 89 additions and 136 deletions.
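
Note for upgraders: after this change only the Duration-style keys bind; the dropped send-receive-timeout-ms and poll-timeout-ms keys have nothing left to bind to. The sketch below is condensed from the updated KafkaHealthPropertiesTest at the bottom of this commit; the class name DurationKeysSketch and its main method are illustrative only, not part of the commit.

package me.bvn13.kafka.health;

import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.context.properties.source.MapConfigurationPropertySource;

import java.time.Duration;
import java.util.Map;

// Illustrative sketch, condensed from the updated KafkaHealthPropertiesTest in this commit.
class DurationKeysSketch {

    public static void main(String[] args) {
        // Only the Duration-style keys remain after this commit; the old
        // *-ms keys (send-receive-timeout-ms, poll-timeout-ms) no longer bind.
        var source = new MapConfigurationPropertySource(Map.of(
                "kafka.health.topic", "custom-topic",
                "kafka.health.send-receive-timeout", "1m",
                "kafka.health.poll-timeout", "2s",
                "kafka.health.cache.maximum-size", "42"));

        KafkaHealthProperties props =
                new Binder(source).bind("kafka.health", KafkaHealthProperties.class).get();

        System.out.println(props.sendReceiveTimeout().equals(Duration.ofMinutes(1))); // true
        System.out.println(props.pollTimeout().equals(Duration.ofSeconds(2)));        // true
    }
}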
17 changes: 12 additions & 5 deletions pom.xml
@@ -6,7 +6,7 @@

<groupId>me.bvn13.kafka.health</groupId>
<artifactId>kafka-health-check</artifactId>
<version>1.5.6-SNAPSHOT</version>
<version>2.0.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>Kafka Health Check</name>
@@ -16,29 +16,36 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.7.1</version>
<version>3.3.1</version>
<relativePath />
</parent>

<properties>
<!-- Java -->
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>16</maven.compiler.source>
<maven.compiler.target>16</maven.compiler.target>

<!-- Encoding -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

<!-- Versions -->
<nexus.url>https://s01.oss.sonatype.org</nexus.url>
<guava.version>30.1.1-jre</guava.version>
<guava.version>32.0.0-jre</guava.version>
<maven-deploy-plugin.version>2.8.2</maven-deploy-plugin.version>
<nexus-staging-maven-plugin.version>1.6.13</nexus-staging-maven-plugin.version>
<maven-gpg-plugin.version>1.6</maven-gpg-plugin.version>
<gitflow-maven-plugin.version>1.18.0</gitflow-maven-plugin.version>
</properties>

<dependencies>

<dependency>
<groupId>jakarta.annotation</groupId>
<artifactId>jakarta.annotation-api</artifactId>
<version>3.0.0</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
@@ -40,8 +40,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.StreamSupport;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;

public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {

@@ -73,9 +73,9 @@ public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties
MeterRegistry meterRegistry) {

logger.info("Initializing kafka health check with properties: {}", kafkaHealthProperties);
this.topic = kafkaHealthProperties.getTopic();
this.sendReceiveTimeout = kafkaHealthProperties.getSendReceiveTimeout();
this.pollTimeout = kafkaHealthProperties.getPollTimeout();
this.topic = kafkaHealthProperties.topic();
this.sendReceiveTimeout = kafkaHealthProperties.sendReceiveTimeout();
this.pollTimeout = kafkaHealthProperties.pollTimeout();

Map<String, Object> kafkaConsumerPropertiesCopy = new HashMap<>(kafkaConsumerProperties);

@@ -92,7 +92,7 @@ public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties
this.running = new AtomicBoolean(true);
this.cache = Caffeine.newBuilder()
.expireAfterWrite(sendReceiveTimeout)
.maximumSize(kafkaHealthProperties.getCache().getMaximumSize())
.maximumSize(kafkaHealthProperties.cache().getMaximumSize())
.build();

enableCacheMetrics(cache, meterRegistry);
@@ -102,7 +102,7 @@ public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties
}

@PostConstruct
void subscribeAndSendMessage() throws InterruptedException {
void subscribeAndSendMessage() {
subscribeToTopic();

if (kafkaCommunicationResult.isFailure()) {
@@ -137,7 +137,7 @@ private String getUniqueConsumerGroupId(Map<String, Object> kafkaConsumerPropert
}
}

private void subscribeToTopic() throws InterruptedException {
private void subscribeToTopic() {

final AtomicBoolean subscribed = new AtomicBoolean(false);

@@ -25,8 +25,8 @@ public KafkaHealthProperties kafkaHealthProperties() {
@ConditionalOnMissingBean(KafkaConsumingHealthIndicator.class)
public KafkaConsumingHealthIndicator kafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
KafkaProperties kafkaProperties) {
return new KafkaConsumingHealthIndicator(kafkaHealthProperties, kafkaProperties.buildConsumerProperties(),
kafkaProperties.buildProducerProperties());
return new KafkaConsumingHealthIndicator(kafkaHealthProperties, kafkaProperties.buildConsumerProperties(null),
kafkaProperties.buildProducerProperties(null));
}

}
62 changes: 13 additions & 49 deletions src/main/java/me/bvn13/kafka/health/KafkaHealthProperties.java
@@ -1,60 +1,24 @@
 package me.bvn13.kafka.health;
 
-import java.time.Duration;
-
-public class KafkaHealthProperties {
-
-    private String topic = "health-checks";
-    private Duration sendReceiveTimeout = Duration.ofMillis(2500);
-    private Duration pollTimeout = Duration.ofMillis(200);
-    private KafkaHealthCheckCacheProperties cache = new KafkaHealthCheckCacheProperties();
-
-    public String getTopic() {
-        return topic;
-    }
-
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-
-    public Duration getSendReceiveTimeout() {
-        return sendReceiveTimeout;
-    }
+import org.springframework.boot.context.properties.bind.ConstructorBinding;
 
-    public void setSendReceiveTimeout(Duration sendReceiveTimeout) {
-        this.sendReceiveTimeout = sendReceiveTimeout;
-    }
-
-    @Deprecated
-    public void setSendReceiveTimeoutMs(long sendReceiveTimeoutMs) {
-        setSendReceiveTimeout(Duration.ofMillis(sendReceiveTimeoutMs));
-    }
-
-    public Duration getPollTimeout() {
-        return pollTimeout;
-    }
-
-    public void setPollTimeout(Duration pollTimeout) {
-        this.pollTimeout = pollTimeout;
-    }
+import java.time.Duration;
 
-    @Deprecated
-    public void setPollTimeoutMs(long pollTimeoutMs) {
-        setPollTimeout(Duration.ofMillis(pollTimeoutMs));
-    }
+public record KafkaHealthProperties (String topic, Duration sendReceiveTimeout, Duration pollTimeout, KafkaHealthCheckCacheProperties cache){
 
-    public KafkaHealthCheckCacheProperties getCache() {
-        return cache;
+    @ConstructorBinding
+    public KafkaHealthProperties {
+        topic = coalesce(topic,"health-checks");
+        sendReceiveTimeout = coalesce(sendReceiveTimeout, Duration.ofMillis(2500));
+        pollTimeout = coalesce(pollTimeout, Duration.ofMillis(200));
+        cache = coalesce(cache, new KafkaHealthCheckCacheProperties());
     }
 
-    public void setCache(KafkaHealthCheckCacheProperties cache) {
-        this.cache = cache;
+    public KafkaHealthProperties(){
+        this(null,null,null,null);
     }
 
-    @Override
-    public String toString() {
-        return "KafkaHealthProperties{" + "topic='" + topic + '\'' + ", sendReceiveTimeout=" + sendReceiveTimeout +
-               ", pollTimeout=" + pollTimeout + ", cacheProperties=" +
-               cache + '}';
+    public static <T> T coalesce(T a, T b) {
+        return a == null ? b : a;
     }
 }
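
A quick sketch of the new defaulting behaviour may help here: because the compact constructor coalesces null components, the record keeps the same defaults ("health-checks", 2500 ms, 200 ms) that the removed field initializers provided. The class name RecordDefaultsSketch and its main method are illustrative only, not part of the commit.

package me.bvn13.kafka.health;

import java.time.Duration;

// Illustrative sketch (not from the commit): the compact constructor's coalesce(...)
// calls replace null components with the same defaults the old mutable class used
// as field initializers.
class RecordDefaultsSketch {

    public static void main(String[] args) {
        // The no-arg convenience constructor delegates to this(null, null, null, null),
        // so every component falls back to its default.
        KafkaHealthProperties defaults = new KafkaHealthProperties();
        System.out.println(defaults.topic());              // health-checks
        System.out.println(defaults.sendReceiveTimeout()); // PT2.5S
        System.out.println(defaults.pollTimeout());        // PT0.2S

        // Explicit values are kept; anything passed as null is still defaulted.
        KafkaHealthProperties custom =
                new KafkaHealthProperties("my-topic", Duration.ofSeconds(5), null, null);
        System.out.println(custom.topic());              // my-topic
        System.out.println(custom.sendReceiveTimeout()); // PT5S
        System.out.println(custom.pollTimeout());        // PT0.2S (defaulted)
    }
}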
@@ -17,8 +17,8 @@
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.EmbeddedKafkaZKBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.core.BrokerAddress;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.junit.jupiter.SpringExtension;

@@ -31,53 +31,54 @@
@EmbeddedKafka(topics = TOPIC)
public class KafkaConsumingHealthIndicatorTest {

static final String TOPIC = "health-checks";
static final String TOPIC = "health-checks";

private Consumer<String, String> consumer;
private Consumer<String, String> consumer;

@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;

@BeforeEach
public void setUp() {
Map<String, Object> consumerConfigs =
new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
consumer = new DefaultKafkaConsumerFactory<>(consumerConfigs, new StringDeserializer(),
new StringDeserializer()).createConsumer();
consumer.subscribe(Collections.singletonList(TOPIC));
consumer.poll(Duration.ofSeconds(1));
}
@BeforeEach
public void setUp() {
Map<String, Object> consumerConfigs =
new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
consumer = new DefaultKafkaConsumerFactory<>(consumerConfigs, new StringDeserializer(),
new StringDeserializer()).createConsumer();
consumer.subscribe(Collections.singletonList(TOPIC));
consumer.poll(Duration.ofSeconds(1));
}

@AfterEach
public void tearDown() {
consumer.close();
embeddedKafkaBroker.getKafkaServers().forEach(KafkaServer::shutdown);
embeddedKafkaBroker.getKafkaServers().forEach(KafkaServer::awaitShutdown);
}
@AfterEach
public void tearDown() {
consumer.close();
if(embeddedKafkaBroker instanceof final EmbeddedKafkaZKBroker embeddedKafkaZKBroker) {
embeddedKafkaZKBroker.getKafkaServers().forEach(KafkaServer::shutdown);
embeddedKafkaZKBroker.getKafkaServers().forEach(KafkaServer::awaitShutdown);
}
}

@Test
public void kafkaIsDown() throws Exception {
final KafkaHealthProperties kafkaHealthProperties = new KafkaHealthProperties();
kafkaHealthProperties.setTopic(TOPIC);
@Test
public void given_that_the_server_is_first_up_it_should_report_kafka_as_down_when_the_server_has_been_shutdown() throws Exception {
final KafkaHealthProperties kafkaHealthProperties = new KafkaHealthProperties(TOPIC, Duration.ofMillis(100),Duration.ofMillis(100),null);

final KafkaProperties kafkaProperties = new KafkaProperties();
final BrokerAddress[] brokerAddresses = embeddedKafkaBroker.getBrokerAddresses();
kafkaProperties.setBootstrapServers(Collections.singletonList(brokerAddresses[0].toString()));
final KafkaProperties kafkaProperties = new KafkaProperties();
final var brokerAddresses = embeddedKafkaBroker.getBrokersAsString();
kafkaProperties.setBootstrapServers(Collections.singletonList(brokerAddresses));

final KafkaConsumingHealthIndicator healthIndicator =
new KafkaConsumingHealthIndicator(kafkaHealthProperties, kafkaProperties.buildConsumerProperties(),
kafkaProperties.buildProducerProperties());
healthIndicator.subscribeAndSendMessage();
final KafkaConsumingHealthIndicator healthIndicator =
new KafkaConsumingHealthIndicator(kafkaHealthProperties, kafkaProperties.buildConsumerProperties(null),
kafkaProperties.buildProducerProperties(null));
healthIndicator.subscribeAndSendMessage();

Health health = healthIndicator.health();
assertThat(health.getStatus()).isEqualTo(Status.UP);
Health health = healthIndicator.health();
assertThat(health.getStatus()).isEqualTo(Status.UP);

shutdownKafka();
shutdownKafka();

Awaitility.await().untilAsserted(() -> assertThat(healthIndicator.health().getStatus()).isEqualTo(Status.DOWN));
}
Awaitility.await().untilAsserted(() -> assertThat(healthIndicator.health().getStatus()).isEqualTo(Status.DOWN));
}

private void shutdownKafka() {
this.embeddedKafkaBroker.destroy();
}
private void shutdownKafka() {
this.embeddedKafkaBroker.destroy();
}
}
47 changes: 14 additions & 33 deletions src/test/java/me/bvn13/kafka/health/KafkaHealthPropertiesTest.java
@@ -1,55 +1,36 @@
package me.bvn13.kafka.health;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.params.provider.Arguments.arguments;

import com.google.common.collect.ImmutableMap;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.context.properties.source.ConfigurationPropertySource;
import org.springframework.boot.context.properties.source.MapConfigurationPropertySource;

import java.time.Duration;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;

public class KafkaHealthPropertiesTest {

// @formatter:off
// @formatter:off
private static final ConfigurationPropertySource DURATION_PROPERTY_SOURCE = new MapConfigurationPropertySource(ImmutableMap.of(
"kafka.health.topic", "custom-topic",
"kafka.health.send-receive-timeout", "1m",
"kafka.health.poll-timeout", "2s",
"kafka.health.cache.maximum-size", "42"
));

private static final ConfigurationPropertySource MILLISECONDS_PROPERTY_SOURCE = new MapConfigurationPropertySource(ImmutableMap.of(
"kafka.health.topic", "custom-topic",
"kafka.health.send-receive-timeout-ms", "60000",
"kafka.health.poll-timeout-ms", "2000",
"kafka.health.cache.maximum-size", "42"
));
// @formatter:on

@ParameterizedTest(name = "using {0} based setters")
@MethodSource("configurationPropertySources")
@SuppressWarnings("unused")
public void test_that_properties_bind_to_KafkaHealthProperties(String sourceName,
ConfigurationPropertySource propertySource) {

KafkaHealthProperties kafkaHealthProperties =
new Binder(propertySource).bind("kafka.health", KafkaHealthProperties.class).get();

assertThat(kafkaHealthProperties.getTopic()).isEqualTo("custom-topic");
assertThat(kafkaHealthProperties.getSendReceiveTimeout()).isEqualTo(Duration.ofMinutes(1));
assertThat(kafkaHealthProperties.getPollTimeout()).isEqualTo(Duration.ofSeconds(2));
assertThat(kafkaHealthProperties.getCache().getMaximumSize()).isEqualTo(42);
}
@SuppressWarnings("unused")
@Test
public void test_that_properties_bind_to_KafkaHealthProperties() {

static Stream<Arguments> configurationPropertySources() {
return Stream.of(arguments("Duration", DURATION_PROPERTY_SOURCE),
arguments("long (milliseconds)", MILLISECONDS_PROPERTY_SOURCE));
}
KafkaHealthProperties kafkaHealthProperties =
new Binder(DURATION_PROPERTY_SOURCE).bind("kafka.health", KafkaHealthProperties.class).get();

assertThat(kafkaHealthProperties.topic()).isEqualTo("custom-topic");
assertThat(kafkaHealthProperties.sendReceiveTimeout()).isEqualTo(Duration.ofMinutes(1));
assertThat(kafkaHealthProperties.pollTimeout()).isEqualTo(Duration.ofSeconds(2));
assertThat(kafkaHealthProperties.cache().getMaximumSize()).isEqualTo(42);
}
}
