add micrometer metrics to kafka streams
rtc11 committed Apr 22, 2022
1 parent 17040d9 commit a7ee688
Showing 5 changed files with 38 additions and 8 deletions.
3 changes: 2 additions & 1 deletion kafka-test/build.gradle.kts
@@ -1,5 +1,6 @@
 dependencies {
     api("org.apache.kafka:kafka-streams-test-utils:3.1.0")
-    implementation(kotlin("test"))
+    implementation("io.micrometer:micrometer-registry-prometheus:1.8.5")
     implementation(project(":kafka"))
+    implementation(kotlin("test"))
 }
@@ -0,0 +1,21 @@
package io.micrometer.core.instrument.binder.kafka

import io.micrometer.core.instrument.MeterRegistry
import org.apache.kafka.common.Metric
import org.apache.kafka.common.MetricName


/**
 * KafkaMetrics is package private.
 * KafkaStreams and TopologyTestDriver do not share an interface,
 * so this is a workaround for testing Kafka Streams metrics.
 */
class KtorKafkaMetrics(
    registry: MeterRegistry,
    metrics: () -> Map<MetricName, Metric>,
) {
    init {
        val binder = KafkaMetrics(metrics)
        binder.bindTo(registry)
    }
}
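
For context, a minimal sketch (not part of this commit) of how this binder can be exercised against a TopologyTestDriver. The topic names, the dummy driver properties, the function name, and the use of SimpleMeterRegistry in place of the Prometheus registry are illustrative assumptions; the driver is created before the binder so that the metrics supplier already has values when bindTo registers the meters.

    import io.micrometer.core.instrument.binder.kafka.KtorKafkaMetrics
    import io.micrometer.core.instrument.simple.SimpleMeterRegistry
    import org.apache.kafka.streams.StreamsBuilder
    import org.apache.kafka.streams.TopologyTestDriver
    import java.util.Properties

    fun main() {
        // Hypothetical pass-through topology; real tests build theirs from the project's topics.
        val topology = StreamsBuilder().apply {
            stream<String, String>("source-topic").to("sink-topic")
        }.build()

        // Minimal driver config (TopologyTestDriver never connects to these brokers).
        val props = Properties().apply {
            put("application.id", "metrics-sketch")
            put("bootstrap.servers", "dummy:9092")
        }

        val registry = SimpleMeterRegistry()
        TopologyTestDriver(topology, props).use { driver ->
            // Bind after the driver exists so the supplier returns real metrics.
            KtorKafkaMetrics(registry) { driver.metrics() }
            registry.meters.forEach { println(it.id.name) }
        }
    }
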
9 changes: 6 additions & 3 deletions kafka-test/main/no/nav/aap/kafka/streams/test/KStreamsMock.kt
@@ -1,5 +1,7 @@
 package no.nav.aap.kafka.streams.test
 
+import io.micrometer.core.instrument.MeterRegistry
+import io.micrometer.core.instrument.binder.kafka.KtorKafkaMetrics
 import no.nav.aap.kafka.KafkaConfig
 import no.nav.aap.kafka.plus
 import no.nav.aap.kafka.streams.Kafka
@@ -11,9 +13,10 @@ import java.util.*
 class KStreamsMock : Kafka {
     lateinit var streams: TopologyTestDriver
 
-    override fun start(kafkaConfig: KafkaConfig, streamsBuilder: StreamsBuilder.() -> Unit) {
-        val topology = StreamsBuilder().apply(streamsBuilder).build()
-        streams = TopologyTestDriver(topology, kafkaConfig.consumer + kafkaConfig.producer + testConfig)
+    override fun start(config: KafkaConfig, registry: MeterRegistry, builder: StreamsBuilder.() -> Unit) {
+        val topology = StreamsBuilder().apply(builder).build()
+        KtorKafkaMetrics(registry, streams::metrics)
+        streams = TopologyTestDriver(topology, config.consumer + config.producer + testConfig)
     }
 
     inline fun <reified V : Any> inputTopic(topic: Topic<V>): TestInputTopic<String, V> =
1 change: 1 addition & 0 deletions kafka/build.gradle.kts
@@ -2,6 +2,7 @@ dependencies {
     implementation("ch.qos.logback:logback-classic:1.2.11")
     implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.13.2")
     implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.13.2")
+    implementation("io.micrometer:micrometer-registry-prometheus:1.8.5")
 
     api("org.apache.kafka:kafka-clients:3.1.0")
     api("org.apache.kafka:kafka-streams:3.1.0")
12 changes: 8 additions & 4 deletions kafka/main/no/nav/aap/kafka/streams/KStreams.kt
@@ -1,5 +1,7 @@
 package no.nav.aap.kafka.streams
 
+import io.micrometer.core.instrument.MeterRegistry
+import io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics
 import no.nav.aap.kafka.KafkaConfig
 import no.nav.aap.kafka.ProcessingExceptionHandler
 import no.nav.aap.kafka.plus
@@ -22,7 +24,7 @@ typealias Store<V> = ReadOnlyKeyValueStore<String, V>
 private val secureLog = LoggerFactory.getLogger("secureLog")
 
 interface Kafka : AutoCloseable {
-    fun start(kafkaConfig: KafkaConfig, streamsBuilder: StreamsBuilder.() -> Unit)
+    fun start(config: KafkaConfig, registry: MeterRegistry, builder: StreamsBuilder.() -> Unit)
     fun isReady(): Boolean
     fun isLive(): Boolean
     fun <V> getStore(name: String): Store<V>
@@ -32,11 +34,12 @@ object KStreams : Kafka {
     private lateinit var streams: KafkaStreams
     private var isInitiallyStarted: Boolean = false
 
-    override fun start(kafkaConfig: KafkaConfig, streamsBuilder: StreamsBuilder.() -> Unit) {
-        val topology = StreamsBuilder().apply(streamsBuilder).build()
-        streams = KafkaStreams(topology, kafkaConfig.consumer + kafkaConfig.producer).apply {
+    override fun start(config: KafkaConfig, registry: MeterRegistry, builder: StreamsBuilder.() -> Unit) {
+        val topology = StreamsBuilder().apply(builder).build()
+        streams = KafkaStreams(topology, config.consumer + config.producer).apply {
             setUncaughtExceptionHandler(ProcessingExceptionHandler())
             setStateListener { state, _ -> if (state == RUNNING) isInitiallyStarted = true }
+            KafkaStreamsMetrics(this).bindTo(registry)
             start()
         }
     }
@@ -60,3 +63,4 @@ fun <V> StreamsBuilder.consume(topic: Topic<V>): KStream<String, V?> =
 
 fun <V> StreamsBuilder.globalTable(table: Table<V>): GlobalKTable<String, V> =
     globalTable(table.source.name, table.source.consumed("${table.name}-as-globaltable"))
+
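
On the application side, a sketch of how the changed start signature can be called so that the stream metrics end up in a scrapeable registry. The function name, the placeholder topology, and the Prometheus wiring are assumptions on top of this commit; KafkaConfig construction is project-specific and left abstract.

    import io.micrometer.prometheus.PrometheusConfig
    import io.micrometer.prometheus.PrometheusMeterRegistry
    import no.nav.aap.kafka.KafkaConfig
    import no.nav.aap.kafka.streams.KStreams

    fun startStreamsWithMetrics(config: KafkaConfig): PrometheusMeterRegistry {
        val registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)

        // The builder receiver is a StreamsBuilder; a real application would use the
        // project's consume(topic) helpers here. This pass-through is a placeholder.
        KStreams.start(config, registry) {
            stream<String, String>("hypothetical-source").to("hypothetical-sink")
        }

        // registry.scrape() yields the Prometheus text format, e.g. for a /metrics route.
        return registry
    }

Making the registry a required parameter of start means every caller of KStreams and KStreamsMock gets the Kafka Streams metrics bound without extra wiring.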