From 3e43da22b4be7742a89752949c8d0ddf15ea7618 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 23 Jul 2024 15:15:37 +0800 Subject: [PATCH] Ignore TopicDoesNotExistException when writing null messages --- ...napshotSegmentAbortedTxnProcessorImpl.java | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java index 9d55fdacfb2d19..efaf165393f6b9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java @@ -24,6 +24,7 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.TimeUnit; @@ -54,6 +55,7 @@ import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment; import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TxnIDData; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.MessageIdImpl; @@ -797,18 +799,23 @@ private CompletableFuture updateSnapshotIndex(TransactionBufferSnapshotInd } private CompletableFuture clearSnapshotSegmentAndIndexes() { - CompletableFuture res = persistentWorker.clearAllSnapshotSegments() + return persistentWorker.clearAllSnapshotSegments() .thenCompose((ignore) -> snapshotIndexWriter.getFuture() .thenCompose(indexesWriter -> indexesWriter.writeAsync(topic.getName(), null))) .thenRun(() -> log.debug("Successes to clear the snapshot segment and indexes for the topic [{}]", - topic.getName())); - res.exceptionally(e -> { - log.error("Failed to clear the snapshot segment and indexes for the topic [{}]", - topic.getName(), e); - return null; - }); - return res; + topic.getName())) + .exceptionally(throwable -> { + final var e = FutureUtil.unwrapCompletionException(throwable); + if (e instanceof PulsarClientException.TopicDoesNotExistException) { + // The system topic is deleted, no need to write a null message anymore + return null; + } else { + log.error("Failed to clear the snapshot segment and indexes for the topic [{}]", + topic.getName(), e); + throw new CompletionException(e); + } + }); } /**