Skip to content

Commit

Permalink
always trigger deletion after eviction
Browse files Browse the repository at this point in the history
  • Loading branch information
pvlugter committed Aug 30, 2024
1 parent 5e4b32c commit 3673969
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1106,85 +1106,20 @@ class R2dbcTimestampOffsetStoreSpec
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t8, Map(p8 -> 1L)), p8, 1L)).futureValue

offsetStore.getState().size shouldBe 8 // no eviction
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 2 // deleted t1|p1 and t2|p2, kept t3|p3 (latest)
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 2 // deleted p1@t1 and p2@t2, kept p3@t3 (latest)

val t9 = t0.plus(timeWindow.plus(evictInterval).plusSeconds(3))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t9, Map(p8 -> 2L)), p8, 2L)).futureValue

offsetStore.getState().size shouldBe 8 // no eviction (outside eviction window, but within keep-number-of-entries)
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 2 // deleted t4|p4 and t5|p5, kept t3|p3 (latest)
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 2 // deleted p4@t4 and p5@t5, kept p3@t3 (latest)

offsetStore.getState().byPid.keySet shouldBe Set(p1, p2, p3, p4, p5, p6, p7, p8)
offsetStore.readOffset().futureValue // reload from database
offsetStore.getState().byPid.keySet shouldBe Set(p3, p6, p7, p8)
}

"delete old records triggered by time window, after eviction, given old latest by slice" in {
val projectionId = genRandomProjectionId()
val evictSettings = settings
.withKeepNumberOfEntries(5)
.withTimeWindow(JDuration.ofSeconds(100))
.withEvictInterval(JDuration.ofSeconds(10))
val offsetStore = createOffsetStore(projectionId, evictSettings)

import evictSettings.{ evictInterval, timeWindow }

val t0 = TestClock.nowMicros().instant()
log.debug("Start time [{}]", t0)

val p1 = "p500" // slice 645
val p2 = "p92" // slice 905
val p3 = "p108" // slice 905
val p4 = "p863" // slice 645
val p5 = "p984" // slice 645
val p6 = "p3080" // slice 645
val p7 = "p4290" // slice 645
val p8 = "p20180" // slice 645

val t1 = t0.plusSeconds(1)
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t1, Map(p1 -> 1L)), p1, 1L)).futureValue

val t2 = t0.plusSeconds(2)
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t2, Map(p2 -> 1L)), p2, 1L)).futureValue

val t3 = t0.plusSeconds(3)
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t3, Map(p3 -> 1L)), p3, 1L)).futureValue

val t4 = t0.plus(evictInterval).plusSeconds(1)
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t4, Map(p4 -> 1L)), p4, 1L)).futureValue

val t5 = t0.plus(evictInterval).plusSeconds(2)
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t5, Map(p5 -> 1L)), p5, 1L)).futureValue

val t6 = t0.plus(evictInterval).plusSeconds(3)
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t6, Map(p6 -> 1L)), p6, 1L)).futureValue

offsetStore.getState().size shouldBe 6

val t7 = t0.plus(timeWindow.minusSeconds(10))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t7, Map(p7 -> 1L)), p7, 1L)).futureValue

offsetStore.getState().size shouldBe 7 // no eviction
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // no deletion (within time window)

val t8 = t0.plus(timeWindow.plus(evictInterval).minusSeconds(3))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t8, Map(p8 -> 1L)), p8, 1L)).futureValue

offsetStore.getState().size shouldBe 8 // no eviction
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 2 // deleted t1|p1 and t2|p2, kept t3|p3 (latest)

val t9 = t0.plus(timeWindow.plus(evictInterval).plusSeconds(3))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t9, Map(p8 -> 2L)), p8, 2L)).futureValue

offsetStore.getState().byPid.keySet shouldBe Set(p3, p4, p5, p6, p7, p8) // evicted t1|p1 and t2|p2
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 2 // deleted t4|p4 and t5|p5, kept t3|p3 (latest)

offsetStore.getState().byPid.keySet shouldBe Set(p3, p4, p5, p6, p7, p8)
offsetStore.readOffset().futureValue // reload from database
offsetStore.getState().byPid.keySet shouldBe Set(p3, p6, p7, p8)
}

"delete old records triggered by number of entries, after eviction, given new persistence ids" in {
"delete old records triggered after eviction" in {
val projectionId = genRandomProjectionId()
val evictSettings = settings
.withKeepNumberOfEntries(5)
Expand All @@ -1207,6 +1142,9 @@ class R2dbcTimestampOffsetStoreSpec
val p7 = "p4290"
val p8 = "p20180"
val p9 = "p21390"
val p10 = "p31070"
val p11 = "p31191"
val p12 = "p32280"

val t1 = t0.plusSeconds(1)
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t1, Map(p1 -> 1L)), p1, 1L)).futureValue
Expand All @@ -1226,9 +1164,10 @@ class R2dbcTimestampOffsetStoreSpec
val t6 = t0.plus(evictInterval).plusSeconds(9)
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t6, Map(p6 -> 1L)), p6, 1L)).futureValue

offsetStore.getState().size shouldBe 6
offsetStore.getState().size shouldBe 6 // no eviction
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // no deletion

val t7 = t0.plus(timeWindow.minusSeconds(10))
val t7 = t0.plus(timeWindow.minus(evictInterval))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t7, Map(p7 -> 1L)), p7, 1L)).futureValue

offsetStore.getState().size shouldBe 7 // no eviction
Expand All @@ -1237,30 +1176,48 @@ class R2dbcTimestampOffsetStoreSpec
val t8 = t0.plus(timeWindow.plus(evictInterval).plusSeconds(3))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t8, Map(p8 -> 1L)), p8, 1L)).futureValue

offsetStore.getState().byPid.keySet shouldBe Set(p4, p5, p6, p7, p8) // evicted t1|p1, t2|p2, and t3|p3
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // no deletion, as already evicted older records
offsetStore.getState().byPid.keySet shouldBe Set(p4, p5, p6, p7, p8) // evicted p1@t1, p2@t2, and p3@t3
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 3 // deletion triggered by eviction

val t9 = t0.plus(timeWindow.plus(evictInterval.multipliedBy(2)).plusSeconds(10))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t9, Map(p8 -> 2L)), p8, 2L)).futureValue

offsetStore.getState().size shouldBe 5 // no eviction (outside time window, but still within limit)
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 3 // deleted p4@t4, p5@t5, p6@t6 (outside window)

val t10 = t0.plus(timeWindow.plus(evictInterval.multipliedBy(2)).plusSeconds(11))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t10, Map(p9 -> 1L)), p9, 1L)).futureValue

offsetStore.getState().byPid.keySet shouldBe Set(p5, p6, p7, p8, p9) // evicted p4@t4
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // deletion triggered, but nothing to delete

val t11 = t0.plus(timeWindow.plus(evictInterval.multipliedBy(2)).plusSeconds(12))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t11, Map(p10 -> 1L)), p10, 1L)).futureValue

offsetStore.getState().byPid.keySet shouldBe Set(p6, p7, p8, p9, p10) // evicted p5@t5
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // deletion triggered, but nothing to delete

val t9 = t0.plus(timeWindow.plus(evictInterval).plusSeconds(4))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t9, Map(p7 -> 2L)), p7, 2L)).futureValue
val t12 = t0.plus(timeWindow.plus(evictInterval.multipliedBy(2)).plusSeconds(13))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t12, Map(p11 -> 1L)), p11, 1L)).futureValue

offsetStore.getState().byPid.size shouldBe 5 // no eviction (within time window and limit)
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // no deletion (still within limit)
offsetStore.getState().byPid.keySet shouldBe Set(p7, p8, p9, p10, p11) // evicted p6@t6
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // deletion triggered, but nothing to delete

val t10 = t0.plus(timeWindow.plus(evictInterval).plusSeconds(5))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t10, Map(p8 -> 2L)), p8, 2L)).futureValue
val t13 = t0.plus(timeWindow.plus(evictInterval.multipliedBy(2)).plusSeconds(14))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t13, Map(p12 -> 1L)), p12, 1L)).futureValue

offsetStore.getState().byPid.size shouldBe 5 // no eviction (within time window and limit)
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // no deletion (still within limit)
offsetStore.getState().byPid.keySet shouldBe Set(p7, p8, p9, p10, p11, p12) // no eviction (within time window)
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // no deletion

val t11 = t0.plus(timeWindow.plus(evictInterval).plusSeconds(6))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t11, Map(p9 -> 1L)), p9, 1L)).futureValue // new pid
val t14 = t0.plus(timeWindow.multipliedBy(2).plus(evictInterval.multipliedBy(3)).plusSeconds(1))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t14, Map(p12 -> 2L)), p12, 2L)).futureValue

offsetStore.getState().byPid.size shouldBe 6 // no eviction (over limit but still within time window)
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 3 // deleted t1|p1, t2|p2, and t3|p3
offsetStore.getState().byPid.keySet shouldBe Set(p8, p9, p10, p11, p12) // evicted p7@t7
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 3 // triggered by evict, deleted p7@t7, p8@t8, p8@t9

offsetStore.getState().byPid.keySet shouldBe Set(p4, p5, p6, p7, p8, p9)
offsetStore.getState().byPid.keySet shouldBe Set(p8, p9, p10, p11, p12)
offsetStore.readOffset().futureValue // reload from database
offsetStore.getState().byPid.keySet shouldBe Set(p4, p5, p6, p7, p8, p9)
offsetStore.getState().byPid.keySet shouldBe Set(p9, p10, p11, p12)
}

"set offset" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ private[projection] class R2dbcOffsetStore(
// To avoid delete requests when no new offsets have been stored since previous delete
private val idle = new AtomicBoolean(false)

// To trigger next deletion after in-memory eviction
private val triggerDeletion = new AtomicBoolean(false)

system.scheduler.scheduleWithFixedDelay(
settings.deleteInterval,
settings.deleteInterval,
Expand Down Expand Up @@ -438,6 +441,7 @@ private[projection] class R2dbcOffsetStore(
.compareTo(evictWindow) > 0) {
val evictUntil = newState.latestTimestamp.minus(settings.timeWindow)
val s = newState.evict(evictUntil, settings.keepNumberOfEntries)
triggerDeletion.set(true)
logger.debugN(
"Evicted [{}] records until [{}], keeping [{}] records. Latest [{}].",
newState.size - s.size,
Expand Down Expand Up @@ -722,7 +726,7 @@ private[projection] class R2dbcOffsetStore(
Future.successful(0)
} else {
val currentState = getState()
if (currentState.size <= settings.keepNumberOfEntries && currentState.window.compareTo(settings.timeWindow) < 0) {
if (!triggerDeletion.getAndSet(false) && currentState.window.compareTo(settings.timeWindow) < 0) {
// it hasn't filled up the window yet
Future.successful(0)
} else {
Expand Down

0 comments on commit 3673969

Please sign in to comment.