Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/kafka example #236

Merged
merged 3 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import io.holunda.polyflow.view.query.process.ProcessInstancesByStateQuery
import mu.KLogging
import org.axonframework.queryhandling.QueryGateway
import org.springframework.boot.ApplicationRunner
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

Expand Down Expand Up @@ -47,6 +48,7 @@ class SystemInfoPrinter(
}
}

@ConditionalOnProperty(value = ["axon.axonserver.enabled"], havingValue = "true", matchIfMissing = false)
@Bean
fun processInstancePrinter(): ApplicationRunner {
return ApplicationRunner {
Expand Down
65 changes: 65 additions & 0 deletions scenarios/distributed-kafka/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Use this only in dev environments. It's not intended for production usage.
version: '3.9'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- '22181:2181'

kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- '9092:9092'
- '29092:29092'
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

init-kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- kafka
entrypoint: [ '/bin/bash', '-c' ]
command: |
"
# blocks until kafka is reachable
echo -e 'Currently available topics:'
kafka-topics --bootstrap-server kafka:9092 --list

echo -e 'Creating kafka topics...'
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic polyflow-task --replication-factor 1 --partitions 1
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic polyflow-data --replication-factor 1 --partitions 1

echo -e 'Resulting topics:'
kafka-topics --bootstrap-server kafka:9092 --list
"

postgres-engine:
image: postgres:13.2
container_name: postgres-engine
environment:
POSTGRES_USER: polyflow_user
POSTGRES_PASSWORD: S3Cr3T!
POSTGRES_DB: enginedb
ports:
- '25433:5432'

postgres-tasklist:
image: postgres:13.2
container_name: postgres-tasklist
environment:
POSTGRES_USER: polyflow_user
POSTGRES_PASSWORD: S3Cr3T!
POSTGRES_DB: tasklistdb
ports:
- '25432:5432'

38 changes: 38 additions & 0 deletions scenarios/distributed-kafka/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.holunda.polyflow</groupId>
<artifactId>polyflow-example-scenario-root</artifactId>
<version>3.8.3-SNAPSHOT</version>
</parent>

<artifactId>polyflow-example-scenario-distributed-kafka</artifactId>
<name>POM: examples/${project.artifactId}</name>
<packaging>pom</packaging>

<properties>
<maven.deploy.skip>true</maven.deploy.skip>
</properties>

<modules>
<module>process-application-local-polyflow</module>
<module>process-platform-view-only</module>
</modules>

<dependencyManagement>
<dependencies>

<dependency>
<groupId>org.axonframework.extensions.kafka</groupId>
<artifactId>axon-kafka-spring-boot-starter</artifactId>
<version>4.6.0</version>
</dependency>

</dependencies>

</dependencyManagement>

</project>
144 changes: 144 additions & 0 deletions scenarios/distributed-kafka/process-application-local-polyflow/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.holunda.polyflow</groupId>
<artifactId>polyflow-example-scenario-distributed-kafka</artifactId>
<version>3.8.3-SNAPSHOT</version>
</parent>

<artifactId>example-distributed-kafka-process-application-local-polyflow</artifactId>
<name>examples/${project.artifactId}</name>

<properties>
<maven.deploy.skip>true</maven.deploy.skip>
</properties>

<dependencies>

<dependency>
<groupId>io.holunda.polyflow</groupId>
<artifactId>polyflow-example-approval-backend</artifactId>
</dependency>
<dependency>
<groupId>io.holunda.polyflow</groupId>
<artifactId>polyflow-taskpool-core</artifactId>
</dependency>
<dependency>
<groupId>io.holunda.polyflow</groupId>
<artifactId>polyflow-datapool-core</artifactId>
</dependency>
<dependency>
<groupId>io.holunda.polyflow</groupId>
<artifactId>polyflow-bus-jackson</artifactId>
</dependency>

<dependency>
<groupId>io.holunda.polyflow</groupId>
<artifactId>polyflow-camunda-bpm-taskpool-job-sender</artifactId>
</dependency>

<!-- Kafka -->
<dependency>
<groupId>org.axonframework.extensions.kafka</groupId>
<artifactId>axon-kafka-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>

<!-- DB and Flyway -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-core</artifactId>
</dependency>

<dependency>
<groupId>io.holunda</groupId>
<artifactId>camunda-platform-7-autologin</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<!-- kotlin compiler -->
<artifactId>kotlin-maven-plugin</artifactId>
<groupId>org.jetbrains.kotlin</groupId>
</plugin>
<!-- java compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
<plugin>
<!-- for packaging springboot application -->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<requiresUnpack>
<dependency>
<groupId>io.holunda.polyflow</groupId>
<artifactId>polyflow-example-approval-backend</artifactId>
</dependency>
</requiresUnpack>
</configuration>
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>frontend</id>
<activation>
<property>
<name>!skipFrontend</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>io.holunda.polyflow</groupId>
<artifactId>polyflow-example-approval-forms</artifactId>
</dependency>
</dependencies>
</profile>

<profile>
<id>camunda-ce</id>
<activation>
<property>
<name>!camunda-ee</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.camunda.bpm.springboot</groupId>
<artifactId>camunda-bpm-spring-boot-starter-webapp</artifactId>
</dependency>
</dependencies>
</profile>
<profile>
<id>camunda-ee</id>
<activation>
<property>
<name>camunda-ee</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.camunda.bpm.springboot</groupId>
<artifactId>camunda-bpm-spring-boot-starter-webapp-ee</artifactId>
</dependency>
</dependencies>
</profile>
</profiles>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package io.holunda.polyflow.example.process.approval

import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import io.holunda.polyflow.bus.jackson.ObjectMapperConfigurationHelper
import io.holunda.polyflow.bus.jackson.config.FallbackPayloadObjectMapperAutoConfiguration
import io.holunda.polyflow.bus.jackson.configurePolyflowJacksonObjectMapper
import io.holunda.polyflow.datapool.core.EnablePolyflowDataPool
import io.holunda.polyflow.example.process.approval.RequestApprovalProcessConfiguration
import io.holunda.polyflow.taskpool.core.EnablePolyflowTaskPool
import org.axonframework.eventhandling.deadletter.jpa.DeadLetterEventEntry
import org.axonframework.eventhandling.tokenstore.jpa.TokenEntry
import org.axonframework.eventsourcing.eventstore.jpa.DomainEventEntry
import org.axonframework.eventsourcing.eventstore.jpa.SnapshotEventEntry
import org.axonframework.modelling.saga.repository.jpa.SagaEntry
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.boot.SpringApplication
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.autoconfigure.domain.EntityScan
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Import
import org.springframework.context.annotation.Primary

/**
* Starts example application approval process.
*/
fun main(args: Array<String>) {
SpringApplication.run(ExampleProcessApplicationLocalPolyflowDistributedWithKafka::class.java, *args)
}

/**
* Process application approval only.
* Includes:
* - approval-process-backend
* - taskpool-core
* - datapool-core
*/
@SpringBootApplication
@EnablePolyflowDataPool
@EnablePolyflowTaskPool
@Import(RequestApprovalProcessConfiguration::class)
@EntityScan(
basePackageClasses = [
TokenEntry::class,
SagaEntry::class,
DeadLetterEventEntry::class,
DomainEventEntry::class,
SnapshotEventEntry::class
]
)
class ExampleProcessApplicationLocalPolyflowDistributedWithKafka {

@Qualifier(FallbackPayloadObjectMapperAutoConfiguration.PAYLOAD_OBJECT_MAPPER)
@Bean
@Primary
fun objectMapper(): ObjectMapper =
jacksonObjectMapper()
.registerModule(JavaTimeModule())
.configurePolyflowJacksonObjectMapper()
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)

@Bean("defaultAxonObjectMapper")
@Qualifier("defaultAxonObjectMapper")
fun defaultAxonObjectMapper(): ObjectMapper =
jacksonObjectMapper()
.registerModule(JavaTimeModule())
.configurePolyflowJacksonObjectMapper()
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.holunda.polyflow.example.process.approval.infrastructure

import org.hibernate.dialect.PostgreSQL94Dialect
import org.hibernate.type.descriptor.sql.BinaryTypeDescriptor
import org.hibernate.type.descriptor.sql.SqlTypeDescriptor
import java.sql.Types

class NoToastPostgresSQLDialect : PostgreSQL94Dialect() {
init {
this.registerColumnType(Types.BLOB, "BYTEA")
}

override fun remapSqlTypeDescriptor(sqlTypeDescriptor: SqlTypeDescriptor): SqlTypeDescriptor {
return if (sqlTypeDescriptor.sqlType == Types.BLOB) {
BinaryTypeDescriptor.INSTANCE
} else super.remapSqlTypeDescriptor(sqlTypeDescriptor)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.holunda.polyflow.example.process.approval.kafka

import javax.validation.constraints.NotNull

/**
* Router to decide where to publish events to.
*/
fun interface KafkaTopicRouter {
/**
* Retrieves the topic name for given payload type.
*
* @param payloadType payload type.
* @return topic or null, if the event should be dropped.
*/
fun topicForPayloadType(payloadType: @NotNull Class<*>): String?
}
Loading