Skip to content

Commit

Permalink
[fix][client] Fix race-condition causing doReconsumeLater to hang whe…
Browse files Browse the repository at this point in the history
…n creating retryLetterProducer has failed (apache#23560)
  • Loading branch information
hanmz authored Nov 29, 2024
1 parent 280997e commit bf1f677
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.Data;
Expand All @@ -45,6 +46,7 @@
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.testng.collections.Lists;

@Test(groups = "broker-api")
public class RetryTopicTest extends ProducerConsumerBase {
Expand Down Expand Up @@ -713,4 +715,70 @@ public void testRetryProducerWillCloseByConsumer() throws Exception {
admin.topics().delete(topicDLQ, false);
}


@Test(timeOut = 30000L)
public void testRetryTopicExceptionWithConcurrent() throws Exception {
final String topic = "persistent://my-property/my-ns/retry-topic";
final int maxRedeliveryCount = 2;
final int sendMessages = 10;
// subscribe before publish
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.receiverQueueSize(100)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
.build())
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topic)
.create();
for (int i = 0; i < sendMessages; i++) {
producer.newMessage().key("1").value(String.format("Hello Pulsar [%d]", i).getBytes()).send();
}
producer.close();

// mock a retry producer exception when reconsumelater is called
MultiTopicsConsumerImpl<byte[]> multiTopicsConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;
List<ConsumerImpl<byte[]>> consumers = multiTopicsConsumer.getConsumers();
for (ConsumerImpl<byte[]> c : consumers) {
Set<Field> deadLetterPolicyField =
ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy"));

if (deadLetterPolicyField.size() != 0) {
Field field = deadLetterPolicyField.iterator().next();
field.setAccessible(true);
DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c);
deadLetterPolicy.setRetryLetterTopic("#persistent://invalid-topic#");
}
}

List<Message<byte[]>> messages = Lists.newArrayList();
for (int i = 0; i < sendMessages; i++) {
messages.add(consumer.receive());
}

// mock call the reconsumeLater method concurrently
CountDownLatch latch = new CountDownLatch(messages.size());
for (Message<byte[]> message : messages) {
new Thread(() -> {
try {
consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
} catch (Exception ignore) {

} finally {
latch.countDown();
}
}).start();
}

latch.await();
consumer.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,8 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
} catch (Exception e) {
result.completeExceptionally(e);
}
} else {
result.completeExceptionally(new PulsarClientException("Retry letter producer is null."));
}
MessageId finalMessageId = messageId;
result.exceptionally(ex -> {
Expand Down

0 comments on commit bf1f677

Please sign in to comment.