Skip to content

Commit

Permalink
[fix][broker] Fix Broker migration NPE while broker tls url not confi…
Browse files Browse the repository at this point in the history
…gured (apache#23534)
  • Loading branch information
rdhabalia authored Nov 1, 2024
1 parent f196e2c commit 6bd0308
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1363,8 +1363,8 @@ public static CompletableFuture<Optional<ClusterUrl>> getMigratedClusterUrlAsync
.getClusterPoliciesAsync(pulsar.getConfig().getClusterName())
.thenCombine(isNamespaceMigrationEnabledAsync(pulsar, topic),
((clusterData, isNamespaceMigrationEnabled) -> {
Optional<ClusterUrl> url = ((clusterData.isPresent() && clusterData.get().isMigrated())
|| isNamespaceMigrationEnabled)
Optional<ClusterUrl> url = (clusterData.isPresent() && (clusterData.get().isMigrated()
|| isNamespaceMigrationEnabled))
? Optional.ofNullable(clusterData.get().getMigratedClusterUrl())
: Optional.empty();
return url;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ public void testClusterMigration() throws Exception {
assertFalse(topic2.getProducers().isEmpty());

ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(),
pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls());
pulsar2.getBrokerServiceUrl(), null);
admin1.clusters().updateClusterMigration("r1", true, migratedUrl);
assertEquals(admin1.clusters().getClusterMigration("r1").getMigratedClusterUrl(), migratedUrl);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -710,9 +710,12 @@ protected void handleReachedEndOfTopic(CommandReachedEndOfTopic commandReachedEn
@Override
protected void handleTopicMigrated(CommandTopicMigrated commandTopicMigrated) {
final long resourceId = commandTopicMigrated.getResourceId();
final String serviceUrl = commandTopicMigrated.getBrokerServiceUrl();
final String serviceUrlTls = commandTopicMigrated.getBrokerServiceUrlTls();

final String serviceUrl = commandTopicMigrated.hasBrokerServiceUrl()
? commandTopicMigrated.getBrokerServiceUrl()
: null;
final String serviceUrlTls = commandTopicMigrated.hasBrokerServiceUrlTls()
? commandTopicMigrated.getBrokerServiceUrlTls()
: null;
HandlerState resource = commandTopicMigrated.getResourceType() == ResourceType.Producer
? producers.get(resourceId)
: consumers.get(resourceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse;
import org.apache.pulsar.common.api.proto.CommandTopicMigrated;
import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
import org.apache.pulsar.common.api.proto.FeatureFlags;
import org.apache.pulsar.common.api.proto.IntRange;
Expand Down Expand Up @@ -768,11 +769,14 @@ public static ByteBuf newReachedEndOfTopic(long consumerId) {

public static ByteBuf newTopicMigrated(ResourceType type, long resourceId, String brokerUrl, String brokerUrlTls) {
BaseCommand cmd = localCmd(Type.TOPIC_MIGRATED);
cmd.setTopicMigrated()
.setResourceType(type)
.setResourceId(resourceId)
.setBrokerServiceUrl(brokerUrl)
.setBrokerServiceUrlTls(brokerUrlTls);
CommandTopicMigrated migratedCmd = cmd.setTopicMigrated();
migratedCmd.setResourceType(type).setResourceId(resourceId);
if (StringUtils.isNotBlank(brokerUrl)) {
migratedCmd.setBrokerServiceUrl(brokerUrl);
}
if (StringUtils.isNotBlank(brokerUrlTls)) {
migratedCmd.setBrokerServiceUrlTls(brokerUrlTls);
}
return serializeWithSize(cmd);
}

Expand Down

0 comments on commit 6bd0308

Please sign in to comment.