diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 4c6db644f1e01..52837cbdcd56a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -1898,22 +1898,21 @@ public void testDispatcherMaxReadSizeBytes() throws Exception { admin.topics().unload(topicName); - ConsumerImpl consumer = (ConsumerImpl) client.newConsumer(Schema.BYTES) - .topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName) - .subscribe(); - - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); + PersistentTopic topic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, true, Map.of()).get().get(); TopicCompactionService topicCompactionService = Mockito.spy(topic.getTopicCompactionService()); FieldUtils.writeDeclaredField(topic, "topicCompactionService", topicCompactionService, true); + ConsumerImpl consumer = (ConsumerImpl) client.newConsumer(Schema.BYTES) + .topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName) + .subscribe(); + Awaitility.await().untilAsserted(() -> { assertEquals(consumer.getStats().getMsgNumInReceiverQueue(), 1); }); - consumer.increaseAvailablePermits(2); - Mockito.verify(topicCompactionService, Mockito.times(1)).readCompactedEntries(Mockito.any(), Mockito.same(1)); consumer.close();