From 8e492e86b7a1bd24824ea138342a88ec9f76a0eb Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Thu, 15 Aug 2024 14:13:13 +1200 Subject: [PATCH 1/4] feat: support timestamp offset by slice for grpc projections --- .../akka/projection/grpc/event_producer.proto | 7 +++ .../consumer/scaladsl/GrpcReadJournal.scala | 19 +------ .../ProtobufProtocolConversions.scala | 50 +++++++++++++------ 3 files changed, 45 insertions(+), 31 deletions(-) diff --git a/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_producer.proto b/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_producer.proto index 60b20eae6..652ff5ad0 100644 --- a/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_producer.proto +++ b/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_producer.proto @@ -206,6 +206,8 @@ message Offset { // If empty it is assumed to be the persistence_id -> seq_nr of enclosing Event // or FilteredEvent. repeated PersistenceIdSeqNr seen = 2; + // Offsets by slice. If this is being used, then other offset fields will be empty. + repeated OffsetBySlice offsetsBySlice = 3; } message PersistenceIdSeqNr { @@ -213,6 +215,11 @@ message PersistenceIdSeqNr { int64 seq_nr = 2; } +message OffsetBySlice { + int32 slice = 1; + Offset offset = 2; +} + // Used for Replicated Event Sourcing to filter events based on origin. // For edge topologies, like star topologies, an edge replica is not connected // to all other replicas, but should be able to receive events indirectly via diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala index 72f7d7382..9ea5718c3 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala @@ -17,7 +17,6 @@ import akka.grpc.scaladsl.SingleResponseRequestBuilder import akka.grpc.scaladsl.StreamResponseRequestBuilder import akka.grpc.scaladsl.StringEntry import akka.persistence.Persistence -import akka.persistence.query.NoOffset import akka.persistence.query.Offset import akka.persistence.query.TimestampOffset import akka.persistence.query.scaladsl._ @@ -32,7 +31,6 @@ import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal.withChannelBuilder import akka.projection.grpc.internal.ConnectionException import akka.projection.grpc.internal.ProtoAnySerialization import akka.projection.grpc.internal.ProtobufProtocolConversions -import akka.projection.grpc.internal.proto import akka.projection.grpc.internal.proto.Event import akka.projection.grpc.internal.proto.EventProducerServiceClient import akka.projection.grpc.internal.proto.EventTimestampRequest @@ -50,7 +48,6 @@ import akka.stream.OverflowStrategy import akka.stream.scaladsl.Source import akka.util.Timeout import com.google.protobuf.Descriptors -import com.google.protobuf.timestamp.Timestamp import com.typesafe.config.Config import io.grpc.Status import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder @@ -66,6 +63,7 @@ import scala.concurrent.Future import akka.projection.grpc.internal.proto.ReplayPersistenceId import akka.projection.grpc.internal.proto.ReplicaInfo import akka.projection.grpc.replication.scaladsl.ReplicationSettings + object GrpcReadJournal { val Identifier = "akka.projection.grpc.consumer" @@ -289,20 +287,7 @@ final class GrpcReadJournal private ( minSlice <= slice && slice <= maxSlice } - val protoOffset = - offset match { - case o: TimestampOffset => - val protoTimestamp = Timestamp(o.timestamp) - val protoSeen = o.seen.iterator.map { - case (pid, seqNr) => - PersistenceIdSeqNr(pid, seqNr) - }.toSeq - Some(proto.Offset(Some(protoTimestamp), protoSeen)) - case NoOffset => - None - case _ => - throw new IllegalArgumentException(s"Expected TimestampOffset or NoOffset, but got [$offset]") - } + val protoOffset = offsetToProtoOffset(offset) def inReqSource(initCriteria: immutable.Seq[ConsumerFilter.FilterCriteria]): Source[StreamIn, NotUsed] = Source diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala index 7452f0544..dfbc87522 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala @@ -8,6 +8,7 @@ import akka.annotation.InternalApi import akka.persistence.query.NoOffset import akka.persistence.query.Offset import akka.persistence.query.TimestampOffset +import akka.persistence.query.TimestampOffsetBySlice import akka.persistence.query.typed.EventEnvelope import akka.persistence.typed.PersistenceId import akka.projection.grpc.consumer.ConsumerFilter @@ -52,30 +53,51 @@ private[akka] object ProtobufProtocolConversions { offset match { case None => NoOffset case Some(o) => - val timestamp = - o.timestamp.map(_.asJavaInstant).getOrElse(Instant.EPOCH) - val seen = o.seen.map { - case PersistenceIdSeqNr(pid, seqNr, _) => - pid -> seqNr - }.toMap - TimestampOffset(timestamp, seen) + if (o.offsetsBySlice.nonEmpty) { + val offsets = o.offsetsBySlice.flatMap { offsetBySlice => + offsetBySlice.offset.map(protocolOffsetToTimestampOffset).map(offsetBySlice.slice.->) + } + TimestampOffsetBySlice(offsets.toMap) + } else { + protocolOffsetToTimestampOffset(o) + } } + private def protocolOffsetToTimestampOffset(offset: proto.Offset): TimestampOffset = { + val timestamp = + offset.timestamp.map(_.asJavaInstant).getOrElse(Instant.EPOCH) + val seen = offset.seen.map { + case PersistenceIdSeqNr(pid, seqNr, _) => + pid -> seqNr + }.toMap + TimestampOffset(timestamp, seen) + } + def offsetToProtoOffset(offset: Offset): Option[proto.Offset] = { offset match { - case TimestampOffset(timestamp, _, seen) => - val protoTimestamp = Timestamp(timestamp) - val protoSeen = seen.iterator.map { - case (pid, seqNr) => - PersistenceIdSeqNr(pid, seqNr) - }.toSeq - Some(proto.Offset(Some(protoTimestamp), protoSeen)) + case timestampOffset: TimestampOffset => + Some(timestampOffsetToProtoOffset(timestampOffset)) + case TimestampOffsetBySlice(offsets) => + val offsetsBySlice = offsets.toSeq.map { + case (slice, timestampOffset) => + proto.OffsetBySlice(slice, Some(timestampOffsetToProtoOffset(timestampOffset))) + } + Some(proto.Offset(offsetsBySlice = offsetsBySlice)) case NoOffset => None case other => throw new IllegalArgumentException(s"Unexpected offset type [$other]") } } + private def timestampOffsetToProtoOffset(offset: TimestampOffset): proto.Offset = { + val protoTimestamp = Timestamp(offset.timestamp) + val protoSeen = offset.seen.iterator.map { + case (pid, seqNr) => + PersistenceIdSeqNr(pid, seqNr) + }.toSeq + proto.Offset(Some(protoTimestamp), protoSeen) + } + def transformAndEncodeEvent( transformation: Transformation, env: EventEnvelope[_], From 6a730ec41ed3031f43987b88d84b71836ac808b6 Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Fri, 16 Aug 2024 13:44:03 +1200 Subject: [PATCH 2/4] optimise offset protocol conversions --- .../ProtobufProtocolConversions.scala | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala index dfbc87522..0987782b9 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala @@ -55,7 +55,7 @@ private[akka] object ProtobufProtocolConversions { case Some(o) => if (o.offsetsBySlice.nonEmpty) { val offsets = o.offsetsBySlice.flatMap { offsetBySlice => - offsetBySlice.offset.map(protocolOffsetToTimestampOffset).map(offsetBySlice.slice.->) + offsetBySlice.offset.map(offset => offsetBySlice.slice -> protocolOffsetToTimestampOffset(offset)) } TimestampOffsetBySlice(offsets.toMap) } else { @@ -64,12 +64,20 @@ private[akka] object ProtobufProtocolConversions { } private def protocolOffsetToTimestampOffset(offset: proto.Offset): TimestampOffset = { - val timestamp = - offset.timestamp.map(_.asJavaInstant).getOrElse(Instant.EPOCH) - val seen = offset.seen.map { - case PersistenceIdSeqNr(pid, seqNr, _) => - pid -> seqNr - }.toMap + val timestamp = offset.timestamp match { + case Some(ts) => ts.asJavaInstant + case None => Instant.EPOCH + } + // optimised for the expected normal case of one element + val seen = if (offset.seen.size == 1) { + Map(offset.seen.head.persistenceId -> offset.seen.head.seqNr) + } else if (offset.seen.nonEmpty) { + offset.seen.map { + case PersistenceIdSeqNr(pid, seqNr, _) => pid -> seqNr + }.toMap + } else { + Map.empty[String, Long] + } TimestampOffset(timestamp, seen) } @@ -78,10 +86,10 @@ private[akka] object ProtobufProtocolConversions { case timestampOffset: TimestampOffset => Some(timestampOffsetToProtoOffset(timestampOffset)) case TimestampOffsetBySlice(offsets) => - val offsetsBySlice = offsets.toSeq.map { + val offsetsBySlice = offsets.iterator.map { case (slice, timestampOffset) => proto.OffsetBySlice(slice, Some(timestampOffsetToProtoOffset(timestampOffset))) - } + }.toSeq Some(proto.Offset(offsetsBySlice = offsetsBySlice)) case NoOffset => None case other => From 4bad32d7cc899d3520478518971ef8ff324466eb Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Fri, 16 Aug 2024 14:12:14 +1200 Subject: [PATCH 3/4] update protocol to use repeated offset with optional slice --- .../internal/EventProducerServiceSpec.scala | 20 +++++------ .../akka/projection/grpc/event_producer.proto | 18 +++++----- .../consumer/scaladsl/GrpcReadJournal.scala | 6 ++-- .../ProtobufProtocolConversions.scala | 36 +++++++++---------- 4 files changed, 37 insertions(+), 43 deletions(-) diff --git a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/EventProducerServiceSpec.scala b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/EventProducerServiceSpec.scala index 2de9e239a..cd7f55497 100644 --- a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/EventProducerServiceSpec.scala +++ b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/EventProducerServiceSpec.scala @@ -279,7 +279,7 @@ class EventProducerServiceSpec "EventProducerService" must { "emit events" in { - val initReq = InitReq(streamId1, 0, 1023, offset = None) + val initReq = InitReq(streamId1, 0, 1023, offset = Seq.empty) val streamIn = Source .single(StreamIn(StreamIn.Message.Init(initReq))) .concat(Source.maybe) @@ -306,7 +306,7 @@ class EventProducerServiceSpec } "emit filtered events" in { - val initReq = InitReq(streamId2, 0, 1023, offset = None) + val initReq = InitReq(streamId2, 0, 1023, offset = Seq.empty) val streamIn = Source .single(StreamIn(StreamIn.Message.Init(initReq))) .concat(Source.maybe) @@ -375,7 +375,7 @@ class EventProducerServiceSpec val directStreamIdFail = interceptedProducerService .eventsBySlices( Source - .single(StreamIn(StreamIn.Message.Init(InitReq("nono-direct", 0, 1023, offset = None)))), + .single(StreamIn(StreamIn.Message.Init(InitReq("nono-direct", 0, 1023, offset = Seq.empty)))), MetadataBuilder.empty) .runWith(Sink.head) .failed @@ -385,7 +385,7 @@ class EventProducerServiceSpec val directMetaFail = interceptedProducerService .eventsBySlices( Source - .single(StreamIn(StreamIn.Message.Init(InitReq("ok", 0, 1023, offset = None)))), + .single(StreamIn(StreamIn.Message.Init(InitReq("ok", 0, 1023, offset = Seq.empty)))), new MetadataBuilder().addText("nono-meta-direct", "value").build()) .runWith(Sink.head) .failed @@ -395,7 +395,7 @@ class EventProducerServiceSpec val asyncStreamFail = interceptedProducerService .eventsBySlices( Source - .single(StreamIn(StreamIn.Message.Init(InitReq("nono-async", 0, 1023, offset = None)))), + .single(StreamIn(StreamIn.Message.Init(InitReq("nono-async", 0, 1023, offset = Seq.empty)))), MetadataBuilder.empty) .runWith(Sink.head) .failed @@ -405,7 +405,7 @@ class EventProducerServiceSpec val passThrough = interceptedProducerService .eventsBySlices( Source - .single(StreamIn(StreamIn.Message.Init(InitReq("ok", 0, 1023, offset = None)))), + .single(StreamIn(StreamIn.Message.Init(InitReq("ok", 0, 1023, offset = Seq.empty)))), MetadataBuilder.empty) .runWith(Sink.head) .failed @@ -427,7 +427,7 @@ class EventProducerServiceSpec "replay events" in { val persistenceId = nextPid(entityType3) - val initReq = InitReq(streamId3, 0, 1023, offset = None) + val initReq = InitReq(streamId3, 0, 1023, offset = Seq.empty) val replayReq = ReplayReq(replayPersistenceIds = List(ReplayPersistenceId(Some(PersistenceIdSeqNr(persistenceId.id, 2L)), filterAfterSeqNr = Long.MaxValue))) val streamIn = @@ -456,7 +456,7 @@ class EventProducerServiceSpec } "emit events StartingFromSnapshots" in { - val initReq = InitReq(streamId4, 0, 1023, offset = None) + val initReq = InitReq(streamId4, 0, 1023, offset = Seq.empty) val streamIn = Source .single(StreamIn(StreamIn.Message.Init(initReq))) .concat(Source.maybe) @@ -489,7 +489,7 @@ class EventProducerServiceSpec "replay events StartingFromSnapshots" in { val persistenceId = nextPid(entityType5) - val initReq = InitReq(streamId5, 0, 1023, offset = None) + val initReq = InitReq(streamId5, 0, 1023, offset = Seq.empty) val replayReq = ReplayReq(replayPersistenceIds = List(ReplayPersistenceId(Some(PersistenceIdSeqNr(persistenceId.id, 1L)), filterAfterSeqNr = Long.MaxValue))) val streamIn = @@ -525,7 +525,7 @@ class EventProducerServiceSpec "filter based on event origin" in { val replicaInfo = ReplicaInfo("replica2", List("replica1", "replica3")) - val initReq = InitReq(streamId6, 0, 1023, offset = None, replicaInfo = Some(replicaInfo)) + val initReq = InitReq(streamId6, 0, 1023, offset = Seq.empty, replicaInfo = Some(replicaInfo)) val streamIn = Source .single(StreamIn(StreamIn.Message.Init(initReq))) .concat(Source.maybe) diff --git a/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_producer.proto b/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_producer.proto index 652ff5ad0..3b9110ac8 100644 --- a/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_producer.proto +++ b/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_producer.proto @@ -62,7 +62,10 @@ message InitReq { int32 slice_min = 2; int32 slice_max = 3; // start from this offset - Offset offset = 4; + // if empty, then NoOffset + // if single and no slice defined, then TimestampOffset + // if any and slice defined, then TimestampOffsetBySlice + repeated Offset offset = 4; // consumer defined event filters repeated FilterCriteria filter = 5; ReplicaInfo replica_info = 6; @@ -206,8 +209,8 @@ message Offset { // If empty it is assumed to be the persistence_id -> seq_nr of enclosing Event // or FilteredEvent. repeated PersistenceIdSeqNr seen = 2; - // Offsets by slice. If this is being used, then other offset fields will be empty. - repeated OffsetBySlice offsetsBySlice = 3; + // If defined then using offsets by slice. + optional int32 slice = 3; } message PersistenceIdSeqNr { @@ -215,11 +218,6 @@ message PersistenceIdSeqNr { int64 seq_nr = 2; } -message OffsetBySlice { - int32 slice = 1; - Offset offset = 2; -} - // Used for Replicated Event Sourcing to filter events based on origin. // For edge topologies, like star topologies, an edge replica is not connected // to all other replicas, but should be able to receive events indirectly via @@ -249,7 +247,7 @@ message Event { string persistence_id = 1; int64 seq_nr = 2; int32 slice = 3; - Offset offset = 4; + repeated Offset offset = 4; // The event payload may be serialized as Protobuf message when the type_url // prefix is `type.googleapis.com/` or with Akka serialization when the type_url // prefix `ser.akka.io/`. For Akka serialization, the serializer id and manifest @@ -271,7 +269,7 @@ message FilteredEvent { string persistence_id = 1; int64 seq_nr = 2; int32 slice = 3; - Offset offset = 4; + repeated Offset offset = 4; string source = 5; } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala index 9ea5718c3..9e115e814 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala @@ -388,7 +388,7 @@ final class GrpcReadJournal private ( clientSettings.serviceName, event.persistenceId, event.seqNr, - timestampOffset(event.offset.get).timestamp, + timestampOffset(event.offset.head).timestamp, event.source) eventToEnvelope(event, streamId) @@ -400,7 +400,7 @@ final class GrpcReadJournal private ( clientSettings.serviceName, filteredEvent.persistenceId, filteredEvent.seqNr, - timestampOffset(filteredEvent.offset.get).timestamp, + timestampOffset(filteredEvent.offset.head).timestamp, filteredEvent.source) filteredEventToEnvelope(filteredEvent, streamId) @@ -420,7 +420,7 @@ final class GrpcReadJournal private ( } private def filteredEventToEnvelope[Evt](filteredEvent: FilteredEvent, entityType: String): EventEnvelope[Evt] = { - val eventOffset = timestampOffset(filteredEvent.offset.get) + val eventOffset = timestampOffset(filteredEvent.offset.head) new EventEnvelope( eventOffset, filteredEvent.persistenceId, diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala index 0987782b9..3ff15312e 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala @@ -49,18 +49,15 @@ import scala.util.Success @InternalApi private[akka] object ProtobufProtocolConversions { - def protocolOffsetToOffset(offset: Option[proto.Offset]): Offset = - offset match { - case None => NoOffset - case Some(o) => - if (o.offsetsBySlice.nonEmpty) { - val offsets = o.offsetsBySlice.flatMap { offsetBySlice => - offsetBySlice.offset.map(offset => offsetBySlice.slice -> protocolOffsetToTimestampOffset(offset)) - } - TimestampOffsetBySlice(offsets.toMap) - } else { - protocolOffsetToTimestampOffset(o) - } + def protocolOffsetToOffset(offsets: Seq[proto.Offset]): Offset = + if (offsets.isEmpty) NoOffset + else if (offsets.exists(_.slice.isDefined)) { + val offsetBySlice = offsets.flatMap { offset => + offset.slice.map { _ -> protocolOffsetToTimestampOffset(offset) } + }.toMap + TimestampOffsetBySlice(offsetBySlice) + } else { + protocolOffsetToTimestampOffset(offsets.head) } private def protocolOffsetToTimestampOffset(offset: proto.Offset): TimestampOffset = { @@ -81,29 +78,28 @@ private[akka] object ProtobufProtocolConversions { TimestampOffset(timestamp, seen) } - def offsetToProtoOffset(offset: Offset): Option[proto.Offset] = { + def offsetToProtoOffset(offset: Offset): Seq[proto.Offset] = { offset match { case timestampOffset: TimestampOffset => - Some(timestampOffsetToProtoOffset(timestampOffset)) + Seq(timestampOffsetToProtoOffset(timestampOffset)) case TimestampOffsetBySlice(offsets) => - val offsetsBySlice = offsets.iterator.map { + offsets.iterator.map { case (slice, timestampOffset) => - proto.OffsetBySlice(slice, Some(timestampOffsetToProtoOffset(timestampOffset))) + timestampOffsetToProtoOffset(timestampOffset, Some(slice)) }.toSeq - Some(proto.Offset(offsetsBySlice = offsetsBySlice)) - case NoOffset => None + case NoOffset => Seq.empty case other => throw new IllegalArgumentException(s"Unexpected offset type [$other]") } } - private def timestampOffsetToProtoOffset(offset: TimestampOffset): proto.Offset = { + private def timestampOffsetToProtoOffset(offset: TimestampOffset, slice: Option[Int] = None): proto.Offset = { val protoTimestamp = Timestamp(offset.timestamp) val protoSeen = offset.seen.iterator.map { case (pid, seqNr) => PersistenceIdSeqNr(pid, seqNr) }.toSeq - proto.Offset(Some(protoTimestamp), protoSeen) + proto.Offset(Some(protoTimestamp), protoSeen, slice) } def transformAndEncodeEvent( From 05a18765c28616d8a8cb4db9448d850e365cc3ed Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Mon, 19 Aug 2024 12:01:03 +1200 Subject: [PATCH 4/4] mima filter for internal proto changes --- .../1.5.4.backwards.excludes/internal-proto-offset.excludes | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 akka-projection-grpc/src/main/mima-filters/1.5.4.backwards.excludes/internal-proto-offset.excludes diff --git a/akka-projection-grpc/src/main/mima-filters/1.5.4.backwards.excludes/internal-proto-offset.excludes b/akka-projection-grpc/src/main/mima-filters/1.5.4.backwards.excludes/internal-proto-offset.excludes new file mode 100644 index 000000000..4aa6645ae --- /dev/null +++ b/akka-projection-grpc/src/main/mima-filters/1.5.4.backwards.excludes/internal-proto-offset.excludes @@ -0,0 +1,2 @@ +# Internal protocol updated for TimestampOffsetBySlice +ProblemFilters.exclude[Problem]("akka.projection.grpc.internal.proto.*")