diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 5bf29c32c53e..982c6e388053 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -1096,6 +1096,7 @@ public boolean checkMemory(boolean runOnFailure, Runnable runWhenAvailableParame if (isFull()) { if (runOnFailure && runWhenAvailable != null) { addToBlockList(runWhenAvailable, blockedCallback); + pagingManager.addBlockedStore(this); } return false; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullFailPolicyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullFailPolicyTest.java new file mode 100644 index 000000000000..e7ee5c66113b --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullFailPolicyTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.extensions.parameterized.Parameter; +import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension; +import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; +import java.net.URI; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ExtendWith(ParameterizedTestExtension.class) +public class GlobalDiskFullFailPolicyTest extends GlobalDiskFullTest { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @Parameter(index = 0) + public AddressFullMessagePolicy addressFullPolicy; + + @Parameters(name = "addressFullPolicy={0}") + public static Collection parameters() { + return Arrays.asList(new Object[][] { + {AddressFullMessagePolicy.FAIL}, {AddressFullMessagePolicy.DROP} + }); + } + + @Override + protected void configureAddressPolicy(ActiveMQServer server) { + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setAddressFullMessagePolicy(addressFullPolicy); + server.getConfiguration().addAddressSetting("TEST",addressSettings); + } + + @TestTemplate + @Override + public void testProducerOnDiskFull() throws Exception { + FileStoreMonitor monitor = ((ActiveMQServerImpl)server).getMonitor().setMaxUsage(0.0); + final CountDownLatch latch = new CountDownLatch(1); + monitor.addCallback((usableSpace, totalSpace, ok, type) -> { + latch.countDown(); + }); + + assertTrue(latch.await(1, TimeUnit.MINUTES)); + + AmqpClient client = createAmqpClient(new URI("tcp://localhost:" + AMQP_PORT)); + AmqpConnection connection = addConnection(client.connect()); + + try { + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender("TEST"); + byte[] payload = new byte[1000]; + + AmqpSender anonSender = session.createSender(); + + CountDownLatch sentWithName = new CountDownLatch(1); + CountDownLatch sentAnon = new CountDownLatch(1); + + Thread threadWithName = new Thread(() -> { + try { + final AmqpMessage message = new AmqpMessage(); + message.setBytes(payload); + sender.setSendTimeout(-1); + sender.send(message); + } catch (Exception e) { + logger.warn("Caught exception while sending", e); + } finally { + sentWithName.countDown(); + } + }); + + Thread threadWithAnon = new Thread(() -> { + try { + final AmqpMessage message = new AmqpMessage(); + message.setBytes(payload); + anonSender.setSendTimeout(-1); + message.setAddress(getQueueName()); + anonSender.send(message); + sentAnon.countDown(); + } catch (Exception e) { + logger.warn("Caught exception while sending", e); + } + }); + + ExecutorService pool = Executors.newCachedThreadPool(); + runAfter(pool::shutdownNow); + + pool.execute(threadWithName); + pool.execute(threadWithAnon); + + assertFalse(sentWithName.await(500, TimeUnit.MILLISECONDS), "Thread sender should be blocked"); + assertFalse(sentAnon.await(500, TimeUnit.MILLISECONDS), "Thread sender anonymous should be blocked"); + monitor.setMaxUsage(100.0); + + assertTrue(sentWithName.await(30, TimeUnit.SECONDS), "Thread sender should be released"); + assertTrue(sentAnon.await(30, TimeUnit.SECONDS), "Thread sender anonymous should be released"); + + } finally { + connection.close(); + } + } +}