Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Clean stats when resource is closed #23

Merged
merged 2 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Summary;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import lombok.Getter;
import lombok.val;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -293,6 +296,36 @@ public void unRegisterNameSpace(String resourceGroupName, NamespaceName fqNamesp
throw new PulsarAdminException(errMesg);
}

aggregateLock.lock();

Set<String> invalidateAllKeyForProduce = new HashSet<>();
topicProduceStats.asMap().forEach((key, value) -> {
TopicName topicName = TopicName.get(key);
if (topicName.getNamespaceObject().equals(fqNamespaceName)) {
invalidateAllKeyForProduce.add(key);
}
});
topicProduceStats.invalidateAll(invalidateAllKeyForProduce);

Set<String> invalidateAllKeyForReplication = new HashSet<>();
topicToReplicatorsMap.forEach((key, value) -> {
TopicName topicName = TopicName.get(key);
if (topicName.getNamespaceObject().equals(fqNamespaceName)) {
value.forEach(n -> invalidateAllKeyForReplication.add(key));
}
});
replicationDispatchStats.invalidateAll(invalidateAllKeyForReplication);

Set<TopicName> invalidateAllKeyForConsumer = new HashSet<>();
topicConsumeStats.asMap().forEach((key, value) -> {
TopicName topicName = TopicName.get(key);
if (topicName.getNamespaceObject().equals(fqNamespaceName)) {
invalidateAllKeyForConsumer.add(topicName);
}
});
topicConsumeStats.invalidate(invalidateAllKeyForConsumer);

aggregateLock.unlock();
// Dissociate this NS-name from the RG.
this.namespaceToRGsMap.remove(fqNamespaceName, rg);
rgNamespaceUnRegisters.labels(resourceGroupName).inc();
Expand Down Expand Up @@ -326,15 +359,24 @@ public void registerTopic(String resourceGroupName, TopicName topicName) {
/**
* UnRegisters a topic from a resource group.
*
* @param topicName complete topic name
* @param topicName complete topic name
*/
public void unRegisterTopic(TopicName topicName) {
ResourceGroup remove = this.topicToRGsMap.remove(topicName);
aggregateLock.lock();
String topicNameString = topicName.toString();
ResourceGroup remove = topicToRGsMap.remove(topicName);
if (remove != null) {
remove.registerUsage(topicName.toString(), ResourceGroupRefTypes.Topics,
remove.registerUsage(topicNameString, ResourceGroupRefTypes.Topics,
false, this.resourceUsageTransportManagerMgr);
rgTopicUnRegisters.labels(remove.resourceGroupName).inc();
topicProduceStats.invalidate(topicNameString);
topicConsumeStats.invalidate(topicNameString);
Set<String> replicators = topicToReplicatorsMap.remove(topicNameString);
if (replicators != null) {
replicationDispatchStats.invalidateAll(replicators);
}
}
aggregateLock.unlock();
}

/**
Expand Down Expand Up @@ -496,6 +538,10 @@ private ResourceGroup checkResourceGroupExists(String rgName) throws PulsarAdmin
return rg;
}

private String getReplicatorKey(String topic, String replicationRemoteCluster) {
return topic + replicationRemoteCluster;
}

// Find the difference between the last time stats were updated for this topic, and the current
// time. If the difference is positive, update the stats.
@VisibleForTesting
Expand Down Expand Up @@ -530,7 +576,14 @@ protected void updateStatsWithDiff(String topicName, String replicationRemoteClu

String key;
if (monClass == ResourceGroupMonitoringClass.ReplicationDispatch) {
key = topicName + replicationRemoteCluster;
key = getReplicatorKey(topicName, replicationRemoteCluster);
topicToReplicatorsMap.compute(key, (n, value) -> {
if (value == null) {
value = new CopyOnWriteArraySet<>();
}
value.add(replicationRemoteCluster);
return value;
});
} else {
key = topicName;
}
Expand Down Expand Up @@ -644,6 +697,7 @@ protected void aggregateResourceGroupLocalUsages() {
BrokerService bs = this.pulsar.getBrokerService();
Map<String, TopicStatsImpl> topicStatsMap = bs.getTopicStats();

aggregateLock.lock();
for (Map.Entry<String, TopicStatsImpl> entry : topicStatsMap.entrySet()) {
final String topicName = entry.getKey();
final TopicStats topicStats = entry.getValue();
Expand Down Expand Up @@ -676,6 +730,7 @@ protected void aggregateResourceGroupLocalUsages() {
topicStats.getBytesOutCounter(), topicStats.getMsgOutCounter(),
ResourceGroupMonitoringClass.Dispatch);
}
aggregateLock.unlock();
double diffTimeSeconds = aggrUsageTimer.observeDuration();
if (log.isDebugEnabled()) {
log.debug("aggregateResourceGroupLocalUsages took {} milliseconds", diffTimeSeconds * 1000);
Expand Down Expand Up @@ -869,6 +924,8 @@ protected Cache<String, BytesAndMessagesCount> newStatsCache(long durationMS) {
// Given a qualified NS-name (i.e., in "tenant/namespace" format), record its associated resource-group
private ConcurrentHashMap<NamespaceName, ResourceGroup> namespaceToRGsMap = new ConcurrentHashMap<>();
private ConcurrentHashMap<TopicName, ResourceGroup> topicToRGsMap = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, Set<String>> topicToReplicatorsMap = new ConcurrentHashMap<>();
private ReentrantLock aggregateLock = new ReentrantLock();

// Maps to maintain the usage per topic, in produce/consume directions.
private Cache<String, BytesAndMessagesCount> topicProduceStats;
Expand Down Expand Up @@ -990,6 +1047,11 @@ Cache<String, BytesAndMessagesCount> getReplicationDispatchStats() {
return this.replicationDispatchStats;
}

@VisibleForTesting
ConcurrentHashMap<String, Set<String>> getTopicToReplicatorsMap() {
return this.topicToReplicatorsMap;
}

@VisibleForTesting
ScheduledFuture<?> getAggregateLocalUsagePeriodicTask() {
return this.aggregateLocalUsagePeriodicTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1173,7 +1173,10 @@ protected void closeResourceGroupLimiter() {
this.resourceGroupPublishLimiter = null;
this.resourceGroupDispatchRateLimiter = Optional.empty();
this.resourceGroupRateLimitingEnabled = false;
brokerService.getPulsar().getResourceGroupServiceManager().unRegisterTopic(TopicName.get(topic));
}
ResourceGroupService resourceGroupServiceManager = brokerService.getPulsar().getResourceGroupServiceManager();
if (resourceGroupServiceManager != null) {
resourceGroupServiceManager.unRegisterTopic(TopicName.get(topic));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Policy.Expiration;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.PerBrokerUsageStats;
Expand All @@ -33,9 +35,12 @@
import org.apache.pulsar.broker.service.resource.usage.NetworkUsage;
import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -277,6 +282,57 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep
Assert.assertThrows(PulsarAdminException.class, () -> rgs.getPublishRateLimiters(rgName));

Assert.assertEquals(rgs.getNumResourceGroups(), 0);

Assert.assertEquals(rgs.getTopicConsumeStats().estimatedSize(), 0);
Assert.assertEquals(rgs.getTopicProduceStats().estimatedSize(), 0);
Assert.assertEquals(rgs.getReplicationDispatchStats().estimatedSize(), 0);
Assert.assertEquals(rgs.getTopicToReplicatorsMap().size(), 0);
}

@Test
public void testCleanupStatsWhenNamespaceDeleted()
throws PulsarAdminException, PulsarClientException, InterruptedException {
String tenantName = UUID.randomUUID().toString();
admin.tenants().createTenant(tenantName,
TenantInfo.builder().allowedClusters(new HashSet<>(admin.clusters().getClusters())).build());
String nsName = tenantName + "/" + UUID.randomUUID();
admin.namespaces().createNamespace(nsName);
org.apache.pulsar.common.policies.data.ResourceGroup rgConfig =
new org.apache.pulsar.common.policies.data.ResourceGroup();
final String rgName = UUID.randomUUID().toString();
rgConfig.setPublishRateInBytes(15000L);
rgConfig.setPublishRateInMsgs(100);
rgConfig.setDispatchRateInBytes(40000L);
rgConfig.setDispatchRateInMsgs(500);
rgConfig.setReplicationDispatchRateInBytes(2000L);
rgConfig.setReplicationDispatchRateInMsgs(400L);

admin.resourcegroups().createResourceGroup(rgName, rgConfig);
admin.namespaces().setNamespaceResourceGroup(nsName, rgName);
Awaitility.await().ignoreExceptions().untilAsserted(() -> {
Assert.assertNotNull(rgs.getNamespaceResourceGroup(NamespaceName.get(nsName)));
});

String topic = nsName + "/" + UUID.randomUUID();
@Cleanup
Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).create();
producer.send("hi".getBytes(StandardCharsets.UTF_8));

rgs.aggregateResourceGroupLocalUsages();
producer.close();
Assert.assertEquals(rgs.getTopicProduceStats().asMap().size(), 1);
Assert.assertEquals(rgs.getTopicConsumeStats().asMap().size(), 0);
Assert.assertEquals(rgs.getReplicationDispatchStats().asMap().size(), 0);
admin.topics().delete(topic);
admin.namespaces().deleteNamespace(nsName);
admin.resourcegroups().deleteResourceGroup(rgName);
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(rgs.getTopicProduceStats().asMap().size(), 0);
Assert.assertEquals(rgs.getTopicConsumeStats().asMap().size(), 0);
Assert.assertEquals(rgs.getReplicationDispatchStats().asMap().size(), 0);
Assert.assertNull(rgs.getNamespaceResourceGroup(NamespaceName.get(nsName)));
});
}

@Test
Expand Down
Loading