Skip to content

Commit

Permalink
add kafka factory to streams implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
rtc11 committed Apr 25, 2022
1 parent c18ac70 commit 8107428
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@ package no.nav.aap.kafka.streams.test
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.binder.kafka.KtorKafkaMetrics
import no.nav.aap.kafka.KFactory
import no.nav.aap.kafka.KafkaConfig
import no.nav.aap.kafka.plus
import no.nav.aap.kafka.streams.Kafka
import no.nav.aap.kafka.streams.KStreams
import no.nav.aap.kafka.streams.Store
import no.nav.aap.kafka.streams.Topic
import org.apache.kafka.clients.consumer.MockConsumer
import org.apache.kafka.clients.consumer.OffsetResetStrategy.EARLIEST
import org.apache.kafka.clients.producer.MockProducer
import org.apache.kafka.streams.*
import java.util.*

class KStreamsMock : Kafka {
class KafkaStreamsMock : KFactory, KStreams {
lateinit var streams: TopologyTestDriver
var schemaRegistryUrl: String? = null

Expand All @@ -32,6 +36,9 @@ class KStreamsMock : Kafka {
override fun isLive() = true
override fun <V> getStore(name: String): Store<V> = streams.getKeyValueStore(name)

override fun <V : Any> createConsumer(config: KafkaConfig, topic: Topic<V>) = MockConsumer<String, V>(EARLIEST)
override fun <V : Any> createProducer(config: KafkaConfig, topic: Topic<V>) = MockProducer<String, V>()

override fun close() {
streams.close()
schemaRegistryUrl?.let { MockSchemaRegistry.dropScope(it) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class KStreamsMockTest {

@Test
fun `streams without schema registry`() {
val kafka = KStreamsMock()
val kafka = KafkaStreamsMock()
val registry = SimpleMeterRegistry()
val config = KafkaConfig(
"app",
Expand All @@ -32,7 +32,7 @@ class KStreamsMockTest {

@Test
fun `streams with schema registry`() {
val kafka = KStreamsMock()
val kafka = KafkaStreamsMock()
val registry = SimpleMeterRegistry()
val schemaUrl = "mock://schema-reg"
val config = KafkaConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,15 @@ import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer

interface KafkaFacade {
fun <V : Any> createConsumer(config: KafkaConfig, topic: Topic<V>): Consumer<String, V>
fun <V : Any> createProducer(config: KafkaConfig, topic: Topic<V>): Producer<String, V>
}

object KafkaFactory : KafkaFacade {
override fun <V : Any> createConsumer(config: KafkaConfig, topic: Topic<V>): Consumer<String, V> = KafkaConsumer(
interface KFactory {
fun <V : Any> createConsumer(config: KafkaConfig, topic: Topic<V>): Consumer<String, V> = KafkaConsumer(
config.consumer + mapOf(
CommonClientConfigs.CLIENT_ID_CONFIG to "client-${topic.name}",
ConsumerConfig.GROUP_ID_CONFIG to "${topic.name}-1",
), topic.keySerde.deserializer(), topic.valueSerde.deserializer()
)

override fun <V : Any> createProducer(config: KafkaConfig, topic: Topic<V>): Producer<String, V> = KafkaProducer(
fun <V : Any> createProducer(config: KafkaConfig, topic: Topic<V>): Producer<String, V> = KafkaProducer(
config.producer + mapOf(CommonClientConfigs.CLIENT_ID_CONFIG to "client-${topic.name}"),
topic.keySerde.serializer(),
topic.valueSerde.serializer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@ package no.nav.aap.kafka.streams

import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics
import no.nav.aap.kafka.KFactory
import no.nav.aap.kafka.KafkaConfig
import no.nav.aap.kafka.ProcessingExceptionHandler
import no.nav.aap.kafka.plus
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.KafkaStreams.State.*
import org.apache.kafka.streams.StoreQueryParameters.fromNameAndType
import org.apache.kafka.streams.StreamsBuilder
Expand All @@ -18,25 +24,26 @@ import org.apache.kafka.streams.state.KeyValueStore
import org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore
import org.slf4j.LoggerFactory
import org.apache.kafka.streams.KafkaStreams as ApacheKafkaStreams

typealias Store<V> = ReadOnlyKeyValueStore<String, V>

private val secureLog = LoggerFactory.getLogger("secureLog")

interface Kafka : AutoCloseable {
interface KStreams : AutoCloseable {
fun start(config: KafkaConfig, registry: MeterRegistry, builder: StreamsBuilder.() -> Unit)
fun isReady(): Boolean
fun isLive(): Boolean
fun <V> getStore(name: String): Store<V>
}

object KStreams : Kafka {
private lateinit var streams: KafkaStreams
object KafkaStreams : KFactory, KStreams {
private lateinit var streams: ApacheKafkaStreams
private var isInitiallyStarted: Boolean = false

override fun start(config: KafkaConfig, registry: MeterRegistry, builder: StreamsBuilder.() -> Unit) {
val topology = StreamsBuilder().apply(builder).build()
streams = KafkaStreams(topology, config.consumer + config.producer).apply {
streams = ApacheKafkaStreams(topology, config.consumer + config.producer).apply {
setUncaughtExceptionHandler(ProcessingExceptionHandler())
setStateListener { state, _ -> if (state == RUNNING) isInitiallyStarted = true }
KafkaStreamsMetrics(this).bindTo(registry)
Expand Down
18 changes: 0 additions & 18 deletions kafka/test/no/nav/aap/kafka/KafkaFactoryMock.kt

This file was deleted.

5 changes: 3 additions & 2 deletions kafka/test/no/nav/aap/kafka/KafkaFactoryTest.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package no.nav.aap.kafka

import no.nav.aap.kafka.serde.json.JsonSerde
import no.nav.aap.kafka.streams.KafkaStreams
import no.nav.aap.kafka.streams.Topic
import org.junit.Test
import kotlin.test.assertEquals
Expand All @@ -10,14 +11,14 @@ class KafkaFactoryTest {
@Test
fun consumer() {
val config = defaultKafkaTestConfig.copy(credstorePsw = "")
val consumer = KafkaFactory.createConsumer(config, Topic("topic", JsonSerde.jackson()))
val consumer = KafkaStreams.createConsumer(config, Topic("topic", JsonSerde.jackson()))
val groupId = consumer.groupMetadata().groupId()
assertEquals("topic-1", groupId)
}

@Test
fun producer() {
val config = defaultKafkaTestConfig.copy(credstorePsw = "")
KafkaFactory.createProducer(config, Topic("topic", JsonSerde.jackson()))
KafkaStreams.createProducer(config, Topic("topic", JsonSerde.jackson()))
}
}

0 comments on commit 8107428

Please sign in to comment.