diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 5bf29c32c53e..982c6e388053 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -1096,6 +1096,7 @@ public boolean checkMemory(boolean runOnFailure, Runnable runWhenAvailableParame if (isFull()) { if (runOnFailure && runWhenAvailable != null) { addToBlockList(runWhenAvailable, blockedCallback); + pagingManager.addBlockedStore(this); } return false; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AMQPBlockingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AMQPBlockingTest.java new file mode 100644 index 000000000000..79550fbb3de3 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AMQPBlockingTest.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.transport.amqp.client.*; +import org.junit.jupiter.api.Test; + +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class AMQPBlockingTest extends GlobalDiskFullTest { + + @Override + protected ActiveMQServer createServer(int port) throws Exception { + ActiveMQServer server = this.createServer(true, true); + server.getConfiguration().getAcceptorConfigurations().clear(); + server.getConfiguration().getAcceptorConfigurations().add(this.addAcceptorConfiguration(server, port)); + server.getConfiguration().setName("localhost"); + server.getConfiguration().setJournalDirectory(server.getConfiguration().getJournalDirectory() + port); + server.getConfiguration().setBindingsDirectory(server.getConfiguration().getBindingsDirectory() + port); + server.getConfiguration().setPagingDirectory(server.getConfiguration().getPagingDirectory() + port); + server.getConfiguration().setMessageExpiryScanPeriod(5000L); + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL); + server.getConfiguration().addAddressSetting("TEST",addressSettings); + this.addAdditionalAcceptors(server); + this.configureAddressPolicy(server); + this.configureBrokerSecurity(server); + this.addConfiguration(server); + server.start(); + this.createAddressAndQueues(server); + return server; + } + + @Test + public void testProducerOnDiskFull() throws Exception { + FileStoreMonitor monitor = ((ActiveMQServerImpl)server).getMonitor().setMaxUsage(0.0); + final CountDownLatch latch = new CountDownLatch(1); + monitor.addCallback((usableSpace, totalSpace, ok, type) -> { + latch.countDown(); + }); + + assertTrue(latch.await(1, TimeUnit.MINUTES)); + + AmqpClient client = createAmqpClient(new URI("tcp://localhost:" + AMQP_PORT)); + AmqpConnection connection = addConnection(client.connect()); + + try { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender("TEST"); + byte[] payload = new byte[1000]; + + AmqpSender anonSender = session.createSender(); + + CountDownLatch sentWithName = new CountDownLatch(1); + CountDownLatch sentAnon = new CountDownLatch(1); + + Thread threadWithName = new Thread(() -> { + try { + final AmqpMessage message = new AmqpMessage(); + message.setBytes(payload); + sender.setSendTimeout(-1); + sender.send(message); + } catch (Exception e) { + e.printStackTrace(); + } finally { + sentWithName.countDown(); + } + }); + + threadWithName.start(); + + + Thread threadWithAnon = new Thread(() -> { + try { + final AmqpMessage message = new AmqpMessage(); + message.setBytes(payload); + anonSender.setSendTimeout(-1); + message.setAddress(getQueueName()); + anonSender.send(message); + } catch (Exception e) { + e.printStackTrace(); + } finally { + sentAnon.countDown(); + } + }); + + threadWithAnon.start(); + + assertFalse(sentWithName.await(500, TimeUnit.MILLISECONDS), "Thread sender should be blocked"); + assertFalse(sentAnon.await(500, TimeUnit.MILLISECONDS), "Thread sender anonymous should be blocked"); + monitor.setMaxUsage(100.0); + + assertTrue(sentWithName.await(30, TimeUnit.SECONDS), "Thread sender should be released"); + assertTrue(sentAnon.await(30, TimeUnit.SECONDS), "Thread sender anonymous should be released"); + + threadWithName.join(TimeUnit.SECONDS.toMillis(30)); + threadWithAnon.join(TimeUnit.SECONDS.toMillis(30)); + assertFalse(threadWithName.isAlive()); + assertFalse(threadWithAnon.isAlive()); + } finally { + connection.close(); + } + } +}