From 91115fdcae1ea5bcc48cc2130704d0eab5f31ce2 Mon Sep 17 00:00:00 2001 From: Johan Blomgren Date: Wed, 30 Oct 2024 09:55:25 +0100 Subject: [PATCH] =?UTF-8?q?Lagt=20til=20KafkaErrorHandler=20som=20er=20kop?= =?UTF-8?q?i=20fra=20familie-felles=20for=20=C3=A5=20kunne=20skrive=20seg?= =?UTF-8?q?=20bort=20fra=20=C3=A5=20ha=20avhengigheter=20til=20familie-fel?= =?UTF-8?q?les=20(#75)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- kafka/build.gradle.kts | 15 ++++ .../libs/kafka/KafkaErrorHandler.kt | 87 +++++++++++++++++++ .../libs/kafka/KafkaErrorHandlerTest.kt | 65 ++++++++++++++ settings.gradle.kts | 1 + 4 files changed, 168 insertions(+) create mode 100644 kafka/build.gradle.kts create mode 100644 kafka/main/no/nav/tilleggsstonader/libs/kafka/KafkaErrorHandler.kt create mode 100644 kafka/test/no/nav/tilleggsstonader/libs/kafka/KafkaErrorHandlerTest.kt diff --git a/kafka/build.gradle.kts b/kafka/build.gradle.kts new file mode 100644 index 0000000..a9c0b06 --- /dev/null +++ b/kafka/build.gradle.kts @@ -0,0 +1,15 @@ +plugins { + kotlin("plugin.spring") version "2.0.20" +} + +dependencies { + implementation(project(":log")) + + implementation("org.springframework.kafka:spring-kafka") + implementation("org.slf4j:slf4j-api") + +} + +tasks.sourcesJar { + duplicatesStrategy = DuplicatesStrategy.WARN +} diff --git a/kafka/main/no/nav/tilleggsstonader/libs/kafka/KafkaErrorHandler.kt b/kafka/main/no/nav/tilleggsstonader/libs/kafka/KafkaErrorHandler.kt new file mode 100644 index 0000000..61371b3 --- /dev/null +++ b/kafka/main/no/nav/tilleggsstonader/libs/kafka/KafkaErrorHandler.kt @@ -0,0 +1,87 @@ +package no.nav.tilleggsstonader.libs.kafka + +import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler +import org.springframework.kafka.listener.MessageListenerContainer +import org.springframework.scheduling.TaskScheduler +import org.springframework.stereotype.Component +import java.time.Duration +import java.time.Instant +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong + +@Component +class KafkaErrorHandler(private val taskScheduler: TaskScheduler) : CommonContainerStoppingErrorHandler() { + private val logger: Logger = LoggerFactory.getLogger(javaClass) + private val secureLogger: Logger = LoggerFactory.getLogger("secureLogger") + + private val counter = AtomicInteger(0) + private val lastError = AtomicLong(0) + + override fun handleRemaining( + e: Exception, + records: List>, + consumer: Consumer<*, *>, + container: MessageListenerContainer, + ) { + if (records.isEmpty()) { + logger.error( + "Feil ved konsumering av melding. Ingen records. ${consumer.subscription()} (Forsøk nr ${counter.getAndAdd(1)})", + e, + ) + scheduleRestart(e, records, consumer, container, "Ukjent topic") + } else { + records.first().run { + logger.error( + "Feil ved konsumering av melding fra ${this.topic()}. id ${this.key()}, " + + "offset: ${this.offset()}, partition: ${this.partition()} (Forsøk nr ${counter.getAndAdd(1)})", + ) + secureLogger.error("${this.topic()} - Problemer med prosessering av $records (Forsøk nr ${counter.getAndAdd(1)})", e) + scheduleRestart(e, records, consumer, container, this.topic()) + } + } + } + + private fun scheduleRestart( + e: Exception, + records: List>, + consumer: Consumer<*, *>, + container: MessageListenerContainer, + topic: String, + ) { + val now = System.currentTimeMillis() + if (now - lastError.getAndSet(now) > COUNTER_RESET_TIME) { + counter.set(0) + } + val numErrors = counter.incrementAndGet() + val delayTime = if (numErrors > SLOW_ERROR_COUNT) LONG_TIME else SHORT_TIME * numErrors + taskScheduler.schedule( + { + try { + logger.warn("Starter kafka container for {}", topic) + container.start() + } catch (exception: Exception) { + logger.error("Feil oppstod ved venting og oppstart av kafka container", exception) + } + }, + Instant.ofEpochMilli(now + delayTime), + ) + logger.warn("Stopper kafka container for {} i {}", topic, Duration.ofMillis(delayTime).toString()) + super.handleRemaining( + Exception("Sjekk securelogs for mer info - ${e::class.java.simpleName}"), + records, + consumer, + container, + ) + } + + companion object { + private val LONG_TIME = Duration.ofHours(3).toMillis() + private val SHORT_TIME = Duration.ofSeconds(20).toMillis() + private const val SLOW_ERROR_COUNT = 10 + private val COUNTER_RESET_TIME = SHORT_TIME * SLOW_ERROR_COUNT * 2 + } +} diff --git a/kafka/test/no/nav/tilleggsstonader/libs/kafka/KafkaErrorHandlerTest.kt b/kafka/test/no/nav/tilleggsstonader/libs/kafka/KafkaErrorHandlerTest.kt new file mode 100644 index 0000000..248cc19 --- /dev/null +++ b/kafka/test/no/nav/tilleggsstonader/libs/kafka/KafkaErrorHandlerTest.kt @@ -0,0 +1,65 @@ +package no.nav.tilleggsstonader.libs.kafka + +import io.mockk.MockKAnnotations +import io.mockk.clearAllMocks +import io.mockk.impl.annotations.MockK +import io.mockk.mockk +import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.catchThrowable +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.TestInstance +import org.springframework.kafka.listener.MessageListenerContainer +import org.springframework.scheduling.TaskScheduler + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class KafkaErrorHandlerTest { + @MockK(relaxed = true) + lateinit var container: MessageListenerContainer + + @MockK(relaxed = true) + lateinit var consumer: Consumer<*, *> + + private val taskScheduler: TaskScheduler = mockk(relaxed = true) + + private val errorHandler: KafkaErrorHandler = KafkaErrorHandler(taskScheduler) + + @BeforeEach + internal fun setUp() { + MockKAnnotations.init(this) + clearAllMocks() + } + + @Test + fun `skal stoppe container hvis man mottar feil med en tom liste med records`() { + val throwable = + catchThrowable { + errorHandler.handleRemaining(RuntimeException("Feil i test"), emptyList(), consumer, container) + } + + assertThat(throwable) + .hasStackTraceContaining("Sjekk securelogs for mer info") + .hasCauseExactlyInstanceOf(Exception::class.java) + assertThat(throwable.stackTraceToString()).doesNotContain("Feil i test") + } + + @Test + fun `skal stoppe container hvis man mottar feil med en liste med records`() { + val consumerRecord = ConsumerRecord("topic", 1, 1, 1, "record") + val throwable = + catchThrowable { + errorHandler.handleRemaining( + RuntimeException("Feil i test"), + listOf(consumerRecord), + consumer, + container, + ) + } + assertThat(throwable) + .hasStackTraceContaining("Sjekk securelogs for mer info") + .hasCauseExactlyInstanceOf(Exception::class.java) + assertThat(throwable.stackTraceToString()).doesNotContain("Feil i test") + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 8a41da6..8a9aaea 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -6,6 +6,7 @@ plugins { include("http-client") include("log") +include("kafka") include("sikkerhet") include("util") include("unleash")