Skip to content

Commit

Permalink
Merge pull request #17 from companieshouse/feature/dsnd-577-add-chs-d…
Browse files Browse the repository at this point in the history
…eserialiser

Add a CHS delta deserialiser and the starting functionality for transforms
  • Loading branch information
bcullerton authored Mar 30, 2022
2 parents c8a73ff + 1ec3263 commit 1542ca0
Show file tree
Hide file tree
Showing 21 changed files with 982 additions and 27 deletions.
290 changes: 263 additions & 27 deletions pom.xml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package uk.gov.companieshouse.disqualifiedofficers.delta;

import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Import;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ActiveProfiles;
import uk.gov.companieshouse.disqualifiedofficers.delta.config.KafkaTestContainerConfig;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@DirtiesContext
@Import(KafkaTestContainerConfig.class)
@ActiveProfiles({"test"})
public abstract class AbstractIntegrationTest {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package uk.gov.companieshouse.disqualifiedofficers.delta.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import uk.gov.companieshouse.delta.ChsDelta;
import uk.gov.companieshouse.disqualifiedofficers.delta.serialization.ChsDeltaDeserializer;
import uk.gov.companieshouse.disqualifiedofficers.delta.serialization.ChsDeltaSerializer;
import uk.gov.companieshouse.kafka.producer.CHKafkaProducer;

@TestConfiguration
public class KafkaTestContainerConfig {

@MockBean
private CHKafkaProducer chKafkaProducer;

private final ChsDeltaDeserializer chsDeltaDeserializer;

@Autowired
public KafkaTestContainerConfig(ChsDeltaDeserializer chsDeltaDeserializer) {
this.chsDeltaDeserializer = chsDeltaDeserializer;
}

@Bean
public KafkaContainer kafkaContainer() {
KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
kafkaContainer.start();
return kafkaContainer;
}

@Bean
ConcurrentKafkaListenerContainerFactory<String, ChsDelta> listenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ChsDelta> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory());
return factory;
}

@Bean
public ConsumerFactory<String, ChsDelta> kafkaConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(kafkaContainer()),
new StringDeserializer(),
chsDeltaDeserializer);
}

@Bean
public Map<String, Object> consumerConfigs(KafkaContainer kafkaContainer) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "insolvency-delta-consumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ChsDeltaDeserializer.class);
return props;
}

@Bean
public ProducerFactory<String, ChsDelta> producerFactory(KafkaContainer kafkaContainer) {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ChsDeltaSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, ChsDelta> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory(kafkaContainer()));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package uk.gov.companieshouse.disqualifiedofficers.delta.consumer;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import uk.gov.companieshouse.delta.ChsDelta;
import uk.gov.companieshouse.disqualifiedofficers.delta.AbstractIntegrationTest;

public class DisqualifiedOfficersDeltaConsumerITest extends AbstractIntegrationTest {

@Autowired
public KafkaTemplate<String, ChsDelta> kafkaTemplate;

@Value("${disqualified-officers.delta.topic.main}")
private String mainTopic;

@Test
public void testSendingKafkaMessage() {
ChsDelta chsDelta = new ChsDelta("{ \"key\": \"value\" }", 1, "some_id");
kafkaTemplate.send(mainTopic, chsDelta);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package uk.gov.companieshouse.disqualifiedofficers.delta.config;

import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import uk.gov.companieshouse.api.InternalApiClient;
import uk.gov.companieshouse.environment.EnvironmentReader;
import uk.gov.companieshouse.environment.impl.EnvironmentReaderImpl;
import uk.gov.companieshouse.kafka.serialization.SerializerFactory;
import uk.gov.companieshouse.sdk.manager.ApiSdkManager;

@Configuration
public class ApplicationConfig implements WebMvcConfigurer {

@Bean
SerializerFactory serializerFactory() {
return new SerializerFactory();
}

@Bean
EnvironmentReader environmentReader() {
return new EnvironmentReaderImpl();
}

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public InternalApiClient internalApiClient() {
return ApiSdkManager.getPrivateSDK();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package uk.gov.companieshouse.disqualifiedofficers.delta.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import uk.gov.companieshouse.delta.ChsDelta;
import uk.gov.companieshouse.disqualifiedofficers.delta.serialization.ChsDeltaDeserializer;
import uk.gov.companieshouse.kafka.producer.Acks;
import uk.gov.companieshouse.kafka.producer.CHKafkaProducer;
import uk.gov.companieshouse.kafka.producer.ProducerConfig;

@Configuration
@Profile("!test")
public class KafkaConfig {

private final ChsDeltaDeserializer chsDeltaDeserializer;

private final String bootstrapServers;

public KafkaConfig(ChsDeltaDeserializer chsDeltaDeserializer, @Value("${spring.kafka"
+ ".bootstrap-servers}") String bootstrapServers) {
this.chsDeltaDeserializer = chsDeltaDeserializer;
this.bootstrapServers = bootstrapServers;
}

/**
* Kafka Consumer Factory.
*/
@Bean
public ConsumerFactory<String, ChsDelta> kafkaConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
chsDeltaDeserializer);
}

/**
* Kafka Listener Container Factory.
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ChsDelta> listenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ChsDelta> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory());
return factory;
}

/**
* CH Kafka Producer.
*/
@Bean
public CHKafkaProducer chKafkaProducer() {
final ProducerConfig config = createProducerConfig();
config.setRoundRobinPartitioner(true);
config.setAcks(Acks.WAIT_FOR_ALL);
config.setRetries(10);
config.setRequestTimeoutMilliseconds(3000);
return new CHKafkaProducer(config);
}

private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ChsDeltaDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

return props;
}

private ProducerConfig createProducerConfig() {
final ProducerConfig config = new ProducerConfig();
config.setBrokerAddresses(bootstrapServers.split(","));
return config;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package uk.gov.companieshouse.disqualifiedofficers.delta.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import uk.gov.companieshouse.logging.Logger;
import uk.gov.companieshouse.logging.LoggerFactory;

/**
* Configuration class for logging.
*/
@Configuration
public class LoggingConfig {

@Value("${logger.namespace}")
private String loggerNamespace;

/**
* Main application logger with component specific namespace.
*
* @return the {@link LoggerFactory} for the specified namespace
*/
@Bean
public Logger logger() {
return LoggerFactory.getLogger(loggerNamespace);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package uk.gov.companieshouse.disqualifiedofficers.delta.consumer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import uk.gov.companieshouse.delta.ChsDelta;
import uk.gov.companieshouse.disqualifiedofficers.delta.processor.DisqualifiedOfficersDeltaProcessor;
import uk.gov.companieshouse.logging.Logger;


@Component
public class DisqualifiedOfficersDeltaConsumer {

private final DisqualifiedOfficersDeltaProcessor deltaProcessor;
private final Logger logger;

@Autowired
public DisqualifiedOfficersDeltaConsumer(DisqualifiedOfficersDeltaProcessor deltaProcessor,
Logger logger) {
this.deltaProcessor = deltaProcessor;
this.logger = logger;
}

/**
* Receives Main topic messages.
*/
@KafkaListener(id = "${disqualified-officers.delta.main-id}",
topics = "${disqualified-officers.delta.topic.main}",
groupId = "${disqualified-officers.delta.group-id}",
containerFactory = "listenerContainerFactory")
public void receiveMainMessages(Message<ChsDelta> chsDeltaMessage) {
logger.info("A new message read from MAIN topic with payload: "
+ chsDeltaMessage.getPayload());
deltaProcessor.processDelta(chsDeltaMessage);
}

/**
* Receives Retry topic messages.
*/
@KafkaListener(id = "${disqualified-officers.delta.retry-id}",
topics = "${disqualified-officers.delta.topic.retry}",
groupId = "${disqualified-officers.delta.group-id}",
containerFactory = "listenerContainerFactory")
public void receiveRetryMessages(Message<ChsDelta> message) {
logger.info(String.format("A new message read from RETRY topic with payload:%s "
+ "and headers:%s ", message.getPayload(), message.getHeaders()));
deltaProcessor.processDelta(message);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package uk.gov.companieshouse.disqualifiedofficers.delta.exception;

public class NonRetryableErrorException extends RuntimeException {
public NonRetryableErrorException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package uk.gov.companieshouse.disqualifiedofficers.delta.exception;

public class RetryableErrorException extends RuntimeException {
public RetryableErrorException(String message) {
super(message);
}
}
Loading

0 comments on commit 1542ca0

Please sign in to comment.