Skip to content

Commit

Permalink
[fix][broker] Delete compacted ledger when topic is deleted (apache#2…
Browse files Browse the repository at this point in the history
…1745)

(cherry picked from commit 1c8aeba)
  • Loading branch information
coderzc authored and nikhil-ctds committed Jan 4, 2024
1 parent a15fb8a commit 555a648
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
protected final MessageDeduplication messageDeduplication;

private static final Long COMPACTION_NEVER_RUN = -0xfebecffeL;
private CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(COMPACTION_NEVER_RUN);
private volatile CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(
COMPACTION_NEVER_RUN);
private TopicCompactionService topicCompactionService;

// TODO: Create compaction strategy from topic policy when exposing strategic compaction to users.
Expand Down Expand Up @@ -1166,13 +1167,14 @@ private void asyncDeleteCursorWithClearDelayedMessage(String subscriptionName,
CompletableFuture<Void> unsubscribeFuture) {
PersistentSubscription persistentSubscription = subscriptions.get(subscriptionName);
if (persistentSubscription == null) {
log.warn("[{}][{}] Can't find subscription, skip clear delayed message", topic, subscriptionName);
log.warn("[{}][{}] Can't find subscription, skip delete cursor", topic, subscriptionName);
unsubscribeFuture.complete(null);
return;
}

if (!isDelayedDeliveryEnabled()
|| !(brokerService.getDelayedDeliveryTrackerFactory() instanceof BucketDelayedDeliveryTrackerFactory)) {
asyncDeleteCursor(subscriptionName, unsubscribeFuture);
asyncDeleteCursorWithCleanCompactionLedger(persistentSubscription, unsubscribeFuture);
return;
}

Expand All @@ -1187,7 +1189,7 @@ private void asyncDeleteCursorWithClearDelayedMessage(String subscriptionName,
if (ex != null) {
unsubscribeFuture.completeExceptionally(ex);
} else {
asyncDeleteCursor(subscriptionName, unsubscribeFuture);
asyncDeleteCursorWithCleanCompactionLedger(persistentSubscription, unsubscribeFuture);
}
});
}
Expand All @@ -1197,6 +1199,29 @@ private void asyncDeleteCursorWithClearDelayedMessage(String subscriptionName,
dispatcher.clearDelayedMessages().whenComplete((__, ex) -> {
if (ex != null) {
unsubscribeFuture.completeExceptionally(ex);
} else {
asyncDeleteCursorWithCleanCompactionLedger(persistentSubscription, unsubscribeFuture);
}
});
}

private void asyncDeleteCursorWithCleanCompactionLedger(PersistentSubscription subscription,
CompletableFuture<Void> unsubscribeFuture) {
final String subscriptionName = subscription.getName();
if ((!isCompactionSubscription(subscriptionName)) || !(subscription instanceof PulsarCompactorSubscription)) {
asyncDeleteCursor(subscriptionName, unsubscribeFuture);
return;
}

currentCompaction.handle((__, e) -> {
if (e != null) {
log.warn("[{}][{}] Last compaction task failed", topic, subscriptionName);
}
return ((PulsarCompactorSubscription) subscription).cleanCompactedLedger();
}).whenComplete((__, ex) -> {
if (ex != null) {
log.error("[{}][{}] Error cleaning compacted ledger", topic, subscriptionName, ex);
unsubscribeFuture.completeExceptionally(ex);
} else {
asyncDeleteCursor(subscriptionName, unsubscribeFuture);
}
Expand Down Expand Up @@ -3187,17 +3212,29 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
public synchronized void triggerCompaction()
throws PulsarServerException, AlreadyRunningException {
if (currentCompaction.isDone()) {
if (!lock.readLock().tryLock()) {
log.info("[{}] Conflict topic-close, topic-delete, skip triggering compaction", topic);
return;
}
try {
if (isClosingOrDeleting) {
log.info("[{}] Topic is closing or deleting, skip triggering compaction", topic);
return;
}

if (strategicCompactionMap.containsKey(topic)) {
currentCompaction = brokerService.pulsar().getStrategicCompactor()
.compact(topic, strategicCompactionMap.get(topic));
} else {
currentCompaction = topicCompactionService.compact().thenApply(x -> null);
if (strategicCompactionMap.containsKey(topic)) {
currentCompaction = brokerService.pulsar().getStrategicCompactor()
.compact(topic, strategicCompactionMap.get(topic));
} else {
currentCompaction = topicCompactionService.compact().thenApply(x -> null);
}
} finally {
lock.readLock().unlock();
}
currentCompaction.whenComplete((ignore, ex) -> {
if (ex != null){
log.warn("[{}] Compaction failure.", topic, ex);
}
if (ex != null) {
log.warn("[{}] Compaction failure.", topic, ex);
}
});
} else {
throw new AlreadyRunningException("Compaction already in progress");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
import static org.apache.pulsar.broker.service.AbstractBaseDispatcher.checkAndApplyReachedEndOfTopicOrTopicMigration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.CompactedTopicImpl;
import org.apache.pulsar.compaction.Compactor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -106,5 +109,19 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
}
}

CompletableFuture<Void> cleanCompactedLedger() {
final CompletableFuture<CompactedTopicContext> compactedTopicContextFuture =
((CompactedTopicImpl) compactedTopic).getCompactedTopicContextFuture();
if (compactedTopicContextFuture != null) {
return compactedTopicContextFuture.thenCompose(context -> {
long compactedLedgerId = context.getLedger().getId();
((CompactedTopicImpl) compactedTopic).reset();
return compactedTopic.deleteCompactedLedger(compactedLedgerId);
});
} else {
return CompletableFuture.completedFuture(null);
}
}

private static final Logger log = LoggerFactory.getLogger(PulsarCompactorSubscription.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public RawReaderImpl(PulsarClientImpl client, String topic, String subscription,
consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE);
consumerConfiguration.setReadCompacted(true);
consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
consumerConfiguration.setAckReceiptEnabled(true);

consumer = new RawConsumerImpl(client, consumerConfiguration,
consumerFuture);
Expand Down Expand Up @@ -122,7 +123,7 @@ static class RawConsumerImpl extends ConsumerImpl<byte[]> {
MessageId.earliest,
0 /* startMessageRollbackDurationInSec */,
Schema.BYTES, null,
true
false
);
incomingRawMessages = new GrowableArrayBlockingQueue<>();
pendingRawReceives = new ConcurrentLinkedQueue<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,11 @@ public Optional<Position> getCompactionHorizon() {
return Optional.ofNullable(this.compactionHorizon);
}

public void reset() {
this.compactionHorizon = null;
this.compactedTopicContext = null;
}

@Nullable
public CompletableFuture<CompactedTopicContext> getCompactedTopicContextFuture() {
return compactedTopicContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand All @@ -46,21 +47,26 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.api.OpenBuilder;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
Expand All @@ -81,6 +87,7 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand Down Expand Up @@ -1989,4 +1996,127 @@ public void testCompactionDuplicate() throws Exception {
}
}
}

@Test
public void testDeleteCompactedLedger() throws Exception {
String topicName = "persistent://my-property/use/my-ns/testDeleteCompactedLedger";

final String subName = "my-sub";
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(false).topic(topicName).create();

pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).readCompacted(true).subscribe().close();

for (int i = 0; i < 10; i++) {
producer.newMessage().key(String.valueOf(i % 2)).value(String.valueOf(i)).sendAsync();
}
producer.flush();

compact(topicName);

MutableLong compactedLedgerId = new MutableLong(-1);
Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName);
Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1L);
compactedLedgerId.setValue(stats.compactedLedger.ledgerId);
Assert.assertEquals(stats.compactedLedger.entries, 2L);
});

// delete compacted ledger
admin.topics().deleteSubscription(topicName, "__compaction");

Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName);
Assert.assertEquals(stats.compactedLedger.ledgerId, -1L);
Assert.assertEquals(stats.compactedLedger.entries, -1L);
assertThrows(BKException.BKNoSuchLedgerExistsException.class, () -> pulsarTestContext.getBookKeeperClient()
.openLedger(compactedLedgerId.getValue(), BookKeeper.DigestType.CRC32C, new byte[]{}));
});

compact(topicName);

MutableLong compactedLedgerId2 = new MutableLong(-1);
Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName);
Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1L);
compactedLedgerId2.setValue(stats.compactedLedger.ledgerId);
Assert.assertEquals(stats.compactedLedger.entries, 2L);
});

producer.close();
admin.topics().delete(topicName);

Awaitility.await().untilAsserted(() -> assertThrows(BKException.BKNoSuchLedgerExistsException.class,
() -> pulsarTestContext.getBookKeeperClient().openLedger(
compactedLedgerId2.getValue(), BookKeeper.DigestType.CRC32, new byte[]{})));
}

@Test
public void testDeleteCompactedLedgerWithSlowAck() throws Exception {
// Disable topic level policies, since block ack thread may also block thread of delete topic policies.
conf.setTopicLevelPoliciesEnabled(false);
restartBroker();

String topicName = "persistent://my-property/use/my-ns/testDeleteCompactedLedgerWithSlowAck";
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(false).topic(topicName).create();

pulsarClient.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Exclusive)
.subscriptionName(Compactor.COMPACTION_SUBSCRIPTION)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).readCompacted(true).subscribe()
.close();

for (int i = 0; i < 10; i++) {
producer.newMessage().key(String.valueOf(i % 2)).value(String.valueOf(i)).sendAsync();
}
producer.flush();

PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
PersistentSubscription subscription = spy(topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION));
topic.getSubscriptions().put(Compactor.COMPACTION_SUBSCRIPTION, subscription);

AtomicLong compactedLedgerId = new AtomicLong(-1);
AtomicBoolean pauseAck = new AtomicBoolean();
Mockito.doAnswer(invocationOnMock -> {
Map<String, Long> properties = (Map<String, Long>) invocationOnMock.getArguments()[2];
log.info("acknowledgeMessage properties: {}", properties);
compactedLedgerId.set(properties.get(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY));
pauseAck.set(true);
while (pauseAck.get()) {
Thread.sleep(200);
}
return invocationOnMock.callRealMethod();
}).when(subscription).acknowledgeMessage(Mockito.any(), Mockito.eq(
CommandAck.AckType.Cumulative), Mockito.any());

admin.topics().triggerCompaction(topicName);

while (!pauseAck.get()) {
Thread.sleep(100);
}

CompletableFuture<Long> currentCompaction =
(CompletableFuture<Long>) FieldUtils.readDeclaredField(topic, "currentCompaction", true);
CompletableFuture<Long> spyCurrentCompaction = spy(currentCompaction);
FieldUtils.writeDeclaredField(topic, "currentCompaction", spyCurrentCompaction, true);
currentCompaction.whenComplete((obj, throwable) -> {
if (throwable != null) {
spyCurrentCompaction.completeExceptionally(throwable);
} else {
spyCurrentCompaction.complete(obj);
}
});
Mockito.doAnswer(invocationOnMock -> {
pauseAck.set(false);
return invocationOnMock.callRealMethod();
}).when(spyCurrentCompaction).handle(Mockito.any());

admin.topics().delete(topicName, true);

Awaitility.await().untilAsserted(() -> assertThrows(BKException.BKNoSuchLedgerExistsException.class,
() -> pulsarTestContext.getBookKeeperClient().openLedger(
compactedLedgerId.get(), BookKeeper.DigestType.CRC32, new byte[]{})));
}
}

0 comments on commit 555a648

Please sign in to comment.