Skip to content

Commit

Permalink
[fix] [broker] Delete topic timeout due to NPE (apache#21595)
Browse files Browse the repository at this point in the history
### Issue:
There is an NPE that causes the Future of Delay message indexes bucket deletion to be no longer complete, which leads to the topic deletion timeout. You can reproduce this issue by the test `testDeletePartitionedTopicIfCursorPropsEmpty` and `testDeleteTopicIfCursorPropsEmpty`

### Modifications
Fix the NPE.
  • Loading branch information
poorbarcode authored Nov 21, 2023
1 parent 98bf9dd commit b2f2b53
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ private void recoveredCursor(PositionImpl position, Map<String, Long> properties
position = ledger.getLastPosition();
}
log.info("[{}] Cursor {} recovered to position {}", ledger.getName(), name, position);
this.cursorProperties = cursorProperties;
this.cursorProperties = cursorProperties == null ? Collections.emptyMap() : cursorProperties;
messagesConsumedCounter = -getNumberOfEntries(Range.openClosed(position, ledger.getLastPosition()));
markDeletePosition = position;
persistentMarkDeletePosition = position;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.commons.collections4.MapUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.delayed.bucket.BookkeeperBucketSnapshotStorage;
Expand Down Expand Up @@ -85,6 +86,9 @@ public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers d
*/
public CompletableFuture<Void> cleanResidualSnapshots(ManagedCursor cursor) {
Map<String, String> cursorProperties = cursor.getCursorProperties();
if (MapUtils.isEmpty(cursorProperties)) {
return CompletableFuture.completedFuture(null);
}
List<CompletableFuture<Void>> futures = new ArrayList<>();
FutureUtil.Sequencer<Void> sequencer = FutureUtil.Sequencer.create();
cursorProperties.forEach((k, v) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
Expand Down Expand Up @@ -137,9 +138,13 @@ public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispat

private synchronized long recoverBucketSnapshot() throws RuntimeException {
ManagedCursor cursor = this.lastMutableBucket.getCursor();
Map<String, String> cursorProperties = cursor.getCursorProperties();
if (MapUtils.isEmpty(cursorProperties)) {
return 0;
}
FutureUtil.Sequencer<Void> sequencer = this.lastMutableBucket.getSequencer();
Map<Range<Long>, ImmutableBucket> toBeDeletedBucketMap = new HashMap<>();
cursor.getCursorProperties().keySet().forEach(key -> {
cursorProperties.keySet().forEach(key -> {
if (key.startsWith(DELAYED_BUCKET_KEY_PREFIX)) {
String[] keys = key.split(DELIMITER);
checkArgument(keys.length == 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@

import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.CURSOR_INTERNAL_PROPERTY_PREFIX;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Multimap;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -32,6 +35,7 @@
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.broker.BrokerTestUtil;
Expand All @@ -40,13 +44,15 @@
import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "broker")
Expand Down Expand Up @@ -353,4 +359,121 @@ public void testDelete() throws Exception {
}
}
}

@DataProvider(name = "subscriptionTypes")
public Object[][] subscriptionTypes() {
return new Object[][]{
{SubscriptionType.Shared},
{SubscriptionType.Key_Shared},
{SubscriptionType.Failover},
{SubscriptionType.Exclusive},
};
}

/**
* see: https://github.com/apache/pulsar/pull/21595.
*/
@Test(dataProvider = "subscriptionTypes")
public void testDeleteTopicIfCursorPropsEmpty(SubscriptionType subscriptionType) throws Exception {
final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
final String subscriptionName = "s1";
// create a topic.
admin.topics().createNonPartitionedTopic(topic);
// create a subscription without props.
admin.topics().createSubscription(topic, subscriptionName, MessageId.earliest);
pulsarClient.newConsumer().topic(topic).subscriptionName(subscriptionName)
.subscriptionType(subscriptionType).subscribe().close();
ManagedCursorImpl cursor = findCursor(topic, subscriptionName);
assertNotNull(cursor);
assertTrue(cursor.getCursorProperties() == null || cursor.getCursorProperties().isEmpty());
// Test topic deletion is successful.
admin.topics().delete(topic);
}

/**
* see: https://github.com/apache/pulsar/pull/21595.
*/
@Test(dataProvider = "subscriptionTypes")
public void testDeletePartitionedTopicIfCursorPropsEmpty(SubscriptionType subscriptionType) throws Exception {
final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
final String subscriptionName = "s1";
// create a topic.
admin.topics().createPartitionedTopic(topic, 2);
// create a subscription without props.
admin.topics().createSubscription(topic, subscriptionName, MessageId.earliest);
pulsarClient.newConsumer().topic(topic).subscriptionName(subscriptionName)
.subscriptionType(subscriptionType).subscribe().close();
ManagedCursorImpl cursor = findCursor(topic + "-partition-0", subscriptionName);
assertNotNull(cursor);
assertTrue(cursor.getCursorProperties() == null || cursor.getCursorProperties().isEmpty());
// Test topic deletion is successful.
admin.topics().deletePartitionedTopic(topic);
}

/**
* see: https://github.com/apache/pulsar/pull/21595.
*/
@Test(dataProvider = "subscriptionTypes")
public void testDeleteTopicIfCursorPropsNotEmpty(SubscriptionType subscriptionType) throws Exception {
final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
final String subscriptionName = "s1";
// create a topic.
admin.topics().createNonPartitionedTopic(topic);
// create a subscription without props.
admin.topics().createSubscription(topic, subscriptionName, MessageId.earliest);
pulsarClient.newConsumer().topic(topic).subscriptionName(subscriptionName)
.subscriptionType(subscriptionType).subscribe().close();
ManagedCursorImpl cursor = findCursor(topic, subscriptionName);
assertNotNull(cursor);
assertTrue(cursor.getCursorProperties() == null || cursor.getCursorProperties().isEmpty());
// Put a subscription prop.
Map<String,String> properties = new HashMap<>();
properties.put("ignore", "ignore");
admin.topics().updateSubscriptionProperties(topic, subscriptionName, properties);
assertTrue(cursor.getCursorProperties() != null && !cursor.getCursorProperties().isEmpty());
// Test topic deletion is successful.
admin.topics().delete(topic);
}

/**
* see: https://github.com/apache/pulsar/pull/21595.
*/
@Test(dataProvider = "subscriptionTypes")
public void testDeletePartitionedTopicIfCursorPropsNotEmpty(SubscriptionType subscriptionType) throws Exception {
final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
final String subscriptionName = "s1";
// create a topic.
admin.topics().createPartitionedTopic(topic, 2);
pulsarClient.newProducer().topic(topic).create().close();
// create a subscription without props.
admin.topics().createSubscription(topic, subscriptionName, MessageId.earliest);
pulsarClient.newConsumer().topic(topic).subscriptionName(subscriptionName)
.subscriptionType(subscriptionType).subscribe().close();

ManagedCursorImpl cursor = findCursor(topic + "-partition-0", subscriptionName);
assertNotNull(cursor);
assertTrue(cursor.getCursorProperties() == null || cursor.getCursorProperties().isEmpty());
// Put a subscription prop.
Map<String,String> properties = new HashMap<>();
properties.put("ignore", "ignore");
admin.topics().updateSubscriptionProperties(topic, subscriptionName, properties);
assertTrue(cursor.getCursorProperties() != null && !cursor.getCursorProperties().isEmpty());
// Test topic deletion is successful.
admin.topics().deletePartitionedTopic(topic);
}


private ManagedCursorImpl findCursor(String topic, String subscriptionName) {
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
Iterator<ManagedCursor> cursorIterator = persistentTopic.getManagedLedger().getCursors().iterator();
while (cursorIterator.hasNext()) {
ManagedCursor managedCursor = cursorIterator.next();
if (managedCursor == null || !managedCursor.getName().equals(subscriptionName)) {
continue;
}
return (ManagedCursorImpl) managedCursor;
}
return null;
}
}

0 comments on commit b2f2b53

Please sign in to comment.