From 98cccc2cda421f3f4bcc5c440c9a4d3bcbce4357 Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Thu, 22 Aug 2024 14:01:24 +1200 Subject: [PATCH] feat: support timestamp offset by slice for grpc projections (#1181) --- .../internal/EventProducerServiceSpec.scala | 20 +++--- .../internal-proto-offset.excludes | 2 + .../akka/projection/grpc/event_producer.proto | 11 +++- .../consumer/scaladsl/GrpcReadJournal.scala | 25 ++------ .../ProtobufProtocolConversions.scala | 64 +++++++++++++------ 5 files changed, 70 insertions(+), 52 deletions(-) create mode 100644 akka-projection-grpc/src/main/mima-filters/1.5.4.backwards.excludes/internal-proto-offset.excludes diff --git a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/EventProducerServiceSpec.scala b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/EventProducerServiceSpec.scala index 2de9e239a..cd7f55497 100644 --- a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/EventProducerServiceSpec.scala +++ b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/EventProducerServiceSpec.scala @@ -279,7 +279,7 @@ class EventProducerServiceSpec "EventProducerService" must { "emit events" in { - val initReq = InitReq(streamId1, 0, 1023, offset = None) + val initReq = InitReq(streamId1, 0, 1023, offset = Seq.empty) val streamIn = Source .single(StreamIn(StreamIn.Message.Init(initReq))) .concat(Source.maybe) @@ -306,7 +306,7 @@ class EventProducerServiceSpec } "emit filtered events" in { - val initReq = InitReq(streamId2, 0, 1023, offset = None) + val initReq = InitReq(streamId2, 0, 1023, offset = Seq.empty) val streamIn = Source .single(StreamIn(StreamIn.Message.Init(initReq))) .concat(Source.maybe) @@ -375,7 +375,7 @@ class EventProducerServiceSpec val directStreamIdFail = interceptedProducerService .eventsBySlices( Source - .single(StreamIn(StreamIn.Message.Init(InitReq("nono-direct", 0, 1023, offset = None)))), + .single(StreamIn(StreamIn.Message.Init(InitReq("nono-direct", 0, 1023, offset = Seq.empty)))), MetadataBuilder.empty) .runWith(Sink.head) .failed @@ -385,7 +385,7 @@ class EventProducerServiceSpec val directMetaFail = interceptedProducerService .eventsBySlices( Source - .single(StreamIn(StreamIn.Message.Init(InitReq("ok", 0, 1023, offset = None)))), + .single(StreamIn(StreamIn.Message.Init(InitReq("ok", 0, 1023, offset = Seq.empty)))), new MetadataBuilder().addText("nono-meta-direct", "value").build()) .runWith(Sink.head) .failed @@ -395,7 +395,7 @@ class EventProducerServiceSpec val asyncStreamFail = interceptedProducerService .eventsBySlices( Source - .single(StreamIn(StreamIn.Message.Init(InitReq("nono-async", 0, 1023, offset = None)))), + .single(StreamIn(StreamIn.Message.Init(InitReq("nono-async", 0, 1023, offset = Seq.empty)))), MetadataBuilder.empty) .runWith(Sink.head) .failed @@ -405,7 +405,7 @@ class EventProducerServiceSpec val passThrough = interceptedProducerService .eventsBySlices( Source - .single(StreamIn(StreamIn.Message.Init(InitReq("ok", 0, 1023, offset = None)))), + .single(StreamIn(StreamIn.Message.Init(InitReq("ok", 0, 1023, offset = Seq.empty)))), MetadataBuilder.empty) .runWith(Sink.head) .failed @@ -427,7 +427,7 @@ class EventProducerServiceSpec "replay events" in { val persistenceId = nextPid(entityType3) - val initReq = InitReq(streamId3, 0, 1023, offset = None) + val initReq = InitReq(streamId3, 0, 1023, offset = Seq.empty) val replayReq = ReplayReq(replayPersistenceIds = List(ReplayPersistenceId(Some(PersistenceIdSeqNr(persistenceId.id, 2L)), filterAfterSeqNr = Long.MaxValue))) val streamIn = @@ -456,7 +456,7 @@ class EventProducerServiceSpec } "emit events StartingFromSnapshots" in { - val initReq = InitReq(streamId4, 0, 1023, offset = None) + val initReq = InitReq(streamId4, 0, 1023, offset = Seq.empty) val streamIn = Source .single(StreamIn(StreamIn.Message.Init(initReq))) .concat(Source.maybe) @@ -489,7 +489,7 @@ class EventProducerServiceSpec "replay events StartingFromSnapshots" in { val persistenceId = nextPid(entityType5) - val initReq = InitReq(streamId5, 0, 1023, offset = None) + val initReq = InitReq(streamId5, 0, 1023, offset = Seq.empty) val replayReq = ReplayReq(replayPersistenceIds = List(ReplayPersistenceId(Some(PersistenceIdSeqNr(persistenceId.id, 1L)), filterAfterSeqNr = Long.MaxValue))) val streamIn = @@ -525,7 +525,7 @@ class EventProducerServiceSpec "filter based on event origin" in { val replicaInfo = ReplicaInfo("replica2", List("replica1", "replica3")) - val initReq = InitReq(streamId6, 0, 1023, offset = None, replicaInfo = Some(replicaInfo)) + val initReq = InitReq(streamId6, 0, 1023, offset = Seq.empty, replicaInfo = Some(replicaInfo)) val streamIn = Source .single(StreamIn(StreamIn.Message.Init(initReq))) .concat(Source.maybe) diff --git a/akka-projection-grpc/src/main/mima-filters/1.5.4.backwards.excludes/internal-proto-offset.excludes b/akka-projection-grpc/src/main/mima-filters/1.5.4.backwards.excludes/internal-proto-offset.excludes new file mode 100644 index 000000000..4aa6645ae --- /dev/null +++ b/akka-projection-grpc/src/main/mima-filters/1.5.4.backwards.excludes/internal-proto-offset.excludes @@ -0,0 +1,2 @@ +# Internal protocol updated for TimestampOffsetBySlice +ProblemFilters.exclude[Problem]("akka.projection.grpc.internal.proto.*") diff --git a/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_producer.proto b/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_producer.proto index 60b20eae6..3b9110ac8 100644 --- a/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_producer.proto +++ b/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_producer.proto @@ -62,7 +62,10 @@ message InitReq { int32 slice_min = 2; int32 slice_max = 3; // start from this offset - Offset offset = 4; + // if empty, then NoOffset + // if single and no slice defined, then TimestampOffset + // if any and slice defined, then TimestampOffsetBySlice + repeated Offset offset = 4; // consumer defined event filters repeated FilterCriteria filter = 5; ReplicaInfo replica_info = 6; @@ -206,6 +209,8 @@ message Offset { // If empty it is assumed to be the persistence_id -> seq_nr of enclosing Event // or FilteredEvent. repeated PersistenceIdSeqNr seen = 2; + // If defined then using offsets by slice. + optional int32 slice = 3; } message PersistenceIdSeqNr { @@ -242,7 +247,7 @@ message Event { string persistence_id = 1; int64 seq_nr = 2; int32 slice = 3; - Offset offset = 4; + repeated Offset offset = 4; // The event payload may be serialized as Protobuf message when the type_url // prefix is `type.googleapis.com/` or with Akka serialization when the type_url // prefix `ser.akka.io/`. For Akka serialization, the serializer id and manifest @@ -264,7 +269,7 @@ message FilteredEvent { string persistence_id = 1; int64 seq_nr = 2; int32 slice = 3; - Offset offset = 4; + repeated Offset offset = 4; string source = 5; } diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala index 72f7d7382..9e115e814 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala @@ -17,7 +17,6 @@ import akka.grpc.scaladsl.SingleResponseRequestBuilder import akka.grpc.scaladsl.StreamResponseRequestBuilder import akka.grpc.scaladsl.StringEntry import akka.persistence.Persistence -import akka.persistence.query.NoOffset import akka.persistence.query.Offset import akka.persistence.query.TimestampOffset import akka.persistence.query.scaladsl._ @@ -32,7 +31,6 @@ import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal.withChannelBuilder import akka.projection.grpc.internal.ConnectionException import akka.projection.grpc.internal.ProtoAnySerialization import akka.projection.grpc.internal.ProtobufProtocolConversions -import akka.projection.grpc.internal.proto import akka.projection.grpc.internal.proto.Event import akka.projection.grpc.internal.proto.EventProducerServiceClient import akka.projection.grpc.internal.proto.EventTimestampRequest @@ -50,7 +48,6 @@ import akka.stream.OverflowStrategy import akka.stream.scaladsl.Source import akka.util.Timeout import com.google.protobuf.Descriptors -import com.google.protobuf.timestamp.Timestamp import com.typesafe.config.Config import io.grpc.Status import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder @@ -66,6 +63,7 @@ import scala.concurrent.Future import akka.projection.grpc.internal.proto.ReplayPersistenceId import akka.projection.grpc.internal.proto.ReplicaInfo import akka.projection.grpc.replication.scaladsl.ReplicationSettings + object GrpcReadJournal { val Identifier = "akka.projection.grpc.consumer" @@ -289,20 +287,7 @@ final class GrpcReadJournal private ( minSlice <= slice && slice <= maxSlice } - val protoOffset = - offset match { - case o: TimestampOffset => - val protoTimestamp = Timestamp(o.timestamp) - val protoSeen = o.seen.iterator.map { - case (pid, seqNr) => - PersistenceIdSeqNr(pid, seqNr) - }.toSeq - Some(proto.Offset(Some(protoTimestamp), protoSeen)) - case NoOffset => - None - case _ => - throw new IllegalArgumentException(s"Expected TimestampOffset or NoOffset, but got [$offset]") - } + val protoOffset = offsetToProtoOffset(offset) def inReqSource(initCriteria: immutable.Seq[ConsumerFilter.FilterCriteria]): Source[StreamIn, NotUsed] = Source @@ -403,7 +388,7 @@ final class GrpcReadJournal private ( clientSettings.serviceName, event.persistenceId, event.seqNr, - timestampOffset(event.offset.get).timestamp, + timestampOffset(event.offset.head).timestamp, event.source) eventToEnvelope(event, streamId) @@ -415,7 +400,7 @@ final class GrpcReadJournal private ( clientSettings.serviceName, filteredEvent.persistenceId, filteredEvent.seqNr, - timestampOffset(filteredEvent.offset.get).timestamp, + timestampOffset(filteredEvent.offset.head).timestamp, filteredEvent.source) filteredEventToEnvelope(filteredEvent, streamId) @@ -435,7 +420,7 @@ final class GrpcReadJournal private ( } private def filteredEventToEnvelope[Evt](filteredEvent: FilteredEvent, entityType: String): EventEnvelope[Evt] = { - val eventOffset = timestampOffset(filteredEvent.offset.get) + val eventOffset = timestampOffset(filteredEvent.offset.head) new EventEnvelope( eventOffset, filteredEvent.persistenceId, diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala index 7452f0544..3ff15312e 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala @@ -8,6 +8,7 @@ import akka.annotation.InternalApi import akka.persistence.query.NoOffset import akka.persistence.query.Offset import akka.persistence.query.TimestampOffset +import akka.persistence.query.TimestampOffsetBySlice import akka.persistence.query.typed.EventEnvelope import akka.persistence.typed.PersistenceId import akka.projection.grpc.consumer.ConsumerFilter @@ -48,34 +49,59 @@ import scala.util.Success @InternalApi private[akka] object ProtobufProtocolConversions { - def protocolOffsetToOffset(offset: Option[proto.Offset]): Offset = - offset match { - case None => NoOffset - case Some(o) => - val timestamp = - o.timestamp.map(_.asJavaInstant).getOrElse(Instant.EPOCH) - val seen = o.seen.map { - case PersistenceIdSeqNr(pid, seqNr, _) => - pid -> seqNr - }.toMap - TimestampOffset(timestamp, seen) + def protocolOffsetToOffset(offsets: Seq[proto.Offset]): Offset = + if (offsets.isEmpty) NoOffset + else if (offsets.exists(_.slice.isDefined)) { + val offsetBySlice = offsets.flatMap { offset => + offset.slice.map { _ -> protocolOffsetToTimestampOffset(offset) } + }.toMap + TimestampOffsetBySlice(offsetBySlice) + } else { + protocolOffsetToTimestampOffset(offsets.head) } - def offsetToProtoOffset(offset: Offset): Option[proto.Offset] = { + private def protocolOffsetToTimestampOffset(offset: proto.Offset): TimestampOffset = { + val timestamp = offset.timestamp match { + case Some(ts) => ts.asJavaInstant + case None => Instant.EPOCH + } + // optimised for the expected normal case of one element + val seen = if (offset.seen.size == 1) { + Map(offset.seen.head.persistenceId -> offset.seen.head.seqNr) + } else if (offset.seen.nonEmpty) { + offset.seen.map { + case PersistenceIdSeqNr(pid, seqNr, _) => pid -> seqNr + }.toMap + } else { + Map.empty[String, Long] + } + TimestampOffset(timestamp, seen) + } + + def offsetToProtoOffset(offset: Offset): Seq[proto.Offset] = { offset match { - case TimestampOffset(timestamp, _, seen) => - val protoTimestamp = Timestamp(timestamp) - val protoSeen = seen.iterator.map { - case (pid, seqNr) => - PersistenceIdSeqNr(pid, seqNr) + case timestampOffset: TimestampOffset => + Seq(timestampOffsetToProtoOffset(timestampOffset)) + case TimestampOffsetBySlice(offsets) => + offsets.iterator.map { + case (slice, timestampOffset) => + timestampOffsetToProtoOffset(timestampOffset, Some(slice)) }.toSeq - Some(proto.Offset(Some(protoTimestamp), protoSeen)) - case NoOffset => None + case NoOffset => Seq.empty case other => throw new IllegalArgumentException(s"Unexpected offset type [$other]") } } + private def timestampOffsetToProtoOffset(offset: TimestampOffset, slice: Option[Int] = None): proto.Offset = { + val protoTimestamp = Timestamp(offset.timestamp) + val protoSeen = offset.seen.iterator.map { + case (pid, seqNr) => + PersistenceIdSeqNr(pid, seqNr) + }.toSeq + proto.Offset(Some(protoTimestamp), protoSeen, slice) + } + def transformAndEncodeEvent( transformation: Transformation, env: EventEnvelope[_],