diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java index 8a319ed3899..6ac0939571f 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFile.java @@ -60,4 +60,17 @@ public CompletableFuture getMinOffsetFromFileAsync() { return firstOffset.get(); }); } + + @Override + public void destroyExpiredFile(long expireTimestamp) { + long beforeOffset = this.getMinOffset(); + super.destroyExpiredFile(expireTimestamp); + long afterOffset = this.getMinOffset(); + + if (beforeOffset != afterOffset) { + log.info("CommitLog min cq offset reset, filePath={}, offset={}, expireTimestamp={}, change={}-{}", + filePath, firstOffset.get(), expireTimestamp, beforeOffset, afterOffset); + firstOffset.set(GET_OFFSET_ERROR); + } + } } diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java index 7e030d305eb..1e912690b2f 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java @@ -93,19 +93,33 @@ public void getMinOffsetFromFileAsyncTest() { for (int i = 6; i < 9; i++) { ByteBuffer byteBuffer = MessageFormatUtilTest.buildMockedMessageBuffer(); byteBuffer.putLong(MessageFormatUtil.QUEUE_OFFSET_POSITION, i); - Assert.assertEquals(AppendResult.SUCCESS, flatFile.append(byteBuffer, 1L)); + Assert.assertEquals(AppendResult.SUCCESS, flatFile.append(byteBuffer, i)); } Assert.assertEquals(-1L, flatFile.getMinOffsetFromFileAsync().join().longValue()); // append some messages for (int i = 9; i < 30; i++) { + if (i == 20) { + flatFile.commitAsync().join(); + flatFile.rollingNewFile(flatFile.getAppendOffset()); + } ByteBuffer byteBuffer = MessageFormatUtilTest.buildMockedMessageBuffer(); byteBuffer.putLong(MessageFormatUtil.QUEUE_OFFSET_POSITION, i); - Assert.assertEquals(AppendResult.SUCCESS, flatFile.append(byteBuffer, 1L)); + Assert.assertEquals(AppendResult.SUCCESS, flatFile.append(byteBuffer, i)); } flatFile.commitAsync().join(); Assert.assertEquals(6L, flatFile.getMinOffsetFromFile()); Assert.assertEquals(6L, flatFile.getMinOffsetFromFileAsync().join().longValue()); + + // recalculate min offset here + flatFile.destroyExpiredFile(20L); + Assert.assertEquals(20L, flatFile.getMinOffsetFromFile()); + Assert.assertEquals(20L, flatFile.getMinOffsetFromFileAsync().join().longValue()); + + // clean expired file again + flatFile.destroyExpiredFile(20L); + Assert.assertEquals(20L, flatFile.getMinOffsetFromFile()); + Assert.assertEquals(20L, flatFile.getMinOffsetFromFileAsync().join().longValue()); } } \ No newline at end of file diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java index 79647932dae..2a007af4e9d 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java @@ -46,7 +46,7 @@ public void init() { storeConfig = new MessageStoreConfig(); storeConfig.setStorePathRootDir(storePath); storeConfig.setTieredBackendServiceProvider(PosixFileSegment.class.getName()); - storeConfig.setBrokerName(storeConfig.getBrokerName()); + storeConfig.setBrokerName("brokerName"); metadataStore = new DefaultMetadataStore(storeConfig); }