Skip to content

Commit

Permalink
[fix][client] Fix deserialized BatchMessageIdImpl acknowledgment failure
Browse files Browse the repository at this point in the history
Fixes apache#19030

### Motivation

When a `BatchMessageIdImpl` is created from a deserialization, the
`BatchMessageAcker` object cannot be shared by all instances in the same
batch, which leads to an acknowledgment failure when batch index ACK is
disabled (by default).

### Modifications

Maintain a map from the `(ledger id, entry id)` pair to the
`BatchMessageAcker` in `ConsumerImpl`. If the `BatchMessageIdImpl`
doesn't carry a valid `BatchMessageAcker`, create and cache a
`BatchMessageAcker` instance and remove it when all messages in the
batch are acknowledged.

It requires a change in `MessageIdImpl#fromByteArray` that a
`BatchMessageAckerDisabled` will be created to indicate there is no
shared acker.

To avoid making code more complicated, this patch refactors the existing
code that many logics about consumer are moved from the ACK tracker to
the consumer. It also removes the `AckType` parameter when acknowledging
a list of messages.
  • Loading branch information
BewareMyPower committed Dec 22, 2022
1 parent 41edd2e commit ba52f1e
Show file tree
Hide file tree
Showing 9 changed files with 261 additions and 187 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Cleanup;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = "broker-api")
public class MessageIdSerializationTest extends ProducerConsumerBase {

@BeforeClass
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test(timeOut = 30000)
public void testSerialization() throws Exception {
String topic = "test-serialization-origin";
@Cleanup Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.batchingMaxMessages(100)
.batchingMaxPublishDelay(1, TimeUnit.DAYS)
.create();
Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("sub")
.isAckReceiptEnabled(true)
.subscribe();

final int numMessages = 10;
for (int i = 0; i < numMessages; i++) {
producer.sendAsync(i);
}
producer.flush();
final List<MessageId> msgIds = new ArrayList<>();
for (int i = 0; i < numMessages; i++) {
msgIds.add(consumer.receive().getMessageId());
}
final AtomicLong ledgerId = new AtomicLong(-1L);
final AtomicLong entryId = new AtomicLong(-1L);
for (int i = 0; i < numMessages; i++) {
assertTrue(msgIds.get(i) instanceof BatchMessageIdImpl);
final BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgIds.get(i);
ledgerId.compareAndSet(-1L, batchMessageId.getLedgerId());
assertEquals(batchMessageId.getLedgerId(), ledgerId.get());
entryId.compareAndSet(-1L, batchMessageId.getEntryId());
assertEquals(batchMessageId.getEntryId(), entryId.get());
assertEquals(batchMessageId.getBatchSize(), numMessages);
}

final List<MessageId> deserializedMsgIds = new ArrayList<>();
for (MessageId msgId : msgIds) {
MessageId deserialized = MessageId.fromByteArray(msgId.toByteArray());
assertTrue(deserialized instanceof BatchMessageIdImpl);
deserializedMsgIds.add(deserialized);
}
for (MessageId msgId : deserializedMsgIds) {
consumer.acknowledge(msgId);
}
consumer.close();

consumer = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("sub")
.isAckReceiptEnabled(true)
.subscribe();
MessageId newMsgId = producer.send(0);
MessageId receivedMessageId = consumer.receive().getMessageId();
assertEquals(newMsgId, receivedMessageId);
consumer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@ public interface AcknowledgmentsGroupingTracker extends AutoCloseable {

CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties);

CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds, AckType ackType,
Map<String, Long> properties);
default CompletableFuture<Void> addBatchIndexAck(BatchMessageIdImpl msgId, AckType ackType,
Map<String, Long> properties) {
return CompletableFuture.completedFuture(null);
}

CompletableFuture<Void> addListAcknowledgment(List<MessageIdImpl> messageIds, Map<String, Long> properties);

void flush();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,12 +542,12 @@ public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages, Transactio

@Override
public CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList) {
return doAcknowledgeWithTxn(messageIdList, AckType.Individual, Collections.emptyMap(), null);
return doAcknowledgeWithTxn(messageIdList, Collections.emptyMap(), null);
}

@Override
public CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList, Transaction txn) {
return doAcknowledgeWithTxn(messageIdList, AckType.Individual, Collections.emptyMap(), (TransactionImpl) txn);
return doAcknowledgeWithTxn(messageIdList, Collections.emptyMap(), (TransactionImpl) txn);
}

@Override
Expand Down Expand Up @@ -655,17 +655,17 @@ public void negativeAcknowledge(Message<?> message) {
negativeAcknowledge(message.getMessageId());
}

protected CompletableFuture<Void> doAcknowledgeWithTxn(List<MessageId> messageIdList, AckType ackType,
protected CompletableFuture<Void> doAcknowledgeWithTxn(List<MessageId> messageIdList,
Map<String, Long> properties,
TransactionImpl txn) {
CompletableFuture<Void> ackFuture;
if (txn != null && this instanceof ConsumerImpl) {
ackFuture = txn.registerAckedTopic(getTopic(), subscription)
.thenCompose(ignored -> doAcknowledge(messageIdList, ackType, properties, txn));
.thenCompose(ignored -> doAcknowledge(messageIdList, AckType.Individual, properties, txn));
// register the ackFuture as part of the transaction
txn.registerAckOp(ackFuture);
} else {
ackFuture = doAcknowledge(messageIdList, ackType, properties, txn);
ackFuture = doAcknowledge(messageIdList, AckType.Individual, properties, txn);
}
return ackFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,11 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.DeadLetterPolicy;
Expand Down Expand Up @@ -204,6 +206,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle

private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
// Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
private final Map<Pair<Long, Long>, BatchMessageAcker> batchMessageToAcker = new ConcurrentHashMap<>();

static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
String topic,
ConsumerConfigurationData<T> conf,
Expand Down Expand Up @@ -529,6 +534,52 @@ protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
return result;
}

private void processMessageIdBeforeAcknowledge(MessageIdImpl messageId, AckType ackType, int numMessages) {
if (ackType == AckType.Individual) {
stats.incrementNumAcksSent(numMessages);
unAckedMessageTracker.remove(messageId);
if (possibleSendToDeadLetterTopicMessages != null) {
possibleSendToDeadLetterTopicMessages.remove(messageId);
}
} else {
stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(messageId));
}
}

private @Nullable MessageIdImpl getMessageIdToAcknowledge(BatchMessageIdImpl messageId, AckType ackType) {
if (conf.isBatchIndexAckEnabled()) {
return messageId;
}
final BatchMessageAcker acker;
if (messageId.getAcker() instanceof BatchMessageAckerDisabled) {
acker = batchMessageToAcker.computeIfAbsent(
Pair.of(messageId.getLedgerId(), messageId.getEntryId()),
__ -> BatchMessageAcker.newAcker(messageId.getOriginalBatchSize()));
} else {
acker = messageId.getAcker();
}
if (ackType == AckType.Individual) {
if (acker.ackIndividual(messageId.getBatchIndex())) {
batchMessageToAcker.remove(Pair.of(messageId.getLedgerId(), messageId.getEntryId()));
return messageId.toMessageIdImpl();
} else {
return null;
}
} else {
if (acker.ackCumulative(messageId.getBatchIndex())) {
batchMessageToAcker.remove(Pair.of(messageId.getLedgerId(), messageId.getEntryId()));
return messageId.toMessageIdImpl();
} else {
if (acker.isPrevBatchCumulativelyAcked()) {
return null;
} else {
acker.setPrevBatchCumulativelyAcked(true);
return messageId.prevBatchMessageId();
}
}
}
}

@Override
protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
Map<String, Long> properties,
Expand All @@ -549,13 +600,34 @@ protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ack
return doTransactionAcknowledgeForResponse(messageId, ackType, null, properties,
new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits()));
}
return acknowledgmentsGroupingTracker.addAcknowledgment((MessageIdImpl) messageId, ackType, properties);
if (ackType == AckType.Individual) {
onAcknowledge(messageId, null);
} else {
onAcknowledgeCumulative(messageId, null);
}
if (messageId instanceof BatchMessageIdImpl) {
BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
MessageIdImpl messageIdImpl = getMessageIdToAcknowledge(batchMessageId, ackType);
if (messageIdImpl == null) {
return CompletableFuture.completedFuture(null);
} else if (messageIdImpl instanceof BatchMessageIdImpl) {
return acknowledgmentsGroupingTracker.addBatchIndexAck(
(BatchMessageIdImpl) messageIdImpl, ackType, properties);
} else {
processMessageIdBeforeAcknowledge(messageIdImpl, ackType, batchMessageId.getOriginalBatchSize());
return acknowledgmentsGroupingTracker.addAcknowledgment(messageIdImpl, ackType, properties);
}
} else {
MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
processMessageIdBeforeAcknowledge(messageIdImpl, ackType, 1);
return acknowledgmentsGroupingTracker.addAcknowledgment(messageIdImpl, ackType, properties);
}
}

@Override
protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList, AckType ackType,
Map<String, Long> properties, TransactionImpl txn) {

List<MessageIdImpl> messageIdListToAck = new ArrayList<>();
for (MessageId messageId : messageIdList) {
checkArgument(messageId instanceof MessageIdImpl);
}
Expand All @@ -573,7 +645,24 @@ protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList, A
return doTransactionAcknowledgeForResponse(messageIdList, ackType, null,
properties, new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits()));
} else {
return this.acknowledgmentsGroupingTracker.addListAcknowledgment(messageIdList, ackType, properties);
for (MessageId messageId : messageIdList) {
checkArgument(messageId instanceof MessageIdImpl);
onAcknowledge(messageId, null);
if (messageId instanceof BatchMessageIdImpl) {
MessageIdImpl messageIdImpl = getMessageIdToAcknowledge((BatchMessageIdImpl) messageId, ackType);
if (messageIdImpl != null) {
if (!(messageIdImpl instanceof BatchMessageIdImpl)) {
processMessageIdBeforeAcknowledge(messageIdImpl, ackType, 1);
}
messageIdListToAck.add(messageIdImpl);
}
} else {
MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
processMessageIdBeforeAcknowledge(messageIdImpl, ackType, 1);
messageIdListToAck.add(messageIdImpl);
}
}
return this.acknowledgmentsGroupingTracker.addListAcknowledgment(messageIdListToAck, properties);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,9 @@ public static MessageId fromByteArray(byte[] data) throws IOException {
}

MessageIdImpl messageId;
if (idData.hasBatchIndex()) {
if (idData.hasBatchSize()) {
messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
idData.getBatchIndex(), idData.getBatchSize(), BatchMessageAcker.newAcker(idData.getBatchSize()));
} else {
messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
idData.getBatchIndex());
}
if (idData.hasBatchIndex() && idData.hasBatchSize()) {
messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
idData.getBatchIndex(), idData.getBatchSize(), BatchMessageAckerDisabled.INSTANCE);
} else if (idData.hasFirstChunkMessageId()) {
MessageIdData firstChunkIdData = idData.getFirstChunkMessageId();
messageId = new ChunkMessageIdImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList,
}
topicToMessageIdMap.forEach((topicPartitionName, messageIds) -> {
ConsumerImpl<T> consumer = consumers.get(topicPartitionName);
resultFutures.add(consumer.doAcknowledgeWithTxn(messageIds, ackType, properties, txn)
resultFutures.add(consumer.doAcknowledgeWithTxn(messageIds, properties, txn)
.thenAccept((res) -> messageIdList.forEach(unAckedMessageTracker::remove)));
});
return CompletableFuture.allOf(resultFutures.toArray(new CompletableFuture[0]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType ac
}

@Override
public CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds,
AckType ackType,
public CompletableFuture<Void> addListAcknowledgment(List<MessageIdImpl> messageIds,
Map<String, Long> properties) {
// no-op
return CompletableFuture.completedFuture(null);
Expand Down
Loading

0 comments on commit ba52f1e

Please sign in to comment.