Skip to content

Commit

Permalink
Mock snapshot table view in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
BewareMyPower committed Jul 24, 2024
1 parent 4fdab3a commit 3e2c82c
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -995,10 +995,7 @@ public void start() throws PulsarServerException {
MLPendingAckStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());

this.transactionBufferSnapshotServiceFactory = new TransactionBufferSnapshotServiceFactory(getClient());
this.snapshotTableView = new SnapshotTableView(
transactionBufferSnapshotServiceFactory.getTxnBufferSnapshotService(),
executor, Long.parseLong(config.getProperties().getProperty(
"brokerClient_operationTimeoutMs", "30000")));
this.snapshotTableView = new SnapshotTableView(this);

this.transactionTimer =
new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
package org.apache.pulsar.broker.transaction.buffer.impl;

import static org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
import com.google.common.annotations.VisibleForTesting;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import org.apache.pulsar.common.naming.NamespaceName;
Expand All @@ -47,26 +49,15 @@ public class SnapshotTableView {
private final long blockTimeoutMs;
private final SimpleCache<NamespaceName, Reader<TransactionBufferSnapshot>> readers;

public SnapshotTableView(SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshot> snapshotService,
ScheduledExecutorService executor, long blockTimeoutMs) {
this.snapshotService = snapshotService;
this.blockTimeoutMs = blockTimeoutMs;
this.readers = new SimpleCache<>(executor, CACHE_EXPIRE_TIMEOUT_MS);
public SnapshotTableView(PulsarService pulsar) {
this.snapshotService = pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService();
this.blockTimeoutMs = Long.parseLong(pulsar.getConfig().getProperties()
.getProperty("brokerClient_operationTimeoutMs", "30000"));
this.readers = new SimpleCache<>(pulsar.getExecutor(), CACHE_EXPIRE_TIMEOUT_MS);
}

public TransactionBufferSnapshot readLatest(String topic) throws Exception {
final var topicName = TopicName.get(topic);
final var namespace = topicName.getNamespaceObject();
final var reader = readers.get(namespace, () -> {
try {
return wait(snapshotService.createReader(topicName), "create reader");
} catch (Exception e) {
throw new RuntimeException(e);
}
}, __ -> __.closeAsync().exceptionally(e -> {
log.warn("Failed to close reader {}", e.getMessage());
return null;
}));
final var reader = getReader(topic);
while (wait(reader.hasMoreEventsAsync(), "has more events")) {
final var msg = wait(reader.readNextAsync(), "read message");
if (msg.getKey() != null) {
Expand All @@ -80,11 +71,26 @@ public TransactionBufferSnapshot readLatest(String topic) throws Exception {
return snapshots.get(topic);
}

@VisibleForTesting
protected Reader<TransactionBufferSnapshot> getReader(String topic) {
final var topicName = TopicName.get(topic);
return readers.get(topicName.getNamespaceObject(), () -> {
try {
return wait(snapshotService.createReader(topicName), "create reader");
} catch (Exception e) {
throw new RuntimeException(e);
}
}, __ -> __.closeAsync().exceptionally(e -> {
log.warn("Failed to close reader {}", e.getMessage());
return null;
}));
}

private <T> T wait(CompletableFuture<T> future, String msg) throws Exception {
try {
return future.get(blockTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw new ExecutionException("Failed to " + msg, e.getCause());
throw new CompletionException("Failed to " + msg, e.getCause());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.impl.SingleSnapshotAbortedTxnProcessorImpl;
import org.apache.pulsar.broker.transaction.buffer.impl.SnapshotTableView;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex;
Expand Down Expand Up @@ -581,6 +582,22 @@ private void checkSnapshotCount(TopicName topicName, boolean hasSnapshot,
reader.close();
}

static class MockSnapshotTableView extends SnapshotTableView {

private final PulsarService pulsar;

public MockSnapshotTableView(PulsarService pulsar) {
super(pulsar);
this.pulsar = pulsar;
}

@Override
public SystemTopicClient.Reader<TransactionBufferSnapshot> getReader(String topic) {
return pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
.createReader(TopicName.get(topic)).join();
}
}

@Test(timeOut=30000)
public void testTransactionBufferRecoverThrowException() throws Exception {
String topic = NAMESPACE1 + "/testTransactionBufferRecoverThrowPulsarClientException";
Expand Down Expand Up @@ -662,7 +679,12 @@ private void checkCloseTopic(PulsarClient pulsarClient,
PersistentTopic originalTopic,
Field field,
Producer<byte[]> producer) throws Exception {
field.set(getPulsarServiceList().get(0), transactionBufferSnapshotServiceFactory);
final var pulsar = getPulsarServiceList().get(0);
final var snapshotTableViewField = PulsarService.class.getDeclaredField("snapshotTableView");
final var originalSnapshotTableView = pulsar.getSnapshotTableView();
snapshotTableViewField.setAccessible(true);
snapshotTableViewField.set(pulsar, new MockSnapshotTableView(pulsar));
field.set(pulsar, transactionBufferSnapshotServiceFactory);

// recover again will throw then close topic
new TopicTransactionBuffer(originalTopic);
Expand All @@ -673,7 +695,8 @@ private void checkCloseTopic(PulsarClient pulsarClient,
assertTrue((boolean) close.get(originalTopic));
});

field.set(getPulsarServiceList().get(0), transactionBufferSnapshotServiceFactoryOriginal);
field.set(pulsar, transactionBufferSnapshotServiceFactoryOriginal);
snapshotTableViewField.set(pulsar, originalSnapshotTableView);

Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
Expand Down

0 comments on commit 3e2c82c

Please sign in to comment.