Skip to content

Commit

Permalink
[improve][client] Make replicateSubscriptionState nullable (apache#23757
Browse files Browse the repository at this point in the history
)

Signed-off-by: Zixuan Liu <[email protected]>

(cherry picked from commit 3fce309)
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece committed Dec 25, 2024
1 parent 0ff335c commit 887d518
Show file tree
Hide file tree
Showing 10 changed files with 186 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -984,8 +984,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
? subscribe.getStartMessageRollbackDurationSec()
: -1;
final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null;
final boolean isReplicated = subscribe.hasReplicateSubscriptionState()
&& subscribe.isReplicateSubscriptionState();
final Boolean isReplicated =
subscribe.hasReplicateSubscriptionState() ? subscribe.isReplicateSubscriptionState() : null;
final boolean forceTopicCreation = subscribe.isForceTopicCreation();
final KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta()
? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class SubscriptionOption {
private boolean readCompacted;
private CommandSubscribe.InitialPosition initialPosition;
private long startMessageRollbackDurationSec;
private boolean replicatedSubscriptionStateArg;
private Boolean replicatedSubscriptionStateArg;
private KeySharedMeta keySharedMeta;
private Optional<Map<String, String>> subscriptionProperties;
private long consumerEpoch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public CompletableFuture<Consumer> subscribe(SubscriptionOption option) {
option.getSubType(), option.getPriorityLevel(), option.getConsumerName(),
option.isDurable(), option.getStartMessageId(), option.getMetadata(),
option.isReadCompacted(), option.getInitialPosition(),
option.getStartMessageRollbackDurationSec(), option.isReplicatedSubscriptionStateArg(),
option.getStartMessageRollbackDurationSec(), option.getReplicatedSubscriptionStateArg(),
option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(null));
}

Expand All @@ -269,7 +269,7 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
MessageId startMessageId, Map<String, String> metadata,
boolean readCompacted, InitialPosition initialPosition,
long resetStartMessageBackInSec,
boolean replicateSubscriptionState,
Boolean replicateSubscriptionState,
KeySharedMeta keySharedMeta,
Map<String, String> subscriptionProperties) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,29 +119,33 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
static {
REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
}
private volatile Boolean replicatedControlled;

static Map<String, Long> getBaseCursorProperties(boolean isReplicated) {
return isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
static Map<String, Long> getBaseCursorProperties(Boolean isReplicated) {
return isReplicated != null && isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES :
NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
}

static boolean isCursorFromReplicatedSubscription(ManagedCursor cursor) {
return cursor.getProperties().containsKey(REPLICATED_SUBSCRIPTION_PROPERTY);
}

public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor,
boolean replicated) {
Boolean replicated) {
this(topic, subscriptionName, cursor, replicated, Collections.emptyMap());
}

public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor,
boolean replicated, Map<String, String> subscriptionProperties) {
Boolean replicated, Map<String, String> subscriptionProperties) {
this.topic = topic;
this.cursor = cursor;
this.topicName = topic.getName();
this.subName = subscriptionName;
this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString();
this.expiryMonitor = new PersistentMessageExpiryMonitor(topic, subscriptionName, cursor, this);
this.setReplicated(replicated);
if (replicated != null) {
this.setReplicated(replicated);
}
this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties);
if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
Expand Down Expand Up @@ -179,6 +183,7 @@ public boolean isReplicated() {
}

public boolean setReplicated(boolean replicated) {
replicatedControlled = replicated;
ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig();

if (!replicated || !config.isEnableReplicatedSubscriptions()) {
Expand Down Expand Up @@ -1230,4 +1235,9 @@ public boolean checkIfPendingAckStoreInit() {
public PendingAckHandle getPendingAckHandle() {
return pendingAckHandle;
}

@VisibleForTesting
public Boolean getReplicatedControlled() {
return replicatedControlled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ private CompletableFuture<Void> removeOrphanReplicationCursors() {
}

private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor,
boolean replicated, Map<String, String> subscriptionProperties) {
Boolean replicated, Map<String, String> subscriptionProperties) {
checkNotNull(compactedTopic);
if (subscriptionName.equals(COMPACTION_SUBSCRIPTION)) {
return new CompactorSubscription(this, compactedTopic, subscriptionName, cursor);
Expand Down Expand Up @@ -685,7 +685,7 @@ public CompletableFuture<Consumer> subscribe(SubscriptionOption option) {
option.getSubType(), option.getPriorityLevel(), option.getConsumerName(), option.isDurable(),
option.getStartMessageId(), option.getMetadata(), option.isReadCompacted(),
option.getInitialPosition(), option.getStartMessageRollbackDurationSec(),
option.isReplicatedSubscriptionStateArg(), option.getKeySharedMeta(),
option.getReplicatedSubscriptionStateArg(), option.getKeySharedMeta(),
option.getSubscriptionProperties().orElse(Collections.emptyMap()), option.getConsumerEpoch());
}

Expand All @@ -696,7 +696,7 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
Map<String, String> metadata, boolean readCompacted,
InitialPosition initialPosition,
long startMessageRollbackDurationSec,
boolean replicatedSubscriptionStateArg,
Boolean replicatedSubscriptionStateArg,
KeySharedMeta keySharedMeta,
Map<String, String> subscriptionProperties,
long consumerEpoch) {
Expand All @@ -706,12 +706,9 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
}

return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> {
boolean replicatedSubscriptionState = replicatedSubscriptionStateArg;

if (replicatedSubscriptionState
if (replicatedSubscriptionStateArg != null && replicatedSubscriptionStateArg
&& !brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) {
log.warn("[{}] Replicated Subscription is disabled by broker.", getName());
replicatedSubscriptionState = false;
}

if (subType == SubType.Key_Shared
Expand Down Expand Up @@ -777,9 +774,9 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
lock.readLock().unlock();
}

CompletableFuture<? extends Subscription> subscriptionFuture = isDurable ? //
getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec,
replicatedSubscriptionState, subscriptionProperties)
CompletableFuture<? extends Subscription> subscriptionFuture = isDurable
? getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec,
replicatedSubscriptionStateArg, subscriptionProperties)
: getNonDurableSubscription(subscriptionName, startMessageId, initialPosition,
startMessageRollbackDurationSec, readCompacted, subscriptionProperties);

Expand Down Expand Up @@ -865,7 +862,9 @@ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subs
}

private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName,
InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicated,
InitialPosition initialPosition,
long startMessageRollbackDurationSec,
Boolean replicated,
Map<String, String> subscriptionProperties) {
CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) {
Expand Down Expand Up @@ -897,7 +896,7 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) {
return;
}
}
if (replicated && !subscription.isReplicated()) {
if (replicated != null && replicated && !subscription.isReplicated()) {
// Flip the subscription state
subscription.setReplicated(replicated);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class ReplicateSubscriptionTest extends ProducerConsumerBase {

@BeforeClass
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Override
protected void doInitConf() throws Exception {
super.doInitConf();
}

@DataProvider
public Object[] replicateSubscriptionState() {
return new Object[]{
Boolean.TRUE,
Boolean.FALSE,
null
};
}

@Test(dataProvider = "replicateSubscriptionState")
public void testReplicateSubscriptionState(Boolean replicateSubscriptionState)
throws Exception {
String topic = "persistent://my-property/my-ns/" + System.nanoTime();
String subName = "sub-" + System.nanoTime();
ConsumerBuilder<String> consumerBuilder = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subName);
if (replicateSubscriptionState != null) {
consumerBuilder.replicateSubscriptionState(replicateSubscriptionState);
}
ConsumerBuilderImpl consumerBuilderImpl = (ConsumerBuilderImpl) consumerBuilder;
assertEquals(consumerBuilderImpl.getConf().getReplicateSubscriptionState(), replicateSubscriptionState);
@Cleanup
Consumer<String> ignored = consumerBuilder.subscribe();
CompletableFuture<Optional<Topic>> topicIfExists = pulsar.getBrokerService().getTopicIfExists(topic);
assertThat(topicIfExists)
.succeedsWithin(3, TimeUnit.SECONDS)
.matches(optionalTopic -> {
assertTrue(optionalTopic.isPresent());
Topic topicRef = optionalTopic.get();
Subscription subscription = topicRef.getSubscription(subName);
assertNotNull(subscription);
assertTrue(subscription instanceof PersistentSubscription);
PersistentSubscription persistentSubscription = (PersistentSubscription) subscription;
assertEquals(persistentSubscription.getReplicatedControlled(), replicateSubscriptionState);
return true;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,7 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
setClientCnx(cnx);
ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(),
priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted,
conf.isReplicateSubscriptionState(),
conf.getReplicateSubscriptionState(),
InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
startMessageRollbackDuration, si, createTopicIfDoesNotExist, conf.getKeySharedPolicy(),
// Use the current epoch to subscribe.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import static com.google.common.base.Preconditions.checkArgument;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Sets;
import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -145,7 +147,13 @@ public int getMaxPendingChuckedMessage() {

private long autoUpdatePartitionsIntervalSeconds = 60;

private boolean replicateSubscriptionState = false;
@ApiModelProperty(
name = "replicateSubscriptionState",
value = "If `replicateSubscriptionState` is enabled, a subscription state is replicated to geo-replicated"
+ " clusters."
)
@JsonProperty(access = JsonProperty.Access.READ_WRITE)
private Boolean replicateSubscriptionState;

private boolean resetIncludeHead = false;

Expand Down Expand Up @@ -185,4 +193,14 @@ public ConsumerConfigurationData<T> clone() {
throw new RuntimeException("Failed to clone ConsumerConfigurationData");
}
}

/**
* Backward compatibility with the old `replicateSubscriptionState` field.
* @deprecated Using {@link #getReplicateSubscriptionState()} instead.
*/
@JsonIgnore
@Deprecated
public boolean isReplicateSubscriptionState() {
return replicateSubscriptionState != null && replicateSubscriptionState;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ public void testLoadConf() throws Exception {
assertTrue(configurationData.isRetryEnable());
assertFalse(configurationData.isAutoUpdatePartitions());
assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 2);
assertTrue(configurationData.isReplicateSubscriptionState());
assertEquals(configurationData.getReplicateSubscriptionState(), Boolean.TRUE);
assertTrue(configurationData.isResetIncludeHead());
assertTrue(configurationData.isBatchIndexAckEnabled());
assertTrue(configurationData.isAckReceiptEnabled());
Expand Down Expand Up @@ -518,7 +518,7 @@ public void testLoadConfNotModified() {
assertFalse(configurationData.isRetryEnable());
assertTrue(configurationData.isAutoUpdatePartitions());
assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 60);
assertFalse(configurationData.isReplicateSubscriptionState());
assertNull(configurationData.getReplicateSubscriptionState());
assertFalse(configurationData.isResetIncludeHead());
assertFalse(configurationData.isBatchIndexAckEnabled());
assertFalse(configurationData.isAckReceiptEnabled());
Expand All @@ -537,6 +537,38 @@ public void testLoadConfNotModified() {
assertNull(configurationData.getPayloadProcessor());
}

@Test
public void testReplicateSubscriptionState() {
ConsumerBuilderImpl<byte[]> consumerBuilder = createConsumerBuilder();
assertNull(consumerBuilder.getConf().getReplicateSubscriptionState());

consumerBuilder.replicateSubscriptionState(true);
assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(), Boolean.TRUE);

consumerBuilder.replicateSubscriptionState(false);
assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(), Boolean.FALSE);

Map<String, Object> conf = new HashMap<>();
consumerBuilder = createConsumerBuilder();
consumerBuilder.loadConf(conf);
assertNull(consumerBuilder.getConf().getReplicateSubscriptionState());

conf.put("replicateSubscriptionState", true);
consumerBuilder = createConsumerBuilder();
consumerBuilder.loadConf(conf);
assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(), Boolean.TRUE);

conf.put("replicateSubscriptionState", false);
consumerBuilder = createConsumerBuilder();
consumerBuilder.loadConf(conf);
assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(), Boolean.FALSE);

conf.put("replicateSubscriptionState", null);
consumerBuilder = createConsumerBuilder();
consumerBuilder.loadConf(conf);
assertNull(consumerBuilder.getConf().getReplicateSubscriptionState());
}

private ConsumerBuilderImpl<byte[]> createConsumerBuilder() {
ConsumerBuilderImpl<byte[]> consumerBuilder = new ConsumerBuilderImpl<>(null, Schema.BYTES);
Map<String, String> properties = new HashMap<>();
Expand Down
Loading

0 comments on commit 887d518

Please sign in to comment.