Skip to content

Commit

Permalink
[ISSUE apache#7878] Fix query message offset return wrong offset with…
Browse files Browse the repository at this point in the history
… boundary type (apache#7962)
  • Loading branch information
lizhimins authored Mar 25, 2024
1 parent d1cc742 commit b6efbb1
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -289,38 +289,54 @@ public CompletableFuture<Long> getQueueOffsetByTimeAsync(long timestamp, Boundar
return CompletableFuture.completedFuture(cqMin);
}

ByteBuffer buffer = getMessageAsync(cqMax).join();
long storeTime = MessageFormatUtil.getStoreTimeStamp(buffer);
if (storeTime < timestamp) {
log.info("FlatMessageFile getQueueOffsetByTimeAsync, exceeded maximum time, " +
"filePath={}, timestamp={}, result={}", filePath, timestamp, cqMax + 1);
return CompletableFuture.completedFuture(cqMax + 1);
}

buffer = getMessageAsync(cqMin).join();
storeTime = MessageFormatUtil.getStoreTimeStamp(buffer);
if (storeTime > timestamp) {
log.info("FlatMessageFile getQueueOffsetByTimeAsync, less than minimum time, " +
"filePath={}, timestamp={}, result={}", filePath, timestamp, cqMin);
return CompletableFuture.completedFuture(cqMin);
}

// binary search lower bound index in a sorted array
long minOffset = cqMin;
long maxOffset = cqMax;
List<String> queryLog = new ArrayList<>();
while (minOffset < maxOffset) {
long middle = minOffset + (maxOffset - minOffset) / 2;
ByteBuffer buffer = this.getMessageAsync(middle).join();
long storeTime = MessageFormatUtil.getStoreTimeStamp(buffer);
queryLog.add(String.format(
"(range=%d-%d, middle=%d, timestamp=%d)", minOffset, maxOffset, middle, storeTime));
if (storeTime == timestamp) {
minOffset = middle;
break;
} else if (storeTime < timestamp) {
buffer = this.getMessageAsync(middle).join();
storeTime = MessageFormatUtil.getStoreTimeStamp(buffer);
queryLog.add(String.format("(range=%d-%d, middle=%d, timestamp=%d, diff=%dms)",
minOffset, maxOffset, middle, storeTime, timestamp - storeTime));
if (storeTime < timestamp) {
minOffset = middle + 1;
} else {
maxOffset = middle - 1;
maxOffset = middle;
}
}

long offset = minOffset;
while (true) {
long next = boundaryType == BoundaryType.LOWER ? offset - 1 : offset + 1;
if (next < cqMin || next > cqMax) {
break;
}
ByteBuffer buffer = this.getMessageAsync(next).join();
long storeTime = MessageFormatUtil.getStoreTimeStamp(buffer);
if (storeTime == timestamp) {
offset = next;
continue;
if (boundaryType == BoundaryType.UPPER) {
while (true) {
long next = offset + 1;
if (next > cqMax) {
break;
}
buffer = this.getMessageAsync(next).join();
storeTime = MessageFormatUtil.getStoreTimeStamp(buffer);
if (storeTime == timestamp) {
offset = next;
} else {
break;
}
}
break;
}

log.info("FlatMessageFile getQueueOffsetByTimeAsync, filePath={}, timestamp={}, result={}, log={}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,11 @@ public void testGetOffsetInQueueByTime() throws Exception {

Assert.assertEquals(100L, fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 10, BoundaryType.LOWER));
Assert.assertEquals(100L, fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 11, BoundaryType.LOWER));
Assert.assertEquals(199L, fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 12, BoundaryType.LOWER));
Assert.assertEquals(200L, fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 12, BoundaryType.LOWER));

Assert.assertEquals(100L, fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 10, BoundaryType.UPPER));
Assert.assertEquals(199L, fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 11, BoundaryType.UPPER));
Assert.assertEquals(199L, fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 12, BoundaryType.UPPER));
Assert.assertEquals(200L, fetcher.getOffsetInQueueByTime(mq.getTopic(), 1, 12, BoundaryType.UPPER));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,24 +188,31 @@ public void testBinarySearchInQueueByTime() {
// commit message will increase max consume queue offset
Assert.assertTrue(flatFile.commitAsync().join());

Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp3 + 1, BoundaryType.UPPER).join().longValue());
Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp3, BoundaryType.UPPER).join().longValue());
// offset: 50, 51, 52, 53, 54
// inject store time: 0, +100, +100, +100, +200
Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(0, BoundaryType.LOWER).join().longValue());
Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(0, BoundaryType.UPPER).join().longValue());

Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(timestamp1 - 1, BoundaryType.LOWER).join().longValue());
Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(timestamp1 - 1, BoundaryType.UPPER).join().longValue());

Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(timestamp1, BoundaryType.LOWER).join().longValue());
Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(timestamp1, BoundaryType.UPPER).join().longValue());

Assert.assertEquals(51, flatFile.getQueueOffsetByTimeAsync(timestamp1 + 1, BoundaryType.LOWER).join().longValue());
Assert.assertEquals(51, flatFile.getQueueOffsetByTimeAsync(timestamp2, BoundaryType.LOWER).join().longValue());
Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp2 + 1, BoundaryType.LOWER).join().longValue());
Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp3, BoundaryType.LOWER).join().longValue());

Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(timestamp1, BoundaryType.UPPER).join().longValue());
Assert.assertEquals(51, flatFile.getQueueOffsetByTimeAsync(timestamp1 + 1, BoundaryType.UPPER).join().longValue());

Assert.assertEquals(51, flatFile.getQueueOffsetByTimeAsync(timestamp2, BoundaryType.LOWER).join().longValue());
Assert.assertEquals(53, flatFile.getQueueOffsetByTimeAsync(timestamp2, BoundaryType.UPPER).join().longValue());

Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp2 + 1, BoundaryType.UPPER).join().longValue());
Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp2 + 1, BoundaryType.LOWER).join().longValue());

Assert.assertEquals(50, flatFile.getQueueOffsetByTimeAsync(timestamp1 - 1, BoundaryType.UPPER).join().longValue());
Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp3 + 1, BoundaryType.LOWER).join().longValue());
Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp3, BoundaryType.LOWER).join().longValue());
Assert.assertEquals(54, flatFile.getQueueOffsetByTimeAsync(timestamp3, BoundaryType.UPPER).join().longValue());

Assert.assertEquals(55, flatFile.getQueueOffsetByTimeAsync(timestamp3 + 1, BoundaryType.LOWER).join().longValue());
Assert.assertEquals(55, flatFile.getQueueOffsetByTimeAsync(timestamp3 + 1, BoundaryType.UPPER).join().longValue());

flatFile.destroy();
}
Expand Down

0 comments on commit b6efbb1

Please sign in to comment.