From 1627e9579930cfe0b8aca019fd662dc373e23db3 Mon Sep 17 00:00:00 2001 From: maheshnikam <55378196+nikam14@users.noreply.github.com> Date: Mon, 23 Sep 2024 14:17:31 +0530 Subject: [PATCH 1/5] [fix][test] Fix flaky test LeaderElectionTest.revalidateLeaderWithinSameSession (#22383) (cherry picked from commit 216b83008deb469e0fc55ed8117f0c393ebcb0ac) (cherry picked from commit e63dcd4c5e55d1a1e2dffac886ee0f1719326986) --- .../java/org/apache/pulsar/metadata/api/MetadataCache.java | 2 +- .../org/apache/pulsar/metadata/BaseMetadataStoreTest.java | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java index 6d558e709716d..31f7e50e994c5 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java @@ -57,7 +57,7 @@ public interface MetadataCache { * * @param path * the path of the object in the metadata store - * @return the cached object or an empty {@link Optional} is the cache doesn't have the object + * @return the cached object or an empty {@link Optional} is the cache does not have the object */ Optional getIfCached(String path); diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java index 411ee038c48b0..dd256d32f8ddb 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java @@ -127,10 +127,11 @@ public static void assertEqualsAndRetry(Supplier actual, int retryCount, long intSleepTimeInMillis) throws Exception { assertTrue(retryStrategically((__) -> { - if (actual.get().equals(expectedAndRetry)) { + Object actualObject = actual.get(); + if (actualObject.equals(expectedAndRetry)) { return false; } - assertEquals(actual.get(), expected); + assertEquals(actualObject, expected); return true; }, retryCount, intSleepTimeInMillis)); } From 1bae705f0e0ef96d1ecfbd1531ea070593198f93 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 23 Sep 2024 15:00:52 +0300 Subject: [PATCH 2/5] [fix][build] Fix problem where git.commit.id.abbrev is missing in image tagging (#23337) (cherry picked from commit 7d4ac9dc542ff1e840f4e520836b6a3c49c6338d) (cherry picked from commit 0b589b4e1fb1d87b9871114d5391440cbf5562a2) --- docker/pom.xml | 1 - 1 file changed, 1 deletion(-) diff --git a/docker/pom.xml b/docker/pom.xml index f53c4c1557cf5..4747b558d42f7 100644 --- a/docker/pom.xml +++ b/docker/pom.xml @@ -69,7 +69,6 @@ false true - true false From be82629392c6ad2d44312498196ebc41f0be2fe9 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 23 Sep 2024 16:37:33 +0300 Subject: [PATCH 3/5] [fix][sec] Upgrade vertx to 4.5.10 to address CVE-2024-8391 (#23338) (cherry picked from commit 501dfdeace9ef321acbdc5ce32d98eb3e56e083a) (cherry picked from commit e8e9126bb1ac4d2baf91bb73f4b4c12e81fe4497) --- distribution/server/src/assemble/LICENSE.bin.txt | 10 +++++----- pom.xml | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index 952248018f39a..4f448a3fe2281 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -476,11 +476,11 @@ The Apache Software License, Version 2.0 * JCTools - Java Concurrency Tools for the JVM - org.jctools-jctools-core-2.1.2.jar * Vertx - - io.vertx-vertx-auth-common-4.5.8.jar - - io.vertx-vertx-bridge-common-4.5.8.jar - - io.vertx-vertx-core-4.5.8.jar - - io.vertx-vertx-web-4.5.8.jar - - io.vertx-vertx-web-common-4.5.8.jar + - io.vertx-vertx-auth-common-4.5.10.jar + - io.vertx-vertx-bridge-common-4.5.10.jar + - io.vertx-vertx-core-4.5.10.jar + - io.vertx-vertx-web-4.5.10.jar + - io.vertx-vertx-web-common-4.5.10.jar * Apache ZooKeeper - org.apache.zookeeper-zookeeper-3.9.2.jar - org.apache.zookeeper-zookeeper-jute-3.9.2.jar diff --git a/pom.xml b/pom.xml index c318f99697f68..d56bdeeb287a6 100644 --- a/pom.xml +++ b/pom.xml @@ -150,7 +150,7 @@ flexible messaging model and an intuitive client API. 2.42 1.10.50 0.16.0 - 4.5.8 + 4.5.10 7.9.2 1.7.32 4.4 From 992ccaa721df77333d18db9df24da6b3f9c59bfe Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Sep 2024 18:02:04 +0300 Subject: [PATCH 4/5] Revert "[fix][broker] Fix retry backoff for PersistentDispatcherMultipleConsumers (#23284)" This reverts commit d9598b501b234a986e93a6609c5e62f6d05bd234. (cherry picked from commit 9b9e7a8ef4d31dc6fde6482b5dfe0ba928bc9ae9) --- ...PersistentDispatcherMultipleConsumers.java | 19 +-- ...ckyKeyDispatcherMultipleConsumersTest.java | 140 +++--------------- 2 files changed, 30 insertions(+), 129 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index cd5acd069e747..23c4cdd84c2d9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -406,15 +406,12 @@ protected void reScheduleReadInMs(long readAfterMs) { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, readAfterMs); } - Runnable runnable = () -> { - isRescheduleReadInProgress.set(false); - readMoreEntries(); - }; - if (readAfterMs > 0) { - topic.getBrokerService().executor().schedule(runnable, readAfterMs, TimeUnit.MILLISECONDS); - } else { - topic.getBrokerService().executor().execute(runnable); - } + topic.getBrokerService().executor().schedule( + () -> { + isRescheduleReadInProgress.set(false); + readMoreEntries(); + }, + readAfterMs, TimeUnit.MILLISECONDS); } } @@ -798,7 +795,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis totalBytesSent += sendMessageInfo.getTotalBytes(); } - lastNumberOfEntriesDispatched = (int) totalEntries; acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent); if (entriesToDispatch > 0) { @@ -811,8 +807,9 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash); entry.release(); }); - } + lastNumberOfEntriesDispatched = entriesToDispatch; + } return true; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java index f7326734eaada..f6431b38fb6fc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -98,8 +98,6 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest { final String topicName = "persistent://public/default/testTopic"; final String subscriptionName = "testSubscription"; private AtomicInteger consumerMockAvailablePermits; - int retryBackoffInitialTimeInMs = 10; - int retryBackoffMaxTimeInMs = 50; @BeforeMethod public void setup() throws Exception { @@ -109,8 +107,8 @@ public void setup() throws Exception { doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing(); doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints(); doReturn(true).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread(); - doAnswer(invocation -> retryBackoffInitialTimeInMs).when(configMock).getDispatcherRetryBackoffInitialTimeInMs(); - doAnswer(invocation -> retryBackoffMaxTimeInMs).when(configMock).getDispatcherRetryBackoffMaxTimeInMs(); + doReturn(10).when(configMock).getDispatcherRetryBackoffInitialTimeInMs(); + doReturn(50).when(configMock).getDispatcherRetryBackoffMaxTimeInMs(); pulsarMock = mock(PulsarService.class); doReturn(configMock).when(pulsarMock).getConfiguration(); @@ -461,45 +459,34 @@ public void testMessageRedelivery() throws Exception { allEntries.forEach(entry -> entry.release()); } - @DataProvider(name = "testBackoffDelayWhenNoMessagesDispatched") - private Object[][] testBackoffDelayWhenNoMessagesDispatchedParams() { - return new Object[][] { { false, true }, { true, true }, { true, false }, { false, false } }; + @DataProvider(name = "dispatchMessagesInSubscriptionThread") + private Object[][] dispatchMessagesInSubscriptionThread() { + return new Object[][] { { false }, { true } }; } - @Test(dataProvider = "testBackoffDelayWhenNoMessagesDispatched") - public void testBackoffDelayWhenNoMessagesDispatched(boolean dispatchMessagesInSubscriptionThread, boolean isKeyShared) + @Test(dataProvider = "dispatchMessagesInSubscriptionThread") + public void testBackoffDelayWhenNoMessagesDispatched(boolean dispatchMessagesInSubscriptionThread) throws Exception { persistentDispatcher.close(); List retryDelays = new CopyOnWriteArrayList<>(); doReturn(dispatchMessagesInSubscriptionThread).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread(); - - PersistentDispatcherMultipleConsumers dispatcher; - if (isKeyShared) { - dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers( - topicMock, cursorMock, subscriptionMock, configMock, - new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) { - @Override - protected void reScheduleReadInMs(long readAfterMs) { - retryDelays.add(readAfterMs); - } - }; - } else { - dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, cursorMock, subscriptionMock) { - @Override - protected void reScheduleReadInMs(long readAfterMs) { - retryDelays.add(readAfterMs); - } - }; - } + persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers( + topicMock, cursorMock, subscriptionMock, configMock, + new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) { + @Override + protected void reScheduleReadInMs(long readAfterMs) { + retryDelays.add(readAfterMs); + } + }; // add a consumer without permits to trigger the retry behavior consumerMockAvailablePermits.set(0); - dispatcher.addConsumer(consumerMock); + persistentDispatcher.addConsumer(consumerMock); // call "readEntriesComplete" directly to test the retry behavior List entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); - dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); + persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); Awaitility.await().untilAsserted(() -> { assertEquals(retryDelays.size(), 1); assertEquals(retryDelays.get(0), 10, "Initial retry delay should be 10ms"); @@ -507,7 +494,7 @@ protected void reScheduleReadInMs(long readAfterMs) { ); // test the second retry delay entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); - dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); + persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); Awaitility.await().untilAsserted(() -> { assertEquals(retryDelays.size(), 2); double delay = retryDelays.get(1); @@ -517,7 +504,7 @@ protected void reScheduleReadInMs(long readAfterMs) { // verify the max retry delay for (int i = 0; i < 100; i++) { entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); - dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); + persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); } Awaitility.await().untilAsserted(() -> { assertEquals(retryDelays.size(), 102); @@ -528,14 +515,14 @@ protected void reScheduleReadInMs(long readAfterMs) { // unblock to check that the retry delay is reset consumerMockAvailablePermits.set(1000); entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, "key2"))); - dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); + persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); // wait that the possibly async handling has completed - Awaitility.await().untilAsserted(() -> assertFalse(dispatcher.isSendInProgress())); + Awaitility.await().untilAsserted(() -> assertFalse(persistentDispatcher.isSendInProgress())); // now block again to check the next retry delay so verify it was reset consumerMockAvailablePermits.set(0); entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, "key3"))); - dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); + persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); Awaitility.await().untilAsserted(() -> { assertEquals(retryDelays.size(), 103); assertEquals(retryDelays.get(0), 10, "Resetted retry delay should be 10ms"); @@ -543,89 +530,6 @@ protected void reScheduleReadInMs(long readAfterMs) { ); } - @Test(dataProvider = "testBackoffDelayWhenNoMessagesDispatched") - public void testBackoffDelayWhenRetryDelayDisabled(boolean dispatchMessagesInSubscriptionThread, boolean isKeyShared) - throws Exception { - persistentDispatcher.close(); - - // it should be possible to disable the retry delay - // by setting retryBackoffInitialTimeInMs and retryBackoffMaxTimeInMs to 0 - retryBackoffInitialTimeInMs=0; - retryBackoffMaxTimeInMs=0; - - List retryDelays = new CopyOnWriteArrayList<>(); - doReturn(dispatchMessagesInSubscriptionThread).when(configMock) - .isDispatcherDispatchMessagesInSubscriptionThread(); - - PersistentDispatcherMultipleConsumers dispatcher; - if (isKeyShared) { - dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers( - topicMock, cursorMock, subscriptionMock, configMock, - new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) { - @Override - protected void reScheduleReadInMs(long readAfterMs) { - retryDelays.add(readAfterMs); - } - }; - } else { - dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, cursorMock, subscriptionMock) { - @Override - protected void reScheduleReadInMs(long readAfterMs) { - retryDelays.add(readAfterMs); - } - }; - } - - // add a consumer without permits to trigger the retry behavior - consumerMockAvailablePermits.set(0); - dispatcher.addConsumer(consumerMock); - - // call "readEntriesComplete" directly to test the retry behavior - List entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); - dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); - Awaitility.await().untilAsserted(() -> { - assertEquals(retryDelays.size(), 1); - assertEquals(retryDelays.get(0), 0, "Initial retry delay should be 0ms"); - } - ); - // test the second retry delay - entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); - dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); - Awaitility.await().untilAsserted(() -> { - assertEquals(retryDelays.size(), 2); - double delay = retryDelays.get(1); - assertEquals(delay, 0, 0, "Second retry delay should be 0ms"); - } - ); - // verify the max retry delay - for (int i = 0; i < 100; i++) { - entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); - dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); - } - Awaitility.await().untilAsserted(() -> { - assertEquals(retryDelays.size(), 102); - double delay = retryDelays.get(101); - assertEquals(delay, 0, 0, "Max delay should be 0ms"); - } - ); - // unblock to check that the retry delay is reset - consumerMockAvailablePermits.set(1000); - entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, "key2"))); - dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); - // wait that the possibly async handling has completed - Awaitility.await().untilAsserted(() -> assertFalse(dispatcher.isSendInProgress())); - - // now block again to check the next retry delay so verify it was reset - consumerMockAvailablePermits.set(0); - entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, "key3"))); - dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); - Awaitility.await().untilAsserted(() -> { - assertEquals(retryDelays.size(), 103); - assertEquals(retryDelays.get(0), 0, "Resetted retry delay should be 0ms"); - } - ); - } - private ByteBuf createMessage(String message, int sequenceId) { return createMessage(message, sequenceId, "testKey"); } From 2a8ddfc3790b4429afa64c08a152cfb8de9e24e9 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 24 Sep 2024 18:03:02 +0300 Subject: [PATCH 5/5] Revert "[improve][broker] Reschedule reads with increasing backoff when no messages are dispatched (#23226)" This reverts commit f990e28c8bf1abfbbf5539758156fffd578232ef. (cherry picked from commit 0fd9f1746baedd3825e17a33f7d4b9aadd0a3526) --- conf/broker.conf | 10 --- conf/standalone.conf | 10 --- .../pulsar/broker/ServiceConfiguration.java | 14 ---- ...PersistentDispatcherMultipleConsumers.java | 59 +++++--------- ...tStickyKeyDispatcherMultipleConsumers.java | 3 - .../auth/MockedPulsarServiceBaseTest.java | 3 - ...ckyKeyDispatcherMultipleConsumersTest.java | 81 +------------------ .../transaction/TransactionTestBase.java | 3 - 8 files changed, 20 insertions(+), 163 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 540d556b1b1ea..d4d803530f570 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -463,16 +463,6 @@ dispatcherReadFailureBackoffMaxTimeInMs=60000 # The read failure backoff mandatory stop time in milliseconds. By default it is 0s. dispatcherReadFailureBackoffMandatoryStopTimeInMs=0 -# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered -# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff -# delay. This parameter sets the initial backoff delay in milliseconds. -dispatcherRetryBackoffInitialTimeInMs=100 - -# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered -# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff -# delay. This parameter sets the maximum backoff delay in milliseconds. -dispatcherRetryBackoffMaxTimeInMs=1000 - # Precise dispathcer flow control according to history message number of each entry preciseDispatcherFlowControl=false diff --git a/conf/standalone.conf b/conf/standalone.conf index 55ab670b59880..773ec5497b781 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -279,16 +279,6 @@ dispatcherReadFailureBackoffMaxTimeInMs=60000 # The read failure backoff mandatory stop time in milliseconds. By default it is 0s. dispatcherReadFailureBackoffMandatoryStopTimeInMs=0 -# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered -# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff -# delay. This parameter sets the initial backoff delay in milliseconds. -dispatcherRetryBackoffInitialTimeInMs=100 - -# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered -# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff -# delay. This parameter sets the maximum backoff delay in milliseconds. -dispatcherRetryBackoffMaxTimeInMs=1000 - # Precise dispatcher flow control according to history message number of each entry preciseDispatcherFlowControl=false diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 1a6a85451e6bb..5321a420fb4b4 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1174,20 +1174,6 @@ The delayed message index time step(in seconds) in per bucket snapshot segment, ) private int dispatcherReadFailureBackoffMandatoryStopTimeInMs = 0; - @FieldContext( - category = CATEGORY_POLICIES, - doc = "On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered " - + "out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff " - + "delay. This parameter sets the initial backoff delay in milliseconds.") - private int dispatcherRetryBackoffInitialTimeInMs = 100; - - @FieldContext( - category = CATEGORY_POLICIES, - doc = "On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered " - + "out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff " - + "delay. This parameter sets the maximum backoff delay in milliseconds.") - private int dispatcherRetryBackoffMaxTimeInMs = 1000; - @FieldContext( dynamic = true, category = CATEGORY_SERVER, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 23c4cdd84c2d9..ae844b5784456 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -47,7 +47,6 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.tuple.Pair; -import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory; import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker; import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory; @@ -85,6 +84,7 @@ */ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMultipleConsumers implements Dispatcher, ReadEntriesCallback { + protected final PersistentTopic topic; protected final ManagedCursor cursor; protected volatile Range lastIndividualDeletedRangeFromCursorRecovery; @@ -122,8 +122,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul private AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false); protected final ExecutorService dispatchMessagesThread; private final SharedConsumerAssignor assignor; - protected int lastNumberOfEntriesDispatched; - private final Backoff retryBackoff; + protected enum ReadType { Normal, Replay } @@ -148,15 +147,10 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize(); this.initializeDispatchRateLimiterIfNeeded(); this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addMessageToReplay); - ServiceConfiguration serviceConfiguration = topic.getBrokerService().pulsar().getConfiguration(); this.readFailureBackoff = new Backoff( - serviceConfiguration.getDispatcherReadFailureBackoffInitialTimeInMs(), + topic.getBrokerService().pulsar().getConfiguration().getDispatcherReadFailureBackoffInitialTimeInMs(), TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS); - retryBackoff = new Backoff( - serviceConfiguration.getDispatcherRetryBackoffInitialTimeInMs(), TimeUnit.MILLISECONDS, - serviceConfiguration.getDispatcherRetryBackoffMaxTimeInMs(), TimeUnit.MILLISECONDS, - 0, TimeUnit.MILLISECONDS); } @Override @@ -398,20 +392,16 @@ public synchronized void readMoreEntries() { @Override protected void reScheduleRead() { - reScheduleReadInMs(MESSAGE_RATE_BACKOFF_MS); - } - - protected void reScheduleReadInMs(long readAfterMs) { if (isRescheduleReadInProgress.compareAndSet(false, true)) { if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, readAfterMs); + log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, MESSAGE_RATE_BACKOFF_MS); } topic.getBrokerService().executor().schedule( () -> { isRescheduleReadInProgress.set(false); readMoreEntries(); }, - readAfterMs, TimeUnit.MILLISECONDS); + MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS); } } @@ -622,8 +612,8 @@ public final synchronized void readEntriesComplete(List entries, Object c log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size()); } - long totalBytesSize = entries.stream().mapToLong(Entry::getLength).sum(); - updatePendingBytesToDispatch(totalBytesSize); + long size = entries.stream().mapToLong(Entry::getLength).sum(); + updatePendingBytesToDispatch(size); // dispatch messages to a separate thread, but still in order for this subscription // sendMessagesToConsumers is responsible for running broker-side filters @@ -633,28 +623,19 @@ public final synchronized void readEntriesComplete(List entries, Object c // in a separate thread, and we want to prevent more reads acquireSendInProgress(); dispatchMessagesThread.execute(() -> { - handleSendingMessagesAndReadingMore(readType, entries, false, totalBytesSize); + if (sendMessagesToConsumers(readType, entries, false)) { + updatePendingBytesToDispatch(-size); + readMoreEntries(); + } else { + updatePendingBytesToDispatch(-size); + } }); } else { - handleSendingMessagesAndReadingMore(readType, entries, true, totalBytesSize); - } - } - - private synchronized void handleSendingMessagesAndReadingMore(ReadType readType, List entries, - boolean needAcquireSendInProgress, - long totalBytesSize) { - boolean triggerReadingMore = sendMessagesToConsumers(readType, entries, needAcquireSendInProgress); - int entriesDispatched = lastNumberOfEntriesDispatched; - updatePendingBytesToDispatch(-totalBytesSize); - if (triggerReadingMore) { - if (entriesDispatched > 0) { - // Reset the backoff when we successfully dispatched messages - retryBackoff.reset(); - // Call readMoreEntries in the same thread to trigger the next read - readMoreEntries(); - } else if (entriesDispatched == 0) { - // If no messages were dispatched, we need to reschedule a new read with an increasing backoff delay - reScheduleReadInMs(retryBackoff.next()); + if (sendMessagesToConsumers(readType, entries, true)) { + updatePendingBytesToDispatch(-size); + readMoreEntriesAsync(); + } else { + updatePendingBytesToDispatch(-size); } } } @@ -693,7 +674,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis if (needTrimAckedMessages()) { cursor.trimDeletedEntries(entries); } - lastNumberOfEntriesDispatched = 0; int entriesToDispatch = entries.size(); // Trigger read more messages @@ -807,8 +787,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash); entry.release(); }); - - lastNumberOfEntriesDispatched = entriesToDispatch; } return true; } @@ -871,7 +849,6 @@ private boolean sendChunkedMessagesToConsumers(ReadType readType, totalBytesSent += sendMessageInfo.getTotalBytes(); } - lastNumberOfEntriesDispatched = (int) totalEntries; acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent); return numConsumers.get() == 0; // trigger a new readMoreEntries() call diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 397cb7226b767..2df9f38531f5d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -178,7 +178,6 @@ protected Map> initialValue() throws Exception { @Override protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List entries) { - lastNumberOfEntriesDispatched = 0; long totalMessagesSent = 0; long totalBytesSent = 0; long totalEntries = 0; @@ -313,8 +312,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis } } - - lastNumberOfEntriesDispatched = (int) totalEntries; // acquire message-dispatch permits for already delivered messages acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 38df2cce3a764..68356b1140d99 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -237,9 +237,6 @@ protected void doInitConf() throws Exception { this.conf.setWebServicePort(Optional.of(0)); this.conf.setNumExecutorThreadPoolSize(5); this.conf.setExposeBundlesMetricsInPrometheus(true); - // Disable the dispatcher retry backoff in tests by default - this.conf.setDispatcherRetryBackoffInitialTimeInMs(0); - this.conf.setDispatcherRetryBackoffMaxTimeInMs(0); } protected final void init() throws Exception { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java index f6431b38fb6fc..7e1b5f8c71e6d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -35,7 +35,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import io.netty.buffer.ByteBuf; @@ -49,7 +48,6 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -77,7 +75,6 @@ import org.mockito.ArgumentCaptor; import org.testng.Assert; import org.testng.annotations.BeforeMethod; -import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Test(groups = "broker") @@ -97,7 +94,6 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest { final String topicName = "persistent://public/default/testTopic"; final String subscriptionName = "testSubscription"; - private AtomicInteger consumerMockAvailablePermits; @BeforeMethod public void setup() throws Exception { @@ -107,8 +103,7 @@ public void setup() throws Exception { doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing(); doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints(); doReturn(true).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread(); - doReturn(10).when(configMock).getDispatcherRetryBackoffInitialTimeInMs(); - doReturn(50).when(configMock).getDispatcherRetryBackoffMaxTimeInMs(); + pulsarMock = mock(PulsarService.class); doReturn(configMock).when(pulsarMock).getConfiguration(); @@ -140,8 +135,7 @@ public void setup() throws Exception { consumerMock = mock(Consumer.class); channelMock = mock(ChannelPromise.class); doReturn("consumer1").when(consumerMock).consumerName(); - consumerMockAvailablePermits = new AtomicInteger(1000); - doAnswer(invocation -> consumerMockAvailablePermits.get()).when(consumerMock).getAvailablePermits(); + doReturn(1000).when(consumerMock).getAvailablePermits(); doReturn(true).when(consumerMock).isWritable(); doReturn(channelMock).when(consumerMock).sendMessages( anyList(), @@ -459,77 +453,6 @@ public void testMessageRedelivery() throws Exception { allEntries.forEach(entry -> entry.release()); } - @DataProvider(name = "dispatchMessagesInSubscriptionThread") - private Object[][] dispatchMessagesInSubscriptionThread() { - return new Object[][] { { false }, { true } }; - } - - @Test(dataProvider = "dispatchMessagesInSubscriptionThread") - public void testBackoffDelayWhenNoMessagesDispatched(boolean dispatchMessagesInSubscriptionThread) - throws Exception { - persistentDispatcher.close(); - - List retryDelays = new CopyOnWriteArrayList<>(); - doReturn(dispatchMessagesInSubscriptionThread).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread(); - persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers( - topicMock, cursorMock, subscriptionMock, configMock, - new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) { - @Override - protected void reScheduleReadInMs(long readAfterMs) { - retryDelays.add(readAfterMs); - } - }; - - // add a consumer without permits to trigger the retry behavior - consumerMockAvailablePermits.set(0); - persistentDispatcher.addConsumer(consumerMock); - - // call "readEntriesComplete" directly to test the retry behavior - List entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); - persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); - Awaitility.await().untilAsserted(() -> { - assertEquals(retryDelays.size(), 1); - assertEquals(retryDelays.get(0), 10, "Initial retry delay should be 10ms"); - } - ); - // test the second retry delay - entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); - persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); - Awaitility.await().untilAsserted(() -> { - assertEquals(retryDelays.size(), 2); - double delay = retryDelays.get(1); - assertEquals(delay, 20.0, 2.0, "Second retry delay should be 20ms (jitter <-10%)"); - } - ); - // verify the max retry delay - for (int i = 0; i < 100; i++) { - entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); - persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); - } - Awaitility.await().untilAsserted(() -> { - assertEquals(retryDelays.size(), 102); - double delay = retryDelays.get(101); - assertEquals(delay, 50.0, 5.0, "Max delay should be 50ms (jitter <-10%)"); - } - ); - // unblock to check that the retry delay is reset - consumerMockAvailablePermits.set(1000); - entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, "key2"))); - persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); - // wait that the possibly async handling has completed - Awaitility.await().untilAsserted(() -> assertFalse(persistentDispatcher.isSendInProgress())); - - // now block again to check the next retry delay so verify it was reset - consumerMockAvailablePermits.set(0); - entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, "key3"))); - persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); - Awaitility.await().untilAsserted(() -> { - assertEquals(retryDelays.size(), 103); - assertEquals(retryDelays.get(0), 10, "Resetted retry delay should be 10ms"); - } - ); - } - private ByteBuf createMessage(String message, int sequenceId) { return createMessage(message, sequenceId, "testKey"); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java index e6968a9e84367..1ff835732aab5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java @@ -163,9 +163,6 @@ protected void startBroker() throws Exception { conf.setBrokerDeduplicationEnabled(true); conf.setTransactionBufferSnapshotMaxTransactionCount(2); conf.setTransactionBufferSnapshotMinTimeInMillis(2000); - // Disable the dispatcher retry backoff in tests by default - conf.setDispatcherRetryBackoffInitialTimeInMs(0); - conf.setDispatcherRetryBackoffMaxTimeInMs(0); serviceConfigurationList.add(conf); PulsarTestContext.Builder testContextBuilder =