Skip to content

Commit

Permalink
ENTMQBR-9338 AMQP producer not unblock if the disk space is freed
Browse files Browse the repository at this point in the history
(test adapted from the one on the Jira)
  • Loading branch information
howardgao committed Aug 20, 2024
1 parent 87cc157 commit fec502a
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1096,6 +1096,7 @@ public boolean checkMemory(boolean runOnFailure, Runnable runWhenAvailableParame
if (isFull()) {
if (runOnFailure && runWhenAvailable != null) {
addToBlockList(runWhenAvailable, blockedCallback);
pagingManager.addBlockedStore(this);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;

import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.transport.amqp.client.*;
import org.junit.jupiter.api.Test;

import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class AMQPBlockingTest extends GlobalDiskFullTest {

@Override
protected ActiveMQServer createServer(int port) throws Exception {
ActiveMQServer server = this.createServer(true, true);
server.getConfiguration().getAcceptorConfigurations().clear();
server.getConfiguration().getAcceptorConfigurations().add(this.addAcceptorConfiguration(server, port));
server.getConfiguration().setName("localhost");
server.getConfiguration().setJournalDirectory(server.getConfiguration().getJournalDirectory() + port);
server.getConfiguration().setBindingsDirectory(server.getConfiguration().getBindingsDirectory() + port);
server.getConfiguration().setPagingDirectory(server.getConfiguration().getPagingDirectory() + port);
server.getConfiguration().setMessageExpiryScanPeriod(5000L);
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
server.getConfiguration().addAddressSetting("TEST",addressSettings);
this.addAdditionalAcceptors(server);
this.configureAddressPolicy(server);
this.configureBrokerSecurity(server);
this.addConfiguration(server);
server.start();
this.createAddressAndQueues(server);
return server;
}

@Test
public void testProducerOnDiskFull() throws Exception {
FileStoreMonitor monitor = ((ActiveMQServerImpl)server).getMonitor().setMaxUsage(0.0);
final CountDownLatch latch = new CountDownLatch(1);
monitor.addCallback((usableSpace, totalSpace, ok, type) -> {
latch.countDown();
});

assertTrue(latch.await(1, TimeUnit.MINUTES));

AmqpClient client = createAmqpClient(new URI("tcp://localhost:" + AMQP_PORT));
AmqpConnection connection = addConnection(client.connect());

try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("TEST");
byte[] payload = new byte[1000];

AmqpSender anonSender = session.createSender();

CountDownLatch sentWithName = new CountDownLatch(1);
CountDownLatch sentAnon = new CountDownLatch(1);

Thread threadWithName = new Thread(() -> {
try {
final AmqpMessage message = new AmqpMessage();
message.setBytes(payload);
sender.setSendTimeout(-1);
sender.send(message);
} catch (Exception e) {
e.printStackTrace();
} finally {
sentWithName.countDown();
}
});

threadWithName.start();


Thread threadWithAnon = new Thread(() -> {
try {
final AmqpMessage message = new AmqpMessage();
message.setBytes(payload);
anonSender.setSendTimeout(-1);
message.setAddress(getQueueName());
anonSender.send(message);
} catch (Exception e) {
e.printStackTrace();
} finally {
sentAnon.countDown();
}
});

threadWithAnon.start();

assertFalse(sentWithName.await(500, TimeUnit.MILLISECONDS), "Thread sender should be blocked");
assertFalse(sentAnon.await(500, TimeUnit.MILLISECONDS), "Thread sender anonymous should be blocked");
monitor.setMaxUsage(100.0);

assertTrue(sentWithName.await(30, TimeUnit.SECONDS), "Thread sender should be released");
assertTrue(sentAnon.await(30, TimeUnit.SECONDS), "Thread sender anonymous should be released");

threadWithName.join(TimeUnit.SECONDS.toMillis(30));
threadWithAnon.join(TimeUnit.SECONDS.toMillis(30));
assertFalse(threadWithName.isAlive());
assertFalse(threadWithAnon.isAlive());
} finally {
connection.close();
}
}
}

0 comments on commit fec502a

Please sign in to comment.