Skip to content

Commit

Permalink
Ignore TopicDoesNotExistException when writing null messages
Browse files Browse the repository at this point in the history
  • Loading branch information
BewareMyPower committed Jul 23, 2024
1 parent 79430fc commit 3e43da2
Showing 1 changed file with 15 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -54,6 +55,7 @@
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TxnIDData;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageIdImpl;
Expand Down Expand Up @@ -797,18 +799,23 @@ private CompletableFuture<Void> updateSnapshotIndex(TransactionBufferSnapshotInd
}

private CompletableFuture<Void> clearSnapshotSegmentAndIndexes() {
CompletableFuture<Void> res = persistentWorker.clearAllSnapshotSegments()
return persistentWorker.clearAllSnapshotSegments()
.thenCompose((ignore) -> snapshotIndexWriter.getFuture()
.thenCompose(indexesWriter -> indexesWriter.writeAsync(topic.getName(), null)))
.thenRun(() ->
log.debug("Successes to clear the snapshot segment and indexes for the topic [{}]",
topic.getName()));
res.exceptionally(e -> {
log.error("Failed to clear the snapshot segment and indexes for the topic [{}]",
topic.getName(), e);
return null;
});
return res;
topic.getName()))
.exceptionally(throwable -> {
final var e = FutureUtil.unwrapCompletionException(throwable);
if (e instanceof PulsarClientException.TopicDoesNotExistException) {
// The system topic is deleted, no need to write a null message anymore
return null;
} else {
log.error("Failed to clear the snapshot segment and indexes for the topic [{}]",
topic.getName(), e);
throw new CompletionException(e);
}
});
}

/**
Expand Down

0 comments on commit 3e43da2

Please sign in to comment.