Skip to content

Commit

Permalink
[fix][broker][branch-3.0] Set ServiceUnitStateChannel topic compaction threshold explicitly, improve getOwnerAsync, and fix other bugs (#22064) (#22154)
Browse files Browse the repository at this point in the history

(cherry picked from commit 6df0265)
  • Loading branch information
heesung-sn committed Feb 29, 2024
1 parent 71022f5 commit 6d2ce89
Showing 10 changed files with 325 additions and 182 deletions.
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
import static java.lang.String.format;
import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Follower;
import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Leader;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.TOPIC;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin;
import static org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.getNamespaceBundle;
@@ -117,6 +118,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {

private static final long MONITOR_INTERVAL_IN_MILLIS = 120_000;

public static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024;

private static final String ELECTION_ROOT = "/loadbalance/extension/leader";

private PulsarService pulsar;
@@ -173,6 +176,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {

private volatile boolean started = false;

private boolean configuredSystemTopics = false;

private final AssignCounter assignCounter = new AssignCounter();
@Getter
private final UnloadCounter unloadCounter = new UnloadCounter();
@@ -262,6 +267,10 @@ public static boolean isLoadManagerExtensionEnabled(ServiceConfiguration conf) {
return ExtensibleLoadManagerImpl.class.getName().equals(conf.getLoadManagerClassName());
}

/**
 * Checks whether the load manager extension is active by inspecting the load manager
 * instance that {@code pulsar} currently holds.
 *
 * @param pulsar the broker service whose load manager reference is read
 * @return {@code true} if the held load manager is an {@link ExtensibleLoadManagerWrapper}
 */
public static boolean isLoadManagerExtensionEnabled(PulsarService pulsar) {
    var currentLoadManager = pulsar.getLoadManager().get();
    return currentLoadManager instanceof ExtensibleLoadManagerWrapper;
}

public static ExtensibleLoadManagerImpl get(LoadManager loadManager) {
if (!(loadManager instanceof ExtensibleLoadManagerWrapper loadManagerWrapper)) {
throw new IllegalArgumentException("The load manager should be 'ExtensibleLoadManagerWrapper'.");
@@ -291,6 +300,27 @@ private static void createSystemTopics(PulsarService pulsar) throws PulsarServer
createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
}

/**
 * Ensures the topic-level compaction threshold of {@code TOPIC} equals
 * {@link #COMPACTION_THRESHOLD}.
 *
 * <p>The threshold is only read and written when the load manager extension, system topics
 * and topic-level policies are all enabled. Otherwise a warning is logged and nothing is
 * changed.
 *
 * @param pulsar the broker service providing the configuration and the admin client
 * @return {@code true} if the threshold was checked (and updated when it differed) or the
 *         configuration step was skipped because a prerequisite is disabled;
 *         {@code false} if any exception was thrown along the way (it is logged, not rethrown)
 */
private static boolean configureSystemTopics(PulsarService pulsar) {
    try {
        boolean topicPoliciesAvailable = ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)
                && pulsar.getConfiguration().isSystemTopicEnabled()
                && pulsar.getConfiguration().isTopicLevelPoliciesEnabled();
        if (!topicPoliciesAvailable) {
            log.warn("System topic or topic level policies is disabled. "
                    + "{} compaction threshold follows the broker or namespace policies.", TOPIC);
            return true;
        }

        var topicPolicies = pulsar.getAdminClient().topicPolicies();
        Long currentThreshold = topicPolicies.getCompactionThreshold(TOPIC);
        boolean alreadyConfigured =
                currentThreshold != null && currentThreshold.longValue() == COMPACTION_THRESHOLD;
        if (!alreadyConfigured) {
            topicPolicies.setCompactionThreshold(TOPIC, COMPACTION_THRESHOLD);
            log.info("Set compaction threshold: {} bytes for system topic {}.", COMPACTION_THRESHOLD, TOPIC);
        }
        return true;
    } catch (Exception e) {
        // Reported as "not configured" so the caller can decide whether to try again.
        log.error("Failed to set compaction threshold for system topic:{}", TOPIC, e);
        return false;
    }
}

@Override
public void start() throws PulsarServerException {
if (this.started) {
@@ -329,9 +359,9 @@ public void start() throws PulsarServerException {

try {
this.brokerLoadDataStore = LoadDataStoreFactory
.create(pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
.create(pulsar, BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
this.topBundlesLoadDataStore = LoadDataStoreFactory
.create(pulsar.getClient(), TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
.create(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
} catch (LoadDataStoreException e) {
throw new PulsarServerException(e);
}
@@ -389,6 +419,7 @@ public void start() throws PulsarServerException {
this.splitScheduler.start();
this.initWaiter.countDown();
this.started = true;
log.info("Started load manager.");
} catch (Exception ex) {
log.error("Failed to start the extensible load balance and close broker registry {}.",
this.brokerRegistry, ex);
@@ -523,7 +554,7 @@ private CompletableFuture<Optional<BrokerLookupData>> dedupeLookupRequest(
if (ex != null) {
assignCounter.incrementFailure();
}
lookupRequests.remove(key, newFutureCreated.getValue());
lookupRequests.remove(key);
});
}
}
@@ -736,13 +767,13 @@ public void close() throws PulsarServerException {
}

public static boolean isInternalTopic(String topic) {
return topic.startsWith(ServiceUnitStateChannelImpl.TOPIC)
return topic.startsWith(TOPIC)
|| topic.startsWith(BROKER_LOAD_DATA_STORE_TOPIC)
|| topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
}

@VisibleForTesting
void playLeader() {
synchronized void playLeader() {
log.info("This broker:{} is setting the role from {} to {}",
pulsar.getBrokerId(), role, Leader);
int retry = 0;
@@ -760,7 +791,7 @@ void playLeader() {
serviceUnitStateChannel.scheduleOwnershipMonitor();
break;
} catch (Throwable e) {
log.error("The broker:{} failed to set the role. Retrying {} th ...",
log.warn("The broker:{} failed to set the role. Retrying {} th ...",
pulsar.getBrokerId(), ++retry, e);
try {
Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
@@ -780,7 +811,7 @@ void playLeader() {
}

@VisibleForTesting
void playFollower() {
synchronized void playFollower() {
log.info("This broker:{} is setting the role from {} to {}",
pulsar.getBrokerId(), role, Follower);
int retry = 0;
@@ -794,7 +825,7 @@ void playFollower() {
topBundlesLoadDataStore.startProducer();
break;
} catch (Throwable e) {
log.error("The broker:{} failed to set the role. Retrying {} th ...",
log.warn("The broker:{} failed to set the role. Retrying {} th ...",
pulsar.getBrokerId(), ++retry, e);
try {
Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
@@ -834,14 +865,21 @@ public List<Metrics> getMetrics() {
return metricsCollection;
}

private void monitor() {

@VisibleForTesting
protected void monitor() {
try {
initWaiter.await();

// Monitor role
// Periodically check the role in case ZK watcher fails.
var isChannelOwner = serviceUnitStateChannel.isChannelOwner();
if (isChannelOwner) {
// System topic config might fail due to the race condition
// with topic policy init(Topic policies cache have not init).
if (!configuredSystemTopics) {
configuredSystemTopics = configureSystemTopics(pulsar);
}
if (role != Leader) {
log.warn("Current role:{} does not match with the channel ownership:{}. "
+ "Playing the leader role.", role, isChannelOwner);
Original file line number Diff line number Diff line change
@@ -492,21 +492,28 @@ private CompletableFuture<Optional<String>> getActiveOwnerAsync(
String serviceUnit,
ServiceUnitState state,
Optional<String> owner) {
CompletableFuture<Optional<String>> activeOwner = owner.isPresent()
? brokerRegistry.lookupAsync(owner.get()).thenApply(lookupData -> lookupData.flatMap(__ -> owner))
: CompletableFuture.completedFuture(Optional.empty());

return activeOwner
.thenCompose(broker -> broker
.map(__ -> activeOwner)
.orElseGet(() -> deferGetOwnerRequest(serviceUnit).thenApply(Optional::ofNullable)))
.whenComplete((__, e) -> {
return deferGetOwnerRequest(serviceUnit)
.thenCompose(newOwner -> {
if (newOwner == null) {
return CompletableFuture.completedFuture(null);
}

return brokerRegistry.lookupAsync(newOwner)
.thenApply(lookupData -> {
if (lookupData.isPresent()) {
return newOwner;
} else {
throw new IllegalStateException(
"The new owner " + newOwner + " is inactive.");
}
});
}).whenComplete((__, e) -> {
if (e != null) {
log.error("Failed to get active owner broker. serviceUnit:{}, state:{}, owner:{}",
serviceUnit, state, owner, e);
log.error("{} failed to get active owner broker. serviceUnit:{}, state:{}, owner:{}",
brokerId, serviceUnit, state, owner, e);
ownerLookUpCounters.get(state).getFailure().incrementAndGet();
}
});
}).thenApply(Optional::ofNullable);
}

public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
@@ -544,6 +551,25 @@ public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
}
}

/**
 * Resolves the owner broker of a service unit from the table view entry for that unit.
 *
 * @param serviceUnit the service unit to look up in {@code tableview}
 * @return the destination broker when the state is {@code Owned}; the source broker when
 *         the state is {@code Splitting}; an empty {@link Optional} when the state is
 *         {@code Init} or {@code Free}; and {@code null} (not an empty {@code Optional})
 *         for every other state — callers must null-check the result
 */
private Optional<String> getOwner(String serviceUnit) {
    ServiceUnitStateData current = tableview.get(serviceUnit);
    return switch (state(current)) {
        case Owned -> Optional.of(current.dstBroker());
        case Splitting -> Optional.of(current.sourceBroker());
        case Init, Free -> Optional.empty();
        // Any other state yields null, distinct from "no owner".
        default -> null;
    };
}

private long getNextVersionId(String serviceUnit) {
var data = tableview.get(serviceUnit);
return getNextVersionId(data);
@@ -697,7 +723,7 @@ private AtomicLong getHandlerCounter(ServiceUnitStateData data, boolean total) {

private void log(Throwable e, String serviceUnit, ServiceUnitStateData data, ServiceUnitStateData next) {
if (e == null) {
if (log.isDebugEnabled() || isTransferCommand(data)) {
if (debug() || isTransferCommand(data)) {
long handlerTotalCount = getHandlerTotalCounter(data).get();
long handlerFailureCount = getHandlerFailureCounter(data).get();
log.info("{} handled {} event for serviceUnit:{}, cur:{}, next:{}, "
@@ -736,6 +762,9 @@ private void handleSkippedEvent(String serviceUnit) {
private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) {
var getOwnerRequest = getOwnerRequests.remove(serviceUnit);
if (getOwnerRequest != null) {
if (debug()) {
log.info("Returned owner request for serviceUnit:{}", serviceUnit);
}
getOwnerRequest.complete(data.dstBroker());
}
stateChangeListeners.notify(serviceUnit, data, null);
@@ -848,26 +877,52 @@ private boolean isTargetBroker(String broker) {
}

private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) {

var requested = new MutableObject<CompletableFuture<String>>();
try {
return getOwnerRequests
.computeIfAbsent(serviceUnit, k -> {
CompletableFuture<String> future = new CompletableFuture<>();
var ownerBefore = getOwner(serviceUnit);
if (ownerBefore != null && ownerBefore.isPresent()) {
// Here, we do a quick active check first with the computeIfAbsent lock
brokerRegistry.lookupAsync(ownerBefore.get()).getNow(Optional.empty())
.ifPresent(__ -> requested.setValue(
CompletableFuture.completedFuture(ownerBefore.get())));

if (requested.getValue() != null) {
return requested.getValue();
}
}


CompletableFuture<String> future =
new CompletableFuture<String>().orTimeout(inFlightStateWaitingTimeInMillis,
TimeUnit.MILLISECONDS)
.exceptionally(e -> {
var ownerAfter = getOwner(serviceUnit);
log.warn("{} failed to wait for owner for serviceUnit:{}; Trying to "
+ "return the current owner:{}",
brokerId, serviceUnit, ownerAfter, e);
if (ownerAfter == null) {
throw new IllegalStateException(e);
}
return ownerAfter.orElse(null);
});
if (debug()) {
log.info("{} is waiting for owner for serviceUnit:{}", brokerId, serviceUnit);
}
requested.setValue(future);
return future;
});
} finally {
var future = requested.getValue();
if (future != null) {
future.orTimeout(inFlightStateWaitingTimeInMillis + 5 * 1000, TimeUnit.MILLISECONDS)
.whenComplete((v, e) -> {
if (e != null) {
getOwnerRequests.remove(serviceUnit, future);
log.warn("Failed to getOwner for serviceUnit:{}",
serviceUnit, e);
}
}
);
future.whenComplete((__, e) -> {
getOwnerRequests.remove(serviceUnit);
if (e != null) {
log.warn("{} failed to getOwner for serviceUnit:{}", brokerId, serviceUnit, e);
}
});
}
}
}
Original file line number Diff line number Diff line change
@@ -18,15 +18,16 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.store;

import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.broker.PulsarService;

/**
* The load data store factory, use to create the load data store.
*/
public class LoadDataStoreFactory {

public static <T> LoadDataStore<T> create(PulsarClient client, String name, Class<T> clazz)
public static <T> LoadDataStore<T> create(PulsarService pulsar, String name,
Class<T> clazz)
throws LoadDataStoreException {
return new TableViewLoadDataStoreImpl<>(client, name, clazz);
return new TableViewLoadDataStoreImpl<>(pulsar, name, clazz);
}
}
Loading

0 comments on commit 6d2ce89

Please sign in to comment.