From 4341f0f301e0da344bb5ce07bc62c373e7ce48ef Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Wed, 5 Jun 2024 16:34:56 -0700 Subject: [PATCH] [feat][broker] PIP-264: Add broker web executor metrics (#22816) --- .../web/WebExecutorThreadPoolStats.java | 83 +++++++++++++++++++ .../pulsar/broker/web/WebExecutorStats.java | 7 ++ .../apache/pulsar/broker/web/WebService.java | 5 ++ .../pulsar/broker/web/WebServiceTest.java | 18 ++++ 4 files changed, 113 insertions(+) create mode 100644 pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPoolStats.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPoolStats.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPoolStats.java new file mode 100644 index 0000000000000..6bfe4e33b8e5b --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPoolStats.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.web; + +import com.google.common.annotations.VisibleForTesting; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableLongUpDownCounter; + +public class WebExecutorThreadPoolStats implements AutoCloseable { + // Replaces ['pulsar_web_executor_max_threads', 'pulsar_web_executor_min_threads'] + public static final String LIMIT_COUNTER = "pulsar.web.executor.thread.limit"; + private final ObservableLongUpDownCounter limitCounter; + + // Replaces + // ['pulsar_web_executor_active_threads', 'pulsar_web_executor_current_threads', 'pulsar_web_executor_idle_threads'] + public static final String USAGE_COUNTER = "pulsar.web.executor.thread.usage"; + private final ObservableLongUpDownCounter usageCounter; + + public static final AttributeKey<String> LIMIT_TYPE_KEY = + AttributeKey.stringKey("pulsar.web.executor.thread.limit.type"); + @VisibleForTesting + enum LimitType { + MAX, + MIN; + public final Attributes attributes = Attributes.of(LIMIT_TYPE_KEY, name().toLowerCase(java.util.Locale.ROOT)); + } + + public static final AttributeKey<String> USAGE_TYPE_KEY = + AttributeKey.stringKey("pulsar.web.executor.thread.usage.type"); + @VisibleForTesting + enum UsageType { + ACTIVE, + CURRENT, + IDLE; + public final Attributes attributes = Attributes.of(USAGE_TYPE_KEY, name().toLowerCase(java.util.Locale.ROOT)); + } + + public WebExecutorThreadPoolStats(Meter meter, WebExecutorThreadPool executor) { + limitCounter = meter + .upDownCounterBuilder(LIMIT_COUNTER) + .setUnit("{thread}") + .setDescription("The thread limits for the pulsar-web executor pool.") + .buildWithCallback(measurement -> { + measurement.record(executor.getMaxThreads(), LimitType.MAX.attributes); + measurement.record(executor.getMinThreads(), LimitType.MIN.attributes); + }); + usageCounter = meter + .upDownCounterBuilder(USAGE_COUNTER) + .setUnit("{thread}") + .setDescription("The current usage of 
threads in the pulsar-web executor pool.") + .buildWithCallback(measurement -> { + var idleThreads = executor.getIdleThreads(); + var currentThreads = executor.getThreads(); + measurement.record(idleThreads, UsageType.IDLE.attributes); + measurement.record(currentThreads, UsageType.CURRENT.attributes); + measurement.record(currentThreads - idleThreads, UsageType.ACTIVE.attributes); + }); + } + + @Override + public synchronized void close() { + limitCounter.close(); + usageCounter.close(); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebExecutorStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebExecutorStats.java index 585df813027d7..28cfa7430cbe6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebExecutorStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebExecutorStats.java @@ -21,14 +21,21 @@ import io.prometheus.client.CollectorRegistry; import io.prometheus.client.Gauge; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric; +@Deprecated class WebExecutorStats implements AutoCloseable { private static final AtomicBoolean CLOSED = new AtomicBoolean(false); + @PulsarDeprecatedMetric(newMetricName = WebExecutorThreadPoolStats.LIMIT_COUNTER) private final Gauge maxThreads; + @PulsarDeprecatedMetric(newMetricName = WebExecutorThreadPoolStats.LIMIT_COUNTER) private final Gauge minThreads; + @PulsarDeprecatedMetric(newMetricName = WebExecutorThreadPoolStats.USAGE_COUNTER) private final Gauge idleThreads; + @PulsarDeprecatedMetric(newMetricName = WebExecutorThreadPoolStats.USAGE_COUNTER) private final Gauge activeThreads; + @PulsarDeprecatedMetric(newMetricName = WebExecutorThreadPoolStats.USAGE_COUNTER) private final Gauge currentThreads; private final WebExecutorThreadPool executor; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index 9a439268a8b4f..bf484d4f41f65 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -75,7 +75,9 @@ public class WebService implements AutoCloseable { private final PulsarService pulsar; private final Server server; private final List handlers; + @Deprecated private final WebExecutorStats executorStats; + private final WebExecutorThreadPoolStats webExecutorThreadPoolStats; private final WebExecutorThreadPool webServiceExecutor; private final ServerConnector httpConnector; @@ -101,6 +103,8 @@ public WebService(PulsarService pulsar) throws PulsarServerException { "pulsar-web", config.getHttpServerThreadPoolQueueSize()); this.executorStats = WebExecutorStats.getStats(webServiceExecutor); + this.webExecutorThreadPoolStats = + new WebExecutorThreadPoolStats(pulsar.getOpenTelemetry().getMeter(), webServiceExecutor); this.server = new Server(webServiceExecutor); if (config.getMaxHttpServerConnections() > 0) { server.addBean(new ConnectionLimit(config.getMaxHttpServerConnections(), server)); @@ -376,6 +380,7 @@ public void close() throws PulsarServerException { jettyStatisticsCollector = null; } webServiceExecutor.join(); + webExecutorThreadPoolStats.close(); this.executorStats.close(); log.info("Web service closed"); } catch (Exception e) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java index 17588a7ecac8b..30644237a7405 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java @@ -18,8 +18,10 @@ */ package org.apache.pulsar.broker.web; +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; import static 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric; import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics; +import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -59,6 +61,8 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.testcontext.PulsarTestContext; +import org.apache.pulsar.broker.web.WebExecutorThreadPoolStats.LimitType; +import org.apache.pulsar.broker.web.WebExecutorThreadPoolStats.UsageType; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException; @@ -106,6 +110,19 @@ public class WebServiceTest { @Test public void testWebExecutorMetrics() throws Exception { setupEnv(true, false, false, false, -1, false); + + var otelMetrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + assertMetricLongSumValue(otelMetrics, WebExecutorThreadPoolStats.LIMIT_COUNTER, LimitType.MAX.attributes, + value -> assertThat(value).isPositive()); + assertMetricLongSumValue(otelMetrics, WebExecutorThreadPoolStats.LIMIT_COUNTER, LimitType.MIN.attributes, + value -> assertThat(value).isPositive()); + assertMetricLongSumValue(otelMetrics, WebExecutorThreadPoolStats.USAGE_COUNTER, UsageType.ACTIVE.attributes, + value -> assertThat(value).isNotNegative()); + assertMetricLongSumValue(otelMetrics, WebExecutorThreadPoolStats.USAGE_COUNTER, UsageType.CURRENT.attributes, + value -> assertThat(value).isPositive()); + assertMetricLongSumValue(otelMetrics, WebExecutorThreadPoolStats.USAGE_COUNTER, UsageType.IDLE.attributes, + value -> assertThat(value).isNotNegative()); + ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); 
PrometheusMetricsTestUtil.generate(pulsar, false, false, false, statsOut); String metricsStr = statsOut.toString(); @@ -498,6 +515,7 @@ private void setupEnv(boolean enableFilter, boolean enableTls, boolean enableAut pulsarTestContext = PulsarTestContext.builder() .spyByDefault() .config(config) + .enableOpenTelemetry(true) .build(); pulsar = pulsarTestContext.getPulsarService();