Skip to content

Commit

Permalink
[fix][broker] Fix incorrect unack msk count when dup ack a message (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
nodece authored Mar 14, 2024
1 parent 5812b30 commit 2c85dd1
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ public CompletableFuture<Void> messageAcked(CommandAck ack) {
private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String, Long> properties) {
List<Position> positionsAcked = new ArrayList<>();
long totalAckCount = 0;
boolean individualAck = false;
for (int i = 0; i < ack.getMessageIdsCount(); i++) {
MessageIdData msgId = ack.getMessageIdAt(i);
PositionImpl position;
Expand All @@ -467,14 +468,18 @@ private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String,
} else {
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position, ackOwnerConsumer);
individualAck = true;
}

addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);

if (individualAck) {
if (checkCanRemovePendingAcksAndHandle(position, msgId)) {
addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
}
} else {
addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
}
positionsAcked.add(position);

checkCanRemovePendingAcksAndHandle(position, msgId);

checkAckValidationError(ack, position);

totalAckCount += ackedCount;
Expand Down Expand Up @@ -636,10 +641,11 @@ private void checkAckValidationError(CommandAck ack, PositionImpl position) {
}
}

private void checkCanRemovePendingAcksAndHandle(PositionImpl position, MessageIdData msgId) {
private boolean checkCanRemovePendingAcksAndHandle(PositionImpl position, MessageIdData msgId) {
if (Subscription.isIndividualAckMode(subType) && msgId.getAckSetsCount() == 0) {
removePendingAcks(position);
return removePendingAcks(position);
}
return false;
}

private Consumer getAckOwnerConsumer(long ledgerId, long entryId) {
Expand Down Expand Up @@ -886,7 +892,7 @@ public int hashCode() {
*
* @param position
*/
private void removePendingAcks(PositionImpl position) {
private boolean removePendingAcks(PositionImpl position) {
Consumer ackOwnedConsumer = null;
if (pendingAcks.get(position.getLedgerId(), position.getEntryId()) == null) {
for (Consumer consumer : subscription.getConsumers()) {
Expand All @@ -907,7 +913,7 @@ private void removePendingAcks(PositionImpl position) {
if (ackedPosition != null) {
if (!ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(), position.getEntryId())) {
// Message was already removed by the other consumer
return;
return false;
}
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received ack {}", topicName, subscription, consumerId, position);
Expand All @@ -921,7 +927,9 @@ private void removePendingAcks(PositionImpl position) {
ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
flowConsumerBlockedPermits(ackOwnedConsumer);
}
return true;
}
return false;
}

public ConcurrentLongLongPairHashMap getPendingAcks() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1435,4 +1435,30 @@ public void testDynamicConfigurationsForceDeleteTenantAllowed() throws Exception
assertTrue(conf.isForceDeleteTenantAllowed());
});
}

@Test
public void testDuplicateAcknowledgement() throws Exception {
final String ns = "prop/ns-test";

admin.namespaces().createNamespace(ns, 2);
final String topicName = "persistent://prop/ns-test/duplicated-acknowledgement-test";
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.create();
@Cleanup
Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName("sub-1")
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared)
.isAckReceiptEnabled(true)
.subscribe();
producer.send("1".getBytes(StandardCharsets.UTF_8));
Message<byte[]> message = consumer1.receive();
consumer1.acknowledge(message);
consumer1.acknowledge(message);
assertEquals(admin.topics().getStats(topicName).getSubscriptions()
.get("sub-1").getUnackedMessages(), 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,35 @@
*/
package org.apache.pulsar.client.impl;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.Cleanup;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

@Test(groups = "broker-impl")
public class KeySharedSubscriptionTest extends ProducerConsumerBase {
Expand All @@ -70,91 +73,58 @@ public Object[][] subType() {
@Test(dataProvider = "subType")
public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(SubscriptionType subscriptionType)
throws PulsarClientException {
PulsarClient pulsarClient = PulsarClient.builder().
serviceUrl(lookupUrl.toString())
.build();
final int totalMsg = 1000;
String topic = "broker-close-test-" + RandomStringUtils.randomAlphabetic(5);
Map<Consumer<?>, List<MessageId>> nameToId = Maps.newConcurrentMap();
Map<Consumer<?>, List<MessageId>> nameToId = new ConcurrentHashMap<>();
Set<MessageId> pubMessages = Sets.newConcurrentHashSet();
Set<MessageId> recMessages = Sets.newConcurrentHashSet();
AtomicLong lastActiveTime = new AtomicLong();
AtomicBoolean canAcknowledgement = new AtomicBoolean(false);

@Cleanup
Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-1")
.subscriptionType(subscriptionType)
.consumerName("con-1")
.messageListener((cons1, msg) -> {
lastActiveTime.set(System.currentTimeMillis());
nameToId.computeIfAbsent(cons1,(k) -> new ArrayList<>())
.add(msg.getMessageId());
recMessages.add(msg.getMessageId());
if (canAcknowledgement.get()) {
try {
cons1.acknowledge(msg);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
})
.subscribe();
@Cleanup
Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-1")
.subscriptionType(subscriptionType)
.messageListener((cons2, msg) -> {
lastActiveTime.set(System.currentTimeMillis());
nameToId.computeIfAbsent(cons2,(k) -> new ArrayList<>())
.add(msg.getMessageId());
recMessages.add(msg.getMessageId());
if (canAcknowledgement.get()) {
try {
cons2.acknowledge(msg);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
List<Consumer<?>> consumerList = new ArrayList<>();
// create 3 consumers
for (int i = 0; i < 3; i++) {
ConsumerBuilder<byte[]> builder = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-1")
.subscriptionType(subscriptionType)
.messageListener((consumer, msg) -> {
lastActiveTime.set(System.currentTimeMillis());
nameToId.computeIfAbsent(consumer, (k) -> new ArrayList<>())
.add(msg.getMessageId());
recMessages.add(msg.getMessageId());
if (canAcknowledgement.get()) {
try {
consumer.acknowledge(msg);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
}
})
.consumerName("con-2")
.subscribe();
@Cleanup
Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-1")
.subscriptionType(subscriptionType)
.messageListener((cons3, msg) -> {
lastActiveTime.set(System.currentTimeMillis());
nameToId.computeIfAbsent(cons3,(k) -> new ArrayList<>())
.add(msg.getMessageId());
recMessages.add(msg.getMessageId());
if (canAcknowledgement.get()) {
try {
cons3.acknowledge(msg);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
})
.consumerName("con-3")
.subscribe();
});

if (subscriptionType == SubscriptionType.Key_Shared) {
// ensure every consumer can be distributed messages
int hash = Murmur3_32Hash.getInstance().makeHash(("key-" + i).getBytes())
% KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE;
builder.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(hash, hash)));
}

consumerList.add(builder.subscribe());
}

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
// We chose 9 because the maximum unacked message is 10
.batchingMaxMessages(9)
.batcherBuilder(BatcherBuilder.KEY_BASED)
.create();

for (int i = 0; i < totalMsg; i++) {
producer.sendAsync(UUID.randomUUID().toString()
.getBytes(StandardCharsets.UTF_8))
.thenAccept(pubMessages::add);
byte[] msg = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
producer.newMessage().key("key-" + (i % 3)).value(msg)
.sendAsync().thenAccept(pubMessages::add);
}

// Wait for all consumers can not read more messages. the consumers are stuck by max unacked messages.
Expand All @@ -176,7 +146,7 @@ public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(Subsc

// Wait for all consumers to continue receiving messages.
Awaitility.await()
.atMost(30, TimeUnit.SECONDS)
.atMost(15, TimeUnit.SECONDS)
.pollDelay(5, TimeUnit.SECONDS)
.until(() ->
(System.currentTimeMillis() - lastActiveTime.get()) > TimeUnit.SECONDS.toMillis(5));
Expand All @@ -186,5 +156,11 @@ public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(Subsc
Assert.assertEquals(pubMessages.size(), totalMsg);
Assert.assertEquals(pubMessages.size(), recMessages.size());
Assert.assertTrue(recMessages.containsAll(pubMessages));

// cleanup
producer.close();
for (Consumer<?> consumer : consumerList) {
consumer.close();
}
}
}

0 comments on commit 2c85dd1

Please sign in to comment.