Skip to content

Commit

Permalink
Revert "[fix][broker] Fix retry backoff for PersistentDispatcherMulti…
Browse files Browse the repository at this point in the history
…pleConsumers (apache#23284)"

This reverts commit d9598b5.

(cherry picked from commit 9b9e7a8)
  • Loading branch information
lhotari authored and srinath-ctds committed Sep 26, 2024
1 parent 35d086e commit 1d2de0b
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -406,15 +406,12 @@ protected void reScheduleReadInMs(long readAfterMs) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, readAfterMs);
}
Runnable runnable = () -> {
isRescheduleReadInProgress.set(false);
readMoreEntries();
};
if (readAfterMs > 0) {
topic.getBrokerService().executor().schedule(runnable, readAfterMs, TimeUnit.MILLISECONDS);
} else {
topic.getBrokerService().executor().execute(runnable);
}
topic.getBrokerService().executor().schedule(
() -> {
isRescheduleReadInProgress.set(false);
readMoreEntries();
},
readAfterMs, TimeUnit.MILLISECONDS);
}
}

Expand Down Expand Up @@ -798,7 +795,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
totalBytesSent += sendMessageInfo.getTotalBytes();
}

lastNumberOfEntriesDispatched = (int) totalEntries;
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

if (entriesToDispatch > 0) {
Expand All @@ -811,8 +807,9 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
entry.release();
});
}

lastNumberOfEntriesDispatched = entriesToDispatch;
}
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,6 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
final String topicName = "persistent://public/default/testTopic";
final String subscriptionName = "testSubscription";
private AtomicInteger consumerMockAvailablePermits;
int retryBackoffInitialTimeInMs = 10;
int retryBackoffMaxTimeInMs = 50;

@BeforeMethod
public void setup() throws Exception {
Expand All @@ -109,8 +107,8 @@ public void setup() throws Exception {
doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing();
doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints();
doReturn(true).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();
doAnswer(invocation -> retryBackoffInitialTimeInMs).when(configMock).getDispatcherRetryBackoffInitialTimeInMs();
doAnswer(invocation -> retryBackoffMaxTimeInMs).when(configMock).getDispatcherRetryBackoffMaxTimeInMs();
doReturn(10).when(configMock).getDispatcherRetryBackoffInitialTimeInMs();
doReturn(50).when(configMock).getDispatcherRetryBackoffMaxTimeInMs();
pulsarMock = mock(PulsarService.class);
doReturn(configMock).when(pulsarMock).getConfiguration();

Expand Down Expand Up @@ -461,53 +459,42 @@ public void testMessageRedelivery() throws Exception {
allEntries.forEach(entry -> entry.release());
}

@DataProvider(name = "testBackoffDelayWhenNoMessagesDispatched")
private Object[][] testBackoffDelayWhenNoMessagesDispatchedParams() {
return new Object[][] { { false, true }, { true, true }, { true, false }, { false, false } };
@DataProvider(name = "dispatchMessagesInSubscriptionThread")
private Object[][] dispatchMessagesInSubscriptionThread() {
return new Object[][] { { false }, { true } };
}

@Test(dataProvider = "testBackoffDelayWhenNoMessagesDispatched")
public void testBackoffDelayWhenNoMessagesDispatched(boolean dispatchMessagesInSubscriptionThread, boolean isKeyShared)
@Test(dataProvider = "dispatchMessagesInSubscriptionThread")
public void testBackoffDelayWhenNoMessagesDispatched(boolean dispatchMessagesInSubscriptionThread)
throws Exception {
persistentDispatcher.close();

List<Long> retryDelays = new CopyOnWriteArrayList<>();
doReturn(dispatchMessagesInSubscriptionThread).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();

PersistentDispatcherMultipleConsumers dispatcher;
if (isKeyShared) {
dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
topicMock, cursorMock, subscriptionMock, configMock,
new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
@Override
protected void reScheduleReadInMs(long readAfterMs) {
retryDelays.add(readAfterMs);
}
};
} else {
dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, cursorMock, subscriptionMock) {
@Override
protected void reScheduleReadInMs(long readAfterMs) {
retryDelays.add(readAfterMs);
}
};
}
persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
topicMock, cursorMock, subscriptionMock, configMock,
new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
@Override
protected void reScheduleReadInMs(long readAfterMs) {
retryDelays.add(readAfterMs);
}
};

// add a consumer without permits to trigger the retry behavior
consumerMockAvailablePermits.set(0);
dispatcher.addConsumer(consumerMock);
persistentDispatcher.addConsumer(consumerMock);

// call "readEntriesComplete" directly to test the retry behavior
List<Entry> entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 1);
assertEquals(retryDelays.get(0), 10, "Initial retry delay should be 10ms");
}
);
// test the second retry delay
entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 2);
double delay = retryDelays.get(1);
Expand All @@ -517,7 +504,7 @@ protected void reScheduleReadInMs(long readAfterMs) {
// verify the max retry delay
for (int i = 0; i < 100; i++) {
entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
}
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 102);
Expand All @@ -528,104 +515,21 @@ protected void reScheduleReadInMs(long readAfterMs) {
// unblock to check that the retry delay is reset
consumerMockAvailablePermits.set(1000);
entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, "key2")));
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
// wait that the possibly async handling has completed
Awaitility.await().untilAsserted(() -> assertFalse(dispatcher.isSendInProgress()));
Awaitility.await().untilAsserted(() -> assertFalse(persistentDispatcher.isSendInProgress()));

// now block again to check the next retry delay so verify it was reset
consumerMockAvailablePermits.set(0);
entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, "key3")));
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 103);
assertEquals(retryDelays.get(0), 10, "Resetted retry delay should be 10ms");
}
);
}

@Test(dataProvider = "testBackoffDelayWhenNoMessagesDispatched")
public void testBackoffDelayWhenRetryDelayDisabled(boolean dispatchMessagesInSubscriptionThread, boolean isKeyShared)
throws Exception {
persistentDispatcher.close();

// it should be possible to disable the retry delay
// by setting retryBackoffInitialTimeInMs and retryBackoffMaxTimeInMs to 0
retryBackoffInitialTimeInMs=0;
retryBackoffMaxTimeInMs=0;

List<Long> retryDelays = new CopyOnWriteArrayList<>();
doReturn(dispatchMessagesInSubscriptionThread).when(configMock)
.isDispatcherDispatchMessagesInSubscriptionThread();

PersistentDispatcherMultipleConsumers dispatcher;
if (isKeyShared) {
dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
topicMock, cursorMock, subscriptionMock, configMock,
new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
@Override
protected void reScheduleReadInMs(long readAfterMs) {
retryDelays.add(readAfterMs);
}
};
} else {
dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, cursorMock, subscriptionMock) {
@Override
protected void reScheduleReadInMs(long readAfterMs) {
retryDelays.add(readAfterMs);
}
};
}

// add a consumer without permits to trigger the retry behavior
consumerMockAvailablePermits.set(0);
dispatcher.addConsumer(consumerMock);

// call "readEntriesComplete" directly to test the retry behavior
List<Entry> entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 1);
assertEquals(retryDelays.get(0), 0, "Initial retry delay should be 0ms");
}
);
// test the second retry delay
entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 2);
double delay = retryDelays.get(1);
assertEquals(delay, 0, 0, "Second retry delay should be 0ms");
}
);
// verify the max retry delay
for (int i = 0; i < 100; i++) {
entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
}
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 102);
double delay = retryDelays.get(101);
assertEquals(delay, 0, 0, "Max delay should be 0ms");
}
);
// unblock to check that the retry delay is reset
consumerMockAvailablePermits.set(1000);
entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, "key2")));
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
// wait that the possibly async handling has completed
Awaitility.await().untilAsserted(() -> assertFalse(dispatcher.isSendInProgress()));

// now block again to check the next retry delay so verify it was reset
consumerMockAvailablePermits.set(0);
entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, "key3")));
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 103);
assertEquals(retryDelays.get(0), 0, "Resetted retry delay should be 0ms");
}
);
}

private ByteBuf createMessage(String message, int sequenceId) {
return createMessage(message, sequenceId, "testKey");
}
Expand Down

0 comments on commit 1d2de0b

Please sign in to comment.