Skip to content

Commit

Permalink
GH-8876: Use long for AmqpHeaders.DELAY header
Browse files Browse the repository at this point in the history
Fixes: #8876

* Use respective new `MessageProperties` `getDelayLong()` & `setDelayLong()`
  • Loading branch information
artembilan committed Jan 30, 2024
1 parent 0ba60bc commit 19ad0dc
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -106,7 +106,7 @@ public abstract class AbstractAmqpOutboundEndpoint extends AbstractReplyProducin

private Expression delayExpression;

private ExpressionEvaluatingMessageProcessor<Integer> delayGenerator;
private ExpressionEvaluatingMessageProcessor<Long> delayGenerator;

private boolean headersMappedLast;

Expand Down Expand Up @@ -483,7 +483,7 @@ private void configureCorrelationDataGenerator(BeanFactory beanFactory) {

private void configureDelayGenerator(BeanFactory beanFactory) {
if (this.delayExpression != null) {
this.delayGenerator = new ExpressionEvaluatingMessageProcessor<>(this.delayExpression, Integer.class);
this.delayGenerator = new ExpressionEvaluatingMessageProcessor<>(this.delayExpression, Long.class);
if (beanFactory != null) {
this.delayGenerator.setBeanFactory(beanFactory);
}
Expand Down Expand Up @@ -622,7 +622,7 @@ protected String generateRoutingKey(Message<?> requestMessage) {

protected void addDelayProperty(Message<?> message, org.springframework.amqp.core.Message amqpMessage) {
if (this.delayGenerator != null) {
amqpMessage.getMessageProperties().setDelay(this.delayGenerator.processMessage(message));
amqpMessage.getMessageProperties().setDelayLong(this.delayGenerator.processMessage(message));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -133,7 +133,8 @@ protected Map<String, Object> extractStandardHeaders(MessageProperties amqpMessa
JavaUtils.INSTANCE
.acceptIfCondition(priority != null && priority > 0, IntegrationMessageHeaderAccessor.PRIORITY,
priority, headers::put)
.acceptIfNotNull(AmqpHeaders.RECEIVED_DELAY, amqpMessageProperties.getReceivedDelay(), headers::put)
.acceptIfNotNull(AmqpHeaders.RECEIVED_DELAY, amqpMessageProperties.getReceivedDelayLong(),
headers::put)
.acceptIfNotNull(AmqpHeaders.RECEIVED_EXCHANGE, amqpMessageProperties.getReceivedExchange(),
headers::put)
.acceptIfHasText(AmqpHeaders.RECEIVED_ROUTING_KEY, amqpMessageProperties.getReceivedRoutingKey(),
Expand Down Expand Up @@ -196,8 +197,8 @@ protected void populateStandardHeaders(@Nullable Map<String, Object> allHeaders,
amqpMessageProperties::setContentType)
.acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.CORRELATION_ID, String.class),
amqpMessageProperties::setCorrelationId)
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELAY, Integer.class),
amqpMessageProperties::setDelay)
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELAY, Long.class),
amqpMessageProperties::setDelayLong)
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELIVERY_MODE, MessageDeliveryMode.class),
amqpMessageProperties::setDeliveryMode)
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELIVERY_TAG, Long.class),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -79,19 +79,19 @@ public void testDelayExpression() {
endpoint.handleMessage(new GenericMessage<>("foo"));
ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
verify(amqpTemplate).send(eq("foo"), eq("bar"), captor.capture(), isNull());
assertThat(captor.getValue().getMessageProperties().getDelay()).isEqualTo(42);
assertThat(captor.getValue().getMessageProperties().getDelayLong()).isEqualTo(42);
endpoint.setExpectReply(true);
endpoint.setOutputChannel(new NullChannel());
endpoint.handleMessage(new GenericMessage<>("foo"));
verify(amqpTemplate).sendAndReceive(eq("foo"), eq("bar"), captor.capture(), isNull());
assertThat(captor.getValue().getMessageProperties().getDelay()).isEqualTo(42);
assertThat(captor.getValue().getMessageProperties().getDelayLong()).isEqualTo(42);

endpoint.setDelay(23);
endpoint.setRoutingKey("baz");
endpoint.afterPropertiesSet();
endpoint.handleMessage(new GenericMessage<>("foo"));
verify(amqpTemplate).sendAndReceive(eq("foo"), eq("baz"), captor.capture(), isNull());
assertThat(captor.getValue().getMessageProperties().getDelay()).isEqualTo(23);
assertThat(captor.getValue().getMessageProperties().getDelayLong()).isEqualTo(23);
}

@Test
Expand All @@ -114,7 +114,7 @@ public void testAsyncDelayExpression() {
ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
gateway.handleMessage(new GenericMessage<>("foo"));
verify(amqpTemplate).sendAndReceive(eq("foo"), eq("bar"), captor.capture());
assertThat(captor.getValue().getMessageProperties().getDelay()).isEqualTo(42);
assertThat(captor.getValue().getMessageProperties().getDelayLong()).isEqualTo(42);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -77,7 +77,7 @@ public void fromHeaders() {
headerMap.put(AmqpHeaders.CONTENT_TYPE, "test.contentType");
String testCorrelationId = "foo";
headerMap.put(AmqpHeaders.CORRELATION_ID, testCorrelationId);
headerMap.put(AmqpHeaders.DELAY, 1234);
headerMap.put(AmqpHeaders.DELAY, 1234L);
headerMap.put(AmqpHeaders.DELIVERY_MODE, MessageDeliveryMode.NON_PERSISTENT);
headerMap.put(AmqpHeaders.DELIVERY_TAG, 1234L);
headerMap.put(AmqpHeaders.EXPIRATION, "test.expiration");
Expand Down Expand Up @@ -111,7 +111,7 @@ public void fromHeaders() {
assertThat(amqpProperties.getContentLength()).isEqualTo(99L);
assertThat(amqpProperties.getContentType()).isEqualTo("test.contentType");
assertThat(amqpProperties.getCorrelationId()).isEqualTo(testCorrelationId);
assertThat(amqpProperties.getDelay()).isEqualTo(Integer.valueOf(1234));
assertThat(amqpProperties.getDelayLong()).isEqualTo(1234L);
assertThat(amqpProperties.getDeliveryMode()).isEqualTo(MessageDeliveryMode.NON_PERSISTENT);
assertThat(amqpProperties.getDeliveryTag()).isEqualTo(1234L);
assertThat(amqpProperties.getExpiration()).isEqualTo("test.expiration");
Expand Down Expand Up @@ -183,7 +183,7 @@ public void toHeaders() {
amqpProperties.setMessageCount(42);
amqpProperties.setMessageId("test.messageId");
amqpProperties.setPriority(22);
amqpProperties.setReceivedDelay(4567);
amqpProperties.setReceivedDelayLong(4567L);
amqpProperties.setReceivedExchange("test.receivedExchange");
amqpProperties.setReceivedRoutingKey("test.receivedRoutingKey");
amqpProperties.setRedelivered(true);
Expand All @@ -206,7 +206,7 @@ public void toHeaders() {
assertThat(headerMap.get(AmqpHeaders.EXPIRATION)).isEqualTo("test.expiration");
assertThat(headerMap.get(AmqpHeaders.MESSAGE_COUNT)).isEqualTo(42);
assertThat(headerMap.get(AmqpHeaders.MESSAGE_ID)).isEqualTo("test.messageId");
assertThat(headerMap.get(AmqpHeaders.RECEIVED_DELAY)).isEqualTo(4567);
assertThat(headerMap.get(AmqpHeaders.RECEIVED_DELAY)).isEqualTo(4567L);
assertThat(headerMap.get(AmqpHeaders.RECEIVED_EXCHANGE)).isEqualTo("test.receivedExchange");
assertThat(headerMap.get(AmqpHeaders.RECEIVED_ROUTING_KEY)).isEqualTo("test.receivedRoutingKey");
assertThat(headerMap.get(AmqpHeaders.REPLY_TO)).isEqualTo("test.replyTo");
Expand Down

0 comments on commit 19ad0dc

Please sign in to comment.