Skip to content

Commit

Permalink
[ISSUE apache#7961] use BoundaryType in binarySearchInCQByTime
Browse files Browse the repository at this point in the history
Co-authored-by: 凯铎 <[email protected]>
  • Loading branch information
Koado and 凯铎 authored Mar 28, 2024
1 parent 093cb84 commit fb6f9e4
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,8 @@ public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, Bo
if (high == null || high == -1) {
return 0;
}
return this.rocksDBConsumeQueueTable.binarySearchInCQByTime(topic, queueId, high, low, timestamp, minPhysicOffset);
return this.rocksDBConsumeQueueTable.binarySearchInCQByTime(topic, queueId, high, low, timestamp,
minPhysicOffset, boundaryType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.List;

import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
Expand Down Expand Up @@ -180,10 +181,10 @@ public void destroyCQ(final String topic, final int queueId, WriteBatch writeBat
}

public long binarySearchInCQByTime(String topic, int queueId, long high, long low, long timestamp,
long minPhysicOffset) throws RocksDBException {
long result = 0;
long minPhysicOffset, BoundaryType boundaryType) throws RocksDBException {
long result = -1L;
long targetOffset = -1L, leftOffset = -1L, rightOffset = -1L;
long leftValue = -1L, rightValue = -1L;
long ceiling = high, floor = low;
while (high >= low) {
long midOffset = low + ((high - low) >>> 1);
ByteBuffer byteBuffer = getCQInKV(topic, queueId, midOffset);
Expand All @@ -209,22 +210,64 @@ public long binarySearchInCQByTime(String topic, int queueId, long high, long lo
} else if (storeTime > timestamp) {
high = midOffset - 1;
rightOffset = midOffset;
rightValue = storeTime;
} else {
low = midOffset + 1;
leftOffset = midOffset;
leftValue = storeTime;
}
}
if (targetOffset != -1) {
// offset next to it might also share the same store-timestamp.
switch (boundaryType) {
case LOWER: {
while (true) {
long nextOffset = targetOffset - 1;
if (nextOffset < floor) {
break;
}
ByteBuffer byteBuffer = getCQInKV(topic, queueId, nextOffset);
long storeTime = byteBuffer.getLong(MSG_STORE_TIME_SIZE_OFFSET);
if (storeTime != timestamp) {
break;
}
targetOffset = nextOffset;
}
break;
}
case UPPER: {
while (true) {
long nextOffset = targetOffset + 1;
if (nextOffset > ceiling) {
break;
}
ByteBuffer byteBuffer = getCQInKV(topic, queueId, nextOffset);
long storeTime = byteBuffer.getLong(MSG_STORE_TIME_SIZE_OFFSET);
if (storeTime != timestamp) {
break;
}
targetOffset = nextOffset;
}
break;
}
default: {
log.warn("Unknown boundary type");
break;
}
}
result = targetOffset;
} else {
if (leftValue == -1) {
result = rightOffset;
} else if (rightValue == -1) {
result = leftOffset;
} else {
result = Math.abs(timestamp - leftValue) > Math.abs(timestamp - rightValue) ? rightOffset : leftOffset;
switch (boundaryType) {
case LOWER: {
result = rightOffset;
break;
}
case UPPER: {
result = leftOffset;
break;
}
default: {
log.warn("Unknown boundary type");
break;
}
}
}
return result;
Expand Down

0 comments on commit fb6f9e4

Please sign in to comment.