diff --git a/akka-projection-grpc/src/main/mima-filters/1.5.0.backwards.excludes/fill_sequence_number_gaps.excludes b/akka-projection-grpc/src/main/mima-filters/1.5.0.backwards.excludes/fill_sequence_number_gaps.excludes new file mode 100644 index 000000000..2bac2209f --- /dev/null +++ b/akka-projection-grpc/src/main/mima-filters/1.5.0.backwards.excludes/fill_sequence_number_gaps.excludes @@ -0,0 +1,2 @@ +# adding fill_sequence_number_gaps to ConsumerEventInit +ProblemFilters.exclude[Problem]("akka.projection.grpc.internal.proto.*") diff --git a/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_consumer.proto b/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_consumer.proto index 9b1a0b5ae..27f2d564a 100644 --- a/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_consumer.proto +++ b/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_consumer.proto @@ -53,6 +53,8 @@ message ConsumerEventInit { string origin_id = 1; // the stream id of the type of entity the producer wants to push string stream_id = 2; + // if gaps in sequence numbers may exist and should be filled in + bool fill_sequence_number_gaps = 3; } message ConsumerEventStart { diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala index 0c023a177..435ee2ad1 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala @@ -149,6 +149,7 @@ private[akka] final class EventPusherConsumerServiceImpl( transformedEventEnvelope.sequenceNr, transformedEventEnvelope.eventOption.getOrElse(FilteredPayload), isSnapshotEvent = fromSnapshot(transformedEventEnvelope), + fillSequenceNumberGaps = init.fillSequenceNumberGaps, transformedEventEnvelope.eventMetadata, transformedEventEnvelope.tags, _))(destination.eventProducerPushDestination.settings.journalWriteTimeout, system.scheduler) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index ccf98e3e0..a64d63062 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -19,7 +19,7 @@ object Dependencies { val AkkaProjectionVersionInDocs = "1.5" object Versions { - val akka = sys.props.getOrElse("build.akka.version", "2.9.0") + val akka = sys.props.getOrElse("build.akka.version", "2.9.1-M1") val akkaPersistenceCassandra = "1.2.0" val akkaPersistenceJdbc = "5.3.0" val akkaPersistenceR2dbc = "1.2.0" diff --git a/samples/grpc/iot-service-scala/src/main/resources/persistence.conf b/samples/grpc/iot-service-scala/src/main/resources/persistence.conf index d67f12817..c83f56ae5 100644 --- a/samples/grpc/iot-service-scala/src/main/resources/persistence.conf +++ b/samples/grpc/iot-service-scala/src/main/resources/persistence.conf @@ -23,10 +23,6 @@ akka { password = ${?DB_PASSWORD} } } - - # Edge may omit sequence numbers, e.g. due to compaction. - # Such gaps will be handled as filtered events. - typed.event-writer.fill-sequence-number-gaps = on } projection.r2dbc {