Skip to content

Commit

Permalink
Fix failed tests due to asynchronous snapshot taking
Browse files Browse the repository at this point in the history
  • Loading branch information
BewareMyPower committed Jul 23, 2024
1 parent 3e43da2 commit 7fc3cf5
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -653,14 +653,18 @@ public void testGetTransactionBufferInternalStats() throws Exception {
transaction.abort().get();

// Get transaction buffer internal stats and verify segmented snapshot stats
stats = admin.transactions().getTransactionBufferInternalStatsAsync(topic3, true).get();
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> assertTransactionStats(topic3));
}

private void assertTransactionStats(String topic) throws Exception {
var stats = admin.transactions().getTransactionBufferInternalStatsAsync(topic, true).get();
assertEquals(stats.snapshotType, AbortedTxnProcessor.SnapshotType.Segment.toString());
assertNull(stats.singleSnapshotSystemTopicInternalStats);
assertNotNull(stats.segmentInternalStats);

// Get managed ledger internal stats for the transaction buffer segments topic
internalStats = admin.topics().getInternalStats(
TopicName.get(topic2).getNamespace() + "/" +
var internalStats = admin.topics().getInternalStats(
TopicName.get(topic).getNamespace() + "/" +
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
verifyManagedLedgerInternalStats(stats.segmentInternalStats.managedLedgerInternalStats, internalStats);
assertTrue(stats.segmentInternalStats.managedLedgerName
Expand All @@ -669,7 +673,7 @@ public void testGetTransactionBufferInternalStats() throws Exception {
// Get managed ledger internal stats for the transaction buffer indexes topic
assertNotNull(stats.segmentIndexInternalStats);
internalStats = admin.topics().getInternalStats(
TopicName.get(topic2).getNamespace() + "/" +
TopicName.get(topic).getNamespace() + "/" +
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES);
verifyManagedLedgerInternalStats(stats.segmentIndexInternalStats.managedLedgerInternalStats, internalStats);
assertTrue(stats.segmentIndexInternalStats.managedLedgerName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.impl.LazySnapshotWriter;
import org.apache.pulsar.broker.transaction.buffer.impl.SingleSnapshotAbortedTxnProcessorImpl;
import org.apache.pulsar.broker.transaction.buffer.impl.SnapshotSegmentAbortedTxnProcessorImpl;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
Expand Down Expand Up @@ -227,8 +228,8 @@ public void testClearSnapshotSegments() throws Exception {
Field indexWriteFutureField = SnapshotSegmentAbortedTxnProcessorImpl
.PersistentWorker.class.getDeclaredField("snapshotIndexWriter");
indexWriteFutureField.setAccessible(true);
ReferenceCountedWriter<TransactionBufferSnapshotIndexes> snapshotIndexWriter =
(ReferenceCountedWriter<TransactionBufferSnapshotIndexes>) indexWriteFutureField.get(worker);
final var snapshotIndexWriter =
(LazySnapshotWriter<TransactionBufferSnapshotIndexes>) indexWriteFutureField.get(worker);
snapshotIndexWriter.release();
// After release, the writer should be closed, call close method again to make sure the writer was closed.
snapshotIndexWriter.getFuture().get().close();
Expand Down

0 comments on commit 7fc3cf5

Please sign in to comment.