Skip to content

Commit

Permalink
fix: refine start offset after downscaling
Browse files Browse the repository at this point in the history
  • Loading branch information
pvlugter committed Aug 28, 2024
1 parent 98cccc2 commit cff453c
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1113,18 +1113,25 @@ class R2dbcTimestampOffsetStoreSpec
offsetStore.readManagementState().futureValue shouldBe Some(ManagementState(paused = false))
}

"start from earliest slice when projection key is changed" in {
val projectionId1 = ProjectionId(UUID.randomUUID().toString, "512-767")
val projectionId2 = ProjectionId(projectionId1.name, "768-1023")
val projectionId3 = ProjectionId(projectionId1.name, "512-1023")
"start from earliest slice range when projection key is changed" in {
val projectionId1 = ProjectionId(UUID.randomUUID().toString, "640-767")
val projectionId2 = ProjectionId(projectionId1.name, "512-767")
val projectionId3 = ProjectionId(projectionId1.name, "768-1023")
val projectionId4 = ProjectionId(projectionId1.name, "512-1023")
val offsetStore1 = new R2dbcOffsetStore(
projectionId1,
Some(new TestTimestampSourceProvider(512, 767, clock)),
Some(new TestTimestampSourceProvider(640, 767, clock)),
system,
settings,
r2dbcExecutor)
val offsetStore2 = new R2dbcOffsetStore(
projectionId2,
Some(new TestTimestampSourceProvider(512, 767, clock)),
system,
settings,
r2dbcExecutor)
val offsetStore3 = new R2dbcOffsetStore(
projectionId3,
Some(new TestTimestampSourceProvider(768, 1023, clock)),
system,
settings,
Expand All @@ -1137,32 +1144,32 @@ class R2dbcTimestampOffsetStoreSpec

val time1 = TestClock.nowMicros().instant()
val time2 = time1.plusSeconds(1)
val time3 = time1.plusSeconds(2)
val time3a = time1.minusSeconds(5 * 60) // furthest behind, previous projection key
val time3b = time1.minusSeconds(3 * 60) // far behind
val time4 = time1.plusSeconds(3 * 60) // far ahead

offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time1, Map(p1 -> 1L)), p1, 1L)).futureValue
offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time2, Map(p2 -> 1L)), p2, 1L)).futureValue
offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time3, Map(p3 -> 1L)), p3, 1L)).futureValue
offsetStore2
.saveOffset(OffsetPidSeqNr(TimestampOffset(time4, Map(p4 -> 1L)), p4, 1L))
.futureValue
offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(time1, Map(p1 -> 1L)), p1, 1L)).futureValue
offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(time2, Map(p2 -> 1L)), p2, 1L)).futureValue
offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time3a, Map(p3 -> 1L)), p3, 1L)).futureValue
offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(time3b, Map(p3 -> 2L)), p3, 2L)).futureValue
offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time4, Map(p4 -> 1L)), p4, 1L)).futureValue

// after downscaling
val offsetStore3 = new R2dbcOffsetStore(
projectionId3,
val offsetStore4 = new R2dbcOffsetStore(
projectionId4,
Some(new TestTimestampSourceProvider(512, 1023, clock)),
system,
settings,
r2dbcExecutor)

val offset = TimestampOffset.toTimestampOffset(offsetStore3.readOffset().futureValue.get) // this will load from database
offsetStore3.getState().size shouldBe 4
val offset = TimestampOffset.toTimestampOffset(offsetStore4.readOffset().futureValue.get) // this will load from database
offsetStore4.getState().size shouldBe 4

offset.timestamp shouldBe time2
offset.seen shouldBe Map(p2 -> 1L)

// getOffset is used by management api, and that should not be adjusted
TimestampOffset.toTimestampOffset(offsetStore3.getOffset().futureValue.get).timestamp shouldBe time4
TimestampOffset.toTimestampOffset(offsetStore4.getOffset().futureValue.get).timestamp shouldBe time4
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,11 +310,27 @@ private[projection] class R2dbcOffsetStore(
// When downscaling projection instances (changing slice distribution) there
// is a possibility that one of the previous projection instances was further behind than the backtracking
// window, which would cause missed events if we started from latest. In that case we use the latest
// offset of the earliest slice
val latestBySlice = newState.latestBySlice
val earliest = latestBySlice.minBy(_.timestamp)
// there could be others with the same timestamp, but it is not important to reconstruct exactly the right `seen`
Some(TimestampOffset(earliest.timestamp, Map(earliest.pid -> earliest.seqNr)))
// offset of the earliest slice range (distinct projection key).
val latestBySliceWithKey = recordsWithKey
.groupBy(_.record.slice)
.map {
case (_, records) => records.maxBy(_.record.timestamp)
}
.toVector
// Only needed if there's more than one projection key within the latest offsets by slice.
// This handles restarts after a previous downscaling, when all latest offsets are from the same instance.
if (moreThanOneProjectionKey(latestBySliceWithKey)) {
// Use the earliest of the latest from each projection instance (distinct projection key).
val latestByKey =
latestBySliceWithKey.groupBy(_.projectionKey).map {
case (_, records) => records.maxBy(_.record.timestamp)
}
val earliest = latestByKey.minBy(_.record.timestamp).record
// there could be others with the same timestamp, but it is not important to reconstruct exactly the right `seen`
Some(TimestampOffset(earliest.timestamp, Map(earliest.pid -> earliest.seqNr)))
} else {
newState.latestOffset
}
} else {
newState.latestOffset
}
Expand Down

0 comments on commit cff453c

Please sign in to comment.