Skip to content

Commit

Permalink
[improve][broker] Change rgCalculatedQuotaBytes and rgCalculatedQuota…
Browse files Browse the repository at this point in the history
…Messages types to gauge

Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece committed May 29, 2024
1 parent 8da4484 commit 0372fe8
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Summary;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -573,11 +574,21 @@ protected BytesAndMessagesCount getPublishRateLimiters (String rgName) throws Pu

// Visibility for testing.
protected static double getRgQuotaByteCount (String rgName, String monClassName) {
return rgCalculatedQuotaBytesTotal.labels(rgName, monClassName).get();
}

// Visibility for testing.
protected static double getRgQuotaByte (String rgName, String monClassName) {
return rgCalculatedQuotaBytes.labels(rgName, monClassName).get();
}

// Visibility for testing.
protected static double getRgQuotaMessageCount (String rgName, String monClassName) {
return rgCalculatedQuotaMessagesTotal.labels(rgName, monClassName).get();
}

// Visibility for testing.
protected static double getRgQuotaMessage(String rgName, String monClassName) {
return rgCalculatedQuotaMessages.labels(rgName, monClassName).get();
}

Expand Down Expand Up @@ -725,13 +736,7 @@ protected void calculateQuotaForAllResourceGroups() {
globUsageMessagesArray);

BytesAndMessagesCount oldBMCount = resourceGroup.updateLocalQuota(monClass, updatedQuota);
// Guard against unconfigured quota settings, for which computeLocalQuota will return negative.
if (updatedQuota.messages >= 0) {
rgCalculatedQuotaMessages.labels(rgName, monClass.name()).inc(updatedQuota.messages);
}
if (updatedQuota.bytes >= 0) {
rgCalculatedQuotaBytes.labels(rgName, monClass.name()).inc(updatedQuota.bytes);
}
incRgCalculatedQuota(rgName, monClass, updatedQuota, resourceUsagePublishPeriodInSeconds);
if (oldBMCount != null) {
long messagesIncrement = updatedQuota.messages - oldBMCount.messages;
long bytesIncrement = updatedQuota.bytes - oldBMCount.bytes;
Expand Down Expand Up @@ -822,6 +827,21 @@ private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policie
}
}

static void incRgCalculatedQuota(String rgName, ResourceGroupMonitoringClass monClass,
BytesAndMessagesCount updatedQuota, long resourceUsagePublishPeriodInSeconds) {
// Guard against unconfigured quota settings, for which computeLocalQuota will return negative.
if (updatedQuota.messages >= 0) {
rgCalculatedQuotaMessagesTotal.labels(rgName, monClass.name())
.inc(updatedQuota.messages * resourceUsagePublishPeriodInSeconds);
rgCalculatedQuotaMessages.labels(rgName, monClass.name()).set(updatedQuota.messages);
}
if (updatedQuota.bytes >= 0) {
rgCalculatedQuotaBytesTotal.labels(rgName, monClass.name())
.inc(updatedQuota.bytes * resourceUsagePublishPeriodInSeconds);
rgCalculatedQuotaBytes.labels(rgName, monClass.name()).set(updatedQuota.bytes);
}
}

@VisibleForTesting
protected Cache<String, BytesAndMessagesCount> newStatsCache(long durationMS) {
return Caffeine.newBuilder()
Expand Down Expand Up @@ -870,12 +890,24 @@ protected Cache<String, BytesAndMessagesCount> newStatsCache(long durationMS) {
private static final String[] resourceGroupLabel = {"ResourceGroup"};
private static final String[] resourceGroupMonitoringclassLabels = {"ResourceGroup", "MonitoringClass"};

private static final Counter rgCalculatedQuotaBytes = Counter.build()
// Using _total is compatible with pulsar 2.11 or later.
private static final Counter rgCalculatedQuotaBytesTotal = Counter.build()
.name("pulsar_resource_group_calculated_bytes_quota_total")
.help("Bytes quota calculated for resource group")
.labelNames(resourceGroupMonitoringclassLabels)
.register();
private static final Gauge rgCalculatedQuotaBytes = Gauge.build()
.name("pulsar_resource_group_calculated_bytes_quota")
.help("Bytes quota calculated for resource group")
.labelNames(resourceGroupMonitoringclassLabels)
.register();
private static final Counter rgCalculatedQuotaMessages = Counter.build()
// Using _total is compatible with pulsar 2.11 or later.
private static final Counter rgCalculatedQuotaMessagesTotal = Counter.build()
.name("pulsar_resource_group_calculated_messages_quota_total")
.help("Messages quota calculated for resource group")
.labelNames(resourceGroupMonitoringclassLabels)
.register();
private static final Gauge rgCalculatedQuotaMessages = Gauge.build()
.name("pulsar_resource_group_calculated_messages_quota")
.help("Messages quota calculated for resource group")
.labelNames(resourceGroupMonitoringclassLabels)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.resourcegroup;

import static org.testng.Assert.assertEquals;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
import org.testng.annotations.Test;

public class ResourceGroupMetricTest {
@Test
public void testLocalQuotaMetric() {
String rgName = "my-resource-group";
ResourceGroupMonitoringClass publish = ResourceGroupMonitoringClass.Publish;
int reportPeriod = 2;
BytesAndMessagesCount b = new BytesAndMessagesCount();
b.messages = 10;
b.bytes = 20;
int incTimes = 2;
for (int i = 0; i < 2; i++) {
ResourceGroupService.incRgCalculatedQuota(rgName, publish, b, reportPeriod);
}
double rgLocalUsageByteCount = ResourceGroupService.getRgQuotaByteCount(rgName, publish.name());
double rgQuotaMessageCount = ResourceGroupService.getRgQuotaMessageCount(rgName, publish.name());
assertEquals(rgLocalUsageByteCount, incTimes * b.bytes * reportPeriod);
assertEquals(rgQuotaMessageCount, incTimes * b.messages * reportPeriod);

double rgLocalUsageByte = ResourceGroupService.getRgQuotaByte(rgName, publish.name());
double rgQuotaMessage = ResourceGroupService.getRgQuotaMessage(rgName, publish.name());
assertEquals(rgLocalUsageByte, b.bytes);
assertEquals(rgQuotaMessage, b.messages);
}
}

0 comments on commit 0372fe8

Please sign in to comment.