Skip to content

Commit

Permalink
Fix exception when pop messages with multiple LMQ indexes (apache#7863)
Browse files Browse the repository at this point in the history
  • Loading branch information
redlsz authored Apr 30, 2024
1 parent 04dddec commit af43a3e
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.Validators;
Expand Down Expand Up @@ -1155,15 +1156,18 @@ private PopResult processPopResponse(final String brokerName, final RemotingComm
final Long msgQueueOffset;
if (MixAll.isLmq(topic) && messageExt.getReconsumeTimes() == 0 && StringUtils.isNotEmpty(
messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))) {
// process LMQ, LMQ topic has only 1 queue, which queue id is 0
// process LMQ
String[] queues = messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)
.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
String[] queueOffsets = messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)
.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
long offset = Long.parseLong(queueOffsets[ArrayUtils.indexOf(queues, topic)]);
// LMQ topic has only 1 queue, which queue id is 0
queueIdKey = ExtraInfoUtil.getStartOffsetInfoMapKey(topic, MixAll.LMQ_QUEUE_ID);
queueOffsetKey = ExtraInfoUtil.getQueueOffsetMapKey(topic, MixAll.LMQ_QUEUE_ID, Long.parseLong(
messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)));
index = sortMap.get(queueIdKey).indexOf(
Long.parseLong(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)));
queueOffsetKey = ExtraInfoUtil.getQueueOffsetMapKey(topic, MixAll.LMQ_QUEUE_ID, offset);
index = sortMap.get(queueIdKey).indexOf(offset);
msgQueueOffset = msgOffsetInfo.get(queueIdKey).get(index);
if (msgQueueOffset != Long.parseLong(
messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET))) {
if (msgQueueOffset != offset) {
log.warn("Queue offset[%d] of msg is strange, not equal to the stored in msg, %s",
msgQueueOffset, messageExt);
}
Expand Down Expand Up @@ -1217,14 +1221,15 @@ private static Map<String, List<Long>> buildQueueOffsetSortedMap(String topic, L
final String key;
if (MixAll.isLmq(topic) && messageExt.getReconsumeTimes() == 0
&& StringUtils.isNotEmpty(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))) {
// process LMQ, LMQ topic has only 1 queue, which queue id is 0
key = ExtraInfoUtil.getStartOffsetInfoMapKey(
messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH), 0);
if (!sortMap.containsKey(key)) {
sortMap.put(key, new ArrayList<>(4));
}
sortMap.get(key).add(
Long.parseLong(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)));
// process LMQ
String[] queues = messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)
.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
String[] queueOffsets = messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)
.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
// LMQ topic has only 1 queue, which queue id is 0
key = ExtraInfoUtil.getStartOffsetInfoMapKey(topic, MixAll.LMQ_QUEUE_ID);
sortMap.putIfAbsent(key, new ArrayList<>(4));
sortMap.get(key).add(Long.parseLong(queueOffsets[ArrayUtils.indexOf(queues, topic)]));
continue;
}
// Value of POP_CK is used to determine whether it is a pop retry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
Expand Down Expand Up @@ -570,6 +571,86 @@ public void onException(Throwable e) {
done.await();
}

@Test
public void testPopMultiLmqMessage_async() throws Exception {
final long popTime = System.currentTimeMillis();
final int invisibleTime = 10 * 1000;
final String lmqTopic = MixAll.LMQ_PREFIX + "lmq1";
final String lmqTopic2 = MixAll.LMQ_PREFIX + "lmq2";
final String multiDispatch = String.join(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER, lmqTopic, lmqTopic2);
final String multiOffset = String.join(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER, "0", "0");
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock mock) throws Throwable {
InvokeCallback callback = mock.getArgument(3);
RemotingCommand request = mock.getArgument(1);
ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
RemotingCommand response = RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
response.setCode(ResponseCode.SUCCESS);
response.setOpaque(request.getOpaque());

PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.readCustomHeader();
responseHeader.setInvisibleTime(invisibleTime);
responseHeader.setPopTime(popTime);
responseHeader.setReviveQid(0);
responseHeader.setRestNum(1);
StringBuilder startOffsetInfo = new StringBuilder(64);
ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, topic, 0, 0L);
responseHeader.setStartOffsetInfo(startOffsetInfo.toString());
StringBuilder msgOffsetInfo = new StringBuilder(64);
ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, topic, 0, Collections.singletonList(0L));
responseHeader.setMsgOffsetInfo(msgOffsetInfo.toString());
response.setRemark("FOUND");
response.makeCustomHeaderToNet();

MessageExt message = new MessageExt();
message.setQueueId(0);
message.setFlag(0);
message.setQueueOffset(10L);
message.setCommitLogOffset(10000L);
message.setSysFlag(0);
message.setBornTimestamp(System.currentTimeMillis());
message.setBornHost(new InetSocketAddress("127.0.0.1", 10));
message.setStoreTimestamp(System.currentTimeMillis());
message.setStoreHost(new InetSocketAddress("127.0.0.1", 11));
message.setBody("body".getBytes());
message.setTopic(topic);
MessageAccessor.putProperty(message, MessageConst.PROPERTY_INNER_MULTI_DISPATCH, multiDispatch);
MessageAccessor.putProperty(message, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, multiOffset);
response.setBody(MessageDecoder.encode(message, false));
responseFuture.setResponseCommand(response);
callback.operationSucceed(responseFuture.getResponseCommand());
return null;
}
}).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
final CountDownLatch done = new CountDownLatch(1);
final PopMessageRequestHeader requestHeader = new PopMessageRequestHeader();
requestHeader.setTopic(lmqTopic);
mqClientAPI.popMessageAsync(brokerName, brokerAddr, requestHeader, 10 * 1000, new PopCallback() {
@Override
public void onSuccess(PopResult popResult) {
assertThat(popResult.getPopStatus()).isEqualTo(PopStatus.FOUND);
assertThat(popResult.getRestNum()).isEqualTo(1);
assertThat(popResult.getInvisibleTime()).isEqualTo(invisibleTime);
assertThat(popResult.getPopTime()).isEqualTo(popTime);
assertThat(popResult.getMsgFoundList()).size().isEqualTo(1);
assertThat(popResult.getMsgFoundList().get(0).getTopic()).isEqualTo(lmqTopic);
assertThat(popResult.getMsgFoundList().get(0).getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))
.isEqualTo(multiDispatch);
assertThat(popResult.getMsgFoundList().get(0).getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET))
.isEqualTo(multiOffset);
done.countDown();
}

@Override
public void onException(Throwable e) {
Assertions.fail("want no exception but got one", e);
done.countDown();
}
});
done.await();
}

@Test
public void testAckMessageAsync_Success() throws Exception {
doAnswer(new Answer<Void>() {
Expand Down

0 comments on commit af43a3e

Please sign in to comment.