Skip to content

Commit

Permalink
[fix][broker] Correct schema deletion for parititioned topic (apache#…
Browse files Browse the repository at this point in the history
…21574)

### Motivation

Schemas binding on the partitioned topic, but schemas will be deleted when a partition is deleted.

### Modifications

Correct the behaviors of schema deleting:
- Pulsar deletes schema when a non-partitioned topic is deleted.
- Pulsar deletes schema when a partitioned topic metadata is deleted.
- Pulsar does not delete schema when only a part of a partitioned topic is deleted.
  • Loading branch information
poorbarcode authored Nov 21, 2023
1 parent c87cfb3 commit d1b7d0b
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,9 @@ protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse,
.thenCompose(unused -> internalRemovePartitionsTopicAsync(numPartitions, force));
})
// Only tries to delete the znode for partitioned topic when all its partitions are successfully deleted
).thenCompose(__ -> getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
).thenCompose(ignore ->
pulsar().getBrokerService().deleteSchema(topicName).exceptionally(ex -> null))
.thenCompose(__ -> getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
.runWithMarkDeleteAsync(topicName, () -> namespaceResources()
.getPartitionedTopicResources().deletePartitionedTopicAsync(topicName)))
.thenAccept(__ -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3569,7 +3569,7 @@ public CompletableFuture<Void> deleteTopicPolicies(TopicName topicName) {
.deleteTopicPoliciesAsync(TopicName.get(topicName.getPartitionedTopicName()));
}

CompletableFuture<SchemaVersion> deleteSchema(TopicName topicName) {
public CompletableFuture<SchemaVersion> deleteSchema(TopicName topicName) {
String base = topicName.getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
SchemaRegistryService schemaRegistryService = getPulsar().getSchemaRegistryService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
import org.apache.pulsar.common.util.Codec;
Expand Down Expand Up @@ -2380,6 +2381,15 @@ private Optional<CompactorMXBean> getCompactorMXBean() {
return Optional.ofNullable(compactor).map(c -> c.getStats());
}

@Override
public CompletableFuture<SchemaVersion> deleteSchema() {
if (TopicName.get(getName()).isPartitioned()) {
// Only delete schema when partitioned metadata is deleting.
return CompletableFuture.completedFuture(null);
}
return brokerService.deleteSchema(TopicName.get(getName()));
}

@Override
public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean includeLedgerMetadata) {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import static org.testng.Assert.assertTrue;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker")
public class TopicGCTest extends ProducerConsumerBase {

@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@EqualsAndHashCode.Include
protected void doInitConf() throws Exception {
super.doInitConf();
this.conf.setBrokerDeleteInactiveTopicsEnabled(true);
this.conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
this.conf.setBrokerDeleteInactiveTopicsFrequencySeconds(10);
}

@Test
public void testCreateConsumerAfterOnePartDeleted() throws Exception {
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
final String partition0 = topic + "-partition-0";
final String partition1 = topic + "-partition-1";
final String subscription = "s1";
admin.topics().createPartitionedTopic(topic, 2);
admin.topics().createSubscription(topic, subscription, MessageId.earliest);

// create consumers and producers.
Producer<String> producer0 = pulsarClient.newProducer(Schema.STRING).topic(partition0)
.enableBatching(false).create();
Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING).topic(partition1)
.enableBatching(false).create();
org.apache.pulsar.client.api.Consumer<String> consumer1 = pulsarClient.newConsumer(Schema.STRING).topic(topic)
.subscriptionName(subscription).isAckReceiptEnabled(true).subscribe();

// Make consume all messages for one topic, do not consume any messages for another one.
producer0.send("1");
producer1.send("2");
admin.topics().skipAllMessages(partition0, subscription);

// Wait for topic GC.
// Partition 0 will be deleted about 20s later, left 2min to avoid flaky.
producer0.close();
consumer1.close();
Awaitility.await().atMost(2, TimeUnit.MINUTES).untilAsserted(() -> {
CompletableFuture<Optional<Topic>> tp1 = pulsar.getBrokerService().getTopic(partition0, false);
CompletableFuture<Optional<Topic>> tp2 = pulsar.getBrokerService().getTopic(partition1, false);
assertTrue(tp1 == null || !tp1.get().isPresent());
assertTrue(tp2 != null && tp2.get().isPresent());
});

// Verify that the consumer subscribed with partitioned topic can be created successful.
Consumer<String> consumerAllPartition = pulsarClient.newConsumer(Schema.STRING).topic(topic)
.subscriptionName(subscription).isAckReceiptEnabled(true).subscribe();
Message<String> msg = consumerAllPartition.receive(2, TimeUnit.SECONDS);
String receivedMsgValue = msg.getValue();
log.info("received msg: {}", receivedMsgValue);
consumerAllPartition.acknowledge(msg);

// cleanup.
consumerAllPartition.close();
producer0.close();
producer1.close();
admin.topics().deletePartitionedTopic(topic);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema;

import static org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
import static org.testng.Assert.assertTrue;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class TopicSchemaTest extends ProducerConsumerBase {

@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@DataProvider(name = "topicDomains")
public Object[][] topicDomains() {
return new Object[][]{
{TopicDomain.non_persistent},
{TopicDomain.persistent}
};
}

@Test(dataProvider = "topicDomains")
public void testDeleteNonPartitionedTopicWithSchema(TopicDomain topicDomain) throws Exception {
final String topic = BrokerTestUtil.newUniqueName(topicDomain.value() + "://public/default/tp");
final String schemaId = TopicName.get(TopicName.get(topic).getPartitionedTopicName()).getSchemaName();
admin.topics().createNonPartitionedTopic(topic);

// Add schema.
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic)
.enableBatching(false).create();
producer.close();
List<SchemaAndMetadata> schemaList1 = pulsar.getSchemaRegistryService().getAllSchemas(schemaId).join()
.stream().map(s -> s.join()).filter(Objects::nonNull).collect(Collectors.toList());
assertTrue(schemaList1 != null && schemaList1.size() > 0);

// Verify the schema has been deleted with topic.
admin.topics().delete(topic, false);
List<SchemaAndMetadata> schemaList2 = pulsar.getSchemaRegistryService().getAllSchemas(schemaId).join()
.stream().map(s -> s.join()).filter(Objects::nonNull).collect(Collectors.toList());
assertTrue(schemaList2 == null || schemaList2.isEmpty());
}

@Test
public void testDeletePartitionedTopicWithoutSchema() throws Exception {
// Non-persistent topic does not support partitioned topic now, so only write a test case for persistent topic.
TopicDomain topicDomain = TopicDomain.persistent;
final String topic = BrokerTestUtil.newUniqueName(topicDomain.value() + "://public/default/tp");
final String partition0 = topic + "-partition-0";
final String partition1 = topic + "-partition-1";
final String schemaId = TopicName.get(TopicName.get(topic).getPartitionedTopicName()).getSchemaName();
admin.topics().createPartitionedTopic(topic, 2);

// Add schema.
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic)
.enableBatching(false).create();
producer.close();
List<SchemaAndMetadata> schemaList1 = pulsar.getSchemaRegistryService().getAllSchemas(schemaId).join()
.stream().map(s -> s.join()).filter(Objects::nonNull).collect(Collectors.toList());
assertTrue(schemaList1 != null && schemaList1.size() > 0);

// Verify the schema will not been deleted with partition-0.
admin.topics().delete(partition0, false);
List<SchemaAndMetadata> schemaList2 = pulsar.getSchemaRegistryService().getAllSchemas(schemaId).join()
.stream().map(s -> s.join()).filter(Objects::nonNull).collect(Collectors.toList());
assertTrue(schemaList2 != null && schemaList2.size() > 0);

// Verify the schema will not been deleted with partition-0 & partition-1.
admin.topics().delete(partition1, false);
List<SchemaAndMetadata> schemaList3 = pulsar.getSchemaRegistryService().getAllSchemas(schemaId).join()
.stream().map(s -> s.join()).filter(Objects::nonNull).collect(Collectors.toList());
assertTrue(schemaList3 != null && schemaList3.size() > 0);

// Verify the schema will be deleted with partitioned metadata.
admin.topics().deletePartitionedTopic(topic, false);
List<SchemaAndMetadata> schemaList4 = pulsar.getSchemaRegistryService().getAllSchemas(schemaId).join()
.stream().map(s -> s.join()).filter(Objects::nonNull).collect(Collectors.toList());
assertTrue(schemaList4 == null || schemaList4.isEmpty());
}
}

0 comments on commit d1b7d0b

Please sign in to comment.