Skip to content

Commit

Permalink
AbstractMessageGroupStore add lock when operate group metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
NaccOll committed Dec 9, 2024
1 parent dd83235 commit af5a737
Show file tree
Hide file tree
Showing 11 changed files with 244 additions and 316 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,12 @@
* @author Gary Russell
* @author Artem Bilan
* @author Ngoc Nhan
* @author Youbin Wu
*
* @since 2.1
*/
public abstract class AbstractKeyValueMessageStore extends AbstractMessageGroupStore implements MessageStore {

private static final String GROUP_ID_MUST_NOT_BE_NULL = "'groupId' must not be null";

protected static final String MESSAGE_KEY_PREFIX = "MESSAGE_";

protected static final String MESSAGE_GROUP_KEY_PREFIX = "GROUP_OF_MESSAGES_";
Expand Down Expand Up @@ -206,7 +205,7 @@ public MessageGroupMetadata getGroupMetadata(Object groupId) {
}

@Override
public void addMessagesToGroup(Object groupId, Message<?>... messages) {
public void addMessagesToGroupInner(Object groupId, Message<?>... messages) {
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
Assert.notNull(messages, "'messages' must not be null");

Expand Down Expand Up @@ -240,7 +239,7 @@ public void addMessagesToGroup(Object groupId, Message<?>... messages) {
}

@Override
public void removeMessagesFromGroup(Object groupId, Collection<Message<?>> messages) {
public void removeMessagesFromGroupInner(Object groupId, Collection<Message<?>> messages) {
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
Assert.notNull(messages, "'messages' must not be null");

Expand Down Expand Up @@ -283,7 +282,7 @@ public Message<?> getMessageFromGroup(Object groupId, UUID messageId) {
}

@Override
public boolean removeMessageFromGroupById(Object groupId, UUID messageId) {
public boolean removeMessageFromGroupByIdInner(Object groupId, UUID messageId) {
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
Assert.notNull(messageId, "'messageId' must not be null");
Object mgm = doRetrieve(this.groupPrefix + groupId);
Expand All @@ -305,7 +304,7 @@ public boolean removeMessageFromGroupById(Object groupId, UUID messageId) {
}

@Override
public void completeGroup(Object groupId) {
public void completeGroupInner(Object groupId) {
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
MessageGroupMetadata metadata = getGroupMetadata(groupId);
if (metadata != null) {
Expand All @@ -319,7 +318,7 @@ public void completeGroup(Object groupId) {
* Remove the MessageGroup with the provided group ID.
*/
@Override
public void removeMessageGroup(Object groupId) {
public void removeMessageGroupInner(Object groupId) {
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
Object mgm = doRemove(this.groupPrefix + groupId);
if (mgm != null) {
Expand All @@ -337,7 +336,7 @@ public void removeMessageGroup(Object groupId) {
}

@Override
public void setGroupCondition(Object groupId, String condition) {
public void setGroupConditionInner(Object groupId, String condition) {
MessageGroupMetadata metadata = getGroupMetadata(groupId);
if (metadata != null) {
metadata.setCondition(condition);
Expand All @@ -346,7 +345,7 @@ public void setGroupCondition(Object groupId, String condition) {
}

@Override
public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) {
public void setLastReleasedSequenceNumberForGroupInner(Object groupId, int sequenceNumber) {
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
MessageGroupMetadata metadata = getGroupMetadata(groupId);
if (metadata == null) {
Expand All @@ -359,7 +358,7 @@ public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNu
}

@Override
public Message<?> pollMessageFromGroup(Object groupId) {
public Message<?> pollMessageFromGroupInner(Object groupId) {
MessageGroupMetadata groupMetadata = getGroupMetadata(groupId);
if (groupMetadata != null) {
UUID firstId = groupMetadata.firstId();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,30 +19,42 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.integration.support.locks.DefaultLockRegistry;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.integration.util.CheckedCallable;
import org.springframework.integration.util.CheckedRunnable;
import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;

/**
* @author Dave Syer
* @author Oleg Zhurakousky
* @author Gary Russell
* @author Artem Bilan
* @author Christian Tzolov
* @author Youbin Wu
*
* @since 2.0
*/
@ManagedResource
public abstract class AbstractMessageGroupStore extends AbstractBatchingMessageGroupStore
implements MessageGroupStore, Iterable<MessageGroup> {

protected static final String INTERRUPTED_WHILE_OBTAINING_LOCK = "Interrupted while obtaining lock";

protected static final String GROUP_ID_MUST_NOT_BE_NULL = "'groupId' must not be null";

protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR final

private final Lock lock = new ReentrantLock();
Expand All @@ -56,11 +68,15 @@ public abstract class AbstractMessageGroupStore extends AbstractBatchingMessageG

private boolean timeoutOnIdle;

protected LockRegistry lockRegistry;

protected AbstractMessageGroupStore() {
this.lockRegistry = new DefaultLockRegistry();
}

protected AbstractMessageGroupStore(boolean lazyLoadMessageGroups) {
this.lazyLoadMessageGroups = lazyLoadMessageGroups;
this.lockRegistry = new DefaultLockRegistry();
}

@Override
Expand Down Expand Up @@ -109,6 +125,16 @@ public void setLazyLoadMessageGroups(boolean lazyLoadMessageGroups) {
this.lazyLoadMessageGroups = lazyLoadMessageGroups;
}

/**
* Specify the type of the {@link LockRegistry} to ensure atomic operations
* @param lockRegistry lockRegistryType
* @since 6.5
*/
public void setLockRegistry(LockRegistry lockRegistry) {
Assert.notNull(lockRegistry, "The LockRegistry cannot be null");
this.lockRegistry = lockRegistry;
}

@Override
public void registerMessageGroupExpiryCallback(MessageGroupCallback callback) {
if (callback instanceof UniqueExpiryCallback) {
Expand Down Expand Up @@ -195,12 +221,98 @@ public void removeMessagesFromGroup(Object key, Message<?>... messages) {
removeMessagesFromGroup(key, Arrays.asList(messages));
}

@Override
public void removeMessagesFromGroup(Object key, Collection<Message<?>> messages) {
Assert.notNull(key, GROUP_ID_MUST_NOT_BE_NULL);
lockExecute(key, () -> removeMessagesFromGroupInner(key, messages));
}

protected abstract void removeMessagesFromGroupInner(Object key, Collection<Message<?>> messages);

@Override
public void addMessagesToGroup(Object groupId, Message<?>... messages) {
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
lockExecute(groupId, () -> addMessagesToGroupInner(groupId, messages));
}

protected abstract void addMessagesToGroupInner(Object groupId, Message<?>... messages);

@Override
public MessageGroup addMessageToGroup(Object groupId, Message<?> message) {
addMessagesToGroup(groupId, message);
return getMessageGroup(groupId);
}

@Override
public void removeMessageGroup(Object groupId) {
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
lockExecute(groupId, () -> removeMessageGroupInner(groupId));
}

protected abstract void removeMessageGroupInner(Object groupId);

@Override
public boolean removeMessageFromGroupById(Object groupId, UUID messageId) {
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
return lockExecute(groupId, () -> removeMessageFromGroupByIdInner(groupId, messageId));
}

protected boolean removeMessageFromGroupByIdInner(Object groupId, UUID messageId) {
throw new UnsupportedOperationException("Not supported for this store");
}

@Override
public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) {
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
lockExecute(groupId, () -> setLastReleasedSequenceNumberForGroupInner(groupId, sequenceNumber));
}

protected abstract void setLastReleasedSequenceNumberForGroupInner(Object groupId, int sequenceNumber);

@Override
public void completeGroup(Object groupId) {
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
lockExecute(groupId, () -> completeGroupInner(groupId));
}

protected abstract void completeGroupInner(Object groupId);

@Override
public void setGroupCondition(Object groupId, String condition) {
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
lockExecute(groupId, () -> setGroupConditionInner(groupId, condition));
}

protected abstract void setGroupConditionInner(Object groupId, String condition);

@Override
public Message<?> pollMessageFromGroup(Object groupId) {
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
return lockExecute(groupId, () -> pollMessageFromGroupInner(groupId));
}

protected abstract Message<?> pollMessageFromGroupInner(Object groupId);

protected <T, E extends RuntimeException> T lockExecute(Object groupId, CheckedCallable<T, E> runnable) {
try {
return this.lockRegistry.executeLocked(groupId, runnable);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new MessagingException(INTERRUPTED_WHILE_OBTAINING_LOCK, e);
}
}

protected <E extends RuntimeException> void lockExecute(Object groupId, CheckedRunnable<E> runnable) {
try {
this.lockRegistry.executeLocked(groupId, runnable);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new MessagingException(INTERRUPTED_WHILE_OBTAINING_LOCK, e);
}
}

private void expire(MessageGroup group) {

RuntimeException exception = null;
Expand Down
Loading

0 comments on commit af5a737

Please sign in to comment.