diff --git a/broker/src/main/java/com/automq/rocketmq/broker/BrokerController.java b/broker/src/main/java/com/automq/rocketmq/broker/BrokerController.java index a74793145..f9ed3ca33 100644 --- a/broker/src/main/java/com/automq/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/com/automq/rocketmq/broker/BrokerController.java @@ -91,6 +91,8 @@ public class BrokerController implements Lifecycle { private final MessageService messageService; private final ExtendMessageService extendMessageService; + private final S3MetadataService s3MetadataService; + public BrokerController(BrokerConfig brokerConfig) throws Exception { this.brokerConfig = brokerConfig; @@ -100,7 +102,7 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception { metadataStore = MetadataStoreBuilder.build(brokerConfig); proxyMetadataService = new DefaultProxyMetadataService(metadataStore); - S3MetadataService s3MetadataService = new DefaultS3MetadataService(metadataStore.config(), + s3MetadataService = new DefaultS3MetadataService(metadataStore.config(), metadataStore.sessionFactory(), metadataStore.asyncExecutor()); storeMetadataService = new DefaultStoreMetadataService(metadataStore, s3MetadataService); @@ -170,7 +172,8 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception { } // Init the metrics exporter before accept requests. - metricsExporter = new MetricsExporter(brokerConfig, messageStore, messagingProcessor, resource, sdkTracerProvider); + metricsExporter = new MetricsExporter(brokerConfig, messageStore, messagingProcessor, resource, + sdkTracerProvider, metadataStore, s3MetadataService); // Init the profiler agent. ProfilerConfig profilerConfig = brokerConfig.profiler(); @@ -204,6 +207,7 @@ public void start() throws Exception { remotingServer.start(); metadataStore.registerCurrentNode(brokerConfig.name(), brokerConfig.advertiseAddress(), brokerConfig.instanceId()); metricsExporter.start(); + s3MetadataService.start(); startThreadPoolMonitor(); } diff --git a/broker/src/main/java/com/automq/rocketmq/broker/MetricsExporter.java b/broker/src/main/java/com/automq/rocketmq/broker/MetricsExporter.java index 07715eb9b..06b47bf3a 100644 --- a/broker/src/main/java/com/automq/rocketmq/broker/MetricsExporter.java +++ b/broker/src/main/java/com/automq/rocketmq/broker/MetricsExporter.java @@ -20,6 +20,9 @@ import com.automq.rocketmq.common.config.BrokerConfig; import com.automq.rocketmq.common.config.MetricsConfig; import com.automq.rocketmq.common.util.Lifecycle; +import com.automq.rocketmq.controller.MetadataStore; +import com.automq.rocketmq.metadata.service.S3MetadataService; +import com.automq.rocketmq.controller.server.TopicMetricsManager; import com.automq.rocketmq.proxy.metrics.ProxyMetricsManager; import com.automq.rocketmq.proxy.processor.ExtendMessagingProcessor; import com.automq.rocketmq.store.MessageStoreImpl; @@ -84,15 +87,19 @@ public class MetricsExporter implements Lifecycle { private final StoreMetricsManager storeMetricsManager; private final StreamMetricsManager streamMetricsManager; + private final TopicMetricsManager topicMetricsManager; + public static Supplier attributesBuilderSupplier = Attributes::builder; public MetricsExporter(BrokerConfig brokerConfig, MessageStoreImpl messageStore, - ExtendMessagingProcessor messagingProcessor, Resource resource, SdkTracerProvider tracerProvider) { + ExtendMessagingProcessor messagingProcessor, Resource resource, SdkTracerProvider tracerProvider, + MetadataStore metadataStore, S3MetadataService s3MetadataService) { this.brokerConfig = brokerConfig; this.metricsConfig = brokerConfig.metrics(); this.proxyMetricsManager = new ProxyMetricsManager(messagingProcessor); this.storeMetricsManager = new StoreMetricsManager(metricsConfig, messageStore); this.streamMetricsManager = new StreamMetricsManager(); + this.topicMetricsManager = new TopicMetricsManager(metadataStore, s3MetadataService); init(resource, tracerProvider); S3StreamMetricsRegistry.setMetricsGroup(this.streamMetricsManager); } @@ -240,18 +247,21 @@ private void initAttributesBuilder() { streamMetricsManager.initAttributesBuilder(MetricsExporter::newAttributesBuilder); storeMetricsManager.initAttributesBuilder(MetricsExporter::newAttributesBuilder); proxyMetricsManager.initAttributesBuilder(MetricsExporter::newAttributesBuilder); + topicMetricsManager.initAttributesBuilder(MetricsExporter::newAttributesBuilder); } private void initStaticMetrics() { streamMetricsManager.initStaticMetrics(brokerMeter); storeMetricsManager.initStaticMetrics(brokerMeter); proxyMetricsManager.initStaticMetrics(brokerMeter); + topicMetricsManager.initStaticMetrics(brokerMeter); } private void initDynamicMetrics() { streamMetricsManager.initDynamicMetrics(brokerMeter); storeMetricsManager.initDynamicMetrics(brokerMeter); proxyMetricsManager.initDynamicMetrics(brokerMeter); + topicMetricsManager.initDynamicMetrics(brokerMeter); storeMetricsManager.start(); } diff --git a/controller/src/main/java/com/automq/rocketmq/controller/MetadataStore.java b/controller/src/main/java/com/automq/rocketmq/controller/MetadataStore.java index cb1cd279d..78d759ce2 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/MetadataStore.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/MetadataStore.java @@ -80,6 +80,10 @@ public interface MetadataStore extends Closeable { ElectionService electionService(); + List assignmentsOf(int nodeId); + + List streamsOf(long topicId, int queueId); + void start(); CompletableFuture describeCluster(DescribeClusterRequest request); diff --git a/controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java b/controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java new file mode 100644 index 000000000..471c549f4 --- /dev/null +++ b/controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.controller.server; + +import com.automq.rocketmq.common.MetricsManager; +import com.automq.rocketmq.controller.MetadataStore; +import com.automq.rocketmq.metadata.dao.QueueAssignment; +import com.automq.rocketmq.metadata.service.S3MetadataService; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableLongGauge; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +public class TopicMetricsManager implements MetricsManager { + + private static Supplier attributesBuilderSupplier; + public static final String GAUGE_STORAGE_SIZE = "rocketmq_storage_size"; + public static final String GAUGE_STORAGE_MESSAGE_RESERVE_TIME = "rocketmq_storage_message_reserve_time"; + + public static final String LABEL_TOPIC = "topic"; + public static final String LABEL_QUEUE = "queue"; + + private final MetadataStore metadataStore; + + private final S3MetadataService s3MetadataService; + + private ObservableLongGauge storeGauge; + + private ObservableLongGauge reserveTimeGauge; + + public TopicMetricsManager(MetadataStore metadataStore, S3MetadataService s3MetadataService) { + this.metadataStore = metadataStore; + this.s3MetadataService = s3MetadataService; + } + + @Override + public void initAttributesBuilder(Supplier attributesBuilderSupplier) { + TopicMetricsManager.attributesBuilderSupplier = attributesBuilderSupplier; + } + + private void measureStoreSize(ObservableLongMeasurement measurement) { + List list = metadataStore.assignmentsOf(metadataStore.config().nodeId()); + for (QueueAssignment assignment : list) { + List streamIds = metadataStore.streamsOf(assignment.getTopicId(), assignment.getQueueId()); + long total = 0; + for (long id : streamIds) { + total += s3MetadataService.streamDataSize(id); + } + Optional topic = metadataStore.topicManager().nameOf(assignment.getTopicId()); + if (topic.isEmpty()) { + continue; + } + measurement.record(total, + attributesBuilderSupplier.get().put(LABEL_TOPIC, topic.get()) + .put(LABEL_QUEUE, assignment.getQueueId()).build() + ); + } + } + + private void measureReserveTime(ObservableLongMeasurement measurement) { + List list = metadataStore.assignmentsOf(metadataStore.config().nodeId()); + long currentTs = System.currentTimeMillis(); + for (QueueAssignment assignment : list) { + List streamIds = metadataStore.streamsOf(assignment.getTopicId(), assignment.getQueueId()); + long startTime = Long.MAX_VALUE; + for (long id : streamIds) { + long ts = s3MetadataService.streamStartTime(id); + if (ts < startTime) { + startTime = ts; + } + } + Optional topic = metadataStore.topicManager().nameOf(assignment.getTopicId()); + if (topic.isEmpty()) { + continue; + } + long reserveTime = 0; + if (startTime < currentTs) { + reserveTime = TimeUnit.MILLISECONDS.toSeconds(currentTs - startTime); + } + measurement.record(reserveTime, + attributesBuilderSupplier.get().put(LABEL_TOPIC, topic.get()) + .put(LABEL_QUEUE, assignment.getQueueId()).build() + ); + } + } + + @Override + public void initStaticMetrics(Meter meter) { + storeGauge = meter.gaugeBuilder(GAUGE_STORAGE_SIZE) + .setUnit("byte") + .ofLongs().setDescription("Storage size per topic/queue") + .buildWithCallback(this::measureStoreSize); + + reserveTimeGauge = meter.gaugeBuilder(GAUGE_STORAGE_MESSAGE_RESERVE_TIME) + .setUnit("Second") + .ofLongs() + .setDescription("Spanning duration of the reserved messages per topic/queue") + .buildWithCallback(this::measureReserveTime); + } + + @Override + public void initDynamicMetrics(Meter meter) { + } +} diff --git a/controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java b/controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java index 531b2ce3c..7492cbda3 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java @@ -171,6 +171,16 @@ public ElectionService electionService() { return electionService; } + @Override + public List assignmentsOf(int nodeId) { + return topicManager.getAssignmentCache().byNode(nodeId); + } + + @Override + public List streamsOf(long topicId, int queueId) { + return topicManager.getStreamCache().streamsOf(topicId, queueId); + } + /** * Expose for test purpose only. * diff --git a/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/TopicManager.java b/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/TopicManager.java index 115f9af66..91567ed8a 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/TopicManager.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/TopicManager.java @@ -89,6 +89,14 @@ public TopicManager(MetadataStore metadataStore) { this.topicNameRequests = new ConcurrentHashMap<>(); } + public Optional nameOf(long topicId) { + Topic topic = topicCache.byId(topicId); + if (null == topic) { + return Optional.empty(); + } + return Optional.of(topic.getName()); + } + public TopicCache getTopicCache() { return topicCache; } diff --git a/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/cache/AssignmentCache.java b/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/cache/AssignmentCache.java index f26c69928..1a8cc3472 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/cache/AssignmentCache.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/cache/AssignmentCache.java @@ -18,6 +18,7 @@ package com.automq.rocketmq.controller.server.store.impl.cache; import com.automq.rocketmq.metadata.dao.QueueAssignment; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -42,6 +43,27 @@ public void apply(List assignments) { } } + public List byNode(int nodeId) { + List result = new ArrayList<>(); + for (Map.Entry> entry : assignments.entrySet()) { + for (Map.Entry e : entry.getValue().entrySet()) { + switch (e.getValue().getStatus()) { + case ASSIGNMENT_STATUS_YIELDING -> { + if (e.getValue().getSrcNodeId() == nodeId) { + result.add(e.getValue()); + } + } + case ASSIGNMENT_STATUS_ASSIGNED -> { + if (e.getValue().getDstNodeId() == nodeId) { + result.add(e.getValue()); + } + } + } + } + } + return result; + } + public Map byTopicId(Long topicId) { return assignments.get(topicId); } diff --git a/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/cache/StreamCache.java b/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/cache/StreamCache.java index ee3712322..9cbe3181c 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/cache/StreamCache.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/cache/StreamCache.java @@ -19,7 +19,9 @@ import apache.rocketmq.controller.v1.StreamState; import com.automq.rocketmq.metadata.dao.Stream; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -42,6 +44,16 @@ public int streamNumOfNode(int nodeId) { return count; } + public List streamsOf(long topicId, int queueId) { + List result = new ArrayList<>(); + for (Map.Entry entry : streams.entrySet()) { + if (entry.getValue().getTopicId() == topicId && entry.getValue().getQueueId() == queueId) { + result.add(entry.getKey()); + } + } + return result; + } + public void apply(Collection streams) { for (Stream stream : streams) { cacheItem(stream); diff --git a/controller/src/test/java/com/automq/rocketmq/controller/server/store/impl/cache/AssignmentCacheTest.java b/controller/src/test/java/com/automq/rocketmq/controller/server/store/impl/cache/AssignmentCacheTest.java index 2b7e71652..7865acfa2 100644 --- a/controller/src/test/java/com/automq/rocketmq/controller/server/store/impl/cache/AssignmentCacheTest.java +++ b/controller/src/test/java/com/automq/rocketmq/controller/server/store/impl/cache/AssignmentCacheTest.java @@ -29,7 +29,6 @@ class AssignmentCacheTest { @Test public void testApply() { AssignmentCache cache = new AssignmentCache(); - QueueAssignment assignment = new QueueAssignment(); assignment.setStatus(AssignmentStatus.ASSIGNMENT_STATUS_ASSIGNED); assignment.setTopicId(1); @@ -45,7 +44,21 @@ public void testApply() { Assertions.assertEquals(1, cache.queueNumOfNode(4)); Assertions.assertEquals(0, cache.queueNumOfNode(3)); + } + + @Test + public void testByNode() { + AssignmentCache cache = new AssignmentCache(); + QueueAssignment assignment = new QueueAssignment(); + assignment.setStatus(AssignmentStatus.ASSIGNMENT_STATUS_ASSIGNED); + assignment.setTopicId(1); + assignment.setQueueId(2); + assignment.setSrcNodeId(3); + assignment.setDstNodeId(4); + cache.apply(List.of(assignment)); + Assertions.assertFalse(cache.byNode(4).isEmpty()); + Assertions.assertTrue(cache.byNode(3).isEmpty()); } } \ No newline at end of file diff --git a/controller/src/test/java/com/automq/rocketmq/controller/server/store/impl/cache/StreamCacheTest.java b/controller/src/test/java/com/automq/rocketmq/controller/server/store/impl/cache/StreamCacheTest.java index 5f9657cd1..0eaeb143c 100644 --- a/controller/src/test/java/com/automq/rocketmq/controller/server/store/impl/cache/StreamCacheTest.java +++ b/controller/src/test/java/com/automq/rocketmq/controller/server/store/impl/cache/StreamCacheTest.java @@ -50,4 +50,23 @@ public void testApply() { Assertions.assertEquals(0, cache.streamNumOfNode(6)); } + @Test + public void testStreamsOf() { + StreamCache cache = new StreamCache(); + Stream stream = new Stream(); + stream.setStreamRole(StreamRole.STREAM_ROLE_DATA); + stream.setRangeId(1); + stream.setTopicId(2L); + stream.setQueueId(3); + stream.setId(4L); + stream.setEpoch(5L); + stream.setSrcNodeId(6); + stream.setDstNodeId(7); + stream.setState(StreamState.OPEN); + cache.apply(List.of(stream)); + + List streams = cache.streamsOf(2, 3); + Assertions.assertFalse(streams.isEmpty()); + } + } \ No newline at end of file diff --git a/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/mapper/S3ObjectMapper.java b/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/mapper/S3ObjectMapper.java index 412d293be..3a9468944 100644 --- a/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/mapper/S3ObjectMapper.java +++ b/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/mapper/S3ObjectMapper.java @@ -42,4 +42,6 @@ public interface S3ObjectMapper { int prepare(S3Object s3Object); int rollback(@Param("current")Date current); + + long totalDataSize(@Param("streamId") long streamId); } diff --git a/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/DefaultS3MetadataService.java b/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/DefaultS3MetadataService.java index 4bd2a2a24..0fb92333a 100644 --- a/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/DefaultS3MetadataService.java +++ b/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/DefaultS3MetadataService.java @@ -38,6 +38,9 @@ import com.automq.rocketmq.metadata.mapper.S3WalObjectMapper; import com.automq.rocketmq.metadata.mapper.SequenceMapper; import com.automq.rocketmq.metadata.mapper.StreamMapper; +import com.automq.rocketmq.metadata.service.cache.S3ObjectCache; +import com.automq.rocketmq.metadata.service.cache.S3StreamObjectCache; +import com.automq.rocketmq.metadata.service.cache.S3WalObjectCache; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.TextFormat; import com.google.protobuf.util.JsonFormat; @@ -77,12 +80,31 @@ public class DefaultS3MetadataService implements S3MetadataService { private final S3StreamObjectCache s3StreamObjectCache; + private final S3ObjectCache s3ObjectCache; + + private final S3WalObjectCache s3WalObjectCache; + + public DefaultS3MetadataService(ControllerConfig nodeConfig, SqlSessionFactory sessionFactory, ExecutorService asyncExecutorService) { this.nodeConfig = nodeConfig; this.sessionFactory = sessionFactory; this.asyncExecutorService = asyncExecutorService; this.s3StreamObjectCache = new S3StreamObjectCache(); + this.s3ObjectCache = new S3ObjectCache(sessionFactory); + this.s3WalObjectCache = new S3WalObjectCache(sessionFactory); + } + + public void start() { + this.s3WalObjectCache.load(nodeConfig.nodeId()); + } + + public S3ObjectCache getS3ObjectCache() { + return s3ObjectCache; + } + + public S3WalObjectCache getS3WalObjectCache() { + return s3WalObjectCache; } public CompletableFuture prepareS3Objects(int count, int ttlInMinutes) { @@ -285,6 +307,8 @@ public CompletableFuture commitWalObject(S3WALObject walObject, : toCache.entrySet()) { s3StreamObjectCache.cache(entry.getKey(), entry.getValue()); } + s3WalObjectCache.onCommit(walObject); + s3WalObjectCache.onCompact(compactedObjects); LOGGER.info("broker[broke-id={}] commit wal object[object-id={}] success, compacted objects[{}], stream objects[{}]", brokerId, walObject.getObjectId(), compactedObjects, streamObjects); future.complete(null); @@ -387,7 +411,7 @@ public CompletableFuture> listWALObjects() { List walObjects = s3WalObjectMapper.list(nodeConfig.nodeId(), null).stream() .map(s3WALObject -> { try { - return buildS3WALObject(s3WALObject, decode(s3WALObject.getSubStreams())); + return Helper.buildS3WALObject(s3WALObject, Helper.decode(s3WALObject.getSubStreams())); } catch (InvalidProtocolBufferException e) { LOGGER.error("Failed to deserialize SubStreams", e); return null; @@ -421,7 +445,7 @@ public CompletableFuture> listWALObjects(long streamId, long s s3WalObjects.stream() .map(s3WalObject -> { try { - Map subStreams = decode(s3WalObject.getSubStreams()).getSubStreamsMap(); + Map subStreams = Helper.decode(s3WalObject.getSubStreams()).getSubStreamsMap(); Map streamsRecords = new HashMap<>(); if (subStreams.containsKey(streamId)) { SubStream subStream = subStreams.get(streamId); @@ -430,7 +454,7 @@ public CompletableFuture> listWALObjects(long streamId, long s } } if (!streamsRecords.isEmpty()) { - return buildS3WALObject(s3WalObject, SubStreams.newBuilder() + return Helper.buildS3WALObject(s3WalObject, SubStreams.newBuilder() .putAllSubStreams(streamsRecords) .build()); } @@ -507,26 +531,7 @@ private S3StreamObject buildS3StreamObject( .build(); } - private S3WALObject buildS3WALObject( - S3WalObject originalObject, - SubStreams subStreams) { - return S3WALObject.newBuilder() - .setObjectId(originalObject.getObjectId()) - .setObjectSize(originalObject.getObjectSize()) - .setBrokerId(originalObject.getNodeId()) - .setSequenceId(originalObject.getSequenceId()) - .setBaseDataTimestamp(originalObject.getBaseDataTimestamp().getTime()) - .setCommittedTimestamp(originalObject.getCommittedTimestamp() != null ? - originalObject.getCommittedTimestamp().getTime() : S3Constants.NOOP_OBJECT_COMMIT_TIMESTAMP) - .setSubStreams(subStreams) - .build(); - } - private SubStreams decode(String json) throws InvalidProtocolBufferException { - SubStreams.Builder builder = SubStreams.newBuilder(); - JsonFormat.parser().ignoringUnknownFields().merge(json, builder); - return builder.build(); - } private boolean commitObject(Long objectId, long streamId, long objectSize, SqlSession session) { S3ObjectMapper s3ObjectMapper = session.getMapper(S3ObjectMapper.class); @@ -555,7 +560,9 @@ private boolean commitObject(Long objectId, long streamId, long objectSize, SqlS s3Object.setStreamId(streamId); s3Object.setObjectSize(objectSize); s3Object.setState(S3ObjectState.BOS_COMMITTED); - return s3ObjectMapper.commit(s3Object) == 1; + boolean ok = s3ObjectMapper.commit(s3Object) == 1; + s3ObjectCache.onObjectAdd(List.of(s3Object)); + return ok; } private void extendRange(SqlSession session, Map> segments) { @@ -611,13 +618,13 @@ public CompletableFuture, List>> listObje .stream() .map(s3WalObject -> { try { - Map subStreams = decode(s3WalObject.getSubStreams()).getSubStreamsMap(); + Map subStreams = Helper.decode(s3WalObject.getSubStreams()).getSubStreamsMap(); Map streamsRecords = new HashMap<>(); subStreams.entrySet().stream() .filter(entry -> !Objects.isNull(entry) && entry.getKey().equals(streamId)) .filter(entry -> entry.getValue().getStartOffset() <= endOffset && entry.getValue().getEndOffset() > startOffset) .forEach(entry -> streamsRecords.put(entry.getKey(), entry.getValue())); - return streamsRecords.isEmpty() ? null : buildS3WALObject(s3WalObject, + return streamsRecords.isEmpty() ? null : Helper.buildS3WALObject(s3WalObject, SubStreams.newBuilder().putAllSubStreams(streamsRecords).build()); } catch (InvalidProtocolBufferException e) { LOGGER.error("Failed to deserialize SubStreams", e); @@ -753,6 +760,7 @@ public CompletableFuture trimStream(long streamId, long streamEpoch, long S3Object s3Object = s3ObjectMapper.getById(streamObject.getObjectId()); s3Object.setMarkedForDeletionTimestamp(new Date()); s3ObjectMapper.markToDelete(s3Object.getId(), new Date()); + s3ObjectCache.onObjectDelete(s3Object.getStreamId(), List.of(s3Object.getId())); } }); @@ -760,7 +768,7 @@ public CompletableFuture trimStream(long streamId, long streamEpoch, long s3WALObjectMapper.list(stream.getDstNodeId(), null).stream() .map(s3WALObject -> { try { - return buildS3WALObject(s3WALObject, decode(s3WALObject.getSubStreams())); + return Helper.buildS3WALObject(s3WALObject, Helper.decode(s3WALObject.getSubStreams())); } catch (InvalidProtocolBufferException e) { LOGGER.error("Failed to decode"); return null; @@ -795,6 +803,26 @@ public CompletableFuture trimStream(long streamId, long streamEpoch, long return future; } + @Override + public long streamDataSize(long streamId) { + return s3WalObjectCache.streamDataSize(streamId) + s3ObjectCache.streamDataSize(streamId); + } + + @Override + public long streamStartTime(long streamId) { + return Long.min(s3ObjectCache.streamStartTime(streamId), s3WalObjectCache.streamStartTime(streamId)); + } + + @Override + public void onStreamOpen(long streamId) { + s3ObjectCache.onStreamOpen(streamId); + } + + @Override + public void onStreamClose(long streamId) { + s3ObjectCache.onStreamClose(streamId); + } + @Override public void close() throws IOException { diff --git a/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/Helper.java b/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/Helper.java new file mode 100644 index 000000000..70db9db5a --- /dev/null +++ b/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/Helper.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.metadata.service; + +import apache.rocketmq.controller.v1.S3WALObject; +import apache.rocketmq.controller.v1.SubStreams; +import com.automq.rocketmq.common.system.S3Constants; +import com.automq.rocketmq.metadata.dao.S3WalObject; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.util.JsonFormat; + +public final class Helper { + + public static S3WALObject buildS3WALObject(S3WalObject s3WalObject) throws InvalidProtocolBufferException { + return buildS3WALObject(s3WalObject, decode(s3WalObject.getSubStreams())); + } + + static S3WALObject buildS3WALObject(S3WalObject s3WalObject, SubStreams subStreams) { + return S3WALObject.newBuilder() + .setObjectId(s3WalObject.getObjectId()) + .setObjectSize(s3WalObject.getObjectSize()) + .setBrokerId(s3WalObject.getNodeId()) + .setSequenceId(s3WalObject.getSequenceId()) + .setBaseDataTimestamp(s3WalObject.getBaseDataTimestamp().getTime()) + .setCommittedTimestamp(s3WalObject.getCommittedTimestamp() != null ? + s3WalObject.getCommittedTimestamp().getTime() : S3Constants.NOOP_OBJECT_COMMIT_TIMESTAMP) + .setSubStreams(subStreams) + .build(); + } + + static SubStreams decode(String json) throws InvalidProtocolBufferException { + SubStreams.Builder builder = SubStreams.newBuilder(); + JsonFormat.parser().ignoringUnknownFields().merge(json, builder); + return builder.build(); + } +} diff --git a/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/S3MetadataService.java b/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/S3MetadataService.java index cfcb9e50e..89e16ac3e 100644 --- a/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/S3MetadataService.java +++ b/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/S3MetadataService.java @@ -29,6 +29,8 @@ */ public interface S3MetadataService extends Closeable { + public void start(); + CompletableFuture prepareS3Objects(int count, int ttlInMinutes); CompletableFuture commitWalObject(S3WALObject walObject, List streamObjects, @@ -48,4 +50,12 @@ CompletableFuture, List>> listObjects(lon long endOffset, int limit); CompletableFuture trimStream(long streamId, long streamEpoch, long newStartOffset); + + void onStreamOpen(long streamId); + + void onStreamClose(long streamId); + + long streamDataSize(long streamId); + + long streamStartTime(long streamId); } diff --git a/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/cache/S3ObjectCache.java b/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/cache/S3ObjectCache.java new file mode 100644 index 000000000..6fb98c7ac --- /dev/null +++ b/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/cache/S3ObjectCache.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.metadata.service.cache; + +import apache.rocketmq.controller.v1.S3ObjectState; +import com.automq.rocketmq.metadata.dao.S3Object; +import com.automq.rocketmq.metadata.mapper.S3ObjectMapper; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.ibatis.session.SqlSession; +import org.apache.ibatis.session.SqlSessionFactory; + +public class S3ObjectCache { + private final SqlSessionFactory sessionFactory; + private final ConcurrentMap> cache; + + public S3ObjectCache(SqlSessionFactory sessionFactory) { + cache = new ConcurrentHashMap<>(); + this.sessionFactory = sessionFactory; + } + + public void onStreamOpen(long streamId) { + try (SqlSession session = sessionFactory.openSession()) { + S3ObjectMapper mapper = session.getMapper(S3ObjectMapper.class); + List list = mapper.list(S3ObjectState.BOS_COMMITTED, streamId); + cache.computeIfAbsent(streamId, k -> { + ConcurrentMap map = new ConcurrentHashMap<>(); + list.forEach(obj -> map.put(obj.getId(), obj)); + return map; + }); + } + } + + public void onObjectDelete(long streamId, Collection objectIds) { + if (null == objectIds || objectIds.isEmpty()) { + return; + } + + cache.computeIfPresent(streamId, (k, m) -> { + objectIds.forEach(m::remove); + return m; + }); + } + + public void onObjectAdd(Collection objects) { + objects.forEach(object -> { + if (!cache.containsKey(object.getStreamId())) { + return; + } + + cache.get(object.getStreamId()).put(object.getId(), object); + }); + } + + public void onStreamClose(long streamId) { + cache.remove(streamId); + } + + public long streamDataSize(long streamId) { + Map objs = cache.get(streamId); + if (null == objs) { + return 0; + } + return objs.values().stream().mapToLong(S3Object::getObjectSize).sum(); + } + + public long streamStartTime(long streamId) { + Map objs = cache.get(streamId); + long startTime = Long.MAX_VALUE; + if (null == objs) { + return startTime; + } + for (Map.Entry entry : objs.entrySet()) { + long ts = entry.getValue().getCommittedTimestamp().getTime(); + if (ts < startTime) { + startTime = ts; + } + } + return startTime; + } +} diff --git a/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/S3StreamObjectCache.java b/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/cache/S3StreamObjectCache.java similarity index 98% rename from metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/S3StreamObjectCache.java rename to metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/cache/S3StreamObjectCache.java index 188833ca1..537d28df6 100644 --- a/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/S3StreamObjectCache.java +++ b/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/cache/S3StreamObjectCache.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package com.automq.rocketmq.metadata.service; +package com.automq.rocketmq.metadata.service.cache; import com.automq.rocketmq.metadata.dao.S3StreamObject; import com.google.common.collect.Lists; diff --git a/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/cache/S3WalObjectCache.java b/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/cache/S3WalObjectCache.java new file mode 100644 index 000000000..ebe6ba2d8 --- /dev/null +++ b/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/service/cache/S3WalObjectCache.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.metadata.service.cache; + +import apache.rocketmq.controller.v1.S3WALObject; +import apache.rocketmq.controller.v1.SubStream; +import com.automq.rocketmq.metadata.dao.S3WalObject; +import com.automq.rocketmq.metadata.mapper.S3WalObjectMapper; +import com.automq.rocketmq.metadata.service.Helper; +import com.google.protobuf.InvalidProtocolBufferException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.ibatis.session.SqlSession; +import org.apache.ibatis.session.SqlSessionFactory; + +public class S3WalObjectCache { + + private final ConcurrentMap cache; + private final SqlSessionFactory sessionFactory; + + public S3WalObjectCache(SqlSessionFactory sessionFactory) { + this.sessionFactory = sessionFactory; + cache = new ConcurrentHashMap<>(); + } + + public void load(int nodeId) { + try (SqlSession session = sessionFactory.openSession()) { + S3WalObjectMapper mapper = session.getMapper(S3WalObjectMapper.class); + List list = mapper.list(nodeId, null); + list.forEach(obj -> { + try { + cache.put(obj.getObjectId(), Helper.buildS3WALObject(obj)); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }); + } + } + + public void onCompact(Collection objectIds) { + if (null == objectIds || objectIds.isEmpty()) { + return; + } + objectIds.forEach(cache::remove); + } + + public void onCommit(S3WALObject object) { + cache.put(object.getObjectId(), object); + } + + public long streamDataSize(long streamId) { + long total = 0; + for (Map.Entry entry : cache.entrySet()) { + for (Map.Entry e : entry.getValue().getSubStreams().getSubStreamsMap().entrySet()) { + if (e.getValue().getStreamId() == streamId) { + total += e.getValue().getDataSize(); + } + } + } + return total; + } + + public long streamStartTime(long streamId) { + long startTime = Long.MAX_VALUE; + for (Map.Entry entry : cache.entrySet()) { + for (Map.Entry e : entry.getValue().getSubStreams().getSubStreamsMap().entrySet()) { + if (e.getValue().getStreamId() == streamId) { + if (entry.getValue().getBaseDataTimestamp() < startTime) { + startTime = entry.getValue().getBaseDataTimestamp(); + } + } + } + } + return startTime; + } +} diff --git a/metadata-jdbc/src/main/resources/database/mapper/S3ObjectMapper.xml b/metadata-jdbc/src/main/resources/database/mapper/S3ObjectMapper.xml index fd60f1de9..cc245df2a 100644 --- a/metadata-jdbc/src/main/resources/database/mapper/S3ObjectMapper.xml +++ b/metadata-jdbc/src/main/resources/database/mapper/S3ObjectMapper.xml @@ -77,5 +77,12 @@ AND stream_id = #{streamId} - + + + + \ No newline at end of file diff --git a/metadata-jdbc/src/test/java/com/automq/rocketmq/metadata/service/cache/S3ObjectCacheTest.java b/metadata-jdbc/src/test/java/com/automq/rocketmq/metadata/service/cache/S3ObjectCacheTest.java new file mode 100644 index 000000000..bfb8cc06b --- /dev/null +++ b/metadata-jdbc/src/test/java/com/automq/rocketmq/metadata/service/cache/S3ObjectCacheTest.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.metadata.service.cache; + + +class S3ObjectCacheTest { + +} \ No newline at end of file diff --git a/metadata-jdbc/src/test/java/com/automq/rocketmq/metadata/service/cache/S3WalObjectCacheTest.java b/metadata-jdbc/src/test/java/com/automq/rocketmq/metadata/service/cache/S3WalObjectCacheTest.java new file mode 100644 index 000000000..ad8cbe2a5 --- /dev/null +++ b/metadata-jdbc/src/test/java/com/automq/rocketmq/metadata/service/cache/S3WalObjectCacheTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.metadata.service.cache; + +import com.automq.rocketmq.metadata.DatabaseTestBase; +import java.io.IOException; +import org.junit.jupiter.api.Test; + +class S3WalObjectCacheTest extends DatabaseTestBase { + + @Test + public void testLoad() throws IOException { + S3WalObjectCache cache = new S3WalObjectCache(getSessionFactory()); + cache.load(1); + } +} \ No newline at end of file diff --git a/metadata/src/main/java/com/automq/rocketmq/metadata/DefaultStoreMetadataService.java b/metadata/src/main/java/com/automq/rocketmq/metadata/DefaultStoreMetadataService.java index eec92247f..3f5927791 100644 --- a/metadata/src/main/java/com/automq/rocketmq/metadata/DefaultStoreMetadataService.java +++ b/metadata/src/main/java/com/automq/rocketmq/metadata/DefaultStoreMetadataService.java @@ -127,10 +127,12 @@ public CompletableFuture trimStream(long streamId, long streamEpoch, long @Override public CompletableFuture openStream(long streamId, long streamEpoch) { + AtomicBoolean loop = new AtomicBoolean(true); return Futures.loop(loop::get, () -> metadataStore.openStream(streamId, streamEpoch, metadataStore.config().nodeId()) .thenApply(res -> { loop.set(false); + s3MetadataService.onStreamOpen(streamId); return res; }), MoreExecutors.directExecutor()); } @@ -141,6 +143,7 @@ public CompletableFuture closeStream(long streamId, long streamEpoch) { return Futures.loop(loop::get, () -> metadataStore.closeStream(streamId, streamEpoch, metadataStore.config().nodeId()) .thenApply(res -> { loop.set(false); + s3MetadataService.onStreamClose(streamId); return res; }), MoreExecutors.directExecutor()); } diff --git a/proto/src/main/proto/controller/model.proto b/proto/src/main/proto/controller/model.proto index 44c9b0ee0..dea475b73 100644 --- a/proto/src/main/proto/controller/model.proto +++ b/proto/src/main/proto/controller/model.proto @@ -261,6 +261,10 @@ message SubStream { int64 stream_id = 1; // The start offset of the range, inclusive. int64 start_offset = 2; + // End offset of this range, exclusive. int64 end_offset = 3; + + // Total records size in bytes + int64 data_size = 4; } \ No newline at end of file