Skip to content

Commit

Permalink
Fix compatibility with latest Spring AMQP
Browse files Browse the repository at this point in the history
  • Loading branch information
artembilan committed Dec 21, 2023
1 parent 075613c commit f96beb3
Showing 1 changed file with 9 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2022 the original author or authors.
* Copyright 2014-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
import java.util.Set;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

import org.junit.After;
import org.junit.ClassRule;
Expand Down Expand Up @@ -134,19 +135,23 @@ public void pubSubLostConnectionTest() throws Exception {
private void waitForNewConsumer(PublishSubscribeAmqpChannel channel, BlockingQueueConsumer consumer)
throws Exception {

final Object consumersMonitor = TestUtils.getPropertyValue(channel, "container.consumersMonitor");
Lock consumersLock = TestUtils.getPropertyValue(channel, "container.consumersLock", Lock.class);
int n = 0;
while (n++ < 100) {
Set<BlockingQueueConsumer> consumers = TestUtils
.getPropertyValue(channel, "container.consumers", Set.class);
synchronized (consumersMonitor) {
consumersLock.lock();
try {
if (!consumers.isEmpty()) {
BlockingQueueConsumer newConsumer = consumers.iterator().next();
if (newConsumer != consumer && newConsumer.getConsumerTags().size() > 0) {
if (newConsumer != consumer && !newConsumer.getConsumerTags().isEmpty()) {
break;
}
}
}
finally {
consumersLock.unlock();
}
Thread.sleep(100);
}
assertThat(n < 100).as("Failed to restart consumer").isTrue();
Expand Down

0 comments on commit f96beb3

Please sign in to comment.