diff --git a/buildtools/pom.xml b/buildtools/pom.xml
index 3171af83930b0..37d974b532f75 100644
--- a/buildtools/pom.xml
+++ b/buildtools/pom.xml
@@ -47,7 +47,7 @@
4.1
10.14.2
3.1.2
- 4.1.111.Final
+ 4.1.113.Final
4.2.3
32.1.1-jre
1.10.12
diff --git a/conf/broker.conf b/conf/broker.conf
index d4d803530f570..540d556b1b1ea 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -463,6 +463,16 @@ dispatcherReadFailureBackoffMaxTimeInMs=60000
# The read failure backoff mandatory stop time in milliseconds. By default it is 0s.
dispatcherReadFailureBackoffMandatoryStopTimeInMs=0
+# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
+# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
+# delay. This parameter sets the initial backoff delay in milliseconds.
+dispatcherRetryBackoffInitialTimeInMs=100
+
+# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
+# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
+# delay. This parameter sets the maximum backoff delay in milliseconds.
+dispatcherRetryBackoffMaxTimeInMs=1000
+
# Precise dispathcer flow control according to history message number of each entry
preciseDispatcherFlowControl=false
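
For context, a minimal sketch (illustrative class name, not part of this patch) of how these two settings are expected to parameterize the dispatcher's retry backoff, mirroring the `org.apache.pulsar.common.util.Backoff` construction added in `PersistentDispatcherMultipleConsumers` later in this diff:

```java
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.common.util.Backoff;

public class DispatcherRetryBackoffSketch {
    public static void main(String[] args) {
        // Defaults from the new broker.conf settings above.
        long initialMs = 100;  // dispatcherRetryBackoffInitialTimeInMs
        long maxMs = 1000;     // dispatcherRetryBackoffMaxTimeInMs

        // Same constructor arguments the dispatcher uses (no mandatory stop).
        Backoff retryBackoff = new Backoff(
                initialMs, TimeUnit.MILLISECONDS,
                maxMs, TimeUnit.MILLISECONDS,
                0, TimeUnit.MILLISECONDS);

        // Consecutive "everything was filtered out" rounds wait roughly
        // 100, 200, 400, 800, 1000, 1000, ... ms (with a small amount of jitter).
        for (int i = 0; i < 6; i++) {
            System.out.println("round " + i + " -> " + retryBackoff.next() + " ms");
        }

        // A round that actually dispatches messages resets the delay to the initial value.
        retryBackoff.reset();
    }
}
```
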
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 773ec5497b781..55ab670b59880 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -279,6 +279,16 @@ dispatcherReadFailureBackoffMaxTimeInMs=60000
# The read failure backoff mandatory stop time in milliseconds. By default it is 0s.
dispatcherReadFailureBackoffMandatoryStopTimeInMs=0
+# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
+# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
+# delay. This parameter sets the initial backoff delay in milliseconds.
+dispatcherRetryBackoffInitialTimeInMs=100
+
+# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
+# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
+# delay. This parameter sets the maximum backoff delay in milliseconds.
+dispatcherRetryBackoffMaxTimeInMs=1000
+
# Precise dispatcher flow control according to history message number of each entry
preciseDispatcherFlowControl=false
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 544cec44f54e0..423f8850cab1d 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -289,33 +289,33 @@ The Apache Software License, Version 2.0
- org.apache.commons-commons-lang3-3.11.jar
- org.apache.commons-commons-text-1.10.0.jar
* Netty
- - io.netty-netty-buffer-4.1.111.Final.jar
- - io.netty-netty-codec-4.1.111.Final.jar
- - io.netty-netty-codec-dns-4.1.111.Final.jar
- - io.netty-netty-codec-http-4.1.111.Final.jar
- - io.netty-netty-codec-http2-4.1.111.Final.jar
- - io.netty-netty-codec-socks-4.1.111.Final.jar
- - io.netty-netty-codec-haproxy-4.1.111.Final.jar
- - io.netty-netty-common-4.1.111.Final.jar
- - io.netty-netty-handler-4.1.111.Final.jar
- - io.netty-netty-handler-proxy-4.1.111.Final.jar
- - io.netty-netty-resolver-4.1.111.Final.jar
- - io.netty-netty-resolver-dns-4.1.111.Final.jar
- - io.netty-netty-resolver-dns-classes-macos-4.1.111.Final.jar
- - io.netty-netty-resolver-dns-native-macos-4.1.111.Final-osx-aarch_64.jar
- - io.netty-netty-resolver-dns-native-macos-4.1.111.Final-osx-x86_64.jar
- - io.netty-netty-transport-4.1.111.Final.jar
- - io.netty-netty-transport-classes-epoll-4.1.111.Final.jar
- - io.netty-netty-transport-native-epoll-4.1.111.Final-linux-aarch_64.jar
- - io.netty-netty-transport-native-epoll-4.1.111.Final-linux-x86_64.jar
- - io.netty-netty-transport-native-unix-common-4.1.111.Final.jar
- - io.netty-netty-tcnative-boringssl-static-2.0.65.Final.jar
- - io.netty-netty-tcnative-boringssl-static-2.0.65.Final-linux-aarch_64.jar
- - io.netty-netty-tcnative-boringssl-static-2.0.65.Final-linux-x86_64.jar
- - io.netty-netty-tcnative-boringssl-static-2.0.65.Final-osx-aarch_64.jar
- - io.netty-netty-tcnative-boringssl-static-2.0.65.Final-osx-x86_64.jar
- - io.netty-netty-tcnative-boringssl-static-2.0.65.Final-windows-x86_64.jar
- - io.netty-netty-tcnative-classes-2.0.65.Final.jar
+ - io.netty-netty-buffer-4.1.113.Final.jar
+ - io.netty-netty-codec-4.1.113.Final.jar
+ - io.netty-netty-codec-dns-4.1.113.Final.jar
+ - io.netty-netty-codec-http-4.1.113.Final.jar
+ - io.netty-netty-codec-http2-4.1.113.Final.jar
+ - io.netty-netty-codec-socks-4.1.113.Final.jar
+ - io.netty-netty-codec-haproxy-4.1.113.Final.jar
+ - io.netty-netty-common-4.1.113.Final.jar
+ - io.netty-netty-handler-4.1.113.Final.jar
+ - io.netty-netty-handler-proxy-4.1.113.Final.jar
+ - io.netty-netty-resolver-4.1.113.Final.jar
+ - io.netty-netty-resolver-dns-4.1.113.Final.jar
+ - io.netty-netty-resolver-dns-classes-macos-4.1.113.Final.jar
+ - io.netty-netty-resolver-dns-native-macos-4.1.113.Final-osx-aarch_64.jar
+ - io.netty-netty-resolver-dns-native-macos-4.1.113.Final-osx-x86_64.jar
+ - io.netty-netty-transport-4.1.113.Final.jar
+ - io.netty-netty-transport-classes-epoll-4.1.113.Final.jar
+ - io.netty-netty-transport-native-epoll-4.1.113.Final-linux-aarch_64.jar
+ - io.netty-netty-transport-native-epoll-4.1.113.Final-linux-x86_64.jar
+ - io.netty-netty-transport-native-unix-common-4.1.113.Final.jar
+ - io.netty-netty-tcnative-boringssl-static-2.0.66.Final.jar
+ - io.netty-netty-tcnative-boringssl-static-2.0.66.Final-linux-aarch_64.jar
+ - io.netty-netty-tcnative-boringssl-static-2.0.66.Final-linux-x86_64.jar
+ - io.netty-netty-tcnative-boringssl-static-2.0.66.Final-osx-aarch_64.jar
+ - io.netty-netty-tcnative-boringssl-static-2.0.66.Final-osx-x86_64.jar
+ - io.netty-netty-tcnative-boringssl-static-2.0.66.Final-windows-x86_64.jar
+ - io.netty-netty-tcnative-classes-2.0.66.Final.jar
- io.netty.incubator-netty-incubator-transport-classes-io_uring-0.0.21.Final.jar
- io.netty.incubator-netty-incubator-transport-native-io_uring-0.0.21.Final-linux-x86_64.jar
- io.netty.incubator-netty-incubator-transport-native-io_uring-0.0.21.Final-linux-aarch_64.jar
diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt
index 728ad9f27fefa..d19d4473c41cc 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -344,35 +344,35 @@ The Apache Software License, Version 2.0
- commons-text-1.10.0.jar
- commons-compress-1.26.0.jar
* Netty
- - netty-buffer-4.1.111.Final.jar
- - netty-codec-4.1.111.Final.jar
- - netty-codec-dns-4.1.111.Final.jar
- - netty-codec-http-4.1.111.Final.jar
- - netty-codec-socks-4.1.111.Final.jar
- - netty-codec-haproxy-4.1.111.Final.jar
- - netty-common-4.1.111.Final.jar
- - netty-handler-4.1.111.Final.jar
- - netty-handler-proxy-4.1.111.Final.jar
- - netty-resolver-4.1.111.Final.jar
- - netty-resolver-dns-4.1.111.Final.jar
- - netty-transport-4.1.111.Final.jar
- - netty-transport-classes-epoll-4.1.111.Final.jar
- - netty-transport-native-epoll-4.1.111.Final-linux-aarch_64.jar
- - netty-transport-native-epoll-4.1.111.Final-linux-x86_64.jar
- - netty-transport-native-unix-common-4.1.111.Final.jar
- - netty-tcnative-boringssl-static-2.0.65.Final.jar
- - netty-tcnative-boringssl-static-2.0.65.Final-linux-aarch_64.jar
- - netty-tcnative-boringssl-static-2.0.65.Final-linux-x86_64.jar
- - netty-tcnative-boringssl-static-2.0.65.Final-osx-aarch_64.jar
- - netty-tcnative-boringssl-static-2.0.65.Final-osx-x86_64.jar
- - netty-tcnative-boringssl-static-2.0.65.Final-windows-x86_64.jar
- - netty-tcnative-classes-2.0.65.Final.jar
+ - netty-buffer-4.1.113.Final.jar
+ - netty-codec-4.1.113.Final.jar
+ - netty-codec-dns-4.1.113.Final.jar
+ - netty-codec-http-4.1.113.Final.jar
+ - netty-codec-socks-4.1.113.Final.jar
+ - netty-codec-haproxy-4.1.113.Final.jar
+ - netty-common-4.1.113.Final.jar
+ - netty-handler-4.1.113.Final.jar
+ - netty-handler-proxy-4.1.113.Final.jar
+ - netty-resolver-4.1.113.Final.jar
+ - netty-resolver-dns-4.1.113.Final.jar
+ - netty-transport-4.1.113.Final.jar
+ - netty-transport-classes-epoll-4.1.113.Final.jar
+ - netty-transport-native-epoll-4.1.113.Final-linux-aarch_64.jar
+ - netty-transport-native-epoll-4.1.113.Final-linux-x86_64.jar
+ - netty-transport-native-unix-common-4.1.113.Final.jar
+ - netty-tcnative-boringssl-static-2.0.66.Final.jar
+ - netty-tcnative-boringssl-static-2.0.66.Final-linux-aarch_64.jar
+ - netty-tcnative-boringssl-static-2.0.66.Final-linux-x86_64.jar
+ - netty-tcnative-boringssl-static-2.0.66.Final-osx-aarch_64.jar
+ - netty-tcnative-boringssl-static-2.0.66.Final-osx-x86_64.jar
+ - netty-tcnative-boringssl-static-2.0.66.Final-windows-x86_64.jar
+ - netty-tcnative-classes-2.0.66.Final.jar
- netty-incubator-transport-classes-io_uring-0.0.21.Final.jar
- netty-incubator-transport-native-io_uring-0.0.21.Final-linux-aarch_64.jar
- netty-incubator-transport-native-io_uring-0.0.21.Final-linux-x86_64.jar
- - netty-resolver-dns-classes-macos-4.1.111.Final.jar
- - netty-resolver-dns-native-macos-4.1.111.Final-osx-aarch_64.jar
- - netty-resolver-dns-native-macos-4.1.111.Final-osx-x86_64.jar
+ - netty-resolver-dns-classes-macos-4.1.113.Final.jar
+ - netty-resolver-dns-native-macos-4.1.113.Final-osx-aarch_64.jar
+ - netty-resolver-dns-native-macos-4.1.113.Final-osx-x86_64.jar
* Prometheus client
- simpleclient-0.16.0.jar
- simpleclient_log4j2-0.16.0.jar
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
index b60ae41670de7..a9c436f11c000 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloader.java
@@ -230,5 +230,9 @@ default void scanLedgers(OffloadedLedgerMetadataConsumer consumer,
Map offloadDriverMetadata) throws ManagedLedgerException {
throw ManagedLedgerException.getManagedLedgerException(new UnsupportedOperationException());
}
+
+ default boolean isAppendable() {
+ return true;
+ }
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index beccda60cc1b6..7846df838f6ed 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -94,6 +94,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.UpdatePropertiesCallback;
import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -2494,9 +2495,8 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture> p
100, TimeUnit.MILLISECONDS);
}
- private void maybeOffloadInBackground(CompletableFuture promise) {
- if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
- || config.getLedgerOffloader().getOffloadPolicies() == null) {
+ public void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
+ if (getOffloadPoliciesIfAppendable().isEmpty()) {
return;
}
@@ -2512,8 +2512,7 @@ private void maybeOffloadInBackground(CompletableFuture promise) {
private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds,
CompletableFuture finalPromise) {
- if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
- || config.getLedgerOffloader().getOffloadPolicies() == null) {
+ if (getOffloadPoliciesIfAppendable().isEmpty()) {
String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name);
finalPromise.completeExceptionally(new IllegalArgumentException(msg));
return;
@@ -2615,6 +2614,16 @@ void internalTrimConsumedLedgers(CompletableFuture> promise) {
internalTrimLedgers(false, promise);
}
+ private Optional<OffloadPoliciesImpl> getOffloadPoliciesIfAppendable() {
+ LedgerOffloader ledgerOffloader = config.getLedgerOffloader();
+ if (ledgerOffloader == null
+ || !ledgerOffloader.isAppendable()
+ || ledgerOffloader.getOffloadPolicies() == null) {
+ return Optional.empty();
+ }
+ return Optional.ofNullable(ledgerOffloader.getOffloadPolicies());
+ }
+
@VisibleForTesting
List internalEvictOffloadedLedgers() {
int inactiveOffloadedLedgerEvictionTimeMs = config.getInactiveOffloadedLedgerEvictionTimeMs();
@@ -2668,10 +2677,7 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture> promise) {
List ledgersToDelete = new ArrayList<>();
List offloadedLedgersToDelete = new ArrayList<>();
- Optional optionalOffloadPolicies = Optional.ofNullable(config.getLedgerOffloader() != null
- && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
- ? config.getLedgerOffloader().getOffloadPolicies()
- : null);
+ Optional<OffloadPoliciesImpl> optionalOffloadPolicies = getOffloadPoliciesIfAppendable();
synchronized (this) {
if (log.isDebugEnabled()) {
log.debug("[{}] Start TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.keySet(),
@@ -3198,8 +3204,10 @@ public void offloadFailed(ManagedLedgerException e, Object ctx) {
@Override
public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ctx) {
- if (config.getLedgerOffloader() != null && config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE) {
- callback.offloadFailed(new ManagedLedgerException("NullLedgerOffloader"), ctx);
+ LedgerOffloader ledgerOffloader = config.getLedgerOffloader();
+ if (ledgerOffloader != null && !ledgerOffloader.isAppendable()) {
+ String msg = String.format("[%s] does not support offload", ledgerOffloader.getClass().getSimpleName());
+ callback.offloadFailed(new ManagedLedgerException(msg), ctx);
return;
}
PositionImpl requestOffloadTo = (PositionImpl) pos;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonAppendableLedgerOffloader.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonAppendableLedgerOffloader.java
new file mode 100644
index 0000000000000..261de9fc5438d
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonAppendableLedgerOffloader.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+
+public class NonAppendableLedgerOffloader implements LedgerOffloader {
+ private LedgerOffloader delegate;
+
+ public NonAppendableLedgerOffloader(LedgerOffloader delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public String getOffloadDriverName() {
+ return delegate.getOffloadDriverName();
+ }
+
+ @Override
+ public CompletableFuture<Void> offload(ReadHandle ledger,
+ UUID uid,
+ Map<String, String> extraMetadata) {
+ return FutureUtil.failedFuture(new UnsupportedOperationException());
+ }
+
+ @Override
+ public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid,
+ Map<String, String> offloadDriverMetadata) {
+ return delegate.readOffloaded(ledgerId, uid, offloadDriverMetadata);
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
+ Map<String, String> offloadDriverMetadata) {
+ return delegate.deleteOffloaded(ledgerId, uid, offloadDriverMetadata);
+ }
+
+ @Override
+ public OffloadPoliciesImpl getOffloadPolicies() {
+ return delegate.getOffloadPolicies();
+ }
+
+ @Override
+ public void close() {
+ delegate.close();
+ }
+
+ @Override
+ public boolean isAppendable() {
+ return false;
+ }
+}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
index 0e5e7cf4b5b55..7aa2deada3e8e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
@@ -70,4 +70,9 @@ public OffloadPoliciesImpl getOffloadPolicies() {
public void close() {
}
+
+ @Override
+ public boolean isAppendable() {
+ return false;
+ }
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index fc47b8c3f72a4..7b648c11e33b0 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -3849,7 +3849,7 @@ public void testDoNotGetOffloadPoliciesMultipleTimesWhenTrimLedgers() throws Exc
config.setLedgerOffloader(ledgerOffloader);
ledger.internalTrimConsumedLedgers(Futures.NULL_PROMISE);
- verify(ledgerOffloader, times(1)).getOffloadPolicies();
+ verify(ledgerOffloader, times(1)).isAppendable();
}
@Test(timeOut = 30000)
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
index 59e815fb1b4a5..e8b84fc206f3c 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java
@@ -29,6 +29,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
@@ -57,6 +58,8 @@
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.OffloadedLedgerHandle;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.util.MockClock;
@@ -64,12 +67,34 @@
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
+import org.awaitility.Awaitility;
import org.testng.Assert;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
public class OffloadPrefixReadTest extends MockedBookKeeperTestCase {
- @Test
- public void testOffloadRead() throws Exception {
+
+ private final String offloadTypeAppendable = "NonAppendable";
+
+ @Override
+ protected void initManagedLedgerFactoryConfig(ManagedLedgerFactoryConfig config) {
+ super.initManagedLedgerFactoryConfig(config);
+ // disable cache.
+ config.setMaxCacheSize(0);
+ }
+
+ @DataProvider(name = "offloadAndDeleteTypes")
+ public Object[][] offloadAndDeleteTypes() {
+ return new Object[][]{
+ {"normal", true},
+ {"normal", false},
+ {offloadTypeAppendable, true},
+ {offloadTypeAppendable, false},
+ };
+ }
+
+ @Test(dataProvider = "offloadAndDeleteTypes")
+ public void testOffloadRead(String offloadType, boolean deleteMl) throws Exception {
MockLedgerOffloader offloader = spy(MockLedgerOffloader.class);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
@@ -92,6 +117,10 @@ public void testOffloadRead() throws Exception {
Assert.assertTrue(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getComplete());
Assert.assertFalse(ledger.getLedgersInfoAsList().get(2).getOffloadContext().getComplete());
+ if (offloadTypeAppendable.equals(offloadType)) {
+ config.setLedgerOffloader(new NonAppendableLedgerOffloader(offloader));
+ }
+
UUID firstLedgerUUID = new UUID(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidMsb(),
ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidLsb());
UUID secondLedgerUUID = new UUID(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidMsb(),
@@ -119,13 +148,30 @@ public void testOffloadRead() throws Exception {
verify(offloader, times(2))
.readOffloaded(anyLong(), (UUID) any(), anyMap());
- ledger.close();
- // Ensure that all the read handles had been closed
- assertEquals(offloader.openedReadHandles.get(), 0);
+ if (!deleteMl) {
+ ledger.close();
+ // Ensure that all the read handles had been closed
+ assertEquals(offloader.openedReadHandles.get(), 0);
+ } else {
+ // Verify: the offloaded ledger will be deleted after the managed ledger is deleted.
+ ledger.delete();
+ Awaitility.await().untilAsserted(() -> {
+ assertTrue(offloader.offloads.size() <= 1);
+ assertTrue(ledger.ledgers.size() <= 1);
+ });
+ }
}
- @Test
- public void testBookkeeperFirstOffloadRead() throws Exception {
+ @DataProvider(name = "offloadTypes")
+ public Object[][] offloadTypes() {
+ return new Object[][]{
+ {"normal"},
+ {offloadTypeAppendable},
+ };
+ }
+
+ @Test(dataProvider = "offloadTypes")
+ public void testBookkeeperFirstOffloadRead(String offloadType) throws Exception {
MockLedgerOffloader offloader = spy(MockLedgerOffloader.class);
MockClock clock = new MockClock();
offloader.getOffloadPolicies()
@@ -190,6 +236,10 @@ public void testBookkeeperFirstOffloadRead() throws Exception {
Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getBookkeeperDeleted());
Assert.assertTrue(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getBookkeeperDeleted());
+ if (offloadTypeAppendable.equals(offloadType)) {
+ config.setLedgerOffloader(new NonAppendableLedgerOffloader(offloader));
+ }
+
for (Entry e : cursor.readEntries(10)) {
Assert.assertEquals(new String(e.getData()), "entry-" + i++);
}
@@ -199,6 +249,56 @@ public void testBookkeeperFirstOffloadRead() throws Exception {
.readOffloaded(anyLong(), (UUID) any(), anyMap());
verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID), anyMap());
+ // Verify: the offloaded ledger will be trimmed once there is no backlog.
+ while (cursor.hasMoreEntries()) {
+ cursor.readEntries(1);
+ }
+ config.setRetentionTime(0, TimeUnit.MILLISECONDS);
+ config.setRetentionSizeInMB(0);
+ CompletableFuture<Void> trimFuture = new CompletableFuture<>();
+ ledger.trimConsumedLedgersInBackground(trimFuture);
+ trimFuture.join();
+ Awaitility.await().untilAsserted(() -> {
+ assertTrue(offloader.offloads.size() <= 1);
+ assertTrue(ledger.ledgers.size() <= 1);
+ });
+
+ // cleanup.
+ ledger.delete();
+ }
+
+
+
+ @Test
+ public void testSkipOffloadIfReadOnly() throws Exception {
+ LedgerOffloader ol = new NonAppendableLedgerOffloader(spy(MockLedgerOffloader.class));
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(10);
+ config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+ config.setRetentionTime(10, TimeUnit.MINUTES);
+ config.setRetentionSizeInMB(10);
+ config.setLedgerOffloader(ol);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger", config);
+
+ for (int i = 0; i < 25; i++) {
+ String content = "entry-" + i;
+ ledger.addEntry(content.getBytes());
+ }
+ assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+
+ try {
+ ledger.offloadPrefix(ledger.getLastConfirmedEntry());
+ } catch (ManagedLedgerException mle) {
+ assertTrue(mle.getMessage().contains("does not support offload"));
+ }
+
+ assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+ Assert.assertFalse(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getComplete());
+ Assert.assertFalse(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getComplete());
+ Assert.assertFalse(ledger.getLedgersInfoAsList().get(2).getOffloadContext().getComplete());
+
+ // cleanup.
+ ledger.delete();
}
@@ -219,7 +319,7 @@ static class MockLedgerOffloader implements LedgerOffloader {
Set offloadedLedgers() {
return offloads.values().stream().map(ReadHandle::getId).collect(Collectors.toSet());
}
-
+
@Override
public String getOffloadDriverName() {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 2cdb14fb71e41..53fa172558509 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -94,7 +94,7 @@ public void testNullOffloader() throws Exception {
ledger.offloadPrefix(p);
fail("Should have thrown an exception");
} catch (ManagedLedgerException e) {
- assertEquals(e.getMessage(), "NullLedgerOffloader");
+ assertTrue(e.getMessage().contains("does not support offload"));
}
assertEquals(ledger.getLedgersInfoAsList().size(), 5);
assertEquals(ledger.getLedgersInfoAsList().stream()
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
index 645563eb78c4d..c7685cfaa6594 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
@@ -83,13 +83,17 @@ public final void setUp(Method method) throws Exception {
}
ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
- // increase default cache eviction interval so that caching could be tested with less flakyness
- managedLedgerFactoryConfig.setCacheEvictionIntervalMs(200);
+ initManagedLedgerFactoryConfig(managedLedgerFactoryConfig);
factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
setUpTestCase();
}
+ protected void initManagedLedgerFactoryConfig(ManagedLedgerFactoryConfig config) {
+ // increase default cache eviction interval so that caching could be tested with less flakiness
+ config.setCacheEvictionIntervalMs(200);
+ }
+
protected void setUpTestCase() throws Exception {
}
diff --git a/pom.xml b/pom.xml
index 36d8ae6c63143..d3dd0cb940723 100644
--- a/pom.xml
+++ b/pom.xml
@@ -143,7 +143,7 @@ flexible messaging model and an intuitive client API.
1.1.10.5
4.1.12.1
5.1.0
- 4.1.111.Final
+ 4.1.113.Final
0.0.21.Final
9.4.54.v20240208
2.5.2
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 5321a420fb4b4..1a6a85451e6bb 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1174,6 +1174,20 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
)
private int dispatcherReadFailureBackoffMandatoryStopTimeInMs = 0;
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered "
+ + "out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff "
+ + "delay. This parameter sets the initial backoff delay in milliseconds.")
+ private int dispatcherRetryBackoffInitialTimeInMs = 100;
+
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered "
+ + "out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff "
+ + "delay. This parameter sets the maximum backoff delay in milliseconds.")
+ private int dispatcherRetryBackoffMaxTimeInMs = 1000;
+
@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 20e2affc2f9ef..dd66fc957d8c7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -274,6 +274,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private final ExecutorProvider transactionExecutorProvider;
private String brokerId;
private final CompletableFuture readyForIncomingRequestsFuture = new CompletableFuture<>();
+ private final List<Runnable> pendingTasksBeforeReadyForIncomingRequests = new ArrayList<>();
public enum State {
Init, Started, Closing, Closed
@@ -903,7 +904,13 @@ public void start() throws PulsarServerException {
this.metricsGenerator = new MetricsGenerator(this);
// the broker is ready to accept incoming requests by Pulsar binary protocol and http/https
- readyForIncomingRequestsFuture.complete(null);
+ final List<Runnable> runnables;
+ synchronized (pendingTasksBeforeReadyForIncomingRequests) {
+ runnables = new ArrayList<>(pendingTasksBeforeReadyForIncomingRequests);
+ pendingTasksBeforeReadyForIncomingRequests.clear();
+ readyForIncomingRequestsFuture.complete(null);
+ }
+ runnables.forEach(Runnable::run);
// Initialize the message protocol handlers.
// start the protocol handlers only after the broker is ready,
@@ -962,7 +969,21 @@ public void start() throws PulsarServerException {
}
public void runWhenReadyForIncomingRequests(Runnable runnable) {
- readyForIncomingRequestsFuture.thenRun(runnable);
+ // Here we don't call the thenRun() methods because CompletableFuture maintains a stack for pending callbacks,
+ // not a queue. Once the future is complete, the pending callbacks will be executed in reverse order of
+ // when they were added.
+ final boolean addedToPendingTasks;
+ synchronized (pendingTasksBeforeReadyForIncomingRequests) {
+ if (readyForIncomingRequestsFuture.isDone()) {
+ addedToPendingTasks = false;
+ } else {
+ pendingTasksBeforeReadyForIncomingRequests.add(runnable);
+ addedToPendingTasks = true;
+ }
+ }
+ if (!addedToPendingTasks) {
+ runnable.run();
+ }
}
public void waitUntilReadyForIncomingRequests() throws ExecutionException, InterruptedException {
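
For reviewers unfamiliar with the ordering issue described in the comment above, here is a minimal, self-contained sketch (plain JDK, illustrative class name, not part of this patch) of why `thenRun()` is avoided: callbacks registered on a not-yet-complete `CompletableFuture` are kept on a stack and, in practice, run in reverse registration order once the future completes.

```java
import java.util.concurrent.CompletableFuture;

public class ThenRunOrderingSketch {
    public static void main(String[] args) {
        CompletableFuture<Void> ready = new CompletableFuture<>();
        for (int i = 1; i <= 3; i++) {
            final int n = i;
            // All three callbacks are registered before completion.
            ready.thenRun(() -> System.out.println("callback " + n));
        }
        ready.complete(null);
        // Typically prints: callback 3, callback 2, callback 1 — hence the explicit
        // pendingTasksBeforeReadyForIncomingRequests list, which preserves registration order.
    }
}
```
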
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index d61c6115dccb4..b473d2dbf1f52 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -606,7 +606,7 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
&& pulsar().getConfig().isCreateTopicToRemoteClusterForReplication()) {
internalCreatePartitionedTopicToReplicatedClustersInBackground(numPartitions);
log.info("[{}] Successfully created partitioned for topic {} for the remote clusters",
- clientAppId());
+ clientAppId(), topicName);
} else {
log.info("[{}] Skip creating partitioned for topic {} for the remote clusters",
clientAppId(), topicName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 9d039a49a5c7c..4f7d0747f2822 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -27,11 +27,13 @@
import io.swagger.annotations.ExampleProperty;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -64,6 +66,7 @@
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
+import org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl;
import org.apache.pulsar.common.util.FutureUtil;
@@ -681,10 +684,13 @@ public void setNamespaceIsolationPolicy(
.setIsolationDataWithCreateAsync(cluster, (p) -> Collections.emptyMap())
.thenApply(__ -> new NamespaceIsolationPolicies()))
).thenCompose(nsIsolationPolicies -> {
+ NamespaceIsolationDataImpl oldPolicy = nsIsolationPolicies
+ .getPolicies().getOrDefault(policyName, null);
nsIsolationPolicies.setPolicy(policyName, policyData);
return namespaceIsolationPolicies()
- .setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies());
- }).thenCompose(__ -> filterAndUnloadMatchedNamespaceAsync(cluster, policyData))
+ .setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies())
+ .thenApply(__ -> oldPolicy);
+ }).thenCompose(oldPolicy -> filterAndUnloadMatchedNamespaceAsync(cluster, policyData, oldPolicy))
.thenAccept(__ -> {
log.info("[{}] Successful to update clusters/{}/namespaceIsolationPolicies/{}.",
clientAppId(), cluster, policyName);
@@ -719,7 +725,13 @@ public void setNamespaceIsolationPolicy(
* Get matched namespaces; call unload for each namespaces.
*/
private CompletableFuture filterAndUnloadMatchedNamespaceAsync(String cluster,
- NamespaceIsolationDataImpl policyData) {
+ NamespaceIsolationDataImpl policyData,
+ NamespaceIsolationDataImpl oldPolicy) {
+ // exit early if none of the namespaces need to be unloaded
+ if (NamespaceIsolationPolicyUnloadScope.none.equals(policyData.getUnloadScope())) {
+ return CompletableFuture.completedFuture(null);
+ }
+
PulsarAdmin adminClient;
try {
adminClient = pulsar().getAdminClient();
@@ -728,6 +740,7 @@ private CompletableFuture filterAndUnloadMatchedNamespaceAsync(String clus
}
// compile regex patterns once
List namespacePatterns = policyData.getNamespaces().stream().map(Pattern::compile).toList();
+ // TODO for 4.x, we should include both old and new namespace regex pattern for unload `all_matching` option
return adminClient.tenants().getTenantsAsync().thenCompose(tenants -> {
List>> filteredNamespacesForEachTenant = tenants.stream()
.map(tenant -> adminClient.namespaces().getNamespacesAsync(tenant).thenCompose(namespaces -> {
@@ -753,6 +766,41 @@ private CompletableFuture filterAndUnloadMatchedNamespaceAsync(String clus
if (CollectionUtils.isEmpty(shouldUnloadNamespaces)) {
return CompletableFuture.completedFuture(null);
}
+ // If unload type is 'changed', we need to figure out a further subset of namespaces whose placement might
+ // actually have been changed.
+
+ log.debug("Old policy: {} ; new policy: {}", oldPolicy, policyData);
+ if (oldPolicy != null && NamespaceIsolationPolicyUnloadScope.changed.equals(policyData.getUnloadScope())) {
+ // We also check that the previous primary broker list is the same as the current one; if it changed,
+ // all namespaces need to be placed again anyway.
+ if (CollectionUtils.isEqualCollection(oldPolicy.getPrimary(), policyData.getPrimary())) {
+ // The list is unchanged, so we continue narrowing down to the changed namespaces.
+
+ // We create a union regex set containing the old + new regexes
+ Set<String> combinedNamespaces = new HashSet<>(oldPolicy.getNamespaces());
+ combinedNamespaces.addAll(policyData.getNamespaces());
+ // We create an intersection of the old and new regexes. These won't need to be unloaded
+ Set<String> commonNamespaces = new HashSet<>(oldPolicy.getNamespaces());
+ commonNamespaces.retainAll(policyData.getNamespaces());
+
+ log.debug("combined regexes: {}; common regexes: {}", combinedNamespaces, commonNamespaces);
+
+ // Find the changed regexes (new - new ∩ old). TODO for 4.x, make this (new U old - new ∩ old)
+ combinedNamespaces.removeAll(commonNamespaces);
+
+ log.debug("changed regexes: {}", commonNamespaces);
+
+ // Now we further filter the filtered namespaces based on this combinedNamespaces set
+ shouldUnloadNamespaces = shouldUnloadNamespaces.stream()
+ .filter(name -> combinedNamespaces.stream()
+ .map(Pattern::compile)
+ .anyMatch(pattern -> pattern.matcher(name).matches())
+ ).toList();
+
+ }
+ }
+ // unload type is either null or not in (changed, none), so we proceed to unload all namespaces
+ // TODO - default in 4.x should become `changed`
List> futures = shouldUnloadNamespaces.stream()
.map(namespaceName -> adminClient.namespaces().unloadAsync(namespaceName))
.collect(Collectors.toList());
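
To make the `changed` scope logic above easier to review, here is a small standalone sketch (no Pulsar types; the example regexes are made up, not from this patch) of the set arithmetic applied to the regex lists. Keep in mind that the candidate namespaces were already matched against the new policy's regexes, so the net effect today is new − (new ∩ old), as the TODO notes.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ChangedRegexSetSketch {
    public static void main(String[] args) {
        List<String> oldRegexes = List.of("tenant/ns-a.*", "tenant/ns-b.*");
        List<String> newRegexes = List.of("tenant/ns-b.*", "tenant/ns-c.*");

        // Union of the old and new regexes.
        Set<String> combined = new HashSet<>(oldRegexes);
        combined.addAll(newRegexes);

        // Intersection: regexes present in both versions; namespaces matching
        // only these keep their placement and are not unloaded.
        Set<String> common = new HashSet<>(oldRegexes);
        common.retainAll(newRegexes);

        // Changed regexes = union minus intersection.
        combined.removeAll(common);

        System.out.println(combined); // [tenant/ns-a.*, tenant/ns-c.*] (order may vary)
    }
}
```
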
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 9dc4967cd741a..e0fd738a408e2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -97,7 +97,10 @@
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.common.util.Backoff;
+import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.slf4j.Logger;
@@ -120,6 +123,10 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
public static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024;
+ public static final int STARTUP_TIMEOUT_SECONDS = 30;
+
+ public static final int MAX_RETRY = 5;
+
private static final String ELECTION_ROOT = "/loadbalance/extension/leader";
public static final Set INTERNAL_TOPICS =
@@ -347,10 +354,43 @@ public void start() throws PulsarServerException {
this.serviceUnitStateChannel.listen(splitManager);
this.leaderElectionService.start();
pulsar.runWhenReadyForIncomingRequests(() -> {
- try {
- this.serviceUnitStateChannel.start();
- } catch (Exception e) {
- failStarting(e);
+ Backoff backoff = new BackoffBuilder()
+ .setInitialTime(100, TimeUnit.MILLISECONDS)
+ .setMax(STARTUP_TIMEOUT_SECONDS, TimeUnit.SECONDS)
+ .create();
+ int retry = 0;
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ brokerRegistry.register();
+ this.serviceUnitStateChannel.start();
+ break;
+ } catch (Exception e) {
+ log.warn("The broker:{} failed to start service unit state channel. Retrying {} th ...",
+ pulsar.getBrokerId(), ++retry, e);
+ try {
+ Thread.sleep(backoff.next());
+ } catch (InterruptedException ex) {
+ log.warn("Interrupted while sleeping.");
+ // preserve thread's interrupt status
+ Thread.currentThread().interrupt();
+ try {
+ pulsar.close();
+ } catch (PulsarServerException exc) {
+ log.error("Failed to close pulsar service.", exc);
+ }
+ return;
+ }
+ failStarting(e);
+ if (retry >= MAX_RETRY) {
+ log.error("Failed to start the service unit state channel after retry {} th. "
+ + "Closing pulsar service.", retry, e);
+ try {
+ pulsar.close();
+ } catch (PulsarServerException ex) {
+ log.error("Failed to close pulsar service.", ex);
+ }
+ }
+ }
}
});
this.antiAffinityGroupPolicyHelper =
@@ -444,8 +484,15 @@ private void failStarting(Exception ex) {
this.brokerRegistry, ex);
if (this.brokerRegistry != null) {
try {
- brokerRegistry.close();
- } catch (PulsarServerException e) {
+ brokerRegistry.unregister();
+ } catch (MetadataStoreException e) {
+ // ignore
+ }
+ }
+ if (this.serviceUnitStateChannel != null) {
+ try {
+ serviceUnitStateChannel.close();
+ } catch (IOException e) {
// ignore
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index d63121e38783c..884b0e9ea000a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -90,7 +90,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
-import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
+import org.apache.bookkeeper.mledger.impl.NonAppendableLedgerOffloader;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
@@ -2003,28 +2003,24 @@ private CompletableFuture getManagedLedgerConfig(@Nonnull T
topicLevelOffloadPolicies,
OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
getPulsar().getConfig().getProperties());
- if (NamespaceService.isSystemServiceNamespace(namespace.toString())
- || SystemTopicNames.isSystemTopic(topicName)) {
- /*
- Avoid setting broker internal system topics using off-loader because some of them are the
- preconditions of other topics. The slow replying log speed will cause a delay in all the topic
- loading.(timeout)
- */
- managedLedgerConfig.setLedgerOffloader(NullLedgerOffloader.INSTANCE);
- } else {
- if (topicLevelOffloadPolicies != null) {
- try {
- LedgerOffloader topicLevelLedgerOffLoader =
- pulsar().createManagedLedgerOffloader(offloadPolicies);
- managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
- } catch (PulsarServerException e) {
- throw new RuntimeException(e);
- }
- } else {
- //If the topic level policy is null, use the namespace level
- managedLedgerConfig
- .setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
+ if (topicLevelOffloadPolicies != null) {
+ try {
+ LedgerOffloader topicLevelLedgerOffLoader = pulsar().createManagedLedgerOffloader(offloadPolicies);
+ managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
+ } catch (PulsarServerException e) {
+ throw new RuntimeException(e);
}
+ } else {
+ //If the topic level policy is null, use the namespace level
+ managedLedgerConfig
+ .setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
+ }
+ if (managedLedgerConfig.getLedgerOffloader() != null
+ && managedLedgerConfig.getLedgerOffloader().isAppendable()
+ && (NamespaceService.isSystemServiceNamespace(namespace.toString())
+ || SystemTopicNames.isSystemTopic(topicName))) {
+ managedLedgerConfig.setLedgerOffloader(
+ new NonAppendableLedgerOffloader(managedLedgerConfig.getLedgerOffloader()));
}
managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index ae844b5784456..cd5acd069e747 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -47,6 +47,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
@@ -84,7 +85,6 @@
*/
public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMultipleConsumers
implements Dispatcher, ReadEntriesCallback {
-
protected final PersistentTopic topic;
protected final ManagedCursor cursor;
protected volatile Range lastIndividualDeletedRangeFromCursorRecovery;
@@ -122,7 +122,8 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
private AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false);
protected final ExecutorService dispatchMessagesThread;
private final SharedConsumerAssignor assignor;
-
+ protected int lastNumberOfEntriesDispatched;
+ private final Backoff retryBackoff;
protected enum ReadType {
Normal, Replay
}
@@ -147,10 +148,15 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.initializeDispatchRateLimiterIfNeeded();
this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addMessageToReplay);
+ ServiceConfiguration serviceConfiguration = topic.getBrokerService().pulsar().getConfiguration();
this.readFailureBackoff = new Backoff(
- topic.getBrokerService().pulsar().getConfiguration().getDispatcherReadFailureBackoffInitialTimeInMs(),
+ serviceConfiguration.getDispatcherReadFailureBackoffInitialTimeInMs(),
TimeUnit.MILLISECONDS,
1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
+ retryBackoff = new Backoff(
+ serviceConfiguration.getDispatcherRetryBackoffInitialTimeInMs(), TimeUnit.MILLISECONDS,
+ serviceConfiguration.getDispatcherRetryBackoffMaxTimeInMs(), TimeUnit.MILLISECONDS,
+ 0, TimeUnit.MILLISECONDS);
}
@Override
@@ -392,16 +398,23 @@ public synchronized void readMoreEntries() {
@Override
protected void reScheduleRead() {
+ reScheduleReadInMs(MESSAGE_RATE_BACKOFF_MS);
+ }
+
+ protected void reScheduleReadInMs(long readAfterMs) {
if (isRescheduleReadInProgress.compareAndSet(false, true)) {
if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, MESSAGE_RATE_BACKOFF_MS);
+ log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, readAfterMs);
+ }
+ Runnable runnable = () -> {
+ isRescheduleReadInProgress.set(false);
+ readMoreEntries();
+ };
+ if (readAfterMs > 0) {
+ topic.getBrokerService().executor().schedule(runnable, readAfterMs, TimeUnit.MILLISECONDS);
+ } else {
+ topic.getBrokerService().executor().execute(runnable);
}
- topic.getBrokerService().executor().schedule(
- () -> {
- isRescheduleReadInProgress.set(false);
- readMoreEntries();
- },
- MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
}
}
@@ -612,8 +625,8 @@ public final synchronized void readEntriesComplete(List entries, Object c
log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size());
}
- long size = entries.stream().mapToLong(Entry::getLength).sum();
- updatePendingBytesToDispatch(size);
+ long totalBytesSize = entries.stream().mapToLong(Entry::getLength).sum();
+ updatePendingBytesToDispatch(totalBytesSize);
// dispatch messages to a separate thread, but still in order for this subscription
// sendMessagesToConsumers is responsible for running broker-side filters
@@ -623,19 +636,28 @@ public final synchronized void readEntriesComplete(List entries, Object c
// in a separate thread, and we want to prevent more reads
acquireSendInProgress();
dispatchMessagesThread.execute(() -> {
- if (sendMessagesToConsumers(readType, entries, false)) {
- updatePendingBytesToDispatch(-size);
- readMoreEntries();
- } else {
- updatePendingBytesToDispatch(-size);
- }
+ handleSendingMessagesAndReadingMore(readType, entries, false, totalBytesSize);
});
} else {
- if (sendMessagesToConsumers(readType, entries, true)) {
- updatePendingBytesToDispatch(-size);
- readMoreEntriesAsync();
- } else {
- updatePendingBytesToDispatch(-size);
+ handleSendingMessagesAndReadingMore(readType, entries, true, totalBytesSize);
+ }
+ }
+
+ private synchronized void handleSendingMessagesAndReadingMore(ReadType readType, List<Entry> entries,
+ boolean needAcquireSendInProgress,
+ long totalBytesSize) {
+ boolean triggerReadingMore = sendMessagesToConsumers(readType, entries, needAcquireSendInProgress);
+ int entriesDispatched = lastNumberOfEntriesDispatched;
+ updatePendingBytesToDispatch(-totalBytesSize);
+ if (triggerReadingMore) {
+ if (entriesDispatched > 0) {
+ // Reset the backoff when we successfully dispatched messages
+ retryBackoff.reset();
+ // Call readMoreEntries in the same thread to trigger the next read
+ readMoreEntries();
+ } else if (entriesDispatched == 0) {
+ // If no messages were dispatched, we need to reschedule a new read with an increasing backoff delay
+ reScheduleReadInMs(retryBackoff.next());
}
}
}
@@ -674,6 +696,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
if (needTrimAckedMessages()) {
cursor.trimDeletedEntries(entries);
}
+ lastNumberOfEntriesDispatched = 0;
int entriesToDispatch = entries.size();
// Trigger read more messages
@@ -775,6 +798,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
totalBytesSent += sendMessageInfo.getTotalBytes();
}
+ lastNumberOfEntriesDispatched = (int) totalEntries;
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);
if (entriesToDispatch > 0) {
@@ -788,6 +812,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
entry.release();
});
}
+
return true;
}
@@ -849,6 +874,7 @@ private boolean sendChunkedMessagesToConsumers(ReadType readType,
totalBytesSent += sendMessageInfo.getTotalBytes();
}
+ lastNumberOfEntriesDispatched = (int) totalEntries;
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);
return numConsumers.get() == 0; // trigger a new readMoreEntries() call
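
A compact sketch (assumed class and method names, not the dispatcher itself) of the decision `handleSendingMessagesAndReadingMore` makes after each read: a round that dispatched something resets the backoff and reads again immediately, while an all-filtered round schedules the next read after `retryBackoff.next()` milliseconds.

```java
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.common.util.Backoff;

class RetryDecisionSketch {
    private final Backoff retryBackoff = new Backoff(
            100, TimeUnit.MILLISECONDS,   // dispatcherRetryBackoffInitialTimeInMs
            1000, TimeUnit.MILLISECONDS,  // dispatcherRetryBackoffMaxTimeInMs
            0, TimeUnit.MILLISECONDS);

    /** Returns how long to wait before the next read, in milliseconds. */
    long nextReadDelayMs(int entriesDispatched) {
        if (entriesDispatched > 0) {
            retryBackoff.reset(); // progress was made, start over from the initial delay
            return 0;             // read more entries right away
        }
        return retryBackoff.next(); // everything was filtered out, back off before retrying
    }
}
```
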
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 2df9f38531f5d..397cb7226b767 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -178,6 +178,7 @@ protected Map> initialValue() throws Exception {
@Override
protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List entries) {
+ lastNumberOfEntriesDispatched = 0;
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
@@ -312,6 +313,8 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
}
}
+
+ lastNumberOfEntriesDispatched = (int) totalEntries;
// acquire message-dispatch permits for already delivered messages
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 0254cf06f4ead..fd3dec81196a5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -21,6 +21,7 @@
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
+import static org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
@@ -52,6 +53,7 @@
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.ws.rs.NotAcceptableException;
@@ -108,27 +110,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
-import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
-import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
-import org.apache.pulsar.common.policies.data.BacklogQuota;
-import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
-import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
-import org.apache.pulsar.common.policies.data.BundlesData;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ConsumerStats;
-import org.apache.pulsar.common.policies.data.EntryFilters;
-import org.apache.pulsar.common.policies.data.FailureDomain;
-import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
-import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
-import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
-import org.apache.pulsar.common.policies.data.PersistencePolicies;
-import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
-import org.apache.pulsar.common.policies.data.RetentionPolicies;
-import org.apache.pulsar.common.policies.data.SubscriptionStats;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.policies.data.TopicStats;
-import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.policies.data.*;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -3494,4 +3476,188 @@ public void testGetStatsIfPartitionNotExists() throws Exception {
// cleanup.
admin.topics().deletePartitionedTopic(partitionedTp);
}
+
+ private NamespaceIsolationData createPolicyData(NamespaceIsolationPolicyUnloadScope scope, List<String> namespaces,
+ List<String> primaryBrokers
+ ) {
+ // setup ns-isolation-policy in both the clusters.
+ Map<String, String> parameters1 = new HashMap<>();
+ parameters1.put("min_limit", "1");
+ parameters1.put("usage_threshold", "100");
+ List<String> nsRegexList = new ArrayList<>(namespaces);
+
+ return NamespaceIsolationData.builder()
+ // "prop-ig/ns1" is present in test cluster, policy set on test2 should work
+ .namespaces(nsRegexList)
+ .primary(primaryBrokers)
+ .secondary(Collections.singletonList(""))
+ .autoFailoverPolicy(AutoFailoverPolicyData.builder()
+ .policyType(AutoFailoverPolicyType.min_available)
+ .parameters(parameters1)
+ .build())
+ .unloadScope(scope)
+ .build();
+ }
+
+ private boolean allTopicsUnloaded(List<String> topics) {
+ for (String topic : topics) {
+ if (pulsar.getBrokerService().getTopicReference(topic).isPresent()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private void loadTopics(List<String> topics) throws PulsarClientException, ExecutionException, InterruptedException {
+ // create a topic by creating a producer so that the topic is present on the broker
+ for (String topic : topics) {
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
+ producer.close();
+ pulsar.getBrokerService().getTopicIfExists(topic).get();
+ }
+
+ // All namespaces are loaded onto broker. Assert that
+ for (String topic : topics) {
+ assertTrue(pulsar.getBrokerService().getTopicReference(topic).isPresent());
+ }
+ }
+
+ /**
+ * Validates that setting and updating a namespace isolation policy unloads only the relevant namespaces, based on
+ * the unload scope provided.
+ *
+ * @param topicType persistent or non-persistent.
+ * @param policyName policy name.
+ * @param nsPrefix unique namespace prefix.
+ * @param totalNamespaces namespace name suffixes to create. Each namespace also gets a topic t1.
+ * @param initialScope unload scope while creating the policy.
+ * @param initialNamespaceRegex namespace regex while creating the policy.
+ * @param initialLoadedNS namespaces expected to still be loaded after the policy create call. The remaining
+ * namespaces are asserted to be unloaded within 10 seconds.
+ * @param updatedScope unload scope while updating the policy.
+ * @param updatedNamespaceRegex namespace regex while updating the policy.
+ * @param updatedLoadedNS namespaces expected to still be loaded after the policy update call. The remaining
+ * namespaces are asserted to be unloaded within 10 seconds.
+ * @throws PulsarAdminException
+ * @throws PulsarClientException
+ * @throws ExecutionException
+ * @throws InterruptedException
+ */
+ private void testIsolationPolicyUnloadsNSWithScope(String topicType, String policyName, String nsPrefix,
+ List<String> totalNamespaces,
+ NamespaceIsolationPolicyUnloadScope initialScope,
+ List<String> initialNamespaceRegex, List<String> initialLoadedNS,
+ NamespaceIsolationPolicyUnloadScope updatedScope,
+ List<String> updatedNamespaceRegex, List<String> updatedLoadedNS,
+ List<String> updatedBrokerRegex)
+ throws PulsarAdminException, PulsarClientException, ExecutionException, InterruptedException {
+
+ // Create all namespaces
+ List<String> allTopics = new ArrayList<>();
+ for (String namespacePart: totalNamespaces) {
+ admin.namespaces().createNamespace(nsPrefix + namespacePart, Set.of("test"));
+ allTopics.add(topicType + "://" + nsPrefix + namespacePart + "/t1");
+ }
+ // Load all topics so that they are present. Assume topic t1 under each namespace
+ loadTopics(allTopics);
+
+ // Create the policy
+ NamespaceIsolationData nsPolicyData1 = createPolicyData(
+ initialScope, initialNamespaceRegex, Collections.singletonList(".*")
+ );
+ admin.clusters().createNamespaceIsolationPolicy("test", policyName, nsPolicyData1);
+
+ List<String> initialLoadedTopics = new ArrayList<>();
+ for (String namespacePart: initialLoadedNS) {
+ initialLoadedTopics.add(topicType + "://" + nsPrefix + namespacePart + "/t1");
+ }
+
+ List<String> initialUnloadedTopics = new ArrayList<>(allTopics);
+ initialUnloadedTopics.removeAll(initialLoadedTopics);
+
+ // Assert that all topics (and thus ns) not under initialLoadedNS namespaces are unloaded
+ if (initialUnloadedTopics.isEmpty()) {
+ // Wait a bit to make sure topics that are expected to stay loaded are not unloaded lazily
+ TimeUnit.SECONDS.sleep(5);
+ } else {
+ Awaitility.await()
+ .atMost(10, TimeUnit.SECONDS)
+ .until(() -> allTopicsUnloaded(initialUnloadedTopics));
+ }
+ // Assert that all topics under initialLoadedNS are still present
+ initialLoadedTopics.forEach(t -> assertTrue(pulsar.getBrokerService().getTopicReference(t).isPresent()));
+
+ // Load the topics again
+ loadTopics(allTopics);
+
+ // Update policy using updatedScope with updated namespace regex
+ nsPolicyData1 = createPolicyData(updatedScope, updatedNamespaceRegex, updatedBrokerRegex);
+ admin.clusters().updateNamespaceIsolationPolicy("test", policyName, nsPolicyData1);
+
+ List<String> updatedLoadedTopics = new ArrayList<>();
+ for (String namespacePart : updatedLoadedNS) {
+ updatedLoadedTopics.add(topicType + "://" + nsPrefix + namespacePart + "/t1");
+ }
+
+ List<String> updatedUnloadedTopics = new ArrayList<>(allTopics);
+ updatedUnloadedTopics.removeAll(updatedLoadedTopics);
+
+ // Assert that all topics (and thus ns) not under updatedLoadedNS namespaces are unloaded
+ if (updatedUnloadedTopics.isEmpty()) {
+ // Wait a bit to make sure topics that are expected to stay loaded are not unloaded lazily
+ TimeUnit.SECONDS.sleep(5);
+ } else {
+ Awaitility.await()
+ .atMost(10, TimeUnit.SECONDS)
+ .until(() -> allTopicsUnloaded(updatedUnloadedTopics));
+ }
+ // Assert that all topics under updatedLoadedNS are still present
+ updatedLoadedTopics.forEach(t -> assertTrue(pulsar.getBrokerService().getTopicReference(t).isPresent()));
+
+ }
+
+ @Test(dataProvider = "topicType")
+ public void testIsolationPolicyUnloadsNSWithAllScope(final String topicType) throws Exception {
+ String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-";
+ testIsolationPolicyUnloadsNSWithScope(
+ topicType, "policy-all", nsPrefix, List.of("a1", "a2", "b1", "b2", "c1"),
+ all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"),
+ all_matching, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("b1", "b2"),
+ Collections.singletonList(".*")
+ );
+ }
+
+ @Test(dataProvider = "topicType")
+ public void testIsolationPolicyUnloadsNSWithChangedScope(final String topicType) throws Exception {
+ String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-";
+ testIsolationPolicyUnloadsNSWithScope(
+ topicType, "policy-changed", nsPrefix, List.of("a1", "a2", "b1", "b2", "c1"),
+ all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"),
+ changed, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("a1", "a2", "b1", "b2"),
+ Collections.singletonList(".*")
+ );
+ }
+
+ @Test(dataProvider = "topicType")
+ public void testIsolationPolicyUnloadsNSWithNoneScope(final String topicType) throws Exception {
+ String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-";
+ testIsolationPolicyUnloadsNSWithScope(
+ topicType, "policy-none", nsPrefix, List.of("a1", "a2", "b1", "b2", "c1"),
+ all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"),
+ none, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("a1", "a2", "b1", "b2", "c1"),
+ Collections.singletonList(".*")
+ );
+ }
+
+ @Test(dataProvider = "topicType")
+ public void testIsolationPolicyUnloadsNSWithPrimaryChanged(final String topicType) throws Exception {
+ String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-";
+ // As per the 'changed' scope, only c1 should unload, but due to the primary broker change, both a* and c* will.
+ testIsolationPolicyUnloadsNSWithScope(
+ topicType, "policy-primary-changed", nsPrefix, List.of("a1", "a2", "b1", "b2", "c1"),
+ all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"),
+ changed, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("b1", "b2"),
+ List.of(".*", "broker.*")
+ );
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
index c3265897b8767..76d5f8a4050ad 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -114,6 +114,7 @@ private void testOffload(String topicName, String mlName) throws Exception {
CompletableFuture promise = new CompletableFuture<>();
doReturn(promise).when(offloader).offload(any(), any(), any());
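+ // stub isAppendable() so the broker treats the mocked offloader as appendable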
+ doReturn(true).when(offloader).isAppendable();
MessageId currentId = MessageId.latest;
try (Producer<byte[]> p = pulsarClient.newProducer().topic(topicName).enableBatching(false).create()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 68356b1140d99..38df2cce3a764 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -237,6 +237,9 @@ protected void doInitConf() throws Exception {
this.conf.setWebServicePort(Optional.of(0));
this.conf.setNumExecutorThreadPoolSize(5);
this.conf.setExposeBundlesMetricsInPrometheus(true);
+ // Disable the dispatcher retry backoff in tests by default
+ this.conf.setDispatcherRetryBackoffInitialTimeInMs(0);
+ this.conf.setDispatcherRetryBackoffMaxTimeInMs(0);
}
protected final void init() throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 2ce6728e98a08..c200cce09bf32 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -77,6 +77,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.NonAppendableLedgerOffloader;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
@@ -1909,6 +1910,10 @@ public void close() {
final String namespace = "prop/" + UUID.randomUUID();
admin.namespaces().createNamespace(namespace);
admin.namespaces().setOffloadPolicies(namespace, offloadPolicies);
+ Awaitility.await().untilAsserted(() -> {
+ Object policiesGot = admin.namespaces().getOffloadPolicies(namespace);
+ assertNotNull(policiesGot);
+ });
// Inject the cache to avoid real load off-loader jar
final Map ledgerOffloaderMap = pulsar.getLedgerOffloaderMap();
@@ -1922,8 +1927,20 @@ public void close() {
// (2) test system topic
for (String eventTopicName : SystemTopicNames.EVENTS_TOPIC_NAMES) {
- managedLedgerConfig = brokerService.getManagedLedgerConfig(TopicName.get(eventTopicName)).join();
- Assert.assertEquals(managedLedgerConfig.getLedgerOffloader(), NullLedgerOffloader.INSTANCE);
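+ // system topics in a namespace that has offload policies now get a NonAppendableLedgerOffloader;
+ // otherwise the NullLedgerOffloader is kept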
+ boolean offloadPoliciesExists = false;
+ try {
+ Object policiesGot =
+ admin.namespaces().getOffloadPolicies(TopicName.get(eventTopicName).getNamespace());
+ offloadPoliciesExists = policiesGot != null;
+ } catch (PulsarAdminException.NotFoundException notFoundException) {
+ offloadPoliciesExists = false;
+ }
+ var managedLedgerConfig2 = brokerService.getManagedLedgerConfig(TopicName.get(eventTopicName)).join();
+ if (offloadPoliciesExists) {
+ Assert.assertTrue(managedLedgerConfig2.getLedgerOffloader() instanceof NonAppendableLedgerOffloader);
+ } else {
+ Assert.assertEquals(managedLedgerConfig2.getLedgerOffloader(), NullLedgerOffloader.INSTANCE);
+ }
}
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 7e1b5f8c71e6d..f7326734eaada 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -35,6 +35,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.buffer.ByteBuf;
@@ -48,6 +49,7 @@
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -75,6 +77,7 @@
import org.mockito.ArgumentCaptor;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker")
@@ -94,6 +97,9 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
final String topicName = "persistent://public/default/testTopic";
final String subscriptionName = "testSubscription";
+ private AtomicInteger consumerMockAvailablePermits;
+ int retryBackoffInitialTimeInMs = 10;
+ int retryBackoffMaxTimeInMs = 50;
@BeforeMethod
public void setup() throws Exception {
@@ -103,7 +109,8 @@ public void setup() throws Exception {
doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing();
doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints();
doReturn(true).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();
-
+ doAnswer(invocation -> retryBackoffInitialTimeInMs).when(configMock).getDispatcherRetryBackoffInitialTimeInMs();
+ doAnswer(invocation -> retryBackoffMaxTimeInMs).when(configMock).getDispatcherRetryBackoffMaxTimeInMs();
pulsarMock = mock(PulsarService.class);
doReturn(configMock).when(pulsarMock).getConfiguration();
@@ -135,7 +142,8 @@ public void setup() throws Exception {
consumerMock = mock(Consumer.class);
channelMock = mock(ChannelPromise.class);
doReturn("consumer1").when(consumerMock).consumerName();
- doReturn(1000).when(consumerMock).getAvailablePermits();
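+ // make the available permits adjustable so individual tests can block and unblock dispatching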
+ consumerMockAvailablePermits = new AtomicInteger(1000);
+ doAnswer(invocation -> consumerMockAvailablePermits.get()).when(consumerMock).getAvailablePermits();
doReturn(true).when(consumerMock).isWritable();
doReturn(channelMock).when(consumerMock).sendMessages(
anyList(),
@@ -453,6 +461,171 @@ public void testMessageRedelivery() throws Exception {
allEntries.forEach(entry -> entry.release());
}
+ @DataProvider(name = "testBackoffDelayWhenNoMessagesDispatched")
+ private Object[][] testBackoffDelayWhenNoMessagesDispatchedParams() {
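+ // each row is { dispatchMessagesInSubscriptionThread, isKeyShared }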
+ return new Object[][] { { false, true }, { true, true }, { true, false }, { false, false } };
+ }
+
+ @Test(dataProvider = "testBackoffDelayWhenNoMessagesDispatched")
+ public void testBackoffDelayWhenNoMessagesDispatched(boolean dispatchMessagesInSubscriptionThread, boolean isKeyShared)
+ throws Exception {
+ persistentDispatcher.close();
+
+ List<Long> retryDelays = new CopyOnWriteArrayList<>();
+ doReturn(dispatchMessagesInSubscriptionThread).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();
+
+ PersistentDispatcherMultipleConsumers dispatcher;
+ if (isKeyShared) {
+ dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
+ topicMock, cursorMock, subscriptionMock, configMock,
+ new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
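+ // record the computed backoff delay instead of scheduling an actual re-read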
+ @Override
+ protected void reScheduleReadInMs(long readAfterMs) {
+ retryDelays.add(readAfterMs);
+ }
+ };
+ } else {
+ dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, cursorMock, subscriptionMock) {
+ @Override
+ protected void reScheduleReadInMs(long readAfterMs) {
+ retryDelays.add(readAfterMs);
+ }
+ };
+ }
+
+ // add a consumer without permits to trigger the retry behavior
+ consumerMockAvailablePermits.set(0);
+ dispatcher.addConsumer(consumerMock);
+
+ // call "readEntriesComplete" directly to test the retry behavior
+ List<Entry> entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
+ dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(retryDelays.size(), 1);
+ assertEquals(retryDelays.get(0), 10, "Initial retry delay should be 10ms");
+ }
+ );
+ // test the second retry delay
+ entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
+ dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(retryDelays.size(), 2);
+ double delay = retryDelays.get(1);
+ assertEquals(delay, 20.0, 2.0, "Second retry delay should be ~20ms (jitter may reduce it by up to 10%)");
+ }
+ );
+ // verify the max retry delay
+ for (int i = 0; i < 100; i++) {
+ entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
+ dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
+ }
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(retryDelays.size(), 102);
+ double delay = retryDelays.get(101);
+ assertEquals(delay, 50.0, 5.0, "Max retry delay should be ~50ms (jitter may reduce it by up to 10%)");
+ }
+ );
+ // unblock to check that the retry delay is reset
+ consumerMockAvailablePermits.set(1000);
+ entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, "key2")));
+ dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
+ // wait until the possibly asynchronous handling has completed
+ Awaitility.await().untilAsserted(() -> assertFalse(dispatcher.isSendInProgress()));
+
+ // now block again and check the next retry delay to verify it was reset
+ consumerMockAvailablePermits.set(0);
+ entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, "key3")));
+ dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(retryDelays.size(), 103);
+ assertEquals(retryDelays.get(0), 10, "Resetted retry delay should be 10ms");
+ }
+ );
+ }
+
+ @Test(dataProvider = "testBackoffDelayWhenNoMessagesDispatched")
+ public void testBackoffDelayWhenRetryDelayDisabled(boolean dispatchMessagesInSubscriptionThread, boolean isKeyShared)
+ throws Exception {
+ persistentDispatcher.close();
+
+ // it should be possible to disable the retry delay
+ // by setting retryBackoffInitialTimeInMs and retryBackoffMaxTimeInMs to 0
+ retryBackoffInitialTimeInMs = 0;
+ retryBackoffMaxTimeInMs = 0;
+
+ List<Long> retryDelays = new CopyOnWriteArrayList<>();
+ doReturn(dispatchMessagesInSubscriptionThread).when(configMock)
+ .isDispatcherDispatchMessagesInSubscriptionThread();
+
+ PersistentDispatcherMultipleConsumers dispatcher;
+ if (isKeyShared) {
+ dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
+ topicMock, cursorMock, subscriptionMock, configMock,
+ new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
+ @Override
+ protected void reScheduleReadInMs(long readAfterMs) {
+ retryDelays.add(readAfterMs);
+ }
+ };
+ } else {
+ dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, cursorMock, subscriptionMock) {
+ @Override
+ protected void reScheduleReadInMs(long readAfterMs) {
+ retryDelays.add(readAfterMs);
+ }
+ };
+ }
+
+ // add a consumer without permits to trigger the retry behavior
+ consumerMockAvailablePermits.set(0);
+ dispatcher.addConsumer(consumerMock);
+
+ // call "readEntriesComplete" directly to test the retry behavior
+ List<Entry> entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
+ dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(retryDelays.size(), 1);
+ assertEquals(retryDelays.get(0), 0, "Initial retry delay should be 0ms");
+ }
+ );
+ // test the second retry delay
+ entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
+ dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(retryDelays.size(), 2);
+ double delay = retryDelays.get(1);
+ assertEquals(delay, 0, 0, "Second retry delay should be 0ms");
+ }
+ );
+ // verify the max retry delay
+ for (int i = 0; i < 100; i++) {
+ entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
+ dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
+ }
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(retryDelays.size(), 102);
+ double delay = retryDelays.get(101);
+ assertEquals(delay, 0, 0, "Max delay should be 0ms");
+ }
+ );
+ // unblock to check that the retry delay is reset
+ consumerMockAvailablePermits.set(1000);
+ entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, "key2")));
+ dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
+ // wait until the possibly asynchronous handling has completed
+ Awaitility.await().untilAsserted(() -> assertFalse(dispatcher.isSendInProgress()));
+
+ // now block again and check the next retry delay to verify it was reset
+ consumerMockAvailablePermits.set(0);
+ entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, "key3")));
+ dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(retryDelays.size(), 103);
+ assertEquals(retryDelays.get(0), 0, "Resetted retry delay should be 0ms");
+ }
+ );
+ }
+
private ByteBuf createMessage(String message, int sequenceId) {
return createMessage(message, sequenceId, "testKey");
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index 1ff835732aab5..e6968a9e84367 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -163,6 +163,9 @@ protected void startBroker() throws Exception {
conf.setBrokerDeduplicationEnabled(true);
conf.setTransactionBufferSnapshotMaxTransactionCount(2);
conf.setTransactionBufferSnapshotMinTimeInMillis(2000);
+ // Disable the dispatcher retry backoff in tests by default
+ conf.setDispatcherRetryBackoffInitialTimeInMs(0);
+ conf.setDispatcherRetryBackoffMaxTimeInMs(0);
serviceConfigurationList.add(conf);
PulsarTestContext.Builder testContextBuilder =
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/UnloadSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/UnloadSubscriptionTest.java
index 93d5bf30ec6b1..22f7a5d6a43e4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/UnloadSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/UnloadSubscriptionTest.java
@@ -60,6 +60,7 @@ protected void doInitConf() throws Exception {
super.doInitConf();
conf.setSystemTopicEnabled(false);
conf.setTransactionCoordinatorEnabled(false);
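+ // enable batch-index-level acknowledgment on the broker for this test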
+ conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
}
@AfterClass(alwaysRun = true)
@@ -242,6 +243,7 @@ private Consumer createConsumer(String topicName, String subName, Subscr
.subscriptionName(subName)
.subscriptionType(subType)
.isAckReceiptEnabled(true)
+ .enableBatchIndexAcknowledgment(true)
.subscribe();
return consumer;
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java
index aa48e69c14571..4f367f72fda33 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationData.java
@@ -31,6 +31,8 @@ public interface NamespaceIsolationData {
AutoFailoverPolicyData getAutoFailoverPolicy();
+ NamespaceIsolationPolicyUnloadScope getUnloadScope();
+
void validate();
interface Builder {
@@ -42,6 +44,8 @@ interface Builder {
Builder autoFailoverPolicy(AutoFailoverPolicyData autoFailoverPolicyData);
+ Builder unloadScope(NamespaceIsolationPolicyUnloadScope unloadScope);
+
NamespaceIsolationData build();
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationPolicyUnloadScope.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationPolicyUnloadScope.java
new file mode 100644
index 0000000000000..2edeac45630f5
--- /dev/null
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationPolicyUnloadScope.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+/**
+ * The type of unload to perform while setting the isolation policy.
+ */
+public enum NamespaceIsolationPolicyUnloadScope {
+ all_matching, // unloads all namespaces matching the new regex
+ none, // unloads no namespaces
+ changed; // unloads only the namespaces that are newly added to or removed from the regex list
+
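+ /**
+ * Parses the given string into an unload scope, ignoring case; returns null when no value matches.
+ */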
+ public static NamespaceIsolationPolicyUnloadScope fromString(String unloadScopeString) {
+ for (NamespaceIsolationPolicyUnloadScope unloadScope : NamespaceIsolationPolicyUnloadScope.values()) {
+ if (unloadScope.toString().equalsIgnoreCase(unloadScopeString)) {
+ return unloadScope;
+ }
+ }
+ return null;
+ }
+}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java
index 8de7ef500eaa0..0771144d31d9a 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java
@@ -37,6 +37,7 @@
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
+import org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope;
@Parameters(commandDescription = "Operations about namespace isolation policy")
public class CmdNamespaceIsolationPolicy extends CmdBase {
@@ -73,13 +74,22 @@ private class SetPolicy extends CliCommand {
required = true, converter = NameValueParameterSplitter.class)
private Map<String, String> autoFailoverPolicyParams;
+ @Parameter(names = "--unload-scope", description = "configure the type of unload to do -"
+ + " ['all_matching', 'none', 'changed'] namespaces. By default, all namespaces matching the namespaces"
+ + " regex will be unloaded and placed again. You can choose to not unload any namespace while setting"
+ + " this new policy by choosing `none` or choose to unload only the namespaces whose placement will"
+ + " actually change. If you chose 'none', you will need to manually unload the namespaces for them to"
+ + " be placed correctly, or wait till some namespaces get load balanced automatically based on load"
+ + " shedding configurations.")
+ private NamespaceIsolationPolicyUnloadScope unloadScope;
+
void run() throws PulsarAdminException {
String clusterName = getOneArgument(params, 0, 2);
String policyName = getOneArgument(params, 1, 2);
// validate and create the POJO
NamespaceIsolationData namespaceIsolationData = createNamespaceIsolationData(namespaces, primary, secondary,
- autoFailoverPolicyTypeName, autoFailoverPolicyParams);
+ autoFailoverPolicyTypeName, autoFailoverPolicyParams, unloadScope);
getAdmin().clusters().createNamespaceIsolationPolicy(clusterName, policyName, namespaceIsolationData);
}
@@ -179,7 +189,8 @@ private NamespaceIsolationData createNamespaceIsolationData(List namespa
List<String> primary,
List<String> secondary,
String autoFailoverPolicyTypeName,
- Map<String, String> autoFailoverPolicyParams) {
+ Map<String, String> autoFailoverPolicyParams,
+ NamespaceIsolationPolicyUnloadScope unload) {
// validate
namespaces = validateList(namespaces);
@@ -246,6 +257,8 @@ private NamespaceIsolationData createNamespaceIsolationData(List namespa
throw new ParameterException("Unknown auto failover policy type specified : " + autoFailoverPolicyTypeName);
}
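+ // the unload scope may be null; the default behavior (unload all matching namespaces) then applies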
+ nsIsolationDataBuilder.unloadScope(unload);
+
return nsIsolationDataBuilder.build();
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java
index bd28d30d4cee9..52480d91eefa4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/NamespaceIsolationPolicy.java
@@ -23,6 +23,7 @@
import java.util.SortedSet;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.BrokerStatus;
+import org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope;
/**
* Namespace isolation policy.
@@ -43,6 +44,11 @@ public interface NamespaceIsolationPolicy {
*/
List<String> getSecondaryBrokers();
+ /**
+ * Get the unload scope for the policy set call.
+ */
+ NamespaceIsolationPolicyUnloadScope getUnloadScope();
+
/**
* Get the list of primary brokers for the namespace according to the policy.
*
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java
index bdb51f63f89ed..1e72f0e50ee05 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java
@@ -75,6 +75,15 @@ public class NamespaceIsolationDataImpl implements NamespaceIsolationData {
@JsonProperty("auto_failover_policy")
private AutoFailoverPolicyData autoFailoverPolicy;
+ @ApiModelProperty(
+ name = "unload_scope",
+ value = "The type of unload to perform while applying the new isolation policy.",
+ example = "'all_matching' (default) for unloading all matching namespaces. 'none' for not unloading "
+ + "any namespace. 'changed' for unloading only the namespaces whose placement is actually changing"
+ )
+ @JsonProperty("unload_scope")
+ private NamespaceIsolationPolicyUnloadScope unloadScope;
+
public static NamespaceIsolationDataImplBuilder builder() {
return new NamespaceIsolationDataImplBuilder();
}
@@ -106,6 +115,7 @@ public static class NamespaceIsolationDataImplBuilder implements NamespaceIsolat
private List<String> primary = new ArrayList<>();
private List<String> secondary = new ArrayList<>();
private AutoFailoverPolicyData autoFailoverPolicy;
+ private NamespaceIsolationPolicyUnloadScope unloadScope;
public NamespaceIsolationDataImplBuilder namespaces(List<String> namespaces) {
this.namespaces = namespaces;
@@ -127,8 +137,13 @@ public NamespaceIsolationDataImplBuilder autoFailoverPolicy(AutoFailoverPolicyDa
return this;
}
+ public NamespaceIsolationDataImplBuilder unloadScope(NamespaceIsolationPolicyUnloadScope unloadScope) {
+ this.unloadScope = unloadScope;
+ return this;
+ }
+
public NamespaceIsolationDataImpl build() {
- return new NamespaceIsolationDataImpl(namespaces, primary, secondary, autoFailoverPolicy);
+ return new NamespaceIsolationDataImpl(namespaces, primary, secondary, autoFailoverPolicy, unloadScope);
}
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java
index af3663869fa02..440282f29cb36 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/impl/NamespaceIsolationPolicyImpl.java
@@ -29,6 +29,7 @@
import org.apache.pulsar.common.policies.NamespaceIsolationPolicy;
import org.apache.pulsar.common.policies.data.BrokerStatus;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
+import org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope;
/**
* Implementation of the namespace isolation policy.
@@ -39,6 +40,7 @@ public class NamespaceIsolationPolicyImpl implements NamespaceIsolationPolicy {
private List<String> primary;
private List<String> secondary;
private AutoFailoverPolicy autoFailoverPolicy;
+ private NamespaceIsolationPolicyUnloadScope unloadScope;
private boolean matchNamespaces(String fqnn) {
for (String nsRegex : namespaces) {
@@ -64,6 +66,7 @@ public NamespaceIsolationPolicyImpl(NamespaceIsolationData policyData) {
this.primary = policyData.getPrimary();
this.secondary = policyData.getSecondary();
this.autoFailoverPolicy = AutoFailoverPolicyFactory.create(policyData.getAutoFailoverPolicy());
+ this.unloadScope = policyData.getUnloadScope();
}
@Override
@@ -76,6 +79,11 @@ public List getSecondaryBrokers() {
return this.secondary;
}
+ @Override
+ public NamespaceIsolationPolicyUnloadScope getUnloadScope() {
+ return this.unloadScope;
+ }
+
@Override
public List<String> findPrimaryBrokers(List<String> availableBrokers, NamespaceName namespace) {
if (!this.matchNamespaces(namespace.toString())) {
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index 44957f0819277..9fdabaf3394fc 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -231,33 +231,33 @@ The Apache Software License, Version 2.0
- commons-compress-1.26.0.jar
- commons-lang3-3.11.jar
* Netty
- - netty-buffer-4.1.111.Final.jar
- - netty-codec-4.1.111.Final.jar
- - netty-codec-dns-4.1.111.Final.jar
- - netty-codec-http-4.1.111.Final.jar
- - netty-codec-haproxy-4.1.111.Final.jar
- - netty-codec-socks-4.1.111.Final.jar
- - netty-handler-proxy-4.1.111.Final.jar
- - netty-common-4.1.111.Final.jar
- - netty-handler-4.1.111.Final.jar
+ - netty-buffer-4.1.113.Final.jar
+ - netty-codec-4.1.113.Final.jar
+ - netty-codec-dns-4.1.113.Final.jar
+ - netty-codec-http-4.1.113.Final.jar
+ - netty-codec-haproxy-4.1.113.Final.jar
+ - netty-codec-socks-4.1.113.Final.jar
+ - netty-handler-proxy-4.1.113.Final.jar
+ - netty-common-4.1.113.Final.jar
+ - netty-handler-4.1.113.Final.jar
- netty-reactive-streams-2.0.6.jar
- - netty-resolver-4.1.111.Final.jar
- - netty-resolver-dns-4.1.111.Final.jar
- - netty-resolver-dns-classes-macos-4.1.111.Final.jar
- - netty-resolver-dns-native-macos-4.1.111.Final-osx-aarch_64.jar
- - netty-resolver-dns-native-macos-4.1.111.Final-osx-x86_64.jar
- - netty-tcnative-boringssl-static-2.0.65.Final.jar
- - netty-tcnative-boringssl-static-2.0.65.Final-linux-aarch_64.jar
- - netty-tcnative-boringssl-static-2.0.65.Final-linux-x86_64.jar
- - netty-tcnative-boringssl-static-2.0.65.Final-osx-aarch_64.jar
- - netty-tcnative-boringssl-static-2.0.65.Final-osx-x86_64.jar
- - netty-tcnative-boringssl-static-2.0.65.Final-windows-x86_64.jar
- - netty-tcnative-classes-2.0.65.Final.jar
- - netty-transport-4.1.111.Final.jar
- - netty-transport-classes-epoll-4.1.111.Final.jar
- - netty-transport-native-epoll-4.1.111.Final-linux-aarch_64.jar
- - netty-transport-native-epoll-4.1.111.Final-linux-x86_64.jar
- - netty-transport-native-unix-common-4.1.111.Final.jar
+ - netty-resolver-4.1.113.Final.jar
+ - netty-resolver-dns-4.1.113.Final.jar
+ - netty-resolver-dns-classes-macos-4.1.113.Final.jar
+ - netty-resolver-dns-native-macos-4.1.113.Final-osx-aarch_64.jar
+ - netty-resolver-dns-native-macos-4.1.113.Final-osx-x86_64.jar
+ - netty-tcnative-boringssl-static-2.0.66.Final.jar
+ - netty-tcnative-boringssl-static-2.0.66.Final-linux-aarch_64.jar
+ - netty-tcnative-boringssl-static-2.0.66.Final-linux-x86_64.jar
+ - netty-tcnative-boringssl-static-2.0.66.Final-osx-aarch_64.jar
+ - netty-tcnative-boringssl-static-2.0.66.Final-osx-x86_64.jar
+ - netty-tcnative-boringssl-static-2.0.66.Final-windows-x86_64.jar
+ - netty-tcnative-classes-2.0.66.Final.jar
+ - netty-transport-4.1.113.Final.jar
+ - netty-transport-classes-epoll-4.1.113.Final.jar
+ - netty-transport-native-epoll-4.1.113.Final-linux-aarch_64.jar
+ - netty-transport-native-epoll-4.1.113.Final-linux-x86_64.jar
+ - netty-transport-native-unix-common-4.1.113.Final.jar
- netty-incubator-transport-classes-io_uring-0.0.21.Final.jar
- netty-incubator-transport-native-io_uring-0.0.21.Final-linux-x86_64.jar
- netty-incubator-transport-native-io_uring-0.0.21.Final-linux-aarch_64.jar