Skip to content

Commit

Permalink
[ISSUE apache#8076] Fix correct min cq offset when delete tiered stor…
Browse files Browse the repository at this point in the history
…age CommitLog (apache#8082)
  • Loading branch information
lizhimins authored May 7, 2024
1 parent a15088c commit 0f0324a
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,17 @@ public CompletableFuture<Long> getMinOffsetFromFileAsync() {
return firstOffset.get();
});
}

@Override
public void destroyExpiredFile(long expireTimestamp) {
long beforeOffset = this.getMinOffset();
super.destroyExpiredFile(expireTimestamp);
long afterOffset = this.getMinOffset();

if (beforeOffset != afterOffset) {
log.info("CommitLog min cq offset reset, filePath={}, offset={}, expireTimestamp={}, change={}-{}",
filePath, firstOffset.get(), expireTimestamp, beforeOffset, afterOffset);
firstOffset.set(GET_OFFSET_ERROR);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,33 @@ public void getMinOffsetFromFileAsyncTest() {
for (int i = 6; i < 9; i++) {
ByteBuffer byteBuffer = MessageFormatUtilTest.buildMockedMessageBuffer();
byteBuffer.putLong(MessageFormatUtil.QUEUE_OFFSET_POSITION, i);
Assert.assertEquals(AppendResult.SUCCESS, flatFile.append(byteBuffer, 1L));
Assert.assertEquals(AppendResult.SUCCESS, flatFile.append(byteBuffer, i));
}
Assert.assertEquals(-1L, flatFile.getMinOffsetFromFileAsync().join().longValue());

// append some messages
for (int i = 9; i < 30; i++) {
if (i == 20) {
flatFile.commitAsync().join();
flatFile.rollingNewFile(flatFile.getAppendOffset());
}
ByteBuffer byteBuffer = MessageFormatUtilTest.buildMockedMessageBuffer();
byteBuffer.putLong(MessageFormatUtil.QUEUE_OFFSET_POSITION, i);
Assert.assertEquals(AppendResult.SUCCESS, flatFile.append(byteBuffer, 1L));
Assert.assertEquals(AppendResult.SUCCESS, flatFile.append(byteBuffer, i));
}

flatFile.commitAsync().join();
Assert.assertEquals(6L, flatFile.getMinOffsetFromFile());
Assert.assertEquals(6L, flatFile.getMinOffsetFromFileAsync().join().longValue());

// recalculate min offset here
flatFile.destroyExpiredFile(20L);
Assert.assertEquals(20L, flatFile.getMinOffsetFromFile());
Assert.assertEquals(20L, flatFile.getMinOffsetFromFileAsync().join().longValue());

// clean expired file again
flatFile.destroyExpiredFile(20L);
Assert.assertEquals(20L, flatFile.getMinOffsetFromFile());
Assert.assertEquals(20L, flatFile.getMinOffsetFromFileAsync().join().longValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void init() {
storeConfig = new MessageStoreConfig();
storeConfig.setStorePathRootDir(storePath);
storeConfig.setTieredBackendServiceProvider(PosixFileSegment.class.getName());
storeConfig.setBrokerName(storeConfig.getBrokerName());
storeConfig.setBrokerName("brokerName");
metadataStore = new DefaultMetadataStore(storeConfig);
}

Expand Down

0 comments on commit 0f0324a

Please sign in to comment.