Skip to content

Commit

Permalink
[fix][test] Fix flaky CompactionTest.testDispatcherMaxReadSizeBytes (a…
Browse files Browse the repository at this point in the history
…pache#21329)

(cherry picked from commit c883f50)
  • Loading branch information
lhotari authored and nikhil-ctds committed Dec 6, 2023
1 parent d4e3a35 commit 890fdeb
Showing 1 changed file with 6 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1898,22 +1898,21 @@ public void testDispatcherMaxReadSizeBytes() throws Exception {

admin.topics().unload(topicName);

ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) client.newConsumer(Schema.BYTES)
.topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName)
.subscribe();


PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
PersistentTopic topic =
(PersistentTopic) pulsar.getBrokerService().getTopic(topicName, true, Map.of()).get().get();
TopicCompactionService topicCompactionService = Mockito.spy(topic.getTopicCompactionService());
FieldUtils.writeDeclaredField(topic, "topicCompactionService", topicCompactionService, true);

ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) client.newConsumer(Schema.BYTES)
.topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName)
.subscribe();

Awaitility.await().untilAsserted(() -> {
assertEquals(consumer.getStats().getMsgNumInReceiverQueue(),
1);
});

consumer.increaseAvailablePermits(2);

Mockito.verify(topicCompactionService, Mockito.times(1)).readCompactedEntries(Mockito.any(), Mockito.same(1));

consumer.close();
Expand Down

0 comments on commit 890fdeb

Please sign in to comment.