Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Set ServiceUnitStateChannel topic compaction threshold explicitly, improve getOwnerAsync, and fix other bugs #22064

Merged
merged 4 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static java.lang.String.format;
import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Follower;
import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Leader;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.TOPIC;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin;
import static org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.getNamespaceBundle;
Expand Down Expand Up @@ -118,6 +119,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {

private static final long MONITOR_INTERVAL_IN_MILLIS = 120_000;

public static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024;

private static final String ELECTION_ROOT = "/loadbalance/extension/leader";

private PulsarService pulsar;
Expand Down Expand Up @@ -174,6 +177,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {

private volatile boolean started = false;

private boolean configuredSystemTopics = false;

private final AssignCounter assignCounter = new AssignCounter();
@Getter
private final UnloadCounter unloadCounter = new UnloadCounter();
Expand Down Expand Up @@ -270,7 +275,7 @@ public static boolean isLoadManagerExtensionEnabled(ServiceConfiguration conf) {
}

public static boolean isLoadManagerExtensionEnabled(PulsarService pulsar) {
return pulsar.getLoadManager().get() instanceof ExtensibleLoadManagerImpl;
return pulsar.getLoadManager().get() instanceof ExtensibleLoadManagerWrapper;
}

public static ExtensibleLoadManagerImpl get(LoadManager loadManager) {
Expand Down Expand Up @@ -311,6 +316,26 @@ private static void createSystemTopics(PulsarService pulsar) throws PulsarServer
createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
}

private static boolean configureSystemTopics(PulsarService pulsar) {
try {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)
&& pulsar.getConfiguration().isSystemTopicAndTopicLevelPoliciesEnabled()) {
Long threshold = pulsar.getAdminClient().topicPolicies().getCompactionThreshold(TOPIC);
if (threshold == null || COMPACTION_THRESHOLD != threshold.longValue()) {
pulsar.getAdminClient().topicPolicies().setCompactionThreshold(TOPIC, COMPACTION_THRESHOLD);
log.info("Set compaction threshold: {} bytes for system topic {}.", COMPACTION_THRESHOLD, TOPIC);
}
} else {
log.warn("System topic or topic level policies is disabled. "
+ "{} compaction threshold follows the broker or namespace policies.", TOPIC);
}
return true;
} catch (Exception e) {
log.error("Failed to set compaction threshold for system topic:{}", TOPIC, e);
}
return false;
}

/**
* Gets the assigned broker for the given topic.
* @param pulsar PulsarService instance
Expand Down Expand Up @@ -383,9 +408,9 @@ public void start() throws PulsarServerException {

try {
this.brokerLoadDataStore = LoadDataStoreFactory
.create(pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
.create(pulsar, BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
this.topBundlesLoadDataStore = LoadDataStoreFactory
.create(pulsar.getClient(), TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
.create(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
} catch (LoadDataStoreException e) {
throw new PulsarServerException(e);
}
Expand Down Expand Up @@ -443,6 +468,7 @@ public void start() throws PulsarServerException {
this.splitScheduler.start();
this.initWaiter.countDown();
this.started = true;
log.info("Started load manager.");
} catch (Exception ex) {
if (this.brokerRegistry != null) {
brokerRegistry.close();
Expand Down Expand Up @@ -575,7 +601,7 @@ private CompletableFuture<Optional<BrokerLookupData>> dedupeLookupRequest(
if (ex != null) {
assignCounter.incrementFailure();
}
lookupRequests.remove(key, newFutureCreated.getValue());
lookupRequests.remove(key);
Technoboy- marked this conversation as resolved.
Show resolved Hide resolved
});
}
}
Expand Down Expand Up @@ -788,13 +814,13 @@ public void close() throws PulsarServerException {
}

public static boolean isInternalTopic(String topic) {
return topic.startsWith(ServiceUnitStateChannelImpl.TOPIC)
return topic.startsWith(TOPIC)
|| topic.startsWith(BROKER_LOAD_DATA_STORE_TOPIC)
|| topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
}

@VisibleForTesting
void playLeader() {
synchronized void playLeader() {
log.info("This broker:{} is setting the role from {} to {}",
pulsar.getBrokerId(), role, Leader);
int retry = 0;
Expand All @@ -812,7 +838,7 @@ void playLeader() {
serviceUnitStateChannel.scheduleOwnershipMonitor();
break;
} catch (Throwable e) {
log.error("The broker:{} failed to set the role. Retrying {} th ...",
log.warn("The broker:{} failed to set the role. Retrying {} th ...",
pulsar.getBrokerId(), ++retry, e);
try {
Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
Expand All @@ -832,7 +858,7 @@ void playLeader() {
}

@VisibleForTesting
void playFollower() {
synchronized void playFollower() {
log.info("This broker:{} is setting the role from {} to {}",
pulsar.getBrokerId(), role, Follower);
int retry = 0;
Expand All @@ -846,7 +872,7 @@ void playFollower() {
topBundlesLoadDataStore.startProducer();
break;
} catch (Throwable e) {
log.error("The broker:{} failed to set the role. Retrying {} th ...",
log.warn("The broker:{} failed to set the role. Retrying {} th ...",
pulsar.getBrokerId(), ++retry, e);
try {
Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
Expand Down Expand Up @@ -894,14 +920,20 @@ private List<Metrics> getIgnoredCommandMetrics(String advertisedBrokerAddress) {
return List.of(metric);
}

private void monitor() {
@VisibleForTesting
protected void monitor() {
try {
initWaiter.await();

// Monitor role
// Periodically check the role in case ZK watcher fails.
var isChannelOwner = serviceUnitStateChannel.isChannelOwner();
if (isChannelOwner) {
// System topic config might fail due to the race condition
// with topic policy init(Topic policies cache have not init).
if (!configuredSystemTopics) {
Technoboy- marked this conversation as resolved.
Show resolved Hide resolved
configuredSystemTopics = configureSystemTopics(pulsar);
}
if (role != Leader) {
log.warn("Current role:{} does not match with the channel ownership:{}. "
+ "Playing the leader role.", role, isChannelOwner);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,21 +480,28 @@ private CompletableFuture<Optional<String>> getActiveOwnerAsync(
String serviceUnit,
ServiceUnitState state,
Optional<String> owner) {
CompletableFuture<Optional<String>> activeOwner = owner.isPresent()
? brokerRegistry.lookupAsync(owner.get()).thenApply(lookupData -> lookupData.flatMap(__ -> owner))
: CompletableFuture.completedFuture(Optional.empty());

return activeOwner
.thenCompose(broker -> broker
.map(__ -> activeOwner)
.orElseGet(() -> deferGetOwnerRequest(serviceUnit).thenApply(Optional::ofNullable)))
.whenComplete((__, e) -> {
return deferGetOwnerRequest(serviceUnit)
.thenCompose(newOwner -> {
if (newOwner == null) {
return CompletableFuture.completedFuture(null);
}

return brokerRegistry.lookupAsync(newOwner)
.thenApply(lookupData -> {
if (lookupData.isPresent()) {
return newOwner;
} else {
throw new IllegalStateException(
"The new owner " + newOwner + " is inactive.");
}
});
}).whenComplete((__, e) -> {
if (e != null) {
log.error("Failed to get active owner broker. serviceUnit:{}, state:{}, owner:{}",
serviceUnit, state, owner, e);
log.error("{} failed to get active owner broker. serviceUnit:{}, state:{}, owner:{}",
brokerId, serviceUnit, state, owner, e);
ownerLookUpCounters.get(state).getFailure().incrementAndGet();
}
});
}).thenApply(Optional::ofNullable);
}

public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
Expand Down Expand Up @@ -532,6 +539,25 @@ public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
}
}

private Optional<String> getOwner(String serviceUnit) {
ServiceUnitStateData data = tableview.get(serviceUnit);
ServiceUnitState state = state(data);
switch (state) {
case Owned -> {
return Optional.of(data.dstBroker());
}
case Splitting -> {
return Optional.of(data.sourceBroker());
}
case Init, Free -> {
return Optional.empty();
}
default -> {
return null;
}
}
}

@Override
public Optional<String> getAssigned(String serviceUnit) {
if (!validateChannelState(Started, true)) {
Expand Down Expand Up @@ -720,7 +746,7 @@ private AtomicLong getHandlerCounter(ServiceUnitStateData data, boolean total) {

private void log(Throwable e, String serviceUnit, ServiceUnitStateData data, ServiceUnitStateData next) {
if (e == null) {
if (log.isDebugEnabled() || isTransferCommand(data)) {
if (debug() || isTransferCommand(data)) {
long handlerTotalCount = getHandlerTotalCounter(data).get();
long handlerFailureCount = getHandlerFailureCounter(data).get();
log.info("{} handled {} event for serviceUnit:{}, cur:{}, next:{}, "
Expand Down Expand Up @@ -759,6 +785,9 @@ private void handleSkippedEvent(String serviceUnit) {
private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) {
var getOwnerRequest = getOwnerRequests.remove(serviceUnit);
if (getOwnerRequest != null) {
if (debug()) {
log.info("Returned owner request for serviceUnit:{}", serviceUnit);
}
getOwnerRequest.complete(data.dstBroker());
}

Expand Down Expand Up @@ -883,26 +912,52 @@ private boolean isTargetBroker(String broker) {
}

private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) {

var requested = new MutableObject<CompletableFuture<String>>();
try {
return getOwnerRequests
.computeIfAbsent(serviceUnit, k -> {
CompletableFuture<String> future = new CompletableFuture<>();
var ownerBefore = getOwner(serviceUnit);
if (ownerBefore != null && ownerBefore.isPresent()) {
// Here, we do a quick active check first with the computeIfAbsent lock
brokerRegistry.lookupAsync(ownerBefore.get()).getNow(Optional.empty())
.ifPresent(__ -> requested.setValue(
CompletableFuture.completedFuture(ownerBefore.get())));

if (requested.getValue() != null) {
return requested.getValue();
}
}


CompletableFuture<String> future =
new CompletableFuture<String>().orTimeout(inFlightStateWaitingTimeInMillis,
TimeUnit.MILLISECONDS)
.exceptionally(e -> {
var ownerAfter = getOwner(serviceUnit);
log.warn("{} failed to wait for owner for serviceUnit:{}; Trying to "
+ "return the current owner:{}",
brokerId, serviceUnit, ownerAfter, e);
if (ownerAfter == null) {
throw new IllegalStateException(e);
}
return ownerAfter.orElse(null);
});
if (debug()) {
log.info("{} is waiting for owner for serviceUnit:{}", brokerId, serviceUnit);
}
requested.setValue(future);
return future;
});
} finally {
var future = requested.getValue();
if (future != null) {
future.orTimeout(inFlightStateWaitingTimeInMillis + 5 * 1000, TimeUnit.MILLISECONDS)
.whenComplete((v, e) -> {
if (e != null) {
getOwnerRequests.remove(serviceUnit, future);
log.warn("Failed to getOwner for serviceUnit:{}",
serviceUnit, e);
}
}
);
future.whenComplete((__, e) -> {
getOwnerRequests.remove(serviceUnit);
if (e != null) {
log.warn("{} failed to getOwner for serviceUnit:{}", brokerId, serviceUnit, e);
}
});
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.store;

import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.broker.PulsarService;

/**
* The load data store factory, use to create the load data store.
*/
public class LoadDataStoreFactory {

public static <T> LoadDataStore<T> create(PulsarClient client, String name, Class<T> clazz)
public static <T> LoadDataStore<T> create(PulsarService pulsar, String name,
Class<T> clazz)
throws LoadDataStoreException {
return new TableViewLoadDataStoreImpl<>(client, name, clazz);
return new TableViewLoadDataStoreImpl<>(pulsar, name, clazz);
}
}
Loading
Loading