Skip to content

Commit

Permalink
[feat][broker] PIP-264: Add broker web executor metrics (apache#22816)
Browse files Browse the repository at this point in the history
  • Loading branch information
dragosvictor authored Jun 5, 2024
1 parent c23e677 commit 4341f0f
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.web;

import com.google.common.annotations.VisibleForTesting;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;

public class WebExecutorThreadPoolStats implements AutoCloseable {
// Replaces ['pulsar_web_executor_max_threads', 'pulsar_web_executor_min_threads']
public static final String LIMIT_COUNTER = "pulsar.web.executor.thread.limit";
private final ObservableLongUpDownCounter limitCounter;

// Replaces
// ['pulsar_web_executor_active_threads', 'pulsar_web_executor_current_threads', 'pulsar_web_executor_idle_threads']
public static final String USAGE_COUNTER = "pulsar.web.executor.thread.usage";
private final ObservableLongUpDownCounter usageCounter;

public static final AttributeKey<String> LIMIT_TYPE_KEY =
AttributeKey.stringKey("pulsar.web.executor.thread.limit.type");
@VisibleForTesting
enum LimitType {
MAX,
MIN;
public final Attributes attributes = Attributes.of(LIMIT_TYPE_KEY, name().toLowerCase());
}

public static final AttributeKey<String> USAGE_TYPE_KEY =
AttributeKey.stringKey("pulsar.web.executor.thread.usage.type");
@VisibleForTesting
enum UsageType {
ACTIVE,
CURRENT,
IDLE;
public final Attributes attributes = Attributes.of(USAGE_TYPE_KEY, name().toLowerCase());
}

public WebExecutorThreadPoolStats(Meter meter, WebExecutorThreadPool executor) {
limitCounter = meter
.upDownCounterBuilder(LIMIT_COUNTER)
.setUnit("{thread}")
.setDescription("The thread limits for the pulsar-web executor pool.")
.buildWithCallback(measurement -> {
measurement.record(executor.getMaxThreads(), LimitType.MAX.attributes);
measurement.record(executor.getMinThreads(), LimitType.MIN.attributes);
});
usageCounter = meter
.upDownCounterBuilder(USAGE_COUNTER)
.setUnit("{thread}")
.setDescription("The current usage of threads in the pulsar-web executor pool.")
.buildWithCallback(measurement -> {
var idleThreads = executor.getIdleThreads();
var currentThreads = executor.getThreads();
measurement.record(idleThreads, UsageType.IDLE.attributes);
measurement.record(currentThreads, UsageType.CURRENT.attributes);
measurement.record(currentThreads - idleThreads, UsageType.ACTIVE.attributes);
});
}

@Override
public synchronized void close() {
limitCounter.close();
usageCounter.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,21 @@
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;

@Deprecated
class WebExecutorStats implements AutoCloseable {
private static final AtomicBoolean CLOSED = new AtomicBoolean(false);

@PulsarDeprecatedMetric(newMetricName = WebExecutorThreadPoolStats.LIMIT_COUNTER)
private final Gauge maxThreads;
@PulsarDeprecatedMetric(newMetricName = WebExecutorThreadPoolStats.LIMIT_COUNTER)
private final Gauge minThreads;
@PulsarDeprecatedMetric(newMetricName = WebExecutorThreadPoolStats.USAGE_COUNTER)
private final Gauge idleThreads;
@PulsarDeprecatedMetric(newMetricName = WebExecutorThreadPoolStats.USAGE_COUNTER)
private final Gauge activeThreads;
@PulsarDeprecatedMetric(newMetricName = WebExecutorThreadPoolStats.USAGE_COUNTER)
private final Gauge currentThreads;
private final WebExecutorThreadPool executor;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ public class WebService implements AutoCloseable {
private final PulsarService pulsar;
private final Server server;
private final List<Handler> handlers;
@Deprecated
private final WebExecutorStats executorStats;
private final WebExecutorThreadPoolStats webExecutorThreadPoolStats;
private final WebExecutorThreadPool webServiceExecutor;

private final ServerConnector httpConnector;
Expand All @@ -101,6 +103,8 @@ public WebService(PulsarService pulsar) throws PulsarServerException {
"pulsar-web",
config.getHttpServerThreadPoolQueueSize());
this.executorStats = WebExecutorStats.getStats(webServiceExecutor);
this.webExecutorThreadPoolStats =
new WebExecutorThreadPoolStats(pulsar.getOpenTelemetry().getMeter(), webServiceExecutor);
this.server = new Server(webServiceExecutor);
if (config.getMaxHttpServerConnections() > 0) {
server.addBean(new ConnectionLimit(config.getMaxHttpServerConnections(), server));
Expand Down Expand Up @@ -376,6 +380,7 @@ public void close() throws PulsarServerException {
jettyStatisticsCollector = null;
}
webServiceExecutor.join();
webExecutorThreadPoolStats.close();
this.executorStats.close();
log.info("Web service closed");
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
*/
package org.apache.pulsar.broker.web;

import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
Expand Down Expand Up @@ -59,6 +61,8 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.broker.web.WebExecutorThreadPoolStats.LimitType;
import org.apache.pulsar.broker.web.WebExecutorThreadPoolStats.UsageType;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
Expand Down Expand Up @@ -106,6 +110,19 @@ public class WebServiceTest {
@Test
public void testWebExecutorMetrics() throws Exception {
setupEnv(true, false, false, false, -1, false);

var otelMetrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
assertMetricLongSumValue(otelMetrics, WebExecutorThreadPoolStats.LIMIT_COUNTER, LimitType.MAX.attributes,
value -> assertThat(value).isPositive());
assertMetricLongSumValue(otelMetrics, WebExecutorThreadPoolStats.LIMIT_COUNTER, LimitType.MIN.attributes,
value -> assertThat(value).isPositive());
assertMetricLongSumValue(otelMetrics, WebExecutorThreadPoolStats.USAGE_COUNTER, UsageType.ACTIVE.attributes,
value -> assertThat(value).isNotNegative());
assertMetricLongSumValue(otelMetrics, WebExecutorThreadPoolStats.USAGE_COUNTER, UsageType.CURRENT.attributes,
value -> assertThat(value).isPositive());
assertMetricLongSumValue(otelMetrics, WebExecutorThreadPoolStats.USAGE_COUNTER, UsageType.IDLE.attributes,
value -> assertThat(value).isNotNegative());

ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
PrometheusMetricsTestUtil.generate(pulsar, false, false, false, statsOut);
String metricsStr = statsOut.toString();
Expand Down Expand Up @@ -498,6 +515,7 @@ private void setupEnv(boolean enableFilter, boolean enableTls, boolean enableAut
pulsarTestContext = PulsarTestContext.builder()
.spyByDefault()
.config(config)
.enableOpenTelemetry(true)
.build();

pulsar = pulsarTestContext.getPulsarService();
Expand Down

0 comments on commit 4341f0f

Please sign in to comment.