Skip to content

Commit

Permalink
[fix][txn] Use PulsarResource check for topic existence instead of br…
Browse files Browse the repository at this point in the history
…okerservice.getTopic() (#20569)
  • Loading branch information
liangyepianzhou authored and Technoboy- committed Jul 3, 2023
1 parent ce51966 commit 42894e7
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -381,10 +382,12 @@ public void openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Ob

// This method will be deprecated and removed in version 4.x.0
private CompletableFuture<PositionImpl> recoverOldSnapshot() {
return topic.getBrokerService().getTopic(TopicName.get(topic.getName()).getNamespace() + "/"
+ SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT, false)
.thenCompose(topicOption -> {
if (!topicOption.isPresent()) {
return topic.getBrokerService().getPulsar().getPulsarResources().getTopicResources()
.listPersistentTopicsAsync(NamespaceName.get(TopicName.get(topic.getName()).getNamespace()))
.thenCompose(topics -> {
if (!topics.contains(TopicDomain.persistent + "://"
+ TopicName.get(topic.getName()).getNamespace() + "/"
+ SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT)) {
return CompletableFuture.completedFuture(null);
} else {
return topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,21 @@
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -115,6 +121,50 @@ public void testCommitOnTopic() throws ExecutionException, InterruptedException
}
}

@Test
public void testRecoveryTransactionBufferWhenCommonTopicAndSystemTopicAtDifferentBroker() throws Exception {
for (int i = 0; i < getPulsarServiceList().size(); i++) {
getPulsarServiceList().get(i).getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
}
String topic1 = NAMESPACE1 + "/testRecoveryTransactionBufferWhenCommonTopicAndSystemTopicAtDifferentBroker";
admin.tenants().createTenant(TENANT,
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NAMESPACE1, 4);
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
pulsarServiceList.get(0).getPulsarResources()
.getNamespaceResources()
.getPartitionedTopicResources()
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
new PartitionedTopicMetadata(3));
assertTrue(admin.namespaces().getBundles(NAMESPACE1).getNumBundles() > 1);
for (int i = 0; true ; i++) {
topic1 = topic1 + i;
admin.topics().createNonPartitionedTopic(topic1);
String segmentTopicBroker = admin.lookups()
.lookupTopic(NAMESPACE1 + "/" + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
String indexTopicBroker = admin.lookups()
.lookupTopic(NAMESPACE1 + "/" + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES);
if (segmentTopicBroker.equals(indexTopicBroker)) {
String topicBroker = admin.lookups().lookupTopic(topic1);
if (!topicBroker.equals(segmentTopicBroker)) {
break;
}
} else {
break;
}
}
pulsarClient = PulsarClient.builder().serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
.enableTransaction(true).build();
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic1).create();
Transaction transaction = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.HOURS)
.build().get();
producer.newMessage(transaction).send();
}

@Test
public void testAbortOnTopic() throws ExecutionException, InterruptedException {
List<CompletableFuture<TxnID>> futures = new ArrayList<>();
Expand Down

0 comments on commit 42894e7

Please sign in to comment.