From 2cf83b5311bc355b4a100ce26fb76a4885d27cb2 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Thu, 5 Dec 2024 13:58:59 -0500 Subject: [PATCH] GH-9695: Replace AMQP `tx-size` with `batch-size` Fixes: #9695 Issue link: https://github.com/spring-projects/spring-integration/issues/9695 The `txSize` in the `SimpleMessageListenerContainer` has been replaced with more reasonable `batchSize`. Spring Integration XML support for AMQP module has missed to fix this change: we didn't have a respective test. * Deprecate `tx-size` (will be removed in `6.5`) XML attribute for the `` and introduce `batch-size` * Cover with the tests * Fix docs from `tx-size` to `batch-size` **Auto-cherry-pick to `6.3.x`** --- .../AbstractAmqpInboundAdapterParser.java | 9 +++- .../amqp/config/spring-integration-amqp.xsd | 17 ++++++- ...boundChannelAdapterParserTests-context.xml | 44 +++++++++---------- .../AmqpInboundChannelAdapterParserTests.java | 31 ++++++------- .../antora/modules/ROOT/pages/amqp.adoc | 2 +- .../pages/amqp/inbound-channel-adapter.adoc | 6 +-- .../ROOT/pages/amqp/inbound-gateway.adoc | 2 +- 7 files changed, 66 insertions(+), 45 deletions(-) diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/config/AbstractAmqpInboundAdapterParser.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/config/AbstractAmqpInboundAdapterParser.java index 32833e80122..44a615b499c 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/config/AbstractAmqpInboundAdapterParser.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/config/AbstractAmqpInboundAdapterParser.java @@ -59,6 +59,7 @@ abstract class AbstractAmqpInboundAdapterParser extends AbstractSingleBeanDefini "receive-timeout", "shutdown-timeout", "tx-size", + "batch-size", "missing-queues-fatal" }; @@ -154,7 +155,13 @@ private BeanDefinition buildListenerContainer(Element element, ParserContext par } builder.addConstructorArgReference(connectionFactoryRef); for (String attributeName : CONTAINER_VALUE_ATTRIBUTES) { - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, attributeName); + // TODO remove 'tx-size' in 6.5 + if ("tx-size".equals(attributeName)) { + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, attributeName, "batchSize"); + } + else { + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, attributeName); + } } for (String attributeName : CONTAINER_REFERENCE_ATTRIBUTES) { IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, attributeName); diff --git a/spring-integration-amqp/src/main/resources/org/springframework/integration/amqp/config/spring-integration-amqp.xsd b/spring-integration-amqp/src/main/resources/org/springframework/integration/amqp/config/spring-integration-amqp.xsd index bf514627342..e6172835ef1 100644 --- a/spring-integration-amqp/src/main/resources/org/springframework/integration/amqp/config/spring-integration-amqp.xsd +++ b/spring-integration-amqp/src/main/resources/org/springframework/integration/amqp/config/spring-integration-amqp.xsd @@ -969,7 +969,7 @@ standard headers to also be mapped. To map all non-standard headers the 'NON_STA Acknowledge Mode for the MessageListenerContainer; default 'AUTO' meaning the adapter automatically acknowledges the message(s) - according to the tx-size. + according to the batch-size. @@ -1039,7 +1039,7 @@ standard headers to also be mapped. To map all non-standard headers the 'NON_STA Specifies how many messages to send to each consumer in a single request. Often this can be set quite high - to improve throughput. It should be greater than or equal to the tx-size value. + to improve throughput. It should be greater than or equal to the batch-size value. @@ -1117,10 +1117,23 @@ standard headers to also be mapped. To map all non-standard headers the 'NON_STA + [DEPRECATED] How many messages to process in a single transaction (if the channel is transactional). For best results it should be less than or equal to the prefetch count. Not allowed when 'consumers-per-queue' is set. + Deprecated in favor of 'batch-size'. + + + + + + + + + How many messages to process in a single request. + For best results it should be less than or equal to the prefetch count. + Not allowed when 'consumers-per-queue' is set. diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundChannelAdapterParserTests-context.xml b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundChannelAdapterParserTests-context.xml index 8b105f34e6c..85e4ee30bb8 100644 --- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundChannelAdapterParserTests-context.xml +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundChannelAdapterParserTests-context.xml @@ -1,14 +1,12 @@ @@ -20,32 +18,34 @@ + batch-mode="EXTRACT_PAYLOADS" tx-size="2"/> + auto-startup="false" phase="123" acknowledge-mode="${ackMode}" + missing-queues-fatal="false" + batch-size="3"/> + channel="requestChannel" queue-names="inboundchanneladapter.test.2" + auto-startup="false" phase="123" + mapped-request-headers="foo*, STANDARD_REQUEST_HEADERS"/> + channel="requestChannel" queue-names="inboundchanneladapter.test.2" + auto-startup="false" phase="123" + mapped-request-headers="foo*"/> + channel="requestChannel" queue-names="inboundchanneladapter.test.2" + auto-startup="false" phase="123" + mapped-request-headers=""/> + channel="requestChannel" queue-names="inboundchanneladapter.test.2" + auto-startup="false" phase="123"/> + auto-startup="false"/> diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundChannelAdapterParserTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundChannelAdapterParserTests.java index 1bc7a1703b3..04eb584d5f3 100644 --- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundChannelAdapterParserTests.java +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundChannelAdapterParserTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,8 +16,7 @@ package org.springframework.integration.amqp.config; -import org.junit.Test; -import org.junit.runner.RunWith; +import org.junit.jupiter.api.Test; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; @@ -37,10 +36,10 @@ import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.test.util.TestUtils; import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; /** * @author Mark Fisher @@ -49,8 +48,7 @@ * * @since 2.1 */ -@ContextConfiguration -@RunWith(SpringJUnit4ClassRunner.class) +@SpringJUnitConfig @DirtiesContext public class AmqpInboundChannelAdapterParserTests { @@ -71,6 +69,8 @@ public void verifyIdAsChannel() { .isInstanceOf(SimpleMessageListenerContainer.class); assertThat(TestUtils.getPropertyValue(adapter, "batchMode", BatchMode.class)) .isEqualTo(BatchMode.EXTRACT_PAYLOADS); + assertThat(TestUtils.getPropertyValue(adapter, "messageListenerContainer.batchSize", Integer.class)) + .isEqualTo(2); } @Test @@ -95,6 +95,8 @@ public void verifyLifeCycle() { .isEqualTo(AcknowledgeMode.NONE); assertThat(TestUtils.getPropertyValue(adapter, "messageListenerContainer.missingQueuesFatal", Boolean.class)) .isFalse(); + assertThat(TestUtils.getPropertyValue(adapter, "messageListenerContainer.batchSize", Integer.class)) + .isEqualTo(3); } @Test @@ -216,14 +218,13 @@ public void withHeaderMapperDefaultMapping() throws Exception { @Test public void testInt2971HeaderMapperAndMappedHeadersExclusivity() { - try { - new ClassPathXmlApplicationContext("AmqpInboundChannelAdapterParserTests-headerMapper-fail-context.xml", - this.getClass()).close(); - } - catch (BeanDefinitionParsingException e) { - assertThat(e.getMessage().startsWith("Configuration problem: The 'header-mapper' attribute " + - "is mutually exclusive with 'mapped-request-headers' or 'mapped-reply-headers'")).isTrue(); - } + assertThatExceptionOfType(BeanDefinitionParsingException.class) + .isThrownBy(() -> + new ClassPathXmlApplicationContext( + "AmqpInboundChannelAdapterParserTests-headerMapper-fail-context.xml", + getClass())) + .withMessageStartingWith("Configuration problem: The 'header-mapper' attribute " + + "is mutually exclusive with 'mapped-request-headers' or 'mapped-reply-headers'"); } } diff --git a/src/reference/antora/modules/ROOT/pages/amqp.adoc b/src/reference/antora/modules/ROOT/pages/amqp.adoc index b6725a17068..827f4952aef 100644 --- a/src/reference/antora/modules/ROOT/pages/amqp.adoc +++ b/src/reference/antora/modules/ROOT/pages/amqp.adoc @@ -32,7 +32,7 @@ The following adapters are available: * xref:amqp/inbound-gateway.adoc[Inbound Gateway] * xref:amqp/outbound-channel-adapter.adoc[Outbound Channel Adapter] * xref:amqp/outbound-gateway.adoc[Outbound Gateway] -* xref:amqp-async-outbound-gateway[Async Outbound Gateway] +* xref:amqp/async-outbound-gateway.adoc[Async Outbound Gateway] * xref:amqp/rmq-streams.adoc#rmq-stream-inbound-channel-adapter[RabbitMQ Stream Queue Inbound Channel Adapter] * xref:amqp/rmq-streams.adoc#rmq-stream-outbound-channel-adapter[RabbitMQ Stream Queue Outbound Channel Adapter] diff --git a/src/reference/antora/modules/ROOT/pages/amqp/inbound-channel-adapter.adoc b/src/reference/antora/modules/ROOT/pages/amqp/inbound-channel-adapter.adoc index e8ae8d91de3..906bf071fa4 100644 --- a/src/reference/antora/modules/ROOT/pages/amqp/inbound-channel-adapter.adoc +++ b/src/reference/antora/modules/ROOT/pages/amqp/inbound-channel-adapter.adoc @@ -87,7 +87,7 @@ XML:: task-executor="" <22> transaction-attribute="" <23> transaction-manager="" <24> - tx-size="" <25> + batch-size="" <25> consumers-per-queue <26> batch-mode="MESSAGES"/> <27> @@ -146,7 +146,7 @@ By default, this value is `Integer.MAX_VALUE`, meaning that this container start Optional. <17> Tells the AMQP broker how many messages to send to each consumer in a single request. Often, you can set this value high to improve throughput. -It should be greater than or equal to the transaction size (see the `tx-size` attribute, later in this list). +It should be greater than or equal to the transaction size (see the `batch-size` attribute, later in this list). Optional (defaults to `1`). <18> Receive timeout in milliseconds. Optional (defaults to `1000`). @@ -173,7 +173,7 @@ If the `channelTransacted` flag is `false`, no transaction semantics apply to th For further information, see https://docs.spring.io/spring-amqp/reference/html/%255Freference.html#%5Ftransactions[Transactions with Spring AMQP]. Optional. -<25> Tells the `SimpleMessageListenerContainer` how many messages to process in a single transaction (if the channel is transactional). +<25> Tells the `SimpleMessageListenerContainer` how many messages to process in a single request. For best results, it should be less than or equal to the value set in `prefetch-count`. Not allowed when 'consumers-per-queue' is set. Optional (defaults to `1`). diff --git a/src/reference/antora/modules/ROOT/pages/amqp/inbound-gateway.adoc b/src/reference/antora/modules/ROOT/pages/amqp/inbound-gateway.adoc index 7fcdf9617e4..cc2762a88ed 100644 --- a/src/reference/antora/modules/ROOT/pages/amqp/inbound-gateway.adoc +++ b/src/reference/antora/modules/ROOT/pages/amqp/inbound-gateway.adoc @@ -10,7 +10,7 @@ Java DSL:: + [source, java, role="primary"] ---- -@Bean // return the upper cased payload +@Bean // return the upper-cased payload public IntegrationFlow amqpInboundGateway(ConnectionFactory connectionFactory) { return IntegrationFlow.from(Amqp.inboundGateway(connectionFactory, "foo")) .transform(String.class, String::toUpperCase)