Skip to content

Commit

Permalink
Lagt til KafkaErrorHandler som er kopi fra familie-felles for å kunne…
Browse files Browse the repository at this point in the history
… skrive seg bort fra å ha avhengigheter til familie-felles (#75)
  • Loading branch information
blommish authored Oct 30, 2024
1 parent 22d745f commit 91115fd
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 0 deletions.
15 changes: 15 additions & 0 deletions kafka/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
plugins {
kotlin("plugin.spring") version "2.0.20"
}

dependencies {
implementation(project(":log"))

implementation("org.springframework.kafka:spring-kafka")
implementation("org.slf4j:slf4j-api")

}

tasks.sourcesJar {
duplicatesStrategy = DuplicatesStrategy.WARN
}
87 changes: 87 additions & 0 deletions kafka/main/no/nav/tilleggsstonader/libs/kafka/KafkaErrorHandler.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package no.nav.tilleggsstonader.libs.kafka

import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler
import org.springframework.kafka.listener.MessageListenerContainer
import org.springframework.scheduling.TaskScheduler
import org.springframework.stereotype.Component
import java.time.Duration
import java.time.Instant
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong

@Component
class KafkaErrorHandler(private val taskScheduler: TaskScheduler) : CommonContainerStoppingErrorHandler() {
private val logger: Logger = LoggerFactory.getLogger(javaClass)
private val secureLogger: Logger = LoggerFactory.getLogger("secureLogger")

private val counter = AtomicInteger(0)
private val lastError = AtomicLong(0)

override fun handleRemaining(
e: Exception,
records: List<ConsumerRecord<*, *>>,
consumer: Consumer<*, *>,
container: MessageListenerContainer,
) {
if (records.isEmpty()) {
logger.error(
"Feil ved konsumering av melding. Ingen records. ${consumer.subscription()} (Forsøk nr ${counter.getAndAdd(1)})",
e,
)
scheduleRestart(e, records, consumer, container, "Ukjent topic")
} else {
records.first().run {
logger.error(
"Feil ved konsumering av melding fra ${this.topic()}. id ${this.key()}, " +
"offset: ${this.offset()}, partition: ${this.partition()} (Forsøk nr ${counter.getAndAdd(1)})",
)
secureLogger.error("${this.topic()} - Problemer med prosessering av $records (Forsøk nr ${counter.getAndAdd(1)})", e)
scheduleRestart(e, records, consumer, container, this.topic())
}
}
}

private fun scheduleRestart(
e: Exception,
records: List<ConsumerRecord<*, *>>,
consumer: Consumer<*, *>,
container: MessageListenerContainer,
topic: String,
) {
val now = System.currentTimeMillis()
if (now - lastError.getAndSet(now) > COUNTER_RESET_TIME) {
counter.set(0)
}
val numErrors = counter.incrementAndGet()
val delayTime = if (numErrors > SLOW_ERROR_COUNT) LONG_TIME else SHORT_TIME * numErrors
taskScheduler.schedule(
{
try {
logger.warn("Starter kafka container for {}", topic)
container.start()
} catch (exception: Exception) {
logger.error("Feil oppstod ved venting og oppstart av kafka container", exception)
}
},
Instant.ofEpochMilli(now + delayTime),
)
logger.warn("Stopper kafka container for {} i {}", topic, Duration.ofMillis(delayTime).toString())
super.handleRemaining(
Exception("Sjekk securelogs for mer info - ${e::class.java.simpleName}"),
records,
consumer,
container,
)
}

companion object {
private val LONG_TIME = Duration.ofHours(3).toMillis()
private val SHORT_TIME = Duration.ofSeconds(20).toMillis()
private const val SLOW_ERROR_COUNT = 10
private val COUNTER_RESET_TIME = SHORT_TIME * SLOW_ERROR_COUNT * 2
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package no.nav.tilleggsstonader.libs.kafka

import io.mockk.MockKAnnotations
import io.mockk.clearAllMocks
import io.mockk.impl.annotations.MockK
import io.mockk.mockk
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.catchThrowable
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.springframework.kafka.listener.MessageListenerContainer
import org.springframework.scheduling.TaskScheduler

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class KafkaErrorHandlerTest {
@MockK(relaxed = true)
lateinit var container: MessageListenerContainer

@MockK(relaxed = true)
lateinit var consumer: Consumer<*, *>

private val taskScheduler: TaskScheduler = mockk(relaxed = true)

private val errorHandler: KafkaErrorHandler = KafkaErrorHandler(taskScheduler)

@BeforeEach
internal fun setUp() {
MockKAnnotations.init(this)
clearAllMocks()
}

@Test
fun `skal stoppe container hvis man mottar feil med en tom liste med records`() {
val throwable =
catchThrowable {
errorHandler.handleRemaining(RuntimeException("Feil i test"), emptyList(), consumer, container)
}

assertThat(throwable)
.hasStackTraceContaining("Sjekk securelogs for mer info")
.hasCauseExactlyInstanceOf(Exception::class.java)
assertThat(throwable.stackTraceToString()).doesNotContain("Feil i test")
}

@Test
fun `skal stoppe container hvis man mottar feil med en liste med records`() {
val consumerRecord = ConsumerRecord("topic", 1, 1, 1, "record")
val throwable =
catchThrowable {
errorHandler.handleRemaining(
RuntimeException("Feil i test"),
listOf(consumerRecord),
consumer,
container,
)
}
assertThat(throwable)
.hasStackTraceContaining("Sjekk securelogs for mer info")
.hasCauseExactlyInstanceOf(Exception::class.java)
assertThat(throwable.stackTraceToString()).doesNotContain("Feil i test")
}
}
1 change: 1 addition & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ plugins {

include("http-client")
include("log")
include("kafka")
include("sikkerhet")
include("util")
include("unleash")
Expand Down

0 comments on commit 91115fd

Please sign in to comment.