diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 8924b750eb624..a7c06d0c85d63 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -444,6 +444,7 @@ public CompletableFuture messageAcked(CommandAck ack) { private CompletableFuture individualAckNormal(CommandAck ack, Map properties) { List positionsAcked = new ArrayList<>(); long totalAckCount = 0; + boolean individualAck = false; for (int i = 0; i < ack.getMessageIdsCount(); i++) { MessageIdData msgId = ack.getMessageIdAt(i); PositionImpl position; @@ -467,14 +468,18 @@ private CompletableFuture individualAckNormal(CommandAck ack, Map producer = pulsarClient.newProducer() + .topic(topicName) + .create(); + @Cleanup + Consumer consumer1 = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName("sub-1") + .acknowledgmentGroupTime(0, TimeUnit.SECONDS) + .subscriptionType(SubscriptionType.Shared) + .isAckReceiptEnabled(true) + .subscribe(); + producer.send("1".getBytes(StandardCharsets.UTF_8)); + Message message = consumer1.receive(); + consumer1.acknowledge(message); + consumer1.acknowledge(message); + assertEquals(admin.topics().getStats(topicName).getSubscriptions() + .get("sub-1").getUnackedMessages(), 0); + } }