feat: support timestamp offset by slice for grpc projections (#1181)
pvlugter authored Aug 22, 2024
1 parent 61b4810 commit 98cccc2
Showing 5 changed files with 70 additions and 52 deletions.
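In short: the start offset of the gRPC projection protocol is now a repeated field, so a consumer can resume from one timestamp offset per slice (TimestampOffsetBySlice) in addition to a single TimestampOffset or NoOffset. As a quick orientation before the diff, a minimal sketch of the offset shapes involved; the slice numbers, persistence ids and timestamps below are made up:

import java.time.Instant
import akka.persistence.query.NoOffset
import akka.persistence.query.TimestampOffset
import akka.persistence.query.TimestampOffsetBySlice

// a single offset for the whole slice range, as before
val single = TimestampOffset(Instant.now(), Map("pid-1" -> 3L))

// new: one timestamp offset per slice, carried as a repeated Offset on the wire
val bySlice = TimestampOffsetBySlice(
  Map(
    512 -> TimestampOffset(Instant.now(), Map("pid-1" -> 3L)),
    513 -> TimestampOffset(Instant.now(), Map("pid-2" -> 7L))))

// no offset at all is encoded as an empty list of protocol offsets
val none = NoOffset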
@@ -279,7 +279,7 @@ class EventProducerServiceSpec

"EventProducerService" must {
"emit events" in {
val initReq = InitReq(streamId1, 0, 1023, offset = None)
val initReq = InitReq(streamId1, 0, 1023, offset = Seq.empty)
val streamIn = Source
.single(StreamIn(StreamIn.Message.Init(initReq)))
.concat(Source.maybe)
@@ -306,7 +306,7 @@ class EventProducerServiceSpec
}

"emit filtered events" in {
val initReq = InitReq(streamId2, 0, 1023, offset = None)
val initReq = InitReq(streamId2, 0, 1023, offset = Seq.empty)
val streamIn = Source
.single(StreamIn(StreamIn.Message.Init(initReq)))
.concat(Source.maybe)
@@ -375,7 +375,7 @@ class EventProducerServiceSpec
val directStreamIdFail = interceptedProducerService
.eventsBySlices(
Source
.single(StreamIn(StreamIn.Message.Init(InitReq("nono-direct", 0, 1023, offset = None)))),
.single(StreamIn(StreamIn.Message.Init(InitReq("nono-direct", 0, 1023, offset = Seq.empty)))),
MetadataBuilder.empty)
.runWith(Sink.head)
.failed
@@ -385,7 +385,7 @@ class EventProducerServiceSpec
val directMetaFail = interceptedProducerService
.eventsBySlices(
Source
.single(StreamIn(StreamIn.Message.Init(InitReq("ok", 0, 1023, offset = None)))),
.single(StreamIn(StreamIn.Message.Init(InitReq("ok", 0, 1023, offset = Seq.empty)))),
new MetadataBuilder().addText("nono-meta-direct", "value").build())
.runWith(Sink.head)
.failed
@@ -395,7 +395,7 @@ class EventProducerServiceSpec
val asyncStreamFail = interceptedProducerService
.eventsBySlices(
Source
.single(StreamIn(StreamIn.Message.Init(InitReq("nono-async", 0, 1023, offset = None)))),
.single(StreamIn(StreamIn.Message.Init(InitReq("nono-async", 0, 1023, offset = Seq.empty)))),
MetadataBuilder.empty)
.runWith(Sink.head)
.failed
@@ -405,7 +405,7 @@ class EventProducerServiceSpec
val passThrough = interceptedProducerService
.eventsBySlices(
Source
.single(StreamIn(StreamIn.Message.Init(InitReq("ok", 0, 1023, offset = None)))),
.single(StreamIn(StreamIn.Message.Init(InitReq("ok", 0, 1023, offset = Seq.empty)))),
MetadataBuilder.empty)
.runWith(Sink.head)
.failed
@@ -427,7 +427,7 @@ class EventProducerServiceSpec

"replay events" in {
val persistenceId = nextPid(entityType3)
val initReq = InitReq(streamId3, 0, 1023, offset = None)
val initReq = InitReq(streamId3, 0, 1023, offset = Seq.empty)
val replayReq = ReplayReq(replayPersistenceIds =
List(ReplayPersistenceId(Some(PersistenceIdSeqNr(persistenceId.id, 2L)), filterAfterSeqNr = Long.MaxValue)))
val streamIn =
@@ -456,7 +456,7 @@ class EventProducerServiceSpec
}

"emit events StartingFromSnapshots" in {
val initReq = InitReq(streamId4, 0, 1023, offset = None)
val initReq = InitReq(streamId4, 0, 1023, offset = Seq.empty)
val streamIn = Source
.single(StreamIn(StreamIn.Message.Init(initReq)))
.concat(Source.maybe)
@@ -489,7 +489,7 @@ class EventProducerServiceSpec

"replay events StartingFromSnapshots" in {
val persistenceId = nextPid(entityType5)
val initReq = InitReq(streamId5, 0, 1023, offset = None)
val initReq = InitReq(streamId5, 0, 1023, offset = Seq.empty)
val replayReq = ReplayReq(replayPersistenceIds =
List(ReplayPersistenceId(Some(PersistenceIdSeqNr(persistenceId.id, 1L)), filterAfterSeqNr = Long.MaxValue)))
val streamIn =
@@ -525,7 +525,7 @@ class EventProducerServiceSpec

"filter based on event origin" in {
val replicaInfo = ReplicaInfo("replica2", List("replica1", "replica3"))
val initReq = InitReq(streamId6, 0, 1023, offset = None, replicaInfo = Some(replicaInfo))
val initReq = InitReq(streamId6, 0, 1023, offset = Seq.empty, replicaInfo = Some(replicaInfo))
val streamIn = Source
.single(StreamIn(StreamIn.Message.Init(initReq)))
.concat(Source.maybe)
@@ -0,0 +1,2 @@
# Internal protocol updated for TimestampOffsetBySlice
ProblemFilters.exclude[Problem]("akka.projection.grpc.internal.proto.*")
@@ -62,7 +62,10 @@ message InitReq {
int32 slice_min = 2;
int32 slice_max = 3;
// start from this offset
Offset offset = 4;
// if empty, then NoOffset
// if single and no slice defined, then TimestampOffset
// if any and slice defined, then TimestampOffsetBySlice
repeated Offset offset = 4;
// consumer defined event filters
repeated FilterCriteria filter = 5;
ReplicaInfo replica_info = 6;
@@ -206,6 +209,8 @@ message Offset {
// If empty it is assumed to be the persistence_id -> seq_nr of enclosing Event
// or FilteredEvent.
repeated PersistenceIdSeqNr seen = 2;
// If defined then using offsets by slice.
optional int32 slice = 3;
}

message PersistenceIdSeqNr {
@@ -242,7 +247,7 @@ message Event {
string persistence_id = 1;
int64 seq_nr = 2;
int32 slice = 3;
Offset offset = 4;
repeated Offset offset = 4;
// The event payload may be serialized as Protobuf message when the type_url
// prefix is `type.googleapis.com/` or with Akka serialization when the type_url
// prefix `ser.akka.io/`. For Akka serialization, the serializer id and manifest
@@ -264,7 +269,7 @@ message FilteredEvent {
string persistence_id = 1;
int64 seq_nr = 2;
int32 slice = 3;
Offset offset = 4;
repeated Offset offset = 4;
string source = 5;
}

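For illustration, a sketch of how the repeated offset field could be filled in according to the comments above. The stream id, slice range and offset values are made up; InitReq, Offset and PersistenceIdSeqNr are the ScalaPB-generated classes of this internal protocol, so this only shows the wire shape, not a public API:

import java.time.Instant
import com.google.protobuf.timestamp.Timestamp
import akka.projection.grpc.internal.proto.InitReq
import akka.projection.grpc.internal.proto.Offset
import akka.projection.grpc.internal.proto.PersistenceIdSeqNr

// empty repeated field: start from NoOffset
val fromStart = InitReq("stream-id", 0, 1023, offset = Seq.empty)

// one Offset per slice, each with the slice field defined: TimestampOffsetBySlice
val fromOffsetsBySlice = InitReq(
  "stream-id",
  512,
  513,
  offset = Seq(
    Offset(Some(Timestamp(Instant.parse("2024-08-22T10:00:00Z"))), Seq(PersistenceIdSeqNr("pid-1", 3L)), slice = Some(512)),
    Offset(Some(Timestamp(Instant.parse("2024-08-22T10:00:01Z"))), Seq(PersistenceIdSeqNr("pid-2", 7L)), slice = Some(513))))

A single Offset with no slice defined corresponds to the previous behaviour, a plain TimestampOffset.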
@@ -17,7 +17,6 @@ import akka.grpc.scaladsl.SingleResponseRequestBuilder
import akka.grpc.scaladsl.StreamResponseRequestBuilder
import akka.grpc.scaladsl.StringEntry
import akka.persistence.Persistence
import akka.persistence.query.NoOffset
import akka.persistence.query.Offset
import akka.persistence.query.TimestampOffset
import akka.persistence.query.scaladsl._
@@ -32,7 +31,6 @@ import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal.withChannelBuilder
import akka.projection.grpc.internal.ConnectionException
import akka.projection.grpc.internal.ProtoAnySerialization
import akka.projection.grpc.internal.ProtobufProtocolConversions
import akka.projection.grpc.internal.proto
import akka.projection.grpc.internal.proto.Event
import akka.projection.grpc.internal.proto.EventProducerServiceClient
import akka.projection.grpc.internal.proto.EventTimestampRequest
@@ -50,7 +48,6 @@ import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source
import akka.util.Timeout
import com.google.protobuf.Descriptors
import com.google.protobuf.timestamp.Timestamp
import com.typesafe.config.Config
import io.grpc.Status
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder
@@ -66,6 +63,7 @@ import scala.concurrent.Future
import akka.projection.grpc.internal.proto.ReplayPersistenceId
import akka.projection.grpc.internal.proto.ReplicaInfo
import akka.projection.grpc.replication.scaladsl.ReplicationSettings

object GrpcReadJournal {
val Identifier = "akka.projection.grpc.consumer"

@@ -289,20 +287,7 @@ final class GrpcReadJournal private (
minSlice <= slice && slice <= maxSlice
}

val protoOffset =
offset match {
case o: TimestampOffset =>
val protoTimestamp = Timestamp(o.timestamp)
val protoSeen = o.seen.iterator.map {
case (pid, seqNr) =>
PersistenceIdSeqNr(pid, seqNr)
}.toSeq
Some(proto.Offset(Some(protoTimestamp), protoSeen))
case NoOffset =>
None
case _ =>
throw new IllegalArgumentException(s"Expected TimestampOffset or NoOffset, but got [$offset]")
}
val protoOffset = offsetToProtoOffset(offset)

def inReqSource(initCriteria: immutable.Seq[ConsumerFilter.FilterCriteria]): Source[StreamIn, NotUsed] =
Source
@@ -403,7 +388,7 @@ final class GrpcReadJournal private (
clientSettings.serviceName,
event.persistenceId,
event.seqNr,
timestampOffset(event.offset.get).timestamp,
timestampOffset(event.offset.head).timestamp,
event.source)

eventToEnvelope(event, streamId)
@@ -415,7 +400,7 @@ final class GrpcReadJournal private (
clientSettings.serviceName,
filteredEvent.persistenceId,
filteredEvent.seqNr,
timestampOffset(filteredEvent.offset.get).timestamp,
timestampOffset(filteredEvent.offset.head).timestamp,
filteredEvent.source)

filteredEventToEnvelope(filteredEvent, streamId)
@@ -435,7 +420,7 @@ final class GrpcReadJournal private (
}

private def filteredEventToEnvelope[Evt](filteredEvent: FilteredEvent, entityType: String): EventEnvelope[Evt] = {
val eventOffset = timestampOffset(filteredEvent.offset.get)
val eventOffset = timestampOffset(filteredEvent.offset.head)
new EventEnvelope(
eventOffset,
filteredEvent.persistenceId,
@@ -8,6 +8,7 @@ import akka.annotation.InternalApi
import akka.persistence.query.NoOffset
import akka.persistence.query.Offset
import akka.persistence.query.TimestampOffset
import akka.persistence.query.TimestampOffsetBySlice
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.typed.PersistenceId
import akka.projection.grpc.consumer.ConsumerFilter
@@ -48,34 +49,59 @@ import scala.util.Success
@InternalApi
private[akka] object ProtobufProtocolConversions {

def protocolOffsetToOffset(offset: Option[proto.Offset]): Offset =
offset match {
case None => NoOffset
case Some(o) =>
val timestamp =
o.timestamp.map(_.asJavaInstant).getOrElse(Instant.EPOCH)
val seen = o.seen.map {
case PersistenceIdSeqNr(pid, seqNr, _) =>
pid -> seqNr
}.toMap
TimestampOffset(timestamp, seen)
def protocolOffsetToOffset(offsets: Seq[proto.Offset]): Offset =
if (offsets.isEmpty) NoOffset
else if (offsets.exists(_.slice.isDefined)) {
val offsetBySlice = offsets.flatMap { offset =>
offset.slice.map { _ -> protocolOffsetToTimestampOffset(offset) }
}.toMap
TimestampOffsetBySlice(offsetBySlice)
} else {
protocolOffsetToTimestampOffset(offsets.head)
}

def offsetToProtoOffset(offset: Offset): Option[proto.Offset] = {
private def protocolOffsetToTimestampOffset(offset: proto.Offset): TimestampOffset = {
val timestamp = offset.timestamp match {
case Some(ts) => ts.asJavaInstant
case None => Instant.EPOCH
}
// optimised for the expected normal case of one element
val seen = if (offset.seen.size == 1) {
Map(offset.seen.head.persistenceId -> offset.seen.head.seqNr)
} else if (offset.seen.nonEmpty) {
offset.seen.map {
case PersistenceIdSeqNr(pid, seqNr, _) => pid -> seqNr
}.toMap
} else {
Map.empty[String, Long]
}
TimestampOffset(timestamp, seen)
}

def offsetToProtoOffset(offset: Offset): Seq[proto.Offset] = {
offset match {
case TimestampOffset(timestamp, _, seen) =>
val protoTimestamp = Timestamp(timestamp)
val protoSeen = seen.iterator.map {
case (pid, seqNr) =>
PersistenceIdSeqNr(pid, seqNr)
case timestampOffset: TimestampOffset =>
Seq(timestampOffsetToProtoOffset(timestampOffset))
case TimestampOffsetBySlice(offsets) =>
offsets.iterator.map {
case (slice, timestampOffset) =>
timestampOffsetToProtoOffset(timestampOffset, Some(slice))
}.toSeq
Some(proto.Offset(Some(protoTimestamp), protoSeen))
case NoOffset => None
case NoOffset => Seq.empty
case other =>
throw new IllegalArgumentException(s"Unexpected offset type [$other]")
}
}

private def timestampOffsetToProtoOffset(offset: TimestampOffset, slice: Option[Int] = None): proto.Offset = {
val protoTimestamp = Timestamp(offset.timestamp)
val protoSeen = offset.seen.iterator.map {
case (pid, seqNr) =>
PersistenceIdSeqNr(pid, seqNr)
}.toSeq
proto.Offset(Some(protoTimestamp), protoSeen, slice)
}

def transformAndEncodeEvent(
transformation: Transformation,
env: EventEnvelope[_],
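To see how the new conversions fit together, a sketch of a round trip through them. ProtobufProtocolConversions is private[akka], so this is illustrative only and not something application code would call directly; the slice numbers and offsets are made up:

import java.time.Instant
import akka.persistence.query.TimestampOffset
import akka.persistence.query.TimestampOffsetBySlice
import akka.projection.grpc.internal.ProtobufProtocolConversions.offsetToProtoOffset
import akka.projection.grpc.internal.ProtobufProtocolConversions.protocolOffsetToOffset

val offsetBySlice = TimestampOffsetBySlice(
  Map(
    0 -> TimestampOffset(Instant.parse("2024-08-22T10:00:00Z"), Map("pid-1" -> 3L)),
    1 -> TimestampOffset(Instant.parse("2024-08-22T10:00:01Z"), Map("pid-2" -> 7L))))

// encodes to one proto.Offset per slice, each with the slice field set
val protoOffsets = offsetToProtoOffset(offsetBySlice)

// decoding a non-empty list where slices are defined yields a TimestampOffsetBySlice again;
// an empty list decodes to NoOffset, a single offset without slice to TimestampOffset
val decoded = protocolOffsetToOffset(protoOffsets)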
