Skip to content

Commit

Permalink
[fix][broker][branch-3.0] Set ServiceUnitStateChannel topic compactio…
Browse files Browse the repository at this point in the history
…n threshold explicitly, improve getOwnerAsync, and fix other bugs (apache#22064) (apache#22154)

(cherry picked from commit 6df0265)
(cherry picked from commit 6d2ce89)
  • Loading branch information
heesung-sn authored and mukesh-ctds committed Mar 1, 2024
1 parent 1461d3d commit 7f6d874
Show file tree
Hide file tree
Showing 10 changed files with 325 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static java.lang.String.format;
import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Follower;
import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Leader;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.TOPIC;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin;
import static org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.getNamespaceBundle;
Expand Down Expand Up @@ -117,6 +118,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {

private static final long MONITOR_INTERVAL_IN_MILLIS = 120_000;

public static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024;

private static final String ELECTION_ROOT = "/loadbalance/extension/leader";

private PulsarService pulsar;
Expand Down Expand Up @@ -173,6 +176,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {

private volatile boolean started = false;

private boolean configuredSystemTopics = false;

private final AssignCounter assignCounter = new AssignCounter();
@Getter
private final UnloadCounter unloadCounter = new UnloadCounter();
Expand Down Expand Up @@ -262,6 +267,10 @@ public static boolean isLoadManagerExtensionEnabled(ServiceConfiguration conf) {
return ExtensibleLoadManagerImpl.class.getName().equals(conf.getLoadManagerClassName());
}

public static boolean isLoadManagerExtensionEnabled(PulsarService pulsar) {
return pulsar.getLoadManager().get() instanceof ExtensibleLoadManagerWrapper;
}

public static ExtensibleLoadManagerImpl get(LoadManager loadManager) {
if (!(loadManager instanceof ExtensibleLoadManagerWrapper loadManagerWrapper)) {
throw new IllegalArgumentException("The load manager should be 'ExtensibleLoadManagerWrapper'.");
Expand Down Expand Up @@ -291,6 +300,27 @@ private static void createSystemTopics(PulsarService pulsar) throws PulsarServer
createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
}

private static boolean configureSystemTopics(PulsarService pulsar) {
try {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)
&& (pulsar.getConfiguration().isSystemTopicEnabled()
&& pulsar.getConfiguration().isTopicLevelPoliciesEnabled())) {
Long threshold = pulsar.getAdminClient().topicPolicies().getCompactionThreshold(TOPIC);
if (threshold == null || COMPACTION_THRESHOLD != threshold.longValue()) {
pulsar.getAdminClient().topicPolicies().setCompactionThreshold(TOPIC, COMPACTION_THRESHOLD);
log.info("Set compaction threshold: {} bytes for system topic {}.", COMPACTION_THRESHOLD, TOPIC);
}
} else {
log.warn("System topic or topic level policies is disabled. "
+ "{} compaction threshold follows the broker or namespace policies.", TOPIC);
}
return true;
} catch (Exception e) {
log.error("Failed to set compaction threshold for system topic:{}", TOPIC, e);
}
return false;
}

@Override
public void start() throws PulsarServerException {
if (this.started) {
Expand Down Expand Up @@ -329,9 +359,9 @@ public void start() throws PulsarServerException {

try {
this.brokerLoadDataStore = LoadDataStoreFactory
.create(pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
.create(pulsar, BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
this.topBundlesLoadDataStore = LoadDataStoreFactory
.create(pulsar.getClient(), TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
.create(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
} catch (LoadDataStoreException e) {
throw new PulsarServerException(e);
}
Expand Down Expand Up @@ -389,6 +419,7 @@ public void start() throws PulsarServerException {
this.splitScheduler.start();
this.initWaiter.countDown();
this.started = true;
log.info("Started load manager.");
} catch (Exception ex) {
log.error("Failed to start the extensible load balance and close broker registry {}.",
this.brokerRegistry, ex);
Expand Down Expand Up @@ -523,7 +554,7 @@ private CompletableFuture<Optional<BrokerLookupData>> dedupeLookupRequest(
if (ex != null) {
assignCounter.incrementFailure();
}
lookupRequests.remove(key, newFutureCreated.getValue());
lookupRequests.remove(key);
});
}
}
Expand Down Expand Up @@ -736,13 +767,13 @@ public void close() throws PulsarServerException {
}

public static boolean isInternalTopic(String topic) {
return topic.startsWith(ServiceUnitStateChannelImpl.TOPIC)
return topic.startsWith(TOPIC)
|| topic.startsWith(BROKER_LOAD_DATA_STORE_TOPIC)
|| topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
}

@VisibleForTesting
void playLeader() {
synchronized void playLeader() {
log.info("This broker:{} is setting the role from {} to {}",
pulsar.getBrokerId(), role, Leader);
int retry = 0;
Expand All @@ -760,7 +791,7 @@ void playLeader() {
serviceUnitStateChannel.scheduleOwnershipMonitor();
break;
} catch (Throwable e) {
log.error("The broker:{} failed to set the role. Retrying {} th ...",
log.warn("The broker:{} failed to set the role. Retrying {} th ...",
pulsar.getBrokerId(), ++retry, e);
try {
Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
Expand All @@ -780,7 +811,7 @@ void playLeader() {
}

@VisibleForTesting
void playFollower() {
synchronized void playFollower() {
log.info("This broker:{} is setting the role from {} to {}",
pulsar.getBrokerId(), role, Follower);
int retry = 0;
Expand All @@ -794,7 +825,7 @@ void playFollower() {
topBundlesLoadDataStore.startProducer();
break;
} catch (Throwable e) {
log.error("The broker:{} failed to set the role. Retrying {} th ...",
log.warn("The broker:{} failed to set the role. Retrying {} th ...",
pulsar.getBrokerId(), ++retry, e);
try {
Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
Expand Down Expand Up @@ -834,14 +865,21 @@ public List<Metrics> getMetrics() {
return metricsCollection;
}

private void monitor() {

@VisibleForTesting
protected void monitor() {
try {
initWaiter.await();

// Monitor role
// Periodically check the role in case ZK watcher fails.
var isChannelOwner = serviceUnitStateChannel.isChannelOwner();
if (isChannelOwner) {
// System topic config might fail due to the race condition
// with topic policy init(Topic policies cache have not init).
if (!configuredSystemTopics) {
configuredSystemTopics = configureSystemTopics(pulsar);
}
if (role != Leader) {
log.warn("Current role:{} does not match with the channel ownership:{}. "
+ "Playing the leader role.", role, isChannelOwner);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,21 +492,28 @@ private CompletableFuture<Optional<String>> getActiveOwnerAsync(
String serviceUnit,
ServiceUnitState state,
Optional<String> owner) {
CompletableFuture<Optional<String>> activeOwner = owner.isPresent()
? brokerRegistry.lookupAsync(owner.get()).thenApply(lookupData -> lookupData.flatMap(__ -> owner))
: CompletableFuture.completedFuture(Optional.empty());

return activeOwner
.thenCompose(broker -> broker
.map(__ -> activeOwner)
.orElseGet(() -> deferGetOwnerRequest(serviceUnit).thenApply(Optional::ofNullable)))
.whenComplete((__, e) -> {
return deferGetOwnerRequest(serviceUnit)
.thenCompose(newOwner -> {
if (newOwner == null) {
return CompletableFuture.completedFuture(null);
}

return brokerRegistry.lookupAsync(newOwner)
.thenApply(lookupData -> {
if (lookupData.isPresent()) {
return newOwner;
} else {
throw new IllegalStateException(
"The new owner " + newOwner + " is inactive.");
}
});
}).whenComplete((__, e) -> {
if (e != null) {
log.error("Failed to get active owner broker. serviceUnit:{}, state:{}, owner:{}",
serviceUnit, state, owner, e);
log.error("{} failed to get active owner broker. serviceUnit:{}, state:{}, owner:{}",
brokerId, serviceUnit, state, owner, e);
ownerLookUpCounters.get(state).getFailure().incrementAndGet();
}
});
}).thenApply(Optional::ofNullable);
}

public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
Expand Down Expand Up @@ -544,6 +551,25 @@ public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
}
}

private Optional<String> getOwner(String serviceUnit) {
ServiceUnitStateData data = tableview.get(serviceUnit);
ServiceUnitState state = state(data);
switch (state) {
case Owned -> {
return Optional.of(data.dstBroker());
}
case Splitting -> {
return Optional.of(data.sourceBroker());
}
case Init, Free -> {
return Optional.empty();
}
default -> {
return null;
}
}
}

private long getNextVersionId(String serviceUnit) {
var data = tableview.get(serviceUnit);
return getNextVersionId(data);
Expand Down Expand Up @@ -697,7 +723,7 @@ private AtomicLong getHandlerCounter(ServiceUnitStateData data, boolean total) {

private void log(Throwable e, String serviceUnit, ServiceUnitStateData data, ServiceUnitStateData next) {
if (e == null) {
if (log.isDebugEnabled() || isTransferCommand(data)) {
if (debug() || isTransferCommand(data)) {
long handlerTotalCount = getHandlerTotalCounter(data).get();
long handlerFailureCount = getHandlerFailureCounter(data).get();
log.info("{} handled {} event for serviceUnit:{}, cur:{}, next:{}, "
Expand Down Expand Up @@ -736,6 +762,9 @@ private void handleSkippedEvent(String serviceUnit) {
private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) {
var getOwnerRequest = getOwnerRequests.remove(serviceUnit);
if (getOwnerRequest != null) {
if (debug()) {
log.info("Returned owner request for serviceUnit:{}", serviceUnit);
}
getOwnerRequest.complete(data.dstBroker());
}
stateChangeListeners.notify(serviceUnit, data, null);
Expand Down Expand Up @@ -848,26 +877,52 @@ private boolean isTargetBroker(String broker) {
}

private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) {

var requested = new MutableObject<CompletableFuture<String>>();
try {
return getOwnerRequests
.computeIfAbsent(serviceUnit, k -> {
CompletableFuture<String> future = new CompletableFuture<>();
var ownerBefore = getOwner(serviceUnit);
if (ownerBefore != null && ownerBefore.isPresent()) {
// Here, we do a quick active check first with the computeIfAbsent lock
brokerRegistry.lookupAsync(ownerBefore.get()).getNow(Optional.empty())
.ifPresent(__ -> requested.setValue(
CompletableFuture.completedFuture(ownerBefore.get())));

if (requested.getValue() != null) {
return requested.getValue();
}
}


CompletableFuture<String> future =
new CompletableFuture<String>().orTimeout(inFlightStateWaitingTimeInMillis,
TimeUnit.MILLISECONDS)
.exceptionally(e -> {
var ownerAfter = getOwner(serviceUnit);
log.warn("{} failed to wait for owner for serviceUnit:{}; Trying to "
+ "return the current owner:{}",
brokerId, serviceUnit, ownerAfter, e);
if (ownerAfter == null) {
throw new IllegalStateException(e);
}
return ownerAfter.orElse(null);
});
if (debug()) {
log.info("{} is waiting for owner for serviceUnit:{}", brokerId, serviceUnit);
}
requested.setValue(future);
return future;
});
} finally {
var future = requested.getValue();
if (future != null) {
future.orTimeout(inFlightStateWaitingTimeInMillis + 5 * 1000, TimeUnit.MILLISECONDS)
.whenComplete((v, e) -> {
if (e != null) {
getOwnerRequests.remove(serviceUnit, future);
log.warn("Failed to getOwner for serviceUnit:{}",
serviceUnit, e);
}
}
);
future.whenComplete((__, e) -> {
getOwnerRequests.remove(serviceUnit);
if (e != null) {
log.warn("{} failed to getOwner for serviceUnit:{}", brokerId, serviceUnit, e);
}
});
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.store;

import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.broker.PulsarService;

/**
* The load data store factory, use to create the load data store.
*/
public class LoadDataStoreFactory {

public static <T> LoadDataStore<T> create(PulsarClient client, String name, Class<T> clazz)
public static <T> LoadDataStore<T> create(PulsarService pulsar, String name,
Class<T> clazz)
throws LoadDataStoreException {
return new TableViewLoadDataStoreImpl<>(client, name, clazz);
return new TableViewLoadDataStoreImpl<>(pulsar, name, clazz);
}
}
Loading

0 comments on commit 7f6d874

Please sign in to comment.