diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index fe28d67227e8a..59c6674676101 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -49,6 +49,8 @@
 import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
 import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
 import org.apache.pulsar.broker.loadbalance.extensions.reporter.TopBundleLoadDataReporter;
+import org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerScheduler;
+import org.apache.pulsar.broker.loadbalance.extensions.scheduler.UnloadScheduler;
 import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
 import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
 import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory;
@@ -86,6 +88,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
     private LoadDataStore<BrokerLoadData> brokerLoadDataStore;
     private LoadDataStore<TopBundlesLoadData> topBundlesLoadDataStore;
 
+    private LoadManagerScheduler unloadScheduler;
+
     @Getter
     private LoadManagerContext context;
 
@@ -194,7 +198,9 @@ public void start() throws PulsarServerException {
                         interval,
                         interval, TimeUnit.MILLISECONDS);
 
-        // TODO: Start unload scheduler and bundle split scheduler
+        // TODO: Start bundle split scheduler.
+        this.unloadScheduler = new UnloadScheduler(pulsar.getLoadManagerExecutor(), context, serviceUnitStateChannel);
+        this.unloadScheduler.start();
         this.started = true;
     }
 
@@ -319,6 +325,7 @@ public void close() throws PulsarServerException {
 
             this.brokerLoadDataStore.close();
             this.topBundlesLoadDataStore.close();
+            this.unloadScheduler.close();
         } catch (IOException ex) {
             throw new PulsarServerException(ex);
         } finally {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java
new file mode 100644
index 0000000000000..5cdbd3027104d
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.Reflections;
+
+/**
+ * Periodically asks the configured {@link NamespaceUnloadStrategy} which bundles to unload and
+ * publishes each selected unload on the {@link ServiceUnitStateChannel}.
+ *
+ * <p>A round is skipped when load balancing or shedding is disabled, when the previous round is still
+ * in flight, when this broker is not the channel owner, or when at most one broker is available.
+ */
+@Slf4j
+public class UnloadScheduler implements LoadManagerScheduler {
+
+    private final NamespaceUnloadStrategy namespaceUnloadStrategy;
+
+    private final ScheduledExecutorService loadManagerExecutor;
+
+    private final LoadManagerContext context;
+
+    private final ServiceUnitStateChannel channel;
+
+    private final ServiceConfiguration conf;
+
+    /** Handle of the periodic task created by {@link #start()}; {@code null} until then. */
+    private volatile ScheduledFuture<?> task;
+
+    /** Bundle -> epoch millis when its unload was published; pruned after the shedding grace period. */
+    private final Map<String, Long> recentlyUnloadedBundles;
+
+    /** Source broker -> epoch millis when an unload from it was last published. */
+    private final Map<String, Long> recentlyUnloadedBrokers;
+
+    /** The round started by the most recent {@link #execute()}; no new round starts until it is done. */
+    private volatile CompletableFuture<Void> currentRunningFuture = null;
+
+    public UnloadScheduler(ScheduledExecutorService loadManagerExecutor,
+                           LoadManagerContext context,
+                           ServiceUnitStateChannel channel) {
+        this(loadManagerExecutor, context, channel, createNamespaceUnloadStrategy(context.brokerConfiguration()));
+    }
+
+    @VisibleForTesting
+    protected UnloadScheduler(ScheduledExecutorService loadManagerExecutor,
+                              LoadManagerContext context,
+                              ServiceUnitStateChannel channel,
+                              NamespaceUnloadStrategy strategy) {
+        this.namespaceUnloadStrategy = strategy;
+        this.recentlyUnloadedBundles = new HashMap<>();
+        this.recentlyUnloadedBrokers = new HashMap<>();
+        this.loadManagerExecutor = loadManagerExecutor;
+        this.context = context;
+        this.conf = context.brokerConfiguration();
+        this.channel = channel;
+    }
+
+    /**
+     * Runs one shedding round unless one of the skip conditions applies.
+     *
+     * <p>Any failure in the asynchronous chain (ownership check, broker lookup, strategy, or publishing)
+     * is logged and absorbed, so it is never dropped silently.
+     */
+    @Override
+    public synchronized void execute() {
+        boolean debugMode = conf.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled();
+        if (debugMode) {
+            log.info("Load balancer enabled: {}, Shedding enabled: {}.",
+                    conf.isLoadBalancerEnabled(), conf.isLoadBalancerSheddingEnabled());
+        }
+        if (!isLoadBalancerSheddingEnabled()) {
+            if (debugMode) {
+                log.info("The load balancer or load balancer shedding already disabled. Skipping.");
+            }
+            return;
+        }
+        if (currentRunningFuture != null && !currentRunningFuture.isDone()) {
+            if (debugMode) {
+                log.info("Auto namespace unload is running. Skipping.");
+            }
+            return;
+        }
+        // Remove bundles who have been unloaded for longer than the grace period from the recently unloaded map.
+        final long timeout = System.currentTimeMillis()
+                - TimeUnit.MINUTES.toMillis(conf.getLoadBalancerSheddingGracePeriodMinutes());
+        recentlyUnloadedBundles.values().removeIf(unloadedAt -> unloadedAt < timeout);
+
+        CompletableFuture<Void> round = channel.isChannelOwnerAsync().thenCompose(isChannelOwner -> {
+            if (!isChannelOwner) {
+                if (debugMode) {
+                    log.info("Current broker is not channel owner. Skipping.");
+                }
+                return CompletableFuture.completedFuture(null);
+            }
+            return context.brokerRegistry().getAvailableBrokersAsync().thenCompose(availableBrokers -> {
+                if (debugMode) {
+                    log.info("Available brokers: {}", availableBrokers);
+                }
+                if (availableBrokers.size() <= 1) {
+                    log.info("Only 1 broker available: no load shedding will be performed. Skipping.");
+                    return CompletableFuture.completedFuture(null);
+                }
+                final UnloadDecision unloadDecision = namespaceUnloadStrategy
+                        .findBundlesForUnloading(context, recentlyUnloadedBundles, recentlyUnloadedBrokers);
+                if (debugMode) {
+                    log.info("[{}] Unload decision result: {}",
+                            namespaceUnloadStrategy.getClass().getSimpleName(), unloadDecision.toString());
+                }
+                if (unloadDecision.getUnloads().isEmpty()) {
+                    if (debugMode) {
+                        log.info("[{}] Unload decision unloads is empty. Skipping.",
+                                namespaceUnloadStrategy.getClass().getSimpleName());
+                    }
+                    return CompletableFuture.completedFuture(null);
+                }
+                List<CompletableFuture<Void>> futures = new ArrayList<>();
+                unloadDecision.getUnloads().forEach((broker, unload) -> {
+                    log.info("[{}] Unloading bundle: {}", namespaceUnloadStrategy.getClass().getSimpleName(), unload);
+                    futures.add(channel.publishUnloadEventAsync(unload).thenAccept(__ -> {
+                        recentlyUnloadedBundles.put(unload.serviceUnit(), System.currentTimeMillis());
+                        recentlyUnloadedBrokers.put(unload.sourceBroker(), System.currentTimeMillis());
+                    }));
+                });
+                return FutureUtil.waitForAll(futures);
+            });
+        });
+        // Attached to the outermost future so that a failure at any stage is logged, not only publish failures.
+        this.currentRunningFuture = round.exceptionally(ex -> {
+            log.error("[{}] Namespace unload has exception.",
+                    namespaceUnloadStrategy.getClass().getSimpleName(), ex);
+            return null;
+        });
+    }
+
+    /** Schedules {@link #execute()} every {@code conf.getLoadBalancerSheddingIntervalMinutes()} minutes. */
+    @Override
+    public void start() {
+        long loadSheddingInterval = TimeUnit.MINUTES
+                .toMillis(conf.getLoadBalancerSheddingIntervalMinutes());
+        this.task = loadManagerExecutor.scheduleAtFixedRate(
+                this::execute, loadSheddingInterval, loadSheddingInterval, TimeUnit.MILLISECONDS);
+    }
+
+    /** Cancels the periodic task (without interrupting a running round) and clears the unload history. */
+    @Override
+    public void close() {
+        if (this.task != null) {
+            this.task.cancel(false);
+        }
+        this.recentlyUnloadedBundles.clear();
+        this.recentlyUnloadedBrokers.clear();
+    }
+
+    /**
+     * Instantiates the class named by {@code conf.getLoadBalancerLoadSheddingStrategy()}, falling back to
+     * {@link TransferShedder} when it cannot be created.
+     */
+    private static NamespaceUnloadStrategy createNamespaceUnloadStrategy(ServiceConfiguration conf) {
+        try {
+            return Reflections.createInstance(conf.getLoadBalancerLoadSheddingStrategy(), NamespaceUnloadStrategy.class,
+                    Thread.currentThread().getContextClassLoader());
+        } catch (Exception e) {
+            log.error("Error when trying to create namespace unload strategy: {}",
+                    conf.getLoadBalancerLoadSheddingStrategy(), e);
+        }
+        log.error("create namespace unload strategy failed. using TransferShedder instead.");
+        return new TransferShedder();
+    }
+
+    private boolean isLoadBalancerSheddingEnabled() {
+        return conf.isLoadBalancerEnabled() && conf.isLoadBalancerSheddingEnabled();
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 81b41aa1687a7..1ef4f660e4af3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -99,6 +99,7 @@
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
 import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -125,6 +126,7 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
     private ServiceUnitStateChannelImpl channel2;
 
     @BeforeClass
+    @Override
     public void setup() throws Exception {
         conf.setAllowAutoTopicCreation(true);
         conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
@@ -186,6 +188,7 @@ protected void createNamespaceIfNotExists(PulsarResources resources,
     }
 
     @Override
+    @AfterClass
     protected void cleanup() throws Exception {
         pulsar1 = null;
         pulsar2.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java
new file mode 100644
index 0000000000000..cda5f81d81b93
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.collect.Lists;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
+import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Test(groups = "broker")
+public class UnloadSchedulerTest {
+
+    private ScheduledExecutorService loadManagerExecutor;
+
+    public LoadManagerContext setupContext(){
+        var ctx = getContext();
+        ctx.brokerConfiguration().setLoadBalancerDebugModeEnabled(true);
+        return ctx;
+    }
+
+    @BeforeMethod
+    public void setUp() {
+        this.loadManagerExecutor = Executors
+                .newSingleThreadScheduledExecutor(new ExecutorProvider.ExtendedThreadFactory("pulsar-load-manager"));
+    }
+
+    @AfterMethod
+    public void tearDown() {
+        this.loadManagerExecutor.shutdown();
+    }
+
+    @Test(timeOut = 30 * 1000)
+    public void testExecuteSuccess() {
+        LoadManagerContext context = setupContext();
+        BrokerRegistry registry = context.brokerRegistry();
+        ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class);
+        NamespaceUnloadStrategy unloadStrategy = mock(NamespaceUnloadStrategy.class);
+        doReturn(CompletableFuture.completedFuture(true)).when(channel).isChannelOwnerAsync();
+        doReturn(CompletableFuture.completedFuture(Lists.newArrayList("broker-1", "broker-2")))
+                .when(registry).getAvailableBrokersAsync();
+        doReturn(CompletableFuture.completedFuture(null)).when(channel).publishUnloadEventAsync(any());
+        UnloadDecision decision = new UnloadDecision();
+        Unload unload = new Unload("broker-1", "bundle-1");
+        decision.getUnloads().put("broker-1", unload);
+        doReturn(decision).when(unloadStrategy).findBundlesForUnloading(any(), any(), any());
+
+        UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, context, channel, unloadStrategy);
+
+        scheduler.execute();
+
+        verify(channel, times(1)).publishUnloadEventAsync(eq(unload));
+
+        // Test empty unload.
+        UnloadDecision emptyUnload = new UnloadDecision();
+        doReturn(emptyUnload).when(unloadStrategy).findBundlesForUnloading(any(), any(), any());
+
+        scheduler.execute();
+
+        verify(channel, times(1)).publishUnloadEventAsync(eq(unload));
+    }
+
+    @Test(timeOut = 30 * 1000)
+    public void testExecuteMoreThenOnceWhenFirstNotDone() throws InterruptedException {
+        LoadManagerContext context = setupContext();
+        BrokerRegistry registry = context.brokerRegistry();
+        ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class);
+        NamespaceUnloadStrategy unloadStrategy = mock(NamespaceUnloadStrategy.class);
+        doReturn(CompletableFuture.completedFuture(true)).when(channel).isChannelOwnerAsync();
+        // Both pools are local to this test and are shut down in the finally block so no thread outlives it.
+        ExecutorService slowLookupExecutor = Executors.newFixedThreadPool(1);
+        ExecutorService executorService = Executors.newFixedThreadPool(10);
+        try {
+            doAnswer(__ -> CompletableFuture.supplyAsync(() -> {
+                try {
+                    // Delay 5 seconds to finish.
+                    TimeUnit.SECONDS.sleep(5);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                }
+                return Lists.newArrayList("broker-1", "broker-2");
+            }, slowLookupExecutor)).when(registry).getAvailableBrokersAsync();
+            UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, context, channel, unloadStrategy);
+
+            CountDownLatch latch = new CountDownLatch(10);
+            for (int i = 0; i < 10; i++) {
+                executorService.execute(() -> {
+                    scheduler.execute();
+                    latch.countDown();
+                });
+            }
+            latch.await();
+
+            verify(registry, times(1)).getAvailableBrokersAsync();
+        } finally {
+            executorService.shutdownNow();
+            slowLookupExecutor.shutdownNow();
+        }
+    }
+
+    @Test(timeOut = 30 * 1000)
+    public void testDisableLoadBalancer() {
+        LoadManagerContext context = setupContext();
+        context.brokerConfiguration().setLoadBalancerEnabled(false);
+        ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class);
+        NamespaceUnloadStrategy unloadStrategy = mock(NamespaceUnloadStrategy.class);
+        UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, context, channel, unloadStrategy);
+
+        scheduler.execute();
+
+        verify(channel, times(0)).isChannelOwnerAsync();
+
+        context.brokerConfiguration().setLoadBalancerEnabled(true);
+        context.brokerConfiguration().setLoadBalancerSheddingEnabled(false);
+        scheduler.execute();
+
+        verify(channel, times(0)).isChannelOwnerAsync();
+    }
+
+    @Test(timeOut = 30 * 1000)
+    public void testNotChannelOwner() {
+        LoadManagerContext context = setupContext();
+        // The balancer must be enabled, otherwise execute() returns before the ownership check is reached.
+        context.brokerConfiguration().setLoadBalancerEnabled(true);
+        ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class);
+        NamespaceUnloadStrategy unloadStrategy = mock(NamespaceUnloadStrategy.class);
+        UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, context, channel, unloadStrategy);
+        doReturn(CompletableFuture.completedFuture(false)).when(channel).isChannelOwnerAsync();
+
+        scheduler.execute();
+
+        verify(channel, times(1)).isChannelOwnerAsync();
+        verify(context.brokerRegistry(), times(0)).getAvailableBrokersAsync();
+    }
+
+    public LoadManagerContext getContext(){
+        var ctx = mock(LoadManagerContext.class);
+        var registry = mock(BrokerRegistry.class);
+        var conf = new ServiceConfiguration();
+        doReturn(conf).when(ctx).brokerConfiguration();
+        doReturn(registry).when(ctx).brokerRegistry();
+        return ctx;
+    }
+}