From 890fdebb208a99814a9a1751f01ad9464209446a Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 9 Oct 2023 17:48:38 +0300 Subject: [PATCH] [fix][test] Fix flaky CompactionTest.testDispatcherMaxReadSizeBytes (#21329) (cherry picked from commit c883f50e117ff9da310c879aa048993c28ea955a) --- .../apache/pulsar/compaction/CompactionTest.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 4c6db644f1e01..52837cbdcd56a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -1898,22 +1898,21 @@ public void testDispatcherMaxReadSizeBytes() throws Exception { admin.topics().unload(topicName); - ConsumerImpl consumer = (ConsumerImpl) client.newConsumer(Schema.BYTES) - .topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName) - .subscribe(); - - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); + PersistentTopic topic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, true, Map.of()).get().get(); TopicCompactionService topicCompactionService = Mockito.spy(topic.getTopicCompactionService()); FieldUtils.writeDeclaredField(topic, "topicCompactionService", topicCompactionService, true); + ConsumerImpl consumer = (ConsumerImpl) client.newConsumer(Schema.BYTES) + .topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName) + .subscribe(); + Awaitility.await().untilAsserted(() -> { assertEquals(consumer.getStats().getMsgNumInReceiverQueue(), 1); }); - consumer.increaseAvailablePermits(2); - Mockito.verify(topicCompactionService, Mockito.times(1)).readCompactedEntries(Mockito.any(), Mockito.same(1)); consumer.close();