Skip to content

Commit

Permalink
Fix phase for TaskScheduler instances in tests
Browse files Browse the repository at this point in the history
Related to: #8856

Many tests create their own `ThreadPoolTaskScheduler` beans.
Therefore, its default phase might affect the memory and performance.

* Use `phase = SmartLifecycle.DEFAULT_PHASE / 2` for manual
 `ThreadPoolTaskScheduler` beans
* Migrate affected tests classes to JUnit 5
* Make some other configuration adjustments for better performance

**Cherry-pick to `6.2.x`**

(cherry picked from commit 39c99c0)
  • Loading branch information
artembilan committed Jan 16, 2024
1 parent 0be2c76 commit 1a3c25b
Show file tree
Hide file tree
Showing 25 changed files with 246 additions and 202 deletions.
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:task="http://www.springframework.org/schema/task"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/task https://www.springframework.org/schema/task/spring-task.xsd
http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/util https://www.springframework.org/schema/util/spring-util.xsd">

<context:property-placeholder properties-ref="props"/>

<util:properties id="props"/>
<util:properties id="props"/>

<channel id="outputChannel">
<queue capacity="5"/>
Expand All @@ -40,96 +38,99 @@
<beans:bean id="transactionManager" class="org.springframework.integration.transaction.PseudoTransactionManager"/>

<aggregator id="completelyDefinedAggregator"
input-channel="completelyDefinedAggregatorInput"
output-channel="outputChannel"
discard-channel="discardChannel"
ref="aggregatorBean"
release-strategy="releaseStrategy"
correlation-strategy="correlationStrategy"
send-timeout="86420000"
send-partial-result-on-expiry="true"
expire-groups-upon-completion="true"
expire-groups-upon-timeout="false"
empty-group-min-timeout="123"
group-timeout="456"
lock-registry="lockRegistry"
scheduler="scheduler"
message-store="store"
pop-sequence="false"
order="5"
expire-duration="10000"
expire-timeout="250">
<expire-transactional/>
input-channel="completelyDefinedAggregatorInput"
output-channel="outputChannel"
discard-channel="discardChannel"
ref="aggregatorBean"
release-strategy="releaseStrategy"
correlation-strategy="correlationStrategy"
send-timeout="86420000"
send-partial-result-on-expiry="true"
expire-groups-upon-completion="true"
expire-groups-upon-timeout="false"
empty-group-min-timeout="123"
group-timeout="456"
lock-registry="lockRegistry"
scheduler="scheduler"
message-store="store"
pop-sequence="false"
order="5"
expire-duration="10000"
expire-timeout="250">
<expire-transactional/>
</aggregator>

<beans:bean id="lockRegistry" class="org.springframework.integration.support.locks.DefaultLockRegistry"/>

<task:scheduler id="scheduler"/>
<beans:bean id="scheduler"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
<beans:property name="phase" value="1073741823"/>
</beans:bean>

<beans:bean id="store" class="org.springframework.integration.store.SimpleMessageStore"/>

<channel id="aggregatorWithExpressionsInput"/>
<channel id="aggregatorWithExpressionsOutput"/>
<aggregator id="aggregatorWithExpressions"
input-channel="aggregatorWithExpressionsInput"
output-channel="aggregatorWithExpressionsOutput"
expression="?[payload.startsWith('1')].![payload]"
release-strategy-expression="#root.size()>2"
correlation-strategy-expression="headers['foo']"/>
input-channel="aggregatorWithExpressionsInput"
output-channel="aggregatorWithExpressionsOutput"
expression="?[payload.startsWith('1')].![payload]"
release-strategy-expression="#root.size()>2"
correlation-strategy-expression="headers['foo']"/>

<channel id="aggregatorWithReferenceAndMethodInput"/>
<aggregator id="aggregatorWithReferenceAndMethod"
ref="adderBean"
method="add"
input-channel="aggregatorWithReferenceAndMethodInput"
output-channel="outputChannel"/>
ref="adderBean"
method="add"
input-channel="aggregatorWithReferenceAndMethodInput"
output-channel="outputChannel"/>

<channel id="aggregatorWithPojoReleaseStrategyInput"/>
<aggregator id="aggregatorWithPojoReleaseStrategy"
input-channel="aggregatorWithPojoReleaseStrategyInput"
output-channel="outputChannel"
ref="adderBean"
method="add"
release-strategy="pojoReleaseStrategy"
release-strategy-method="checkCompletenessAsList"/>
input-channel="aggregatorWithPojoReleaseStrategyInput"
output-channel="outputChannel"
ref="adderBean"
method="add"
release-strategy="pojoReleaseStrategy"
release-strategy-method="checkCompletenessAsList"/>

<channel id="aggregatorWithPojoReleaseStrategyInputAsCollection"/>
<aggregator id="aggregatorWithPojoReleaseStrategyAsCollection"
input-channel="aggregatorWithPojoReleaseStrategyInputAsCollection"
output-channel="outputChannel"
ref="adderBean"
method="add"
release-strategy="pojoReleaseStrategy"
release-strategy-method="checkCompletenessAsCollection"/>
input-channel="aggregatorWithPojoReleaseStrategyInputAsCollection"
output-channel="outputChannel"
ref="adderBean"
method="add"
release-strategy="pojoReleaseStrategy"
release-strategy-method="checkCompletenessAsCollection"/>

<channel id="aggregatorWithExpressionsAndPojoAggregatorInput"/>
<aggregator id="aggregatorWithExpressionsAndPojoAggregator"
input-channel="aggregatorWithExpressionsAndPojoAggregatorInput"
ref="aggregatorBean"
release-strategy-expression="size() == 2"
correlation-strategy-expression="headers['foo']"
empty-group-min-timeout="60000"/>
input-channel="aggregatorWithExpressionsAndPojoAggregatorInput"
ref="aggregatorBean"
release-strategy-expression="size() == 2"
correlation-strategy-expression="headers['foo']"
empty-group-min-timeout="60000"/>

<beans:bean id="aggregatorBean"
class="org.springframework.integration.config.TestAggregatorBean" />
class="org.springframework.integration.config.TestAggregatorBean"/>

<beans:bean id="aggregatorMGPBean"
class="org.springframework.integration.aggregator.SimpleMessageGroupProcessor" />
class="org.springframework.integration.aggregator.SimpleMessageGroupProcessor"/>

<beans:bean id="aggregatorCustomMGPBean"
class="org.springframework.integration.config.AggregatorParserTests$MyMGP" />
class="org.springframework.integration.config.AggregatorParserTests$MyMGP"/>

<beans:bean id="adderBean"
class="org.springframework.integration.config.Adder" />
<beans:bean id="adderBean"
class="org.springframework.integration.config.Adder"/>

<beans:bean id="releaseStrategy"
class="org.springframework.integration.config.TestReleaseStrategy" />
class="org.springframework.integration.config.TestReleaseStrategy"/>

<beans:bean id="correlationStrategy" class="org.springframework.integration.config.TestCorrelationStrategy"/>

<beans:bean id="pojoReleaseStrategy"
class="org.springframework.integration.config.MaxValueReleaseStrategy">
<beans:constructor-arg value="10" />
class="org.springframework.integration.config.MaxValueReleaseStrategy">
<beans:constructor-arg value="10"/>
</beans:bean>

</beans:beans>
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@

<beans:bean id="testScheduler" class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler"
p:poolSize="7"
p:phase="1073741823"
p:waitForTasksToCompleteOnShutdown="true"/>

<beans:bean id="testMessageStore" class="org.springframework.integration.store.SimpleMessageStore"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

<beans:bean id="multiThreadScheduler" class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler"
p:poolSize="5"
p:phase="1073741823"
p:waitForTasksToCompleteOnShutdown="true"/>

<service-activator input-channel="outputB" output-channel="outputB1" method="processMessage" ref="sampleHandler"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@
default-request-channel="requestChannel"
proxy-default-methods="true"/>

<channel id="receiveChannel">
<queue />
</channel>

<gateway id="solicitResponse"
service-interface="org.springframework.integration.gateway.TestService"
default-reply-channel="replyChannel"
default-reply-timeout="3000"/>
default-reply-channel="receiveChannel"
default-reply-timeout="5000"/>

<gateway id="requestReply"
service-interface="org.springframework.integration.gateway.TestService"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -72,7 +72,7 @@
* @author Gary Russell
*/
@SpringJUnitConfig
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@DirtiesContext
public class GatewayParserTests {

@Autowired
Expand Down Expand Up @@ -132,7 +132,7 @@ public void testOneWayOverride() {

@Test
public void testSolicitResponse() {
PollableChannel channel = (PollableChannel) context.getBean("replyChannel");
PollableChannel channel = (PollableChannel) context.getBean("receiveChannel");
channel.send(new GenericMessage<>("foo"));
TestService service = (TestService) context.getBean("solicitResponse");
String result = service.solicitResponse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
https://www.springframework.org/schema/integration/spring-integration.xsd">

<beans:bean id="taskScheduler"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler" />
class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
<beans:property name="phase" value="1073741823"/>
</beans:bean>

</beans:beans>
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -41,6 +41,7 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.Lifecycle;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
Expand Down Expand Up @@ -586,6 +587,7 @@ public PollerSpec poller() {
@Bean(name = IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME)
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPhase(SmartLifecycle.DEFAULT_PHASE / 2);
threadPoolTaskScheduler.setPoolSize(100);
return threadPoolTaskScheduler;
}
Expand Down Expand Up @@ -925,7 +927,9 @@ public IntegrationFlow dedicatedPollingThreadFlow() {

@Bean
public TaskScheduler dedicatedTaskScheduler() {
return new ThreadPoolTaskScheduler();
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPhase(SmartLifecycle.DEFAULT_PHASE / 2);
return threadPoolTaskScheduler;
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -80,6 +80,7 @@ public void test() throws Exception {
for (int i = 0; i < 3; i++) {
messages.add(channel.receive(1000));
}
adapter.stop();
scheduler.destroy();
Message<?> message1 = messages.get(0);
assertThat(message1.getPayload()).isEqualTo("test-1");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:task="http://www.springframework.org/schema/task"
xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/task https://www.springframework.org/schema/task/spring-task.xsd
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:task="http://www.springframework.org/schema/task"
xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/task https://www.springframework.org/schema/task/spring-task.xsd
http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd">

Expand All @@ -23,7 +23,10 @@
<task:scheduled ref="reaper3" method="run" fixed-rate="100"/>
</task:scheduled-tasks>

<task:scheduler id="scheduler"/>
<bean id="scheduler"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
<property name="phase" value="1073741823"/>
</bean>

<bean id="messageStore2" class="org.springframework.integration.store.SimpleMessageStore">
<property name="expiryCallbacks" ref="expiryCallback2"/>
Expand All @@ -35,21 +38,21 @@
<property name="messageGroupStore" ref="messageStore2"/>
</bean>

<bean id="messageStore3" class="org.springframework.integration.store.SimpleMessageStore" />
<bean id="messageStore3" class="org.springframework.integration.store.SimpleMessageStore"/>

<bean id="reaper3" class="org.springframework.integration.store.MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore3"/>
<property name="timeout" value="50" />
<property name="timeout" value="50"/>
</bean>

<int:aggregator input-channel="aggChannel"
discard-channel="discards"
message-store="messageStore3" />
discard-channel="discards"
message-store="messageStore3"/>

<int:channel id="aggChannel" />
<int:channel id="aggChannel"/>

<int:channel id="discards">
<int:queue />
<int:queue/>
</int:channel>

</beans>
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -543,6 +543,7 @@ public void noFlushAppend() throws Exception {
}
assertThat(flushes.get()).isGreaterThanOrEqualTo(2);
handler.stop();
taskScheduler.destroy();
}

@Test
Expand Down Expand Up @@ -586,6 +587,7 @@ protected BufferedOutputStream createOutputStream(File fileToWriteTo, boolean ap
verify(out).write(any(byte[].class), anyInt(), anyInt());
assertThat(closeWhileWriting.get()).isFalse();
handler.stop();
taskScheduler.destroy();
}

@Test
Expand Down
Loading

0 comments on commit 1a3c25b

Please sign in to comment.