Skip to content

Commit

Permalink
Mindre forbedring av standard Kafkaoppsett
Browse files Browse the repository at this point in the history
  • Loading branch information
jolarsen committed Mar 24, 2024
1 parent adab474 commit 2cb4385
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 95 deletions.
10 changes: 9 additions & 1 deletion integrasjon/kafka-properties/README.MD
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
# Kafka - Aiven - oppsett
Denne modulen inneholder en støttemetode for oppsett av Kafka Producers og Streams + standard String-serdes for topics som bruker JSON.
Denne modulen inneholder støttemetoder for oppsett av Kafka Producers og Consumer + String-standard for topics som bruker JSON.

Producer:
* Enten opprett en KafkaSender for hver topic og bruk send med key + message
* Eller opprett global KafkaSender med topic = null og bruk send med key + message + topic

Consumers
* Håndterere (ofte Transactional) implementerer KafkaMessageHandler / KafkaStringMessageHandler
* Applikasjonene definerer en Controllable som lager en KafkaConsumerManager av handlere og starter/stopper den

Enkelte applikasjoner konsumerer AVRO-topics - de må selv legge til avhengighet til confluent.io og sette opp Schema Registry og deserialisering av Avro
5 changes: 0 additions & 5 deletions integrasjon/kafka-properties/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,6 @@
<artifactId>kafka-clients</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,29 @@

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class KafkaConsumerManager<K,V> {

private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(10);

private final List<KafkaMessageHandler<K,V>> handlers;
private final List<KafkaConsumerLoop<K,V>> consumers = new ArrayList<>();
private final List<KafkaConsumerLoop<K,V>> consumers;

public KafkaConsumerManager(List<KafkaMessageHandler<K, V>> handlers) {
this.handlers = handlers;

public KafkaConsumerManager(KafkaMessageHandler<K, V> handler) {
this(List.of(handler));
}

public KafkaConsumerManager(List<? extends KafkaMessageHandler<K, V>> handlers) {
this.consumers = handlers.stream().map(KafkaConsumerLoop::new).toList();
}

public void start(BiConsumer<String, Throwable> errorlogger) {
consumers.addAll(handlers.stream().map(KafkaConsumerLoop::new).toList());
consumers.forEach(c -> {
var ct = new Thread(c, "KC-" + c.handler().groupId());
ct.setUncaughtExceptionHandler((t, e) -> { errorlogger.accept(c.handler().topic(), e); stop(); });
Expand Down Expand Up @@ -53,7 +54,7 @@ public boolean allStopped() {
}

public String topicNames() {
return handlers.stream().map(KafkaMessageHandler::topic).collect(Collectors.joining(","));
return consumers.stream().map(KafkaConsumerLoop::handler).map(KafkaMessageHandler::topic).collect(Collectors.joining(","));
}

private record KafkaConsumerCloser<K,V>(List<KafkaConsumerLoop<K,V>> consumers) implements Runnable {
Expand All @@ -66,7 +67,7 @@ public void run() {
public static class KafkaConsumerLoop<K,V> implements Runnable {

private static final Duration POLL_TIMEOUT = Duration.ofMillis(100);
private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(10);

private enum ConsumerState { UNINITIALIZED, RUNNING, STOPPING, STOPPED }
private static final int RUNNING = ConsumerState.RUNNING.hashCode();

Expand All @@ -77,23 +78,25 @@ private enum ConsumerState { UNINITIALIZED, RUNNING, STOPPING, STOPPED }
public KafkaConsumerLoop(KafkaMessageHandler<K,V> handler) {
this.handler = handler;
}

// Implementert som at-least-once - krever passe idempotente handleRecord og regner med at de er Transactional (commit hvert kall)
// Hvis man vil komplisere ting så kan gå for exactly-once - håndtere OffsetCommit (set property ENABLE_AUTO_COMMIT_CONFIG false)
// Man må da være bevisst på samspill DB-commit og Offset-commit - lage en Transactional handleRecords for alle som er pollet.
// handleRecords må ta inn ConsumerRecords (alle pollet) og 2 callbacks som a) legger til konsumert og b) kaller commitAsync(konsumert)
// Dessuten må man catche WakeupException og andre exceptions og avstemme håndtering (OffsetCommit) med DB-TX-Commit
@Override
public void run() {
try(var key = handler.keyDeserializer().get(); var value = handler.valueDeserializer().get()) {
var props = KafkaProperties.forConsumerGenericValue(handler.groupId(), key, value, handler.autoOffsetReset());
var props = KafkaProperties.forConsumerGenericValue(handler.groupId(), key, value, handler.autoOffsetReset().orElse(null));
consumer = new KafkaConsumer<>(props, key, value);
consumer.subscribe(List.of(handler.topic()));
running.set(RUNNING);
while (running.get() == RUNNING) {
var records = consumer.poll(POLL_TIMEOUT);
// Hvis man vil komplisere ting så kan man håndtere både OffsetCommit og DBcommit i en Transcational handleRecords.
// handleRecords må ta inn alle som er pollet (records) og 2 callbacks som a) legger til konsumert og b) commitAsync(konsumert)
for (var record : records) {
handler.handleRecord(record.key(), record.value());
}
}
} catch (WakeupException e) {
// ignore for shutdown
} finally {
if (consumer != null) {
consumer.close(CLOSE_TIMEOUT);
Expand All @@ -108,9 +111,7 @@ public void shutdown() {
} else {
running.set(ConsumerState.STOPPED.hashCode());
}
if (consumer != null) {
consumer.wakeup();
}
// Kan vurdere consumer.wakeup() + håndtere WakeupException ovenfor - men har utelatt til fordel for en tilstand og polling med kort timeout
}

public KafkaMessageHandler<K, V> handler() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package no.nav.vedtak.felles.integrasjon.kafka;

import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
Expand All @@ -13,17 +12,7 @@
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.apache.kafka.streams.state.internals.BlockBasedTableConfigWithAccessibleCache;
import org.rocksdb.BloomFilter;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;

import no.nav.foreldrepenger.konfig.Environment;

Expand Down Expand Up @@ -55,23 +44,18 @@ public static Properties forProducer() {
return props;
}

// Alle som konsumerer Json-meldinger
public static Properties forConsumerStringValue(String groupId) {
return forConsumerGenericValue(groupId, new StringDeserializer(), new StringDeserializer(), Optional.empty());
}

public static <K,V> Properties forConsumerGenericValue(String groupId, Deserializer<K> valueKey, Deserializer<V> valueSerde, Optional<OffsetResetStrategy> offsetReset) {
public static <K,V> Properties forConsumerGenericValue(String groupId, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, OffsetResetStrategy offsetReset) {
final Properties props = new Properties();

props.put(CommonClientConfigs.GROUP_ID_CONFIG, groupId);
props.put(CommonClientConfigs.CLIENT_ID_CONFIG, generateClientId());
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getAivenConfig(AivenProperty.KAFKA_BROKERS));
offsetReset.ifPresent(or -> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, or.toString()));
Optional.ofNullable(offsetReset).ifPresent(or -> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, or.toString()));

putSecurity(props);

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, valueKey.getClass());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueSerde.getClass());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());

// Polling
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100"); // Unngå store Tx dersom alle prosesseres innen samme Tx. Default 500
Expand All @@ -80,33 +64,13 @@ public static <K,V> Properties forConsumerGenericValue(String groupId, Deseriali
return props;
}

// Alle som konsumerer Json-meldinger
public static Properties forStreamsStringValue(String applicationId) {
return forStreamsGenericValue(applicationId, Serdes.String());
}

public static <T> Properties forStreamsGenericValue(String applicationId, Serde<T> valueSerde) {
final Properties props = new Properties();

props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
props.put(StreamsConfig.CLIENT_ID_CONFIG, generateClientId());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, getAivenConfig(AivenProperty.KAFKA_BROKERS));

putSecurity(props);

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerde.getClass());
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndFailExceptionHandler.class);

props.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, StreamsConfig.IN_MEMORY);
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, StreamsRocksReadOnly.class);

// Polling
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "200");
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000");
/*
* Streams-config er fjernet. Ved evt re-innføring husk at det trends read+write til topic for å unngå log-spamming.
* - APPLICATION_ID_CONFIG = tisvarende verdi som brukes for GROUP_ID_CONFIG (men kan ikke ha både streams og consumer)
* - KEY+VALUE SERDE - typisk Serdes.String() + derserialization_exception = LogAndFailExceptionHandler
* - Bør se på rocksdb-setting (se i historikk)
*/

return props;
}

// Trengs kun for de som skal konsumere Avro. Ellers ikke
public static String getAvroSchemaRegistryURL() {
Expand Down Expand Up @@ -148,26 +112,4 @@ private static void putSecurity(Properties props) {
}
}

public static class StreamsRocksReadOnly implements RocksDBConfigSetter {

@Override
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {

BlockBasedTableConfigWithAccessibleCache tableConfig = new BlockBasedTableConfigWithAccessibleCache();
tableConfig.setBlockCache(new LRUCache(1024 * 1024L));
tableConfig.setBlockSize(4096L);
tableConfig.setFilterPolicy(new BloomFilter());
tableConfig.setCacheIndexAndFilterBlocks(true);
options.setTableFormatConfig(tableConfig);

options.setWriteBufferSize(512 * 1024L);
options.setMaxWriteBufferNumber(2);
}

@Override
public void close(final String storeName, final Options options) {
// NOOP
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package no.nav.vedtak.felles.integrasjon.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
Expand All @@ -11,24 +12,31 @@ public class KafkaSender {
private final Producer<String, String> producer;
private final String topic;

public KafkaSender(Producer<String, String> producer, String topic) {
this.producer = producer;
public KafkaSender(String topic) {
this.producer = new KafkaProducer<>(KafkaProperties.forProducer());
this.topic = topic;
}

public RecordMetadata send(String key, String message) {
if (topic == null) {
throw kafkaPubliseringException("null", new IllegalArgumentException());
}
return send(key, message, this.topic);
}

public RecordMetadata send(String key, String message, String topic) {
try {
var record = new ProducerRecord<>(topic, key, message);
return producer.send(record).get();
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw kafkaPubliseringException(e);
throw kafkaPubliseringException(topic, e);
}
}

private IntegrasjonException kafkaPubliseringException(Exception e) {
private IntegrasjonException kafkaPubliseringException(String topic, Exception e) {
return new IntegrasjonException("F-KAFKA-925475", "Unexpected error when sending message to topic " + topic, e);
}

Expand Down

0 comments on commit 2cb4385

Please sign in to comment.