Skip to content

Commit

Permalink
feat: implement store metrics of topic/queue store-size and reserve-t…
Browse files Browse the repository at this point in the history
…ime (#653)

Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui authored and SCNieh committed Nov 17, 2023
1 parent 327d542 commit d149297
Show file tree
Hide file tree
Showing 22 changed files with 609 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public class BrokerController implements Lifecycle {
private final MessageService messageService;
private final ExtendMessageService extendMessageService;

private final S3MetadataService s3MetadataService;

public BrokerController(BrokerConfig brokerConfig) throws Exception {
this.brokerConfig = brokerConfig;

Expand All @@ -100,7 +102,7 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception {
metadataStore = MetadataStoreBuilder.build(brokerConfig);

proxyMetadataService = new DefaultProxyMetadataService(metadataStore);
S3MetadataService s3MetadataService = new DefaultS3MetadataService(metadataStore.config(),
s3MetadataService = new DefaultS3MetadataService(metadataStore.config(),
metadataStore.sessionFactory(), metadataStore.asyncExecutor());
storeMetadataService = new DefaultStoreMetadataService(metadataStore, s3MetadataService);

Expand Down Expand Up @@ -170,7 +172,8 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception {
}

// Init the metrics exporter before accept requests.
metricsExporter = new MetricsExporter(brokerConfig, messageStore, messagingProcessor, resource, sdkTracerProvider);
metricsExporter = new MetricsExporter(brokerConfig, messageStore, messagingProcessor, resource,
sdkTracerProvider, metadataStore, s3MetadataService);

// Init the profiler agent.
ProfilerConfig profilerConfig = brokerConfig.profiler();
Expand Down Expand Up @@ -204,6 +207,7 @@ public void start() throws Exception {
remotingServer.start();
metadataStore.registerCurrentNode(brokerConfig.name(), brokerConfig.advertiseAddress(), brokerConfig.instanceId());
metricsExporter.start();
s3MetadataService.start();

startThreadPoolMonitor();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import com.automq.rocketmq.common.config.BrokerConfig;
import com.automq.rocketmq.common.config.MetricsConfig;
import com.automq.rocketmq.common.util.Lifecycle;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.metadata.service.S3MetadataService;
import com.automq.rocketmq.controller.server.TopicMetricsManager;
import com.automq.rocketmq.proxy.metrics.ProxyMetricsManager;
import com.automq.rocketmq.proxy.processor.ExtendMessagingProcessor;
import com.automq.rocketmq.store.MessageStoreImpl;
Expand Down Expand Up @@ -84,15 +87,19 @@ public class MetricsExporter implements Lifecycle {
private final StoreMetricsManager storeMetricsManager;
private final StreamMetricsManager streamMetricsManager;

private final TopicMetricsManager topicMetricsManager;

public static Supplier<AttributesBuilder> attributesBuilderSupplier = Attributes::builder;

public MetricsExporter(BrokerConfig brokerConfig, MessageStoreImpl messageStore,
ExtendMessagingProcessor messagingProcessor, Resource resource, SdkTracerProvider tracerProvider) {
ExtendMessagingProcessor messagingProcessor, Resource resource, SdkTracerProvider tracerProvider,
MetadataStore metadataStore, S3MetadataService s3MetadataService) {
this.brokerConfig = brokerConfig;
this.metricsConfig = brokerConfig.metrics();
this.proxyMetricsManager = new ProxyMetricsManager(messagingProcessor);
this.storeMetricsManager = new StoreMetricsManager(metricsConfig, messageStore);
this.streamMetricsManager = new StreamMetricsManager();
this.topicMetricsManager = new TopicMetricsManager(metadataStore, s3MetadataService);
init(resource, tracerProvider);
S3StreamMetricsRegistry.setMetricsGroup(this.streamMetricsManager);
}
Expand Down Expand Up @@ -240,18 +247,21 @@ private void initAttributesBuilder() {
streamMetricsManager.initAttributesBuilder(MetricsExporter::newAttributesBuilder);
storeMetricsManager.initAttributesBuilder(MetricsExporter::newAttributesBuilder);
proxyMetricsManager.initAttributesBuilder(MetricsExporter::newAttributesBuilder);
topicMetricsManager.initAttributesBuilder(MetricsExporter::newAttributesBuilder);
}

private void initStaticMetrics() {
streamMetricsManager.initStaticMetrics(brokerMeter);
storeMetricsManager.initStaticMetrics(brokerMeter);
proxyMetricsManager.initStaticMetrics(brokerMeter);
topicMetricsManager.initStaticMetrics(brokerMeter);
}

private void initDynamicMetrics() {
streamMetricsManager.initDynamicMetrics(brokerMeter);
storeMetricsManager.initDynamicMetrics(brokerMeter);
proxyMetricsManager.initDynamicMetrics(brokerMeter);
topicMetricsManager.initDynamicMetrics(brokerMeter);
storeMetricsManager.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ public interface MetadataStore extends Closeable {

ElectionService electionService();

List<QueueAssignment> assignmentsOf(int nodeId);

List<Long> streamsOf(long topicId, int queueId);

void start();

CompletableFuture<Cluster> describeCluster(DescribeClusterRequest request);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.controller.server;

import com.automq.rocketmq.common.MetricsManager;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.metadata.dao.QueueAssignment;
import com.automq.rocketmq.metadata.service.S3MetadataService;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongGauge;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class TopicMetricsManager implements MetricsManager {

private static Supplier<AttributesBuilder> attributesBuilderSupplier;
public static final String GAUGE_STORAGE_SIZE = "rocketmq_storage_size";
public static final String GAUGE_STORAGE_MESSAGE_RESERVE_TIME = "rocketmq_storage_message_reserve_time";

public static final String LABEL_TOPIC = "topic";
public static final String LABEL_QUEUE = "queue";

private final MetadataStore metadataStore;

private final S3MetadataService s3MetadataService;

private ObservableLongGauge storeGauge;

private ObservableLongGauge reserveTimeGauge;

public TopicMetricsManager(MetadataStore metadataStore, S3MetadataService s3MetadataService) {
this.metadataStore = metadataStore;
this.s3MetadataService = s3MetadataService;
}

@Override
public void initAttributesBuilder(Supplier<AttributesBuilder> attributesBuilderSupplier) {
TopicMetricsManager.attributesBuilderSupplier = attributesBuilderSupplier;
}

private void measureStoreSize(ObservableLongMeasurement measurement) {
List<QueueAssignment> list = metadataStore.assignmentsOf(metadataStore.config().nodeId());
for (QueueAssignment assignment : list) {
List<Long> streamIds = metadataStore.streamsOf(assignment.getTopicId(), assignment.getQueueId());
long total = 0;
for (long id : streamIds) {
total += s3MetadataService.streamDataSize(id);
}
Optional<String> topic = metadataStore.topicManager().nameOf(assignment.getTopicId());
if (topic.isEmpty()) {
continue;
}
measurement.record(total,
attributesBuilderSupplier.get().put(LABEL_TOPIC, topic.get())
.put(LABEL_QUEUE, assignment.getQueueId()).build()
);
}
}

private void measureReserveTime(ObservableLongMeasurement measurement) {
List<QueueAssignment> list = metadataStore.assignmentsOf(metadataStore.config().nodeId());
long currentTs = System.currentTimeMillis();
for (QueueAssignment assignment : list) {
List<Long> streamIds = metadataStore.streamsOf(assignment.getTopicId(), assignment.getQueueId());
long startTime = Long.MAX_VALUE;
for (long id : streamIds) {
long ts = s3MetadataService.streamStartTime(id);
if (ts < startTime) {
startTime = ts;
}
}
Optional<String> topic = metadataStore.topicManager().nameOf(assignment.getTopicId());
if (topic.isEmpty()) {
continue;
}
long reserveTime = 0;
if (startTime < currentTs) {
reserveTime = TimeUnit.MILLISECONDS.toSeconds(currentTs - startTime);
}
measurement.record(reserveTime,
attributesBuilderSupplier.get().put(LABEL_TOPIC, topic.get())
.put(LABEL_QUEUE, assignment.getQueueId()).build()
);
}
}

@Override
public void initStaticMetrics(Meter meter) {
storeGauge = meter.gaugeBuilder(GAUGE_STORAGE_SIZE)
.setUnit("byte")
.ofLongs().setDescription("Storage size per topic/queue")
.buildWithCallback(this::measureStoreSize);

reserveTimeGauge = meter.gaugeBuilder(GAUGE_STORAGE_MESSAGE_RESERVE_TIME)
.setUnit("Second")
.ofLongs()
.setDescription("Spanning duration of the reserved messages per topic/queue")
.buildWithCallback(this::measureReserveTime);
}

@Override
public void initDynamicMetrics(Meter meter) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,16 @@ public ElectionService electionService() {
return electionService;
}

@Override
public List<QueueAssignment> assignmentsOf(int nodeId) {
return topicManager.getAssignmentCache().byNode(nodeId);
}

@Override
public List<Long> streamsOf(long topicId, int queueId) {
return topicManager.getStreamCache().streamsOf(topicId, queueId);
}

/**
* Expose for test purpose only.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ public TopicManager(MetadataStore metadataStore) {
this.topicNameRequests = new ConcurrentHashMap<>();
}

public Optional<String> nameOf(long topicId) {
Topic topic = topicCache.byId(topicId);
if (null == topic) {
return Optional.empty();
}
return Optional.of(topic.getName());
}

public TopicCache getTopicCache() {
return topicCache;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.automq.rocketmq.controller.server.store.impl.cache;

import com.automq.rocketmq.metadata.dao.QueueAssignment;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -42,6 +43,27 @@ public void apply(List<QueueAssignment> assignments) {
}
}

public List<QueueAssignment> byNode(int nodeId) {
List<QueueAssignment> result = new ArrayList<>();
for (Map.Entry<Long, Map<Integer, QueueAssignment>> entry : assignments.entrySet()) {
for (Map.Entry<Integer, QueueAssignment> e : entry.getValue().entrySet()) {
switch (e.getValue().getStatus()) {
case ASSIGNMENT_STATUS_YIELDING -> {
if (e.getValue().getSrcNodeId() == nodeId) {
result.add(e.getValue());
}
}
case ASSIGNMENT_STATUS_ASSIGNED -> {
if (e.getValue().getDstNodeId() == nodeId) {
result.add(e.getValue());
}
}
}
}
}
return result;
}

public Map<Integer, QueueAssignment> byTopicId(Long topicId) {
return assignments.get(topicId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import apache.rocketmq.controller.v1.StreamState;
import com.automq.rocketmq.metadata.dao.Stream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -42,6 +44,16 @@ public int streamNumOfNode(int nodeId) {
return count;
}

public List<Long> streamsOf(long topicId, int queueId) {
List<Long> result = new ArrayList<>();
for (Map.Entry<Long, Stream> entry : streams.entrySet()) {
if (entry.getValue().getTopicId() == topicId && entry.getValue().getQueueId() == queueId) {
result.add(entry.getKey());
}
}
return result;
}

public void apply(Collection<Stream> streams) {
for (Stream stream : streams) {
cacheItem(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ class AssignmentCacheTest {
@Test
public void testApply() {
AssignmentCache cache = new AssignmentCache();

QueueAssignment assignment = new QueueAssignment();
assignment.setStatus(AssignmentStatus.ASSIGNMENT_STATUS_ASSIGNED);
assignment.setTopicId(1);
Expand All @@ -45,7 +44,21 @@ public void testApply() {

Assertions.assertEquals(1, cache.queueNumOfNode(4));
Assertions.assertEquals(0, cache.queueNumOfNode(3));
}

@Test
public void testByNode() {
AssignmentCache cache = new AssignmentCache();
QueueAssignment assignment = new QueueAssignment();
assignment.setStatus(AssignmentStatus.ASSIGNMENT_STATUS_ASSIGNED);
assignment.setTopicId(1);
assignment.setQueueId(2);
assignment.setSrcNodeId(3);
assignment.setDstNodeId(4);
cache.apply(List.of(assignment));

Assertions.assertFalse(cache.byNode(4).isEmpty());
Assertions.assertTrue(cache.byNode(3).isEmpty());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,23 @@ public void testApply() {
Assertions.assertEquals(0, cache.streamNumOfNode(6));
}

@Test
public void testStreamsOf() {
StreamCache cache = new StreamCache();
Stream stream = new Stream();
stream.setStreamRole(StreamRole.STREAM_ROLE_DATA);
stream.setRangeId(1);
stream.setTopicId(2L);
stream.setQueueId(3);
stream.setId(4L);
stream.setEpoch(5L);
stream.setSrcNodeId(6);
stream.setDstNodeId(7);
stream.setState(StreamState.OPEN);
cache.apply(List.of(stream));

List<Long> streams = cache.streamsOf(2, 3);
Assertions.assertFalse(streams.isEmpty());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,6 @@ public interface S3ObjectMapper {
int prepare(S3Object s3Object);

int rollback(@Param("current")Date current);

long totalDataSize(@Param("streamId") long streamId);
}
Loading

0 comments on commit d149297

Please sign in to comment.