Skip to content

Commit

Permalink
fix transaction msg stats (apache#7766)
Browse files Browse the repository at this point in the history
  • Loading branch information
kingkh1995 authored Feb 29, 2024
1 parent 471dbc0 commit 01761bd
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,9 @@ private RemotingCommand sendFinalMessage(MessageExtBrokerInner msgInner) {
switch (putMessageResult.getPutMessageStatus()) {
// Success
case PUT_OK:
this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
this.brokerController.getBrokerStatsManager().incBrokerPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum());
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.stats.Stats;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
Expand Down Expand Up @@ -56,6 +57,8 @@
@RunWith(MockitoJUnitRunner.class)
public class EndTransactionProcessorTest {

private static final String TOPIC = "trans_topic_test";

private EndTransactionProcessor endTransactionProcessor;

@Mock
Expand Down Expand Up @@ -95,20 +98,26 @@ private OperationResult createResponse(int status) {
public void testProcessRequest() throws RemotingCommandException {
when(transactionMsgService.commitMessage(any(EndTransactionRequestHeader.class))).thenReturn(createResponse(ResponseCode.SUCCESS));
when(messageStore.putMessage(any(MessageExtBrokerInner.class)))
.thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
.thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, createAppendMessageResult(AppendMessageStatus.PUT_OK)));
RemotingCommand request = createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_COMMIT_TYPE, false);
RemotingCommand response = endTransactionProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
assertThat(brokerController.getBrokerStatsManager().getStatsItem(Stats.BROKER_PUT_NUMS, brokerController.getBrokerConfig().getBrokerClusterName()).getValue().sum()).isEqualTo(1);
assertThat(brokerController.getBrokerStatsManager().getStatsItem(Stats.TOPIC_PUT_NUMS, TOPIC).getValue().sum()).isEqualTo(1L);
assertThat(brokerController.getBrokerStatsManager().getStatsItem(Stats.TOPIC_PUT_SIZE, TOPIC).getValue().sum()).isEqualTo(1L);
}

@Test
public void testProcessRequest_CheckMessage() throws RemotingCommandException {
when(transactionMsgService.commitMessage(any(EndTransactionRequestHeader.class))).thenReturn(createResponse(ResponseCode.SUCCESS));
when(messageStore.putMessage(any(MessageExtBrokerInner.class)))
.thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
.thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, createAppendMessageResult(AppendMessageStatus.PUT_OK)));
RemotingCommand request = createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_COMMIT_TYPE, true);
RemotingCommand response = endTransactionProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
assertThat(brokerController.getBrokerStatsManager().getStatsItem(Stats.BROKER_PUT_NUMS, brokerController.getBrokerConfig().getBrokerClusterName()).getValue().sum()).isEqualTo(1);
assertThat(brokerController.getBrokerStatsManager().getStatsItem(Stats.TOPIC_PUT_NUMS, TOPIC).getValue().sum()).isEqualTo(1L);
assertThat(brokerController.getBrokerStatsManager().getStatsItem(Stats.TOPIC_PUT_SIZE, TOPIC).getValue().sum()).isEqualTo(1L);
}

@Test
Expand Down Expand Up @@ -148,6 +157,7 @@ private MessageExt createDefaultMessageExt() {
messageExt.setQueueId(0);
messageExt.setCommitLogOffset(123456789L);
messageExt.setQueueOffset(1234);
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_REAL_TOPIC, TOPIC);
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_REAL_QUEUE_ID, "0");
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_PRODUCER_GROUP, "testTransactionGroup");
Expand Down Expand Up @@ -195,4 +205,12 @@ private MessageExt createRejectMessageExt() {
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, "60");
return messageExt;
}

private AppendMessageResult createAppendMessageResult(AppendMessageStatus status) {
AppendMessageResult result = new AppendMessageResult(status);
result.setMsgId("12345678");
result.setMsgNum(1);
result.setWroteBytes(1);
return result;
}
}

0 comments on commit 01761bd

Please sign in to comment.