Skip to content

Commit

Permalink
fixed other bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Feb 17, 2024
1 parent c94cc8c commit e445fd0
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static java.lang.String.format;
import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Follower;
import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Leader;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.TOPIC;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin;
import static org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.getNamespaceBundle;
Expand Down Expand Up @@ -118,6 +119,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {

private static final long MONITOR_INTERVAL_IN_MILLIS = 120_000;

public static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024;

private static final String ELECTION_ROOT = "/loadbalance/extension/leader";

private PulsarService pulsar;
Expand Down Expand Up @@ -174,6 +177,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {

private volatile boolean started = false;

private boolean configuredSystemTopics = false;

private final AssignCounter assignCounter = new AssignCounter();
@Getter
private final UnloadCounter unloadCounter = new UnloadCounter();
Expand Down Expand Up @@ -270,7 +275,7 @@ public static boolean isLoadManagerExtensionEnabled(ServiceConfiguration conf) {
}

public static boolean isLoadManagerExtensionEnabled(PulsarService pulsar) {
return pulsar.getLoadManager().get() instanceof ExtensibleLoadManagerImpl;
return pulsar.getLoadManager().get() instanceof ExtensibleLoadManagerWrapper;
}

public static ExtensibleLoadManagerImpl get(LoadManager loadManager) {
Expand Down Expand Up @@ -311,6 +316,26 @@ private static void createSystemTopics(PulsarService pulsar) throws PulsarServer
createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
}

/**
 * Applies {@link #COMPACTION_THRESHOLD} as the topic-level compaction threshold of the
 * channel system topic {@code TOPIC}.
 *
 * <p>The policy is only written when the load manager extension is enabled and system
 * topics with topic-level policies are enabled; otherwise a warning is logged and nothing
 * is changed. The threshold is written only if the currently stored value is absent or
 * differs from {@link #COMPACTION_THRESHOLD}.
 *
 * @param pulsar the PulsarService used to read the configuration and to obtain the admin client
 * @return {@code true} if the check completed without an exception (whether or not a write
 *         was needed); {@code false} if any exception was thrown, so the caller may try again
 */
private static boolean configureSystemTopics(PulsarService pulsar) {
    try {
        final boolean topicPoliciesUsable =
                ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)
                        && pulsar.getConfiguration().isSystemTopicAndTopicLevelPoliciesEnabled();
        if (!topicPoliciesUsable) {
            log.warn("System topic or topic level policies is disabled. "
                    + "{} compaction threshold follows the broker or namespace policies.", TOPIC);
            return true;
        }

        Long current = pulsar.getAdminClient().topicPolicies().getCompactionThreshold(TOPIC);
        final boolean alreadyConfigured = current != null && current.longValue() == COMPACTION_THRESHOLD;
        if (!alreadyConfigured) {
            pulsar.getAdminClient().topicPolicies().setCompactionThreshold(TOPIC, COMPACTION_THRESHOLD);
            log.info("Set compaction threshold: {} bytes for system topic {}.", COMPACTION_THRESHOLD, TOPIC);
        }
        return true;
    } catch (Exception e) {
        // Best effort: report the failure and signal it through the return value.
        log.error("Failed to set compaction threshold for system topic:{}", TOPIC, e);
        return false;
    }
}

/**
* Gets the assigned broker for the given topic.
* @param pulsar PulsarService instance
Expand Down Expand Up @@ -443,6 +468,7 @@ public void start() throws PulsarServerException {
this.splitScheduler.start();
this.initWaiter.countDown();
this.started = true;
log.info("Started load manager.");
} catch (Exception ex) {
if (this.brokerRegistry != null) {
brokerRegistry.close();
Expand Down Expand Up @@ -575,7 +601,7 @@ private CompletableFuture<Optional<BrokerLookupData>> dedupeLookupRequest(
if (ex != null) {
assignCounter.incrementFailure();
}
lookupRequests.remove(key, newFutureCreated.getValue());
lookupRequests.remove(key);
});
}
}
Expand Down Expand Up @@ -788,13 +814,13 @@ public void close() throws PulsarServerException {
}

public static boolean isInternalTopic(String topic) {
return topic.startsWith(ServiceUnitStateChannelImpl.TOPIC)
return topic.startsWith(TOPIC)
|| topic.startsWith(BROKER_LOAD_DATA_STORE_TOPIC)
|| topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
}

@VisibleForTesting
void playLeader() {
synchronized void playLeader() {
log.info("This broker:{} is setting the role from {} to {}",
pulsar.getBrokerId(), role, Leader);
int retry = 0;
Expand All @@ -812,7 +838,7 @@ void playLeader() {
serviceUnitStateChannel.scheduleOwnershipMonitor();
break;
} catch (Throwable e) {
log.error("The broker:{} failed to set the role. Retrying {} th ...",
log.warn("The broker:{} failed to set the role. Retrying {} th ...",
pulsar.getBrokerId(), ++retry, e);
try {
Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
Expand All @@ -832,7 +858,7 @@ void playLeader() {
}

@VisibleForTesting
void playFollower() {
synchronized void playFollower() {
log.info("This broker:{} is setting the role from {} to {}",
pulsar.getBrokerId(), role, Follower);
int retry = 0;
Expand All @@ -846,7 +872,7 @@ void playFollower() {
topBundlesLoadDataStore.startProducer();
break;
} catch (Throwable e) {
log.error("The broker:{} failed to set the role. Retrying {} th ...",
log.warn("The broker:{} failed to set the role. Retrying {} th ...",
pulsar.getBrokerId(), ++retry, e);
try {
Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
Expand Down Expand Up @@ -894,14 +920,20 @@ private List<Metrics> getIgnoredCommandMetrics(String advertisedBrokerAddress) {
return List.of(metric);
}

private void monitor() {
@VisibleForTesting
protected void monitor() {
try {
initWaiter.await();

// Monitor role
// Periodically check the role in case ZK watcher fails.
var isChannelOwner = serviceUnitStateChannel.isChannelOwner();
if (isChannelOwner) {
// System topic config might fail due to the race condition
// with topic policy init(Topic policies cache have not init).
if (!configuredSystemTopics) {
configuredSystemTopics = configureSystemTopics(pulsar);
}
if (role != Leader) {
log.warn("Current role:{} does not match with the channel ownership:{}. "
+ "Playing the leader role.", role, isChannelOwner);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
private static final long MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS = 10;
private static final int MAX_OUTSTANDING_PUB_MESSAGES = 500;
private static final long MAX_OWNED_BUNDLE_COUNT_DELAY_TIME_IN_MILLIS = 10 * 60 * 1000;
private static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024;
private final PulsarService pulsar;
private final ServiceConfiguration config;
private final Schema<ServiceUnitStateData> schema;
Expand Down Expand Up @@ -299,14 +298,6 @@ public synchronized void start() throws PulsarServerException {

ExtensibleLoadManagerImpl.createSystemTopic(pulsar, TOPIC);

if (config.isSystemTopicAndTopicLevelPoliciesEnabled()) {
Long threshold = pulsar.getAdminClient().topicPolicies().getCompactionThreshold(TOPIC);
if (threshold == null || COMPACTION_THRESHOLD != threshold) {
pulsar.getAdminClient().topicPolicies().setCompactionThreshold(TOPIC, COMPACTION_THRESHOLD);
log.info("Set compaction threshold for system topic {}.", TOPIC);
}
}

producer = pulsar.getClient().newProducer(schema)
.enableBatching(true)
.compressionType(MSG_COMPRESSION_TYPE)
Expand Down Expand Up @@ -489,21 +480,28 @@ private CompletableFuture<Optional<String>> getActiveOwnerAsync(
String serviceUnit,
ServiceUnitState state,
Optional<String> owner) {
CompletableFuture<Optional<String>> activeOwner = owner.isPresent()
? brokerRegistry.lookupAsync(owner.get()).thenApply(lookupData -> lookupData.flatMap(__ -> owner))
: CompletableFuture.completedFuture(Optional.empty());

return activeOwner
.thenCompose(broker -> broker
.map(__ -> activeOwner)
.orElseGet(() -> deferGetOwnerRequest(serviceUnit).thenApply(Optional::ofNullable)))
.whenComplete((__, e) -> {
return deferGetOwnerRequest(serviceUnit)
.thenCompose(newOwner -> {
if (newOwner == null) {
return CompletableFuture.completedFuture(null);
}

return brokerRegistry.lookupAsync(newOwner)
.thenApply(lookupData -> {
if (lookupData.isPresent()) {
return newOwner;
} else {
throw new IllegalStateException(
"The new owner " + newOwner + " is inactive.");
}
});
}).whenComplete((__, e) -> {
if (e != null) {
log.error("Failed to get active owner broker. serviceUnit:{}, state:{}, owner:{}",
serviceUnit, state, owner, e);
log.error("{} failed to get active owner broker. serviceUnit:{}, state:{}, owner:{}",
brokerId, serviceUnit, state, owner, e);
ownerLookUpCounters.get(state).getFailure().incrementAndGet();
}
});
}).thenApply(Optional::ofNullable);
}

public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
Expand Down Expand Up @@ -541,6 +539,22 @@ public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
}
}

/**
 * Resolves the current owner broker of a service unit from the local table view.
 *
 * @param serviceUnit the service unit to look up in {@code tableview}
 * @return the destination broker when the state is {@code Owned}, the source broker when
 *         the state is {@code Splitting}, and an empty Optional for every other state
 */
private Optional<String> getOwner(String serviceUnit) {
    ServiceUnitStateData stateData = tableview.get(serviceUnit);
    // NOTE(review): presumably state(...) tolerates a missing (null) entry — confirm.
    ServiceUnitState currentState = state(stateData);
    return switch (currentState) {
        case Owned -> Optional.of(stateData.dstBroker());
        case Splitting -> Optional.of(stateData.sourceBroker());
        default -> Optional.empty();
    };
}

@Override
public Optional<String> getAssigned(String serviceUnit) {
if (!validateChannelState(Started, true)) {
Expand Down Expand Up @@ -729,7 +743,7 @@ private AtomicLong getHandlerCounter(ServiceUnitStateData data, boolean total) {

private void log(Throwable e, String serviceUnit, ServiceUnitStateData data, ServiceUnitStateData next) {
if (e == null) {
if (log.isDebugEnabled() || isTransferCommand(data)) {
if (debug() || isTransferCommand(data)) {
long handlerTotalCount = getHandlerTotalCounter(data).get();
long handlerFailureCount = getHandlerFailureCounter(data).get();
log.info("{} handled {} event for serviceUnit:{}, cur:{}, next:{}, "
Expand Down Expand Up @@ -768,6 +782,9 @@ private void handleSkippedEvent(String serviceUnit) {
private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) {
var getOwnerRequest = getOwnerRequests.remove(serviceUnit);
if (getOwnerRequest != null) {
if (debug()) {
log.info("Returned owner request for serviceUnit:{}", serviceUnit);
}
getOwnerRequest.complete(data.dstBroker());
}

Expand Down Expand Up @@ -892,26 +909,47 @@ private boolean isTargetBroker(String broker) {
}

private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) {

var requested = new MutableObject<CompletableFuture<String>>();
try {
return getOwnerRequests
.computeIfAbsent(serviceUnit, k -> {
CompletableFuture<String> future = new CompletableFuture<>();
var ownerBefore = getOwner(serviceUnit);
if (ownerBefore.isPresent()) {
// Here, we do a quick active check first with the computeIfAbsent lock
brokerRegistry.lookupAsync(ownerBefore.get()).getNow(Optional.empty())
.ifPresent(__ -> requested.setValue(
CompletableFuture.completedFuture(ownerBefore.get())));
if (requested.getValue() != null) {
return requested.getValue();
}
}

CompletableFuture<String> future =
new CompletableFuture<String>().orTimeout(inFlightStateWaitingTimeInMillis,
TimeUnit.MILLISECONDS)
.exceptionally(e -> {
var ownerAfter = getOwner(serviceUnit);
log.warn("{} failed to wait for owner for serviceUnit:{}; Trying to "
+ "return the current owner:{}",
brokerId, serviceUnit, ownerAfter, e);
return ownerAfter.orElse(null);
});
if (debug()) {
log.info("{} is waiting for owner for serviceUnit:{}", brokerId, serviceUnit);
}
requested.setValue(future);
return future;
});
} finally {
var future = requested.getValue();
if (future != null) {
future.orTimeout(inFlightStateWaitingTimeInMillis + 5 * 1000, TimeUnit.MILLISECONDS)
.whenComplete((v, e) -> {
if (e != null) {
getOwnerRequests.remove(serviceUnit, future);
log.warn("Failed to getOwner for serviceUnit:{}",
serviceUnit, e);
}
}
);
future.whenComplete((__, e) -> {
getOwnerRequests.remove(serviceUnit);
if (e != null) {
log.warn("{} failed to getOwner for serviceUnit:{}", brokerId, serviceUnit, e);
}
});
}
}
}
Expand Down
Loading

0 comments on commit e445fd0

Please sign in to comment.