diff --git a/conf/broker.conf b/conf/broker.conf
index 540d556b1b1ea..d4d803530f570 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -463,16 +463,6 @@ dispatcherReadFailureBackoffMaxTimeInMs=60000
# The read failure backoff mandatory stop time in milliseconds. By default it is 0s.
dispatcherReadFailureBackoffMandatoryStopTimeInMs=0
-# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
-# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
-# delay. This parameter sets the initial backoff delay in milliseconds.
-dispatcherRetryBackoffInitialTimeInMs=100
-
-# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
-# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
-# delay. This parameter sets the maximum backoff delay in milliseconds.
-dispatcherRetryBackoffMaxTimeInMs=1000
-
# Precise dispathcer flow control according to history message number of each entry
preciseDispatcherFlowControl=false
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 55ab670b59880..773ec5497b781 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -279,16 +279,6 @@ dispatcherReadFailureBackoffMaxTimeInMs=60000
# The read failure backoff mandatory stop time in milliseconds. By default it is 0s.
dispatcherReadFailureBackoffMandatoryStopTimeInMs=0
-# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
-# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
-# delay. This parameter sets the initial backoff delay in milliseconds.
-dispatcherRetryBackoffInitialTimeInMs=100
-
-# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
-# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
-# delay. This parameter sets the maximum backoff delay in milliseconds.
-dispatcherRetryBackoffMaxTimeInMs=1000
-
# Precise dispatcher flow control according to history message number of each entry
preciseDispatcherFlowControl=false
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 952248018f39a..4f448a3fe2281 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -476,11 +476,11 @@ The Apache Software License, Version 2.0
* JCTools - Java Concurrency Tools for the JVM
- org.jctools-jctools-core-2.1.2.jar
* Vertx
- - io.vertx-vertx-auth-common-4.5.8.jar
- - io.vertx-vertx-bridge-common-4.5.8.jar
- - io.vertx-vertx-core-4.5.8.jar
- - io.vertx-vertx-web-4.5.8.jar
- - io.vertx-vertx-web-common-4.5.8.jar
+ - io.vertx-vertx-auth-common-4.5.10.jar
+ - io.vertx-vertx-bridge-common-4.5.10.jar
+ - io.vertx-vertx-core-4.5.10.jar
+ - io.vertx-vertx-web-4.5.10.jar
+ - io.vertx-vertx-web-common-4.5.10.jar
* Apache ZooKeeper
- org.apache.zookeeper-zookeeper-3.9.2.jar
- org.apache.zookeeper-zookeeper-jute-3.9.2.jar
diff --git a/docker/pom.xml b/docker/pom.xml
index f53c4c1557cf5..4747b558d42f7 100644
--- a/docker/pom.xml
+++ b/docker/pom.xml
@@ -69,7 +69,6 @@
false
true
- true
false
diff --git a/pom.xml b/pom.xml
index c318f99697f68..d56bdeeb287a6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -150,7 +150,7 @@ flexible messaging model and an intuitive client API.
2.42
1.10.50
0.16.0
- 4.5.8
+ 4.5.10
7.9.2
1.7.32
4.4
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 1a6a85451e6bb..5321a420fb4b4 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1174,20 +1174,6 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
)
private int dispatcherReadFailureBackoffMandatoryStopTimeInMs = 0;
- @FieldContext(
- category = CATEGORY_POLICIES,
- doc = "On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered "
- + "out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff "
- + "delay. This parameter sets the initial backoff delay in milliseconds.")
- private int dispatcherRetryBackoffInitialTimeInMs = 100;
-
- @FieldContext(
- category = CATEGORY_POLICIES,
- doc = "On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered "
- + "out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff "
- + "delay. This parameter sets the maximum backoff delay in milliseconds.")
- private int dispatcherRetryBackoffMaxTimeInMs = 1000;
-
@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index cd5acd069e747..ae844b5784456 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -47,7 +47,6 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
@@ -85,6 +84,7 @@
*/
public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMultipleConsumers
implements Dispatcher, ReadEntriesCallback {
+
protected final PersistentTopic topic;
protected final ManagedCursor cursor;
protected volatile Range lastIndividualDeletedRangeFromCursorRecovery;
@@ -122,8 +122,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
private AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false);
protected final ExecutorService dispatchMessagesThread;
private final SharedConsumerAssignor assignor;
- protected int lastNumberOfEntriesDispatched;
- private final Backoff retryBackoff;
+
protected enum ReadType {
Normal, Replay
}
@@ -148,15 +147,10 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.initializeDispatchRateLimiterIfNeeded();
this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addMessageToReplay);
- ServiceConfiguration serviceConfiguration = topic.getBrokerService().pulsar().getConfiguration();
this.readFailureBackoff = new Backoff(
- serviceConfiguration.getDispatcherReadFailureBackoffInitialTimeInMs(),
+ topic.getBrokerService().pulsar().getConfiguration().getDispatcherReadFailureBackoffInitialTimeInMs(),
TimeUnit.MILLISECONDS,
1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
- retryBackoff = new Backoff(
- serviceConfiguration.getDispatcherRetryBackoffInitialTimeInMs(), TimeUnit.MILLISECONDS,
- serviceConfiguration.getDispatcherRetryBackoffMaxTimeInMs(), TimeUnit.MILLISECONDS,
- 0, TimeUnit.MILLISECONDS);
}
@Override
@@ -398,23 +392,16 @@ public synchronized void readMoreEntries() {
@Override
protected void reScheduleRead() {
- reScheduleReadInMs(MESSAGE_RATE_BACKOFF_MS);
- }
-
- protected void reScheduleReadInMs(long readAfterMs) {
if (isRescheduleReadInProgress.compareAndSet(false, true)) {
if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, readAfterMs);
- }
- Runnable runnable = () -> {
- isRescheduleReadInProgress.set(false);
- readMoreEntries();
- };
- if (readAfterMs > 0) {
- topic.getBrokerService().executor().schedule(runnable, readAfterMs, TimeUnit.MILLISECONDS);
- } else {
- topic.getBrokerService().executor().execute(runnable);
+ log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, MESSAGE_RATE_BACKOFF_MS);
}
+ topic.getBrokerService().executor().schedule(
+ () -> {
+ isRescheduleReadInProgress.set(false);
+ readMoreEntries();
+ },
+ MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
}
}
@@ -625,8 +612,8 @@ public final synchronized void readEntriesComplete(List entries, Object c
log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size());
}
- long totalBytesSize = entries.stream().mapToLong(Entry::getLength).sum();
- updatePendingBytesToDispatch(totalBytesSize);
+ long size = entries.stream().mapToLong(Entry::getLength).sum();
+ updatePendingBytesToDispatch(size);
// dispatch messages to a separate thread, but still in order for this subscription
// sendMessagesToConsumers is responsible for running broker-side filters
@@ -636,28 +623,19 @@ public final synchronized void readEntriesComplete(List entries, Object c
// in a separate thread, and we want to prevent more reads
acquireSendInProgress();
dispatchMessagesThread.execute(() -> {
- handleSendingMessagesAndReadingMore(readType, entries, false, totalBytesSize);
+ if (sendMessagesToConsumers(readType, entries, false)) {
+ updatePendingBytesToDispatch(-size);
+ readMoreEntries();
+ } else {
+ updatePendingBytesToDispatch(-size);
+ }
});
} else {
- handleSendingMessagesAndReadingMore(readType, entries, true, totalBytesSize);
- }
- }
-
- private synchronized void handleSendingMessagesAndReadingMore(ReadType readType, List entries,
- boolean needAcquireSendInProgress,
- long totalBytesSize) {
- boolean triggerReadingMore = sendMessagesToConsumers(readType, entries, needAcquireSendInProgress);
- int entriesDispatched = lastNumberOfEntriesDispatched;
- updatePendingBytesToDispatch(-totalBytesSize);
- if (triggerReadingMore) {
- if (entriesDispatched > 0) {
- // Reset the backoff when we successfully dispatched messages
- retryBackoff.reset();
- // Call readMoreEntries in the same thread to trigger the next read
- readMoreEntries();
- } else if (entriesDispatched == 0) {
- // If no messages were dispatched, we need to reschedule a new read with an increasing backoff delay
- reScheduleReadInMs(retryBackoff.next());
+ if (sendMessagesToConsumers(readType, entries, true)) {
+ updatePendingBytesToDispatch(-size);
+ readMoreEntriesAsync();
+ } else {
+ updatePendingBytesToDispatch(-size);
}
}
}
@@ -696,7 +674,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
if (needTrimAckedMessages()) {
cursor.trimDeletedEntries(entries);
}
- lastNumberOfEntriesDispatched = 0;
int entriesToDispatch = entries.size();
// Trigger read more messages
@@ -798,7 +775,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
totalBytesSent += sendMessageInfo.getTotalBytes();
}
- lastNumberOfEntriesDispatched = (int) totalEntries;
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);
if (entriesToDispatch > 0) {
@@ -812,7 +788,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
entry.release();
});
}
-
return true;
}
@@ -874,7 +849,6 @@ private boolean sendChunkedMessagesToConsumers(ReadType readType,
totalBytesSent += sendMessageInfo.getTotalBytes();
}
- lastNumberOfEntriesDispatched = (int) totalEntries;
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);
return numConsumers.get() == 0; // trigger a new readMoreEntries() call
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 397cb7226b767..2df9f38531f5d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -178,7 +178,6 @@ protected Map> initialValue() throws Exception {
@Override
protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List entries) {
- lastNumberOfEntriesDispatched = 0;
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
@@ -313,8 +312,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
}
}
-
- lastNumberOfEntriesDispatched = (int) totalEntries;
// acquire message-dispatch permits for already delivered messages
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 38df2cce3a764..68356b1140d99 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -237,9 +237,6 @@ protected void doInitConf() throws Exception {
this.conf.setWebServicePort(Optional.of(0));
this.conf.setNumExecutorThreadPoolSize(5);
this.conf.setExposeBundlesMetricsInPrometheus(true);
- // Disable the dispatcher retry backoff in tests by default
- this.conf.setDispatcherRetryBackoffInitialTimeInMs(0);
- this.conf.setDispatcherRetryBackoffMaxTimeInMs(0);
}
protected final void init() throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index f7326734eaada..7e1b5f8c71e6d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -35,7 +35,6 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.buffer.ByteBuf;
@@ -49,7 +48,6 @@
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -77,7 +75,6 @@
import org.mockito.ArgumentCaptor;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker")
@@ -97,9 +94,6 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
final String topicName = "persistent://public/default/testTopic";
final String subscriptionName = "testSubscription";
- private AtomicInteger consumerMockAvailablePermits;
- int retryBackoffInitialTimeInMs = 10;
- int retryBackoffMaxTimeInMs = 50;
@BeforeMethod
public void setup() throws Exception {
@@ -109,8 +103,7 @@ public void setup() throws Exception {
doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing();
doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints();
doReturn(true).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();
- doAnswer(invocation -> retryBackoffInitialTimeInMs).when(configMock).getDispatcherRetryBackoffInitialTimeInMs();
- doAnswer(invocation -> retryBackoffMaxTimeInMs).when(configMock).getDispatcherRetryBackoffMaxTimeInMs();
+
pulsarMock = mock(PulsarService.class);
doReturn(configMock).when(pulsarMock).getConfiguration();
@@ -142,8 +135,7 @@ public void setup() throws Exception {
consumerMock = mock(Consumer.class);
channelMock = mock(ChannelPromise.class);
doReturn("consumer1").when(consumerMock).consumerName();
- consumerMockAvailablePermits = new AtomicInteger(1000);
- doAnswer(invocation -> consumerMockAvailablePermits.get()).when(consumerMock).getAvailablePermits();
+ doReturn(1000).when(consumerMock).getAvailablePermits();
doReturn(true).when(consumerMock).isWritable();
doReturn(channelMock).when(consumerMock).sendMessages(
anyList(),
@@ -461,171 +453,6 @@ public void testMessageRedelivery() throws Exception {
allEntries.forEach(entry -> entry.release());
}
- @DataProvider(name = "testBackoffDelayWhenNoMessagesDispatched")
- private Object[][] testBackoffDelayWhenNoMessagesDispatchedParams() {
- return new Object[][] { { false, true }, { true, true }, { true, false }, { false, false } };
- }
-
- @Test(dataProvider = "testBackoffDelayWhenNoMessagesDispatched")
- public void testBackoffDelayWhenNoMessagesDispatched(boolean dispatchMessagesInSubscriptionThread, boolean isKeyShared)
- throws Exception {
- persistentDispatcher.close();
-
- List retryDelays = new CopyOnWriteArrayList<>();
- doReturn(dispatchMessagesInSubscriptionThread).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();
-
- PersistentDispatcherMultipleConsumers dispatcher;
- if (isKeyShared) {
- dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
- topicMock, cursorMock, subscriptionMock, configMock,
- new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
- @Override
- protected void reScheduleReadInMs(long readAfterMs) {
- retryDelays.add(readAfterMs);
- }
- };
- } else {
- dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, cursorMock, subscriptionMock) {
- @Override
- protected void reScheduleReadInMs(long readAfterMs) {
- retryDelays.add(readAfterMs);
- }
- };
- }
-
- // add a consumer without permits to trigger the retry behavior
- consumerMockAvailablePermits.set(0);
- dispatcher.addConsumer(consumerMock);
-
- // call "readEntriesComplete" directly to test the retry behavior
- List entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
- dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
- Awaitility.await().untilAsserted(() -> {
- assertEquals(retryDelays.size(), 1);
- assertEquals(retryDelays.get(0), 10, "Initial retry delay should be 10ms");
- }
- );
- // test the second retry delay
- entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
- dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
- Awaitility.await().untilAsserted(() -> {
- assertEquals(retryDelays.size(), 2);
- double delay = retryDelays.get(1);
- assertEquals(delay, 20.0, 2.0, "Second retry delay should be 20ms (jitter <-10%)");
- }
- );
- // verify the max retry delay
- for (int i = 0; i < 100; i++) {
- entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
- dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
- }
- Awaitility.await().untilAsserted(() -> {
- assertEquals(retryDelays.size(), 102);
- double delay = retryDelays.get(101);
- assertEquals(delay, 50.0, 5.0, "Max delay should be 50ms (jitter <-10%)");
- }
- );
- // unblock to check that the retry delay is reset
- consumerMockAvailablePermits.set(1000);
- entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, "key2")));
- dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
- // wait that the possibly async handling has completed
- Awaitility.await().untilAsserted(() -> assertFalse(dispatcher.isSendInProgress()));
-
- // now block again to check the next retry delay so verify it was reset
- consumerMockAvailablePermits.set(0);
- entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, "key3")));
- dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
- Awaitility.await().untilAsserted(() -> {
- assertEquals(retryDelays.size(), 103);
- assertEquals(retryDelays.get(0), 10, "Resetted retry delay should be 10ms");
- }
- );
- }
-
- @Test(dataProvider = "testBackoffDelayWhenNoMessagesDispatched")
- public void testBackoffDelayWhenRetryDelayDisabled(boolean dispatchMessagesInSubscriptionThread, boolean isKeyShared)
- throws Exception {
- persistentDispatcher.close();
-
- // it should be possible to disable the retry delay
- // by setting retryBackoffInitialTimeInMs and retryBackoffMaxTimeInMs to 0
- retryBackoffInitialTimeInMs=0;
- retryBackoffMaxTimeInMs=0;
-
- List retryDelays = new CopyOnWriteArrayList<>();
- doReturn(dispatchMessagesInSubscriptionThread).when(configMock)
- .isDispatcherDispatchMessagesInSubscriptionThread();
-
- PersistentDispatcherMultipleConsumers dispatcher;
- if (isKeyShared) {
- dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
- topicMock, cursorMock, subscriptionMock, configMock,
- new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
- @Override
- protected void reScheduleReadInMs(long readAfterMs) {
- retryDelays.add(readAfterMs);
- }
- };
- } else {
- dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, cursorMock, subscriptionMock) {
- @Override
- protected void reScheduleReadInMs(long readAfterMs) {
- retryDelays.add(readAfterMs);
- }
- };
- }
-
- // add a consumer without permits to trigger the retry behavior
- consumerMockAvailablePermits.set(0);
- dispatcher.addConsumer(consumerMock);
-
- // call "readEntriesComplete" directly to test the retry behavior
- List entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
- dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
- Awaitility.await().untilAsserted(() -> {
- assertEquals(retryDelays.size(), 1);
- assertEquals(retryDelays.get(0), 0, "Initial retry delay should be 0ms");
- }
- );
- // test the second retry delay
- entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
- dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
- Awaitility.await().untilAsserted(() -> {
- assertEquals(retryDelays.size(), 2);
- double delay = retryDelays.get(1);
- assertEquals(delay, 0, 0, "Second retry delay should be 0ms");
- }
- );
- // verify the max retry delay
- for (int i = 0; i < 100; i++) {
- entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
- dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
- }
- Awaitility.await().untilAsserted(() -> {
- assertEquals(retryDelays.size(), 102);
- double delay = retryDelays.get(101);
- assertEquals(delay, 0, 0, "Max delay should be 0ms");
- }
- );
- // unblock to check that the retry delay is reset
- consumerMockAvailablePermits.set(1000);
- entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, "key2")));
- dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
- // wait that the possibly async handling has completed
- Awaitility.await().untilAsserted(() -> assertFalse(dispatcher.isSendInProgress()));
-
- // now block again to check the next retry delay so verify it was reset
- consumerMockAvailablePermits.set(0);
- entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, "key3")));
- dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
- Awaitility.await().untilAsserted(() -> {
- assertEquals(retryDelays.size(), 103);
- assertEquals(retryDelays.get(0), 0, "Resetted retry delay should be 0ms");
- }
- );
- }
-
private ByteBuf createMessage(String message, int sequenceId) {
return createMessage(message, sequenceId, "testKey");
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index e6968a9e84367..1ff835732aab5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -163,9 +163,6 @@ protected void startBroker() throws Exception {
conf.setBrokerDeduplicationEnabled(true);
conf.setTransactionBufferSnapshotMaxTransactionCount(2);
conf.setTransactionBufferSnapshotMinTimeInMillis(2000);
- // Disable the dispatcher retry backoff in tests by default
- conf.setDispatcherRetryBackoffInitialTimeInMs(0);
- conf.setDispatcherRetryBackoffMaxTimeInMs(0);
serviceConfigurationList.add(conf);
PulsarTestContext.Builder testContextBuilder =
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
index 6d558e709716d..31f7e50e994c5 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
@@ -57,7 +57,7 @@ public interface MetadataCache {
*
* @param path
* the path of the object in the metadata store
- * @return the cached object or an empty {@link Optional} is the cache doesn't have the object
+ * @return the cached object or an empty {@link Optional} is the cache does not have the object
*/
Optional getIfCached(String path);
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
index 411ee038c48b0..dd256d32f8ddb 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
@@ -127,10 +127,11 @@ public static void assertEqualsAndRetry(Supplier