diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java index 7123332410c..a214059442b 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java @@ -289,38 +289,54 @@ public CompletableFuture getQueueOffsetByTimeAsync(long timestamp, Boundar return CompletableFuture.completedFuture(cqMin); } + ByteBuffer buffer = getMessageAsync(cqMax).join(); + long storeTime = MessageFormatUtil.getStoreTimeStamp(buffer); + if (storeTime < timestamp) { + log.info("FlatMessageFile getQueueOffsetByTimeAsync, exceeded maximum time, " + + "filePath={}, timestamp={}, result={}", filePath, timestamp, cqMax + 1); + return CompletableFuture.completedFuture(cqMax + 1); + } + + buffer = getMessageAsync(cqMin).join(); + storeTime = MessageFormatUtil.getStoreTimeStamp(buffer); + if (storeTime > timestamp) { + log.info("FlatMessageFile getQueueOffsetByTimeAsync, less than minimum time, " + + "filePath={}, timestamp={}, result={}", filePath, timestamp, cqMin); + return CompletableFuture.completedFuture(cqMin); + } + + // binary search lower bound index in a sorted array long minOffset = cqMin; long maxOffset = cqMax; List queryLog = new ArrayList<>(); while (minOffset < maxOffset) { long middle = minOffset + (maxOffset - minOffset) / 2; - ByteBuffer buffer = this.getMessageAsync(middle).join(); - long storeTime = MessageFormatUtil.getStoreTimeStamp(buffer); - queryLog.add(String.format( - "(range=%d-%d, middle=%d, timestamp=%d)", minOffset, maxOffset, middle, storeTime)); - if (storeTime == timestamp) { - minOffset = middle; - break; - } else if (storeTime < timestamp) { + buffer = this.getMessageAsync(middle).join(); + storeTime = MessageFormatUtil.getStoreTimeStamp(buffer); + queryLog.add(String.format("(range=%d-%d, middle=%d, timestamp=%d, diff=%dms)", + minOffset, maxOffset, middle, storeTime, timestamp - storeTime)); + if (storeTime < timestamp) { minOffset = middle + 1; } else { - maxOffset = middle - 1; + maxOffset = middle; } } long offset = minOffset; - while (true) { - long next = boundaryType == BoundaryType.LOWER ? offset - 1 : offset + 1; - if (next < cqMin || next > cqMax) { - break; - } - ByteBuffer buffer = this.getMessageAsync(next).join(); - long storeTime = MessageFormatUtil.getStoreTimeStamp(buffer); - if (storeTime == timestamp) { - offset = next; - continue; + if (boundaryType == BoundaryType.UPPER) { + while (true) { + long next = offset + 1; + if (next > cqMax) { + break; + } + buffer = this.getMessageAsync(next).join(); + storeTime = MessageFormatUtil.getStoreTimeStamp(buffer); + if (storeTime == timestamp) { + offset = next; + } else { + break; + } } - break; } log.info("FlatMessageFile getQueueOffsetByTimeAsync, filePath={}, timestamp={}, result={}, log={}", diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImplTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImplTest.java index ce380776ae5..7b8b17d5bbc 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImplTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImplTest.java @@ -208,11 +208,11 @@ public void testGetOffsetInQueueByTime() throws Exception { Assert.assertEquals(100L, fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 10, BoundaryType.LOWER)); Assert.assertEquals(100L, fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 11, BoundaryType.LOWER)); - Assert.assertEquals(199L, fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 12, BoundaryType.LOWER)); + Assert.assertEquals(200L, fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 12, BoundaryType.LOWER)); Assert.assertEquals(100L, fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 10, BoundaryType.UPPER)); Assert.assertEquals(199L, fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 11, BoundaryType.UPPER)); - Assert.assertEquals(199L, fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 12, BoundaryType.UPPER)); + Assert.assertEquals(200L, fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 12, BoundaryType.UPPER)); } @Test diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java index 95245aa27ef..8a417f54a74 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java @@ -188,24 +188,31 @@ public void testBinarySearchInQueueByTime() { // commit message will increase max consume queue offset Assert.assertTrue(flatFile.commitAsync().join()); - Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp3 + 1, BoundaryType.UPPER).join().longValue()); - Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp3, BoundaryType.UPPER).join().longValue()); + // offset: 50, 51, 52, 53, 54 + // inject store time: 0, +100, +100, +100, +200 + Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(0, BoundaryType.LOWER).join().longValue()); + Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(0, BoundaryType.UPPER).join().longValue()); Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(timestamp1 - 1, BoundaryType.LOWER).join().longValue()); + Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(timestamp1 - 1, BoundaryType.UPPER).join().longValue()); + Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(timestamp1, BoundaryType.LOWER).join().longValue()); + Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(timestamp1, BoundaryType.UPPER).join().longValue()); Assert.assertEquals(51, flatFile.getQueueOffsetByTimeAsync(timestamp1 + 1, BoundaryType.LOWER).join().longValue()); - Assert.assertEquals(51, flatFile.getQueueOffsetByTimeAsync(timestamp2, BoundaryType.LOWER).join().longValue()); - Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp2 + 1, BoundaryType.LOWER).join().longValue()); - Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp3, BoundaryType.LOWER).join().longValue()); - - Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(timestamp1, BoundaryType.UPPER).join().longValue()); Assert.assertEquals(51, flatFile.getQueueOffsetByTimeAsync(timestamp1 + 1, BoundaryType.UPPER).join().longValue()); + + Assert.assertEquals(51, flatFile.getQueueOffsetByTimeAsync(timestamp2, BoundaryType.LOWER).join().longValue()); Assert.assertEquals(53, flatFile.getQueueOffsetByTimeAsync(timestamp2, BoundaryType.UPPER).join().longValue()); + Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp2 + 1, BoundaryType.UPPER).join().longValue()); + Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp2 + 1, BoundaryType.LOWER).join().longValue()); - Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(timestamp1 - 1, BoundaryType.UPPER).join().longValue()); - Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp3 + 1, BoundaryType.LOWER).join().longValue()); + Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp3, BoundaryType.LOWER).join().longValue()); + Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp3, BoundaryType.UPPER).join().longValue()); + + Assert.assertEquals(55, flatFile.getQueueOffsetByTimeAsync(timestamp3 + 1, BoundaryType.LOWER).join().longValue()); + Assert.assertEquals(55, flatFile.getQueueOffsetByTimeAsync(timestamp3 + 1, BoundaryType.UPPER).join().longValue()); flatFile.destroy(); }