diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/MessageStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/MessageStore.java index 503e05b1fd8..1e8060947ff 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/MessageStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/MessageStore.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -62,11 +62,12 @@ public interface MessageStore { Message addMessage(Message message); /** - * Remove the Message with the given id from the MessageStore, if present, and return it. If no Message with that id - * is present in the store, this will return null. - * - * @param id THe message identifier. - * @return The message. + * Remove the Message with the given id from the MessageStore, if present, and return it. + * If no Message with that id is present in the store, this will return {@code null}. + * If this method is implemented on a {@link MessageGroupStore}, + * the message is removed from the store only if no groups holding this message. + * @param id the message identifier. + * @return the message (if any). */ Message removeMessage(UUID id); diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcMessageStore.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcMessageStore.java index 26f1fb8c148..2f9b2150b26 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcMessageStore.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcMessageStore.java @@ -172,6 +172,9 @@ SELECT COUNT(MESSAGE_ID) DELETE_MESSAGE(""" DELETE from %PREFIX%MESSAGE where MESSAGE_ID=? and REGION=? + and MESSAGE_ID not in ( + SELECT MESSAGE_ID from %PREFIX%GROUP_TO_MESSAGE + where MESSAGE_ID=? and REGION = ?) """), CREATE_MESSAGE(""" @@ -199,7 +202,12 @@ SELECT COUNT(GROUP_KEY) DELETE_MESSAGES_FROM_GROUP(""" DELETE from %PREFIX%MESSAGE - where MESSAGE_ID in (SELECT MESSAGE_ID from %PREFIX%GROUP_TO_MESSAGE where GROUP_KEY = ? and REGION = ?) + where MESSAGE_ID in ( + SELECT MESSAGE_ID from %PREFIX%GROUP_TO_MESSAGE where GROUP_KEY = ? and REGION = ? + and MESSAGE_ID not in ( + SELECT MESSAGE_ID from %PREFIX%GROUP_TO_MESSAGE + where GROUP_KEY != ? and REGION = ?) + ) and REGION = ? """), @@ -384,7 +392,8 @@ public Message removeMessage(UUID id) { if (message == null) { return null; } - int updated = this.jdbcTemplate.update(getQuery(Query.DELETE_MESSAGE), getKey(id), this.region); + String key = getKey(id); + int updated = this.jdbcTemplate.update(getQuery(Query.DELETE_MESSAGE), key, this.region, key, this.region); if (updated != 0) { return message; } @@ -575,15 +584,18 @@ public void removeMessagesFromGroup(Object groupId, Collection> messa (ps, messageToRemove) -> { ps.setString(1, groupKey); // NOSONAR - magic number ps.setString(2, getKey(messageToRemove.getHeaders().getId())); // NOSONAR - magic number - ps.setString(3, JdbcMessageStore.this.region); // NOSONAR - magic number + ps.setString(3, this.region); // NOSONAR - magic number }); this.jdbcTemplate.batchUpdate(getQuery(Query.DELETE_MESSAGE), messages, getRemoveBatchSize(), (ps, messageToRemove) -> { - ps.setString(1, getKey(messageToRemove.getHeaders().getId())); // NOSONAR - magic number - ps.setString(2, JdbcMessageStore.this.region); // NOSONAR - magic number + String key = getKey(messageToRemove.getHeaders().getId()); + ps.setString(1, key); // NOSONAR - magic number + ps.setString(2, this.region); // NOSONAR - magic number + ps.setString(3, key); // NOSONAR - magic number + ps.setString(4, this.region); // NOSONAR - magic number }); updateMessageGroup(groupKey); @@ -593,7 +605,8 @@ public void removeMessagesFromGroup(Object groupId, Collection> messa public void removeMessageGroup(Object groupId) { String groupKey = getKey(groupId); - this.jdbcTemplate.update(getQuery(Query.DELETE_MESSAGES_FROM_GROUP), groupKey, this.region, this.region); + this.jdbcTemplate.update(getQuery(Query.DELETE_MESSAGES_FROM_GROUP), + groupKey, this.region, groupKey, this.region, this.region); if (logger.isDebugEnabled()) { logger.debug("Removing relationships for the group with group key=" + groupKey); diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/mysql/MySqlJdbcMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/mysql/MySqlJdbcMessageStoreTests.java index 979ad02c7bb..1f8f5d67eff 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/mysql/MySqlJdbcMessageStoreTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/mysql/MySqlJdbcMessageStoreTests.java @@ -497,6 +497,36 @@ public void testMessageGroupCondition() { assertThat(this.messageStore.getMessageGroup(groupId).getCondition()).isEqualTo("testCondition"); } + @Test + public void sameMessageInTwoGroupsNotRemovedByFirstGroup() { + GenericMessage testMessage = new GenericMessage<>("test data"); + + messageStore.addMessageToGroup("1", testMessage); + messageStore.addMessageToGroup("2", testMessage); + + messageStore.removeMessageGroup("1"); + + assertThat(messageStore.getMessageCount()).isEqualTo(1); + + messageStore.removeMessageGroup("2"); + + assertThat(messageStore.getMessageCount()).isEqualTo(0); + } + + @Test + public void removeMessagesFromGroupDontRemoveSameMessageInOtherGroup() { + GenericMessage testMessage = new GenericMessage<>("test data"); + + messageStore.addMessageToGroup("1", testMessage); + messageStore.addMessageToGroup("2", testMessage); + + messageStore.removeMessagesFromGroup("1", testMessage); + + assertThat(messageStore.getMessageCount()).isEqualTo(1); + assertThat(messageStore.messageGroupSize("1")).isEqualTo(0); + assertThat(messageStore.messageGroupSize("2")).isEqualTo(1); + } + @Configuration public static class Config {