diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 9dde179955fe3..492db888fb260 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -995,10 +995,7 @@ public void start() throws PulsarServerException { MLPendingAckStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress()); this.transactionBufferSnapshotServiceFactory = new TransactionBufferSnapshotServiceFactory(getClient()); - this.snapshotTableView = new SnapshotTableView( - transactionBufferSnapshotServiceFactory.getTxnBufferSnapshotService(), - executor, Long.parseLong(config.getProperties().getProperty( - "brokerClient_operationTimeoutMs", "30000"))); + this.snapshotTableView = new SnapshotTableView(this); this.transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer")); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotTableView.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotTableView.java index 256084132081c..7099f22c78cf7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotTableView.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotTableView.java @@ -19,13 +19,15 @@ package org.apache.pulsar.broker.transaction.buffer.impl; import static org.apache.pulsar.broker.systopic.SystemTopicClient.Reader; +import com.google.common.annotations.VisibleForTesting; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService; import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot; import org.apache.pulsar.common.naming.NamespaceName; @@ -47,26 +49,15 @@ public class SnapshotTableView { private final long blockTimeoutMs; private final SimpleCache> readers; - public SnapshotTableView(SystemTopicTxnBufferSnapshotService snapshotService, - ScheduledExecutorService executor, long blockTimeoutMs) { - this.snapshotService = snapshotService; - this.blockTimeoutMs = blockTimeoutMs; - this.readers = new SimpleCache<>(executor, CACHE_EXPIRE_TIMEOUT_MS); + public SnapshotTableView(PulsarService pulsar) { + this.snapshotService = pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService(); + this.blockTimeoutMs = Long.parseLong(pulsar.getConfig().getProperties() + .getProperty("brokerClient_operationTimeoutMs", "30000")); + this.readers = new SimpleCache<>(pulsar.getExecutor(), CACHE_EXPIRE_TIMEOUT_MS); } public TransactionBufferSnapshot readLatest(String topic) throws Exception { - final var topicName = TopicName.get(topic); - final var namespace = topicName.getNamespaceObject(); - final var reader = readers.get(namespace, () -> { - try { - return wait(snapshotService.createReader(topicName), "create reader"); - } catch (Exception e) { - throw new RuntimeException(e); - } - }, __ -> __.closeAsync().exceptionally(e -> { - log.warn("Failed to close reader {}", e.getMessage()); - return null; - })); + final var reader = getReader(topic); while (wait(reader.hasMoreEventsAsync(), "has more events")) { final var msg = wait(reader.readNextAsync(), "read message"); if (msg.getKey() != null) { @@ -80,11 +71,26 @@ public TransactionBufferSnapshot readLatest(String topic) throws Exception { return snapshots.get(topic); } + @VisibleForTesting + protected Reader getReader(String topic) { + final var topicName = TopicName.get(topic); + return readers.get(topicName.getNamespaceObject(), () -> { + try { + return wait(snapshotService.createReader(topicName), "create reader"); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, __ -> __.closeAsync().exceptionally(e -> { + log.warn("Failed to close reader {}", e.getMessage()); + return null; + })); + } + private T wait(CompletableFuture future, String msg) throws Exception { try { return future.get(blockTimeoutMs, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { - throw new ExecutionException("Failed to " + msg, e.getCause()); + throw new CompletionException("Failed to " + msg, e.getCause()); } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java index 3cf3a0bba3bcf..057bd4eb76d0c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java @@ -66,6 +66,7 @@ import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor; import org.apache.pulsar.broker.transaction.buffer.impl.SingleSnapshotAbortedTxnProcessorImpl; +import org.apache.pulsar.broker.transaction.buffer.impl.SnapshotTableView; import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot; import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex; @@ -581,6 +582,22 @@ private void checkSnapshotCount(TopicName topicName, boolean hasSnapshot, reader.close(); } + static class MockSnapshotTableView extends SnapshotTableView { + + private final PulsarService pulsar; + + public MockSnapshotTableView(PulsarService pulsar) { + super(pulsar); + this.pulsar = pulsar; + } + + @Override + public SystemTopicClient.Reader getReader(String topic) { + return pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService() + .createReader(TopicName.get(topic)).join(); + } + } + @Test(timeOut=30000) public void testTransactionBufferRecoverThrowException() throws Exception { String topic = NAMESPACE1 + "/testTransactionBufferRecoverThrowPulsarClientException"; @@ -662,7 +679,12 @@ private void checkCloseTopic(PulsarClient pulsarClient, PersistentTopic originalTopic, Field field, Producer producer) throws Exception { - field.set(getPulsarServiceList().get(0), transactionBufferSnapshotServiceFactory); + final var pulsar = getPulsarServiceList().get(0); + final var snapshotTableViewField = PulsarService.class.getDeclaredField("snapshotTableView"); + final var originalSnapshotTableView = pulsar.getSnapshotTableView(); + snapshotTableViewField.setAccessible(true); + snapshotTableViewField.set(pulsar, new MockSnapshotTableView(pulsar)); + field.set(pulsar, transactionBufferSnapshotServiceFactory); // recover again will throw then close topic new TopicTransactionBuffer(originalTopic); @@ -673,7 +695,8 @@ private void checkCloseTopic(PulsarClient pulsarClient, assertTrue((boolean) close.get(originalTopic)); }); - field.set(getPulsarServiceList().get(0), transactionBufferSnapshotServiceFactoryOriginal); + field.set(pulsar, transactionBufferSnapshotServiceFactoryOriginal); + snapshotTableViewField.set(pulsar, originalSnapshotTableView); Transaction txn = pulsarClient.newTransaction() .withTransactionTimeout(5, TimeUnit.SECONDS)