Skip to content

Commit

Permalink
[RIP-46] Enhanced metrics for timing and transactional messages (apac…
Browse files Browse the repository at this point in the history
…he#7500)

* add request codes' distribution and timing messages' distribution

* remove the requestCode distribution.

* add delay message latency distribution.

* add transaction metrics

* transaction metric of topics finished, v1.

* add the transaction metrics, to be tested.

* fix the judgement of putMessageResult

* optimize.

* add config.

* fix test case.

* add unit tests for transactionMetrics.

* remove Chinese characters

* add rocksdb metrics.

* add more rocksdb metrics.

* fix NPE

* avoid the total time is 0.

* add license

* remove useless import.
  • Loading branch information
GenerousMan authored Jan 22, 2024
1 parent 920dc32 commit 6d75134
Show file tree
Hide file tree
Showing 25 changed files with 856 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
import org.apache.rocketmq.broker.topic.TopicRouteInfoManager;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionMetricsFlushService;
import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageCheckListener;
Expand Down Expand Up @@ -277,6 +278,7 @@ public class BrokerController {
private BrokerMetricsManager brokerMetricsManager;
private ColdDataPullRequestHoldService coldDataPullRequestHoldService;
private ColdDataCgCtrService coldDataCgCtrService;
private TransactionMetricsFlushService transactionMetricsFlushService;

public BrokerController(
final BrokerConfig brokerConfig,
Expand Down Expand Up @@ -963,6 +965,9 @@ private void initialTransaction() {
}
this.transactionalMessageCheckListener.setBrokerController(this);
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
this.transactionMetricsFlushService = new TransactionMetricsFlushService(this);
this.transactionMetricsFlushService.start();

}

private void initialAcl() {
Expand Down Expand Up @@ -1440,6 +1445,10 @@ protected void shutdownBasicService() {
this.endTransactionExecutor.shutdown();
}

if (this.transactionMetricsFlushService != null) {
this.transactionMetricsFlushService.shutdown();
}

if (this.escapeBridge != null) {
escapeBridge.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ public static String getTimerCheckPath(final String rootDir) {
/**
 * Builds the location of the timer metrics file below the given root directory.
 *
 * @param rootDir root directory the path is built from
 * @return {@code rootDir}, {@code "config"} and {@code "timermetrics"} joined with
 *         {@link File#separator}
 */
public static String getTimerMetricsPath(final String rootDir) {
    return String.join(File.separator, rootDir, "config", "timermetrics");
}
/**
 * Builds the location of the transaction metrics file below the given root directory.
 *
 * @param rootDir root directory the path is built from
 * @return {@code rootDir}, {@code "config"} and {@code "transactionMetrics"} joined with
 *         {@link File#separator}
 */
public static String getTransactionMetricsPath(final String rootDir) {
    return String.join(File.separator, rootDir, "config", "transactionMetrics");
}

public static String getConsumerFilterPath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator + "consumerFilter.json";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ public class BrokerMetricsConstant {
public static final String GAUGE_CONSUMER_READY_MESSAGES = "rocketmq_consumer_ready_messages";
public static final String COUNTER_CONSUMER_SEND_TO_DLQ_MESSAGES_TOTAL = "rocketmq_send_to_dlq_messages_total";

// Metric names of the transaction instruments registered by the broker metrics manager.
// Counter: total number of commit messages.
public static final String COUNTER_COMMIT_MESSAGES_TOTAL = "rocketmq_commit_messages_total";
// Counter: total number of rollback messages.
public static final String COUNTER_ROLLBACK_MESSAGES_TOTAL = "rocketmq_rollback_messages_total";
// Histogram: transaction finish latency.
public static final String HISTOGRAM_FINISH_MSG_LATENCY = "rocketmq_finish_message_latency";
// Gauge: half messages, reported per topic.
public static final String GAUGE_HALF_MESSAGES = "rocketmq_half_messages";

public static final String LABEL_CLUSTER_NAME = "cluster";
public static final String LABEL_NODE_TYPE = "node_type";
public static final String NODE_TYPE_BROKER = "broker";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,6 @@
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
import io.opentelemetry.sdk.resources.Resource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerManager;
Expand All @@ -68,12 +61,23 @@
import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant;
import org.slf4j.bridge.SLF4JBridgeHandler;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.AGGREGATION_DELTA;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_COMMIT_MESSAGES_TOTAL;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_CONSUMER_SEND_TO_DLQ_MESSAGES_TOTAL;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_MESSAGES_IN_TOTAL;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_MESSAGES_OUT_TOTAL;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_ROLLBACK_MESSAGES_TOTAL;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_THROUGHPUT_IN_TOTAL;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.COUNTER_THROUGHPUT_OUT_TOTAL;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_BROKER_PERMISSION;
Expand All @@ -83,8 +87,10 @@
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CONSUMER_LAG_MESSAGES;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CONSUMER_QUEUEING_LATENCY;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_CONSUMER_READY_MESSAGES;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_HALF_MESSAGES;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_PROCESSOR_WATERMARK;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.GAUGE_PRODUCER_CONNECTIONS;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_FINISH_MSG_LATENCY;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_MESSAGE_SIZE;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_AGGREGATION;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CLUSTER_NAME;
Expand Down Expand Up @@ -141,6 +147,10 @@ public class BrokerMetricsManager {
public static ObservableLongGauge consumerQueueingLatency = new NopObservableLongGauge();
public static ObservableLongGauge consumerReadyMessages = new NopObservableLongGauge();
public static LongCounter sendToDlqMessages = new NopLongCounter();
// Transaction instruments. Each starts as a no-op implementation and is replaced with a
// real instrument when the transaction metrics are initialized.
// Gauge reporting the half message count of each topic.
public static ObservableLongGauge halfMessages = new NopObservableLongGauge();
// Counter incremented once for every successfully committed transactional message.
public static LongCounter commitMessagesTotal = new NopLongCounter();
// Counter incremented once for every successfully rolled back transactional message.
public static LongCounter rollBackMessagesTotal = new NopLongCounter();
// Histogram of the time between a half message's born timestamp and its commit.
public static LongHistogram transactionFinishLatency = new NopLongHistogram();

public static final List<String> SYSTEM_GROUP_PREFIX_LIST = new ArrayList<String>() {
{
Expand Down Expand Up @@ -348,6 +358,7 @@ private void init() {
initRequestMetrics();
initConnectionMetrics();
initLagAndDlqMetrics();
initTransactionMetrics();
initOtherMetrics();
}

Expand All @@ -361,6 +372,15 @@ private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) {
2d * 1024 * 1024, //2MB
4d * 1024 * 1024 //4MB
);

List<Double> commitLatencyBuckets = Arrays.asList(
1d * 1 * 1 * 5, //5s
1d * 1 * 1 * 60, //1min
1d * 1 * 10 * 60, //10min
1d * 1 * 60 * 60, //1h
1d * 12 * 60 * 60, //12h
1d * 24 * 60 * 60 //24h
);
InstrumentSelector messageSizeSelector = InstrumentSelector.builder()
.setType(InstrumentType.HISTOGRAM)
.setName(HISTOGRAM_MESSAGE_SIZE)
Expand All @@ -371,6 +391,16 @@ private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) {
SdkMeterProviderUtil.setCardinalityLimit(messageSizeViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
providerBuilder.registerView(messageSizeSelector, messageSizeViewBuilder.build());

InstrumentSelector commitLatencySelector = InstrumentSelector.builder()
.setType(InstrumentType.HISTOGRAM)
.setName(HISTOGRAM_FINISH_MSG_LATENCY)
.build();
ViewBuilder commitLatencyViewBuilder = View.builder()
.setAggregation(Aggregation.explicitBucketHistogram(commitLatencyBuckets));
// To config the cardinalityLimit for openTelemetry metrics exporting.
SdkMeterProviderUtil.setCardinalityLimit(commitLatencyViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
providerBuilder.registerView(commitLatencySelector, commitLatencyViewBuilder.build());

for (Pair<InstrumentSelector, ViewBuilder> selectorViewPair : RemotingMetricsManager.getMetricsView()) {
ViewBuilder viewBuilder = selectorViewPair.getObject2();
SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
Expand Down Expand Up @@ -560,6 +590,34 @@ private void initLagAndDlqMetrics() {
.build();
}

/**
 * Registers the transaction-related instruments on the broker meter: the commit and
 * rollback counters, the transaction finish latency histogram and the per-topic half
 * message gauge. The results are stored in the corresponding static fields.
 */
private void initTransactionMetrics() {
    commitMessagesTotal = brokerMeter.counterBuilder(COUNTER_COMMIT_MESSAGES_TOTAL)
        .setDescription("Total number of commit messages")
        .build();

    rollBackMessagesTotal = brokerMeter.counterBuilder(COUNTER_ROLLBACK_MESSAGES_TOTAL)
        .setDescription("Total number of rollback messages")
        .build();

    // The commit latency is recorded in whole seconds (milliseconds divided by 1000) and the
    // explicit histogram buckets range from 5 seconds to 24 hours, so the unit is seconds.
    transactionFinishLatency = brokerMeter.histogramBuilder(HISTOGRAM_FINISH_MSG_LATENCY)
        .setDescription("Transaction finish latency")
        .ofLongs()
        .setUnit("s")
        .build();

    // The callback records one measurement per topic, labelled with the topic name.
    halfMessages = brokerMeter.gaugeBuilder(GAUGE_HALF_MESSAGES)
        .setDescription("Half messages of all topics")
        .ofLongs()
        .buildWithCallback(measurement -> {
            brokerController.getTransactionalMessageService().getTransactionMetrics().getTransactionCounts()
                .forEach((topic, metric) -> {
                    measurement.record(
                        metric.getCount().get(),
                        newAttributesBuilder().put(DefaultStoreMetricsConstant.LABEL_TOPIC, topic).build()
                    );
                });
        });
}
private void initOtherMetrics() {
RemotingMetricsManager.initMetrics(brokerMeter, BrokerMetricsManager::newAttributesBuilder);
messageStore.initMetrics(brokerMeter, BrokerMetricsManager::newAttributesBuilder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.netty.channel.ChannelHandlerContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.transaction.OperationResult;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.common.TopicFilterType;
Expand All @@ -40,6 +41,8 @@
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.BrokerRole;

import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;

/**
* EndTransaction processor: process commit and rollback message
*/
Expand Down Expand Up @@ -144,6 +147,16 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
// successful committed, then total num of half-messages minus 1
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(msgInner.getTopic(), -1);
BrokerMetricsManager.commitMessagesTotal.add(1, BrokerMetricsManager.newAttributesBuilder()
.put(LABEL_TOPIC, msgInner.getTopic())
.build());
// record the commit latency.
Long commitLatency = (System.currentTimeMillis() - result.getPrepareMessage().getBornTimestamp()) / 1000;
BrokerMetricsManager.transactionFinishLatency.record(commitLatency, BrokerMetricsManager.newAttributesBuilder()
.put(LABEL_TOPIC, msgInner.getTopic())
.build());
}
return sendResult;
}
Expand All @@ -161,6 +174,11 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
// roll back, then total num of half-messages minus 1
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(result.getPrepareMessage().getProperty(MessageConst.PROPERTY_REAL_TOPIC), -1);
BrokerMetricsManager.rollBackMessagesTotal.add(1, BrokerMetricsManager.newAttributesBuilder()
.put(LABEL_TOPIC, result.getPrepareMessage().getProperty(MessageConst.PROPERTY_REAL_TOPIC))
.build());
}
return res;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@

import io.netty.channel.ChannelHandlerContext;
import io.opentelemetry.api.common.Attributes;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
Expand Down Expand Up @@ -65,10 +60,17 @@
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_MESSAGE_TYPE;
Expand Down Expand Up @@ -300,7 +302,7 @@ public RemotingCommand sendMessage(final ChannelHandlerContext ctx,

// Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
boolean sendTransactionPrepareMessage = false;
boolean sendTransactionPrepareMessage;
if (Boolean.parseBoolean(traFlag)
&& !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
Expand All @@ -311,6 +313,8 @@ public RemotingCommand sendMessage(final ChannelHandlerContext ctx,
return response;
}
sendTransactionPrepareMessage = true;
} else {
sendTransactionPrepareMessage = false;
}

long beginTimeMillis = this.brokerController.getMessageStore().now();
Expand All @@ -332,6 +336,12 @@ public RemotingCommand sendMessage(final ChannelHandlerContext ctx,
if (responseFuture != null) {
doResponse(ctx, request, responseFuture);
}

// record the transaction metrics, responseFuture == null means put successfully
if (sendTransactionPrepareMessage && (responseFuture == null || responseFuture.getCode() == ResponseCode.SUCCESS)) {
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC), 1);
}

sendMessageCallback.onComplete(sendMessageContext, response);
}, this.brokerController.getPutMessageFutureExecutor());
// Returns null to release the send message thread
Expand All @@ -344,6 +354,10 @@ public RemotingCommand sendMessage(final ChannelHandlerContext ctx,
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt, beginTimeMillis, mappingContext, BrokerMetricsManager.getMessageType(requestHeader));
// record the transaction metrics
if (putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK && putMessageResult.getAppendMessageResult().isOk()) {
this.brokerController.getTransactionalMessageService().getTransactionMetrics().addAndGet(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC), 1);
}
sendMessageCallback.onComplete(sendMessageContext, response);
return response;
}
Expand Down
Loading

0 comments on commit 6d75134

Please sign in to comment.