This repository has been archived by the owner on Feb 1, 2024. It is now read-only.

Unpacking sns envelopes does not include all message headers #157

Open
maplester opened this issue Jun 23, 2023 · 1 comment

maplester commented Jun 23, 2023

I found an issue when using this binder with "snsFanout: true" to read SNS messages from SQS queues: custom message attributes in the SNS envelope don't get converted into message headers. The following changes to the class SnsFanoutMessageBuilderFactory solve the problem:

package de.idealo.spring.stream.binder.sqs;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.springframework.integration.support.DefaultMessageBuilderFactory;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.converter.MessageConversionException;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SnsFanoutMessageBuilderFactory extends DefaultMessageBuilderFactory {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    @SuppressWarnings("unchecked")
    public <T> MessageBuilder<T> fromMessage(Message<T> message) {
        JsonNode jsonNode;
        try {
            jsonNode = this.objectMapper.readTree((String) message.getPayload());
        } catch (JsonProcessingException e) {
            throw new MessagingException(message, e);
        }

        if (!jsonNode.has("Type")) {
            throw new MessageConversionException("Payload: '" + message.getPayload()
                    + "' does not contain a Type attribute", null);
        }

        if (!"Notification".equals(jsonNode.get("Type").asText())) {
            throw new MessageConversionException(
                    "Payload: '" + message.getPayload() + "' is not a valid notification",
                    null);
        }

        if (!jsonNode.has("Message")) {
            throw new MessageConversionException(
                    "Payload: '" + message.getPayload() + "' does not contain a message",
                    null);
        }

        String messagePayload = jsonNode.get("Message").asText();

        Map<String, String> envelopedMessageAttributes = new HashMap<>();
        if (jsonNode.has("MessageAttributes")) {
            envelopedMessageAttributes.putAll(toMap(jsonNode.get("MessageAttributes")));
        }

        return (MessageBuilder<T>)
            MessageBuilder.withPayload(messagePayload)
                .copyHeaders(message.getHeaders())
                .copyHeaders(envelopedMessageAttributes);
    }

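    // Flattens the SNS "MessageAttributes" object into attribute name -> value pairs.
    private static Map<String, String> toMap(JsonNode node) {
        Map<String, String> messageHeaders = new HashMap<>();
        Iterator<String> fieldNames = node.fieldNames();
        while (fieldNames.hasNext()) {
            String attributeName = fieldNames.next();
            // Skip attributes without a "Value" field instead of failing with a NullPointerException.
            JsonNode attributeValue = node.get(attributeName).get("Value");
            if (attributeValue != null) {
                messageHeaders.put(attributeName, attributeValue.asText());
            }
        }
        return messageHeaders;
    }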
}
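
One note on the two copyHeaders calls above: as far as I know, copyHeaders overwrites existing values, so an SNS message attribute that shares a name with a header already on the SQS message replaces it (the read-only id and timestamp headers should be unaffected). Below is a minimal sketch of that precedence using plain maps as a stand-in for the MessageBuilder. The class HeaderMergeSketch, its merge helper, and the header names are hypothetical and only for illustration; they are not part of the binder.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class HeaderMergeSketch {

    // Hypothetical helper: mimics two successive copyHeaders calls with plain maps.
    static Map<String, Object> merge(Map<String, Object> transportHeaders,
                                     Map<String, String> envelopeAttributes) {
        // First copy: headers that were already on the incoming SQS message.
        Map<String, Object> merged = new LinkedHashMap<>(transportHeaders);
        // Second copy: SNS message attributes; same-named keys are overwritten.
        merged.putAll(envelopeAttributes);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> transportHeaders = new LinkedHashMap<>();
        transportHeaders.put("contentType", "text/plain");
        transportHeaders.put("eventType", "from-sqs");

        Map<String, String> envelopeAttributes = new LinkedHashMap<>();
        envelopeAttributes.put("eventType", "order-created");
        envelopeAttributes.put("tenant", "de");

        // prints {contentType=text/plain, eventType=order-created, tenant=de}
        System.out.println(merge(transportHeaders, envelopeAttributes));
    }
}
```

If the transport headers should win instead, swapping the order of the two copies (or using copyHeadersIfAbsent for the envelope attributes) would be the alternative.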

Could you please include this change?

@maplester

@joerglaumann as you are the one who updated to major version 3, could you make this change?
