diff --git a/components/approval/backend/src/main/kotlin/io/holunda/polyflow/example/process/approval/service/SystemInfoPrinter.kt b/components/approval/backend/src/main/kotlin/io/holunda/polyflow/example/process/approval/service/SystemInfoPrinter.kt index bbfae71d..449ddb8d 100644 --- a/components/approval/backend/src/main/kotlin/io/holunda/polyflow/example/process/approval/service/SystemInfoPrinter.kt +++ b/components/approval/backend/src/main/kotlin/io/holunda/polyflow/example/process/approval/service/SystemInfoPrinter.kt @@ -8,6 +8,7 @@ import io.holunda.polyflow.view.query.process.ProcessInstancesByStateQuery import mu.KLogging import org.axonframework.queryhandling.QueryGateway import org.springframework.boot.ApplicationRunner +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration @@ -47,6 +48,7 @@ class SystemInfoPrinter( } } + @ConditionalOnProperty(value = ["axon.axonserver.enabled"], havingValue = "true", matchIfMissing = false) @Bean fun processInstancePrinter(): ApplicationRunner { return ApplicationRunner { diff --git a/scenarios/distributed-kafka/process-application-local-polyflow/src/main/kotlin/io/holunda/polyflow/example/process/approval/infrastructure/NoToastPostgresSQLDialect.kt b/scenarios/distributed-kafka/process-application-local-polyflow/src/main/kotlin/io/holunda/polyflow/example/process/approval/infrastructure/NoToastPostgresSQLDialect.kt new file mode 100644 index 00000000..775297e3 --- /dev/null +++ b/scenarios/distributed-kafka/process-application-local-polyflow/src/main/kotlin/io/holunda/polyflow/example/process/approval/infrastructure/NoToastPostgresSQLDialect.kt @@ -0,0 +1,18 @@ +package io.holunda.polyflow.example.process.approval.infrastructure + +import org.hibernate.dialect.PostgreSQL94Dialect +import org.hibernate.type.descriptor.sql.BinaryTypeDescriptor +import org.hibernate.type.descriptor.sql.SqlTypeDescriptor +import java.sql.Types + +class NoToastPostgresSQLDialect : PostgreSQL94Dialect() { + init { + this.registerColumnType(Types.BLOB, "BYTEA") + } + + override fun remapSqlTypeDescriptor(sqlTypeDescriptor: SqlTypeDescriptor): SqlTypeDescriptor { + return if (sqlTypeDescriptor.sqlType == Types.BLOB) { + BinaryTypeDescriptor.INSTANCE + } else super.remapSqlTypeDescriptor(sqlTypeDescriptor) + } +} diff --git a/scenarios/distributed-kafka/process-application-local-polyflow/src/main/resources/application.yml b/scenarios/distributed-kafka/process-application-local-polyflow/src/main/resources/application.yml index 73de12e4..85b326c4 100644 --- a/scenarios/distributed-kafka/process-application-local-polyflow/src/main/resources/application.yml +++ b/scenarios/distributed-kafka/process-application-local-polyflow/src/main/resources/application.yml @@ -12,7 +12,7 @@ spring: hibernate.ddl-auto: validate show-sql: false open-in-view: false - database-platform: io.holunda.polyflow.example.infrastructure.jpa.NoToastPostgresSQLDialect + database-platform: io.holunda.polyflow.example.process.approval.infrastructure.NoToastPostgresSQLDialect flyway: enabled: true locations: "classpath:db/migrations" diff --git a/scenarios/distributed-kafka/process-platform-view-only/src/main/kotlin/io/holunda/polyflow/example/process/platform/infrastructure/AxonKafkaExtendedProperties.kt b/scenarios/distributed-kafka/process-platform-view-only/src/main/kotlin/io/holunda/polyflow/example/process/platform/infrastructure/AxonKafkaExtendedProperties.kt new file mode 100644 index 00000000..6919dd93 --- /dev/null +++ b/scenarios/distributed-kafka/process-platform-view-only/src/main/kotlin/io/holunda/polyflow/example/process/platform/infrastructure/AxonKafkaExtendedProperties.kt @@ -0,0 +1,12 @@ +package io.holunda.polyflow.example.process.platform.infrastructure + +import org.springframework.boot.context.properties.ConfigurationProperties +import org.springframework.boot.context.properties.ConstructorBinding + +@ConfigurationProperties(prefix = "polyflow.axon.kafka") +@ConstructorBinding +data class AxonKafkaExtendedProperties( + val enabled: Boolean = true, + val topicTasks: String, + val topicDataEntries: String +) diff --git a/scenarios/distributed-kafka/process-platform-view-only/src/main/kotlin/io/holunda/polyflow/example/process/platform/infrastructure/AxonKafkaIngressConfiguration.kt b/scenarios/distributed-kafka/process-platform-view-only/src/main/kotlin/io/holunda/polyflow/example/process/platform/infrastructure/AxonKafkaIngressConfiguration.kt new file mode 100644 index 00000000..f695ccbc --- /dev/null +++ b/scenarios/distributed-kafka/process-platform-view-only/src/main/kotlin/io/holunda/polyflow/example/process/platform/infrastructure/AxonKafkaIngressConfiguration.kt @@ -0,0 +1,126 @@ +package io.holunda.polyflow.example.process.platform.infrastructure + +import org.axonframework.extensions.kafka.KafkaProperties +import org.axonframework.extensions.kafka.eventhandling.KafkaMessageConverter +import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory +import org.axonframework.extensions.kafka.eventhandling.consumer.DefaultConsumerFactory +import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher +import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaEventMessage +import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.SortedKafkaMessageBuffer +import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource +import org.axonframework.serialization.Serializer +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.beans.factory.annotation.Value +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration + +@Configuration +@ConditionalOnProperty(value = ["polyflow.axon.kafka.enabled"], havingValue = "true", matchIfMissing = true) +@EnableConfigurationProperties( + AxonKafkaExtendedProperties::class +) +class AxonKafkaIngressConfiguration( + @Value("\${HOSTNAME:localhost}") val hostname: String +) { + /** + * Consumer factory for tasks. + * + * @param properties kafka properties + * @return consumer factory. + */ + @Bean + @Qualifier("polyflowTask") + fun kafkaConsumerFactoryPolyflowTask(properties: KafkaProperties): ConsumerFactory { + properties.clientId = "polyflow-task-$hostname" + return DefaultConsumerFactory(properties.buildConsumerProperties()) + } + + /** + * Consumer factory for data entries. + * + * @param properties kafka properties + * @return consumer factory. + */ + @Bean + @Qualifier("polyflowData") + fun kafkaConsumerFactoryPolyflowData(properties: KafkaProperties): ConsumerFactory { + properties.clientId = "polyflow-data-$hostname" + return DefaultConsumerFactory(properties.buildConsumerProperties()) + } + + /** + * Creates a streamable kafka message source. + * The name of this bean is referenced in the application.yaml. + * + * @param kafkaProperties standard kafka properties. + * @param extendedProperties extended properties for Polyflow. + * @param kafkaConsumerFactory consumer factory. + * @param kafkaFetcher fetcher instance. + * @param serializer serializer. + * @param meterRegistry meter registry. + * @return streaming source. + */ + @Bean("kafkaMessageSourcePolyflowData") + @ConditionalOnProperty(value = ["axon.kafka.consumer.event-processor-mode"], havingValue = "TRACKING") + fun kafkaMessageSourcePolyflowData( + kafkaProperties: KafkaProperties, + extendedProperties: AxonKafkaExtendedProperties, + @Qualifier("polyflowData") kafkaConsumerFactory: ConsumerFactory, + kafkaFetcher: Fetcher, + @Qualifier("eventSerializer") serializer: Serializer, + messageConverter: KafkaMessageConverter + ): StreamableKafkaMessageSource { + return StreamableKafkaMessageSource + .builder() + .topics(listOf(extendedProperties.topicDataEntries)) + .consumerFactory(kafkaConsumerFactory) + .serializer(serializer) + .fetcher(kafkaFetcher) + .messageConverter(messageConverter) + .bufferFactory { + SortedKafkaMessageBuffer( + kafkaProperties.fetcher.bufferSize + ) + } + .build() + } + + /** + * Creates a streamable kafka message source. + * The name of this bean is referenced in the application.yaml. + * + * @param kafkaProperties standard kafka properties. + * @param extendedProperties extended properties for Polyflow. + * @param kafkaConsumerFactory consumer factory. + * @param kafkaFetcher fetcher instance. + * @param serializer serializer. + * @param meterRegistry meter registry. + * @return streaming source. + */ + @Bean("kafkaMessageSourcePolyflowTask") + @ConditionalOnProperty(value = ["axon.kafka.consumer.event-processor-mode"], havingValue = "TRACKING") + fun kafkaMessageSourcePolyflowTask( + kafkaProperties: KafkaProperties, + extendedProperties: AxonKafkaExtendedProperties, + @Qualifier("polyflowTask") kafkaConsumerFactory: ConsumerFactory, + kafkaFetcher: Fetcher, + @Qualifier("eventSerializer") serializer: Serializer, + messageConverter: KafkaMessageConverter + ): StreamableKafkaMessageSource { + return StreamableKafkaMessageSource + .builder() + .topics(listOf(extendedProperties.topicTasks)) + .consumerFactory(kafkaConsumerFactory) + .serializer(serializer) + .fetcher(kafkaFetcher) + .messageConverter(messageConverter) + .bufferFactory { + SortedKafkaMessageBuffer( + kafkaProperties.fetcher.bufferSize + ) + } + .build() + } +} diff --git a/scenarios/distributed-kafka/process-platform-view-only/src/main/kotlin/io/holunda/polyflow/example/process/platform/infrastructure/NoToastPostgresSQLDialect.kt b/scenarios/distributed-kafka/process-platform-view-only/src/main/kotlin/io/holunda/polyflow/example/process/platform/infrastructure/NoToastPostgresSQLDialect.kt new file mode 100644 index 00000000..ec26bf07 --- /dev/null +++ b/scenarios/distributed-kafka/process-platform-view-only/src/main/kotlin/io/holunda/polyflow/example/process/platform/infrastructure/NoToastPostgresSQLDialect.kt @@ -0,0 +1,18 @@ +package io.holunda.polyflow.example.process.platform.infrastructure + +import org.hibernate.dialect.PostgreSQL94Dialect +import org.hibernate.type.descriptor.sql.BinaryTypeDescriptor +import org.hibernate.type.descriptor.sql.SqlTypeDescriptor +import java.sql.Types + +class NoToastPostgresSQLDialect : PostgreSQL94Dialect() { + init { + this.registerColumnType(Types.BLOB, "BYTEA") + } + + override fun remapSqlTypeDescriptor(sqlTypeDescriptor: SqlTypeDescriptor): SqlTypeDescriptor { + return if (sqlTypeDescriptor.sqlType == Types.BLOB) { + BinaryTypeDescriptor.INSTANCE + } else super.remapSqlTypeDescriptor(sqlTypeDescriptor) + } +} diff --git a/scenarios/distributed-kafka/process-platform-view-only/src/main/resources/application.yml b/scenarios/distributed-kafka/process-platform-view-only/src/main/resources/application.yml index 2283a29a..6610a48b 100755 --- a/scenarios/distributed-kafka/process-platform-view-only/src/main/resources/application.yml +++ b/scenarios/distributed-kafka/process-platform-view-only/src/main/resources/application.yml @@ -13,7 +13,7 @@ spring: hibernate.ddl-auto: validate show-sql: false open-in-view: false - database-platform: io.holunda.polyflow.example.infrastructure.NoToastPostgresSQLDialect + database-platform: io.holunda.polyflow.example.process.platform.infrastructure.NoToastPostgresSQLDialect flyway: enabled: true locations: "classpath:db/migrations"