Skip to content

Commit

Permalink
chore(#1036): add new column for event timestamp and use as discrimin…
Browse files Browse the repository at this point in the history
…ator for event order

- closes #1036
  • Loading branch information
Michael von Bargen committed Aug 19, 2024
1 parent 2409469 commit 6240334
Show file tree
Hide file tree
Showing 13 changed files with 94 additions and 22 deletions.
5 changes: 5 additions & 0 deletions docs/reference-guide/components/view-jpa.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ polyflow.view.jpa:
stored-items: task, data-entry, process-instance, process-definition
payload-attribute-level-limit: 2
include-correlated-data-entries-in-data-entry-queries: false
process-old-events: false
data-entry-filters:
include: myProperty2.myOtherEmbeddedProperty3, myProperty2.myOtherEmbeddedProperty2
# exclude: myProperty
Expand All @@ -79,6 +80,10 @@ The `include-correlated-data-entries-in-data-entry-queries` flag controls whethe
the payload of correlated data entries. The data entry attributes (such as `entry_type`, `state.state`, ...) of correlated data entries are not considered.
*Note:* Only one level of correlation depth is considered here and there is no option yet to change the depth.

With the property `process-old-events` you can configure the view such that all events are processed, even when the event timestamp is older than a different event
that was already processed. This might be helpful when doing technical updates but should be used with care as old event will override more recent changes if the
order is not guaranteed. Defaults to `false`.

The attributes `data-entry-filters` and `task-filters` hold `include` / `exclude` lists of property paths which will be taken in
consideration during the search index creation.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ import io.holunda.polyflow.view.query.data.*
import mu.KLogging
import org.axonframework.config.ProcessingGroup
import org.axonframework.eventhandling.EventHandler
import org.axonframework.eventhandling.Timestamp
import org.axonframework.messaging.MetaData
import org.axonframework.queryhandling.QueryHandler
import org.axonframework.queryhandling.QueryResponseMessage
import org.axonframework.queryhandling.QueryUpdateEmitter
import org.springframework.data.domain.Page
import org.springframework.data.repository.findByIdOrNull
import org.springframework.stereotype.Component
import java.time.Instant

/**
* Implementation of the Polyflow Data Entry View API using JPA to create the persistence model.
Expand Down Expand Up @@ -100,17 +102,18 @@ class JpaPolyflowViewDataEntryService(

@Suppress("unused")
@EventHandler
override fun on(event: DataEntryCreatedEvent, metaData: MetaData) {
override fun on(event: DataEntryCreatedEvent, metaData: MetaData, @Timestamp eventTimestamp: Instant) {
if (isDisabledByProperty()) return

val savedEntity = dataEntryRepository.findByIdOrNull(DataEntryId(entryType = event.entryType, entryId = event.entryId))
val entity = if (savedEntity == null || savedEntity.lastModifiedDate < event.createModification.time.toInstant()) {
val entity = if (savedEntity == null || (savedEntity.versionTimestamp < eventTimestamp.toEpochMilli() || polyflowJpaViewProperties.processOldEvents)) {
/*
* save the entity only if there is no newer entity in the database (possibly written by another instance of this service in HA setup)
*/
dataEntryRepository.save(
event.toEntity(
objectMapper = objectMapper,
eventTimestamp = eventTimestamp,
revisionValue = RevisionValue.fromMetaData(metaData),
limit = polyflowJpaViewProperties.payloadAttributeLevelLimit,
filters = polyflowJpaViewProperties.dataEntryJsonPathFilters()
Expand All @@ -126,17 +129,18 @@ class JpaPolyflowViewDataEntryService(

@Suppress("unused")
@EventHandler
override fun on(event: DataEntryUpdatedEvent, metaData: MetaData) {
override fun on(event: DataEntryUpdatedEvent, metaData: MetaData, @Timestamp eventTimestamp: Instant) {
if (isDisabledByProperty()) return

val savedEntity = dataEntryRepository.findByIdOrNull(DataEntryId(entryType = event.entryType, entryId = event.entryId))
val entity = if (savedEntity == null || savedEntity.lastModifiedDate < event.updateModification.time.toInstant()) {
val entity = if (savedEntity == null || (savedEntity.versionTimestamp < eventTimestamp.toEpochMilli() || polyflowJpaViewProperties.processOldEvents)) {
/*
* save the entity only if there is no newer entity in the database (possibly written by another instance of this service in HA setup)
*/
dataEntryRepository.save(
event.toEntity(
objectMapper = objectMapper,
eventTimestamp = eventTimestamp,
revisionValue = RevisionValue.fromMetaData(metaData),
oldEntry = savedEntity,
limit = polyflowJpaViewProperties.payloadAttributeLevelLimit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,13 @@ data class PolyflowJpaViewProperties(
/**
* Controls if DataEntryQueries should consider the payload attributes of correlated data entries. Defaults to "false".
*/
val includeCorrelatedDataEntriesInDataEntryQueries: Boolean = false
val includeCorrelatedDataEntriesInDataEntryQueries: Boolean = false,

/**
* By default if an Event with a more recent timestamp was processed older events will be ignored. If this is set to "true"
* all events will be processed. Note that this can cause issues as older events can override more recent changes. Defaults to "false"
*/
val processOldEvents: Boolean = false

) {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import io.holunda.polyflow.view.jpa.auth.AuthorizationPrincipalType.GROUP
import io.holunda.polyflow.view.jpa.auth.AuthorizationPrincipalType.USER
import io.holunda.polyflow.view.jpa.payload.PayloadAttribute
import org.camunda.bpm.engine.variable.Variables
import java.time.Instant

/**
* Converts the entity into API type.
Expand Down Expand Up @@ -69,7 +70,7 @@ fun DataEntryState.toState() = DataEntryStateEmbeddable(processingType = this.pr
/**
* Event to entity.
*/
fun DataEntryCreatedEvent.toEntity(objectMapper: ObjectMapper, revisionValue: RevisionValue, limit: Int, filters: List<Pair<JsonPathFilterFunction, FilterType>>) = DataEntryEntity(
fun DataEntryCreatedEvent.toEntity(objectMapper: ObjectMapper, eventTimestamp: Instant, revisionValue: RevisionValue, limit: Int, filters: List<Pair<JsonPathFilterFunction, FilterType>>) = DataEntryEntity(
dataEntryId = DataEntryId(entryType = this.entryType, entryId = this.entryId),
payload = this.payload.toPayloadJson(objectMapper),
payloadAttributes = this.payload.toJsonPathsWithValues(limit, filters).map { attr -> PayloadAttribute(attr) }.toMutableSet(),
Expand All @@ -86,6 +87,7 @@ fun DataEntryCreatedEvent.toEntity(objectMapper: ObjectMapper, revisionValue: Re
} else {
0L
},
versionTimestamp = eventTimestamp.toEpochMilli(),
authorizedPrincipals = AuthorizationChange.applyUserAuthorization(mutableSetOf(), this.authorizations).map { user(it).toString() }
.plus(AuthorizationChange.applyGroupAuthorization(mutableSetOf(), this.authorizations).map { group(it).toString() }).toMutableSet(),
correlations = this.correlations.toMutableMap().map { entry -> DataEntryId(entryType = entry.key, entryId = entry.value.toString()) }.toMutableSet()
Expand All @@ -98,6 +100,7 @@ fun DataEntryCreatedEvent.toEntity(objectMapper: ObjectMapper, revisionValue: Re
*/
fun DataEntryUpdatedEvent.toEntity(
objectMapper: ObjectMapper,
eventTimestamp: Instant,
revisionValue: RevisionValue,
oldEntry: DataEntryEntity?,
limit: Int,
Expand All @@ -123,6 +126,7 @@ fun DataEntryUpdatedEvent.toEntity(
} else {
0L
},
versionTimestamp = eventTimestamp.toEpochMilli(),
)
} else {
oldEntry.also {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ class DataEntryEntity(
@Column(name = "AUTHORIZED_PRINCIPAL", nullable = false)
var authorizedPrincipals: MutableSet<String> = mutableSetOf(),

@Column(name = "VERSION_TIMESTAMP")
var versionTimestamp: Long = 0L,

@ElementCollection(fetch = FetchType.EAGER)
@CollectionTable(
name = "PLF_DATA_ENTRY_PAYLOAD_ATTRIBUTES",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import io.holunda.camunda.taskpool.api.business.DataEntryCreatedEvent
import io.holunda.camunda.taskpool.api.business.DataEntryDeletedEvent
import io.holunda.camunda.taskpool.api.business.DataEntryUpdatedEvent
import org.axonframework.messaging.MetaData
import java.time.Instant

/**
* Interface for receiving all data entry relevant events.
Expand All @@ -14,12 +15,12 @@ interface DataEntryEventHandler {
/**
* Data entry created.
*/
fun on(event: DataEntryCreatedEvent, metaData: MetaData)
fun on(event: DataEntryCreatedEvent, metaData: MetaData, eventTimestamp: Instant)

/**
* Data entry updated.
*/
fun on(event: DataEntryUpdatedEvent, metaData: MetaData)
fun on(event: DataEntryUpdatedEvent, metaData: MetaData, eventTimestamp: Instant)

/**
* Data entry deleted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ internal class JpaPolyflowViewServiceDataEntryCorrelationITest {
logNotes = "Created the entry"
)
),
metaData = MetaData.emptyInstance()
metaData = MetaData.emptyInstance(),
eventTimestamp = now
)


Expand All @@ -103,7 +104,8 @@ internal class JpaPolyflowViewServiceDataEntryCorrelationITest {
),
correlations = Variables.createVariables().addCorrelation("io.polyflow.test", id1)
),
metaData = MetaData.emptyInstance()
metaData = MetaData.emptyInstance(),
eventTimestamp = now
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ internal class JpaPolyflowViewServiceDataEntryITest {
logNotes = "Created the entry"
)
),
metaData = RevisionValue(revision = 1).toMetaData()
metaData = RevisionValue(revision = 1).toMetaData(),
now
)

jpaPolyflowViewService.on(
Expand All @@ -125,7 +126,8 @@ internal class JpaPolyflowViewServiceDataEntryITest {
logNotes = "Updates the entry"
)
),
metaData = RevisionValue(revision = 2).toMetaData()
metaData = RevisionValue(revision = 2).toMetaData(),
now
)

jpaPolyflowViewService.on(
Expand All @@ -147,7 +149,8 @@ internal class JpaPolyflowViewServiceDataEntryITest {
logNotes = "Updated the entry"
)
),
metaData = RevisionValue(revision = 3).toMetaData()
metaData = RevisionValue(revision = 3).toMetaData(),
now
)

jpaPolyflowViewService.on(
Expand All @@ -170,7 +173,8 @@ internal class JpaPolyflowViewServiceDataEntryITest {
logNotes = "Created the entry"
)
),
metaData = MetaData.emptyInstance()
metaData = MetaData.emptyInstance(),
now
)

jpaPolyflowViewService.on(
Expand All @@ -193,7 +197,8 @@ internal class JpaPolyflowViewServiceDataEntryITest {
logNotes = "Created the entry"
)
),
metaData = MetaData.emptyInstance()
metaData = MetaData.emptyInstance(),
now
)

jpaPolyflowViewService.on(
Expand Down Expand Up @@ -231,7 +236,33 @@ internal class JpaPolyflowViewServiceDataEntryITest {
),
correlations = Variables.createVariables().addCorrelation("io.polyflow.test", id2)
),
metaData = MetaData.emptyInstance()
metaData = MetaData.emptyInstance(),
now
)

jpaPolyflowViewService.on(
event = DataEntryUpdatedEvent(
entryType = "io.polyflow.test",
entryId = id4,
type = "Test sort",
applicationName = "test-application",
name = "Updated Test Entry 4", // should be ignored
state = ProcessingType.IN_PROGRESS.of("In review"),
payload = serialize(payload = mapOf("key-int" to 4, "key" to "other-value"), mapper = objectMapper),
authorizations = listOf(
addUser("hulk"),
addGroup("avenger")
),
updateModification = Modification(
time = OffsetDateTime.ofInstant(now, ZoneOffset.UTC),
username = "piggy",
log = "Created",
logNotes = "Created the entry"
),
correlations = Variables.createVariables().addCorrelation("io.polyflow.test", id2)
),
metaData = MetaData.emptyInstance(),
now.minusSeconds(1)
)
}

Expand Down Expand Up @@ -356,6 +387,15 @@ internal class JpaPolyflowViewServiceDataEntryITest {
assertThat(result.payload.elements.map { it.entryId }).containsExactly(id2) // id4 is not found by correlation to id2, due to property
}

@Test
fun `should ignore past events`() {
val result = jpaPolyflowViewService.query(
DataEntryForIdentityQuery(entryType = "io.polyflow.test", entryId = id4)
)

assertThat(result.payload.name).isEqualTo("Test Entry 4")
}

@Test
fun `should anonymize data entry`() {
jpaPolyflowViewService.on(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.context.ActiveProfiles
import org.springframework.transaction.annotation.Transactional
import org.testcontainers.junit.jupiter.Testcontainers
import java.time.Instant
import java.time.OffsetDateTime
import java.time.ZoneOffset
Expand Down Expand Up @@ -180,7 +179,8 @@ internal class JpaPolyflowViewServiceTaskITest {
logNotes = "Created the entry"
)
),
metaData = RevisionValue(revision = 1).toMetaData()
metaData = RevisionValue(revision = 1).toMetaData(),
now
)

// for testing: fun query(query: TasksWithDataEntriesForUserQuery)
Expand Down Expand Up @@ -223,7 +223,8 @@ internal class JpaPolyflowViewServiceTaskITest {
logNotes = "Created the entry"
)
),
metaData = RevisionValue(revision = 1).toMetaData()
metaData = RevisionValue(revision = 1).toMetaData(),
now
)

jpaPolyflowViewService.on(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ import io.holunda.polyflow.view.jpa.process.SourceReferenceEmbeddable
import io.holunda.polyflow.view.jpa.task.TaskEntity
import io.holunda.polyflow.view.jpa.task.TaskRepository
import io.holunda.polyflow.view.query.data.*
import org.axonframework.eventhandling.Timestamp
import org.axonframework.messaging.MetaData
import org.axonframework.queryhandling.QueryResponseMessage
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Propagation
import org.springframework.transaction.annotation.Transactional
import java.time.Instant
import java.util.*

data class Pojo(
Expand Down Expand Up @@ -78,9 +80,9 @@ class JpaPolyflowViewServiceTxFacade(private val implementation: JpaPolyflowView
override fun query(query: DataEntriesQuery, metaData: MetaData): QueryResponseMessage<DataEntriesQueryResult> =
implementation.query(query = query, metaData = metaData)

override fun on(event: DataEntryCreatedEvent, metaData: MetaData) = implementation.on(event = event, metaData = metaData)
override fun on(event: DataEntryCreatedEvent, metaData: MetaData, @Timestamp eventTimestamp: Instant) = implementation.on(event = event, metaData = metaData, eventTimestamp)

override fun on(event: DataEntryUpdatedEvent, metaData: MetaData) = implementation.on(event = event, metaData = metaData)
override fun on(event: DataEntryUpdatedEvent, metaData: MetaData, @Timestamp eventTimestamp: Instant) = implementation.on(event = event, metaData = metaData, eventTimestamp)

override fun on(event: DataEntryDeletedEvent, metaData: MetaData) = implementation.on(event = event, metaData = metaData)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class ConverterExtKtTest {
formKey = "test-entry-form"
)

val entity = event.toEntity(objectMapper, RevisionValue.NO_REVISION, 2, listOf())
val entity = event.toEntity(objectMapper, Instant.now(), RevisionValue.NO_REVISION, 2, listOf())

assertThat(entity.dataEntryId.entryId).isEqualTo("id")
assertThat(entity.dataEntryId.entryType).isEqualTo("io.holunda.test")
Expand Down Expand Up @@ -104,6 +104,7 @@ class ConverterExtKtTest {

val entity = event.toEntity(
objectMapper,
Instant.now(),
RevisionValue.NO_REVISION,
null,
2,
Expand Down Expand Up @@ -169,6 +170,7 @@ class ConverterExtKtTest {

val entity = event.toEntity(
objectMapper,
Instant.now(),
RevisionValue.NO_REVISION,
null,
2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ CREATE TABLE plf_data_entry (
processing_type VARCHAR(255) NOT NULL,
state VARCHAR(255) NOT NULL,
type VARCHAR(255) NOT NULL,
version_timestamp INT8,
PRIMARY KEY (entry_id, entry_type)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ create table plf_data_entry
processing_type varchar(255) not null,
state varchar(255) not null,
type varchar(255) not null,
version_timestamp bigint,
primary key (entry_id, entry_type)
);

Expand Down

0 comments on commit 6240334

Please sign in to comment.