diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java index cde3b6c546a..b2a208ff528 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcChannelMessageStore.java @@ -554,12 +554,16 @@ public void removeMessageGroup(Object groupId) { public Message pollMessageFromGroup(Object groupId) { String key = getKey(groupId); Message polledMessage = doPollForMessage(key); - if (polledMessage != null && !doRemoveMessageFromGroup(groupId, polledMessage)) { + if (polledMessage != null && !isSingleStatementForPoll() && !doRemoveMessageFromGroup(groupId, polledMessage)) { return null; } return polledMessage; } + private boolean isSingleStatementForPoll() { + return this.channelMessageStoreQueryProvider.isSingleStatementForPoll(); + } + /** * This method executes a call to the DB to get the oldest Message in the * MessageGroup which in the context of the {@link JdbcChannelMessageStore} diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/ChannelMessageStoreQueryProvider.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/ChannelMessageStoreQueryProvider.java index f359a0a03fc..f14fe033f35 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/ChannelMessageStoreQueryProvider.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/ChannelMessageStoreQueryProvider.java @@ -25,6 +25,7 @@ * @author Artem Bilan * @author Gary Russell * @author Adama Sorho + * @author Johannes Edmeier * * @since 2.2 */ @@ -125,4 +126,14 @@ default String getDeleteMessageGroupQuery() { */ String getPriorityPollFromGroupQuery(); + /** + * Indicates if the queries for polling are using a single statement (e.g. DELETE ... RETURNING) to + * retrieve and delete the message from the channel store. + * @return true if a single statement is used, false if a select and delete is required. + * @since 6.2 + */ + default boolean isSingleStatementForPoll() { + return false; + } + } diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/PostgresChannelMessageStoreQueryProvider.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/PostgresChannelMessageStoreQueryProvider.java index 26551d60e5a..e3cae7f5478 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/PostgresChannelMessageStoreQueryProvider.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/channel/PostgresChannelMessageStoreQueryProvider.java @@ -27,30 +27,69 @@ public class PostgresChannelMessageStoreQueryProvider implements ChannelMessageS @Override public String getPollFromGroupExcludeIdsQuery() { - return SELECT_COMMON - + "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) " - + "order by CREATED_DATE, MESSAGE_SEQUENCE LIMIT 1 FOR UPDATE SKIP LOCKED"; + return """ + delete + from %PREFIX%CHANNEL_MESSAGE + where CTID = (select CTID + from %PREFIX%CHANNEL_MESSAGE + where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key + and %PREFIX%CHANNEL_MESSAGE.REGION = :region + and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) + order by CREATED_DATE, MESSAGE_SEQUENCE + limit 1 for update skip locked) + returning MESSAGE_ID, MESSAGE_BYTES; + """; } @Override public String getPollFromGroupQuery() { - return SELECT_COMMON + - "order by CREATED_DATE, MESSAGE_SEQUENCE LIMIT 1 FOR UPDATE SKIP LOCKED"; + return """ + delete + from %PREFIX%CHANNEL_MESSAGE + where CTID = (select CTID + from %PREFIX%CHANNEL_MESSAGE + where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key + and %PREFIX%CHANNEL_MESSAGE.REGION = :region + order by CREATED_DATE, MESSAGE_SEQUENCE + limit 1 for update skip locked) + returning MESSAGE_ID, MESSAGE_BYTES; + """; } @Override public String getPriorityPollFromGroupExcludeIdsQuery() { - return SELECT_COMMON + - "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) " + - "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE " + - "LIMIT 1 FOR UPDATE SKIP LOCKED"; + return """ + delete + from %PREFIX%CHANNEL_MESSAGE + where CTID = (select CTID + from %PREFIX%CHANNEL_MESSAGE + where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key + and %PREFIX%CHANNEL_MESSAGE.REGION = :region + and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) + order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE + limit 1 for update skip locked) + returning MESSAGE_ID, MESSAGE_BYTES; + """; } @Override public String getPriorityPollFromGroupQuery() { - return SELECT_COMMON + - "order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE " + - "LIMIT 1 FOR UPDATE SKIP LOCKED"; + return """ + delete + from %PREFIX%CHANNEL_MESSAGE + where CTID = (select CTID + from %PREFIX%CHANNEL_MESSAGE + where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key + and %PREFIX%CHANNEL_MESSAGE.REGION = :region + order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE + limit 1 for update skip locked) + returning MESSAGE_ID, MESSAGE_BYTES; + """; + } + + @Override + public boolean isSingleStatementForPoll() { + return true; } } diff --git a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-drop-postgresql.sql b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-drop-postgresql.sql index a548cca409b..1b9f6a6f646 100644 --- a/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-drop-postgresql.sql +++ b/spring-integration-jdbc/src/main/resources/org/springframework/integration/jdbc/schema-drop-postgresql.sql @@ -1,12 +1,12 @@ -- Autogenerated: do not edit this file -DROP INDEX INT_MESSAGE_IX1 ; -DROP INDEX INT_CHANNEL_MSG_DATE_IDX ; -DROP INDEX INT_CHANNEL_MSG_PRIORITY_IDX ; -DROP TABLE INT_MESSAGE ; -DROP TABLE INT_MESSAGE_GROUP ; -DROP TABLE INT_GROUP_TO_MESSAGE ; -DROP TABLE INT_LOCK ; -DROP TABLE INT_CHANNEL_MESSAGE ; -DROP TABLE INT_METADATA_STORE ; -DROP SEQUENCE INT_MESSAGE_SEQ ; +DROP INDEX IF EXISTS INT_MESSAGE_IX1 ; +DROP INDEX IF EXISTS INT_CHANNEL_MSG_DATE_IDX ; +DROP INDEX IF EXISTS INT_CHANNEL_MSG_PRIORITY_IDX ; +DROP TABLE IF EXISTS INT_MESSAGE ; +DROP TABLE IF EXISTS INT_MESSAGE_GROUP ; +DROP TABLE IF EXISTS INT_GROUP_TO_MESSAGE ; +DROP TABLE IF EXISTS INT_LOCK ; +DROP TABLE IF EXISTS INT_CHANNEL_MESSAGE ; +DROP TABLE IF EXISTS INT_METADATA_STORE ; +DROP SEQUENCE IF EXISTS INT_MESSAGE_SEQ ; diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java index d443946fa02..911a51b9d2b 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java @@ -67,21 +67,6 @@ public class PostgresChannelMessageTableSubscriberTests implements PostgresContainerTest { private static final String INTEGRATION_DB_SCRIPTS = """ - CREATE SEQUENCE INT_MESSAGE_SEQ START WITH 1 INCREMENT BY 1 NO CYCLE; - ^^^ END OF SCRIPT ^^^ - - CREATE TABLE INT_CHANNEL_MESSAGE ( - MESSAGE_ID CHAR(36) NOT NULL, - GROUP_KEY CHAR(36) NOT NULL, - CREATED_DATE BIGINT NOT NULL, - MESSAGE_PRIORITY BIGINT, - MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT nextval('INT_MESSAGE_SEQ'), - MESSAGE_BYTES BYTEA, - REGION VARCHAR(100) NOT NULL, - constraint INT_CHANNEL_MESSAGE_PK primary key (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE) - ); - ^^^ END OF SCRIPT ^^^ - CREATE FUNCTION INT_CHANNEL_MESSAGE_NOTIFY_FCT() RETURNS TRIGGER AS $BODY$ diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresContainerTest.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresContainerTest.java index 4ae45e89237..abe117dcfcc 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresContainerTest.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresContainerTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,7 +34,8 @@ @Testcontainers(disabledWithoutDocker = true) public interface PostgresContainerTest { - PostgreSQLContainer POSTGRES_CONTAINER = new PostgreSQLContainer<>("postgres:11"); + PostgreSQLContainer POSTGRES_CONTAINER = new PostgreSQLContainer<>("postgres:11") + .withInitScript("org/springframework/integration/jdbc/schema-postgresql.sql"); @BeforeAll static void startContainer() { diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJdbcChannelMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJdbcChannelMessageStoreTests.java new file mode 100644 index 00000000000..551037662af --- /dev/null +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJdbcChannelMessageStoreTests.java @@ -0,0 +1,59 @@ +/* + * Copyright 2002-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.jdbc.store.channel; + +import org.apache.commons.dbcp2.BasicDataSource; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.jdbc.channel.PostgresContainerTest; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.transaction.PlatformTransactionManager; + +import javax.sql.DataSource; + +/** + * @author Johannes Edmeier + * @since 6.2 + */ +@ContextConfiguration +public class PostgresJdbcChannelMessageStoreTests extends AbstractJdbcChannelMessageStoreTests implements PostgresContainerTest { + + @Configuration + public static class Config { + + @Bean + public DataSource dataSource() { + BasicDataSource dataSource = new BasicDataSource(); + dataSource.setUrl(PostgresContainerTest.getJdbcUrl()); + dataSource.setUsername(PostgresContainerTest.getUsername()); + dataSource.setPassword(PostgresContainerTest.getPassword()); + return dataSource; + } + + @Bean + PlatformTransactionManager transactionManager(DataSource dataSource) { + return new DataSourceTransactionManager(dataSource); + } + + @Bean + PostgresChannelMessageStoreQueryProvider queryProvider() { + return new PostgresChannelMessageStoreQueryProvider(); + } + } + +}