Skip to content

Commit

Permalink
[improve][broker] Improve rgCalculatedQuota and rgRemoteUsageReports …
Browse files Browse the repository at this point in the history
…metrics

Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece committed May 29, 2024
1 parent 8da4484 commit 973e717
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -449,12 +450,12 @@ protected BytesAndMessagesCount getRgPublishRateLimiterValues() {

// Visibility for unit testing
protected static double getRgRemoteUsageByteCount (String rgName, String monClassName, String brokerName) {
return rgRemoteUsageReportsBytes.labels(rgName, monClassName, brokerName).get();
return rgRemoteUsageReportsBytesTotal.labels(rgName, monClassName, brokerName).get();
}

// Visibility for unit testing
protected static double getRgRemoteUsageMessageCount (String rgName, String monClassName, String brokerName) {
return rgRemoteUsageReportsMessages.labels(rgName, monClassName, brokerName).get();
return rgRemoteUsageReportsMessagesTotal.labels(rgName, monClassName, brokerName).get();
}

// Visibility for unit testing
Expand Down Expand Up @@ -572,9 +573,10 @@ private void getUsageFromMonitoredEntity(ResourceGroupMonitoringClass monClass,
} finally {
monEntity.usageFromOtherBrokersLock.unlock();
}
rgRemoteUsageReportsBytes.labels(this.ruConsumer.getID(), monClass.name(), broker).inc(newByteCount);
rgRemoteUsageReportsMessages.labels(this.ruConsumer.getID(), monClass.name(), broker).inc(newMessageCount);

rgRemoteUsageReportsBytesTotal.labels(this.ruConsumer.getID(), monClass.name(), broker).inc(newByteCount);
rgRemoteUsageReportsMessagesTotal.labels(this.ruConsumer.getID(), monClass.name(), broker).inc(newMessageCount);
rgRemoteUsageReportsBytes.labels(this.ruConsumer.getID(), monClass.name(), broker).set(newByteCount);
rgRemoteUsageReportsMessages.labels(this.ruConsumer.getID(), monClass.name(), broker).set(newMessageCount);
oldByteCount = oldMessageCount = -1;
if (oldUsageStats != null) {
oldByteCount = oldUsageStats.usedValues.bytes;
Expand Down Expand Up @@ -687,12 +689,22 @@ PerMonitoringClassFields getMonitoredEntity(ResourceGroupMonitoringClass monClas
private static final String[] resourceGroupMontoringclassRemotebrokerLabels =
{"ResourceGroup", "MonitoringClass", "RemoteBroker"};

private static final Counter rgRemoteUsageReportsBytes = Counter.build()
private static final Counter rgRemoteUsageReportsBytesTotal = Counter.build()
.name("pulsar_resource_group_remote_usage_bytes_used_total")
.help("Bytes used reported about this <RG, monitoring class> from a remote broker")
.labelNames(resourceGroupMontoringclassRemotebrokerLabels)
.register();
private static final Gauge rgRemoteUsageReportsBytes = Gauge.build()
.name("pulsar_resource_group_remote_usage_bytes_used")
.help("Bytes used reported about this <RG, monitoring class> from a remote broker")
.labelNames(resourceGroupMontoringclassRemotebrokerLabels)
.register();
private static final Counter rgRemoteUsageReportsMessages = Counter.build()
private static final Counter rgRemoteUsageReportsMessagesTotal = Counter.build()
.name("pulsar_resource_group_remote_usage_messages_used_total")
.help("Messages used reported about this <RG, monitoring class> from a remote broker")
.labelNames(resourceGroupMontoringclassRemotebrokerLabels)
.register();
private static final Gauge rgRemoteUsageReportsMessages = Gauge.build()
.name("pulsar_resource_group_remote_usage_messages_used")
.help("Messages used reported about this <RG, monitoring class> from a remote broker")
.labelNames(resourceGroupMontoringclassRemotebrokerLabels)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Summary;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -573,11 +574,21 @@ protected BytesAndMessagesCount getPublishRateLimiters (String rgName) throws Pu

// Visibility for testing.
protected static double getRgQuotaByteCount (String rgName, String monClassName) {
return rgCalculatedQuotaBytesTotal.labels(rgName, monClassName).get();
}

// Visibility for testing.
protected static double getRgQuotaByte (String rgName, String monClassName) {
return rgCalculatedQuotaBytes.labels(rgName, monClassName).get();
}

// Visibility for testing.
protected static double getRgQuotaMessageCount (String rgName, String monClassName) {
return rgCalculatedQuotaMessagesTotal.labels(rgName, monClassName).get();
}

// Visibility for testing.
protected static double getRgQuotaMessage(String rgName, String monClassName) {
return rgCalculatedQuotaMessages.labels(rgName, monClassName).get();
}

Expand Down Expand Up @@ -725,13 +736,7 @@ protected void calculateQuotaForAllResourceGroups() {
globUsageMessagesArray);

BytesAndMessagesCount oldBMCount = resourceGroup.updateLocalQuota(monClass, updatedQuota);
// Guard against unconfigured quota settings, for which computeLocalQuota will return negative.
if (updatedQuota.messages >= 0) {
rgCalculatedQuotaMessages.labels(rgName, monClass.name()).inc(updatedQuota.messages);
}
if (updatedQuota.bytes >= 0) {
rgCalculatedQuotaBytes.labels(rgName, monClass.name()).inc(updatedQuota.bytes);
}
incRgCalculatedQuota(rgName, monClass, updatedQuota, resourceUsagePublishPeriodInSeconds);
if (oldBMCount != null) {
long messagesIncrement = updatedQuota.messages - oldBMCount.messages;
long bytesIncrement = updatedQuota.bytes - oldBMCount.bytes;
Expand Down Expand Up @@ -822,6 +827,21 @@ private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policie
}
}

static void incRgCalculatedQuota(String rgName, ResourceGroupMonitoringClass monClass,
BytesAndMessagesCount updatedQuota, long resourceUsagePublishPeriodInSeconds) {
// Guard against unconfigured quota settings, for which computeLocalQuota will return negative.
if (updatedQuota.messages >= 0) {
rgCalculatedQuotaMessagesTotal.labels(rgName, monClass.name())
.inc(updatedQuota.messages * resourceUsagePublishPeriodInSeconds);
rgCalculatedQuotaMessages.labels(rgName, monClass.name()).set(updatedQuota.messages);
}
if (updatedQuota.bytes >= 0) {
rgCalculatedQuotaBytesTotal.labels(rgName, monClass.name())
.inc(updatedQuota.bytes * resourceUsagePublishPeriodInSeconds);
rgCalculatedQuotaBytes.labels(rgName, monClass.name()).set(updatedQuota.bytes);
}
}

@VisibleForTesting
protected Cache<String, BytesAndMessagesCount> newStatsCache(long durationMS) {
return Caffeine.newBuilder()
Expand Down Expand Up @@ -870,12 +890,22 @@ protected Cache<String, BytesAndMessagesCount> newStatsCache(long durationMS) {
private static final String[] resourceGroupLabel = {"ResourceGroup"};
private static final String[] resourceGroupMonitoringclassLabels = {"ResourceGroup", "MonitoringClass"};

private static final Counter rgCalculatedQuotaBytes = Counter.build()
private static final Counter rgCalculatedQuotaBytesTotal = Counter.build()
.name("pulsar_resource_group_calculated_bytes_quota_total")
.help("Bytes quota calculated for resource group")
.labelNames(resourceGroupMonitoringclassLabels)
.register();
private static final Gauge rgCalculatedQuotaBytes = Gauge.build()
.name("pulsar_resource_group_calculated_bytes_quota")
.help("Bytes quota calculated for resource group")
.labelNames(resourceGroupMonitoringclassLabels)
.register();
private static final Counter rgCalculatedQuotaMessages = Counter.build()
private static final Counter rgCalculatedQuotaMessagesTotal = Counter.build()
.name("pulsar_resource_group_calculated_messages_quota_total")
.help("Messages quota calculated for resource group")
.labelNames(resourceGroupMonitoringclassLabels)
.register();
private static final Gauge rgCalculatedQuotaMessages = Gauge.build()
.name("pulsar_resource_group_calculated_messages_quota")
.help("Messages quota calculated for resource group")
.labelNames(resourceGroupMonitoringclassLabels)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.resourcegroup;

import static org.testng.Assert.assertEquals;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
import org.testng.annotations.Test;

public class ResourceGroupMetricTest {
@Test
public void testLocalQuotaMetric() {
String rgName = "my-resource-group";
ResourceGroupMonitoringClass publish = ResourceGroupMonitoringClass.Publish;
int reportPeriod = 2;
BytesAndMessagesCount b = new BytesAndMessagesCount();
b.messages = 10;
b.bytes = 20;
int incTimes = 2;
for (int i = 0; i < 2; i++) {
ResourceGroupService.incRgCalculatedQuota(rgName, publish, b, reportPeriod);
}
double rgLocalUsageByteCount = ResourceGroupService.getRgQuotaByteCount(rgName, publish.name());
double rgQuotaMessageCount = ResourceGroupService.getRgQuotaMessageCount(rgName, publish.name());
assertEquals(rgLocalUsageByteCount, incTimes * b.bytes * reportPeriod);
assertEquals(rgQuotaMessageCount, incTimes * b.messages * reportPeriod);

double rgLocalUsageByte = ResourceGroupService.getRgQuotaByte(rgName, publish.name());
double rgQuotaMessage = ResourceGroupService.getRgQuotaMessage(rgName, publish.name());
assertEquals(rgLocalUsageByte, b.bytes);
assertEquals(rgQuotaMessage, b.messages);
}
}

0 comments on commit 973e717

Please sign in to comment.