From 038c574343b2b8ff99dbb93ffbf643bedc584592 Mon Sep 17 00:00:00 2001 From: 278903642 <129270163+278903642@users.noreply.github.com> Date: Mon, 25 Dec 2023 11:13:11 +0800 Subject: [PATCH] [improve] PIP-241: add TopicEventListener / topic events for the BrokerService (#5) Co-authored-by: liudezhi --- .../pulsar/broker/service/BrokerService.java | 67 +++- .../broker/service/TopicEventsDispatcher.java | 137 ++++++++ .../broker/service/TopicEventsListener.java | 62 ++++ .../broker/TopicEventsListenerTest.java | 306 ++++++++++++++++++ .../pulsar/broker/service/BrokerTestBase.java | 2 +- .../pulsar/broker/service/ServerCnxTest.java | 1 - 6 files changed, 571 insertions(+), 4 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicEventsDispatcher.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicEventsListener.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 4198c253144da..e52f8957b2246 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -114,6 +114,8 @@ import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; +import org.apache.pulsar.broker.service.TopicEventsListener.EventStage; +import org.apache.pulsar.broker.service.TopicEventsListener.TopicEvent; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; @@ -287,6 +289,8 @@ public class BrokerService implements Closeable { private Set brokerEntryMetadataInterceptors; private Set brokerEntryPayloadProcessors; + private final TopicEventsDispatcher topicEventsDispatcher = new TopicEventsDispatcher(); + public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws Exception { this.pulsar = pulsar; this.preciseTopicPublishRateLimitingEnable = @@ -401,6 +405,16 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws this.bundlesQuotas = new BundlesQuotas(pulsar.getLocalMetadataStore()); } + public void addTopicEventListener(TopicEventsListener... listeners) { + topicEventsDispatcher.addTopicEventListener(listeners); + getTopics().keys().forEach(topic -> + TopicEventsDispatcher.notify(listeners, topic, TopicEvent.LOAD, EventStage.SUCCESS, null)); + } + + public void removeTopicEventListener(TopicEventsListener... listeners) { + topicEventsDispatcher.removeTopicEventListener(listeners); + } + // This call is used for starting additional protocol handlers public void startProtocolHandlers( Map>> protocolHandlers) { @@ -1010,17 +1024,37 @@ public CompletableFuture> getTopic(final String topic, boolean c }); } else { return topics.computeIfAbsent(topic, (name) -> { + topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.BEFORE); if (topicName.isPartitioned()) { final TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName()); return this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata) -> { if (topicName.getPartitionIndex() < metadata.partitions) { - return createNonPersistentTopic(name); + topicEventsDispatcher + .notify(topicName.toString(), TopicEvent.CREATE, EventStage.BEFORE); + + CompletableFuture> res = createNonPersistentTopic(name); + + CompletableFuture> eventFuture = topicEventsDispatcher + .notifyOnCompletion(res, topicName.toString(), TopicEvent.CREATE); + topicEventsDispatcher + .notifyOnCompletion(eventFuture, topicName.toString(), TopicEvent.LOAD); + return res; } + topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE); return CompletableFuture.completedFuture(Optional.empty()); }); } else if (createIfMissing) { - return createNonPersistentTopic(name); + topicEventsDispatcher.notify(topicName.toString(), TopicEvent.CREATE, EventStage.BEFORE); + + CompletableFuture> res = createNonPersistentTopic(name); + + CompletableFuture> eventFuture = topicEventsDispatcher + .notifyOnCompletion(res, topicName.toString(), TopicEvent.CREATE); + topicEventsDispatcher + .notifyOnCompletion(eventFuture, topicName.toString(), TopicEvent.LOAD); + return res; } else { + topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE); return CompletableFuture.completedFuture(Optional.empty()); } }); @@ -1066,6 +1100,13 @@ public CompletableFuture deleteTopic(String topic, boolean forceDelete) { } public CompletableFuture deleteTopic(String topic, boolean forceDelete, boolean deleteSchema) { + topicEventsDispatcher.notify(topic, TopicEvent.DELETE, EventStage.BEFORE); + CompletableFuture result = deleteTopicInternal(topic, forceDelete, deleteSchema); + topicEventsDispatcher.notifyOnCompletion(result, topic, TopicEvent.DELETE); + return result; + } + + public CompletableFuture deleteTopicInternal(String topic, boolean forceDelete, boolean deleteSchema) { Optional optTopic = getTopicReference(topic); if (optTopic.isPresent()) { Topic t = optTopic.get(); @@ -1526,6 +1567,24 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, managedLedgerConfig.setCreateIfMissing(createIfMissing); managedLedgerConfig.setProperties(properties); + topicEventsDispatcher.notify(topic, TopicEvent.LOAD, EventStage.BEFORE); + // load can fail with topicFuture completed non-exceptionally + // work around this + final CompletableFuture loadFuture = new CompletableFuture<>(); + topicFuture.whenComplete((res, ex) -> { + if (ex == null) { + loadFuture.complete(null); + } else { + loadFuture.completeExceptionally(ex); + } + }); + + if (createIfMissing) { + topicEventsDispatcher.notify(topic, TopicEvent.CREATE, EventStage.BEFORE); + topicEventsDispatcher.notifyOnCompletion(topicFuture, topic, TopicEvent.CREATE); + } + topicEventsDispatcher.notifyOnCompletion(loadFuture, topic, TopicEvent.LOAD); + // Once we have the configuration, we can proceed with the async open operation managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(), managedLedgerConfig, new OpenLedgerCallback() { @@ -1594,6 +1653,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { if (!createIfMissing && exception instanceof ManagedLedgerNotFoundException) { // We were just trying to load a topic and the topic doesn't exist + loadFuture.completeExceptionally(exception); topicFuture.complete(Optional.empty()); } else { log.warn("Failed to create topic {}", topic, exception); @@ -2085,6 +2145,8 @@ private void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle, String bundleName = namespaceBundle.toString(); String namespaceName = TopicName.get(topic).getNamespaceObject().toString(); + topicEventsDispatcher.notify(topic, TopicEvent.UNLOAD, EventStage.BEFORE); + synchronized (multiLayerTopicsMap) { ConcurrentOpenHashMap> namespaceMap = multiLayerTopicsMap .get(namespaceName); @@ -2119,6 +2181,7 @@ private void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle, if (compactor != null) { compactor.getStats().removeTopic(topic); } + topicEventsDispatcher.notify(topic, TopicEvent.UNLOAD, EventStage.SUCCESS); } public int getNumberOfNamespaceBundles() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicEventsDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicEventsDispatcher.java new file mode 100644 index 0000000000000..bb8c7b4ec41b8 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicEventsDispatcher.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import lombok.extern.slf4j.Slf4j; + +/** + * Utility class to dispatch topic events. + */ +@Slf4j +public class TopicEventsDispatcher { + private final List topicEventListeners = new CopyOnWriteArrayList<>(); + + /** + * Adds listeners, ignores null listeners. + * @param listeners + */ + public void addTopicEventListener(TopicEventsListener... listeners) { + Objects.requireNonNull(listeners); + Arrays.stream(listeners) + .filter(x -> x != null) + .forEach(topicEventListeners::add); + } + + /** + * Removes listeners. + * @param listeners + */ + public void removeTopicEventListener(TopicEventsListener... listeners) { + Objects.requireNonNull(listeners); + Arrays.stream(listeners) + .filter(x -> x != null) + .forEach(topicEventListeners::remove); + } + + /** + * Dispatches notification to all currently added listeners. + * @param topic + * @param event + * @param stage + */ + public void notify(String topic, + TopicEventsListener.TopicEvent event, + TopicEventsListener.EventStage stage) { + notify(topic, event, stage, null); + } + + /** + * Dispatches notification to all currently added listeners. + * @param topic + * @param event + * @param stage + * @param t + */ + public void notify(String topic, + TopicEventsListener.TopicEvent event, + TopicEventsListener.EventStage stage, + Throwable t) { + topicEventListeners + .forEach(listener -> notify(listener, topic, event, stage, t)); + } + + /** + * Dispatches SUCCESS/FAILURE notification to all currently added listeners on completion of the future. + * @param future + * @param topic + * @param event + * @param + * @return future of a new completion stage + */ + public CompletableFuture notifyOnCompletion(CompletableFuture future, + String topic, + TopicEventsListener.TopicEvent event) { + return future.whenComplete((r, ex) -> notify(topic, + event, + ex == null ? TopicEventsListener.EventStage.SUCCESS : TopicEventsListener.EventStage.FAILURE, + ex)); + } + + /** + * Dispatches notification to specified listeners. + * @param listeners + * @param topic + * @param event + * @param stage + * @param t + */ + public static void notify(TopicEventsListener[] listeners, + String topic, + TopicEventsListener.TopicEvent event, + TopicEventsListener.EventStage stage, + Throwable t) { + Objects.requireNonNull(listeners); + for (TopicEventsListener listener: listeners) { + notify(listener, topic, event, stage, t); + } + } + + private static void notify(TopicEventsListener listener, + String topic, + TopicEventsListener.TopicEvent event, + TopicEventsListener.EventStage stage, + Throwable t) { + if (listener == null) { + return; + } + + try { + listener.handleEvent(topic, event, stage, t); + } catch (Throwable ex) { + log.error("TopicEventsListener {} exception while handling {}_{} for topic {}", + listener, event, stage, topic, ex); + } + } + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicEventsListener.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicEventsListener.java new file mode 100644 index 0000000000000..24a0cbac67c90 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicEventsListener.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; + +/** + * Listener for the Topic events. + */ +@InterfaceStability.Evolving +@InterfaceAudience.LimitedPrivate +public interface TopicEventsListener { + + /** + * Types of events currently supported. + * create/load/unload/delete + */ + enum TopicEvent { + // create events included into load events + CREATE, + LOAD, + UNLOAD, + DELETE, + } + + /** + * Stages of events currently supported. + * before starting the event/successful completion/failed completion + */ + enum EventStage { + BEFORE, + SUCCESS, + FAILURE + } + + /** + * Handle topic event. + * Choice of the thread / maintenance of the thread pool is up to the event handlers. + * @param topicName - name of the topic + * @param event - TopicEvent + * @param stage - EventStage + * @param t - exception in case of FAILURE, if present/known + */ + void handleEvent(String topicName, TopicEvent event, EventStage stage, Throwable t); +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java new file mode 100644 index 0000000000000..4a60df1f4d804 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker; + +import com.google.common.collect.Sets; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; +import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.awaitility.Awaitility; +import org.testng.Assert; +import org.testng.annotations.*; + +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +@Slf4j +public class TopicEventsListenerTest extends BrokerTestBase { + + final static Queue events = new ConcurrentLinkedQueue<>(); + volatile String topicNameToWatch; + String namespace; + + @DataProvider(name = "topicType") + public static Object[][] topicType() { + return new Object[][] { + {"persistent", "partitioned", true}, + {"persistent", "non-partitioned", true}, + {"non-persistent", "partitioned", true}, + {"non-persistent", "non-partitioned", true}, + {"persistent", "partitioned", false}, + {"persistent", "non-partitioned", false}, + {"non-persistent", "partitioned", false}, + {"non-persistent", "non-partitioned", false} + }; + } + + @DataProvider(name = "topicTypeNoDelete") + public static Object[][] topicTypeNoDelete() { + return new Object[][] { + {"persistent", "partitioned"}, + {"persistent", "non-partitioned"}, + {"non-persistent", "partitioned"}, + {"non-persistent", "non-partitioned"} + }; + } + + @BeforeClass + @Override + protected void setup() throws Exception { + super.baseSetup(); + pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true); + + pulsar.getBrokerService().addTopicEventListener((topic, event, stage, t) -> { + log.info("got event {}__{} for topic {}", event, stage, topic); + if (topic.equals(topicNameToWatch)) { + if (log.isDebugEnabled()) { + log.debug("got event {}__{} for topic {} with detailed stack", + event, stage, topic, new Exception("tracing event source")); + } + events.add(event.toString() + "__" + stage.toString()); + } + }); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @BeforeMethod + protected void setupTest() throws Exception { + namespace = "prop/" + UUID.randomUUID(); + admin.namespaces().createNamespace(namespace, Sets.newHashSet("test")); + assertTrue(admin.namespaces().getNamespaces("prop").contains(namespace)); + admin.namespaces().setRetention(namespace, new RetentionPolicies(3, 10)); + try (PulsarAdmin admin2 = createPulsarAdmin()) { + Awaitility.await().untilAsserted(() -> + assertEquals(admin2.namespaces().getRetention(namespace), new RetentionPolicies(3, 10))); + } + + events.clear(); + } + + @AfterMethod(alwaysRun = true) + protected void cleanupTest() throws Exception { + deleteNamespaceWithRetry(namespace, true); + } + + @Test(dataProvider = "topicType") + public void testEvents(String topicTypePersistence, String topicTypePartitioned, + boolean forceDelete) throws Exception { + String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID(); + + createTopicAndVerifyEvents(topicTypePartitioned, topicName); + + events.clear(); + if (topicTypePartitioned.equals("partitioned")) { + admin.topics().deletePartitionedTopic(topicName, forceDelete); + } else { + admin.topics().delete(topicName, forceDelete); + } + + Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() -> + Assert.assertEquals(events.toArray(), new String[]{ + "DELETE__BEFORE", + "UNLOAD__BEFORE", + "UNLOAD__SUCCESS", + "DELETE__SUCCESS" + }) + ); + } + + @Test(dataProvider = "topicType") + public void testEventsWithUnload(String topicTypePersistence, String topicTypePartitioned, + boolean forceDelete) throws Exception { + String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID(); + + createTopicAndVerifyEvents(topicTypePartitioned, topicName); + + events.clear(); + admin.topics().unload(topicName); + + Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() -> + Assert.assertEquals(events.toArray(), new String[]{ + "UNLOAD__BEFORE", + "UNLOAD__SUCCESS" + }) + ); + + events.clear(); + if (topicTypePartitioned.equals("partitioned")) { + admin.topics().deletePartitionedTopic(topicName, forceDelete); + } else { + admin.topics().delete(topicName, forceDelete); + } + + Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() -> + Assert.assertEquals(events.toArray(), new String[]{ + "DELETE__BEFORE", + "DELETE__SUCCESS" + }) + ); + } + + @Test(dataProvider = "topicType") + public void testEventsActiveSub(String topicTypePersistence, String topicTypePartitioned, + boolean forceDelete) throws Exception { + String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID(); + + createTopicAndVerifyEvents(topicTypePartitioned, topicName); + + Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("sub").subscribe(); + Producer producer = pulsarClient.newProducer().topic(topicName).create(); + for (int i = 0; i < 10; i++) { + producer.send("hello".getBytes()); + } + consumer.receive(); + + events.clear(); + try { + if (topicTypePartitioned.equals("partitioned")) { + admin.topics().deletePartitionedTopic(topicName, forceDelete); + } else { + admin.topics().delete(topicName, forceDelete); + } + } catch (PulsarAdminException e) { + if (forceDelete) { + throw e; + } + assertTrue(e.getMessage().contains("Topic has active producers/subscriptions") + || e.getMessage().contains("connected producers/consumers")); + } + + final String[] expectedEvents; + + if (forceDelete) { + expectedEvents = new String[]{ + "DELETE__BEFORE", + "UNLOAD__BEFORE", + "UNLOAD__SUCCESS", + "DELETE__SUCCESS", + }; + } else { + expectedEvents = new String[]{ + "DELETE__BEFORE", + "DELETE__FAILURE" + }; + } + + Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + // only care about first 4 events max, the rest will be from client recreating deleted topic + String[] eventsToArray = (events.size() <= 4) + ? events.toArray(new String[0]) + : ArrayUtils.subarray(events.toArray(new String[0]), 0, 4); + Assert.assertEquals(eventsToArray, expectedEvents); + }); + + consumer.close(); + producer.close(); + } + + @Test(dataProvider = "topicTypeNoDelete") + public void testTopicAutoGC(String topicTypePersistence, String topicTypePartitioned) throws Exception { + String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID(); + + createTopicAndVerifyEvents(topicTypePartitioned, topicName); + + admin.namespaces().setInactiveTopicPolicies(namespace, + new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true)); + + // Remove retention + admin.namespaces().setRetention(namespace, new RetentionPolicies()); + try (PulsarAdmin admin2 = createPulsarAdmin()) { + Awaitility.await().untilAsserted(() -> + assertEquals(admin2.namespaces().getRetention(namespace), new RetentionPolicies())); + } + + events.clear(); + + runGC(); + + Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() -> + Assert.assertEquals(events.toArray(), new String[]{ + "UNLOAD__BEFORE", + "UNLOAD__SUCCESS", + }) + ); + } + + private void createTopicAndVerifyEvents(String topicTypePartitioned, String topicName) throws Exception { + final String[] expectedEvents; + if (topicTypePartitioned.equals("partitioned")) { + topicNameToWatch = topicName + "-partition-1"; + admin.topics().createPartitionedTopic(topicName, 2); + triggerPartitionsCreation(topicName); + + expectedEvents = new String[]{ + "LOAD__BEFORE", + "CREATE__BEFORE", + "CREATE__SUCCESS", + "LOAD__SUCCESS" + }; + + } else { + topicNameToWatch = topicName; + admin.topics().createNonPartitionedTopic(topicName); + + expectedEvents = new String[]{ + "LOAD__BEFORE", + "LOAD__FAILURE", + "LOAD__BEFORE", + "CREATE__BEFORE", + "CREATE__SUCCESS", + "LOAD__SUCCESS" + }; + + } + + Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() -> + Assert.assertEquals(events.toArray(), expectedEvents)); + } + + private PulsarAdmin createPulsarAdmin() throws PulsarClientException { + return PulsarAdmin.builder() + .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString()) + .build(); + } + + private void triggerPartitionsCreation(String topicName) throws Exception { + Producer producer = pulsarClient.newProducer() + .topic(topicName) + .create(); + producer.close(); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java index 69db17f46932d..bbba414cabc09 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerTestBase.java @@ -58,7 +58,7 @@ void rolloverPerIntervalStats() { } } - void runGC() { + protected void runGC() { try { pulsar.getBrokerService().forEachTopic(topic -> { if (topic instanceof AbstractTopic) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index c461dd102d079..e401b314e2ecc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -123,7 +123,6 @@ import org.apache.pulsar.common.api.proto.CommandProducerSuccess; import org.apache.pulsar.common.api.proto.CommandSendError; import org.apache.pulsar.common.api.proto.CommandSendReceipt; -import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.CommandSuccess;