From 0a2ffe4743799dc253814d49fa3fd29b933444ac Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Sun, 8 Dec 2024 22:26:51 +0800 Subject: [PATCH] [fix][admin] Listen partitioned topic creation event (#23680) Signed-off-by: Zixuan Liu --- .../pulsar/broker/admin/AdminResource.java | 15 ++++++++++++ .../broker/TopicEventsListenerTest.java | 23 ++++++++++++++----- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 45772dc279bab..4d890a3d5db4d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -45,6 +45,8 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.resources.ClusterResources; +import org.apache.pulsar.broker.service.TopicEventsListener.EventStage; +import org.apache.pulsar.broker.service.TopicEventsListener.TopicEvent; import org.apache.pulsar.broker.service.TopicPoliciesService; import org.apache.pulsar.broker.service.plugin.InvalidEntryFilterException; import org.apache.pulsar.broker.web.PulsarWebResource; @@ -166,6 +168,10 @@ public CompletableFuture validatePoliciesReadOnlyAccessAsync() { protected CompletableFuture tryCreatePartitionsAsync(int numPartitions) { if (!topicName.isPersistent()) { + for (int i = 0; i < numPartitions; i++) { + pulsar().getBrokerService().getTopicEventsDispatcher() + .notify(topicName.getPartition(i).toString(), TopicEvent.CREATE, EventStage.SUCCESS); + } return CompletableFuture.completedFuture(null); } List> futures = new ArrayList<>(numPartitions); @@ -201,6 +207,8 @@ private CompletableFuture tryCreatePartitionAsync(final int partition) { } return null; }); + pulsar().getBrokerService().getTopicEventsDispatcher() + .notifyOnCompletion(result, topicName.getPartition(partition).toString(), TopicEvent.CREATE); return result; } @@ -594,6 +602,13 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n throw new RestException(Status.CONFLICT, "This topic already exists"); } }) + .thenRun(() -> { + for (int i = 0; i < numPartitions; i++) { + pulsar().getBrokerService().getTopicEventsDispatcher() + .notify(topicName.getPartition(i).toString(), TopicEvent.CREATE, + EventStage.BEFORE); + } + }) .thenCompose(__ -> provisionPartitionedTopicPath(numPartitions, createLocalTopicOnly, properties)) .thenCompose(__ -> tryCreatePartitionsAsync(numPartitions)) .thenRun(() -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java index ceb3c1d0d9335..152b4aeeeb213 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java @@ -265,12 +265,23 @@ public void testTopicAutoGC(String topicTypePersistence, String topicTypePartiti private void createTopicAndVerifyEvents(String topicDomain, String topicTypePartitioned, String topicName) throws Exception { final String[] expectedEvents; if (topicDomain.equalsIgnoreCase("persistent") || topicTypePartitioned.equals("partitioned")) { - expectedEvents = new String[]{ - "LOAD__BEFORE", - "CREATE__BEFORE", - "CREATE__SUCCESS", - "LOAD__SUCCESS" - }; + if (topicTypePartitioned.equals("partitioned")) { + expectedEvents = new String[]{ + "CREATE__BEFORE", + "CREATE__SUCCESS", + "LOAD__BEFORE", + "CREATE__BEFORE", + "CREATE__SUCCESS", + "LOAD__SUCCESS" + }; + } else { + expectedEvents = new String[]{ + "LOAD__BEFORE", + "CREATE__BEFORE", + "CREATE__SUCCESS", + "LOAD__SUCCESS" + }; + } } else { expectedEvents = new String[]{ // Before https://github.com/apache/pulsar/pull/21995, Pulsar will skip create topic if the topic