Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement store metrics of topic/queue store-size and reserve-time #653

Merged
merged 3 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public class BrokerController implements Lifecycle {
private final MessageService messageService;
private final ExtendMessageService extendMessageService;

private final S3MetadataService s3MetadataService;

public BrokerController(BrokerConfig brokerConfig) throws Exception {
this.brokerConfig = brokerConfig;

Expand All @@ -100,7 +102,7 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception {
metadataStore = MetadataStoreBuilder.build(brokerConfig);

proxyMetadataService = new DefaultProxyMetadataService(metadataStore);
S3MetadataService s3MetadataService = new DefaultS3MetadataService(metadataStore.config(),
s3MetadataService = new DefaultS3MetadataService(metadataStore.config(),
metadataStore.sessionFactory(), metadataStore.asyncExecutor());
storeMetadataService = new DefaultStoreMetadataService(metadataStore, s3MetadataService);

Expand Down Expand Up @@ -170,7 +172,8 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception {
}

// Init the metrics exporter before accept requests.
metricsExporter = new MetricsExporter(brokerConfig, messageStore, messagingProcessor, resource, sdkTracerProvider);
metricsExporter = new MetricsExporter(brokerConfig, messageStore, messagingProcessor, resource,
sdkTracerProvider, metadataStore, s3MetadataService);

// Init the profiler agent.
ProfilerConfig profilerConfig = brokerConfig.profiler();
Expand Down Expand Up @@ -204,6 +207,7 @@ public void start() throws Exception {
remotingServer.start();
metadataStore.registerCurrentNode(brokerConfig.name(), brokerConfig.advertiseAddress(), brokerConfig.instanceId());
metricsExporter.start();
s3MetadataService.start();

startThreadPoolMonitor();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import com.automq.rocketmq.common.config.BrokerConfig;
import com.automq.rocketmq.common.config.MetricsConfig;
import com.automq.rocketmq.common.util.Lifecycle;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.metadata.service.S3MetadataService;
import com.automq.rocketmq.controller.server.TopicMetricsManager;
import com.automq.rocketmq.proxy.metrics.ProxyMetricsManager;
import com.automq.rocketmq.proxy.processor.ExtendMessagingProcessor;
import com.automq.rocketmq.store.MessageStoreImpl;
Expand Down Expand Up @@ -84,15 +87,19 @@ public class MetricsExporter implements Lifecycle {
private final StoreMetricsManager storeMetricsManager;
private final StreamMetricsManager streamMetricsManager;

private final TopicMetricsManager topicMetricsManager;

public static Supplier<AttributesBuilder> attributesBuilderSupplier = Attributes::builder;

public MetricsExporter(BrokerConfig brokerConfig, MessageStoreImpl messageStore,
ExtendMessagingProcessor messagingProcessor, Resource resource, SdkTracerProvider tracerProvider) {
ExtendMessagingProcessor messagingProcessor, Resource resource, SdkTracerProvider tracerProvider,
MetadataStore metadataStore, S3MetadataService s3MetadataService) {
this.brokerConfig = brokerConfig;
this.metricsConfig = brokerConfig.metrics();
this.proxyMetricsManager = new ProxyMetricsManager(messagingProcessor);
this.storeMetricsManager = new StoreMetricsManager(metricsConfig, messageStore);
this.streamMetricsManager = new StreamMetricsManager();
this.topicMetricsManager = new TopicMetricsManager(metadataStore, s3MetadataService);
init(resource, tracerProvider);
S3StreamMetricsRegistry.setMetricsGroup(this.streamMetricsManager);
}
Expand Down Expand Up @@ -240,18 +247,21 @@ private void initAttributesBuilder() {
streamMetricsManager.initAttributesBuilder(MetricsExporter::newAttributesBuilder);
storeMetricsManager.initAttributesBuilder(MetricsExporter::newAttributesBuilder);
proxyMetricsManager.initAttributesBuilder(MetricsExporter::newAttributesBuilder);
topicMetricsManager.initAttributesBuilder(MetricsExporter::newAttributesBuilder);
}

private void initStaticMetrics() {
streamMetricsManager.initStaticMetrics(brokerMeter);
storeMetricsManager.initStaticMetrics(brokerMeter);
proxyMetricsManager.initStaticMetrics(brokerMeter);
topicMetricsManager.initStaticMetrics(brokerMeter);
}

private void initDynamicMetrics() {
streamMetricsManager.initDynamicMetrics(brokerMeter);
storeMetricsManager.initDynamicMetrics(brokerMeter);
proxyMetricsManager.initDynamicMetrics(brokerMeter);
topicMetricsManager.initDynamicMetrics(brokerMeter);
storeMetricsManager.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ public interface MetadataStore extends Closeable {

ElectionService electionService();

List<QueueAssignment> assignmentsOf(int nodeId);

List<Long> streamsOf(long topicId, int queueId);

void start();

CompletableFuture<Cluster> describeCluster(DescribeClusterRequest request);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.controller.server;

import com.automq.rocketmq.common.MetricsManager;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.metadata.dao.QueueAssignment;
import com.automq.rocketmq.metadata.service.S3MetadataService;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongGauge;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class TopicMetricsManager implements MetricsManager {

private static Supplier<AttributesBuilder> attributesBuilderSupplier;
public static final String GAUGE_STORAGE_SIZE = "rocketmq_storage_size";
public static final String GAUGE_STORAGE_MESSAGE_RESERVE_TIME = "rocketmq_storage_message_reserve_time";

public static final String LABEL_TOPIC = "topic";
public static final String LABEL_QUEUE = "queue";

private final MetadataStore metadataStore;

private final S3MetadataService s3MetadataService;

private ObservableLongGauge storeGauge;

Check warning on line 46 in controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java

View workflow job for this annotation

GitHub Actions / Qodana for JVM

Field can be local

Field can be converted to a local variable

private ObservableLongGauge reserveTimeGauge;

Check warning on line 48 in controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java

View workflow job for this annotation

GitHub Actions / Qodana for JVM

Field can be local

Field can be converted to a local variable

public TopicMetricsManager(MetadataStore metadataStore, S3MetadataService s3MetadataService) {
this.metadataStore = metadataStore;
this.s3MetadataService = s3MetadataService;
}

Check warning on line 53 in controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java#L50-L53

Added lines #L50 - L53 were not covered by tests

@Override
public void initAttributesBuilder(Supplier<AttributesBuilder> attributesBuilderSupplier) {
TopicMetricsManager.attributesBuilderSupplier = attributesBuilderSupplier;
}

Check warning on line 58 in controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java#L57-L58

Added lines #L57 - L58 were not covered by tests

private void measureStoreSize(ObservableLongMeasurement measurement) {
List<QueueAssignment> list = metadataStore.assignmentsOf(metadataStore.config().nodeId());

Check warning on line 61 in controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java#L61

Added line #L61 was not covered by tests
for (QueueAssignment assignment : list) {
List<Long> streamIds = metadataStore.streamsOf(assignment.getTopicId(), assignment.getQueueId());
long total = 0;

Check warning on line 64 in controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java#L63-L64

Added lines #L63 - L64 were not covered by tests
for (long id : streamIds) {
total += s3MetadataService.streamDataSize(id);

Check warning on line 66 in controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java#L66

Added line #L66 was not covered by tests
}
Optional<String> topic = metadataStore.topicManager().nameOf(assignment.getTopicId());

Check warning on line 68 in controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java#L68

Added line #L68 was not covered by tests
if (topic.isEmpty()) {
continue;

Check warning on line 70 in controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java#L70

Added line #L70 was not covered by tests
}
measurement.record(total,
attributesBuilderSupplier.get().put(LABEL_TOPIC, topic.get())
.put(LABEL_QUEUE, assignment.getQueueId()).build()

Check warning on line 74 in controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java#L72-L74

Added lines #L72 - L74 were not covered by tests
);
}
}

Check warning on line 77 in controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java#L77

Added line #L77 was not covered by tests

private void measureReserveTime(ObservableLongMeasurement measurement) {
List<QueueAssignment> list = metadataStore.assignmentsOf(metadataStore.config().nodeId());
long currentTs = System.currentTimeMillis();

Check warning on line 81 in controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java#L80-L81

Added lines #L80 - L81 were not covered by tests
for (QueueAssignment assignment : list) {
List<Long> streamIds = metadataStore.streamsOf(assignment.getTopicId(), assignment.getQueueId());
long startTime = Long.MAX_VALUE;

Check warning on line 84 in controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java#L83-L84

Added lines #L83 - L84 were not covered by tests
for (long id : streamIds) {
long ts = s3MetadataService.streamStartTime(id);

Check warning on line 86 in controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java#L86

Added line #L86 was not covered by tests
if (ts < startTime) {
startTime = ts;

Check warning on line 88 in controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java#L88

Added line #L88 was not covered by tests
}
}
Optional<String> topic = metadataStore.topicManager().nameOf(assignment.getTopicId());

Check warning on line 91 in controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java#L91

Added line #L91 was not covered by tests
if (topic.isEmpty()) {
continue;

Check warning on line 93 in controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java#L93

Added line #L93 was not covered by tests
}
long reserveTime = 0;

Check warning on line 95 in controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java#L95

Added line #L95 was not covered by tests
if (startTime < currentTs) {
reserveTime = TimeUnit.MILLISECONDS.toSeconds(currentTs - startTime);

Check warning on line 97 in controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java#L97

Added line #L97 was not covered by tests
}
measurement.record(reserveTime,
attributesBuilderSupplier.get().put(LABEL_TOPIC, topic.get())
.put(LABEL_QUEUE, assignment.getQueueId()).build()

Check warning on line 101 in controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java#L99-L101

Added lines #L99 - L101 were not covered by tests
);
}
}

Check warning on line 104 in controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java#L104

Added line #L104 was not covered by tests

@Override
public void initStaticMetrics(Meter meter) {
storeGauge = meter.gaugeBuilder(GAUGE_STORAGE_SIZE)
.setUnit("byte")
.ofLongs().setDescription("Storage size per topic/queue")
.buildWithCallback(this::measureStoreSize);

Check warning on line 111 in controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java#L108-L111

Added lines #L108 - L111 were not covered by tests

reserveTimeGauge = meter.gaugeBuilder(GAUGE_STORAGE_MESSAGE_RESERVE_TIME)
.setUnit("Second")
.ofLongs()
.setDescription("Spanning duration of the reserved messages per topic/queue")
.buildWithCallback(this::measureReserveTime);
}

Check warning on line 118 in controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java#L113-L118

Added lines #L113 - L118 were not covered by tests

@Override
public void initDynamicMetrics(Meter meter) {
}

Check warning on line 122 in controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/TopicMetricsManager.java#L122

Added line #L122 was not covered by tests
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,16 @@
return electionService;
}

@Override
public List<QueueAssignment> assignmentsOf(int nodeId) {
return topicManager.getAssignmentCache().byNode(nodeId);

Check warning on line 176 in controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java#L176

Added line #L176 was not covered by tests
}

@Override
public List<Long> streamsOf(long topicId, int queueId) {
return topicManager.getStreamCache().streamsOf(topicId, queueId);

Check warning on line 181 in controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java#L181

Added line #L181 was not covered by tests
}

/**
* Expose for test purpose only.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@
this.topicNameRequests = new ConcurrentHashMap<>();
}

public Optional<String> nameOf(long topicId) {
Topic topic = topicCache.byId(topicId);

Check warning on line 93 in controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/TopicManager.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/TopicManager.java#L93

Added line #L93 was not covered by tests
if (null == topic) {
return Optional.empty();

Check warning on line 95 in controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/TopicManager.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/TopicManager.java#L95

Added line #L95 was not covered by tests
}
return Optional.of(topic.getName());

Check warning on line 97 in controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/TopicManager.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/TopicManager.java#L97

Added line #L97 was not covered by tests
}

public TopicCache getTopicCache() {
return topicCache;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.automq.rocketmq.controller.server.store.impl.cache;

import com.automq.rocketmq.metadata.dao.QueueAssignment;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -42,6 +43,27 @@
}
}

public List<QueueAssignment> byNode(int nodeId) {
List<QueueAssignment> result = new ArrayList<>();
for (Map.Entry<Long, Map<Integer, QueueAssignment>> entry : assignments.entrySet()) {
for (Map.Entry<Integer, QueueAssignment> e : entry.getValue().entrySet()) {
switch (e.getValue().getStatus()) {
case ASSIGNMENT_STATUS_YIELDING -> {
if (e.getValue().getSrcNodeId() == nodeId) {
result.add(e.getValue());

Check warning on line 53 in controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/cache/AssignmentCache.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/cache/AssignmentCache.java#L53

Added line #L53 was not covered by tests
}
}

Check warning on line 55 in controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/cache/AssignmentCache.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/cache/AssignmentCache.java#L55

Added line #L55 was not covered by tests
case ASSIGNMENT_STATUS_ASSIGNED -> {
if (e.getValue().getDstNodeId() == nodeId) {
result.add(e.getValue());
}
}
}
}
}
return result;
}

public Map<Integer, QueueAssignment> byTopicId(Long topicId) {
return assignments.get(topicId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import apache.rocketmq.controller.v1.StreamState;
import com.automq.rocketmq.metadata.dao.Stream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -42,6 +44,16 @@ public int streamNumOfNode(int nodeId) {
return count;
}

public List<Long> streamsOf(long topicId, int queueId) {
List<Long> result = new ArrayList<>();
for (Map.Entry<Long, Stream> entry : streams.entrySet()) {
if (entry.getValue().getTopicId() == topicId && entry.getValue().getQueueId() == queueId) {
result.add(entry.getKey());
}
}
return result;
}

public void apply(Collection<Stream> streams) {
for (Stream stream : streams) {
cacheItem(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ class AssignmentCacheTest {
@Test
public void testApply() {
AssignmentCache cache = new AssignmentCache();

QueueAssignment assignment = new QueueAssignment();
assignment.setStatus(AssignmentStatus.ASSIGNMENT_STATUS_ASSIGNED);
assignment.setTopicId(1);
Expand All @@ -45,7 +44,21 @@ public void testApply() {

Assertions.assertEquals(1, cache.queueNumOfNode(4));
Assertions.assertEquals(0, cache.queueNumOfNode(3));
}

@Test
public void testByNode() {
AssignmentCache cache = new AssignmentCache();
QueueAssignment assignment = new QueueAssignment();
assignment.setStatus(AssignmentStatus.ASSIGNMENT_STATUS_ASSIGNED);
assignment.setTopicId(1);
assignment.setQueueId(2);
assignment.setSrcNodeId(3);
assignment.setDstNodeId(4);
cache.apply(List.of(assignment));

Assertions.assertFalse(cache.byNode(4).isEmpty());
Assertions.assertTrue(cache.byNode(3).isEmpty());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,23 @@ public void testApply() {
Assertions.assertEquals(0, cache.streamNumOfNode(6));
}

@Test
public void testStreamsOf() {
StreamCache cache = new StreamCache();
Stream stream = new Stream();
stream.setStreamRole(StreamRole.STREAM_ROLE_DATA);
stream.setRangeId(1);
stream.setTopicId(2L);
stream.setQueueId(3);
stream.setId(4L);
stream.setEpoch(5L);
stream.setSrcNodeId(6);
stream.setDstNodeId(7);
stream.setState(StreamState.OPEN);
cache.apply(List.of(stream));

List<Long> streams = cache.streamsOf(2, 3);
Assertions.assertFalse(streams.isEmpty());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,6 @@ public interface S3ObjectMapper {
int prepare(S3Object s3Object);

int rollback(@Param("current")Date current);

long totalDataSize(@Param("streamId") long streamId);
}
Loading