Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][misc] Sync commits from apache into 3.1_ds #321

Merged
merged 5 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -463,16 +463,6 @@ dispatcherReadFailureBackoffMaxTimeInMs=60000
# The read failure backoff mandatory stop time in milliseconds. By default it is 0s.
dispatcherReadFailureBackoffMandatoryStopTimeInMs=0

# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
# delay. This parameter sets the initial backoff delay in milliseconds.
dispatcherRetryBackoffInitialTimeInMs=100

# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
# delay. This parameter sets the maximum backoff delay in milliseconds.
dispatcherRetryBackoffMaxTimeInMs=1000

# Precise dispathcer flow control according to history message number of each entry
preciseDispatcherFlowControl=false

Expand Down
10 changes: 0 additions & 10 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -279,16 +279,6 @@ dispatcherReadFailureBackoffMaxTimeInMs=60000
# The read failure backoff mandatory stop time in milliseconds. By default it is 0s.
dispatcherReadFailureBackoffMandatoryStopTimeInMs=0

# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
# delay. This parameter sets the initial backoff delay in milliseconds.
dispatcherRetryBackoffInitialTimeInMs=100

# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
# delay. This parameter sets the maximum backoff delay in milliseconds.
dispatcherRetryBackoffMaxTimeInMs=1000

# Precise dispatcher flow control according to history message number of each entry
preciseDispatcherFlowControl=false

Expand Down
10 changes: 5 additions & 5 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -476,11 +476,11 @@ The Apache Software License, Version 2.0
* JCTools - Java Concurrency Tools for the JVM
- org.jctools-jctools-core-2.1.2.jar
* Vertx
- io.vertx-vertx-auth-common-4.5.8.jar
- io.vertx-vertx-bridge-common-4.5.8.jar
- io.vertx-vertx-core-4.5.8.jar
- io.vertx-vertx-web-4.5.8.jar
- io.vertx-vertx-web-common-4.5.8.jar
- io.vertx-vertx-auth-common-4.5.10.jar
- io.vertx-vertx-bridge-common-4.5.10.jar
- io.vertx-vertx-core-4.5.10.jar
- io.vertx-vertx-web-4.5.10.jar
- io.vertx-vertx-web-common-4.5.10.jar
* Apache ZooKeeper
- org.apache.zookeeper-zookeeper-3.9.2.jar
- org.apache.zookeeper-zookeeper-jute-3.9.2.jar
Expand Down
1 change: 0 additions & 1 deletion docker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
<configuration>
<skip>false</skip>
<injectAllReactorProjects>true</injectAllReactorProjects>
<runOnlyOnce>true</runOnlyOnce>
<skipPoms>false</skipPoms>
</configuration>
</plugin>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ flexible messaging model and an intuitive client API.</description>
<jersey.version>2.42</jersey.version>
<athenz.version>1.10.50</athenz.version>
<prometheus.version>0.16.0</prometheus.version>
<vertx.version>4.5.8</vertx.version>
<vertx.version>4.5.10</vertx.version>
<rocksdb.version>7.9.2</rocksdb.version>
<slf4j.version>1.7.32</slf4j.version>
<commons.collections4.version>4.4</commons.collections4.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1174,20 +1174,6 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
)
private int dispatcherReadFailureBackoffMandatoryStopTimeInMs = 0;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered "
+ "out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff "
+ "delay. This parameter sets the initial backoff delay in milliseconds.")
private int dispatcherRetryBackoffInitialTimeInMs = 100;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered "
+ "out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff "
+ "delay. This parameter sets the maximum backoff delay in milliseconds.")
private int dispatcherRetryBackoffMaxTimeInMs = 1000;

@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
Expand Down Expand Up @@ -85,6 +84,7 @@
*/
public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMultipleConsumers
implements Dispatcher, ReadEntriesCallback {

protected final PersistentTopic topic;
protected final ManagedCursor cursor;
protected volatile Range<PositionImpl> lastIndividualDeletedRangeFromCursorRecovery;
Expand Down Expand Up @@ -122,8 +122,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
private AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false);
protected final ExecutorService dispatchMessagesThread;
private final SharedConsumerAssignor assignor;
protected int lastNumberOfEntriesDispatched;
private final Backoff retryBackoff;

protected enum ReadType {
Normal, Replay
}
Expand All @@ -148,15 +147,10 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.initializeDispatchRateLimiterIfNeeded();
this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addMessageToReplay);
ServiceConfiguration serviceConfiguration = topic.getBrokerService().pulsar().getConfiguration();
this.readFailureBackoff = new Backoff(
serviceConfiguration.getDispatcherReadFailureBackoffInitialTimeInMs(),
topic.getBrokerService().pulsar().getConfiguration().getDispatcherReadFailureBackoffInitialTimeInMs(),
TimeUnit.MILLISECONDS,
1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
retryBackoff = new Backoff(
serviceConfiguration.getDispatcherRetryBackoffInitialTimeInMs(), TimeUnit.MILLISECONDS,
serviceConfiguration.getDispatcherRetryBackoffMaxTimeInMs(), TimeUnit.MILLISECONDS,
0, TimeUnit.MILLISECONDS);
}

@Override
Expand Down Expand Up @@ -398,23 +392,16 @@ public synchronized void readMoreEntries() {

@Override
protected void reScheduleRead() {
reScheduleReadInMs(MESSAGE_RATE_BACKOFF_MS);
}

protected void reScheduleReadInMs(long readAfterMs) {
if (isRescheduleReadInProgress.compareAndSet(false, true)) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, readAfterMs);
}
Runnable runnable = () -> {
isRescheduleReadInProgress.set(false);
readMoreEntries();
};
if (readAfterMs > 0) {
topic.getBrokerService().executor().schedule(runnable, readAfterMs, TimeUnit.MILLISECONDS);
} else {
topic.getBrokerService().executor().execute(runnable);
log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, MESSAGE_RATE_BACKOFF_MS);
}
topic.getBrokerService().executor().schedule(
() -> {
isRescheduleReadInProgress.set(false);
readMoreEntries();
},
MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
}
}

Expand Down Expand Up @@ -625,8 +612,8 @@ public final synchronized void readEntriesComplete(List<Entry> entries, Object c
log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size());
}

long totalBytesSize = entries.stream().mapToLong(Entry::getLength).sum();
updatePendingBytesToDispatch(totalBytesSize);
long size = entries.stream().mapToLong(Entry::getLength).sum();
updatePendingBytesToDispatch(size);

// dispatch messages to a separate thread, but still in order for this subscription
// sendMessagesToConsumers is responsible for running broker-side filters
Expand All @@ -636,28 +623,19 @@ public final synchronized void readEntriesComplete(List<Entry> entries, Object c
// in a separate thread, and we want to prevent more reads
acquireSendInProgress();
dispatchMessagesThread.execute(() -> {
handleSendingMessagesAndReadingMore(readType, entries, false, totalBytesSize);
if (sendMessagesToConsumers(readType, entries, false)) {
updatePendingBytesToDispatch(-size);
readMoreEntries();
} else {
updatePendingBytesToDispatch(-size);
}
});
} else {
handleSendingMessagesAndReadingMore(readType, entries, true, totalBytesSize);
}
}

private synchronized void handleSendingMessagesAndReadingMore(ReadType readType, List<Entry> entries,
boolean needAcquireSendInProgress,
long totalBytesSize) {
boolean triggerReadingMore = sendMessagesToConsumers(readType, entries, needAcquireSendInProgress);
int entriesDispatched = lastNumberOfEntriesDispatched;
updatePendingBytesToDispatch(-totalBytesSize);
if (triggerReadingMore) {
if (entriesDispatched > 0) {
// Reset the backoff when we successfully dispatched messages
retryBackoff.reset();
// Call readMoreEntries in the same thread to trigger the next read
readMoreEntries();
} else if (entriesDispatched == 0) {
// If no messages were dispatched, we need to reschedule a new read with an increasing backoff delay
reScheduleReadInMs(retryBackoff.next());
if (sendMessagesToConsumers(readType, entries, true)) {
updatePendingBytesToDispatch(-size);
readMoreEntriesAsync();
} else {
updatePendingBytesToDispatch(-size);
}
}
}
Expand Down Expand Up @@ -696,7 +674,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
if (needTrimAckedMessages()) {
cursor.trimDeletedEntries(entries);
}
lastNumberOfEntriesDispatched = 0;

int entriesToDispatch = entries.size();
// Trigger read more messages
Expand Down Expand Up @@ -798,7 +775,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
totalBytesSent += sendMessageInfo.getTotalBytes();
}

lastNumberOfEntriesDispatched = (int) totalEntries;
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

if (entriesToDispatch > 0) {
Expand All @@ -812,7 +788,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
entry.release();
});
}

return true;
}

Expand Down Expand Up @@ -874,7 +849,6 @@ private boolean sendChunkedMessagesToConsumers(ReadType readType,
totalBytesSent += sendMessageInfo.getTotalBytes();
}

lastNumberOfEntriesDispatched = (int) totalEntries;
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

return numConsumers.get() == 0; // trigger a new readMoreEntries() call
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ protected Map<Consumer, List<PositionImpl>> initialValue() throws Exception {

@Override
protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List<Entry> entries) {
lastNumberOfEntriesDispatched = 0;
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
Expand Down Expand Up @@ -313,8 +312,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
}
}


lastNumberOfEntriesDispatched = (int) totalEntries;
// acquire message-dispatch permits for already delivered messages
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,6 @@ protected void doInitConf() throws Exception {
this.conf.setWebServicePort(Optional.of(0));
this.conf.setNumExecutorThreadPoolSize(5);
this.conf.setExposeBundlesMetricsInPrometheus(true);
// Disable the dispatcher retry backoff in tests by default
this.conf.setDispatcherRetryBackoffInitialTimeInMs(0);
this.conf.setDispatcherRetryBackoffMaxTimeInMs(0);
}

protected final void init() throws Exception {
Expand Down
Loading
Loading