diff --git a/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java b/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java index 9e0a652d5..91f999952 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java +++ b/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java @@ -40,6 +40,7 @@ public class DirectByteBufAlloc { public static final int STREAM_OBJECT_COMPACTION_WRITE = 8; public static final int STREAM_SET_OBJECT_COMPACTION_READ = 9; public static final int STREAM_SET_OBJECT_COMPACTION_WRITE = 10; + public static DirectByteBufAllocMetric directByteBufAllocMetric = null; static { registerAllocType(DEFAULT, "default"); @@ -77,11 +78,13 @@ public static ByteBuf byteBuffer(int initCapacity, int type) { if (now - lastMetricLogTime > 60000) { // it's ok to be not thread safe lastMetricLogTime = now; - LOGGER.info("Direct Memory usage: netty.usedDirectMemory={}, DirectByteBufAlloc={}", ALLOC.metric().usedDirectMemory(), metric()); + DirectByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric(); + LOGGER.info("Direct Memory usage: {}", DirectByteBufAlloc.directByteBufAllocMetric); } return new WrappedByteBuf(ALLOC.directBuffer(initCapacity), () -> usage.add(-initCapacity)); } catch (OutOfMemoryError e) { - LOGGER.error("alloc direct buffer OOM, netty.usedDirectMemory={}, DirectByteBufAlloc={}", ALLOC.metric().usedDirectMemory(), metric(), e); + DirectByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric(); + LOGGER.error("alloc direct buffer OOM, {}", DirectByteBufAlloc.directByteBufAllocMetric, e); System.err.println("alloc direct buffer OOM"); Runtime.getRuntime().halt(1); throw e; @@ -95,28 +98,36 @@ public static void registerAllocType(int type, String name) { ALLOC_TYPE.put(type, name); } - public static Metric metric() { - return new Metric(); - } + public static class DirectByteBufAllocMetric { + private final long usedDirectMemory; + private final long allocatedDirectMemory; + private final Map detail = new HashMap<>(); - public static class Metric { - private final long usage; - private final Map detail; + public DirectByteBufAllocMetric() { + USAGE_STATS.forEach((k, v) -> { + detail.put(k + "/" + ALLOC_TYPE.get(k), v.longValue()); + }); + this.usedDirectMemory = ALLOC.metric().usedDirectMemory(); + this.allocatedDirectMemory = this.detail.values().stream().mapToLong(Long::longValue).sum(); + } + + public long getUsedDirectMemory() { + return usedDirectMemory; + } - public Metric() { - Map detail = new HashMap<>(); - USAGE_STATS.forEach((k, v) -> detail.put(k, v.longValue())); - this.detail = detail; - this.usage = this.detail.values().stream().mapToLong(Long::longValue).sum(); + public Map getDetailedMap() { + return detail; } @Override public String toString() { - StringBuilder sb = new StringBuilder("DirectByteBufAllocMetric{usage="); - sb.append(usage); + StringBuilder sb = new StringBuilder("DirectByteBufAllocMetric{usedDirectMemory="); + sb.append(usedDirectMemory); + sb.append(", allocatedDirectMemory="); + sb.append(allocatedDirectMemory); sb.append(", detail="); - for (Map.Entry entry : detail.entrySet()) { - sb.append(entry.getKey()).append("/").append(ALLOC_TYPE.get(entry.getKey())).append("=").append(entry.getValue()).append(","); + for (Map.Entry entry : detail.entrySet()) { + sb.append(entry.getKey()).append("=").append(entry.getValue()).append(","); } sb.append("}"); return sb.toString(); diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/AttributesUtils.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/AttributesUtils.java index c0369dd1a..e4583d978 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/AttributesUtils.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/AttributesUtils.java @@ -52,12 +52,6 @@ public static Attributes buildAttributes(S3ObjectStage objectStage) { .build(); } - public static Attributes buildAttributes(String source) { - return Attributes.builder() - .put(S3StreamMetricsConstant.LABEL_ALLOCATE_BYTE_BUF_SOURCE, source) - .build(); - } - public static String getObjectBucketLabel(long objectSize) { int index = (int) Math.ceil(Math.log((double) objectSize / (16 * 1024)) / Math.log(2)); index = Math.min(S3StreamMetricsConstant.OBJECT_SIZE_BUCKET_NAMES.length - 1, Math.max(0, index)); diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/MultiAttributes.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/MultiAttributes.java new file mode 100644 index 000000000..7adfaff47 --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/MultiAttributes.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.stream.s3.metrics; + +import com.automq.stream.s3.metrics.wrapper.ConfigListener; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class MultiAttributes implements ConfigListener { + private final Map attributesMap = new ConcurrentHashMap<>(); + private final AttributeKey keyName; + private Attributes baseAttributes; + + public MultiAttributes(Attributes baseAttributes, AttributeKey keyName) { + this.baseAttributes = baseAttributes; + this.keyName = keyName; + } + + public Attributes get(K key) { + return attributesMap.computeIfAbsent(key, k -> buildAttributes(baseAttributes, Attributes.of(keyName, key))); + } + + private Attributes buildAttributes(Attributes baseAttributes, Attributes attributes) { + return Attributes.builder().putAll(baseAttributes).putAll(attributes).build(); + } + + private void reBuildAttributes(Attributes baseAttributes) { + for (Map.Entry entry : attributesMap.entrySet()) { + attributesMap.replace(entry.getKey(), buildAttributes(baseAttributes, entry.getValue())); + } + } + + @Override + public void onConfigChange(MetricsConfig metricsConfig) { + this.baseAttributes = metricsConfig.getBaseAttributes(); + reBuildAttributes(metricsConfig.getBaseAttributes()); + } +} diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java index a96bda3a8..fdbbe7cbb 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java @@ -79,7 +79,6 @@ public class S3StreamMetricsConstant { public static final String NETWORK_OUTBOUND_LIMITER_QUEUE_SIZE_METRIC_NAME = "network_outbound_limiter_queue_size"; public static final String NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRIC_NAME = "network_inbound_limiter_queue_time"; public static final String NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRIC_NAME = "network_outbound_limiter_queue_time"; - public static final String ALLOCATE_BYTE_BUF_SIZE_METRIC_NAME = "allocate_byte_buf_size"; public static final String READ_AHEAD_SIZE_METRIC_NAME = "read_ahead_size"; public static final String WAL_START_OFFSET = "wal_start_offset"; public static final String WAL_TRIMMED_OFFSET = "wal_trimmed_offset"; @@ -92,12 +91,14 @@ public class S3StreamMetricsConstant { public static final String INFLIGHT_WAL_UPLOAD_TASKS_COUNT_METRIC_NAME = "inflight_wal_upload_tasks_count"; public static final String COMPACTION_READ_SIZE_METRIC_NAME = "compaction_read_size"; public static final String COMPACTION_WRITE_SIZE_METRIC_NAME = "compaction_write_size"; + public static final String ALLOCATED_DIRECT_MEMORY_SIZE_METRIC_NAME = "allocated_direct_memory_size"; + public static final String USED_DIRECT_MEMORY_SIZE_METRIC_NAME = "used_direct_memory_size"; public static final AttributeKey LABEL_OPERATION_TYPE = AttributeKey.stringKey("operation_type"); public static final AttributeKey LABEL_OPERATION_NAME = AttributeKey.stringKey("operation_name"); public static final AttributeKey LABEL_SIZE_NAME = AttributeKey.stringKey("size"); public static final AttributeKey LABEL_STAGE = AttributeKey.stringKey("stage"); public static final AttributeKey LABEL_STATUS = AttributeKey.stringKey("status"); - public static final AttributeKey LABEL_ALLOCATE_BYTE_BUF_SOURCE = AttributeKey.stringKey("source"); + public static final AttributeKey LABEL_ALLOC_TYPE = AttributeKey.stringKey("type"); public static final String LABEL_STATUS_SUCCESS = "success"; public static final String LABEL_STATUS_FAILED = "failed"; public static final String LABEL_STATUS_HIT = "hit"; diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java index 8450a1e80..1a7a96b7e 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java @@ -11,6 +11,7 @@ package com.automq.stream.s3.metrics; +import com.automq.stream.s3.DirectByteBufAlloc; import com.automq.stream.s3.metrics.wrapper.CounterMetric; import com.automq.stream.s3.metrics.operations.S3ObjectStage; import com.automq.stream.s3.metrics.operations.S3Operation; @@ -25,6 +26,7 @@ import io.opentelemetry.api.metrics.ObservableLongGauge; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.function.Supplier; public class S3StreamMetricsManager { @@ -44,7 +46,6 @@ public class S3StreamMetricsManager { private static ObservableLongGauge networkOutboundLimiterQueueSize = new NoopObservableLongGauge(); private static LongHistogram networkInboundLimiterQueueTime = new NoopLongHistogram(); private static LongHistogram networkOutboundLimiterQueueTime = new NoopLongHistogram(); - private static LongHistogram allocateByteBufSize = new NoopLongHistogram(); private static LongHistogram readAheadSize = new NoopLongHistogram(); private static LongHistogram readAheadLimierQueueTime = new NoopLongHistogram(); private static ObservableLongGauge deltaWalStartOffset = new NoopObservableLongGauge(); @@ -55,6 +56,8 @@ public class S3StreamMetricsManager { private static ObservableLongGauge availableInflightS3ReadQuota = new NoopObservableLongGauge(); private static ObservableLongGauge availableInflightS3WriteQuota = new NoopObservableLongGauge(); private static ObservableLongGauge inflightWALUploadTasksCount = new NoopObservableLongGauge(); + private static ObservableLongGauge allocatedDirectMemorySize = new NoopObservableLongGauge(); + private static ObservableLongGauge usedDirectMemorySize = new NoopObservableLongGauge(); private static LongCounter compactionReadSizeInTotal = new NoopLongCounter(); private static LongCounter compactionWriteSizeInTotal = new NoopLongCounter(); private static Supplier networkInboundAvailableBandwidthSupplier = () -> 0L; @@ -70,6 +73,12 @@ public class S3StreamMetricsManager { private static Supplier availableInflightS3WriteQuotaSupplier = () -> 0; private static Supplier inflightWALUploadTasksCountSupplier = () -> 0; private static MetricsConfig metricsConfig = new MetricsConfig(MetricsLevel.INFO, Attributes.empty()); + private static final MultiAttributes ALLOC_TYPE_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(), + S3StreamMetricsConstant.LABEL_ALLOC_TYPE); + + static { + BASE_ATTRIBUTES_LISTENERS.add(ALLOC_TYPE_ATTRIBUTES); + } public static void configure(MetricsConfig metricsConfig) { synchronized (BASE_ATTRIBUTES_LISTENERS) { @@ -172,11 +181,6 @@ public static void initMetrics(Meter meter, String prefix) { .ofLongs() .setExplicitBucketBoundariesAdvice(S3StreamMetricsConstant.LATENCY_BOUNDARIES) .build(); - allocateByteBufSize = meter.histogramBuilder(prefix + S3StreamMetricsConstant.ALLOCATE_BYTE_BUF_SIZE_METRIC_NAME) - .setDescription("Allocate byte buf size") - .setUnit("bytes") - .ofLongs() - .build(); readAheadSize = meter.histogramBuilder(prefix + S3StreamMetricsConstant.READ_AHEAD_SIZE_METRIC_NAME) .setDescription("Read ahead size") .setUnit("bytes") @@ -263,6 +267,27 @@ public static void initMetrics(Meter meter, String prefix) { .setDescription("Compaction write size") .setUnit("bytes") .build(); + allocatedDirectMemorySize = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.ALLOCATED_DIRECT_MEMORY_SIZE_METRIC_NAME) + .setDescription("Allocated direct memory size") + .setUnit("bytes") + .ofLongs() + .buildWithCallback(result -> { + if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel()) && DirectByteBufAlloc.directByteBufAllocMetric != null) { + Map allocateSizeMap = DirectByteBufAlloc.directByteBufAllocMetric.getDetailedMap(); + for (Map.Entry entry : allocateSizeMap.entrySet()) { + result.record(entry.getValue(), ALLOC_TYPE_ATTRIBUTES.get(entry.getKey())); + } + } + }); + usedDirectMemorySize = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.USED_DIRECT_MEMORY_SIZE_METRIC_NAME) + .setDescription("Used direct memory size") + .setUnit("bytes") + .ofLongs() + .buildWithCallback(result -> { + if (MetricsLevel.DEBUG.isWithin(metricsConfig.getMetricsLevel()) && DirectByteBufAlloc.directByteBufAllocMetric != null) { + result.record(DirectByteBufAlloc.directByteBufAllocMetric.getUsedDirectMemory(), metricsConfig.getBaseAttributes()); + } + }); } public static void registerNetworkLimiterSupplier(AsyncNetworkBandwidthLimiter.Type type, @@ -425,14 +450,6 @@ public static HistogramMetric buildNetworkOutboundLimiterQueueTimeMetric() { } } - public static HistogramMetric buildAllocateByteBufSizeMetric(String source) { - synchronized (BASE_ATTRIBUTES_LISTENERS) { - HistogramMetric metric = new HistogramMetric(metricsConfig, AttributesUtils.buildAttributes(source), allocateByteBufSize); - BASE_ATTRIBUTES_LISTENERS.add(metric); - return metric; - } - } - public static HistogramMetric buildReadAheadSizeMetric() { synchronized (BASE_ATTRIBUTES_LISTENERS) { HistogramMetric metric = new HistogramMetric(metricsConfig, readAheadSize); diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/ByteBufStats.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/ByteBufStats.java deleted file mode 100644 index 3a81533ee..000000000 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/stats/ByteBufStats.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2024, AutoMQ CO.,LTD. - * - * Use of this software is governed by the Business Source License - * included in the file BSL.md - * - * As of the Change Date specified in that file, in accordance with - * the Business Source License, use of this software will be governed - * by the Apache License, Version 2.0 - */ - -package com.automq.stream.s3.metrics.stats; - -import com.automq.stream.s3.metrics.S3StreamMetricsManager; -import com.automq.stream.s3.metrics.wrapper.HistogramMetric; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -public class ByteBufStats { - private volatile static ByteBufStats instance = null; - - private final Map allocateByteBufSizeStats = new ConcurrentHashMap<>(); - - private ByteBufStats() { - } - - public static ByteBufStats getInstance() { - if (instance == null) { - synchronized (ByteBufStats.class) { - if (instance == null) { - instance = new ByteBufStats(); - } - } - } - return instance; - } - - public HistogramMetric allocateByteBufSizeStats(String source) { - return allocateByteBufSizeStats.computeIfAbsent(source, S3StreamMetricsManager::buildAllocateByteBufSizeMetric); - } -} diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java index 500902cf4..08897e39c 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockImpl.java @@ -14,7 +14,6 @@ import com.automq.stream.s3.DirectByteBufAlloc; import com.automq.stream.s3.metrics.MetricsLevel; import com.automq.stream.s3.metrics.TimerUtil; -import com.automq.stream.s3.metrics.stats.ByteBufStats; import com.automq.stream.s3.metrics.stats.StorageOperationStats; import com.automq.stream.s3.wal.util.WALUtil; import io.netty.buffer.ByteBuf; @@ -107,10 +106,8 @@ public ByteBuf data() { data = DirectByteBufAlloc.compositeByteBuffer(); for (Supplier supplier : records) { ByteBuf record = supplier.get(); - ByteBufStats.getInstance().allocateByteBufSizeStats("wal_record").record(MetricsLevel.DEBUG, record.readableBytes()); data.addComponent(true, record); } - ByteBufStats.getInstance().allocateByteBufSizeStats("wal_block").record(MetricsLevel.DEBUG, data.readableBytes()); return data; }