From 7fc3cf55985e40b4884b761f4c9a213d0bf762dc Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 23 Jul 2024 16:02:38 +0800 Subject: [PATCH] Fix failed tests due to asynchronous snapshot taking --- .../broker/admin/v3/AdminApiTransactionTest.java | 12 ++++++++---- .../transaction/SegmentAbortedTxnProcessorTest.java | 5 +++-- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java index 1cc20b04c2137b..fb14e4f2045c50 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java @@ -653,14 +653,18 @@ public void testGetTransactionBufferInternalStats() throws Exception { transaction.abort().get(); // Get transaction buffer internal stats and verify segmented snapshot stats - stats = admin.transactions().getTransactionBufferInternalStatsAsync(topic3, true).get(); + Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> assertTransactionStats(topic3)); + } + + private void assertTransactionStats(String topic) throws Exception { + var stats = admin.transactions().getTransactionBufferInternalStatsAsync(topic, true).get(); assertEquals(stats.snapshotType, AbortedTxnProcessor.SnapshotType.Segment.toString()); assertNull(stats.singleSnapshotSystemTopicInternalStats); assertNotNull(stats.segmentInternalStats); // Get managed ledger internal stats for the transaction buffer segments topic - internalStats = admin.topics().getInternalStats( - TopicName.get(topic2).getNamespace() + "/" + + var internalStats = admin.topics().getInternalStats( + TopicName.get(topic).getNamespace() + "/" + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS); verifyManagedLedgerInternalStats(stats.segmentInternalStats.managedLedgerInternalStats, internalStats); assertTrue(stats.segmentInternalStats.managedLedgerName @@ -669,7 +673,7 @@ public void testGetTransactionBufferInternalStats() throws Exception { // Get managed ledger internal stats for the transaction buffer indexes topic assertNotNull(stats.segmentIndexInternalStats); internalStats = admin.topics().getInternalStats( - TopicName.get(topic2).getNamespace() + "/" + + TopicName.get(topic).getNamespace() + "/" + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES); verifyManagedLedgerInternalStats(stats.segmentIndexInternalStats.managedLedgerInternalStats, internalStats); assertTrue(stats.segmentIndexInternalStats.managedLedgerName diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java index d9ba825f02e93d..f0b753bc5eade0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java @@ -47,6 +47,7 @@ import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory; import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor; +import org.apache.pulsar.broker.transaction.buffer.impl.LazySnapshotWriter; import org.apache.pulsar.broker.transaction.buffer.impl.SingleSnapshotAbortedTxnProcessorImpl; import org.apache.pulsar.broker.transaction.buffer.impl.SnapshotSegmentAbortedTxnProcessorImpl; import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes; @@ -227,8 +228,8 @@ public void testClearSnapshotSegments() throws Exception { Field indexWriteFutureField = SnapshotSegmentAbortedTxnProcessorImpl .PersistentWorker.class.getDeclaredField("snapshotIndexWriter"); indexWriteFutureField.setAccessible(true); - ReferenceCountedWriter snapshotIndexWriter = - (ReferenceCountedWriter) indexWriteFutureField.get(worker); + final var snapshotIndexWriter = + (LazySnapshotWriter) indexWriteFutureField.get(worker); snapshotIndexWriter.release(); // After release, the writer should be closed, call close method again to make sure the writer was closed. snapshotIndexWriter.getFuture().get().close();