Skip to content

Commit

Permalink
[improve][broker] Cache the internal writer when sent to system topic. (
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- authored Feb 24, 2024
1 parent 1c652f5 commit 8607905
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.broker.service;

import static java.util.Objects.requireNonNull;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import java.util.HashSet;
Expand All @@ -29,6 +31,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.tuple.MutablePair;
Expand Down Expand Up @@ -84,10 +87,25 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
@VisibleForTesting
final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = new ConcurrentHashMap<>();

private final AsyncLoadingCache<NamespaceName, SystemTopicClient.Writer<PulsarEvent>> writerCaches;

public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) {
this.pulsarService = pulsarService;
this.clusterName = pulsarService.getConfiguration().getClusterName();
this.localCluster = Sets.newHashSet(clusterName);
this.writerCaches = Caffeine.newBuilder()
.expireAfterAccess(5, TimeUnit.MINUTES)
.removalListener((namespaceName, writer, cause) -> {
((SystemTopicClient.Writer) writer).closeAsync().exceptionally(ex -> {
log.error("[{}] Close writer error.", namespaceName, ex);
return null;
});
})
.buildAsync((namespaceName, executor) -> {
SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
.createTopicPoliciesSystemTopicClient(namespaceName);
return systemTopicClient.newWriterAsync();
});
}

@Override
Expand Down Expand Up @@ -122,39 +140,32 @@ private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, Action
} catch (PulsarServerException e) {
return CompletableFuture.failedFuture(e);
}

SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());

return systemTopicClient.newWriterAsync()
.thenCompose(writer -> {
PulsarEvent event = getPulsarEvent(topicName, actionType, policies);
CompletableFuture<MessageId> writeFuture =
ActionType.DELETE.equals(actionType) ? writer.deleteAsync(getEventKey(event), event)
: writer.writeAsync(getEventKey(event), event);
return writeFuture.handle((messageId, e) -> {
if (e != null) {
return CompletableFuture.failedFuture(e);
CompletableFuture<Void> result = new CompletableFuture<>();
writerCaches.get(topicName.getNamespaceObject())
.whenComplete((writer, cause) -> {
if (cause != null) {
writerCaches.synchronous().invalidate(topicName.getNamespaceObject());
result.completeExceptionally(cause);
} else {
if (messageId != null) {
return CompletableFuture.completedFuture(null);
} else {
return CompletableFuture.failedFuture(
new RuntimeException("Got message id is null."));
}
}
}).thenRun(() ->
writer.closeAsync().whenComplete((v, cause) -> {
if (cause != null) {
log.error("[{}] Close writer error.", topicName, cause);
PulsarEvent event = getPulsarEvent(topicName, actionType, policies);
CompletableFuture<MessageId> writeFuture = ActionType.DELETE.equals(actionType)
? writer.deleteAsync(getEventKey(event), event)
: writer.writeAsync(getEventKey(event), event);
writeFuture.whenComplete((messageId, e) -> {
if (e != null) {
result.completeExceptionally(e);
} else {
if (messageId != null) {
result.complete(null);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Close writer success.", topicName);
}
result.completeExceptionally(
new RuntimeException("Got message id is null."));
}
})
);
}
});
}
});
return result;
});
}

Expand Down Expand Up @@ -364,7 +375,7 @@ public CompletableFuture<Void> removeOwnedNamespaceBundleAsync(NamespaceBundle n
}
AtomicInteger bundlesCount = ownedBundlesCountPerNamespace.get(namespace);
if (bundlesCount == null || bundlesCount.decrementAndGet() <= 0) {
cleanCacheAndCloseReader(namespace, true);
cleanCacheAndCloseReader(namespace, true, true);
}
return CompletableFuture.completedFuture(null);
}
Expand Down Expand Up @@ -440,6 +451,14 @@ private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, Comp
}

private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean cleanOwnedBundlesCount) {
cleanCacheAndCloseReader(namespace, cleanOwnedBundlesCount, false);
}

private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean cleanOwnedBundlesCount,
boolean cleanWriterCache) {
if (cleanWriterCache) {
writerCaches.synchronous().invalidate(namespace);
}
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture = readerCaches.remove(namespace);

if (cleanOwnedBundlesCount) {
Expand Down Expand Up @@ -688,5 +707,10 @@ protected Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> getListeners(
return listeners;
}

@VisibleForTesting
protected AsyncLoadingCache<NamespaceName, SystemTopicClient.Writer<PulsarEvent>> getWriterCaches() {
return writerCaches;
}

private static final Logger log = LoggerFactory.getLogger(SystemTopicBasedTopicPoliciesService.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.events.ActionType;
import org.apache.pulsar.common.events.PulsarEvent;
import org.apache.pulsar.common.naming.TopicName;
Expand All @@ -41,13 +43,17 @@
*/
public class TopicPoliciesSystemTopicClient extends SystemTopicClientBase<PulsarEvent> {

static Schema<PulsarEvent> avroSchema = DefaultImplementation.getDefaultImplementation()
.newAvroSchema(SchemaDefinition.builder().withPojo(PulsarEvent.class).build());

public TopicPoliciesSystemTopicClient(PulsarClient client, TopicName topicName) {
super(client, topicName);

}

@Override
protected CompletableFuture<Writer<PulsarEvent>> newWriterAsyncInternal() {
return client.newProducer(Schema.AVRO(PulsarEvent.class))
return client.newProducer(avroSchema)
.topic(topicName.toString())
.enableBatching(false)
.createAsync()
Expand All @@ -61,7 +67,7 @@ protected CompletableFuture<Writer<PulsarEvent>> newWriterAsyncInternal() {

@Override
protected CompletableFuture<Reader<PulsarEvent>> newReaderAsyncInternal() {
return client.newReader(Schema.AVRO(PulsarEvent.class))
return client.newReader(avroSchema)
.topic(topicName.toString())
.startMessageId(MessageId.earliest)
.readCompacted(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -43,6 +44,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.common.events.PulsarEvent;
Expand All @@ -67,6 +69,8 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
private static final String NAMESPACE2 = "system-topic/namespace-2";
private static final String NAMESPACE3 = "system-topic/namespace-3";

private static final String NAMESPACE4 = "system-topic/namespace-4";

private static final TopicName TOPIC1 = TopicName.get("persistent", NamespaceName.get(NAMESPACE1), "topic-1");
private static final TopicName TOPIC2 = TopicName.get("persistent", NamespaceName.get(NAMESPACE1), "topic-2");
private static final TopicName TOPIC3 = TopicName.get("persistent", NamespaceName.get(NAMESPACE2), "topic-1");
Expand Down Expand Up @@ -430,4 +434,35 @@ public void testGetTopicPoliciesWithCleanCache() throws Exception {

result.join();
}

@Test
public void testWriterCache() throws Exception {
admin.namespaces().createNamespace(NAMESPACE4);
for (int i = 1; i <= 5; i ++) {
final String topicName = "persistent://" + NAMESPACE4 + "/testWriterCache" + i;
admin.topics().createNonPartitionedTopic(topicName);
pulsarClient.newProducer(Schema.STRING).topic(topicName).create().close();
}
@Cleanup("shutdown")
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 1; i <= 5; i ++) {
int finalI = i;
executorService.execute(() -> {
final String topicName = "persistent://" + NAMESPACE4 + "/testWriterCache" + finalI;
try {
admin.topicPolicies().setMaxConsumers(topicName, 2);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
SystemTopicBasedTopicPoliciesService service = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
Assert.assertNotNull(service.getWriterCaches().synchronous().get(NamespaceName.get(NAMESPACE4)));
for (int i = 1; i <= 5; i ++) {
final String topicName = "persistent://" + NAMESPACE4 + "/testWriterCache" + i;
admin.topics().delete(topicName);
}
admin.namespaces().deleteNamespace(NAMESPACE4);
Assert.assertNull(service.getWriterCaches().synchronous().getIfPresent(NamespaceName.get(NAMESPACE4)));
}
}

0 comments on commit 8607905

Please sign in to comment.