Skip to content

Commit

Permalink
[fix][broker]Prevent StackOverFlowException in SHARED subscription(a…
Browse files Browse the repository at this point in the history
…pache#16968)

(cherry picked from commit a88d952)
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
mattisonchao authored and nodece committed Sep 10, 2024
1 parent 7bc4039 commit 433b22b
Showing 1 changed file with 13 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
protected volatile PositionImpl minReplayedPosition = null;
protected boolean shouldRewindBeforeReadingOrReplaying = false;
protected final String name;
protected boolean sendInProgress;
protected volatile boolean sendInProgress;
protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
TOTAL_AVAILABLE_PERMITS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
Expand Down Expand Up @@ -251,6 +251,14 @@ public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfM
readMoreEntries();
}

/**
* We should not call readMoreEntries() recursively in the same thread as there is a risk of StackOverflowError.
*
*/
public void readMoreEntiresAsync() {
topic.getBrokerService().executor().execute(() -> readMoreEntries());
}

public synchronized void readMoreEntries() {
if (sendInProgress) {
// we cannot read more entries while sending the previous batch
Expand Down Expand Up @@ -295,9 +303,7 @@ public synchronized void readMoreEntries() {
// next entries as readCompletedEntries-callback was never called
if ((messagesToReplayFiltered.size() - deletedMessages.size()) == 0) {
havePendingReplayRead = false;
// We should not call readMoreEntries() recursively in the same thread
// as there is a risk of StackOverflowError
topic.getBrokerService().executor().execute(() -> readMoreEntries());
readMoreEntiresAsync();
}
} else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,
Expand Down Expand Up @@ -591,17 +597,13 @@ public final synchronized void readEntriesComplete(List<Entry> entries, Object c
}
}

protected final synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
protected final synchronized boolean sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
sendInProgress = true;
boolean readMoreEntries;
try {
readMoreEntries = trySendMessagesToConsumers(readType, entries);
return trySendMessagesToConsumers(readType, entries);
} finally {
sendInProgress = false;
}
if (readMoreEntries) {
readMoreEntries();
}
}

/**
Expand Down Expand Up @@ -890,7 +892,7 @@ public void addUnAckedMessages(int numberOfMessages) {
if (maxUnackedMessages <= 0 && blockedDispatcherOnUnackedMsgs == TRUE
&& BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
log.info("[{}] Dispatcher is unblocked, since maxUnackedMessagesPerSubscription=0", name);
topic.getBrokerService().executor().execute(() -> readMoreEntries());
readMoreEntiresAsync();
}

int unAckedMessages = TOTAL_UNACKED_MESSAGES_UPDATER.addAndGet(this, numberOfMessages);
Expand Down

0 comments on commit 433b22b

Please sign in to comment.