Skip to content

Commit

Permalink
feat: Add MessagingMessageConverter to support Header messaging under…
Browse files Browse the repository at this point in the history
… RedisInboundChannelAdapter.

feat: RedisQueueMessageDrivenEndpoint change expectMessage to true , and set serialization method to RedisSerializer.java
  • Loading branch information
guoshiqiufeng committed Sep 14, 2024
1 parent bad7423 commit c8af648
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package io.github.guoshiqiufeng.cloud.stream.binder.redis.support.converter;

import org.jetbrains.annotations.NotNull;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.SimpleMessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;

/**
* A message transformer for redis, primarily designed to support Header message sending.
*
* @author yanghq
* @version 1.0
* @since 2024/9/13 11:39
*/
public class MessagingMessageConverter implements MessageConverter {

private volatile MessageConverter messagingConverter = new SimpleMessageConverter();

@Override
public Object fromMessage(Message<?> messageArg, Class<?> targetClass) {
Message<?> message = messageArg;
if (this.messagingConverter != null) {
Message<?> converted = this.messagingConverter.toMessage(message.getPayload(), message.getHeaders());
if (converted != null) {
message = converted;
}
}

return RedisSerializer.java().serialize(message);
}

@Override
public Message<?> toMessage(@NotNull Object payload, MessageHeaders headers) {
if (payload instanceof byte[] bytes) {
Object deserialize = RedisSerializer.java().deserialize(bytes);
if (deserialize instanceof Message<?> record) {
payload = record.getPayload();
headers = record.getHeaders();
}
}
if (headers != null) {
MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(headers, MessageHeaderAccessor.class);
if (accessor != null && accessor.isMutable()) {
return MessageBuilder.createMessage(payload, accessor.getMessageHeaders());
}
}
return MessageBuilder.withPayload(payload).copyHeaders(headers).build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.github.guoshiqiufeng.cloud.stream.binder.redis.properties.RedisExtendedBindingProperties;
import io.github.guoshiqiufeng.cloud.stream.binder.redis.properties.RedisProducerProperties;
import io.github.guoshiqiufeng.cloud.stream.binder.redis.provisioning.RedisTopicProvisioner;
import io.github.guoshiqiufeng.cloud.stream.binder.redis.support.converter.MessagingMessageConverter;
import io.github.guoshiqiufeng.cloud.stream.binder.redis.utils.RedisConnectionFactoryUtil;
import lombok.Setter;
import org.springframework.beans.factory.BeanFactory;
Expand All @@ -30,6 +31,7 @@
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.redis.inbound.RedisInboundChannelAdapter;
import org.springframework.integration.redis.inbound.RedisQueueMessageDrivenEndpoint;
Expand Down Expand Up @@ -115,15 +117,13 @@ protected MessageHandler createProducerMessageHandler(ProducerDestination destin
if (producerProperties != null && producerProperties.getSerializer() != null) {
handler.setSerializer(producerProperties.getSerializer());
}
handler.setMessageConverter(new MessagingMessageConverter());
return handler;
} else {
RedisQueueOutboundChannelAdapter handler = new RedisQueueOutboundChannelAdapter(destination.getName(), connectionFactory);
// handler.set
handler.setApplicationContext(applicationContext);
handler.setBeanFactory(beanFactory);
if (producerProperties != null && producerProperties.getSerializer() != null) {
handler.setSerializer(producerProperties.getSerializer());
}
handler.setExtractPayload(false);
return handler;
}
}
Expand Down Expand Up @@ -162,6 +162,7 @@ protected MessageProducer createConsumerEndpoint(ConsumerDestination destination
if (consumerProperties != null && consumerProperties.getSerializer() != null) {
redisInboundChannelAdapter.setSerializer(consumerProperties.getSerializer());
}
redisInboundChannelAdapter.setMessageConverter(new MessagingMessageConverter());
redisInboundChannelAdapter.setBeanFactory(getBeanFactory());
redisInboundChannelAdapter.setApplicationContext(applicationContext);
return redisInboundChannelAdapter;
Expand All @@ -174,9 +175,8 @@ protected MessageProducer createConsumerEndpoint(ConsumerDestination destination
redisQueueMessageDrivenEndpoint.setBeanName(extendedConsumerProperties.getBindingName());
redisQueueMessageDrivenEndpoint.setBeanFactory(getBeanFactory());
redisQueueMessageDrivenEndpoint.setApplicationContext(applicationContext);
if (consumerProperties != null && consumerProperties.getSerializer() != null) {
redisQueueMessageDrivenEndpoint.setSerializer(consumerProperties.getSerializer());
}
redisQueueMessageDrivenEndpoint.setSerializer(RedisSerializer.java());
redisQueueMessageDrivenEndpoint.setExpectMessage(true);
return redisQueueMessageDrivenEndpoint;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void testSendAndReceiveNoOriginalContentType(TestInfo testInfo) throws Ex
inboundMessageRef.get().getPayload())
.isEqualTo("foo".getBytes());
assertThat(inboundMessageRef.get().getHeaders().get(MessageHeaders.CONTENT_TYPE))
.isEqualTo(MimeTypeUtils.APPLICATION_JSON);
.isEqualTo(MimeTypeUtils.TEXT_PLAIN);

producerBinding.unbind();
consumerBinding.unbind();
Expand Down Expand Up @@ -163,7 +163,7 @@ public void testSendAndReceive(TestInfo testInfo) throws Exception {
assertThat(inboundMessageRef.get().getPayload())
.isEqualTo("foo".getBytes());
assertThat(inboundMessageRef.get().getHeaders().get(MessageHeaders.CONTENT_TYPE))
.isEqualTo(MimeTypeUtils.APPLICATION_JSON);
.isEqualTo(MimeTypeUtils.TEXT_PLAIN);

producerBinding.unbind();
consumerBinding.unbind();
Expand Down Expand Up @@ -229,7 +229,7 @@ public Spy spyOn(String name) {
private RedisBinderConfigurationProperties createConfigurationProperties() {
var binderConfiguration = new RedisBinderConfigurationProperties(
redisProperties);
binderConfiguration.setSupportType(RedisBinderConfigurationProperties.SupportType.PUBLISH_SUBSCRIBE_CHANNEL);
binderConfiguration.setSupportType(RedisBinderConfigurationProperties.SupportType.QUEUE_CHANNEL);
binderConfiguration.getConsumerProperties().setSerializer(RedisSerializer.byteArray());
binderConfiguration.getProducerProperties().setSerializer(RedisSerializer.byteArray());
return binderConfiguration;
Expand Down

0 comments on commit c8af648

Please sign in to comment.