From af43a3e71f2bdb4765294f7d6314b1428737849d Mon Sep 17 00:00:00 2001 From: Liu Shengzhong Date: Tue, 30 Apr 2024 12:46:47 +0800 Subject: [PATCH] Fix exception when pop messages with multiple LMQ indexes (#7863) --- .../rocketmq/client/impl/MQClientAPIImpl.java | 35 ++++---- .../client/impl/MQClientAPIImplTest.java | 81 +++++++++++++++++++ 2 files changed, 101 insertions(+), 15 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 12d305b612e..0c58affa34a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -30,6 +30,7 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.Validators; @@ -1155,15 +1156,18 @@ private PopResult processPopResponse(final String brokerName, final RemotingComm final Long msgQueueOffset; if (MixAll.isLmq(topic) && messageExt.getReconsumeTimes() == 0 && StringUtils.isNotEmpty( messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))) { - // process LMQ, LMQ topic has only 1 queue, which queue id is 0 + // process LMQ + String[] queues = messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH) + .split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); + String[] queueOffsets = messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET) + .split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); + long offset = Long.parseLong(queueOffsets[ArrayUtils.indexOf(queues, topic)]); + // LMQ topic has only 1 queue, which queue id is 0 queueIdKey = ExtraInfoUtil.getStartOffsetInfoMapKey(topic, MixAll.LMQ_QUEUE_ID); - queueOffsetKey = ExtraInfoUtil.getQueueOffsetMapKey(topic, MixAll.LMQ_QUEUE_ID, Long.parseLong( - messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET))); - index = sortMap.get(queueIdKey).indexOf( - Long.parseLong(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET))); + queueOffsetKey = ExtraInfoUtil.getQueueOffsetMapKey(topic, MixAll.LMQ_QUEUE_ID, offset); + index = sortMap.get(queueIdKey).indexOf(offset); msgQueueOffset = msgOffsetInfo.get(queueIdKey).get(index); - if (msgQueueOffset != Long.parseLong( - messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET))) { + if (msgQueueOffset != offset) { log.warn("Queue offset[%d] of msg is strange, not equal to the stored in msg, %s", msgQueueOffset, messageExt); } @@ -1217,14 +1221,15 @@ private static Map> buildQueueOffsetSortedMap(String topic, L final String key; if (MixAll.isLmq(topic) && messageExt.getReconsumeTimes() == 0 && StringUtils.isNotEmpty(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))) { - // process LMQ, LMQ topic has only 1 queue, which queue id is 0 - key = ExtraInfoUtil.getStartOffsetInfoMapKey( - messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH), 0); - if (!sortMap.containsKey(key)) { - sortMap.put(key, new ArrayList<>(4)); - } - sortMap.get(key).add( - Long.parseLong(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET))); + // process LMQ + String[] queues = messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH) + .split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); + String[] queueOffsets = messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET) + .split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); + // LMQ topic has only 1 queue, which queue id is 0 + key = ExtraInfoUtil.getStartOffsetInfoMapKey(topic, MixAll.LMQ_QUEUE_ID); + sortMap.putIfAbsent(key, new ArrayList<>(4)); + sortMap.get(key).add(Long.parseLong(queueOffsets[ArrayUtils.indexOf(queues, topic)])); continue; } // Value of POP_CK is used to determine whether it is a pop retry, diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java index 08e7fbe09a8..dc892a3548b 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java @@ -41,6 +41,7 @@ import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; @@ -570,6 +571,86 @@ public void onException(Throwable e) { done.await(); } + @Test + public void testPopMultiLmqMessage_async() throws Exception { + final long popTime = System.currentTimeMillis(); + final int invisibleTime = 10 * 1000; + final String lmqTopic = MixAll.LMQ_PREFIX + "lmq1"; + final String lmqTopic2 = MixAll.LMQ_PREFIX + "lmq2"; + final String multiDispatch = String.join(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER, lmqTopic, lmqTopic2); + final String multiOffset = String.join(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER, "0", "0"); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock mock) throws Throwable { + InvokeCallback callback = mock.getArgument(3); + RemotingCommand request = mock.getArgument(1); + ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null); + RemotingCommand response = RemotingCommand.createResponseCommand(PopMessageResponseHeader.class); + response.setCode(ResponseCode.SUCCESS); + response.setOpaque(request.getOpaque()); + + PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.readCustomHeader(); + responseHeader.setInvisibleTime(invisibleTime); + responseHeader.setPopTime(popTime); + responseHeader.setReviveQid(0); + responseHeader.setRestNum(1); + StringBuilder startOffsetInfo = new StringBuilder(64); + ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, topic, 0, 0L); + responseHeader.setStartOffsetInfo(startOffsetInfo.toString()); + StringBuilder msgOffsetInfo = new StringBuilder(64); + ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, topic, 0, Collections.singletonList(0L)); + responseHeader.setMsgOffsetInfo(msgOffsetInfo.toString()); + response.setRemark("FOUND"); + response.makeCustomHeaderToNet(); + + MessageExt message = new MessageExt(); + message.setQueueId(0); + message.setFlag(0); + message.setQueueOffset(10L); + message.setCommitLogOffset(10000L); + message.setSysFlag(0); + message.setBornTimestamp(System.currentTimeMillis()); + message.setBornHost(new InetSocketAddress("127.0.0.1", 10)); + message.setStoreTimestamp(System.currentTimeMillis()); + message.setStoreHost(new InetSocketAddress("127.0.0.1", 11)); + message.setBody("body".getBytes()); + message.setTopic(topic); + MessageAccessor.putProperty(message, MessageConst.PROPERTY_INNER_MULTI_DISPATCH, multiDispatch); + MessageAccessor.putProperty(message, MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, multiOffset); + response.setBody(MessageDecoder.encode(message, false)); + responseFuture.setResponseCommand(response); + callback.operationSucceed(responseFuture.getResponseCommand()); + return null; + } + }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class)); + final CountDownLatch done = new CountDownLatch(1); + final PopMessageRequestHeader requestHeader = new PopMessageRequestHeader(); + requestHeader.setTopic(lmqTopic); + mqClientAPI.popMessageAsync(brokerName, brokerAddr, requestHeader, 10 * 1000, new PopCallback() { + @Override + public void onSuccess(PopResult popResult) { + assertThat(popResult.getPopStatus()).isEqualTo(PopStatus.FOUND); + assertThat(popResult.getRestNum()).isEqualTo(1); + assertThat(popResult.getInvisibleTime()).isEqualTo(invisibleTime); + assertThat(popResult.getPopTime()).isEqualTo(popTime); + assertThat(popResult.getMsgFoundList()).size().isEqualTo(1); + assertThat(popResult.getMsgFoundList().get(0).getTopic()).isEqualTo(lmqTopic); + assertThat(popResult.getMsgFoundList().get(0).getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)) + .isEqualTo(multiDispatch); + assertThat(popResult.getMsgFoundList().get(0).getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)) + .isEqualTo(multiOffset); + done.countDown(); + } + + @Override + public void onException(Throwable e) { + Assertions.fail("want no exception but got one", e); + done.countDown(); + } + }); + done.await(); + } + @Test public void testAckMessageAsync_Success() throws Exception { doAnswer(new Answer() {