diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 11f00fb28e34b..96ea2004be8d7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -1363,8 +1363,8 @@ public static CompletableFuture> getMigratedClusterUrlAsync .getClusterPoliciesAsync(pulsar.getConfig().getClusterName()) .thenCombine(isNamespaceMigrationEnabledAsync(pulsar, topic), ((clusterData, isNamespaceMigrationEnabled) -> { - Optional url = ((clusterData.isPresent() && clusterData.get().isMigrated()) - || isNamespaceMigrationEnabled) + Optional url = (clusterData.isPresent() && (clusterData.get().isMigrated() + || isNamespaceMigrationEnabled)) ? Optional.ofNullable(clusterData.get().getMigratedClusterUrl()) : Optional.empty(); return url; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java index e56a3495600f0..e6a7d049366e4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java @@ -297,7 +297,7 @@ public void testClusterMigration() throws Exception { assertFalse(topic2.getProducers().isEmpty()); ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(), - pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls()); + pulsar2.getBrokerServiceUrl(), null); admin1.clusters().updateClusterMigration("r1", true, migratedUrl); assertEquals(admin1.clusters().getClusterMigration("r1").getMigratedClusterUrl(), migratedUrl); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 24163c631ffe9..35c41455e8987 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -710,9 +710,12 @@ protected void handleReachedEndOfTopic(CommandReachedEndOfTopic commandReachedEn @Override protected void handleTopicMigrated(CommandTopicMigrated commandTopicMigrated) { final long resourceId = commandTopicMigrated.getResourceId(); - final String serviceUrl = commandTopicMigrated.getBrokerServiceUrl(); - final String serviceUrlTls = commandTopicMigrated.getBrokerServiceUrlTls(); - + final String serviceUrl = commandTopicMigrated.hasBrokerServiceUrl() + ? commandTopicMigrated.getBrokerServiceUrl() + : null; + final String serviceUrlTls = commandTopicMigrated.hasBrokerServiceUrlTls() + ? commandTopicMigrated.getBrokerServiceUrlTls() + : null; HandlerState resource = commandTopicMigrated.getResourceType() == ResourceType.Producer ? producers.get(resourceId) : consumers.get(resourceId); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 8635368f00f0b..19aa9907549d9 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -88,6 +88,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse; +import org.apache.pulsar.common.api.proto.CommandTopicMigrated; import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType; import org.apache.pulsar.common.api.proto.FeatureFlags; import org.apache.pulsar.common.api.proto.IntRange; @@ -768,11 +769,14 @@ public static ByteBuf newReachedEndOfTopic(long consumerId) { public static ByteBuf newTopicMigrated(ResourceType type, long resourceId, String brokerUrl, String brokerUrlTls) { BaseCommand cmd = localCmd(Type.TOPIC_MIGRATED); - cmd.setTopicMigrated() - .setResourceType(type) - .setResourceId(resourceId) - .setBrokerServiceUrl(brokerUrl) - .setBrokerServiceUrlTls(brokerUrlTls); + CommandTopicMigrated migratedCmd = cmd.setTopicMigrated(); + migratedCmd.setResourceType(type).setResourceId(resourceId); + if (StringUtils.isNotBlank(brokerUrl)) { + migratedCmd.setBrokerServiceUrl(brokerUrl); + } + if (StringUtils.isNotBlank(brokerUrlTls)) { + migratedCmd.setBrokerServiceUrlTls(brokerUrlTls); + } return serializeWithSize(cmd); }