Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
p-wunderlich committed Oct 10, 2023
1 parent ee962b9 commit ef9bbea
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import io.holunda.polyflow.view.query.process.ProcessInstancesByStateQuery
import mu.KLogging
import org.axonframework.queryhandling.QueryGateway
import org.springframework.boot.ApplicationRunner
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

Expand Down Expand Up @@ -47,6 +48,7 @@ class SystemInfoPrinter(
}
}

@ConditionalOnProperty(value = ["axon.axonserver.enabled"], havingValue = "true", matchIfMissing = false)
@Bean
fun processInstancePrinter(): ApplicationRunner {
return ApplicationRunner {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.holunda.polyflow.example.process.approval.infrastructure

import org.hibernate.dialect.PostgreSQL94Dialect
import org.hibernate.type.descriptor.sql.BinaryTypeDescriptor
import org.hibernate.type.descriptor.sql.SqlTypeDescriptor
import java.sql.Types

class NoToastPostgresSQLDialect : PostgreSQL94Dialect() {
init {
this.registerColumnType(Types.BLOB, "BYTEA")
}

override fun remapSqlTypeDescriptor(sqlTypeDescriptor: SqlTypeDescriptor): SqlTypeDescriptor {
return if (sqlTypeDescriptor.sqlType == Types.BLOB) {
BinaryTypeDescriptor.INSTANCE
} else super.remapSqlTypeDescriptor(sqlTypeDescriptor)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ spring:
hibernate.ddl-auto: validate
show-sql: false
open-in-view: false
database-platform: io.holunda.polyflow.example.infrastructure.jpa.NoToastPostgresSQLDialect
database-platform: io.holunda.polyflow.example.process.approval.infrastructure.NoToastPostgresSQLDialect
flyway:
enabled: true
locations: "classpath:db/migrations"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.holunda.polyflow.example.process.platform.infrastructure

import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.boot.context.properties.ConstructorBinding

@ConfigurationProperties(prefix = "polyflow.axon.kafka")
@ConstructorBinding
data class AxonKafkaExtendedProperties(
val enabled: Boolean = true,
val topicTasks: String,
val topicDataEntries: String
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package io.holunda.polyflow.example.process.platform.infrastructure

import org.axonframework.extensions.kafka.KafkaProperties
import org.axonframework.extensions.kafka.eventhandling.KafkaMessageConverter
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory
import org.axonframework.extensions.kafka.eventhandling.consumer.DefaultConsumerFactory
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaEventMessage
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.SortedKafkaMessageBuffer
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource
import org.axonframework.serialization.Serializer
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
@ConditionalOnProperty(value = ["polyflow.axon.kafka.enabled"], havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties(
AxonKafkaExtendedProperties::class
)
class AxonKafkaIngressConfiguration(
@Value("\${HOSTNAME:localhost}") val hostname: String
) {
/**
* Consumer factory for tasks.
*
* @param properties kafka properties
* @return consumer factory.
*/
@Bean
@Qualifier("polyflowTask")
fun kafkaConsumerFactoryPolyflowTask(properties: KafkaProperties): ConsumerFactory<String, ByteArray> {
properties.clientId = "polyflow-task-$hostname"
return DefaultConsumerFactory<String, ByteArray>(properties.buildConsumerProperties())
}

/**
* Consumer factory for data entries.
*
* @param properties kafka properties
* @return consumer factory.
*/
@Bean
@Qualifier("polyflowData")
fun kafkaConsumerFactoryPolyflowData(properties: KafkaProperties): ConsumerFactory<String, ByteArray> {
properties.clientId = "polyflow-data-$hostname"
return DefaultConsumerFactory<String, ByteArray>(properties.buildConsumerProperties())
}

/**
* Creates a streamable kafka message source.
* The name of this bean is referenced in the application.yaml.
*
* @param kafkaProperties standard kafka properties.
* @param extendedProperties extended properties for Polyflow.
* @param kafkaConsumerFactory consumer factory.
* @param kafkaFetcher fetcher instance.
* @param serializer serializer.
* @param meterRegistry meter registry.
* @return streaming source.
*/
@Bean("kafkaMessageSourcePolyflowData")
@ConditionalOnProperty(value = ["axon.kafka.consumer.event-processor-mode"], havingValue = "TRACKING")
fun kafkaMessageSourcePolyflowData(
kafkaProperties: KafkaProperties,
extendedProperties: AxonKafkaExtendedProperties,
@Qualifier("polyflowData") kafkaConsumerFactory: ConsumerFactory<String, ByteArray>,
kafkaFetcher: Fetcher<String, ByteArray, KafkaEventMessage>,
@Qualifier("eventSerializer") serializer: Serializer,
messageConverter: KafkaMessageConverter<String, ByteArray>
): StreamableKafkaMessageSource<String, ByteArray> {
return StreamableKafkaMessageSource
.builder<String, ByteArray>()
.topics(listOf(extendedProperties.topicDataEntries))
.consumerFactory(kafkaConsumerFactory)
.serializer(serializer)
.fetcher(kafkaFetcher)
.messageConverter(messageConverter)
.bufferFactory {
SortedKafkaMessageBuffer(
kafkaProperties.fetcher.bufferSize
)
}
.build()
}

/**
* Creates a streamable kafka message source.
* The name of this bean is referenced in the application.yaml.
*
* @param kafkaProperties standard kafka properties.
* @param extendedProperties extended properties for Polyflow.
* @param kafkaConsumerFactory consumer factory.
* @param kafkaFetcher fetcher instance.
* @param serializer serializer.
* @param meterRegistry meter registry.
* @return streaming source.
*/
@Bean("kafkaMessageSourcePolyflowTask")
@ConditionalOnProperty(value = ["axon.kafka.consumer.event-processor-mode"], havingValue = "TRACKING")
fun kafkaMessageSourcePolyflowTask(
kafkaProperties: KafkaProperties,
extendedProperties: AxonKafkaExtendedProperties,
@Qualifier("polyflowTask") kafkaConsumerFactory: ConsumerFactory<String, ByteArray>,
kafkaFetcher: Fetcher<String?, ByteArray, KafkaEventMessage>,
@Qualifier("eventSerializer") serializer: Serializer,
messageConverter: KafkaMessageConverter<String, ByteArray>
): StreamableKafkaMessageSource<String, ByteArray> {
return StreamableKafkaMessageSource
.builder<String, ByteArray>()
.topics(listOf(extendedProperties.topicTasks))
.consumerFactory(kafkaConsumerFactory)
.serializer(serializer)
.fetcher(kafkaFetcher)
.messageConverter(messageConverter)
.bufferFactory {
SortedKafkaMessageBuffer(
kafkaProperties.fetcher.bufferSize
)
}
.build()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.holunda.polyflow.example.process.platform.infrastructure

import org.hibernate.dialect.PostgreSQL94Dialect
import org.hibernate.type.descriptor.sql.BinaryTypeDescriptor
import org.hibernate.type.descriptor.sql.SqlTypeDescriptor
import java.sql.Types

class NoToastPostgresSQLDialect : PostgreSQL94Dialect() {
init {
this.registerColumnType(Types.BLOB, "BYTEA")
}

override fun remapSqlTypeDescriptor(sqlTypeDescriptor: SqlTypeDescriptor): SqlTypeDescriptor {
return if (sqlTypeDescriptor.sqlType == Types.BLOB) {
BinaryTypeDescriptor.INSTANCE
} else super.remapSqlTypeDescriptor(sqlTypeDescriptor)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ spring:
hibernate.ddl-auto: validate
show-sql: false
open-in-view: false
database-platform: io.holunda.polyflow.example.infrastructure.NoToastPostgresSQLDialect
database-platform: io.holunda.polyflow.example.process.platform.infrastructure.NoToastPostgresSQLDialect
flyway:
enabled: true
locations: "classpath:db/migrations"
Expand Down

0 comments on commit ef9bbea

Please sign in to comment.