diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java index 64424f5fc..45a21d1f1 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java @@ -333,6 +333,37 @@ public void setProviderMessageIdObject(Object messageId) { properties.setMessageId(messageId); } + /** + * Retrieves the content encoding property, which identifies the encoding of the message content. + * + * @return the content encoding as a String, or null if not set. + */ + public String getContentEncoding() { + if (properties != null) { + Symbol contentEncoding = properties.getContentEncoding(); + if (contentEncoding != null) { + return contentEncoding.toString(); + } + } + + return null; + } + + /** + * Sets the content encoding property, which identifies the encoding of the message content. + * + * @param contentEncoding + * the content encoding as a String, or null to clear. + */ + public void setContentEncoding(String contentEncoding) { + if (contentEncoding != null) { + lazyCreateProperties(); + properties.setContentEncoding(Symbol.valueOf(contentEncoding)); + } else if (properties != null) { + properties.setContentEncoding(null); + } + } + @Override public void setMessageId(String messageId) throws IdConversionException { Object value = AmqpMessageIdHelper.toIdObject(messageId); diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessagePropertyIntercepter.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessagePropertyIntercepter.java index 01aee6b30..1e494817b 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessagePropertyIntercepter.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessagePropertyIntercepter.java @@ -19,6 +19,7 @@ import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_REPLY_TO_GROUP_ID; import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_TTL; import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_TYPED_ENCODING; +import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_CONTENT_ENCODING; import java.util.HashMap; import java.util.HashSet; @@ -189,6 +190,33 @@ public void clearProperty(AmqpJmsMessageFacade message) throws JMSException { // TODO - Should we leave encoding intact or change to the default. } }); + PROPERTY_INTERCEPTERS.put(JMS_AMQP_CONTENT_ENCODING, new PropertyIntercepter() { + @Override + public Object getProperty(AmqpJmsMessageFacade message) throws JMSException { + return message.getContentEncoding(); + } + + @Override + public void setProperty(AmqpJmsMessageFacade message, Object value) throws JMSException { + String rc = (String) TypeConversionSupport.convert(value, String.class); + if (rc == null) { + throw new JMSException("Property " + JMS_AMQP_CONTENT_ENCODING + " cannot be set from a " + value.getClass().getName()); + } + message.setContentEncoding(rc); + + } + + @Override + public boolean propertyExists(AmqpJmsMessageFacade message) { + String contentEncoding = message.getContentEncoding(); + return contentEncoding != null && !contentEncoding.isEmpty(); + } + + @Override + public void clearProperty(AmqpJmsMessageFacade message) throws JMSException { + message.setContentEncoding(null); + } + }); } /** diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java index f448e723f..0e0e308d5 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java @@ -83,6 +83,12 @@ public final class AmqpMessageSupport { public static final String JMS_AMQP_REPLY_TO_GROUP_ID = "JMS_AMQP_REPLY_TO_GROUP_ID"; public static final String JMS_AMQP_TYPED_ENCODING = "JMS_AMQP_TYPED_ENCODING"; + /** + * Used to set or access the content-encoding property, identifying the message's encoding. + * Must be a String when set. + */ + public static final String JMS_AMQP_CONTENT_ENCODING = "JMS_AMQP_CONTENT_ENCODING"; + /** * Content type used to mark Data sections as containing a serialized java object. */ diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java index 5dd03d4b6..ce8ff1927 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java @@ -20,9 +20,7 @@ import static org.apache.qpid.jms.provider.amqp.AmqpSupport.QUEUE_PREFIX; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.TOPIC_PREFIX; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.*; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -2410,4 +2408,106 @@ private void doReceivedMessageDeliveryTimeTestImpl(boolean setDeliveryTimeAnnota assertEquals(expectedDeliveryTime, receivedMessage.getJMSDeliveryTime(), "Unexpected delivery time"); } } + + /** + * Tests that when a message with the content-encoding field set is received, + * the JMS message correctly reflects this value through the JMS_AMQP_CONTENT_ENCODING property. + * + * @throws Exception if an error occurs during the test. + */ + @Test + @Timeout(20) + public void testReceivedMessageWithContentEncodingPropertySet() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer()) { + Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + testPeer.expectBegin(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + + DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); + PropertiesDescribedType props = new PropertiesDescribedType(); + + String expectedContentEncoding = "gzip"; + props.setContentEncoding(Symbol.valueOf(expectedContentEncoding)); + + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, amqpValueNullContent); + testPeer.expectDispositionThatIsAcceptedAndSettled(); + + MessageConsumer messageConsumer = session.createConsumer(queue); + Message receivedMessage = messageConsumer.receive(3000); + testPeer.waitForAllHandlersToComplete(3000); + + assertNotNull(receivedMessage, "did not receive the message"); + + boolean foundContentEncoding = false; + + Enumeration names = receivedMessage.getPropertyNames(); + + while (names.hasMoreElements()) { + Object element = names.nextElement(); + if (AmqpMessageSupport.JMS_AMQP_CONTENT_ENCODING.equals(element)) { + foundContentEncoding = true; + } + } + + assertTrue(foundContentEncoding, "JMS_AMQP_CONTENT_ENCODING not in property names"); + assertTrue(receivedMessage.propertyExists(AmqpMessageSupport.JMS_AMQP_CONTENT_ENCODING), "JMS_AMQP_CONTENT_ENCODING does not exist"); + assertEquals(expectedContentEncoding, receivedMessage.getStringProperty(AmqpMessageSupport.JMS_AMQP_CONTENT_ENCODING), "did not get the expected JMS_AMQP_CONTENT_ENCODING"); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + /** + * Test that a message with the "content-encoding" property set to "gzip" is correctly sent + * and that the property is encoded as an AMQP Symbol. + * + * @throws Exception if an error occurs during the test. + */ + @Test + @Timeout(20) + public void testSendMessageWithContentEncodingPropertySet() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer()) { + Connection connection = testFixture.establishConnecton(testPeer); + testPeer.expectBegin(); + testPeer.expectSenderAttach(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String queueName = "myQueue"; + Queue queue = session.createQueue(queueName); + MessageProducer producer = session.createProducer(queue); + + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true)); + + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + + MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true); + propsMatcher.withContentEncoding(equalTo(Symbol.valueOf("gzip"))); + + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + messageMatcher.setPropertiesMatcher(propsMatcher); + messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null)); + + testPeer.expectTransfer(messageMatcher); + + Message message = session.createTextMessage(); + message.setStringProperty(AmqpMessageSupport.JMS_AMQP_CONTENT_ENCODING, "gzip"); + + producer.send(message); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } } diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessagePropertyIntercepterTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessagePropertyIntercepterTest.java index 58b7964c6..3307a0b36 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessagePropertyIntercepterTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessagePropertyIntercepterTest.java @@ -19,6 +19,7 @@ import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_REPLY_TO_GROUP_ID; import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_TTL; import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_TYPED_ENCODING; +import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_CONTENT_ENCODING; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -304,6 +305,79 @@ public void testSetJmsAmqpTypedEncodingConversionChecks() throws JMSException { } } + //-------- JMS_AMQP_CONTENT_ENCODING -------------------------------------// + + @Test + public void testJmsAmqpContentEncodingInGetAllPropertyNames() throws JMSException { + assertTrue(AmqpJmsMessagePropertyIntercepter.getAllPropertyNames().contains(JMS_AMQP_CONTENT_ENCODING)); + } + + @Test + public void testSetJmsAmqpContentEncoding() throws JMSException { + String testValue = "gzip"; + AmqpJmsObjectMessageFacade message = createAmqpObjectMessageFacade(); + AmqpJmsMessagePropertyIntercepter.setProperty(message, JMS_AMQP_CONTENT_ENCODING, testValue); + Mockito.verify(message).setContentEncoding(testValue); + } + + @Test + public void testGetJmsAmqpContentEncodingWhenNotSet() throws JMSException { + AmqpJmsMessageFacade message = createAmqpMessageFacade(); + assertNull(AmqpJmsMessagePropertyIntercepter.getProperty(message, JMS_AMQP_CONTENT_ENCODING)); + } + + @Test + public void testGetJmsAmqpContentEncodingWhenSet() throws JMSException { + String testValue = "gzip"; + AmqpJmsMessageFacade message = createAmqpMessageFacade(); + Mockito.when(message.getContentEncoding()).thenReturn(testValue); + assertNotNull(AmqpJmsMessagePropertyIntercepter.getProperty(message, JMS_AMQP_CONTENT_ENCODING)); + assertEquals(testValue, AmqpJmsMessagePropertyIntercepter.getProperty(message, JMS_AMQP_CONTENT_ENCODING)); + } + + @Test + public void testJmsAmqpContentEncodingNotInPropertyNamesWhenNotSet() throws JMSException { + AmqpJmsMessageFacade message = createAmqpMessageFacade(); + assertNull(AmqpJmsMessagePropertyIntercepter.getProperty(message, JMS_AMQP_CONTENT_ENCODING)); + assertFalse(AmqpJmsMessagePropertyIntercepter.getPropertyNames(message).contains(JMS_AMQP_CONTENT_ENCODING)); + } + + @Test + public void testJmsAmqpContentEncodingInPropertyNamesWhenSet() throws JMSException { + String testValue = "gzip"; + AmqpJmsMessageFacade message = createAmqpMessageFacade(); + Mockito.when(message.getApplicationPropertyNames(anySet())).then(new PassPropertyNames()); + Mockito.when(message.getContentEncoding()).thenReturn(testValue); + assertTrue(AmqpJmsMessagePropertyIntercepter.getPropertyNames(message).contains(JMS_AMQP_CONTENT_ENCODING)); + } + + @Test + public void testJmsAmqpContentEncodingPropertyExistsWhenSet() throws JMSException { + String testValue = "gzip"; + AmqpJmsMessageFacade message = createAmqpMessageFacade(); + Mockito.when(message.getContentEncoding()).thenReturn(testValue); + assertTrue(AmqpJmsMessagePropertyIntercepter.propertyExists(message, JMS_AMQP_CONTENT_ENCODING)); + } + + @Test + public void testJmsAmqpContentEncodingPropertyExistsWhenNotSet() throws JMSException { + AmqpJmsMessageFacade message = createAmqpMessageFacade(); + Mockito.when(message.getContentEncoding()).thenReturn(null); + assertFalse(AmqpJmsMessagePropertyIntercepter.propertyExists(message, JMS_AMQP_CONTENT_ENCODING)); + Mockito.when(message.getContentEncoding()).thenReturn(""); + assertFalse(AmqpJmsMessagePropertyIntercepter.propertyExists(message, JMS_AMQP_CONTENT_ENCODING)); + } + + @Test + public void testSetJmsAmqpContentEncodingConversionChecks() throws JMSException { + AmqpJmsMessageFacade message = createAmqpMessageFacade(); + try { + AmqpJmsMessagePropertyIntercepter.setProperty(message, JMS_AMQP_CONTENT_ENCODING, new byte[1]); + fail("Should have thrown an exception for this call"); + } catch (JMSException ignored) { + } + } + //--------- Utilities ----------------------------------------------------// private AmqpJmsMessageFacade createAmqpMessageFacade() {