Skip to content

Commit

Permalink
[improve][broker] PIP-379: Enable the use of the classic implementati…
Browse files Browse the repository at this point in the history
…on of Key_Shared / Shared with feature flag (apache#23424)
  • Loading branch information
lhotari authored Oct 9, 2024
1 parent 5aadec0 commit 676fdb1
Show file tree
Hide file tree
Showing 45 changed files with 3,211 additions and 263 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,22 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
+ "The higher the number, the more equal the assignment of keys to consumers")
private int subscriptionKeySharedConsistentHashingReplicaPoints = 100;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "For persistent Key_Shared subscriptions, enables the use of the classic implementation of the "
+ "Key_Shared subscription that was used before Pulsar 4.0.0 and PIP-379.",
dynamic = true
)
private boolean subscriptionKeySharedUseClassicPersistentImplementation = false;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "For persistent Shared subscriptions, enables the use of the classic implementation of the Shared "
+ "subscription that was used before Pulsar 4.0.0.",
dynamic = true
)
private boolean subscriptionSharedUseClassicPersistentImplementation = false;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Set the default behavior for message deduplication in the broker.\n\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
import java.time.Clock;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;

@Slf4j
public abstract class AbstractDelayedDeliveryTracker implements DelayedDeliveryTracker, TimerTask {

protected final PersistentDispatcherMultipleConsumers dispatcher;
protected final AbstractPersistentDispatcherMultipleConsumers dispatcher;

// Reference to the shared (per-broker) timer for delayed delivery
protected final Timer timer;
Expand All @@ -49,13 +49,13 @@ public abstract class AbstractDelayedDeliveryTracker implements DelayedDeliveryT

private final boolean isDelayedDeliveryDeliverAtTimeStrict;

public AbstractDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer,
public AbstractDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer,
long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict) {
this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict);
}

public AbstractDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer,
public AbstractDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer,
long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict) {
this.dispatcher = dispatcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage;
import org.apache.pulsar.broker.delayed.bucket.RecoverDelayedDeliveryTrackerException;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -78,7 +78,7 @@ public void initialize(PulsarService pulsarService) throws Exception {
}

@Override
public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
public DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
String topicName = dispatcher.getTopic().getName();
String subscriptionName = dispatcher.getSubscription().getName();
BrokerService brokerService = dispatcher.getTopic().getBrokerService();
Expand All @@ -97,7 +97,7 @@ public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers d
}

@VisibleForTesting
BucketDelayedDeliveryTracker newTracker0(PersistentDispatcherMultipleConsumers dispatcher)
BucketDelayedDeliveryTracker newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher)
throws RecoverDelayedDeliveryTrackerException {
return new BucketDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
isDelayedDeliveryDeliverAtTimeStrict, bucketSnapshotStorage, delayedDeliveryMinIndexCountPerBucket,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import com.google.common.annotations.Beta;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;

/**
* Factory of InMemoryDelayedDeliveryTracker objects. This is the entry point for implementations.
Expand All @@ -42,7 +42,7 @@ public interface DelayedDeliveryTrackerFactory extends AutoCloseable {
* @param dispatcher
* a multi-consumer dispatcher instance
*/
DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher);
DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher);

/**
* Close the factory and release all the resources.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;

@Slf4j
Expand All @@ -52,17 +52,18 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack
// Track whether we have seen all messages with fixed delay so far.
private boolean messagesHaveFixedDelay = true;

InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis,
InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer,
long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict,
long fixedDelayDetectionLookahead) {
this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict,
fixedDelayDetectionLookahead);
}

public InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer,
long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict,
long fixedDelayDetectionLookahead) {
public InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer,
long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict,
long fixedDelayDetectionLookahead) {
super(dispatcher, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict);
this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -51,7 +51,7 @@ public void initialize(PulsarService pulsarService) {
}

@Override
public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
public DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
String topicName = dispatcher.getTopic().getName();
String subscriptionName = dispatcher.getSubscription().getName();
DelayedDeliveryTracker tracker = DelayedDeliveryTracker.DISABLE;
Expand All @@ -66,7 +66,7 @@ public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers d
}

@VisibleForTesting
InMemoryDelayedDeliveryTracker newTracker0(PersistentDispatcherMultipleConsumers dispatcher) {
InMemoryDelayedDeliveryTracker newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
Expand Down Expand Up @@ -105,7 +105,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker

private CompletableFuture<Void> pendingLoad = null;

public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher,
public BucketDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher,
Timer timer, long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict,
BucketSnapshotStorage bucketSnapshotStorage,
Expand All @@ -117,7 +117,7 @@ public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispat
maxIndexesPerBucketSnapshotSegment, maxNumBuckets);
}

public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher,
public BucketDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher,
Timer timer, long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict,
BucketSnapshotStorage bucketSnapshotStorage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
*
Expand Down Expand Up @@ -239,7 +236,4 @@ private int getFirstConsumerIndexOfPriority(int targetPriority) {
return -1;
}

private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);


}
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@
import org.apache.pulsar.broker.service.TopicEventsListener.EventStage;
import org.apache.pulsar.broker.service.TopicEventsListener.TopicEvent;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
Expand Down Expand Up @@ -301,7 +301,7 @@ public class BrokerService implements Closeable {
private final int maxUnackedMessages;
public final int maxUnackedMsgsPerDispatcher;
private final AtomicBoolean blockedDispatcherOnHighUnackedMsgs = new AtomicBoolean(false);
private final Set<PersistentDispatcherMultipleConsumers> blockedDispatchers = ConcurrentHashMap.newKeySet();
private final Set<AbstractPersistentDispatcherMultipleConsumers> blockedDispatchers = ConcurrentHashMap.newKeySet();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
@VisibleForTesting
private final DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory;
Expand Down Expand Up @@ -3328,7 +3328,7 @@ public OrderedExecutor getTopicOrderedExecutor() {
* @param dispatcher
* @param numberOfMessages
*/
public void addUnAckedMessages(PersistentDispatcherMultipleConsumers dispatcher, int numberOfMessages) {
public void addUnAckedMessages(AbstractPersistentDispatcherMultipleConsumers dispatcher, int numberOfMessages) {
// don't block dispatchers if maxUnackedMessages = 0
if (maxUnackedMessages > 0) {
totalUnackedMessages.add(numberOfMessages);
Expand Down Expand Up @@ -3387,10 +3387,10 @@ private void blockDispatchersWithLargeUnAckMessages() {
try {
forEachTopic(topic -> {
topic.getSubscriptions().forEach((subName, persistentSubscription) -> {
if (persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
PersistentDispatcherMultipleConsumers dispatcher =
(PersistentDispatcherMultipleConsumers) persistentSubscription
.getDispatcher();
if (persistentSubscription.getDispatcher()
instanceof AbstractPersistentDispatcherMultipleConsumers) {
AbstractPersistentDispatcherMultipleConsumers dispatcher =
(AbstractPersistentDispatcherMultipleConsumers) persistentSubscription.getDispatcher();
int dispatcherUnAckMsgs = dispatcher.getTotalUnackedMessages();
if (dispatcherUnAckMsgs > maxUnackedMsgsPerDispatcher) {
log.info("[{}] Blocking dispatcher due to reached max broker limit {}",
Expand All @@ -3411,7 +3411,7 @@ private void blockDispatchersWithLargeUnAckMessages() {
*
* @param dispatcherList
*/
public void unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleConsumers> dispatcherList) {
public void unblockDispatchersOnUnAckMessages(List<AbstractPersistentDispatcherMultipleConsumers> dispatcherList) {
lock.writeLock().lock();
try {
dispatcherList.forEach(dispatcher -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public class Consumer {

private static final double avgPercent = 0.9;
private boolean preciseDispatcherFlowControl;
private Position lastSentPositionWhenJoining;
private Position readPositionWhenJoining;
private final String clientAddress; // IP address only, no port number included
private final MessageId startMessageId;
private final boolean isAcknowledgmentAtBatchIndexLevelEnabled;
Expand Down Expand Up @@ -973,8 +973,8 @@ public ConsumerStatsImpl getStats() {
stats.unackedMessages = unackedMessages;
stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs;
stats.avgMessagesPerEntry = getAvgMessagesPerEntry();
if (lastSentPositionWhenJoining != null) {
stats.lastSentPositionWhenJoining = lastSentPositionWhenJoining.toString();
if (readPositionWhenJoining != null) {
stats.readPositionWhenJoining = readPositionWhenJoining.toString();
}
return stats;
}
Expand Down Expand Up @@ -1189,8 +1189,8 @@ public boolean isPreciseDispatcherFlowControl() {
return preciseDispatcherFlowControl;
}

public void setLastSentPositionWhenJoining(Position lastSentPositionWhenJoining) {
this.lastSentPositionWhenJoining = lastSentPositionWhenJoining;
public void setReadPositionWhenJoining(Position readPositionWhenJoining) {
this.readPositionWhenJoining = readPositionWhenJoining;
}

public int getMaxUnackedMessages() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ default void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion){}

/**
* Trigger a new "readMoreEntries" if the dispatching has been paused before. This method is only implemented in
* {@link org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers} right now, other
* implements are not necessary to implement this method.
* {@link org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers} right now,
* other implementations do not necessary implement this method.
* @return did a resume.
*/
default boolean checkAndResumeIfPaused(){
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;

public interface StickyKeyDispatcher extends Dispatcher {

boolean hasSameKeySharedPolicy(KeySharedMeta ksm);

Map<Consumer, List<Range>> getConsumerKeyHashRanges();

boolean isAllowOutOfOrderDelivery();

KeySharedMode getKeySharedMode();

StickyKeyConsumerSelector getSelector();

long getNumberOfMessagesInReplay();

default LinkedHashMap<Consumer, Position> getRecentlyJoinedConsumers() {
return null;
}

boolean isClassic();
}
Loading

0 comments on commit 676fdb1

Please sign in to comment.