From c87cfb3f50b32f5c032b5d1c3a6a0b91cde2bbe9 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Tue, 21 Nov 2023 18:17:23 +0800 Subject: [PATCH] [improve] [client] Add producerName for deadLetterProducer (#21589) Fixes #21441 Related PR: https://github.com/apache/pulsar/pull/21507 ### Motivation Add producerName for dead letter producer, easier to locate problems. ### Modifications ```java .producerName(String.format("%s-%s-DLQ", this.topicName, this.subscription)) ``` When creating a `deadLetterProducer`, specify the `producerName` to replace the randomly generated name. --- .../client/api/DeadLetterTopicTest.java | 59 +++++++++++++++++++ .../pulsar/client/impl/ConsumerImpl.java | 1 + 2 files changed, 60 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index 7be292a602603..ea93ece549e27 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -137,6 +137,65 @@ public void testDeadLetterTopicWithMessageKey() throws Exception { consumer.close(); } + public void testDeadLetterTopicWithProducerName() throws Exception { + final String topic = "persistent://my-property/my-ns/dead-letter-topic"; + final String subscription = "my-subscription"; + String deadLetterProducerName = String.format("%s-%s-DLQ", topic, subscription); + + final int maxRedeliveryCount = 1; + + final int sendMessages = 100; + + Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName(subscription) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(1, TimeUnit.SECONDS) + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build()) + .receiverQueueSize(100) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + @Cleanup + PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES) + .topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ") + .subscriptionName("my-subscription") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .create(); + + for (int i = 0; i < sendMessages; i++) { + producer.newMessage() + .value(String.format("Hello Pulsar [%d]", i).getBytes()) + .send(); + } + + producer.close(); + + int totalReceived = 0; + do { + Message message = consumer.receive(); + log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + totalReceived++; + } while (totalReceived < sendMessages * (maxRedeliveryCount + 1)); + + int totalInDeadLetter = 0; + do { + Message message = deadLetterConsumer.receive(); + assertEquals(message.getProducerName(), deadLetterProducerName); + log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + deadLetterConsumer.acknowledge(message); + totalInDeadLetter++; + } while (totalInDeadLetter < sendMessages); + + deadLetterConsumer.close(); + consumer.close(); + } + @DataProvider(name = "produceLargeMessages") public Object[][] produceLargeMessages() { return new Object[][] { { false }, { true } }; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 85d6c5668d54c..fbc2a8c285dd2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2171,6 +2171,7 @@ private void initDeadLetterProducerIfNeeded() { ((ProducerBuilderImpl) client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema))) .initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName()) .topic(this.deadLetterPolicy.getDeadLetterTopic()) + .producerName(String.format("%s-%s-DLQ", this.topicName, this.subscription)) .blockIfQueueFull(false) .enableBatching(false) .enableChunking(true)