Skip to content

Commit

Permalink
[fix][broker] Fix the broker registery cannot recover from the metada…
Browse files Browse the repository at this point in the history
…ta node deletion (apache#23359)
  • Loading branch information
BewareMyPower authored Sep 27, 2024
1 parent e91574a commit 95bd1d1
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@
import java.util.function.BiConsumer;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.NotificationType;

/**
* Responsible for registering the current Broker lookup info to
* the distributed store (e.g. Zookeeper) for broker discovery.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Unstable
public interface BrokerRegistry extends AutoCloseable {

/**
Expand All @@ -47,7 +51,7 @@ public interface BrokerRegistry extends AutoCloseable {
/**
* Register local broker to metadata store.
*/
void register() throws MetadataStoreException;
CompletableFuture<Void> registerAsync();

/**
* Unregister the broker.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
Expand Down Expand Up @@ -69,17 +70,26 @@ protected enum State {
Init,
Started,
Registered,
Unregistering,
Closed
}

private State state;
@VisibleForTesting
final AtomicReference<State> state = new AtomicReference<>(State.Init);

public BrokerRegistryImpl(PulsarService pulsar) {
this.pulsar = pulsar;
this.conf = pulsar.getConfiguration();
this.brokerLookupDataMetadataCache = pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class);
this.scheduler = pulsar.getLoadManagerExecutor();
this.listeners = new ArrayList<>();
// The registered node is an ephemeral node that could be deleted when the metadata store client's session
// is expired. In this case, we should register again.
this.listeners.add((broker, notificationType) -> {
if (notificationType == NotificationType.Deleted && getBrokerId().equals(broker)) {
registerAsync();
}
});
this.brokerIdKeyPath = keyPath(pulsar.getBrokerId());
this.brokerLookupData = new BrokerLookupData(
pulsar.getWebServiceAddress(),
Expand All @@ -94,44 +104,45 @@ public BrokerRegistryImpl(PulsarService pulsar) {
System.currentTimeMillis(),
pulsar.getBrokerVersion(),
pulsar.getConfig().lookupProperties());
this.state = State.Init;
}

@Override
public synchronized void start() throws PulsarServerException {
if (this.state != State.Init) {
return;
if (!this.state.compareAndSet(State.Init, State.Started)) {
throw new PulsarServerException("Cannot start the broker registry in state " + state.get());
}
pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification);
try {
this.state = State.Started;
this.register();
} catch (MetadataStoreException e) {
throw new PulsarServerException(e);
this.registerAsync().get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
throw PulsarServerException.from(e);
}
}

@Override
public boolean isStarted() {
return this.state == State.Started || this.state == State.Registered;
final var state = this.state.get();
return state == State.Started || state == State.Registered;
}

@Override
public synchronized void register() throws MetadataStoreException {
if (this.state == State.Started) {
try {
brokerLookupDataMetadataCache.put(brokerIdKeyPath, brokerLookupData, EnumSet.of(CreateOption.Ephemeral))
.get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
this.state = State.Registered;
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw MetadataStoreException.unwrap(e);
}
public CompletableFuture<Void> registerAsync() {
final var state = this.state.get();
if (state != State.Started && state != State.Registered) {
log.info("[{}] Skip registering self because the state is {}", getBrokerId(), state);
return CompletableFuture.completedFuture(null);
}
log.info("[{}] Started registering self to {} (state: {})", getBrokerId(), brokerIdKeyPath, state);
return brokerLookupDataMetadataCache.put(brokerIdKeyPath, brokerLookupData, EnumSet.of(CreateOption.Ephemeral))
.thenAccept(__ -> {
this.state.set(State.Registered);
log.info("[{}] Finished registering self", getBrokerId());
});
}

@Override
public synchronized void unregister() throws MetadataStoreException {
if (this.state == State.Registered) {
if (state.compareAndSet(State.Registered, State.Unregistering)) {
try {
brokerLookupDataMetadataCache.delete(brokerIdKeyPath)
.get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
Expand All @@ -144,7 +155,7 @@ public synchronized void unregister() throws MetadataStoreException {
} catch (InterruptedException | TimeoutException e) {
throw MetadataStoreException.unwrap(e);
} finally {
this.state = State.Started;
state.set(State.Started);
}
}
}
Expand Down Expand Up @@ -191,7 +202,7 @@ public synchronized void addListener(BiConsumer<String, NotificationType> listen

@Override
public synchronized void close() throws PulsarServerException {
if (this.state == State.Closed) {
if (this.state.get() == State.Closed) {
return;
}
try {
Expand All @@ -200,7 +211,7 @@ public synchronized void close() throws PulsarServerException {
} catch (Exception ex) {
log.error("Unexpected error when unregistering the broker registry", ex);
} finally {
this.state = State.Closed;
this.state.set(State.Closed);
}
}

Expand Down Expand Up @@ -238,7 +249,7 @@ protected static String keyPath(String brokerId) {
}

private void checkState() throws IllegalStateException {
if (this.state == State.Closed) {
if (this.state.get() == State.Closed) {
throw new IllegalStateException("The registry already closed.");
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.util.PortManager;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker")
public class BrokerRegistryIntegrationTest {

private static final String clusterName = "test";
private final int zkPort = PortManager.nextFreePort();
private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextFreePort);
private PulsarService pulsar;
private BrokerRegistry brokerRegistry;
private String brokerMetadataPath;

@BeforeClass
protected void setup() throws Exception {
bk.start();
pulsar = new PulsarService(brokerConfig());
pulsar.start();
final var admin = pulsar.getAdminClient();
admin.clusters().createCluster(clusterName, ClusterData.builder().build());
admin.tenants().createTenant("public", TenantInfo.builder()
.allowedClusters(Collections.singleton(clusterName)).build());
admin.namespaces().createNamespace("public/default");
brokerRegistry = ((ExtensibleLoadManagerWrapper) pulsar.getLoadManager().get()).get().getBrokerRegistry();
brokerMetadataPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + pulsar.getBrokerId();
}

@AfterClass(alwaysRun = true)
protected void cleanup() throws Exception {
if (pulsar != null) {
pulsar.close();
}
bk.stop();
}

@Test
public void testRecoverFromNodeDeletion() throws Exception {
// Simulate the case that the node was somehow deleted (e.g. by session timeout)
Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> Assert.assertEquals(
brokerRegistry.getAvailableBrokersAsync().join(), List.of(pulsar.getBrokerId())));
pulsar.getLocalMetadataStore().delete(brokerMetadataPath, Optional.empty());
Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> Assert.assertEquals(
brokerRegistry.getAvailableBrokersAsync().join(), List.of(pulsar.getBrokerId())));

// If the node is deleted by unregister(), it should not recreate the path
brokerRegistry.unregister();
Thread.sleep(3000);
Assert.assertTrue(brokerRegistry.getAvailableBrokersAsync().get().isEmpty());

// Restore the normal state
brokerRegistry.registerAsync().get();
Assert.assertEquals(brokerRegistry.getAvailableBrokersAsync().get(), List.of(pulsar.getBrokerId()));
}

@Test
public void testRegisterAgain() throws Exception {
Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> Assert.assertEquals(
brokerRegistry.getAvailableBrokersAsync().join(), List.of(pulsar.getBrokerId())));
final var metadataStore = pulsar.getLocalMetadataStore();
final var oldResult = metadataStore.get(brokerMetadataPath).get().orElseThrow();
log.info("Old result: {} {}", new String(oldResult.getValue()), oldResult.getStat().getVersion());
brokerRegistry.registerAsync().get();

Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> {
final var newResult = metadataStore.get(brokerMetadataPath).get().orElseThrow();
log.info("New result: {} {}", new String(newResult.getValue()), newResult.getStat().getVersion());
Assert.assertTrue(newResult.getStat().getVersion() > oldResult.getStat().getVersion());
Assert.assertEquals(newResult.getValue(), oldResult.getValue());
});
}

private ServiceConfiguration brokerConfig() {
final var config = new ServiceConfiguration();
config.setClusterName(clusterName);
config.setAdvertisedAddress("localhost");
config.setBrokerServicePort(Optional.of(0));
config.setWebServicePort(Optional.of(0));
config.setMetadataStoreUrl("zk:127.0.0.1:" + bk.getZookeeperPort());
config.setManagedLedgerDefaultWriteQuorum(1);
config.setManagedLedgerDefaultAckQuorum(1);
config.setManagedLedgerDefaultEnsembleSize(1);
config.setDefaultNumberOfNamespaceBundles(16);
config.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
config.setLoadBalancerDebugModeEnabled(true);
config.setBrokerShutdownTimeoutMs(100);
return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ public void testCloseRegister() throws Exception {
assertEquals(getState(brokerRegistry), BrokerRegistryImpl.State.Started);

// Check state after re-register.
brokerRegistry.register();
brokerRegistry.registerAsync().get();
assertEquals(getState(brokerRegistry), BrokerRegistryImpl.State.Registered);

// Check state after close.
Expand Down Expand Up @@ -396,8 +396,8 @@ public void testKeyPath() {
assertEquals(keyPath, LOADBALANCE_BROKERS_ROOT + "/brokerId");
}

public BrokerRegistryImpl.State getState(BrokerRegistryImpl brokerRegistry) {
return WhiteboxImpl.getInternalState(brokerRegistry, BrokerRegistryImpl.State.class);
private static BrokerRegistryImpl.State getState(BrokerRegistryImpl brokerRegistry) {
return brokerRegistry.state.get();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -1211,7 +1211,7 @@ public void testDeployAndRollbackLoadManager() throws Exception {
producer.send("t1");

// Test re-register broker and check the lookup result
loadManager4.getBrokerRegistry().register();
loadManager4.getBrokerRegistry().registerAsync().get();

result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
assertNotNull(result);
Expand Down Expand Up @@ -1423,7 +1423,7 @@ public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception {
producer.send("t1");

// Test re-register broker and check the lookup result
loadManager4.getBrokerRegistry().register();
loadManager4.getBrokerRegistry().registerAsync().get();

result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic);
assertNotNull(result);
Expand Down

0 comments on commit 95bd1d1

Please sign in to comment.