From 0be2c76530c54e64718e044fbca19c72d4d54d45 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Fri, 12 Jan 2024 13:51:45 -0500 Subject: [PATCH] GH-8856: Adjust phase for TaskScheduler global bean Fixes: #8856 The `ThreadPoolTaskScheduler` in Spring Framework manages now a lifecycle with a `Integer.MAX_VALUE` phase by default. This causes a wait block for scheduled tasks in the `AbstractPollingEndpoint` instances. These tasks are cancelled by in the later `Integer.MAX_VALUE / 2` phase. So, we have a lifecycle deadlock on context close * Fix `DefaultConfiguringBeanFactoryPostProcessor.registerTaskScheduler()` with a `.addPropertyValue("phase", SmartLifecycle.DEFAULT_PHASE / 2)` to let the `AbstractPollingEndpoint` to cancel its currently scheduled task to avoid possible pollution with unexpected (and lost) messages **Cherry-pick to `6.2.x`** (cherry picked from commit 11e0ea1305ca3e927ddf59364780724b76c4e7f4) --- ...ltConfiguringBeanFactoryPostProcessor.java | 4 ++- .../endpoint/PollingLifecycleTests.java | 25 ++++++++++++++++++- .../config/ScatterGatherTests-context.xml | 3 ++- .../TestDefaultAnnotationConfiguration.java | 25 +++++++++++++++++++ 4 files changed, 54 insertions(+), 3 deletions(-) create mode 100644 spring-integration-core/src/test/java/org/springframework/integration/util/TestDefaultAnnotationConfiguration.java diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/DefaultConfiguringBeanFactoryPostProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/config/DefaultConfiguringBeanFactoryPostProcessor.java index e5782bb7e7b..96bd94dc24e 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/DefaultConfiguringBeanFactoryPostProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/DefaultConfiguringBeanFactoryPostProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,6 +33,7 @@ import org.springframework.beans.factory.support.BeanDefinitionRegistry; import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor; import org.springframework.beans.factory.support.RootBeanDefinition; +import org.springframework.context.SmartLifecycle; import org.springframework.core.Ordered; import org.springframework.core.log.LogAccessor; import org.springframework.integration.channel.ChannelUtils; @@ -276,6 +277,7 @@ private void registerTaskScheduler() { IntegrationProperties.TASK_SCHEDULER_POOL_SIZE)) .addPropertyValue("threadNamePrefix", "task-scheduler-") .addPropertyValue("rejectedExecutionHandler", new CallerRunsPolicy()) + .addPropertyValue("phase", SmartLifecycle.DEFAULT_PHASE / 2) .addPropertyReference("errorHandler", ChannelUtils.MESSAGE_PUBLISHING_ERROR_HANDLER_BEAN_NAME) .getBeanDefinition(); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/endpoint/PollingLifecycleTests.java b/spring-integration-core/src/test/java/org/springframework/integration/endpoint/PollingLifecycleTests.java index 3cc371c4aaa..f5ee4eca7e3 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/endpoint/PollingLifecycleTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/endpoint/PollingLifecycleTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,18 +29,21 @@ import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.Lifecycle; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.integration.channel.NullChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean; import org.springframework.integration.config.TestErrorHandler; import org.springframework.integration.core.MessageSource; import org.springframework.integration.scheduling.PollerMetadata; +import org.springframework.integration.util.TestDefaultAnnotationConfiguration; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import org.springframework.messaging.support.GenericMessage; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.support.PeriodicTrigger; +import org.springframework.util.StopWatch; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.atMost; @@ -220,4 +223,24 @@ public boolean isRunning() { assertThat(stopInvoked.get()).isTrue(); } + @Test + public void theScheduledPollingTaskIsCancelledNotCausingApplicationContextStopDeadLock() { + var context = new AnnotationConfigApplicationContext(); + context.register(TestDefaultAnnotationConfiguration.class); + + PollingConsumer consumer = new PollingConsumer(new QueueChannel(), (m) -> { }); + consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(10))); + consumer.setReceiveTimeout(30_000); + + context.registerBean(PollingConsumer.class, () -> consumer); + context.refresh(); + + StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + context.close(); + stopWatch.stop(); + + assertThat(stopWatch.getTotalTimeMillis()).isLessThan(10_000); + } + } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/scattergather/config/ScatterGatherTests-context.xml b/spring-integration-core/src/test/java/org/springframework/integration/scattergather/config/ScatterGatherTests-context.xml index 5754fce70f5..54afb9c533b 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/scattergather/config/ScatterGatherTests-context.xml +++ b/spring-integration-core/src/test/java/org/springframework/integration/scattergather/config/ScatterGatherTests-context.xml @@ -29,7 +29,8 @@ - + diff --git a/spring-integration-core/src/test/java/org/springframework/integration/util/TestDefaultAnnotationConfiguration.java b/spring-integration-core/src/test/java/org/springframework/integration/util/TestDefaultAnnotationConfiguration.java new file mode 100644 index 00000000000..fdd8afa32fd --- /dev/null +++ b/spring-integration-core/src/test/java/org/springframework/integration/util/TestDefaultAnnotationConfiguration.java @@ -0,0 +1,25 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.util; + +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.config.EnableIntegration; + +@Configuration +@EnableIntegration +public class TestDefaultAnnotationConfiguration { +}