Skip to content

Commit

Permalink
[improve] [client] Add producerName for deadLetterProducer (apache#21589
Browse files Browse the repository at this point in the history
)

Fixes apache#21441
Related PR: apache#21507

### Motivation

Add producerName for dead letter producer, easier to locate problems.

### Modifications

```java
.producerName(String.format("%s-%s-DLQ", this.topicName, this.subscription))
```

When creating a `deadLetterProducer`, specify the `producerName` to replace the randomly generated name.
  • Loading branch information
crossoverJie authored Nov 21, 2023
1 parent b2f2b53 commit c87cfb3
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,65 @@ public void testDeadLetterTopicWithMessageKey() throws Exception {
consumer.close();
}

public void testDeadLetterTopicWithProducerName() throws Exception {
final String topic = "persistent://my-property/my-ns/dead-letter-topic";
final String subscription = "my-subscription";
String deadLetterProducerName = String.format("%s-%s-DLQ", topic, subscription);

final int maxRedeliveryCount = 1;

final int sendMessages = 100;

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName(subscription)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
.receiverQueueSize(100)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

@Cleanup
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
.subscriptionName("my-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topic)
.create();

for (int i = 0; i < sendMessages; i++) {
producer.newMessage()
.value(String.format("Hello Pulsar [%d]", i).getBytes())
.send();
}

producer.close();

int totalReceived = 0;
do {
Message<byte[]> message = consumer.receive();
log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
totalReceived++;
} while (totalReceived < sendMessages * (maxRedeliveryCount + 1));

int totalInDeadLetter = 0;
do {
Message message = deadLetterConsumer.receive();
assertEquals(message.getProducerName(), deadLetterProducerName);
log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
deadLetterConsumer.acknowledge(message);
totalInDeadLetter++;
} while (totalInDeadLetter < sendMessages);

deadLetterConsumer.close();
consumer.close();
}

@DataProvider(name = "produceLargeMessages")
public Object[][] produceLargeMessages() {
return new Object[][] { { false }, { true } };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2171,6 +2171,7 @@ private void initDeadLetterProducerIfNeeded() {
((ProducerBuilderImpl<byte[]>) client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema)))
.initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName())
.topic(this.deadLetterPolicy.getDeadLetterTopic())
.producerName(String.format("%s-%s-DLQ", this.topicName, this.subscription))
.blockIfQueueFull(false)
.enableBatching(false)
.enableChunking(true)
Expand Down

0 comments on commit c87cfb3

Please sign in to comment.