Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-8760 PostgresJdbcChannelMessageStore using DELETE ... RETURNING #8762

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -554,12 +554,16 @@ public void removeMessageGroup(Object groupId) {
public Message<?> pollMessageFromGroup(Object groupId) {
String key = getKey(groupId);
Message<?> polledMessage = doPollForMessage(key);
if (polledMessage != null && !doRemoveMessageFromGroup(groupId, polledMessage)) {
if (polledMessage != null && !isSingleStatementForPoll() && !doRemoveMessageFromGroup(groupId, polledMessage)) {
return null;
}
return polledMessage;
}

private boolean isSingleStatementForPoll() {
return this.channelMessageStoreQueryProvider.isSingleStatementForPoll();
}

/**
* This method executes a call to the DB to get the oldest Message in the
* MessageGroup which in the context of the {@link JdbcChannelMessageStore}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* @author Artem Bilan
* @author Gary Russell
* @author Adama Sorho
* @author Johannes Edmeier
*
* @since 2.2
*/
Expand Down Expand Up @@ -125,4 +126,14 @@ default String getDeleteMessageGroupQuery() {
*/
String getPriorityPollFromGroupQuery();

/**
* Indicates if the queries for polling are using a single statement (e.g. DELETE ... RETURNING) to
* retrieve and delete the message from the channel store.
* @return true if a single statement is used, false if a select and delete is required.
* @since 6.2
*/
default boolean isSingleStatementForPoll() {
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,69 @@ public class PostgresChannelMessageStoreQueryProvider implements ChannelMessageS

@Override
public String getPollFromGroupExcludeIdsQuery() {
return SELECT_COMMON
+ "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) "
+ "order by CREATED_DATE, MESSAGE_SEQUENCE LIMIT 1 FOR UPDATE SKIP LOCKED";
return """
delete
from %PREFIX%CHANNEL_MESSAGE
where CTID = (select CTID
from %PREFIX%CHANNEL_MESSAGE
where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key
and %PREFIX%CHANNEL_MESSAGE.REGION = :region
and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids)
order by CREATED_DATE, MESSAGE_SEQUENCE
limit 1 for update skip locked)
returning MESSAGE_ID, MESSAGE_BYTES;
""";
}

@Override
public String getPollFromGroupQuery() {
return SELECT_COMMON +
"order by CREATED_DATE, MESSAGE_SEQUENCE LIMIT 1 FOR UPDATE SKIP LOCKED";
return """
delete
from %PREFIX%CHANNEL_MESSAGE
where CTID = (select CTID
from %PREFIX%CHANNEL_MESSAGE
where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key
and %PREFIX%CHANNEL_MESSAGE.REGION = :region
order by CREATED_DATE, MESSAGE_SEQUENCE
limit 1 for update skip locked)
returning MESSAGE_ID, MESSAGE_BYTES;
""";
}

@Override
public String getPriorityPollFromGroupExcludeIdsQuery() {
return SELECT_COMMON +
"and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) " +
"order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE " +
"LIMIT 1 FOR UPDATE SKIP LOCKED";
return """
delete
from %PREFIX%CHANNEL_MESSAGE
where CTID = (select CTID
from %PREFIX%CHANNEL_MESSAGE
where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key
and %PREFIX%CHANNEL_MESSAGE.REGION = :region
and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids)
order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
limit 1 for update skip locked)
returning MESSAGE_ID, MESSAGE_BYTES;
""";
}

@Override
public String getPriorityPollFromGroupQuery() {
return SELECT_COMMON +
"order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE " +
"LIMIT 1 FOR UPDATE SKIP LOCKED";
return """
delete
from %PREFIX%CHANNEL_MESSAGE
where CTID = (select CTID
from %PREFIX%CHANNEL_MESSAGE
where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key
and %PREFIX%CHANNEL_MESSAGE.REGION = :region
order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
limit 1 for update skip locked)
returning MESSAGE_ID, MESSAGE_BYTES;
""";
}

@Override
public boolean isSingleStatementForPoll() {
return true;
}

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
-- Autogenerated: do not edit this file

DROP INDEX INT_MESSAGE_IX1 ;
DROP INDEX INT_CHANNEL_MSG_DATE_IDX ;
DROP INDEX INT_CHANNEL_MSG_PRIORITY_IDX ;
DROP TABLE INT_MESSAGE ;
DROP TABLE INT_MESSAGE_GROUP ;
DROP TABLE INT_GROUP_TO_MESSAGE ;
DROP TABLE INT_LOCK ;
DROP TABLE INT_CHANNEL_MESSAGE ;
DROP TABLE INT_METADATA_STORE ;
DROP SEQUENCE INT_MESSAGE_SEQ ;
DROP INDEX IF EXISTS INT_MESSAGE_IX1 ;
DROP INDEX IF EXISTS INT_CHANNEL_MSG_DATE_IDX ;
DROP INDEX IF EXISTS INT_CHANNEL_MSG_PRIORITY_IDX ;
DROP TABLE IF EXISTS INT_MESSAGE ;
DROP TABLE IF EXISTS INT_MESSAGE_GROUP ;
DROP TABLE IF EXISTS INT_GROUP_TO_MESSAGE ;
DROP TABLE IF EXISTS INT_LOCK ;
DROP TABLE IF EXISTS INT_CHANNEL_MESSAGE ;
DROP TABLE IF EXISTS INT_METADATA_STORE ;
DROP SEQUENCE IF EXISTS INT_MESSAGE_SEQ ;
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,6 @@
public class PostgresChannelMessageTableSubscriberTests implements PostgresContainerTest {

private static final String INTEGRATION_DB_SCRIPTS = """
CREATE SEQUENCE INT_MESSAGE_SEQ START WITH 1 INCREMENT BY 1 NO CYCLE;
^^^ END OF SCRIPT ^^^

CREATE TABLE INT_CHANNEL_MESSAGE (
MESSAGE_ID CHAR(36) NOT NULL,
GROUP_KEY CHAR(36) NOT NULL,
CREATED_DATE BIGINT NOT NULL,
MESSAGE_PRIORITY BIGINT,
MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT nextval('INT_MESSAGE_SEQ'),
MESSAGE_BYTES BYTEA,
REGION VARCHAR(100) NOT NULL,
constraint INT_CHANNEL_MESSAGE_PK primary key (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
^^^ END OF SCRIPT ^^^

CREATE FUNCTION INT_CHANNEL_MESSAGE_NOTIFY_FCT()
RETURNS TRIGGER AS
$BODY$
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 the original author or authors.
* Copyright 2022-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,7 +34,8 @@
@Testcontainers(disabledWithoutDocker = true)
public interface PostgresContainerTest {

PostgreSQLContainer<?> POSTGRES_CONTAINER = new PostgreSQLContainer<>("postgres:11");
PostgreSQLContainer<?> POSTGRES_CONTAINER = new PostgreSQLContainer<>("postgres:11")
.withInitScript("org/springframework/integration/jdbc/schema-postgresql.sql");

@BeforeAll
static void startContainer() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.jdbc.store.channel;

import org.apache.commons.dbcp2.BasicDataSource;
import org.springframework.context.annotation.Bean;

Check failure on line 20 in spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJdbcChannelMessageStoreTests.java

View workflow job for this annotation

GitHub Actions / build

[Task :spring-integration-jdbc:checkstyleTest FAILED] [ImportOrder] 'org.springframework.context.annotation.Bean' should be separated from previous imports.
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.jdbc.channel.PostgresContainerTest;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

Check failure on line 27 in spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/store/channel/PostgresJdbcChannelMessageStoreTests.java

View workflow job for this annotation

GitHub Actions / build

[Task :spring-integration-jdbc:checkstyleTest FAILED] [ImportOrder] Wrong order for 'javax.sql.DataSource' import.

/**
* @author Johannes Edmeier
artembilan marked this conversation as resolved.
Show resolved Hide resolved
* @since 6.2
*/
@ContextConfiguration
public class PostgresJdbcChannelMessageStoreTests extends AbstractJdbcChannelMessageStoreTests implements PostgresContainerTest {

@Configuration
public static class Config {

@Bean
public DataSource dataSource() {
BasicDataSource dataSource = new BasicDataSource();
dataSource.setUrl(PostgresContainerTest.getJdbcUrl());
dataSource.setUsername(PostgresContainerTest.getUsername());
dataSource.setPassword(PostgresContainerTest.getPassword());
return dataSource;
}

@Bean
PlatformTransactionManager transactionManager(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}

@Bean
PostgresChannelMessageStoreQueryProvider queryProvider() {
return new PostgresChannelMessageStoreQueryProvider();
}
}
joshiste marked this conversation as resolved.
Show resolved Hide resolved

}
Loading