Skip to content

Commit

Permalink
QPIDJMS-295 Add support for content-encoding property
Browse files Browse the repository at this point in the history
  • Loading branch information
Havret committed Sep 18, 2024
1 parent 9e5b9fd commit ded5733
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,37 @@ public void setProviderMessageIdObject(Object messageId) {
properties.setMessageId(messageId);
}

/**
* Retrieves the content encoding property, which identifies the encoding of the message content.
*
* @return the content encoding as a String, or null if not set.
*/
public String getContentEncoding() {
if (properties != null) {
Symbol contentEncoding = properties.getContentEncoding();
if (contentEncoding != null) {
return contentEncoding.toString();
}
}

return null;
}

/**
* Sets the content encoding property, which identifies the encoding of the message content.
*
* @param contentEncoding
* the content encoding as a String, or null to clear.
*/
public void setContentEncoding(String contentEncoding) {
if (contentEncoding != null) {
lazyCreateProperties();
properties.setContentEncoding(Symbol.valueOf(contentEncoding));
} else if (properties != null) {
properties.setContentEncoding(null);
}
}

@Override
public void setMessageId(String messageId) throws IdConversionException {
Object value = AmqpMessageIdHelper.toIdObject(messageId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_REPLY_TO_GROUP_ID;
import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_TTL;
import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_TYPED_ENCODING;
import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_CONTENT_ENCODING;

import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -189,6 +190,33 @@ public void clearProperty(AmqpJmsMessageFacade message) throws JMSException {
// TODO - Should we leave encoding intact or change to the default.
}
});
PROPERTY_INTERCEPTERS.put(JMS_AMQP_CONTENT_ENCODING, new PropertyIntercepter() {
@Override
public Object getProperty(AmqpJmsMessageFacade message) throws JMSException {
return message.getContentEncoding();
}

@Override
public void setProperty(AmqpJmsMessageFacade message, Object value) throws JMSException {
String rc = (String) TypeConversionSupport.convert(value, String.class);
if (rc == null) {
throw new JMSException("Property " + JMS_AMQP_CONTENT_ENCODING + " cannot be set from a " + value.getClass().getName());
}
message.setContentEncoding(rc);

}

@Override
public boolean propertyExists(AmqpJmsMessageFacade message) {
String contentEncoding = message.getContentEncoding();
return contentEncoding != null && !contentEncoding.isEmpty();
}

@Override
public void clearProperty(AmqpJmsMessageFacade message) throws JMSException {
message.setContentEncoding(null);
}
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ public final class AmqpMessageSupport {
public static final String JMS_AMQP_REPLY_TO_GROUP_ID = "JMS_AMQP_REPLY_TO_GROUP_ID";
public static final String JMS_AMQP_TYPED_ENCODING = "JMS_AMQP_TYPED_ENCODING";

/**
* Used to set or access the content-encoding property, identifying the message's encoding.
* Must be a String when set.
*/
public static final String JMS_AMQP_CONTENT_ENCODING = "JMS_AMQP_CONTENT_ENCODING";

/**
* Content type used to mark Data sections as containing a serialized java object.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@

import static org.apache.qpid.jms.provider.amqp.AmqpSupport.QUEUE_PREFIX;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.TOPIC_PREFIX;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand Down Expand Up @@ -2410,4 +2408,106 @@ private void doReceivedMessageDeliveryTimeTestImpl(boolean setDeliveryTimeAnnota
assertEquals(expectedDeliveryTime, receivedMessage.getJMSDeliveryTime(), "Unexpected delivery time");
}
}

/**
* Tests that when a message with the content-encoding field set is received,
* the JMS message correctly reflects this value through the JMS_AMQP_CONTENT_ENCODING property.
*
* @throws Exception if an error occurs during the test.
*/
@Test
@Timeout(20)
public void testReceivedMessageWithContentEncodingPropertySet() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();

testPeer.expectBegin();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");

DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
PropertiesDescribedType props = new PropertiesDescribedType();

String expectedContentEncoding = "gzip";
props.setContentEncoding(Symbol.valueOf(expectedContentEncoding));

testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, amqpValueNullContent);
testPeer.expectDispositionThatIsAcceptedAndSettled();

MessageConsumer messageConsumer = session.createConsumer(queue);
Message receivedMessage = messageConsumer.receive(3000);
testPeer.waitForAllHandlersToComplete(3000);

assertNotNull(receivedMessage, "did not receive the message");

boolean foundContentEncoding = false;

Enumeration<?> names = receivedMessage.getPropertyNames();

while (names.hasMoreElements()) {
Object element = names.nextElement();
if (AmqpMessageSupport.JMS_AMQP_CONTENT_ENCODING.equals(element)) {
foundContentEncoding = true;
}
}

assertTrue(foundContentEncoding, "JMS_AMQP_CONTENT_ENCODING not in property names");
assertTrue(receivedMessage.propertyExists(AmqpMessageSupport.JMS_AMQP_CONTENT_ENCODING), "JMS_AMQP_CONTENT_ENCODING does not exist");
assertEquals(expectedContentEncoding, receivedMessage.getStringProperty(AmqpMessageSupport.JMS_AMQP_CONTENT_ENCODING), "did not get the expected JMS_AMQP_CONTENT_ENCODING");

testPeer.expectClose();
connection.close();

testPeer.waitForAllHandlersToComplete(1000);
}
}

/**
* Test that a message with the "content-encoding" property set to "gzip" is correctly sent
* and that the property is encoded as an AMQP Symbol.
*
* @throws Exception if an error occurs during the test.
*/
@Test
@Timeout(20)
public void testSendMessageWithContentEncodingPropertySet() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
Connection connection = testFixture.establishConnecton(testPeer);
testPeer.expectBegin();
testPeer.expectSenderAttach();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String queueName = "myQueue";
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);

MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));

MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);

MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
propsMatcher.withContentEncoding(equalTo(Symbol.valueOf("gzip")));

TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
messageMatcher.setPropertiesMatcher(propsMatcher);
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null));

testPeer.expectTransfer(messageMatcher);

Message message = session.createTextMessage();
message.setStringProperty(AmqpMessageSupport.JMS_AMQP_CONTENT_ENCODING, "gzip");

producer.send(message);

testPeer.expectClose();
connection.close();

testPeer.waitForAllHandlersToComplete(1000);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_REPLY_TO_GROUP_ID;
import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_TTL;
import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_TYPED_ENCODING;
import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_CONTENT_ENCODING;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand Down Expand Up @@ -304,6 +305,79 @@ public void testSetJmsAmqpTypedEncodingConversionChecks() throws JMSException {
}
}

//-------- JMS_AMQP_CONTENT_ENCODING -------------------------------------//

@Test
public void testJmsAmqpContentEncodingInGetAllPropertyNames() throws JMSException {
assertTrue(AmqpJmsMessagePropertyIntercepter.getAllPropertyNames().contains(JMS_AMQP_CONTENT_ENCODING));
}

@Test
public void testSetJmsAmqpContentEncoding() throws JMSException {
String testValue = "gzip";
AmqpJmsObjectMessageFacade message = createAmqpObjectMessageFacade();
AmqpJmsMessagePropertyIntercepter.setProperty(message, JMS_AMQP_CONTENT_ENCODING, testValue);
Mockito.verify(message).setContentEncoding(testValue);
}

@Test
public void testGetJmsAmqpContentEncodingWhenNotSet() throws JMSException {
AmqpJmsMessageFacade message = createAmqpMessageFacade();
assertNull(AmqpJmsMessagePropertyIntercepter.getProperty(message, JMS_AMQP_CONTENT_ENCODING));
}

@Test
public void testGetJmsAmqpContentEncodingWhenSet() throws JMSException {
String testValue = "gzip";
AmqpJmsMessageFacade message = createAmqpMessageFacade();
Mockito.when(message.getContentEncoding()).thenReturn(testValue);
assertNotNull(AmqpJmsMessagePropertyIntercepter.getProperty(message, JMS_AMQP_CONTENT_ENCODING));
assertEquals(testValue, AmqpJmsMessagePropertyIntercepter.getProperty(message, JMS_AMQP_CONTENT_ENCODING));
}

@Test
public void testJmsAmqpContentEncodingNotInPropertyNamesWhenNotSet() throws JMSException {
AmqpJmsMessageFacade message = createAmqpMessageFacade();
assertNull(AmqpJmsMessagePropertyIntercepter.getProperty(message, JMS_AMQP_CONTENT_ENCODING));
assertFalse(AmqpJmsMessagePropertyIntercepter.getPropertyNames(message).contains(JMS_AMQP_CONTENT_ENCODING));
}

@Test
public void testJmsAmqpContentEncodingInPropertyNamesWhenSet() throws JMSException {
String testValue = "gzip";
AmqpJmsMessageFacade message = createAmqpMessageFacade();
Mockito.when(message.getApplicationPropertyNames(anySet())).then(new PassPropertyNames());
Mockito.when(message.getContentEncoding()).thenReturn(testValue);
assertTrue(AmqpJmsMessagePropertyIntercepter.getPropertyNames(message).contains(JMS_AMQP_CONTENT_ENCODING));
}

@Test
public void testJmsAmqpContentEncodingPropertyExistsWhenSet() throws JMSException {
String testValue = "gzip";
AmqpJmsMessageFacade message = createAmqpMessageFacade();
Mockito.when(message.getContentEncoding()).thenReturn(testValue);
assertTrue(AmqpJmsMessagePropertyIntercepter.propertyExists(message, JMS_AMQP_CONTENT_ENCODING));
}

@Test
public void testJmsAmqpContentEncodingPropertyExistsWhenNotSet() throws JMSException {
AmqpJmsMessageFacade message = createAmqpMessageFacade();
Mockito.when(message.getContentEncoding()).thenReturn(null);
assertFalse(AmqpJmsMessagePropertyIntercepter.propertyExists(message, JMS_AMQP_CONTENT_ENCODING));
Mockito.when(message.getContentEncoding()).thenReturn("");
assertFalse(AmqpJmsMessagePropertyIntercepter.propertyExists(message, JMS_AMQP_CONTENT_ENCODING));
}

@Test
public void testSetJmsAmqpContentEncodingConversionChecks() throws JMSException {
AmqpJmsMessageFacade message = createAmqpMessageFacade();
try {
AmqpJmsMessagePropertyIntercepter.setProperty(message, JMS_AMQP_CONTENT_ENCODING, new byte[1]);
fail("Should have thrown an exception for this call");
} catch (JMSException ignored) {
}
}

//--------- Utilities ----------------------------------------------------//

private AmqpJmsMessageFacade createAmqpMessageFacade() {
Expand Down

0 comments on commit ded5733

Please sign in to comment.