From f35692431839267a22dce232b1162193c7ec25a3 Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Mon, 12 Feb 2024 15:29:39 -0800 Subject: [PATCH 1/4] [fix][broker] Set the serviceUnitStateChannel topic compaction threshold explicitly. --- .../channel/ServiceUnitStateChannelImpl.java | 7 ++++++ .../channel/ServiceUnitStateChannelTest.java | 25 +++++++++++-------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index d84b0eaf0d238..c6f3e9b427ae1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -121,6 +121,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { private static final long MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS = 10; private static final int MAX_OUTSTANDING_PUB_MESSAGES = 500; private static final long MAX_OWNED_BUNDLE_COUNT_DELAY_TIME_IN_MILLIS = 10 * 60 * 1000; + private static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024; private final PulsarService pulsar; private final ServiceConfiguration config; private final Schema schema; @@ -298,6 +299,12 @@ public synchronized void start() throws PulsarServerException { ExtensibleLoadManagerImpl.createSystemTopic(pulsar, TOPIC); + Long threshold = pulsar.getAdminClient().topicPolicies().getCompactionThreshold(TOPIC); + if (threshold == null || COMPACTION_THRESHOLD != threshold) { + pulsar.getAdminClient().topicPolicies().setCompactionThreshold(TOPIC, COMPACTION_THRESHOLD); + log.info("Set compaction threshold for system topic {}.", TOPIC); + } + producer = pulsar.getClient().newProducer(schema) .enableBatching(true) .compressionType(MSG_COMPRESSION_TYPE) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index f7816151a4242..505a3802e07a6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -87,7 +87,6 @@ import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.impl.TableViewImpl; import org.apache.pulsar.common.policies.data.TopicType; @@ -336,18 +335,14 @@ private int validateChannelStart(ServiceUnitStateChannelImpl channel) @Test(priority = 1) public void compactionScheduleTest() { - Awaitility.await() .pollInterval(200, TimeUnit.MILLISECONDS) .atMost(5, TimeUnit.SECONDS) + .ignoreExceptions() .untilAsserted(() -> { // wait until true - try { - var threshold = admin.topicPolicies() - .getCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC, false).longValue(); - assertEquals(5 * 1024 * 1024, threshold); - } catch (Exception e) { - ; - } + var threshold = admin.topicPolicies() + .getCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC, false); + assertEquals(5 * 1024 * 1024, threshold == null ? 0 : threshold.longValue()); }); } @@ -930,8 +925,7 @@ public void handleBrokerDeletionEventTest() } @Test(priority = 10) - public void conflictAndCompactionTest() throws ExecutionException, InterruptedException, TimeoutException, - IllegalAccessException, PulsarClientException, PulsarServerException { + public void conflictAndCompactionTest() throws Exception { String bundle = String.format("%s/%s", "public/default", "0x0000000a_0xffffffff"); var owner1 = channel1.getOwnerAsync(bundle); var owner2 = channel2.getOwnerAsync(bundle); @@ -964,6 +958,12 @@ public void conflictAndCompactionTest() throws ExecutionException, InterruptedEx Field strategicCompactorField = FieldUtils.getDeclaredField(PulsarService.class, "strategicCompactor", true); FieldUtils.writeField(strategicCompactorField, pulsar1, compactor, true); FieldUtils.writeField(strategicCompactorField, pulsar2, compactor, true); + + var threshold = admin.topicPolicies() + .getCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC, false); + admin.topicPolicies() + .setCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC, 0); + Awaitility.await() .pollInterval(200, TimeUnit.MILLISECONDS) .atMost(140, TimeUnit.SECONDS) @@ -984,6 +984,9 @@ public void conflictAndCompactionTest() throws ExecutionException, InterruptedEx channel3.close(); FieldUtils.writeDeclaredField(channel2, "inFlightStateWaitingTimeInMillis", 30 * 1000, true); + admin.topicPolicies() + .setCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC, threshold); + } @Test(priority = 11) From c94cc8c211eb915357c56fb9671010c959b5bf78 Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Mon, 12 Feb 2024 17:07:41 -0800 Subject: [PATCH 2/4] resolved comments --- .../channel/ServiceUnitStateChannelImpl.java | 10 ++-- .../channel/ServiceUnitStateChannelTest.java | 46 ++++++++++--------- 2 files changed, 31 insertions(+), 25 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index c6f3e9b427ae1..5e5b8344fbc86 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -299,10 +299,12 @@ public synchronized void start() throws PulsarServerException { ExtensibleLoadManagerImpl.createSystemTopic(pulsar, TOPIC); - Long threshold = pulsar.getAdminClient().topicPolicies().getCompactionThreshold(TOPIC); - if (threshold == null || COMPACTION_THRESHOLD != threshold) { - pulsar.getAdminClient().topicPolicies().setCompactionThreshold(TOPIC, COMPACTION_THRESHOLD); - log.info("Set compaction threshold for system topic {}.", TOPIC); + if (config.isSystemTopicAndTopicLevelPoliciesEnabled()) { + Long threshold = pulsar.getAdminClient().topicPolicies().getCompactionThreshold(TOPIC); + if (threshold == null || COMPACTION_THRESHOLD != threshold) { + pulsar.getAdminClient().topicPolicies().setCompactionThreshold(TOPIC, COMPACTION_THRESHOLD); + log.info("Set compaction threshold for system topic {}.", TOPIC); + } } producer = pulsar.getClient().newProducer(schema) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index 505a3802e07a6..011edf6ea3b21 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -964,28 +964,32 @@ public void conflictAndCompactionTest() throws Exception { admin.topicPolicies() .setCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC, 0); - Awaitility.await() - .pollInterval(200, TimeUnit.MILLISECONDS) - .atMost(140, TimeUnit.SECONDS) - .untilAsserted(() -> { - channel1.publishAssignEventAsync(bundle, brokerId1); - verify(compactor, times(1)) - .compact(eq(ServiceUnitStateChannelImpl.TOPIC), any()); - }); - + try { + Awaitility.await() + .pollInterval(200, TimeUnit.MILLISECONDS) + .atMost(140, TimeUnit.SECONDS) + .untilAsserted(() -> { + channel1.publishAssignEventAsync(bundle, brokerId1); + verify(compactor, times(1)) + .compact(eq(ServiceUnitStateChannelImpl.TOPIC), any()); + }); + + + var channel3 = createChannel(pulsar); + channel3.start(); + Awaitility.await() + .pollInterval(200, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals( + channel3.getOwnerAsync(bundle).get(), Optional.of(brokerId1))); + channel3.close(); + } finally { + FieldUtils.writeDeclaredField(channel2, + "inFlightStateWaitingTimeInMillis", 30 * 1000, true); + admin.topicPolicies() + .setCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC, threshold); + } - var channel3 = createChannel(pulsar); - channel3.start(); - Awaitility.await() - .pollInterval(200, TimeUnit.MILLISECONDS) - .atMost(5, TimeUnit.SECONDS) - .untilAsserted(() -> assertEquals( - channel3.getOwnerAsync(bundle).get(), Optional.of(brokerId1))); - channel3.close(); - FieldUtils.writeDeclaredField(channel2, - "inFlightStateWaitingTimeInMillis", 30 * 1000, true); - admin.topicPolicies() - .setCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC, threshold); } From 8cd8be18950e42ad76c75b70a926efcf10da8e19 Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Wed, 14 Feb 2024 16:38:00 -0800 Subject: [PATCH 3/4] fixed other bugs --- .../extensions/ExtensibleLoadManagerImpl.java | 52 +++++++-- .../channel/ServiceUnitStateChannelImpl.java | 110 +++++++++++++----- .../store/LoadDataStoreFactory.java | 7 +- .../store/TableViewLoadDataStoreImpl.java | 63 +++++++--- .../impl/RawBatchMessageContainerImpl.java | 1 + .../ExtensibleLoadManagerImplTest.java | 71 ++++++----- .../channel/ServiceUnitStateChannelTest.java | 26 ++--- .../extensions/store/LoadDataStoreTest.java | 41 ++----- .../pulsar/client/impl/TableViewImpl.java | 8 +- 9 files changed, 242 insertions(+), 137 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 3ebc2a52ecc73..2e660bc6e1b8b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -21,6 +21,7 @@ import static java.lang.String.format; import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Follower; import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Leader; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.TOPIC; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success; import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin; import static org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.getNamespaceBundle; @@ -118,6 +119,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { private static final long MONITOR_INTERVAL_IN_MILLIS = 120_000; + public static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024; + private static final String ELECTION_ROOT = "/loadbalance/extension/leader"; private PulsarService pulsar; @@ -174,6 +177,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { private volatile boolean started = false; + private boolean configuredSystemTopics = false; + private final AssignCounter assignCounter = new AssignCounter(); @Getter private final UnloadCounter unloadCounter = new UnloadCounter(); @@ -270,7 +275,7 @@ public static boolean isLoadManagerExtensionEnabled(ServiceConfiguration conf) { } public static boolean isLoadManagerExtensionEnabled(PulsarService pulsar) { - return pulsar.getLoadManager().get() instanceof ExtensibleLoadManagerImpl; + return pulsar.getLoadManager().get() instanceof ExtensibleLoadManagerWrapper; } public static ExtensibleLoadManagerImpl get(LoadManager loadManager) { @@ -311,6 +316,26 @@ private static void createSystemTopics(PulsarService pulsar) throws PulsarServer createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC); } + private static boolean configureSystemTopics(PulsarService pulsar) { + try { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar) + && pulsar.getConfiguration().isSystemTopicAndTopicLevelPoliciesEnabled()) { + Long threshold = pulsar.getAdminClient().topicPolicies().getCompactionThreshold(TOPIC); + if (threshold == null || COMPACTION_THRESHOLD != threshold.longValue()) { + pulsar.getAdminClient().topicPolicies().setCompactionThreshold(TOPIC, COMPACTION_THRESHOLD); + log.info("Set compaction threshold: {} bytes for system topic {}.", COMPACTION_THRESHOLD, TOPIC); + } + } else { + log.warn("System topic or topic level policies is disabled. " + + "{} compaction threshold follows the broker or namespace policies.", TOPIC); + } + return true; + } catch (Exception e) { + log.error("Failed to set compaction threshold for system topic:{}", TOPIC, e); + } + return false; + } + /** * Gets the assigned broker for the given topic. * @param pulsar PulsarService instance @@ -383,9 +408,9 @@ public void start() throws PulsarServerException { try { this.brokerLoadDataStore = LoadDataStoreFactory - .create(pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class); + .create(pulsar, BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class); this.topBundlesLoadDataStore = LoadDataStoreFactory - .create(pulsar.getClient(), TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class); + .create(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class); } catch (LoadDataStoreException e) { throw new PulsarServerException(e); } @@ -443,6 +468,7 @@ public void start() throws PulsarServerException { this.splitScheduler.start(); this.initWaiter.countDown(); this.started = true; + log.info("Started load manager."); } catch (Exception ex) { if (this.brokerRegistry != null) { brokerRegistry.close(); @@ -575,7 +601,7 @@ private CompletableFuture> dedupeLookupRequest( if (ex != null) { assignCounter.incrementFailure(); } - lookupRequests.remove(key, newFutureCreated.getValue()); + lookupRequests.remove(key); }); } } @@ -788,13 +814,13 @@ public void close() throws PulsarServerException { } public static boolean isInternalTopic(String topic) { - return topic.startsWith(ServiceUnitStateChannelImpl.TOPIC) + return topic.startsWith(TOPIC) || topic.startsWith(BROKER_LOAD_DATA_STORE_TOPIC) || topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC); } @VisibleForTesting - void playLeader() { + synchronized void playLeader() { log.info("This broker:{} is setting the role from {} to {}", pulsar.getBrokerId(), role, Leader); int retry = 0; @@ -812,7 +838,7 @@ void playLeader() { serviceUnitStateChannel.scheduleOwnershipMonitor(); break; } catch (Throwable e) { - log.error("The broker:{} failed to set the role. Retrying {} th ...", + log.warn("The broker:{} failed to set the role. Retrying {} th ...", pulsar.getBrokerId(), ++retry, e); try { Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS)); @@ -832,7 +858,7 @@ void playLeader() { } @VisibleForTesting - void playFollower() { + synchronized void playFollower() { log.info("This broker:{} is setting the role from {} to {}", pulsar.getBrokerId(), role, Follower); int retry = 0; @@ -846,7 +872,7 @@ void playFollower() { topBundlesLoadDataStore.startProducer(); break; } catch (Throwable e) { - log.error("The broker:{} failed to set the role. Retrying {} th ...", + log.warn("The broker:{} failed to set the role. Retrying {} th ...", pulsar.getBrokerId(), ++retry, e); try { Thread.sleep(Math.min(retry * 10, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS)); @@ -894,7 +920,8 @@ private List getIgnoredCommandMetrics(String advertisedBrokerAddress) { return List.of(metric); } - private void monitor() { + @VisibleForTesting + protected void monitor() { try { initWaiter.await(); @@ -902,6 +929,11 @@ private void monitor() { // Periodically check the role in case ZK watcher fails. var isChannelOwner = serviceUnitStateChannel.isChannelOwner(); if (isChannelOwner) { + // System topic config might fail due to the race condition + // with topic policy init(Topic policies cache have not init). + if (!configuredSystemTopics) { + configuredSystemTopics = configureSystemTopics(pulsar); + } if (role != Leader) { log.warn("Current role:{} does not match with the channel ownership:{}. " + "Playing the leader role.", role, isChannelOwner); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 5e5b8344fbc86..92a15c4dc5be2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -121,7 +121,6 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { private static final long MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS = 10; private static final int MAX_OUTSTANDING_PUB_MESSAGES = 500; private static final long MAX_OWNED_BUNDLE_COUNT_DELAY_TIME_IN_MILLIS = 10 * 60 * 1000; - private static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024; private final PulsarService pulsar; private final ServiceConfiguration config; private final Schema schema; @@ -299,14 +298,6 @@ public synchronized void start() throws PulsarServerException { ExtensibleLoadManagerImpl.createSystemTopic(pulsar, TOPIC); - if (config.isSystemTopicAndTopicLevelPoliciesEnabled()) { - Long threshold = pulsar.getAdminClient().topicPolicies().getCompactionThreshold(TOPIC); - if (threshold == null || COMPACTION_THRESHOLD != threshold) { - pulsar.getAdminClient().topicPolicies().setCompactionThreshold(TOPIC, COMPACTION_THRESHOLD); - log.info("Set compaction threshold for system topic {}.", TOPIC); - } - } - producer = pulsar.getClient().newProducer(schema) .enableBatching(true) .compressionType(MSG_COMPRESSION_TYPE) @@ -489,21 +480,28 @@ private CompletableFuture> getActiveOwnerAsync( String serviceUnit, ServiceUnitState state, Optional owner) { - CompletableFuture> activeOwner = owner.isPresent() - ? brokerRegistry.lookupAsync(owner.get()).thenApply(lookupData -> lookupData.flatMap(__ -> owner)) - : CompletableFuture.completedFuture(Optional.empty()); - - return activeOwner - .thenCompose(broker -> broker - .map(__ -> activeOwner) - .orElseGet(() -> deferGetOwnerRequest(serviceUnit).thenApply(Optional::ofNullable))) - .whenComplete((__, e) -> { + return deferGetOwnerRequest(serviceUnit) + .thenCompose(newOwner -> { + if (newOwner == null) { + return CompletableFuture.completedFuture(null); + } + + return brokerRegistry.lookupAsync(newOwner) + .thenApply(lookupData -> { + if (lookupData.isPresent()) { + return newOwner; + } else { + throw new IllegalStateException( + "The new owner " + newOwner + " is inactive."); + } + }); + }).whenComplete((__, e) -> { if (e != null) { - log.error("Failed to get active owner broker. serviceUnit:{}, state:{}, owner:{}", - serviceUnit, state, owner, e); + log.error("{} failed to get active owner broker. serviceUnit:{}, state:{}, owner:{}", + brokerId, serviceUnit, state, owner, e); ownerLookUpCounters.get(state).getFailure().incrementAndGet(); } - }); + }).thenApply(Optional::ofNullable); } public CompletableFuture> getOwnerAsync(String serviceUnit) { @@ -541,6 +539,25 @@ public CompletableFuture> getOwnerAsync(String serviceUnit) { } } + private Optional getOwner(String serviceUnit) { + ServiceUnitStateData data = tableview.get(serviceUnit); + ServiceUnitState state = state(data); + switch (state) { + case Owned -> { + return Optional.of(data.dstBroker()); + } + case Splitting -> { + return Optional.of(data.sourceBroker()); + } + case Init, Free -> { + return Optional.empty(); + } + default -> { + return null; + } + } + } + @Override public Optional getAssigned(String serviceUnit) { if (!validateChannelState(Started, true)) { @@ -729,7 +746,7 @@ private AtomicLong getHandlerCounter(ServiceUnitStateData data, boolean total) { private void log(Throwable e, String serviceUnit, ServiceUnitStateData data, ServiceUnitStateData next) { if (e == null) { - if (log.isDebugEnabled() || isTransferCommand(data)) { + if (debug() || isTransferCommand(data)) { long handlerTotalCount = getHandlerTotalCounter(data).get(); long handlerFailureCount = getHandlerFailureCounter(data).get(); log.info("{} handled {} event for serviceUnit:{}, cur:{}, next:{}, " @@ -768,6 +785,9 @@ private void handleSkippedEvent(String serviceUnit) { private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) { var getOwnerRequest = getOwnerRequests.remove(serviceUnit); if (getOwnerRequest != null) { + if (debug()) { + log.info("Returned owner request for serviceUnit:{}", serviceUnit); + } getOwnerRequest.complete(data.dstBroker()); } @@ -892,26 +912,52 @@ private boolean isTargetBroker(String broker) { } private CompletableFuture deferGetOwnerRequest(String serviceUnit) { + var requested = new MutableObject>(); try { return getOwnerRequests .computeIfAbsent(serviceUnit, k -> { - CompletableFuture future = new CompletableFuture<>(); + var ownerBefore = getOwner(serviceUnit); + if (ownerBefore != null && ownerBefore.isPresent()) { + // Here, we do a quick active check first with the computeIfAbsent lock + brokerRegistry.lookupAsync(ownerBefore.get()).getNow(Optional.empty()) + .ifPresent(__ -> requested.setValue( + CompletableFuture.completedFuture(ownerBefore.get()))); + + if (requested.getValue() != null) { + return requested.getValue(); + } + } + + + CompletableFuture future = + new CompletableFuture().orTimeout(inFlightStateWaitingTimeInMillis, + TimeUnit.MILLISECONDS) + .exceptionally(e -> { + var ownerAfter = getOwner(serviceUnit); + log.warn("{} failed to wait for owner for serviceUnit:{}; Trying to " + + "return the current owner:{}", + brokerId, serviceUnit, ownerAfter, e); + if (ownerAfter == null) { + throw new IllegalStateException(e); + } + return ownerAfter.orElse(null); + }); + if (debug()) { + log.info("{} is waiting for owner for serviceUnit:{}", brokerId, serviceUnit); + } requested.setValue(future); return future; }); } finally { var future = requested.getValue(); if (future != null) { - future.orTimeout(inFlightStateWaitingTimeInMillis + 5 * 1000, TimeUnit.MILLISECONDS) - .whenComplete((v, e) -> { - if (e != null) { - getOwnerRequests.remove(serviceUnit, future); - log.warn("Failed to getOwner for serviceUnit:{}", - serviceUnit, e); - } - } - ); + future.whenComplete((__, e) -> { + getOwnerRequests.remove(serviceUnit); + if (e != null) { + log.warn("{} failed to getOwner for serviceUnit:{}", brokerId, serviceUnit, e); + } + }); } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreFactory.java index 18f39abd76b76..bcb2657c67f05 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreFactory.java @@ -18,15 +18,16 @@ */ package org.apache.pulsar.broker.loadbalance.extensions.store; -import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.broker.PulsarService; /** * The load data store factory, use to create the load data store. */ public class LoadDataStoreFactory { - public static LoadDataStore create(PulsarClient client, String name, Class clazz) + public static LoadDataStore create(PulsarService pulsar, String name, + Class clazz) throws LoadDataStoreException { - return new TableViewLoadDataStoreImpl<>(client, name, clazz); + return new TableViewLoadDataStoreImpl<>(pulsar, name, clazz); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java index 56afbef04565c..454f7b4dd98ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java @@ -24,33 +24,41 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TableView; -import org.apache.pulsar.common.util.FutureUtil; /** * The load data store, base on {@link TableView }. * * @param Load data type. */ +@Slf4j public class TableViewLoadDataStoreImpl implements LoadDataStore { private volatile TableView tableView; + private volatile long tableViewLastUpdateTimestamp; private volatile Producer producer; + private final ServiceConfiguration conf; + private final PulsarClient client; private final String topic; private final Class clazz; - public TableViewLoadDataStoreImpl(PulsarClient client, String topic, Class clazz) throws LoadDataStoreException { + public TableViewLoadDataStoreImpl(PulsarService pulsar, String topic, Class clazz) + throws LoadDataStoreException { try { - this.client = client; + this.conf = pulsar.getConfiguration(); + this.client = pulsar.getClient(); this.topic = topic; this.clazz = clazz; } catch (Exception e) { @@ -60,40 +68,36 @@ public TableViewLoadDataStoreImpl(PulsarClient client, String topic, Class cl @Override public synchronized CompletableFuture pushAsync(String key, T loadData) { - if (producer == null) { - return FutureUtil.failedFuture(new IllegalStateException("producer has not been started")); - } + validateProducer(); return producer.newMessage().key(key).value(loadData).sendAsync().thenAccept(__ -> {}); } @Override public synchronized CompletableFuture removeAsync(String key) { - if (producer == null) { - return FutureUtil.failedFuture(new IllegalStateException("producer has not been started")); - } + validateProducer(); return producer.newMessage().key(key).value(null).sendAsync().thenAccept(__ -> {}); } @Override public synchronized Optional get(String key) { - validateTableViewStart(); + validateTableView(); return Optional.ofNullable(tableView.get(key)); } @Override public synchronized void forEach(BiConsumer action) { - validateTableViewStart(); + validateTableView(); tableView.forEach(action); } public synchronized Set> entrySet() { - validateTableViewStart(); + validateTableView(); return tableView.entrySet(); } @Override public synchronized int size() { - validateTableViewStart(); + validateTableView(); return tableView.size(); } @@ -116,6 +120,8 @@ public synchronized void startTableView() throws LoadDataStoreException { if (tableView == null) { try { tableView = client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).create(); + tableView.forEachAndListen((k, v) -> + tableViewLastUpdateTimestamp = System.currentTimeMillis()); } catch (PulsarClientException e) { tableView = null; throw new LoadDataStoreException(e); @@ -150,9 +156,34 @@ public synchronized void init() throws IOException { start(); } - private synchronized void validateTableViewStart() { - if (tableView == null) { - throw new IllegalStateException("table view has not been started"); + private void validateProducer() { + if (producer == null || !producer.isConnected()) { + try { + if (producer != null) { + producer.close(); + } + producer = null; + startProducer(); + log.info("Restarted producer on {}", topic); + } catch (Exception e) { + log.error("Failed to restart producer on {}", topic, e); + throw new RuntimeException(e); + } + } + } + + private void validateTableView() { + if (tableView == null || System.currentTimeMillis() - tableViewLastUpdateTimestamp + > ((long) conf.getLoadBalancerReportUpdateMaxIntervalMinutes()) * 60 * 1000 * 2) { + tableViewLastUpdateTimestamp = 0; + try { + closeTableView(); + startTableView(); + log.info("Restarted tableview on {}", topic); + } catch (Exception e) { + log.error("Failed to restart tableview on {}", topic, e); + throw new RuntimeException(e); + } } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java index ba8d3db7178d9..374f1e30c0a89 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java @@ -187,6 +187,7 @@ public ByteBuf toByteBuf() { idData.writeTo(buf); buf.writeInt(metadataAndPayload.readableBytes()); buf.writeBytes(metadataAndPayload); + metadataAndPayload.release(); encryptedPayload.release(); clear(); return buf; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 755d0c2a2ec13..fd2b0a6320072 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -70,6 +70,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -134,6 +135,7 @@ import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage; import org.awaitility.Awaitility; +import org.testng.AssertJUnit; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; @@ -164,11 +166,7 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest { private LookupService lookupService; - @BeforeClass - @Override - public void setup() throws Exception { - // Set the inflight state waiting time and ownership monitor delay time to 5 seconds to avoid - // stuck when doing unload. + private static void initConfig(ServiceConfiguration conf){ conf.setLoadBalancerInFlightServiceUnitStateWaitingTimeInMillis(5 * 1000); conf.setLoadBalancerServiceUnitStateMonitorIntervalInSeconds(1); conf.setForceDeleteNamespaceAllowed(true); @@ -179,15 +177,18 @@ public void setup() throws Exception { conf.setLoadBalancerSheddingEnabled(false); conf.setLoadBalancerDebugModeEnabled(true); conf.setTopicLevelPoliciesEnabled(true); + } + + @BeforeClass + @Override + public void setup() throws Exception { + // Set the inflight state waiting time and ownership monitor delay time to 5 seconds to avoid + // stuck when doing unload. + initConfig(conf); super.internalSetup(conf); pulsar1 = pulsar; ServiceConfiguration defaultConf = getDefaultConf(); - defaultConf.setAllowAutoTopicCreation(true); - defaultConf.setForceDeleteNamespaceAllowed(true); - defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); - defaultConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName()); - defaultConf.setLoadBalancerSheddingEnabled(false); - defaultConf.setTopicLevelPoliciesEnabled(true); + initConfig(defaultConf); additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf); pulsar2 = additionalPulsarTestContext.getPulsarService(); @@ -213,10 +214,8 @@ public void setup() throws Exception { @Override @AfterClass(alwaysRun = true) protected void cleanup() throws Exception { - pulsar1 = null; - pulsar2.close(); - super.internalCleanup(); this.additionalPulsarTestContext.close(); + super.internalCleanup(); } @BeforeMethod(alwaysRun = true) @@ -258,9 +257,6 @@ public void testAssign() throws Exception { Optional brokerLookupData1 = secondaryLoadManager.assign(Optional.empty(), bundle).get(); assertEquals(brokerLookupData, brokerLookupData1); - verify(primaryLoadManager, times(1)).getBrokerSelectionStrategy(); - verify(secondaryLoadManager, times(0)).getBrokerSelectionStrategy(); - Optional lookupResult = pulsar2.getNamespaceService() .getBrokerServiceUrlAsync(topicName, null).get(); assertTrue(lookupResult.isPresent()); @@ -700,13 +696,15 @@ public void testOptimizeUnloadDisable(TopicDomain topicDomain) throws Exception Awaitility.await().atMost(20, TimeUnit.SECONDS).ignoreExceptions().until(() -> { var message = String.format("message-%d", sendCount.getValue()); - boolean messageSent = false; + AtomicBoolean messageSent = new AtomicBoolean(false); while (true) { var recvFuture = consumer.receiveAsync().orTimeout(1000, TimeUnit.MILLISECONDS); - - if (!messageSent) { - producer.send(message); - messageSent = true; + if (!messageSent.get()) { + producer.sendAsync(message).thenAccept(messageId -> { + if (messageId != null) { + messageSent.set(true); + } + }).get(1000, TimeUnit.MILLISECONDS); } if (topicDomain == TopicDomain.non_persistent) { @@ -824,7 +822,8 @@ public void testSplitBundleWithSpecificPositionAdminAPI() throws Exception { "specified_positions_divide", List.of(bundleRanges.get(0), bundleRanges.get(1), splitPosition)); BundlesData bundlesData = admin.namespaces().getBundles(namespace); - assertEquals(bundlesData.getNumBundles(), numBundles + 1); + Awaitility.waitAtMost(15, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(bundlesData.getNumBundles(), numBundles + 1)); String lowBundle = String.format("0x%08x", bundleRanges.get(0)); String midBundle = String.format("0x%08x", splitPosition); String highBundle = String.format("0x%08x", bundleRanges.get(1)); @@ -838,10 +837,8 @@ public void testDeleteNamespaceBundle() throws Exception { admin.namespaces().createNamespace(namespace, 3); TopicName topicName = TopicName.get(namespace + "/test-delete-namespace-bundle"); - - Awaitility.await() - .atMost(30, TimeUnit.SECONDS) + .atMost(15, TimeUnit.SECONDS) .ignoreExceptions() .untilAsserted(() -> { NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get(); @@ -854,7 +851,10 @@ public void testDeleteNamespaceBundle() throws Exception { assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); }); - admin.namespaces().deleteNamespace(namespace, true); + Awaitility.await() + .atMost(15, TimeUnit.SECONDS) + .ignoreExceptions() + .untilAsserted(() -> admin.namespaces().deleteNamespace(namespace, true)); } @Test(timeOut = 30 * 1000) @@ -1080,7 +1080,7 @@ private void assertLookupSLANamespaceOwner(PulsarService pulsar, assertEquals(result, expectedBrokerServiceUrl); } - @Test + @Test(priority = 10) public void testTopBundlesLoadDataStoreTableViewFromChannelOwner() throws Exception { var topBundlesLoadDataStorePrimary = (LoadDataStore) FieldUtils.readDeclaredField(primaryLoadManager, "topBundlesLoadDataStore", true); @@ -1598,6 +1598,21 @@ public void testHealthcheck() throws PulsarAdminException { admin.brokers().healthcheck(TopicVersion.V2); } + @Test(timeOut = 30 * 1000) + public void compactionScheduleTest() { + Awaitility.await() + .pollInterval(200, TimeUnit.MILLISECONDS) + .atMost(30, TimeUnit.SECONDS) + .ignoreExceptions() + .untilAsserted(() -> { // wait until true + primaryLoadManager.monitor(); + secondaryLoadManager.monitor(); + var threshold = admin.topicPolicies() + .getCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC, false); + AssertJUnit.assertEquals(5 * 1024 * 1024, threshold == null ? 0 : threshold.longValue()); + }); + } + private static abstract class MockBrokerFilter implements BrokerFilter { @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index 011edf6ea3b21..f99594481b67a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -333,19 +333,6 @@ private int validateChannelStart(ServiceUnitStateChannelImpl channel) return errorCnt; } - @Test(priority = 1) - public void compactionScheduleTest() { - Awaitility.await() - .pollInterval(200, TimeUnit.MILLISECONDS) - .atMost(5, TimeUnit.SECONDS) - .ignoreExceptions() - .untilAsserted(() -> { // wait until true - var threshold = admin.topicPolicies() - .getCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC, false); - assertEquals(5 * 1024 * 1024, threshold == null ? 0 : threshold.longValue()); - }); - } - @Test(priority = 2) public void assignmentTest() throws ExecutionException, InterruptedException, IllegalAccessException, TimeoutException { @@ -960,7 +947,7 @@ public void conflictAndCompactionTest() throws Exception { FieldUtils.writeField(strategicCompactorField, pulsar2, compactor, true); var threshold = admin.topicPolicies() - .getCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC, false); + .getCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC); admin.topicPolicies() .setCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC, 0); @@ -986,8 +973,10 @@ public void conflictAndCompactionTest() throws Exception { } finally { FieldUtils.writeDeclaredField(channel2, "inFlightStateWaitingTimeInMillis", 30 * 1000, true); - admin.topicPolicies() - .setCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC, threshold); + if (threshold != null) { + admin.topicPolicies() + .setCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC, threshold); + } } @@ -1593,7 +1582,7 @@ public void testActiveGetOwner() throws Exception { // verify getOwnerAsync times out because the owner is inactive now. long start = System.currentTimeMillis(); var ex = expectThrows(ExecutionException.class, () -> channel1.getOwnerAsync(bundle).get()); - assertTrue(ex.getCause() instanceof TimeoutException); + assertTrue(ex.getCause() instanceof IllegalStateException); assertTrue(System.currentTimeMillis() - start >= 1000); // simulate ownership cleanup(no selected owner) by the leader channel @@ -1793,6 +1782,8 @@ private static void overrideTableView(ServiceUnitStateChannel channel, String se throws IllegalAccessException { var tv = (TableViewImpl) FieldUtils.readField(channel, "tableview", true); + var getOwnerRequests = (Map>) + FieldUtils.readField(channel, "getOwnerRequests", true); var cache = (ConcurrentMap) FieldUtils.readField(tv, "data", true); if(val == null){ @@ -1800,6 +1791,7 @@ private static void overrideTableView(ServiceUnitStateChannel channel, String se } else { cache.put(serviceUnit, val); } + getOwnerRequests.clear(); } private static void cleanOpsCounters(ServiceUnitStateChannel channel) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java index f486370400c92..d25cba2bd1bdd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java @@ -20,8 +20,6 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.fail; import static org.testng.AssertJUnit.assertTrue; import com.google.common.collect.Sets; @@ -29,6 +27,7 @@ import lombok.Cleanup; import lombok.Data; import lombok.NoArgsConstructor; +import org.apache.commons.lang.reflect.FieldUtils; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; @@ -40,7 +39,6 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; -import java.util.concurrent.ExecutionException; @Test(groups = "broker") public class LoadDataStoreTest extends MockedPulsarServiceBaseTest { @@ -76,7 +74,7 @@ public void testPushGetAndRemove() throws Exception { @Cleanup LoadDataStore loadDataStore = - LoadDataStoreFactory.create(pulsar.getClient(), topic, MyClass.class); + LoadDataStoreFactory.create(pulsar, topic, MyClass.class); loadDataStore.startProducer(); loadDataStore.startTableView(); MyClass myClass1 = new MyClass("1", 1); @@ -110,7 +108,7 @@ public void testForEach() throws Exception { @Cleanup LoadDataStore loadDataStore = - LoadDataStoreFactory.create(pulsar.getClient(), topic, Integer.class); + LoadDataStoreFactory.create(pulsar, topic, Integer.class); loadDataStore.startProducer(); loadDataStore.startTableView(); @@ -135,7 +133,7 @@ public void testForEach() throws Exception { public void testTableViewRestart() throws Exception { String topic = TopicDomain.persistent + "://" + NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID(); LoadDataStore loadDataStore = - LoadDataStoreFactory.create(pulsar.getClient(), topic, Integer.class); + LoadDataStoreFactory.create(pulsar, topic, Integer.class); loadDataStore.startProducer(); loadDataStore.startTableView(); @@ -145,43 +143,26 @@ public void testTableViewRestart() throws Exception { loadDataStore.closeTableView(); loadDataStore.pushAsync("1", 2).get(); - Exception ex = null; - try { - loadDataStore.get("1"); - } catch (IllegalStateException e) { - ex = e; - } - assertNotNull(ex); - loadDataStore.startTableView(); Awaitility.await().untilAsserted(() -> assertEquals(loadDataStore.get("1").get(), 2)); + + loadDataStore.pushAsync("1", 3).get(); + FieldUtils.writeField(loadDataStore, "tableViewLastUpdateTimestamp", 0 , true); + Awaitility.await().untilAsserted(() -> assertEquals(loadDataStore.get("1").get(), 3)); } @Test public void testProducerStop() throws Exception { String topic = TopicDomain.persistent + "://" + NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID(); LoadDataStore loadDataStore = - LoadDataStoreFactory.create(pulsar.getClient(), topic, Integer.class); + LoadDataStoreFactory.create(pulsar, topic, Integer.class); loadDataStore.startProducer(); loadDataStore.pushAsync("1", 1).get(); loadDataStore.removeAsync("1").get(); loadDataStore.close(); - try { - loadDataStore.pushAsync("2", 2).get(); - fail(); - } catch (ExecutionException ex) { - assertTrue(ex.getCause() instanceof IllegalStateException); - } - try { - loadDataStore.removeAsync("2").get(); - fail(); - } catch (ExecutionException ex) { - assertTrue(ex.getCause() instanceof IllegalStateException); - } - loadDataStore.startProducer(); - loadDataStore.pushAsync("3", 3).get(); - loadDataStore.removeAsync("3").get(); + loadDataStore.pushAsync("2", 2).get(); + loadDataStore.removeAsync("2").get(); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java index 151c96d96aa40..64abd6d811b8e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java @@ -304,8 +304,14 @@ private void readTailMessages(Reader reader) { log.error("Reader {} was closed while reading tail messages.", reader.getTopic(), ex); } else { + // Retrying on the other exceptions such as NotConnectedException + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } log.warn("Reader {} was interrupted while reading tail messages. " - + "Retrying..", reader.getTopic(), ex); + + "Retrying..", reader.getTopic(), ex); readTailMessages(reader); } return null; From 1603861e489404bc83361801bcf9da6d38544206 Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Tue, 20 Feb 2024 10:45:13 -0800 Subject: [PATCH 4/4] resolved comments --- .../store/TableViewLoadDataStoreImpl.java | 24 ++++++++++++++++--- .../ExtensibleLoadManagerTest.java | 1 - 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java index 454f7b4dd98ff..d916e91716223 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java @@ -23,8 +23,10 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.client.api.Producer; @@ -41,6 +43,8 @@ @Slf4j public class TableViewLoadDataStoreImpl implements LoadDataStore { + private static final long LOAD_DATA_REPORT_UPDATE_MAX_INTERVAL_MULTIPLIER_BEFORE_RESTART = 2; + private volatile TableView tableView; private volatile long tableViewLastUpdateTimestamp; @@ -173,13 +177,27 @@ private void validateProducer() { } private void validateTableView() { - if (tableView == null || System.currentTimeMillis() - tableViewLastUpdateTimestamp - > ((long) conf.getLoadBalancerReportUpdateMaxIntervalMinutes()) * 60 * 1000 * 2) { + String restartReason = null; + + if (tableView == null) { + restartReason = "table view is null"; + } else { + long inactiveDuration = System.currentTimeMillis() - tableViewLastUpdateTimestamp; + long threshold = TimeUnit.MINUTES.toMillis(conf.getLoadBalancerReportUpdateMaxIntervalMinutes()) + * LOAD_DATA_REPORT_UPDATE_MAX_INTERVAL_MULTIPLIER_BEFORE_RESTART; + if (inactiveDuration > threshold) { + restartReason = String.format("inactiveDuration=%d secs > threshold = %d secs", + TimeUnit.MILLISECONDS.toSeconds(inactiveDuration), + TimeUnit.MILLISECONDS.toSeconds(threshold)); + } + } + + if (StringUtils.isNotBlank(restartReason)) { tableViewLastUpdateTimestamp = 0; try { closeTableView(); startTableView(); - log.info("Restarted tableview on {}", topic); + log.info("Restarted tableview on {}, {}", topic, restartReason); } catch (Exception e) { log.error("Failed to restart tableview on {}", topic, e); throw new RuntimeException(e); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java index 954c1aa3773a7..4af5b527c2453 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java @@ -89,7 +89,6 @@ public void setup() throws Exception { "org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder"); brokerEnvs.put("forceDeleteNamespaceAllowed", "true"); brokerEnvs.put("loadBalancerDebugModeEnabled", "true"); - brokerEnvs.put("topicLevelPoliciesEnabled", "false"); brokerEnvs.put("PULSAR_MEM", "-Xmx512M"); spec.brokerEnvs(brokerEnvs); pulsarCluster = PulsarCluster.forSpec(spec);