Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Cherry-pick 2.10_ds fix to block transactions in replicated namespaces #208

Merged
merged 3 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1739,6 +1739,10 @@ transactionPendingAckBatchedWriteMaxSize=4194304
# the first record in a batch.
transactionPendingAckBatchedWriteMaxDelayInMillis=1

# Block transactions if the namespace or the topic is replicated
# See https://github.com/apache/pulsar/issues/18888
blockTransactionsIfReplicationEnabled=false

### --- Packages management service configuration variables (begin) --- ###

# Enable the packages management service or not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3118,6 +3118,12 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
)
private long transactionPendingAckLogIndexMinLag = 500L;

@FieldContext(
category = CATEGORY_TRANSACTION,
doc = "Block transactions if replication is enabled on the namespace/topic."
)
private boolean blockTransactionsIfReplicationEnabled = false;

@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
Expand Down Expand Up @@ -138,6 +139,7 @@ public class Consumer {
private final String clientAddress; // IP address only, no port number included
private final MessageId startMessageId;
private final boolean isAcknowledgmentAtBatchIndexLevelEnabled;
private final boolean blockTransactionsIfReplicationEnabled;

@Getter
@Setter
Expand Down Expand Up @@ -199,10 +201,11 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
stats.setClientVersion(cnx.getClientVersion());
stats.metadata = this.metadata;

final ServiceConfiguration serviceConfiguration = subscription.getTopic().getBrokerService()
.getPulsar().getConfiguration();
if (Subscription.isIndividualAckMode(subType)) {
this.pendingAcks = ConcurrentLongLongPairHashMap.newBuilder()
.autoShrink(subscription.getTopic().getBrokerService()
.getPulsar().getConfiguration().isAutoShrinkForConsumerPendingAcksMap())
.autoShrink(serviceConfiguration.isAutoShrinkForConsumerPendingAcksMap())
.expectedItems(256)
.concurrencyLevel(1)
.build();
Expand All @@ -215,7 +218,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.consumerEpoch = consumerEpoch;
this.isAcknowledgmentAtBatchIndexLevelEnabled = subscription.getTopic().getBrokerService()
.getPulsar().getConfiguration().isAcknowledgmentAtBatchIndexLevelEnabled();

this.blockTransactionsIfReplicationEnabled = serviceConfiguration.isBlockTransactionsIfReplicationEnabled();
this.schemaType = schemaType;
}

Expand Down Expand Up @@ -244,6 +247,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.clientAddress = null;
this.startMessageId = null;
this.isAcknowledgmentAtBatchIndexLevelEnabled = false;
this.blockTransactionsIfReplicationEnabled = false;
this.schemaType = null;
MESSAGE_PERMITS_UPDATER.set(this, availablePermits);
}
Expand Down Expand Up @@ -434,6 +438,18 @@ public CompletableFuture<Void> messageAcked(CommandAck ack) {
.collect(Collectors.toMap(KeyLongValue::getKey, KeyLongValue::getValue));
}

if (subscription instanceof PersistentSubscription
&& ack.hasTxnidMostBits()
&& ack.hasTxnidLeastBits()
&& blockTransactionsIfReplicationEnabled
&& subscription.getTopic().isReplicated()) {
final CompletableFuture<Void> failed = new CompletableFuture<>();
failed.completeExceptionally(new BrokerServiceException.NotAllowedException(
"Transactions are not allowed in a namespace with replication enabled"
));
return failed;
}

if (ack.getAckType() == AckType.Cumulative) {
if (ack.getMessageIdsCount() != 1) {
log.warn("[{}] [{}] Received multi-message ack", subscription, consumerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public class Producer {
private final boolean isNonPersistentTopic;
private final boolean isShadowTopic;
private final boolean isEncrypted;
private final boolean blockTransactionsIfReplicationEnabled;

private final ProducerAccessMode accessMode;
private Optional<Long> topicEpoch;
Expand Down Expand Up @@ -153,6 +154,7 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN
this.schemaVersion = schemaVersion;
this.accessMode = accessMode;
this.topicEpoch = topicEpoch;
this.blockTransactionsIfReplicationEnabled = serviceConf.isBlockTransactionsIfReplicationEnabled();

this.clientAddress = cnx.clientSourceAddress();
this.brokerInterceptor = cnx.getBrokerService().getInterceptor();
Expand Down Expand Up @@ -186,7 +188,7 @@ public boolean isSuccessorTo(Producer other) {
}

public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize,
boolean isChunked, boolean isMarker, Position position) {
boolean isChunked, boolean isMarker, Position position) {
if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize, position)) {
publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked, isMarker, position);
}
Expand Down Expand Up @@ -266,6 +268,19 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he
return true;
}

private boolean checkCanProduceTxnOnTopic(long sequenceId, ByteBuf headersAndPayload) {
if (blockTransactionsIfReplicationEnabled && topic.isReplicated()) {
cnx.execute(() -> {
cnx.getCommandSender().sendSendError(producerId,
sequenceId, ServerError.NotAllowedError,
"Transactions are not allowed in a namespace with replication enabled");
cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
});
return false;
}
return true;
}

private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked,
boolean isMarker, Position position) {
MessagePublishContext messagePublishContext =
Expand Down Expand Up @@ -806,6 +821,9 @@ public void checkEncryption() {

public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, long highSequenceId,
ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker) {
if (!checkCanProduceTxnOnTopic(sequenceId, headersAndPayload)) {
return;
}
if (!checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize, null)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.HashSet;
import java.util.Map;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -132,6 +134,7 @@
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
Expand All @@ -155,6 +158,7 @@
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
Expand Down Expand Up @@ -1669,8 +1673,8 @@ public void testEncryptionRequired() throws Exception {
Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS).build().get();
producer.newMessage(txn)
.value(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8))
.send();
.value(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8))
.send();
txn.commit();
}

Expand Down Expand Up @@ -1783,4 +1787,89 @@ private void getTopic(String topicName) {
});
}

@DataProvider(name = "BlockTransactionsIfReplicationEnabledValues")
public static Object[][] packageNamesProvider() {
return new Object[][]{
{false},{true}
};
}

@Test(dataProvider = "BlockTransactionsIfReplicationEnabledValues")
public void testBlockTransactionsIfReplicationEnabled(boolean block) throws Exception {
conf.setBlockTransactionsIfReplicationEnabled(block);
admin.clusters().createCluster("r1", ClusterData.builder()
.serviceUrl(getPulsarServiceList().get(0).getWebServiceAddress())
.build()
);
final String namespace = TENANT + "/txndisabledns";
admin.namespaces().createNamespace(namespace);
String topic = "persistent://" + namespace + "/block-" + block;
admin.topics().createNonPartitionedTopic(topic);
getPulsarServiceList().get(0)
.getPulsarResources()
.getNamespaceResources()
.setPolicies(NamespaceName.get(namespace), p -> {
p.replication_clusters = new HashSet<>(Arrays.asList(CLUSTER_NAME, "r1"));
return p;
});
getPulsarServiceList().get(0)
.getBrokerService()
.getTopic(topic, false)
.get()
.get()
.checkReplication()
.get();

@Cleanup
Consumer<byte[]> consumer = getConsumer(topic, "s1");
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.producerName("testBlocked").sendTimeout(0, TimeUnit.SECONDS)
.topic(topic).enableBatching(true)
.create();

Transaction transaction = pulsarClient.newTransaction()
.withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
try {
producer.newMessage(transaction)
.value("test")
.send();
if (block) {
fail();
}
} catch (PulsarClientException.NotAllowedException notAllowedException) {
if (block) {
assertEquals(notAllowedException.getMessage(), "Transactions are not allowed "
+ "in a namespace with replication enabled");
} else {
fail("unexpected exception with block=false: " + notAllowedException.getMessage());
}
}


final MessageId msgNoTxn = producer.newMessage()
.value("testnotxn")
.send();

try {
consumer.acknowledgeAsync(msgNoTxn, transaction).get();
if (block) {
fail();
}
} catch (ExecutionException ex) {
if (block) {
assertTrue(PulsarClientException.unwrap(ex.getCause()).getMessage()
.contains("Transactions are not allowed in a namespace with replication enabled"));
} else {
fail("unexpected exception with block=false: " + ex.getCause().getMessage());
}
} finally {
getPulsarServiceList().get(0)
.getPulsarResources()
.getNamespaceResources().deletePolicies(NamespaceName.get(namespace));
admin.clusters().deleteCluster("r1");
}
consumer.acknowledgeAsync(msgNoTxn).get();
}

}