Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support timestamp offset by slice for grpc projections #1181

Merged
merged 4 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ class EventProducerServiceSpec

"EventProducerService" must {
"emit events" in {
val initReq = InitReq(streamId1, 0, 1023, offset = None)
val initReq = InitReq(streamId1, 0, 1023, offset = Seq.empty)
val streamIn = Source
.single(StreamIn(StreamIn.Message.Init(initReq)))
.concat(Source.maybe)
Expand All @@ -306,7 +306,7 @@ class EventProducerServiceSpec
}

"emit filtered events" in {
val initReq = InitReq(streamId2, 0, 1023, offset = None)
val initReq = InitReq(streamId2, 0, 1023, offset = Seq.empty)
val streamIn = Source
.single(StreamIn(StreamIn.Message.Init(initReq)))
.concat(Source.maybe)
Expand Down Expand Up @@ -375,7 +375,7 @@ class EventProducerServiceSpec
val directStreamIdFail = interceptedProducerService
.eventsBySlices(
Source
.single(StreamIn(StreamIn.Message.Init(InitReq("nono-direct", 0, 1023, offset = None)))),
.single(StreamIn(StreamIn.Message.Init(InitReq("nono-direct", 0, 1023, offset = Seq.empty)))),
MetadataBuilder.empty)
.runWith(Sink.head)
.failed
Expand All @@ -385,7 +385,7 @@ class EventProducerServiceSpec
val directMetaFail = interceptedProducerService
.eventsBySlices(
Source
.single(StreamIn(StreamIn.Message.Init(InitReq("ok", 0, 1023, offset = None)))),
.single(StreamIn(StreamIn.Message.Init(InitReq("ok", 0, 1023, offset = Seq.empty)))),
new MetadataBuilder().addText("nono-meta-direct", "value").build())
.runWith(Sink.head)
.failed
Expand All @@ -395,7 +395,7 @@ class EventProducerServiceSpec
val asyncStreamFail = interceptedProducerService
.eventsBySlices(
Source
.single(StreamIn(StreamIn.Message.Init(InitReq("nono-async", 0, 1023, offset = None)))),
.single(StreamIn(StreamIn.Message.Init(InitReq("nono-async", 0, 1023, offset = Seq.empty)))),
MetadataBuilder.empty)
.runWith(Sink.head)
.failed
Expand All @@ -405,7 +405,7 @@ class EventProducerServiceSpec
val passThrough = interceptedProducerService
.eventsBySlices(
Source
.single(StreamIn(StreamIn.Message.Init(InitReq("ok", 0, 1023, offset = None)))),
.single(StreamIn(StreamIn.Message.Init(InitReq("ok", 0, 1023, offset = Seq.empty)))),
MetadataBuilder.empty)
.runWith(Sink.head)
.failed
Expand All @@ -427,7 +427,7 @@ class EventProducerServiceSpec

"replay events" in {
val persistenceId = nextPid(entityType3)
val initReq = InitReq(streamId3, 0, 1023, offset = None)
val initReq = InitReq(streamId3, 0, 1023, offset = Seq.empty)
val replayReq = ReplayReq(replayPersistenceIds =
List(ReplayPersistenceId(Some(PersistenceIdSeqNr(persistenceId.id, 2L)), filterAfterSeqNr = Long.MaxValue)))
val streamIn =
Expand Down Expand Up @@ -456,7 +456,7 @@ class EventProducerServiceSpec
}

"emit events StartingFromSnapshots" in {
val initReq = InitReq(streamId4, 0, 1023, offset = None)
val initReq = InitReq(streamId4, 0, 1023, offset = Seq.empty)
val streamIn = Source
.single(StreamIn(StreamIn.Message.Init(initReq)))
.concat(Source.maybe)
Expand Down Expand Up @@ -489,7 +489,7 @@ class EventProducerServiceSpec

"replay events StartingFromSnapshots" in {
val persistenceId = nextPid(entityType5)
val initReq = InitReq(streamId5, 0, 1023, offset = None)
val initReq = InitReq(streamId5, 0, 1023, offset = Seq.empty)
val replayReq = ReplayReq(replayPersistenceIds =
List(ReplayPersistenceId(Some(PersistenceIdSeqNr(persistenceId.id, 1L)), filterAfterSeqNr = Long.MaxValue)))
val streamIn =
Expand Down Expand Up @@ -525,7 +525,7 @@ class EventProducerServiceSpec

"filter based on event origin" in {
val replicaInfo = ReplicaInfo("replica2", List("replica1", "replica3"))
val initReq = InitReq(streamId6, 0, 1023, offset = None, replicaInfo = Some(replicaInfo))
val initReq = InitReq(streamId6, 0, 1023, offset = Seq.empty, replicaInfo = Some(replicaInfo))
val streamIn = Source
.single(StreamIn(StreamIn.Message.Init(initReq)))
.concat(Source.maybe)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Internal protocol updated for TimestampOffsetBySlice
ProblemFilters.exclude[Problem]("akka.projection.grpc.internal.proto.*")
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ message InitReq {
int32 slice_min = 2;
int32 slice_max = 3;
// start from this offset
Offset offset = 4;
// if empty, then NoOffset
// if single and no slice defined, then TimestampOffset
// if any and slice defined, then TimestampOffsetBySlice
repeated Offset offset = 4;
// consumer defined event filters
repeated FilterCriteria filter = 5;
ReplicaInfo replica_info = 6;
Expand Down Expand Up @@ -206,6 +209,8 @@ message Offset {
// If empty it is assumed to be the persistence_id -> seq_nr of enclosing Event
// or FilteredEvent.
repeated PersistenceIdSeqNr seen = 2;
// If defined then using offsets by slice.
optional int32 slice = 3;
}

message PersistenceIdSeqNr {
Expand Down Expand Up @@ -242,7 +247,7 @@ message Event {
string persistence_id = 1;
int64 seq_nr = 2;
int32 slice = 3;
Offset offset = 4;
repeated Offset offset = 4;
// The event payload may be serialized as Protobuf message when the type_url
// prefix is `type.googleapis.com/` or with Akka serialization when the type_url
// prefix `ser.akka.io/`. For Akka serialization, the serializer id and manifest
Expand All @@ -264,7 +269,7 @@ message FilteredEvent {
string persistence_id = 1;
int64 seq_nr = 2;
int32 slice = 3;
Offset offset = 4;
repeated Offset offset = 4;
string source = 5;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import akka.grpc.scaladsl.SingleResponseRequestBuilder
import akka.grpc.scaladsl.StreamResponseRequestBuilder
import akka.grpc.scaladsl.StringEntry
import akka.persistence.Persistence
import akka.persistence.query.NoOffset
import akka.persistence.query.Offset
import akka.persistence.query.TimestampOffset
import akka.persistence.query.scaladsl._
Expand All @@ -32,7 +31,6 @@ import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal.withChannelBuilder
import akka.projection.grpc.internal.ConnectionException
import akka.projection.grpc.internal.ProtoAnySerialization
import akka.projection.grpc.internal.ProtobufProtocolConversions
import akka.projection.grpc.internal.proto
import akka.projection.grpc.internal.proto.Event
import akka.projection.grpc.internal.proto.EventProducerServiceClient
import akka.projection.grpc.internal.proto.EventTimestampRequest
Expand All @@ -50,7 +48,6 @@ import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source
import akka.util.Timeout
import com.google.protobuf.Descriptors
import com.google.protobuf.timestamp.Timestamp
import com.typesafe.config.Config
import io.grpc.Status
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder
Expand All @@ -66,6 +63,7 @@ import scala.concurrent.Future
import akka.projection.grpc.internal.proto.ReplayPersistenceId
import akka.projection.grpc.internal.proto.ReplicaInfo
import akka.projection.grpc.replication.scaladsl.ReplicationSettings

object GrpcReadJournal {
val Identifier = "akka.projection.grpc.consumer"

Expand Down Expand Up @@ -289,20 +287,7 @@ final class GrpcReadJournal private (
minSlice <= slice && slice <= maxSlice
}

val protoOffset =
offset match {
case o: TimestampOffset =>
val protoTimestamp = Timestamp(o.timestamp)
val protoSeen = o.seen.iterator.map {
case (pid, seqNr) =>
PersistenceIdSeqNr(pid, seqNr)
}.toSeq
Some(proto.Offset(Some(protoTimestamp), protoSeen))
case NoOffset =>
None
case _ =>
throw new IllegalArgumentException(s"Expected TimestampOffset or NoOffset, but got [$offset]")
}
val protoOffset = offsetToProtoOffset(offset)

def inReqSource(initCriteria: immutable.Seq[ConsumerFilter.FilterCriteria]): Source[StreamIn, NotUsed] =
Source
Expand Down Expand Up @@ -403,7 +388,7 @@ final class GrpcReadJournal private (
clientSettings.serviceName,
event.persistenceId,
event.seqNr,
timestampOffset(event.offset.get).timestamp,
timestampOffset(event.offset.head).timestamp,
event.source)

eventToEnvelope(event, streamId)
Expand All @@ -415,7 +400,7 @@ final class GrpcReadJournal private (
clientSettings.serviceName,
filteredEvent.persistenceId,
filteredEvent.seqNr,
timestampOffset(filteredEvent.offset.get).timestamp,
timestampOffset(filteredEvent.offset.head).timestamp,
filteredEvent.source)

filteredEventToEnvelope(filteredEvent, streamId)
Expand All @@ -435,7 +420,7 @@ final class GrpcReadJournal private (
}

private def filteredEventToEnvelope[Evt](filteredEvent: FilteredEvent, entityType: String): EventEnvelope[Evt] = {
val eventOffset = timestampOffset(filteredEvent.offset.get)
val eventOffset = timestampOffset(filteredEvent.offset.head)
new EventEnvelope(
eventOffset,
filteredEvent.persistenceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import akka.annotation.InternalApi
import akka.persistence.query.NoOffset
import akka.persistence.query.Offset
import akka.persistence.query.TimestampOffset
import akka.persistence.query.TimestampOffsetBySlice
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.typed.PersistenceId
import akka.projection.grpc.consumer.ConsumerFilter
Expand Down Expand Up @@ -48,34 +49,59 @@ import scala.util.Success
@InternalApi
private[akka] object ProtobufProtocolConversions {

def protocolOffsetToOffset(offset: Option[proto.Offset]): Offset =
offset match {
case None => NoOffset
case Some(o) =>
val timestamp =
o.timestamp.map(_.asJavaInstant).getOrElse(Instant.EPOCH)
val seen = o.seen.map {
case PersistenceIdSeqNr(pid, seqNr, _) =>
pid -> seqNr
}.toMap
TimestampOffset(timestamp, seen)
def protocolOffsetToOffset(offsets: Seq[proto.Offset]): Offset =
if (offsets.isEmpty) NoOffset
else if (offsets.exists(_.slice.isDefined)) {
val offsetBySlice = offsets.flatMap { offset =>
offset.slice.map { _ -> protocolOffsetToTimestampOffset(offset) }
}.toMap
TimestampOffsetBySlice(offsetBySlice)
} else {
protocolOffsetToTimestampOffset(offsets.head)
}

def offsetToProtoOffset(offset: Offset): Option[proto.Offset] = {
private def protocolOffsetToTimestampOffset(offset: proto.Offset): TimestampOffset = {
val timestamp = offset.timestamp match {
case Some(ts) => ts.asJavaInstant
case None => Instant.EPOCH
}
// optimised for the expected normal case of one element
val seen = if (offset.seen.size == 1) {
Map(offset.seen.head.persistenceId -> offset.seen.head.seqNr)
} else if (offset.seen.nonEmpty) {
offset.seen.map {
case PersistenceIdSeqNr(pid, seqNr, _) => pid -> seqNr
}.toMap
} else {
Map.empty[String, Long]
}
TimestampOffset(timestamp, seen)
}
pvlugter marked this conversation as resolved.
Show resolved Hide resolved

def offsetToProtoOffset(offset: Offset): Seq[proto.Offset] = {
offset match {
case TimestampOffset(timestamp, _, seen) =>
val protoTimestamp = Timestamp(timestamp)
val protoSeen = seen.iterator.map {
case (pid, seqNr) =>
PersistenceIdSeqNr(pid, seqNr)
case timestampOffset: TimestampOffset =>
Seq(timestampOffsetToProtoOffset(timestampOffset))
case TimestampOffsetBySlice(offsets) =>
offsets.iterator.map {
case (slice, timestampOffset) =>
timestampOffsetToProtoOffset(timestampOffset, Some(slice))
}.toSeq
Some(proto.Offset(Some(protoTimestamp), protoSeen))
case NoOffset => None
case NoOffset => Seq.empty
case other =>
throw new IllegalArgumentException(s"Unexpected offset type [$other]")
}
}

private def timestampOffsetToProtoOffset(offset: TimestampOffset, slice: Option[Int] = None): proto.Offset = {
val protoTimestamp = Timestamp(offset.timestamp)
val protoSeen = offset.seen.iterator.map {
case (pid, seqNr) =>
PersistenceIdSeqNr(pid, seqNr)
}.toSeq
proto.Offset(Some(protoTimestamp), protoSeen, slice)
}

def transformAndEncodeEvent(
transformation: Transformation,
env: EventEnvelope[_],
Expand Down
Loading