diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 477a92395386c..f6fb4503b03ba 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -88,7 +88,6 @@ import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TableView; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -1377,8 +1376,8 @@ private synchronized void doCleanup(String broker) { } try { - producer.flush(); - } catch (PulsarClientException e) { + producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS); + } catch (Exception e) { log.error("Failed to flush the in-flight non-system bundle override messages.", e); } @@ -1401,8 +1400,8 @@ private synchronized void doCleanup(String broker) { } try { - producer.flush(); - } catch (PulsarClientException e) { + producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS); + } catch (Exception e) { log.error("Failed to flush the in-flight system bundle override messages.", e); } @@ -1580,8 +1579,8 @@ protected void monitorOwnerships(List brokers) { } try { - producer.flush(); - } catch (PulsarClientException e) { + producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS); + } catch (Exception e) { log.error("Failed to flush the in-flight messages.", e); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 6844b44419ddd..187290566d456 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -795,6 +795,11 @@ public CompletableFuture unloadNamespaceBundle(NamespaceBundle bundle, } public CompletableFuture isNamespaceBundleOwned(NamespaceBundle bundle) { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { + ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get()); + return extensibleLoadManager.getOwnershipAsync(Optional.empty(), bundle) + .thenApply(Optional::isPresent); + } return pulsar.getLocalMetadataStore().exists(ServiceUnitUtils.path(bundle)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 8af654633b415..22ee11d8eaf32 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -580,7 +580,8 @@ public CompletableFuture stopReplProducers() { @Override public CompletableFuture checkReplication() { TopicName name = TopicName.get(topic); - if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) { + if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name) + || ExtensibleLoadManagerImpl.isInternalTopic(topic)) { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index f363132f94496..bd53191a261a9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1817,7 +1817,8 @@ CompletableFuture checkPersistencePolicies() { @Override public CompletableFuture checkReplication() { TopicName name = TopicName.get(topic); - if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) { + if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name) + || ExtensibleLoadManagerImpl.isInternalTopic(topic)) { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java index 479c1e616e340..514e0207fbfb1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java @@ -27,6 +27,8 @@ import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; +import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.ConsumerImpl; @@ -38,6 +40,8 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Factory; import org.testng.annotations.Test; import java.lang.reflect.Method; @@ -52,6 +56,18 @@ public class ReplicatorGlobalNSTest extends ReplicatorTestBase { protected String methodName; + @DataProvider(name = "loadManagerClassName") + public static Object[][] loadManagerClassName() { + return new Object[][]{ + {ModularLoadManagerImpl.class.getName()}, + {ExtensibleLoadManagerImpl.class.getName()} + }; + } + + @Factory(dataProvider = "loadManagerClassName") + public ReplicatorGlobalNSTest(String loadManagerClassName) { + this.loadManagerClassName = loadManagerClassName; + } @BeforeMethod public void beforeMethod(Method m) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java index 39cd13fbba5f5..d87f896e31a1c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java @@ -119,6 +119,11 @@ public abstract class ReplicatorTestBase extends TestRetrySupport { protected final String cluster2 = "r2"; protected final String cluster3 = "r3"; protected final String cluster4 = "r4"; + protected String loadManagerClassName; + + protected String getLoadManagerClassName() { + return loadManagerClassName; + } // Default frequency public int getBrokerServicePurgeInactiveFrequency() { @@ -271,8 +276,9 @@ protected void setup() throws Exception { .brokerClientTlsTrustStoreType(keyStoreType) .build()); - admin1.tenants().createTenant("pulsar", - new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r1", "r2", "r3"))); + updateTenantInfo("pulsar", + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), + Sets.newHashSet("r1", "r2", "r3"))); admin1.namespaces().createNamespace("pulsar/ns", Sets.newHashSet("r1", "r2", "r3")); admin1.namespaces().createNamespace("pulsar/ns1", Sets.newHashSet("r1", "r2")); @@ -344,6 +350,7 @@ private void setConfigDefaults(ServiceConfiguration config, String clusterName, config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED); config.setEnableReplicatedSubscriptions(true); config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000); + config.setLoadManagerClassName(getLoadManagerClassName()); } public void resetConfig1() { @@ -436,6 +443,14 @@ protected void cleanup() throws Exception { resetConfig4(); } + protected void updateTenantInfo(String tenant, TenantInfoImpl tenantInfo) throws Exception { + if (!admin1.tenants().getTenants().contains(tenant)) { + admin1.tenants().createTenant(tenant, tenantInfo); + } else { + admin1.tenants().updateTenant(tenant, tenantInfo); + } + } + static class MessageProducer implements AutoCloseable { URL url; String namespace;