From 1755cc6cf40e77a917c7935143467c84b4a46d70 Mon Sep 17 00:00:00 2001 From: Johannes Edmeier Date: Wed, 11 Oct 2023 23:05:00 +0200 Subject: [PATCH 1/6] GH-8760 PostgresJdbcChannelMessageStore using `DELETE ... RETURNING` Fixes spring-projects/spring-integration#8760 * Add PostgresJdbcChannelMessageStore and tests --- .../jdbc/store/JdbcChannelMessageStore.java | 2 +- .../PostgresJdbcChannelMessageStore.java | 54 ++++++++++++ ...tgresChannelMessageStoreQueryProvider.java | 72 +++++++++++++++ .../jdbc/schema-drop-postgresql.sql | 20 ++--- .../channel/DataSource-postgres-context.xml | 7 +- .../PostgresJdbcChannelMessageStoreTests.java | 87 +++++++++++++++++++ ...gresTxTimeoutMessageStoreTests-context.xml | 7 ++ 7 files changed, 237 insertions(+), 12 deletions(-) create mode 100644 spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/PostgresJdbcChannelMessageStore.java create mode 100644 spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/DeleteReturningPostgresChannelMessageStoreQueryProvider.java create mode 100644 spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJdbcChannelMessageStoreTests.java diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java index cde3b6c546a..5249ffaa947 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java @@ -486,7 +486,7 @@ public MessageGroup addMessageToGroup(Object groupId, final Message message) * @param input Parameter may be null * @return Returns null when the input is null otherwise the UUID as String. */ - private String getKey(Object input) { + protected String getKey(Object input) { return input == null ? null : UUIDConverter.getUUID(input).toString(); } diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/PostgresJdbcChannelMessageStore.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/PostgresJdbcChannelMessageStore.java new file mode 100644 index 00000000000..797cbbe60a9 --- /dev/null +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/PostgresJdbcChannelMessageStore.java @@ -0,0 +1,54 @@ +/* + * Copyright 2002-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.jdbc.store; + +import org.springframework.integration.jdbc.store.channel.ChannelMessageStoreQueryProvider; +import org.springframework.integration.jdbc.store.channel.DeleteReturningPostgresChannelMessageStoreQueryProvider; +import org.springframework.integration.store.MessageStore; +import org.springframework.messaging.Message; +import org.springframework.util.Assert; + +import javax.sql.DataSource; + +/** + * Implementation of {@link MessageStore} for Postgres using a single statement to poll for messages. + * + * @author Johannes Edmeier + * @since 6.2 + */ +public class PostgresJdbcChannelMessageStore extends JdbcChannelMessageStore { + + public PostgresJdbcChannelMessageStore() { + super(); + } + + public PostgresJdbcChannelMessageStore(DataSource dataSource) { + super(dataSource); + } + + @Override + public Message pollMessageFromGroup(Object groupId) { + return doPollForMessage(getKey(groupId)); + } + + @Override + public void setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider) { + Assert.isInstanceOf(DeleteReturningPostgresChannelMessageStoreQueryProvider.class, channelMessageStoreQueryProvider, + "The provided channelMessageStoreQueryProvider must be an instance of DeleteReturningPostgresChannelMessageStoreQueryProvider"); + super.setChannelMessageStoreQueryProvider(channelMessageStoreQueryProvider); + } +} diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/DeleteReturningPostgresChannelMessageStoreQueryProvider.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/DeleteReturningPostgresChannelMessageStoreQueryProvider.java new file mode 100644 index 00000000000..84be9cb32b4 --- /dev/null +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/DeleteReturningPostgresChannelMessageStoreQueryProvider.java @@ -0,0 +1,72 @@ +package org.springframework.integration.jdbc.store.channel; + +/** + * @author Johannes Edmeier + * + * @since 6.2 + */ +public class DeleteReturningPostgresChannelMessageStoreQueryProvider extends PostgresChannelMessageStoreQueryProvider { + + @Override + public String getPollFromGroupExcludeIdsQuery() { + return """ + delete + from %PREFIX%CHANNEL_MESSAGE + where CTID = (select CTID + from %PREFIX%CHANNEL_MESSAGE + where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key + and %PREFIX%CHANNEL_MESSAGE.REGION = :region + and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) + order by CREATED_DATE, MESSAGE_SEQUENCE + limit 1) + returning MESSAGE_ID, MESSAGE_BYTES; + """; + } + + @Override + public String getPollFromGroupQuery() { + return """ + delete + from %PREFIX%CHANNEL_MESSAGE + where CTID = (select CTID + from %PREFIX%CHANNEL_MESSAGE + where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key + and %PREFIX%CHANNEL_MESSAGE.REGION = :region + order by CREATED_DATE, MESSAGE_SEQUENCE + limit 1) + returning MESSAGE_ID, MESSAGE_BYTES; + """; + } + + @Override + public String getPriorityPollFromGroupExcludeIdsQuery() { + return """ + delete + from %PREFIX%CHANNEL_MESSAGE + where CTID = (select CTID + from %PREFIX%CHANNEL_MESSAGE + where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key + and %PREFIX%CHANNEL_MESSAGE.REGION = :region + and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) + order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE + limit 1) + returning MESSAGE_ID, MESSAGE_BYTES; + """; + } + + @Override + public String getPriorityPollFromGroupQuery() { + return """ + delete + from %PREFIX%CHANNEL_MESSAGE + where CTID = (select CTID + from %PREFIX%CHANNEL_MESSAGE + where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key + and %PREFIX%CHANNEL_MESSAGE.REGION = :region + order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE + limit 1) + returning MESSAGE_ID, MESSAGE_BYTES; + """; + } + +} \ No newline at end of file diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-drop-postgresql.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-drop-postgresql.sql index a548cca409b..1b9f6a6f646 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-drop-postgresql.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-drop-postgresql.sql @@ -1,12 +1,12 @@ -- Autogenerated: do not edit this file -DROP INDEX INT_MESSAGE_IX1 ; -DROP INDEX INT_CHANNEL_MSG_DATE_IDX ; -DROP INDEX INT_CHANNEL_MSG_PRIORITY_IDX ; -DROP TABLE INT_MESSAGE ; -DROP TABLE INT_MESSAGE_GROUP ; -DROP TABLE INT_GROUP_TO_MESSAGE ; -DROP TABLE INT_LOCK ; -DROP TABLE INT_CHANNEL_MESSAGE ; -DROP TABLE INT_METADATA_STORE ; -DROP SEQUENCE INT_MESSAGE_SEQ ; +DROP INDEX IF EXISTS INT_MESSAGE_IX1 ; +DROP INDEX IF EXISTS INT_CHANNEL_MSG_DATE_IDX ; +DROP INDEX IF EXISTS INT_CHANNEL_MSG_PRIORITY_IDX ; +DROP TABLE IF EXISTS INT_MESSAGE ; +DROP TABLE IF EXISTS INT_MESSAGE_GROUP ; +DROP TABLE IF EXISTS INT_GROUP_TO_MESSAGE ; +DROP TABLE IF EXISTS INT_LOCK ; +DROP TABLE IF EXISTS INT_CHANNEL_MESSAGE ; +DROP TABLE IF EXISTS INT_METADATA_STORE ; +DROP SEQUENCE IF EXISTS INT_MESSAGE_SEQ ; diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/DataSource-postgres-context.xml b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/DataSource-postgres-context.xml index a81c22f67f6..2d1c8a7f099 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/DataSource-postgres-context.xml +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/DataSource-postgres-context.xml @@ -18,6 +18,11 @@ - + + + + + + diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJdbcChannelMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJdbcChannelMessageStoreTests.java new file mode 100644 index 00000000000..fc9f6a8c4fa --- /dev/null +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJdbcChannelMessageStoreTests.java @@ -0,0 +1,87 @@ +/* + * Copyright 2002-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.jdbc.store.channel; + +import org.apache.commons.dbcp2.BasicDataSource; +import org.junit.jupiter.api.BeforeEach; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.io.ClassPathResource; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.jdbc.channel.PostgresContainerTest; +import org.springframework.integration.jdbc.store.PostgresJdbcChannelMessageStore; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.jdbc.datasource.init.DataSourceInitializer; +import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator; +import org.springframework.jdbc.datasource.init.ScriptUtils; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.transaction.PlatformTransactionManager; + +import javax.sql.DataSource; + +/** + * @author Johannes Edmeier + */ +@ContextConfiguration +public class PostgresJdbcChannelMessageStoreTests extends AbstractJdbcChannelMessageStoreTests implements PostgresContainerTest { + @BeforeEach + @Override + public void init() { + messageStore = new PostgresJdbcChannelMessageStore(dataSource); + messageStore.setRegion(REGION); + messageStore.setChannelMessageStoreQueryProvider(queryProvider); + messageStore.afterPropertiesSet(); + messageStore.removeMessageGroup("AbstractJdbcChannelMessageStoreTests"); + } + + @Configuration + @EnableIntegration + public static class Config { + + @Bean + public DataSource dataSource() { + BasicDataSource dataSource = new BasicDataSource(); + dataSource.setUrl(PostgresContainerTest.getJdbcUrl()); + dataSource.setUsername(PostgresContainerTest.getUsername()); + dataSource.setPassword(PostgresContainerTest.getPassword()); + return dataSource; + } + + @Bean + DataSourceInitializer dataSourceInitializer(DataSource dataSource) { + DataSourceInitializer dataSourceInitializer = new DataSourceInitializer(); + dataSourceInitializer.setDataSource(dataSource); + ResourceDatabasePopulator databasePopulator = + new ResourceDatabasePopulator(new ClassPathResource("org/springframework/integration/jdbc/schema-drop-postgresql.sql"), + new ClassPathResource("org/springframework/integration/jdbc/schema-postgresql.sql")); + databasePopulator.setSeparator(ScriptUtils.EOF_STATEMENT_SEPARATOR); + dataSourceInitializer.setDatabasePopulator( + databasePopulator); + return dataSourceInitializer; + } + + @Bean + PlatformTransactionManager transactionManager(DataSource dataSource) { + return new DataSourceTransactionManager(dataSource); + } + + @Bean + DeleteReturningPostgresChannelMessageStoreQueryProvider queryProvider() { + return new DeleteReturningPostgresChannelMessageStoreQueryProvider(); + } + } +} diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresTxTimeoutMessageStoreTests-context.xml b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresTxTimeoutMessageStoreTests-context.xml index 8576059b251..91222f93747 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresTxTimeoutMessageStoreTests-context.xml +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresTxTimeoutMessageStoreTests-context.xml @@ -7,5 +7,12 @@ + + + + + + + From 3ee09d8d4a1699e506c5a4513e80efc93855e82b Mon Sep 17 00:00:00 2001 From: Johannes Edmeier Date: Fri, 13 Oct 2023 23:23:03 +0200 Subject: [PATCH 2/6] refa: added isUsingSingleStatementForPoll and removed PostgresJdbcChannelMessageStore specialization --- .../jdbc/store/JdbcChannelMessageStore.java | 6 ++- .../PostgresJdbcChannelMessageStore.java | 54 ------------------- .../ChannelMessageStoreQueryProvider.java | 12 +++++ ...tgresChannelMessageStoreQueryProvider.java | 4 ++ .../PostgresJdbcChannelMessageStoreTests.java | 16 +----- ...gresTxTimeoutMessageStoreTests-context.xml | 7 --- 6 files changed, 22 insertions(+), 77 deletions(-) delete mode 100644 spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/PostgresJdbcChannelMessageStore.java diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java index 5249ffaa947..833d727da3a 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java @@ -486,7 +486,7 @@ public MessageGroup addMessageToGroup(Object groupId, final Message message) * @param input Parameter may be null * @return Returns null when the input is null otherwise the UUID as String. */ - protected String getKey(Object input) { + private String getKey(Object input) { return input == null ? null : UUIDConverter.getUUID(input).toString(); } @@ -633,6 +633,10 @@ protected Message doPollForMessage(String groupIdKey) { } private boolean doRemoveMessageFromGroup(Object groupId, Message messageToRemove) { + if (this.channelMessageStoreQueryProvider.isUsingSingleStatementForPoll()) { + return true; + } + UUID id = messageToRemove.getHeaders().getId(); int updated = this.jdbcTemplate.update( getQuery(Query.DELETE_MESSAGE, () -> this.channelMessageStoreQueryProvider.getDeleteMessageQuery()), diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/PostgresJdbcChannelMessageStore.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/PostgresJdbcChannelMessageStore.java deleted file mode 100644 index 797cbbe60a9..00000000000 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/PostgresJdbcChannelMessageStore.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2002-2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.jdbc.store; - -import org.springframework.integration.jdbc.store.channel.ChannelMessageStoreQueryProvider; -import org.springframework.integration.jdbc.store.channel.DeleteReturningPostgresChannelMessageStoreQueryProvider; -import org.springframework.integration.store.MessageStore; -import org.springframework.messaging.Message; -import org.springframework.util.Assert; - -import javax.sql.DataSource; - -/** - * Implementation of {@link MessageStore} for Postgres using a single statement to poll for messages. - * - * @author Johannes Edmeier - * @since 6.2 - */ -public class PostgresJdbcChannelMessageStore extends JdbcChannelMessageStore { - - public PostgresJdbcChannelMessageStore() { - super(); - } - - public PostgresJdbcChannelMessageStore(DataSource dataSource) { - super(dataSource); - } - - @Override - public Message pollMessageFromGroup(Object groupId) { - return doPollForMessage(getKey(groupId)); - } - - @Override - public void setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider) { - Assert.isInstanceOf(DeleteReturningPostgresChannelMessageStoreQueryProvider.class, channelMessageStoreQueryProvider, - "The provided channelMessageStoreQueryProvider must be an instance of DeleteReturningPostgresChannelMessageStoreQueryProvider"); - super.setChannelMessageStoreQueryProvider(channelMessageStoreQueryProvider); - } -} diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/ChannelMessageStoreQueryProvider.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/ChannelMessageStoreQueryProvider.java index f359a0a03fc..ff8598628b5 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/ChannelMessageStoreQueryProvider.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/ChannelMessageStoreQueryProvider.java @@ -25,6 +25,7 @@ * @author Artem Bilan * @author Gary Russell * @author Adama Sorho + * @author Johannes Edmeier * * @since 2.2 */ @@ -125,4 +126,15 @@ default String getDeleteMessageGroupQuery() { */ String getPriorityPollFromGroupQuery(); + /** + * Specifies if the returned query for polling is using a single statement (e.g. DELETE ... RETURNING) to + * retrieve and delete the message from the channel store. + * + * @return false if a select and delete statement is required, true if otherwise. + * @since 6.2 + */ + default boolean isUsingSingleStatementForPoll() { + return false; + } + } diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/DeleteReturningPostgresChannelMessageStoreQueryProvider.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/DeleteReturningPostgresChannelMessageStoreQueryProvider.java index 84be9cb32b4..18f6e6b489d 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/DeleteReturningPostgresChannelMessageStoreQueryProvider.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/DeleteReturningPostgresChannelMessageStoreQueryProvider.java @@ -69,4 +69,8 @@ public String getPriorityPollFromGroupQuery() { """; } + @Override + public boolean isUsingSingleStatementForPoll() { + return true; + } } \ No newline at end of file diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJdbcChannelMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJdbcChannelMessageStoreTests.java index fc9f6a8c4fa..096cc9acbe0 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJdbcChannelMessageStoreTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJdbcChannelMessageStoreTests.java @@ -17,13 +17,10 @@ package org.springframework.integration.jdbc.store.channel; import org.apache.commons.dbcp2.BasicDataSource; -import org.junit.jupiter.api.BeforeEach; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; -import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.jdbc.channel.PostgresContainerTest; -import org.springframework.integration.jdbc.store.PostgresJdbcChannelMessageStore; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.jdbc.datasource.init.DataSourceInitializer; import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator; @@ -38,18 +35,8 @@ */ @ContextConfiguration public class PostgresJdbcChannelMessageStoreTests extends AbstractJdbcChannelMessageStoreTests implements PostgresContainerTest { - @BeforeEach - @Override - public void init() { - messageStore = new PostgresJdbcChannelMessageStore(dataSource); - messageStore.setRegion(REGION); - messageStore.setChannelMessageStoreQueryProvider(queryProvider); - messageStore.afterPropertiesSet(); - messageStore.removeMessageGroup("AbstractJdbcChannelMessageStoreTests"); - } @Configuration - @EnableIntegration public static class Config { @Bean @@ -66,8 +53,7 @@ DataSourceInitializer dataSourceInitializer(DataSource dataSource) { DataSourceInitializer dataSourceInitializer = new DataSourceInitializer(); dataSourceInitializer.setDataSource(dataSource); ResourceDatabasePopulator databasePopulator = - new ResourceDatabasePopulator(new ClassPathResource("org/springframework/integration/jdbc/schema-drop-postgresql.sql"), - new ClassPathResource("org/springframework/integration/jdbc/schema-postgresql.sql")); + new ResourceDatabasePopulator(new ClassPathResource("org/springframework/integration/jdbc/schema-postgresql.sql")); databasePopulator.setSeparator(ScriptUtils.EOF_STATEMENT_SEPARATOR); dataSourceInitializer.setDatabasePopulator( databasePopulator); diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresTxTimeoutMessageStoreTests-context.xml b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresTxTimeoutMessageStoreTests-context.xml index 91222f93747..8576059b251 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresTxTimeoutMessageStoreTests-context.xml +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresTxTimeoutMessageStoreTests-context.xml @@ -7,12 +7,5 @@ - - - - - - - From eebe5666116e693c917ec5bb10e1f91c2d45ec73 Mon Sep 17 00:00:00 2001 From: Johannes Edmeier Date: Sat, 14 Oct 2023 18:04:21 +0200 Subject: [PATCH 3/6] refa: execute postgres init scripts to PostgresContainerTest --- ...stgresChannelMessageTableSubscriberTests.java | 15 --------------- .../jdbc/channel/PostgresContainerTest.java | 5 +++-- .../PostgresJdbcChannelMessageStoreTests.java | 16 ---------------- 3 files changed, 3 insertions(+), 33 deletions(-) diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java index d443946fa02..911a51b9d2b 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java @@ -67,21 +67,6 @@ public class PostgresChannelMessageTableSubscriberTests implements PostgresContainerTest { private static final String INTEGRATION_DB_SCRIPTS = """ - CREATE SEQUENCE INT_MESSAGE_SEQ START WITH 1 INCREMENT BY 1 NO CYCLE; - ^^^ END OF SCRIPT ^^^ - - CREATE TABLE INT_CHANNEL_MESSAGE ( - MESSAGE_ID CHAR(36) NOT NULL, - GROUP_KEY CHAR(36) NOT NULL, - CREATED_DATE BIGINT NOT NULL, - MESSAGE_PRIORITY BIGINT, - MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT nextval('INT_MESSAGE_SEQ'), - MESSAGE_BYTES BYTEA, - REGION VARCHAR(100) NOT NULL, - constraint INT_CHANNEL_MESSAGE_PK primary key (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE) - ); - ^^^ END OF SCRIPT ^^^ - CREATE FUNCTION INT_CHANNEL_MESSAGE_NOTIFY_FCT() RETURNS TRIGGER AS $BODY$ diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresContainerTest.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresContainerTest.java index 4ae45e89237..abe117dcfcc 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresContainerTest.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresContainerTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,7 +34,8 @@ @Testcontainers(disabledWithoutDocker = true) public interface PostgresContainerTest { - PostgreSQLContainer POSTGRES_CONTAINER = new PostgreSQLContainer<>("postgres:11"); + PostgreSQLContainer POSTGRES_CONTAINER = new PostgreSQLContainer<>("postgres:11") + .withInitScript("org/springframework/integration/jdbc/schema-postgresql.sql"); @BeforeAll static void startContainer() { diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJdbcChannelMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJdbcChannelMessageStoreTests.java index 096cc9acbe0..6c610873017 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJdbcChannelMessageStoreTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJdbcChannelMessageStoreTests.java @@ -19,12 +19,8 @@ import org.apache.commons.dbcp2.BasicDataSource; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.core.io.ClassPathResource; import org.springframework.integration.jdbc.channel.PostgresContainerTest; import org.springframework.jdbc.datasource.DataSourceTransactionManager; -import org.springframework.jdbc.datasource.init.DataSourceInitializer; -import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator; -import org.springframework.jdbc.datasource.init.ScriptUtils; import org.springframework.test.context.ContextConfiguration; import org.springframework.transaction.PlatformTransactionManager; @@ -48,18 +44,6 @@ public DataSource dataSource() { return dataSource; } - @Bean - DataSourceInitializer dataSourceInitializer(DataSource dataSource) { - DataSourceInitializer dataSourceInitializer = new DataSourceInitializer(); - dataSourceInitializer.setDataSource(dataSource); - ResourceDatabasePopulator databasePopulator = - new ResourceDatabasePopulator(new ClassPathResource("org/springframework/integration/jdbc/schema-postgresql.sql")); - databasePopulator.setSeparator(ScriptUtils.EOF_STATEMENT_SEPARATOR); - dataSourceInitializer.setDatabasePopulator( - databasePopulator); - return dataSourceInitializer; - } - @Bean PlatformTransactionManager transactionManager(DataSource dataSource) { return new DataSourceTransactionManager(dataSource); From 0944380946ac79d2d0fbed14f89d1e230a877aba Mon Sep 17 00:00:00 2001 From: Johannes Edmeier Date: Mon, 16 Oct 2023 20:55:05 +0200 Subject: [PATCH 4/6] fix formatting, javadocs and stuff --- .../jdbc/store/JdbcChannelMessageStore.java | 2 +- .../ChannelMessageStoreQueryProvider.java | 7 +- ...tgresChannelMessageStoreQueryProvider.java | 76 ------------------- ...tgresChannelMessageStoreQueryProvider.java | 63 ++++++++++++--- .../channel/DataSource-postgres-context.xml | 7 +- .../PostgresJdbcChannelMessageStoreTests.java | 5 +- 6 files changed, 59 insertions(+), 101 deletions(-) delete mode 100644 spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/DeleteReturningPostgresChannelMessageStoreQueryProvider.java diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java index 833d727da3a..5d0af2c4eb1 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java @@ -633,7 +633,7 @@ protected Message doPollForMessage(String groupIdKey) { } private boolean doRemoveMessageFromGroup(Object groupId, Message messageToRemove) { - if (this.channelMessageStoreQueryProvider.isUsingSingleStatementForPoll()) { + if (this.channelMessageStoreQueryProvider.isSingleStatementForPoll()) { return true; } diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/ChannelMessageStoreQueryProvider.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/ChannelMessageStoreQueryProvider.java index ff8598628b5..f14fe033f35 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/ChannelMessageStoreQueryProvider.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/ChannelMessageStoreQueryProvider.java @@ -127,13 +127,12 @@ default String getDeleteMessageGroupQuery() { String getPriorityPollFromGroupQuery(); /** - * Specifies if the returned query for polling is using a single statement (e.g. DELETE ... RETURNING) to + * Indicates if the queries for polling are using a single statement (e.g. DELETE ... RETURNING) to * retrieve and delete the message from the channel store. - * - * @return false if a select and delete statement is required, true if otherwise. + * @return true if a single statement is used, false if a select and delete is required. * @since 6.2 */ - default boolean isUsingSingleStatementForPoll() { + default boolean isSingleStatementForPoll() { return false; } diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/DeleteReturningPostgresChannelMessageStoreQueryProvider.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/DeleteReturningPostgresChannelMessageStoreQueryProvider.java deleted file mode 100644 index 18f6e6b489d..00000000000 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/DeleteReturningPostgresChannelMessageStoreQueryProvider.java +++ /dev/null @@ -1,76 +0,0 @@ -package org.springframework.integration.jdbc.store.channel; - -/** - * @author Johannes Edmeier - * - * @since 6.2 - */ -public class DeleteReturningPostgresChannelMessageStoreQueryProvider extends PostgresChannelMessageStoreQueryProvider { - - @Override - public String getPollFromGroupExcludeIdsQuery() { - return """ - delete - from %PREFIX%CHANNEL_MESSAGE - where CTID = (select CTID - from %PREFIX%CHANNEL_MESSAGE - where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key - and %PREFIX%CHANNEL_MESSAGE.REGION = :region - and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) - order by CREATED_DATE, MESSAGE_SEQUENCE - limit 1) - returning MESSAGE_ID, MESSAGE_BYTES; - """; - } - - @Override - public String getPollFromGroupQuery() { - return """ - delete - from %PREFIX%CHANNEL_MESSAGE - where CTID = (select CTID - from %PREFIX%CHANNEL_MESSAGE - where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key - and %PREFIX%CHANNEL_MESSAGE.REGION = :region - order by CREATED_DATE, MESSAGE_SEQUENCE - limit 1) - returning MESSAGE_ID, MESSAGE_BYTES; - """; - } - - @Override - public String getPriorityPollFromGroupExcludeIdsQuery() { - return """ - delete - from %PREFIX%CHANNEL_MESSAGE - where CTID = (select CTID - from %PREFIX%CHANNEL_MESSAGE - where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key - and %PREFIX%CHANNEL_MESSAGE.REGION = :region - and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) - order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE - limit 1) - returning MESSAGE_ID, MESSAGE_BYTES; - """; - } - - @Override - public String getPriorityPollFromGroupQuery() { - return """ - delete - from %PREFIX%CHANNEL_MESSAGE - where CTID = (select CTID - from %PREFIX%CHANNEL_MESSAGE - where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key - and %PREFIX%CHANNEL_MESSAGE.REGION = :region - order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE - limit 1) - returning MESSAGE_ID, MESSAGE_BYTES; - """; - } - - @Override - public boolean isUsingSingleStatementForPoll() { - return true; - } -} \ No newline at end of file diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/PostgresChannelMessageStoreQueryProvider.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/PostgresChannelMessageStoreQueryProvider.java index 26551d60e5a..4d0a9910b86 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/PostgresChannelMessageStoreQueryProvider.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/PostgresChannelMessageStoreQueryProvider.java @@ -27,30 +27,69 @@ public class PostgresChannelMessageStoreQueryProvider implements ChannelMessageS @Override public String getPollFromGroupExcludeIdsQuery() { - return SELECT_COMMON - + "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) " - + "order by CREATED_DATE, MESSAGE_SEQUENCE LIMIT 1 FOR UPDATE SKIP LOCKED"; + return """ + delete + from %PREFIX%CHANNEL_MESSAGE + where CTID = (select CTID + from %PREFIX%CHANNEL_MESSAGE + where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key + and %PREFIX%CHANNEL_MESSAGE.REGION = :region + and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) + order by CREATED_DATE, MESSAGE_SEQUENCE + limit 1) + returning MESSAGE_ID, MESSAGE_BYTES; + """; } @Override public String getPollFromGroupQuery() { - return SELECT_COMMON + - "order by CREATED_DATE, MESSAGE_SEQUENCE LIMIT 1 FOR UPDATE SKIP LOCKED"; + return """ + delete + from %PREFIX%CHANNEL_MESSAGE + where CTID = (select CTID + from %PREFIX%CHANNEL_MESSAGE + where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key + and %PREFIX%CHANNEL_MESSAGE.REGION = :region + order by CREATED_DATE, MESSAGE_SEQUENCE + limit 1) + returning MESSAGE_ID, MESSAGE_BYTES; + """; } @Override public String getPriorityPollFromGroupExcludeIdsQuery() { - return SELECT_COMMON + - "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) " + - "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE " + - "LIMIT 1 FOR UPDATE SKIP LOCKED"; + return """ + delete + from %PREFIX%CHANNEL_MESSAGE + where CTID = (select CTID + from %PREFIX%CHANNEL_MESSAGE + where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key + and %PREFIX%CHANNEL_MESSAGE.REGION = :region + and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) + order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE + limit 1) + returning MESSAGE_ID, MESSAGE_BYTES; + """; } @Override public String getPriorityPollFromGroupQuery() { - return SELECT_COMMON + - "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE " + - "LIMIT 1 FOR UPDATE SKIP LOCKED"; + return """ + delete + from %PREFIX%CHANNEL_MESSAGE + where CTID = (select CTID + from %PREFIX%CHANNEL_MESSAGE + where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key + and %PREFIX%CHANNEL_MESSAGE.REGION = :region + order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE + limit 1) + returning MESSAGE_ID, MESSAGE_BYTES; + """; + } + + @Override + public boolean isSingleStatementForPoll() { + return true; } } diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/DataSource-postgres-context.xml b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/DataSource-postgres-context.xml index 2d1c8a7f099..a81c22f67f6 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/DataSource-postgres-context.xml +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/DataSource-postgres-context.xml @@ -18,11 +18,6 @@ - - - - - - + diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJdbcChannelMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJdbcChannelMessageStoreTests.java index 6c610873017..f7cc0a2ce2c 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJdbcChannelMessageStoreTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJdbcChannelMessageStoreTests.java @@ -50,8 +50,9 @@ PlatformTransactionManager transactionManager(DataSource dataSource) { } @Bean - DeleteReturningPostgresChannelMessageStoreQueryProvider queryProvider() { - return new DeleteReturningPostgresChannelMessageStoreQueryProvider(); + PostgresChannelMessageStoreQueryProvider queryProvider() { + return new PostgresChannelMessageStoreQueryProvider(); } } + } From e8d0eb629e81a553b0ed8b3bc1e1d3805c5d7170 Mon Sep 17 00:00:00 2001 From: Johannes Edmeier Date: Tue, 17 Oct 2023 16:26:59 +0200 Subject: [PATCH 5/6] use skip locked and move skip-remove condition --- .../jdbc/store/JdbcChannelMessageStore.java | 10 +++++----- .../PostgresChannelMessageStoreQueryProvider.java | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java index 5d0af2c4eb1..b2a208ff528 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java @@ -554,12 +554,16 @@ public void removeMessageGroup(Object groupId) { public Message pollMessageFromGroup(Object groupId) { String key = getKey(groupId); Message polledMessage = doPollForMessage(key); - if (polledMessage != null && !doRemoveMessageFromGroup(groupId, polledMessage)) { + if (polledMessage != null && !isSingleStatementForPoll() && !doRemoveMessageFromGroup(groupId, polledMessage)) { return null; } return polledMessage; } + private boolean isSingleStatementForPoll() { + return this.channelMessageStoreQueryProvider.isSingleStatementForPoll(); + } + /** * This method executes a call to the DB to get the oldest Message in the * MessageGroup which in the context of the {@link JdbcChannelMessageStore} @@ -633,10 +637,6 @@ protected Message doPollForMessage(String groupIdKey) { } private boolean doRemoveMessageFromGroup(Object groupId, Message messageToRemove) { - if (this.channelMessageStoreQueryProvider.isSingleStatementForPoll()) { - return true; - } - UUID id = messageToRemove.getHeaders().getId(); int updated = this.jdbcTemplate.update( getQuery(Query.DELETE_MESSAGE, () -> this.channelMessageStoreQueryProvider.getDeleteMessageQuery()), diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/PostgresChannelMessageStoreQueryProvider.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/PostgresChannelMessageStoreQueryProvider.java index 4d0a9910b86..e3cae7f5478 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/PostgresChannelMessageStoreQueryProvider.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/PostgresChannelMessageStoreQueryProvider.java @@ -36,7 +36,7 @@ public String getPollFromGroupExcludeIdsQuery() { and %PREFIX%CHANNEL_MESSAGE.REGION = :region and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) order by CREATED_DATE, MESSAGE_SEQUENCE - limit 1) + limit 1 for update skip locked) returning MESSAGE_ID, MESSAGE_BYTES; """; } @@ -51,7 +51,7 @@ public String getPollFromGroupQuery() { where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region order by CREATED_DATE, MESSAGE_SEQUENCE - limit 1) + limit 1 for update skip locked) returning MESSAGE_ID, MESSAGE_BYTES; """; } @@ -67,7 +67,7 @@ public String getPriorityPollFromGroupExcludeIdsQuery() { and %PREFIX%CHANNEL_MESSAGE.REGION = :region and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE - limit 1) + limit 1 for update skip locked) returning MESSAGE_ID, MESSAGE_BYTES; """; } @@ -82,7 +82,7 @@ public String getPriorityPollFromGroupQuery() { where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE - limit 1) + limit 1 for update skip locked) returning MESSAGE_ID, MESSAGE_BYTES; """; } From 8662b3915dfa71f2bcaa1e069318b6d8d09c6b5e Mon Sep 17 00:00:00 2001 From: Johannes Edmeier Date: Tue, 17 Oct 2023 16:28:00 +0200 Subject: [PATCH 6/6] add missing since --- .../jdbc/store/channel/PostgresJdbcChannelMessageStoreTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJdbcChannelMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJdbcChannelMessageStoreTests.java index f7cc0a2ce2c..551037662af 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJdbcChannelMessageStoreTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJdbcChannelMessageStoreTests.java @@ -28,6 +28,7 @@ /** * @author Johannes Edmeier + * @since 6.2 */ @ContextConfiguration public class PostgresJdbcChannelMessageStoreTests extends AbstractJdbcChannelMessageStoreTests implements PostgresContainerTest {