Skip to content

Commit

Permalink
feat: Fill sequence number gaps via message from producer (#1076)
Browse files Browse the repository at this point in the history
* feat: Fill sequence number gaps via message from producer

* bump: Akka 2.9.1-M1

* mima filter

---------

Co-authored-by: Johan Andrén <[email protected]>
  • Loading branch information
patriknw and johanandren authored Nov 29, 2023
1 parent 1653cda commit 7dc0bd9
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# adding fill_sequence_number_gaps to ConsumerEventInit
ProblemFilters.exclude[Problem]("akka.projection.grpc.internal.proto.*")
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ message ConsumerEventInit {
string origin_id = 1;
// the stream id of the type of entity the producer wants to push
string stream_id = 2;
// if gaps in sequence numbers may exist and should be filled in
bool fill_sequence_number_gaps = 3;
}

message ConsumerEventStart {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ private[akka] final class EventPusherConsumerServiceImpl(
transformedEventEnvelope.sequenceNr,
transformedEventEnvelope.eventOption.getOrElse(FilteredPayload),
isSnapshotEvent = fromSnapshot(transformedEventEnvelope),
fillSequenceNumberGaps = init.fillSequenceNumberGaps,
transformedEventEnvelope.eventMetadata,
transformedEventEnvelope.tags,
_))(destination.eventProducerPushDestination.settings.journalWriteTimeout, system.scheduler)
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ object Dependencies {
val AkkaProjectionVersionInDocs = "1.5"

object Versions {
val akka = sys.props.getOrElse("build.akka.version", "2.9.0")
val akka = sys.props.getOrElse("build.akka.version", "2.9.1-M1")
val akkaPersistenceCassandra = "1.2.0"
val akkaPersistenceJdbc = "5.3.0"
val akkaPersistenceR2dbc = "1.2.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ akka {
password = ${?DB_PASSWORD}
}
}

# Edge may omit sequence numbers, e.g. due to compaction.
# Such gaps will be handled as filtered events.
typed.event-writer.fill-sequence-number-gaps = on
}

projection.r2dbc {
Expand Down

0 comments on commit 7dc0bd9

Please sign in to comment.