diff --git a/conf/broker.conf b/conf/broker.conf index 4ad8536fd8d68..293042e15dbe4 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1739,6 +1739,10 @@ transactionPendingAckBatchedWriteMaxSize=4194304 # the first record in a batch. transactionPendingAckBatchedWriteMaxDelayInMillis=1 +# Block transactions if the namespace or the topic is replicated +# See https://github.com/apache/pulsar/issues/18888 +blockTransactionsIfReplicationEnabled=false + ### --- Packages management service configuration variables (begin) --- ### # Enable the packages management service or not diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index fb26775591345..e000f7ff13e84 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -3118,6 +3118,12 @@ The delayed message index time step(in seconds) in per bucket snapshot segment, ) private long transactionPendingAckLogIndexMinLag = 500L; + @FieldContext( + category = CATEGORY_TRANSACTION, + doc = "Block transactions if replication is enabled on the namespace/topic." + ) + private boolean blockTransactionsIfReplicationEnabled = false; + @FieldContext( category = CATEGORY_SERVER, dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index ecf8682084803..a9ef268e7eea9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -43,6 +43,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -138,6 +139,7 @@ public class Consumer { private final String clientAddress; // IP address only, no port number included private final MessageId startMessageId; private final boolean isAcknowledgmentAtBatchIndexLevelEnabled; + private final boolean blockTransactionsIfReplicationEnabled; @Getter @Setter @@ -199,10 +201,11 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo stats.setClientVersion(cnx.getClientVersion()); stats.metadata = this.metadata; + final ServiceConfiguration serviceConfiguration = subscription.getTopic().getBrokerService() + .getPulsar().getConfiguration(); if (Subscription.isIndividualAckMode(subType)) { this.pendingAcks = ConcurrentLongLongPairHashMap.newBuilder() - .autoShrink(subscription.getTopic().getBrokerService() - .getPulsar().getConfiguration().isAutoShrinkForConsumerPendingAcksMap()) + .autoShrink(serviceConfiguration.isAutoShrinkForConsumerPendingAcksMap()) .expectedItems(256) .concurrencyLevel(1) .build(); @@ -215,7 +218,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo this.consumerEpoch = consumerEpoch; this.isAcknowledgmentAtBatchIndexLevelEnabled = subscription.getTopic().getBrokerService() .getPulsar().getConfiguration().isAcknowledgmentAtBatchIndexLevelEnabled(); - + this.blockTransactionsIfReplicationEnabled = serviceConfiguration.isBlockTransactionsIfReplicationEnabled(); this.schemaType = schemaType; } @@ -244,6 +247,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo this.clientAddress = null; this.startMessageId = null; this.isAcknowledgmentAtBatchIndexLevelEnabled = false; + this.blockTransactionsIfReplicationEnabled = false; this.schemaType = null; MESSAGE_PERMITS_UPDATER.set(this, availablePermits); } @@ -434,6 +438,18 @@ public CompletableFuture messageAcked(CommandAck ack) { .collect(Collectors.toMap(KeyLongValue::getKey, KeyLongValue::getValue)); } + if (subscription instanceof PersistentSubscription + && ack.hasTxnidMostBits() + && ack.hasTxnidLeastBits() + && blockTransactionsIfReplicationEnabled + && subscription.getTopic().isReplicated()) { + final CompletableFuture failed = new CompletableFuture<>(); + failed.completeExceptionally(new BrokerServiceException.NotAllowedException( + "Transactions are not allowed in a namespace with replication enabled" + )); + return failed; + } + if (ack.getAckType() == AckType.Cumulative) { if (ack.getMessageIdsCount() != 1) { log.warn("[{}] [{}] Received multi-message ack", subscription, consumerId); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index f7d2bb2dd2797..fec2d52a239cc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -91,6 +91,7 @@ public class Producer { private final boolean isNonPersistentTopic; private final boolean isShadowTopic; private final boolean isEncrypted; + private final boolean blockTransactionsIfReplicationEnabled; private final ProducerAccessMode accessMode; private Optional topicEpoch; @@ -153,6 +154,7 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN this.schemaVersion = schemaVersion; this.accessMode = accessMode; this.topicEpoch = topicEpoch; + this.blockTransactionsIfReplicationEnabled = serviceConf.isBlockTransactionsIfReplicationEnabled(); this.clientAddress = cnx.clientSourceAddress(); this.brokerInterceptor = cnx.getBrokerService().getInterceptor(); @@ -186,7 +188,7 @@ public boolean isSuccessorTo(Producer other) { } public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize, - boolean isChunked, boolean isMarker, Position position) { + boolean isChunked, boolean isMarker, Position position) { if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize, position)) { publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked, isMarker, position); } @@ -266,6 +268,19 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he return true; } + private boolean checkCanProduceTxnOnTopic(long sequenceId, ByteBuf headersAndPayload) { + if (blockTransactionsIfReplicationEnabled && topic.isReplicated()) { + cnx.execute(() -> { + cnx.getCommandSender().sendSendError(producerId, + sequenceId, ServerError.NotAllowedError, + "Transactions are not allowed in a namespace with replication enabled"); + cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes()); + }); + return false; + } + return true; + } + private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked, boolean isMarker, Position position) { MessagePublishContext messagePublishContext = @@ -806,6 +821,9 @@ public void checkEncryption() { public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, long highSequenceId, ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker) { + if (!checkCanProduceTxnOnTopic(sequenceId, headersAndPayload)) { + return; + } if (!checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize, null)) { return; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index cf389824794e5..ae0cc912935aa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -44,10 +44,12 @@ import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.List; +import java.util.HashSet; import java.util.Map; +import java.util.List; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -132,6 +134,7 @@ import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TopicPolicies; @@ -155,6 +158,7 @@ import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; /** @@ -1669,8 +1673,8 @@ public void testEncryptionRequired() throws Exception { Transaction txn = pulsarClient.newTransaction() .withTransactionTimeout(5, TimeUnit.SECONDS).build().get(); producer.newMessage(txn) - .value(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8)) - .send(); + .value(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8)) + .send(); txn.commit(); } @@ -1783,4 +1787,89 @@ private void getTopic(String topicName) { }); } + @DataProvider(name = "BlockTransactionsIfReplicationEnabledValues") + public static Object[][] packageNamesProvider() { + return new Object[][]{ + {false},{true} + }; + } + + @Test(dataProvider = "BlockTransactionsIfReplicationEnabledValues") + public void testBlockTransactionsIfReplicationEnabled(boolean block) throws Exception { + conf.setBlockTransactionsIfReplicationEnabled(block); + admin.clusters().createCluster("r1", ClusterData.builder() + .serviceUrl(getPulsarServiceList().get(0).getWebServiceAddress()) + .build() + ); + final String namespace = TENANT + "/txndisabledns"; + admin.namespaces().createNamespace(namespace); + String topic = "persistent://" + namespace + "/block-" + block; + admin.topics().createNonPartitionedTopic(topic); + getPulsarServiceList().get(0) + .getPulsarResources() + .getNamespaceResources() + .setPolicies(NamespaceName.get(namespace), p -> { + p.replication_clusters = new HashSet<>(Arrays.asList(CLUSTER_NAME, "r1")); + return p; + }); + getPulsarServiceList().get(0) + .getBrokerService() + .getTopic(topic, false) + .get() + .get() + .checkReplication() + .get(); + + @Cleanup + Consumer consumer = getConsumer(topic, "s1"); + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .producerName("testBlocked").sendTimeout(0, TimeUnit.SECONDS) + .topic(topic).enableBatching(true) + .create(); + + Transaction transaction = pulsarClient.newTransaction() + .withTransactionTimeout(10, TimeUnit.SECONDS).build().get(); + try { + producer.newMessage(transaction) + .value("test") + .send(); + if (block) { + fail(); + } + } catch (PulsarClientException.NotAllowedException notAllowedException) { + if (block) { + assertEquals(notAllowedException.getMessage(), "Transactions are not allowed " + + "in a namespace with replication enabled"); + } else { + fail("unexpected exception with block=false: " + notAllowedException.getMessage()); + } + } + + + final MessageId msgNoTxn = producer.newMessage() + .value("testnotxn") + .send(); + + try { + consumer.acknowledgeAsync(msgNoTxn, transaction).get(); + if (block) { + fail(); + } + } catch (ExecutionException ex) { + if (block) { + assertTrue(PulsarClientException.unwrap(ex.getCause()).getMessage() + .contains("Transactions are not allowed in a namespace with replication enabled")); + } else { + fail("unexpected exception with block=false: " + ex.getCause().getMessage()); + } + } finally { + getPulsarServiceList().get(0) + .getPulsarResources() + .getNamespaceResources().deletePolicies(NamespaceName.get(namespace)); + admin.clusters().deleteCluster("r1"); + } + consumer.acknowledgeAsync(msgNoTxn).get(); + } + }