From 2049c72417c9cc06ce0fd58e1ed2a4e94abebe02 Mon Sep 17 00:00:00 2001 From: Heesung Sohn <103456639+heesung-sn@users.noreply.github.com> Date: Tue, 16 Apr 2024 00:34:59 -0700 Subject: [PATCH] [fix][broker] Fix Replicated Topic unload bug when ExtensibleLoadManager is enabled (#22496) (cherry picked from commit 203f305bf449dd335b39501177f210cfcb73d5fa) (cherry picked from commit f467f37a75cae11841161a5a7c6e4be5671100fd) (cherry picked from commit 2557db63d48a6534951d9a988c1d7afb5fa64dce) --- .../channel/ServiceUnitStateChannelImpl.java | 13 ++++++------- .../pulsar/broker/namespace/NamespaceService.java | 5 +++++ .../service/nonpersistent/NonPersistentTopic.java | 4 +++- .../broker/service/persistent/PersistentTopic.java | 3 ++- .../broker/service/ReplicatorGlobalNSTest.java | 7 +++---- 5 files changed, 19 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index f66ed2a5c9062..7cd0e22670030 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -88,7 +88,6 @@ import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TableView; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -1338,8 +1337,8 @@ private synchronized void doCleanup(String broker) { } try { - producer.flush(); - } catch (PulsarClientException e) { + producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS); + } catch (Exception e) { log.error("Failed to flush the in-flight non-system bundle override messages.", e); } @@ -1362,8 +1361,8 @@ private synchronized void doCleanup(String broker) { } try { - producer.flush(); - } catch (PulsarClientException e) { + producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS); + } catch (Exception e) { log.error("Failed to flush the in-flight system bundle override messages.", e); } @@ -1541,8 +1540,8 @@ protected void monitorOwnerships(List brokers) { } try { - producer.flush(); - } catch (PulsarClientException e) { + producer.flushAsync().get(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS, MILLISECONDS); + } catch (Exception e) { log.error("Failed to flush the in-flight messages.", e); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index c62e9c52a768e..30e84570ce455 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -793,6 +793,11 @@ public CompletableFuture unloadNamespaceBundle(NamespaceBundle bundle, } public CompletableFuture isNamespaceBundleOwned(NamespaceBundle bundle) { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { + ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get()); + return extensibleLoadManager.getOwnershipAsync(Optional.empty(), bundle) + .thenApply(Optional::isPresent); + } return pulsar.getLocalMetadataStore().exists(ServiceUnitUtils.path(bundle)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 23d938d22fd4b..01aec2cbb8653 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -39,6 +39,7 @@ import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.service.AbstractReplicator; @@ -550,7 +551,8 @@ public CompletableFuture stopReplProducers() { @Override public CompletableFuture checkReplication() { TopicName name = TopicName.get(topic); - if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) { + if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name) + || ExtensibleLoadManagerImpl.isInternalTopic(topic)) { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index f2b723c38cbca..28fa026e3f54a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1750,7 +1750,8 @@ CompletableFuture checkPersistencePolicies() { @Override public CompletableFuture checkReplication() { TopicName name = TopicName.get(topic); - if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) { + if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name) + || ExtensibleLoadManagerImpl.isInternalTopic(topic)) { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java index 057126e6981d4..c09809d4ad873 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java @@ -20,14 +20,16 @@ import static org.testng.Assert.fail; import com.google.common.collect.Sets; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import lombok.Cleanup; -import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.ConsumerImpl; @@ -40,9 +42,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.lang.reflect.Method; -import java.util.concurrent.TimeUnit; - /** * The tests in this class should be denied in a production pulsar cluster. they are very dangerous, which leads to * a lot of topic deletion and makes namespace policies being incorrect.