Skip to content

Commit

Permalink
[fix][broker] Fix leader broker cannot be determined when the advertised address and advertised listeners are configured (apache#21894)
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari authored Jan 19, 2024
1 parent c66167b commit 3158fd3
Show file tree
Hide file tree
Showing 50 changed files with 959 additions and 796 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private TransactionPendingAckStoreProvider transactionPendingAckStoreProvider;
private final ExecutorProvider transactionExecutorProvider;
private final DefaultMonotonicSnapshotClock monotonicSnapshotClock;
private String brokerId;

public enum State {
Init, Started, Closing, Closed
Expand Down Expand Up @@ -307,6 +308,7 @@ public PulsarService(ServiceConfiguration config,
// Validate correctness of configuration
PulsarConfigurationLoader.isComplete(config);
TransactionBatchedWriteValidator.validate(config);
this.config = config;

// validate `advertisedAddress`, `advertisedListeners`, `internalListenerName`
this.advertisedListeners = MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
Expand All @@ -317,7 +319,6 @@ public PulsarService(ServiceConfiguration config,
// use `internalListenerName` listener as `advertisedAddress`
this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getBindAddress());
this.brokerVersion = PulsarVersion.getVersion();
this.config = config;
this.processTerminator = processTerminator;
this.loadManagerExecutor = Executors
.newSingleThreadScheduledExecutor(new ExecutorProvider.ExtendedThreadFactory("pulsar-load-manager"));
Expand Down Expand Up @@ -828,6 +829,12 @@ public void start() throws PulsarServerException {
this.brokerServiceUrl = brokerUrl(config);
this.brokerServiceUrlTls = brokerUrlTls(config);

// the broker id is used in the load manager to identify the broker
this.brokerId =
String.format("%s:%s", advertisedAddress, config.getWebServicePortTls().isPresent()
? config.getWebServicePortTls().get()
: config.getWebServicePort().orElseThrow());

if (this.compactionServiceFactory == null) {
this.compactionServiceFactory = loadCompactionServiceFactory();
}
Expand Down Expand Up @@ -1099,7 +1106,7 @@ private void addWebSocketServiceHandler(WebService webService,
}

private void handleDeleteCluster(Notification notification) {
if (ClusterResources.pathRepresentsClusterName(notification.getPath())
if (isRunning() && ClusterResources.pathRepresentsClusterName(notification.getPath())
&& notification.getType() == NotificationType.Deleted) {
final String clusterName = ClusterResources.clusterNameFromPath(notification.getPath());
getBrokerService().closeAndRemoveReplicationClient(clusterName);
Expand Down Expand Up @@ -1137,7 +1144,8 @@ protected void startLeaderElectionService() {
LOG.info("The load manager extension is enabled. Skipping PulsarService LeaderElectionService.");
return;
}
this.leaderElectionService = new LeaderElectionService(coordinationService, getSafeWebServiceAddress(),
this.leaderElectionService =
new LeaderElectionService(coordinationService, getBrokerId(), getSafeWebServiceAddress(),
state -> {
if (state == LeaderElectionState.Leading) {
LOG.info("This broker was elected leader");
Expand Down Expand Up @@ -1185,7 +1193,7 @@ protected void startLeaderElectionService() {
protected void acquireSLANamespace() {
try {
// Namespace not created hence no need to unload it
NamespaceName nsName = NamespaceService.getSLAMonitorNamespace(getLookupServiceAddress(), config);
NamespaceName nsName = NamespaceService.getSLAMonitorNamespace(getBrokerId(), config);
if (!this.pulsarResources.getNamespaceResources().namespaceExists(nsName)) {
LOG.info("SLA Namespace = {} doesn't exist.", nsName);
return;
Expand Down Expand Up @@ -1694,10 +1702,9 @@ public String getSafeBrokerServiceUrl() {
return brokerServiceUrlTls != null ? brokerServiceUrlTls : brokerServiceUrl;
}

public String getLookupServiceAddress() {
return String.format("%s:%s", advertisedAddress, config.getWebServicePortTls().isPresent()
? config.getWebServicePortTls().get()
: config.getWebServicePort().orElseThrow());
/**
 * Returns the id of this broker.
 *
 * <p>The id is assigned in {@code start()} as {@code advertisedAddress:webServicePort}
 * (the TLS web service port when one is configured), so it is unavailable before then.
 *
 * @return the broker id, never {@code null}
 * @throws NullPointerException if the broker id has not been initialized yet
 */
public String getBrokerId() {
    final String id = brokerId;
    if (id == null) {
        // Fail loudly instead of handing an unset id to callers.
        throw new NullPointerException("brokerId is not initialized before start has been called");
    }
    return id;
}

public synchronized void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ public void getLeaderBroker(@Suspended final AsyncResponse asyncResponse) {
validateSuperUserAccessAsync().thenAccept(__ -> {
LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader()
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Couldn't find leader broker"));
BrokerInfo brokerInfo = BrokerInfo.builder().serviceUrl(leaderBroker.getServiceUrl()).build();
BrokerInfo brokerInfo = BrokerInfo.builder()
.serviceUrl(leaderBroker.getServiceUrl())
.brokerId(leaderBroker.getBrokerId()).build();
LOG.info("[{}] Successfully to get the information of the leader broker.", clientAppId());
asyncResponse.resume(brokerInfo);
})
Expand All @@ -164,7 +166,7 @@ public void getOwnedNamespaces(@Suspended final AsyncResponse asyncResponse,
@PathParam("clusterName") String cluster,
@PathParam("broker-webserviceurl") String broker) {
validateSuperUserAccessAsync()
.thenAccept(__ -> validateBrokerName(broker))
.thenCompose(__ -> maybeRedirectToBroker(broker))
.thenCompose(__ -> validateClusterOwnershipAsync(cluster))
.thenCompose(__ -> pulsar().getNamespaceService().getOwnedNameSpacesStatusAsync())
.thenAccept(asyncResponse::resume)
Expand Down Expand Up @@ -396,10 +398,10 @@ private void checkDeadlockedThreads() {


private CompletableFuture<Void> internalRunHealthCheck(TopicVersion topicVersion) {
String lookupServiceAddress = pulsar().getLookupServiceAddress();
String brokerId = pulsar().getBrokerId();
NamespaceName namespaceName = (topicVersion == TopicVersion.V2)
? NamespaceService.getHeartbeatNamespaceV2(lookupServiceAddress, pulsar().getConfiguration())
: NamespaceService.getHeartbeatNamespace(lookupServiceAddress, pulsar().getConfiguration());
? NamespaceService.getHeartbeatNamespaceV2(brokerId, pulsar().getConfiguration())
: NamespaceService.getHeartbeatNamespace(brokerId, pulsar().getConfiguration());
final String topicName = String.format("persistent://%s/%s", namespaceName, HEALTH_CHECK_TOPIC_SUFFIX);
LOG.info("[{}] Running healthCheck with topic={}", clientAppId(), topicName);
final String messageStr = UUID.randomUUID().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -923,9 +923,9 @@ private CompletableFuture<Void> validateLeaderBrokerAsync() {
return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED, errorStr));
}
LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader().get();
String leaderBrokerUrl = leaderBroker.getServiceUrl();
String leaderBrokerId = leaderBroker.getBrokerId();
return pulsar().getNamespaceService()
.createLookupResult(leaderBrokerUrl, false, null)
.createLookupResult(leaderBrokerId, false, null)
.thenCompose(lookupResult -> {
String redirectUrl = isRequestHttps() ? lookupResult.getLookupData().getHttpUrlTls()
: lookupResult.getLookupData().getHttpUrl();
Expand All @@ -948,7 +948,7 @@ private CompletableFuture<Void> validateLeaderBrokerAsync() {
return FutureUtil.failedFuture((
new WebApplicationException(Response.temporaryRedirect(redirect).build())));
} catch (MalformedURLException exception) {
log.error("The leader broker url is malformed - {}", leaderBrokerUrl);
log.error("The redirect url is malformed - {}", redirectUrl);
return FutureUtil.failedFuture(new RestException(exception));
}
});
Expand Down Expand Up @@ -984,8 +984,11 @@ public CompletableFuture<Void> setNamespaceBundleAffinityAsync(String bundleRang
}

public CompletableFuture<Void> internalUnloadNamespaceBundleAsync(String bundleRange,
String destinationBroker,
String destinationBrokerParam,
boolean authoritative) {
String destinationBroker = StringUtils.isBlank(destinationBrokerParam) ? null :
// ensure backward compatibility: strip the possible http:// or https:// prefix
destinationBrokerParam.replaceFirst("http[s]?://", "");
return validateSuperUserAccessAsync()
.thenCompose(__ -> setNamespaceBundleAffinityAsync(bundleRange, destinationBroker))
.thenAccept(__ -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,23 @@
@AllArgsConstructor
@NoArgsConstructor
public class LeaderBroker {
    /** Id of the leader broker; may be {@code null} when only {@link #serviceUrl} was populated. */
    private String brokerId;
    /** Service URL of the leader broker, e.g. {@code http://host:port}. */
    private String serviceUrl;

    /**
     * Returns the id of the leader broker.
     *
     * <p>When no explicit broker id is stored, the id is derived from the service URL by
     * dropping the scheme (and any trailing path), for backward compatibility at runtime
     * with older versions of Pulsar.
     *
     * @return the broker id, or {@code null} when neither the broker id nor the service URL
     *         is set (for example on an instance created through the no-arg constructor)
     * @throws IllegalArgumentException if the id has to be derived and the service URL
     *         does not contain a {@code "://"} scheme separator
     */
    public String getBrokerId() {
        if (brokerId != null) {
            return brokerId;
        }
        // for backward compatibility at runtime with older versions of Pulsar
        // A completely empty instance has nothing to derive the id from; report "unknown"
        // as null instead of failing with a NullPointerException inside the parser.
        return serviceUrl != null ? parseHostAndPort(serviceUrl) : null;
    }

    /**
     * Extracts the {@code host:port} part of a service URL.
     *
     * @param serviceUrl a non-null URL of the form {@code scheme://host:port[/path]}
     * @return the text between the scheme separator and the first following {@code '/'},
     *         or the end of the string when there is no path
     * @throws IllegalArgumentException if {@code serviceUrl} has no {@code "://"} separator
     */
    private static String parseHostAndPort(String serviceUrl) {
        int uriSeparatorPos = serviceUrl.indexOf("://");
        if (uriSeparatorPos == -1) {
            throw new IllegalArgumentException("'" + serviceUrl + "' isn't an URI.");
        }
        int authorityStart = uriSeparatorPos + 3;
        // Cut at the first '/' after the authority so a trailing slash or path
        // does not leak into the broker id.
        int pathStart = serviceUrl.indexOf('/', authorityStart);
        return pathStart == -1
                ? serviceUrl.substring(authorityStart)
                : serviceUrl.substring(authorityStart, pathStart);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,17 @@ public class LeaderElectionService implements AutoCloseable {
private final LeaderElection<LeaderBroker> leaderElection;
private final LeaderBroker localValue;

public LeaderElectionService(CoordinationService cs, String localWebServiceAddress,
Consumer<LeaderElectionState> listener) {
this(cs, localWebServiceAddress, ELECTION_ROOT, listener);
public LeaderElectionService(CoordinationService cs, String brokerId,
String serviceUrl, Consumer<LeaderElectionState> listener) {
this(cs, brokerId, serviceUrl, ELECTION_ROOT, listener);
}

public LeaderElectionService(CoordinationService cs,
String localWebServiceAddress,
String electionRoot,
String brokerId,
String serviceUrl, String electionRoot,
Consumer<LeaderElectionState> listener) {
this.leaderElection = cs.getLeaderElection(LeaderBroker.class, electionRoot, listener);
this.localValue = new LeaderBroker(localWebServiceAddress);
this.localValue = new LeaderBroker(brokerId, serviceUrl);
}

public void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
public class NoopLoadManager implements LoadManager {

private PulsarService pulsar;
private String lookupServiceAddress;
private String brokerId;
private ResourceUnit localResourceUnit;
private LockManager<LocalBrokerData> lockManager;
private Map<String, String> bundleBrokerAffinityMap;
Expand All @@ -57,16 +57,15 @@ public void initialize(PulsarService pulsar) {

@Override
public void start() throws PulsarServerException {
lookupServiceAddress = pulsar.getLookupServiceAddress();
localResourceUnit = new SimpleResourceUnit(String.format("http://%s", lookupServiceAddress),
new PulsarResourceDescription());
brokerId = pulsar.getBrokerId();
localResourceUnit = new SimpleResourceUnit(brokerId, new PulsarResourceDescription());

LocalBrokerData localData = new LocalBrokerData(pulsar.getWebServiceAddress(),
pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls(), pulsar.getAdvertisedListeners());
localData.setProtocols(pulsar.getProtocolDataToAdvertise());
localData.setLoadManagerClassName(this.pulsar.getConfig().getLoadManagerClassName());
String brokerReportPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
String brokerReportPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + brokerId;

try {
log.info("Acquiring broker resource lock on {}", brokerReportPath);
Expand Down Expand Up @@ -129,12 +128,12 @@ public void disableBroker() throws Exception {

@Override
public Set<String> getAvailableBrokers() throws Exception {
return Collections.singleton(lookupServiceAddress);
return Collections.singleton(brokerId);
}

@Override
public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
return CompletableFuture.completedFuture(Collections.singleton(lookupServiceAddress));
return CompletableFuture.completedFuture(Collections.singleton(brokerId));
}

@Override
Expand All @@ -153,7 +152,6 @@ public String setNamespaceBundleAffinity(String bundle, String broker) {
if (StringUtils.isBlank(broker)) {
return this.bundleBrokerAffinityMap.remove(bundle);
}
broker = broker.replaceFirst("http[s]?://", "");
return this.bundleBrokerAffinityMap.put(bundle, broker);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
*/
public interface ResourceUnit extends Comparable<ResourceUnit> {

String PROPERTY_KEY_BROKER_ZNODE_NAME = "__advertised_addr";

String getResourceId();

ResourceDescription getAvailableResource();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public BrokerRegistryImpl(PulsarService pulsar) {
this.brokerLookupDataLockManager = pulsar.getCoordinationService().getLockManager(BrokerLookupData.class);
this.scheduler = pulsar.getLoadManagerExecutor();
this.listeners = new ArrayList<>();
this.brokerId = pulsar.getLookupServiceAddress();
this.brokerId = pulsar.getBrokerId();
this.brokerLookupData = new BrokerLookupData(
pulsar.getWebServiceAddress(),
pulsar.getWebServiceAddressTls(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,8 @@ public void start() throws PulsarServerException {
try {
this.brokerRegistry = new BrokerRegistryImpl(pulsar);
this.leaderElectionService = new LeaderElectionService(
pulsar.getCoordinationService(), pulsar.getSafeWebServiceAddress(), ELECTION_ROOT,
pulsar.getCoordinationService(), pulsar.getBrokerId(),
pulsar.getSafeWebServiceAddress(), ELECTION_ROOT,
state -> {
pulsar.getLoadManagerExecutor().execute(() -> {
if (state == LeaderElectionState.Leading) {
Expand All @@ -366,7 +367,7 @@ public void start() throws PulsarServerException {
this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
this.brokerRegistry.start();
this.splitManager = new SplitManager(splitCounter);
this.unloadManager = new UnloadManager(unloadCounter, pulsar.getLookupServiceAddress());
this.unloadManager = new UnloadManager(unloadCounter, pulsar.getBrokerId());
this.serviceUnitStateChannel.listen(unloadManager);
this.serviceUnitStateChannel.listen(splitManager);
this.leaderElectionService.start();
Expand Down Expand Up @@ -795,7 +796,7 @@ public static boolean isInternalTopic(String topic) {
@VisibleForTesting
void playLeader() {
log.info("This broker:{} is setting the role from {} to {}",
pulsar.getLookupServiceAddress(), role, Leader);
pulsar.getBrokerId(), role, Leader);
int retry = 0;
while (!Thread.currentThread().isInterrupted()) {
try {
Expand All @@ -812,7 +813,7 @@ void playLeader() {
break;
} catch (Throwable e) {
log.error("The broker:{} failed to set the role. Retrying {} th ...",
pulsar.getLookupServiceAddress(), ++retry, e);
pulsar.getBrokerId(), ++retry, e);
try {
Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
} catch (InterruptedException ex) {
Expand All @@ -823,7 +824,7 @@ void playLeader() {
}
}
role = Leader;
log.info("This broker:{} plays the leader now.", pulsar.getLookupServiceAddress());
log.info("This broker:{} plays the leader now.", pulsar.getBrokerId());

// flush the load data when the leader is elected.
brokerLoadDataReporter.reportAsync(true);
Expand All @@ -833,7 +834,7 @@ void playLeader() {
@VisibleForTesting
void playFollower() {
log.info("This broker:{} is setting the role from {} to {}",
pulsar.getLookupServiceAddress(), role, Follower);
pulsar.getBrokerId(), role, Follower);
int retry = 0;
while (!Thread.currentThread().isInterrupted()) {
try {
Expand All @@ -846,7 +847,7 @@ void playFollower() {
break;
} catch (Throwable e) {
log.error("The broker:{} failed to set the role. Retrying {} th ...",
pulsar.getLookupServiceAddress(), ++retry, e);
pulsar.getBrokerId(), ++retry, e);
try {
Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
} catch (InterruptedException ex) {
Expand All @@ -857,7 +858,7 @@ void playFollower() {
}
}
role = Follower;
log.info("This broker:{} plays a follower now.", pulsar.getLookupServiceAddress());
log.info("This broker:{} plays a follower now.", pulsar.getBrokerId());

// flush the load data when the leader is elected.
brokerLoadDataReporter.reportAsync(true);
Expand Down
Loading

0 comments on commit 3158fd3

Please sign in to comment.