Skip to content

Commit

Permalink
Add example for using Polyflow and Kafka (work in progress)
Browse files Browse the repository at this point in the history
  • Loading branch information
p-wunderlich committed Oct 10, 2023
1 parent c21e80b commit ee962b9
Show file tree
Hide file tree
Showing 27 changed files with 2,604 additions and 51 deletions.
65 changes: 65 additions & 0 deletions scenarios/distributed-kafka/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Use this only in dev environments. It's not intended for production usage.
version: '3.9'
services:
# Zookeeper instance backing the single-node Kafka broker below.
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
# host 22181 -> container 2181 (Zookeeper client port)
- '22181:2181'

# Single Kafka broker. Reachable as kafka:9092 from inside the compose
# network and as localhost:29092 from the host — see the advertised
# listeners / listener security map below.
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- '9092:9092'
- '29092:29092'
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
# Single-broker dev setup: the internal offsets topic cannot have more
# replicas than brokers, so force replication factor 1.
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

# One-shot init container: creates the polyflow topics on the broker and
# exits. NOTE(review): depends_on only waits for the container to start,
# not for the broker to be ready — the first kafka-topics call is relied
# on to retry until the broker is reachable.
init-kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- kafka
entrypoint: [ '/bin/bash', '-c' ]
command: |
"
# blocks until kafka is reachable
echo -e 'Currently available topics:'
kafka-topics --bootstrap-server kafka:9092 --list
echo -e 'Creating kafka topics...'
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic polyflow-task --replication-factor 1 --partitions 1
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic polyflow-data --replication-factor 1 --partitions 1
echo -e 'Resulting topics:'
kafka-topics --bootstrap-server kafka:9092 --list
"
# Postgres instance for the Camunda process engine.
postgres-engine:
image: postgres:13.2
container_name: postgres-engine
environment:
POSTGRES_USER: polyflow_user
POSTGRES_PASSWORD: S3Cr3T!
POSTGRES_DB: enginedb
ports:
# host 25433 -> container 5432
- '25433:5432'

# Postgres instance for the Polyflow task list / projection side.
postgres-tasklist:
image: postgres:13.2
container_name: postgres-tasklist
environment:
POSTGRES_USER: polyflow_user
POSTGRES_PASSWORD: S3Cr3T!
POSTGRES_DB: tasklistdb
ports:
# host 25432 -> container 5432
- '25432:5432'

8 changes: 4 additions & 4 deletions scenarios/distributed-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@

<dependencyManagement>
<dependencies>

<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-server-connector</artifactId>
<version>4.6.7</version>
<scope>runtime</scope>
<groupId>org.axonframework.extensions.kafka</groupId>
<artifactId>axon-kafka-spring-boot-starter</artifactId>
<version>4.6.0</version>
</dependency>

</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,26 @@
<groupId>io.holunda.polyflow</groupId>
<artifactId>polyflow-datapool-core</artifactId>
</dependency>
<dependency>
<groupId>io.holunda.polyflow</groupId>
<artifactId>polyflow-bus-jackson</artifactId>
</dependency>

<dependency>
<groupId>io.holunda.polyflow</groupId>
<artifactId>polyflow-camunda-bpm-taskpool-job-sender</artifactId>
</dependency>

<!-- Kafka -->
<dependency>
<groupId>org.axonframework.extensions.kafka</groupId>
<artifactId>axon-kafka-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>

<!-- DB and Flyway -->
<dependency>
<groupId>org.postgresql</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,26 @@
package io.holunda.polyflow.example.process.approval

import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.thoughtworks.xstream.XStream
import com.thoughtworks.xstream.security.AnyTypePermission
import io.holunda.polyflow.bus.jackson.config.FallbackPayloadObjectMapperAutoConfiguration.Companion.PAYLOAD_OBJECT_MAPPER
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import io.holunda.polyflow.bus.jackson.ObjectMapperConfigurationHelper
import io.holunda.polyflow.bus.jackson.config.FallbackPayloadObjectMapperAutoConfiguration
import io.holunda.polyflow.bus.jackson.configurePolyflowJacksonObjectMapper
import io.holunda.polyflow.datapool.core.EnablePolyflowDataPool
import io.holunda.polyflow.example.process.approval.RequestApprovalProcessConfiguration
import io.holunda.polyflow.taskpool.core.EnablePolyflowTaskPool
import org.axonframework.commandhandling.CommandBus
import org.axonframework.commandhandling.gateway.CommandGateway
import org.axonframework.commandhandling.gateway.DefaultCommandGateway
import org.axonframework.eventhandling.deadletter.jpa.DeadLetterEventEntry
import org.axonframework.eventhandling.tokenstore.jpa.TokenEntry
import org.axonframework.eventsourcing.eventstore.jpa.DomainEventEntry
import org.axonframework.eventsourcing.eventstore.jpa.SnapshotEventEntry
import org.axonframework.modelling.saga.repository.jpa.SagaEntry
import org.axonframework.serialization.xml.CompactDriver
import org.axonframework.springboot.util.ConditionalOnMissingQualifiedBean
import org.axonframework.springboot.util.XStreamSecurityTypeUtility
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.boot.SpringApplication
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.autoconfigure.domain.EntityScan
import org.springframework.context.ApplicationContext
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Import
import org.springframework.context.annotation.Primary
Expand All @@ -51,35 +47,28 @@ fun main(args: Array<String>) {
basePackageClasses = [
TokenEntry::class,
SagaEntry::class,
DeadLetterEventEntry::class
DeadLetterEventEntry::class,
DomainEventEntry::class,
SnapshotEventEntry::class
]
)
class ExampleProcessApplicationLocalPolyflowDistributedWithKafka {

@Qualifier(FallbackPayloadObjectMapperAutoConfiguration.PAYLOAD_OBJECT_MAPPER)
@Bean
fun objectMapper(): ObjectMapper {
return jacksonObjectMapper()
@Primary
fun objectMapper(): ObjectMapper =
jacksonObjectMapper()
.registerModule(JavaTimeModule())
.configurePolyflowJacksonObjectMapper()
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
}

@Bean("defaultAxonXStream")
@ConditionalOnMissingBean
fun defaultAxonXStream(applicationContext: ApplicationContext): XStream {
val xStream = XStream(CompactDriver())
xStream.allowTypesByWildcard(XStreamSecurityTypeUtility.autoConfigBasePackages(applicationContext))
// This configures XStream to permit any class to be deserialized.
// FIXME: We might want to make this more restrictive to improve security
xStream.addPermission(AnyTypePermission.ANY)
return xStream
}

/*
@Bean
@Primary
@ConditionalOnMissingQualifiedBean(beanClass = CommandGateway::class, qualifier = "unqualified")
fun defaultCommandGateway(bus: CommandBus): CommandGateway = DefaultCommandGateway.builder().commandBus(bus).build()
*/
@Bean("defaultAxonObjectMapper")
@Qualifier("defaultAxonObjectMapper")
fun defaultAxonObjectMapper(): ObjectMapper =
jacksonObjectMapper()
.registerModule(JavaTimeModule())
.configurePolyflowJacksonObjectMapper()
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.holunda.polyflow.example.process.approval.kafka

import javax.validation.constraints.NotNull

/**
* Router to decide where to publish events to.
*/
/**
 * Router to decide where to publish events to.
 *
 * Implemented as a `fun interface` so a lambda can be supplied directly.
 */
fun interface KafkaTopicRouter {
    /**
     * Retrieves the topic name for the given payload type.
     *
     * Note: the parameter is non-nullable by the Kotlin type system, so no
     * `javax.validation` annotation is needed (and none would be enforced on
     * a plain functional interface anyway).
     *
     * @param payloadType payload type of the event, never null.
     * @return topic name, or null if the event should be dropped (not published to Kafka).
     */
    fun topicForPayloadType(payloadType: Class<*>): String?
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package io.holunda.polyflow.example.process.approval.kafka

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord
import org.axonframework.common.AxonConfigurationException
import org.axonframework.config.EventProcessingConfigurer
import org.axonframework.eventhandling.EventMessage
import org.axonframework.eventhandling.PropagatingErrorHandler
import org.axonframework.extensions.kafka.KafkaProperties
import org.axonframework.extensions.kafka.autoconfig.KafkaAutoConfiguration
import org.axonframework.extensions.kafka.eventhandling.DefaultKafkaMessageConverter
import org.axonframework.extensions.kafka.eventhandling.KafkaMessageConverter
import org.axonframework.extensions.kafka.eventhandling.producer.KafkaEventPublisher
import org.axonframework.extensions.kafka.eventhandling.producer.KafkaPublisher
import org.axonframework.extensions.kafka.eventhandling.producer.ProducerFactory
import org.axonframework.serialization.Serializer
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.boot.autoconfigure.AutoConfigureBefore
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Primary
import java.util.*

/**
 * Spring configuration wiring Axon event publishing to Kafka with per-payload-type
 * topic routing. Runs before Axon's own [KafkaAutoConfiguration] so the beans
 * declared here take precedence over the extension's defaults.
 */
@Configuration
@AutoConfigureBefore(
KafkaAutoConfiguration::class
) // we should run before Axon Kafka autoconfiguration
@EnableConfigurationProperties(PolyflowAxonKafkaProperties::class)
class PolyflowAxonKafkaConfiguration {
/**
 * Default router backed by [PolyflowAxonKafkaProperties]: the first configured
 * mapping whose payload type is assignable from the event's payload type wins.
 * Returns null (= event is not routed to a dedicated topic) when nothing matches.
 * Only registered if the application does not define its own router.
 */
@ConditionalOnMissingBean
@Bean
fun kafkaTopicRouter(properties: PolyflowAxonKafkaProperties): KafkaTopicRouter {
return KafkaTopicRouter { payloadType ->
properties.topics.firstOrNull { it.payloadType.isAssignableFrom(payloadType) }?.topic
}
}
/**
 * Message converter that delegates to [DefaultKafkaMessageConverter] but overrides
 * the target topic per event via [KafkaTopicRouter]. When the router returns null,
 * the topic passed in by the publisher (the default topic) is kept.
 * Marked @Primary so it replaces the converter from the Kafka extension's autoconfig.
 */
@Bean
@Primary
fun routingKafkaMessageConverter(
@Qualifier("eventSerializer") eventSerializer: Serializer,
kafkaTopicRouter: KafkaTopicRouter
): KafkaMessageConverter<String, ByteArray> {
val defaultConverter: KafkaMessageConverter<String, ByteArray> =
DefaultKafkaMessageConverter.builder().serializer(eventSerializer).build()
return object : KafkaMessageConverter<String, ByteArray> {
override fun createKafkaMessage(
eventMessage: EventMessage<*>,
topic: String
): ProducerRecord<String, ByteArray> {
// Route by payload type; fall back to the publisher-supplied topic.
val topicOverride = kafkaTopicRouter.topicForPayloadType(eventMessage.getPayloadType())
return defaultConverter.createKafkaMessage(eventMessage, topicOverride ?: topic)
}

override fun readKafkaMessage(consumerRecord: ConsumerRecord<String, ByteArray>): Optional<EventMessage<*>> {
// Reading is unchanged — delegate entirely to the default converter.
return defaultConverter.readKafkaMessage(consumerRecord)
}
}
}

/**
 * Configures a KafkaEventPublisher that sends events to Kafka only if they are routed via kafka event router.
 *
 * Mirrors the registration logic of the extension's autoconfiguration (processing
 * group assignment, error handler, processor mode), but installs a
 * [RoutingKafkaEventPublisher] instead of the stock publisher.
 *
 * @see KafkaAutoConfiguration.kafkaEventPublisher
 */
@Bean
fun routingKafkaEventPublisher(
kafkaPublisher: KafkaPublisher<String, ByteArray>,
kafkaProperties: KafkaProperties,
eventProcessingConfigurer: EventProcessingConfigurer,
kafkaTopicRouter: KafkaTopicRouter
): KafkaEventPublisher<String, ByteArray> {
val kafkaEventPublisher: KafkaEventPublisher<String, ByteArray> =
RoutingKafkaEventPublisher.builder<String, ByteArray>()
.kafkaPublisher(kafkaPublisher)
.kafkaTopicRouter(kafkaTopicRouter)
.build()

/*
* Register an invocation error handler which re-throws any exception.
* This will ensure a TrackingEventProcessor to enter the error mode which will retry, and it will ensure the
* SubscribingEventProcessor to bubble the exception to the callee. For more information see
* https://docs.axoniq.io/reference-guide/configuring-infrastructure-components/event-processing/event-processors#error-handling
*/
// TODO: Check if this still works. Our publisher is no longer in the default processing group, I think.
eventProcessingConfigurer.registerEventHandler { kafkaEventPublisher }
.registerListenerInvocationErrorHandler(
KafkaEventPublisher.DEFAULT_PROCESSING_GROUP
) { PropagatingErrorHandler.instance() }
.assignHandlerTypesMatching(
KafkaEventPublisher.DEFAULT_PROCESSING_GROUP
) { clazz: Class<*> ->
clazz.isAssignableFrom(
KafkaEventPublisher::class.java
)
}
// Register the processor type selected in configuration; fail fast on unknown modes.
when (val processorMode: KafkaProperties.EventProcessorMode = kafkaProperties.producer.eventProcessorMode) {
KafkaProperties.EventProcessorMode.SUBSCRIBING -> eventProcessingConfigurer.registerSubscribingEventProcessor(KafkaEventPublisher.DEFAULT_PROCESSING_GROUP)
KafkaProperties.EventProcessorMode.TRACKING -> eventProcessingConfigurer.registerTrackingEventProcessor(KafkaEventPublisher.DEFAULT_PROCESSING_GROUP)
KafkaProperties.EventProcessorMode.POOLED_STREAMING -> eventProcessingConfigurer.registerPooledStreamingEventProcessor(KafkaEventPublisher.DEFAULT_PROCESSING_GROUP)
else -> throw AxonConfigurationException("Unknown Event Processor Mode [$processorMode] detected")
}

return kafkaEventPublisher
}

/**
 * Publisher that sends converted events and waits for broker acknowledgement.
 * The ack timeout is derived from the producer's `delivery.timeout.ms`
 * (default 30000) plus a 1s safety margin, so the publisher never gives up
 * before the producer itself does.
 */
// We need to duplicate the bean factory from KafkaAutoConfiguration because there is no way to set `publisherAckTimeout` via configuration properties
@Bean(destroyMethod = "shutDown")
fun kafkaAcknowledgingPublisher(
kafkaProducerFactory: ProducerFactory<String, ByteArray>,
kafkaMessageConverter: KafkaMessageConverter<String, ByteArray>,
configuration: org.axonframework.config.Configuration,
properties: KafkaProperties,
serializer: Serializer
): KafkaPublisher<String, ByteArray> {
return KafkaPublisher
.builder<String, ByteArray>()
.producerFactory(kafkaProducerFactory)
.messageConverter(kafkaMessageConverter)
.messageMonitor(configuration.messageMonitor(KafkaPublisher::class.java, "kafkaPublisher"))
.topicResolver { Optional.of(properties.defaultTopic) }
.serializer(serializer)
.publisherAckTimeout(
properties.producer.properties.getOrDefault("delivery.timeout.ms", "30000").toLong() + 1000
)
.build()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.holunda.polyflow.example.process.approval.kafka

import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.boot.context.properties.ConstructorBinding
import org.springframework.boot.context.properties.NestedConfigurationProperty

/**
 * Configuration properties mapping event payload types to Kafka topic names.
 * Bound from the `polyflow.axon.kafka` prefix via constructor binding.
 */
@ConfigurationProperties(prefix = "polyflow.axon.kafka")
@ConstructorBinding
data class PolyflowAxonKafkaProperties(
/**
 * List of mappings of payload class to kafka topic name that payload of this class should be directed to.
 * Order matters: the first matching entry wins when a router iterates this list.
 */
@NestedConfigurationProperty
val topics: List<PayloadTypeToTopic>
) {
/**
 * Single mapping entry: events whose payload is (a subtype of) [payloadType]
 * are published to [topic].
 */
@ConstructorBinding
data class PayloadTypeToTopic(
val payloadType: Class<*>,
val topic: String
)
}
Loading

0 comments on commit ee962b9

Please sign in to comment.