From 18e94b3dd982cd85b7e33e4819f3e92365f905f1 Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Fri, 18 Aug 2023 22:54:44 +0800 Subject: [PATCH 1/2] [fix][broker] Fix incorrect unack msk count when dup ack a message (#20990) (cherry picked from commit 4facdad9c7e8f6558dfb71b230aa8260e57b559e) Signed-off-by: Zixuan Liu --- .../pulsar/broker/service/Consumer.java | 24 +++++++++++------ .../broker/service/BrokerServiceTest.java | 26 +++++++++++++++++++ 2 files changed, 42 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 8924b750eb624..a7c06d0c85d63 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -444,6 +444,7 @@ public CompletableFuture messageAcked(CommandAck ack) { private CompletableFuture individualAckNormal(CommandAck ack, Map properties) { List positionsAcked = new ArrayList<>(); long totalAckCount = 0; + boolean individualAck = false; for (int i = 0; i < ack.getMessageIdsCount(); i++) { MessageIdData msgId = ack.getMessageIdAt(i); PositionImpl position; @@ -467,14 +468,18 @@ private CompletableFuture individualAckNormal(CommandAck ack, Map producer = pulsarClient.newProducer() + .topic(topicName) + .create(); + @Cleanup + Consumer consumer1 = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName("sub-1") + .acknowledgmentGroupTime(0, TimeUnit.SECONDS) + .subscriptionType(SubscriptionType.Shared) + .isAckReceiptEnabled(true) + .subscribe(); + producer.send("1".getBytes(StandardCharsets.UTF_8)); + Message message = consumer1.receive(); + consumer1.acknowledge(message); + consumer1.acknowledge(message); + assertEquals(admin.topics().getStats(topicName).getSubscriptions() + .get("sub-1").getUnackedMessages(), 0); + } } From 540a1e3cd421a764edaf49e5d06aea98c3e93ad1 Mon Sep 17 00:00:00 2001 From: labuladong Date: Wed, 7 Dec 2022 14:48:44 +0800 Subject: [PATCH 2/2] [fix][test] flaky test `testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction` (#18726) (cherry picked from commit 2d205c93cb9e1324834e0818de0edc531b66a119) Signed-off-by: Zixuan Liu --- .../impl/KeySharedSubscriptionTest.java | 128 ++++++++---------- 1 file changed, 54 insertions(+), 74 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java index 213296d22833a..722073c38d7dd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java @@ -20,30 +20,34 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import lombok.Cleanup; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.pulsar.client.api.BatcherBuilder; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.KeySharedPolicy; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Range; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.util.Murmur3_32Hash; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; @Test(groups = "broker-impl") public class KeySharedSubscriptionTest extends ProducerConsumerBase { @@ -81,80 +85,50 @@ public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(Subsc AtomicLong lastActiveTime = new AtomicLong(); AtomicBoolean canAcknowledgement = new AtomicBoolean(false); - @Cleanup - Consumer consumer1 = pulsarClient.newConsumer() - .topic(topic) - .subscriptionName("sub-1") - .subscriptionType(subscriptionType) - .consumerName("con-1") - .messageListener((cons1, msg) -> { - lastActiveTime.set(System.currentTimeMillis()); - nameToId.computeIfAbsent(cons1,(k) -> new ArrayList<>()) - .add(msg.getMessageId()); - recMessages.add(msg.getMessageId()); - if (canAcknowledgement.get()) { - try { - cons1.acknowledge(msg); - } catch (PulsarClientException e) { - throw new RuntimeException(e); - } - } - }) - .subscribe(); - @Cleanup - Consumer consumer2 = pulsarClient.newConsumer() - .topic(topic) - .subscriptionName("sub-1") - .subscriptionType(subscriptionType) - .messageListener((cons2, msg) -> { - lastActiveTime.set(System.currentTimeMillis()); - nameToId.computeIfAbsent(cons2,(k) -> new ArrayList<>()) - .add(msg.getMessageId()); - recMessages.add(msg.getMessageId()); - if (canAcknowledgement.get()) { - try { - cons2.acknowledge(msg); - } catch (PulsarClientException e) { - throw new RuntimeException(e); + List> consumerList = new ArrayList<>(); + // create 3 consumers + for (int i = 0; i < 3; i++) { + ConsumerBuilder builder = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("sub-1") + .subscriptionType(subscriptionType) + .messageListener((consumer, msg) -> { + lastActiveTime.set(System.currentTimeMillis()); + nameToId.computeIfAbsent(consumer, (k) -> new ArrayList<>()) + .add(msg.getMessageId()); + recMessages.add(msg.getMessageId()); + if (canAcknowledgement.get()) { + try { + consumer.acknowledge(msg); + } catch (PulsarClientException e) { + throw new RuntimeException(e); + } } - } - }) - .consumerName("con-2") - .subscribe(); - @Cleanup - Consumer consumer3 = pulsarClient.newConsumer() - .topic(topic) - .subscriptionName("sub-1") - .subscriptionType(subscriptionType) - .messageListener((cons3, msg) -> { - lastActiveTime.set(System.currentTimeMillis()); - nameToId.computeIfAbsent(cons3,(k) -> new ArrayList<>()) - .add(msg.getMessageId()); - recMessages.add(msg.getMessageId()); - if (canAcknowledgement.get()) { - try { - cons3.acknowledge(msg); - } catch (PulsarClientException e) { - throw new RuntimeException(e); - } - } - }) - .consumerName("con-3") - .subscribe(); + }); + + if (subscriptionType == SubscriptionType.Key_Shared) { + // ensure every consumer can be distributed messages + int hash = Murmur3_32Hash.getInstance().makeHash(("key-" + i).getBytes()) + % KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE; + builder.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(hash, hash))); + } + + consumerList.add(builder.subscribe()); + } - @Cleanup Producer producer = pulsarClient.newProducer() .topic(topic) .enableBatching(true) .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) // We chose 9 because the maximum unacked message is 10 .batchingMaxMessages(9) + .batcherBuilder(BatcherBuilder.KEY_BASED) .create(); for (int i = 0; i < totalMsg; i++) { - producer.sendAsync(UUID.randomUUID().toString() - .getBytes(StandardCharsets.UTF_8)) - .thenAccept(pubMessages::add); + byte[] msg = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8); + producer.newMessage().key("key-" + (i % 3)).value(msg) + .sendAsync().thenAccept(pubMessages::add); } // Wait for all consumers can not read more messages. the consumers are stuck by max unacked messages. @@ -176,7 +150,7 @@ public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(Subsc // Wait for all consumers to continue receiving messages. Awaitility.await() - .atMost(30, TimeUnit.SECONDS) + .atMost(15, TimeUnit.SECONDS) .pollDelay(5, TimeUnit.SECONDS) .until(() -> (System.currentTimeMillis() - lastActiveTime.get()) > TimeUnit.SECONDS.toMillis(5)); @@ -186,5 +160,11 @@ public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(Subsc Assert.assertEquals(pubMessages.size(), totalMsg); Assert.assertEquals(pubMessages.size(), recMessages.size()); Assert.assertTrue(recMessages.containsAll(pubMessages)); + + // cleanup + producer.close(); + for (Consumer consumer : consumerList) { + consumer.close(); + } } }