Skip to content

Commit

Permalink
[fix][txn] Ack all message ids when ack chunk messages with transacti…
Browse files Browse the repository at this point in the history
…on. (apache#21268)

### Motivation

Now, only the last chunk will be acknowledged when acknowledging chunk messages with transactions.
If the messageId is a `ChunkMessageIdImpl`, the ledger ID and entry ID will belong to the `lastChunkMsgId`.
https://github.com/apache/pulsar/blob/2b5c199053a5b2d7f849e6604d619bae9197a8c9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L2791-L2814

https://github.com/apache/pulsar/blob/2b5c199053a5b2d7f849e6604d619bae9197a8c9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ChunkMessageIdImpl.java#L30-L33
### Modifications

Flow the common message acknowledge logic, ack all the chunks when acknowledging messages with transactions.
  • Loading branch information
liangyepianzhou authored Nov 8, 2023
1 parent 469ce7e commit f581417
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.broker.transaction;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
Expand All @@ -39,12 +41,15 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ChunkMessageIdImpl;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.TxnAction;
Expand Down Expand Up @@ -368,4 +373,45 @@ public void completed(Exception e, long ledgerId, long entryId) {
return positionList;
}

@Test
public void testAckChunkMessage() throws Exception {
String producerName = "test-producer";
String subName = "testAckChunkMessage";
@Cleanup
PulsarClient pulsarClient1 = PulsarClient.builder().serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
.enableTransaction(true).build();
@Cleanup
Producer<String> producer = pulsarClient1
.newProducer(Schema.STRING)
.producerName(producerName)
.topic(CONSUME_TOPIC)
.enableChunking(true)
.enableBatching(false)
.create();
Consumer<String> consumer = pulsarClient1
.newConsumer(Schema.STRING)
.subscriptionType(SubscriptionType.Shared)
.topic(CONSUME_TOPIC)
.subscriptionName(subName)
.subscribe();

int messageSize = 6000; // payload size in KB
String message = "a".repeat(messageSize * 1000);
MessageId messageId = producer.newMessage().value(message).send();
assertTrue(messageId instanceof ChunkMessageIdImpl);
assertNotEquals(((ChunkMessageIdImpl) messageId).getLastChunkMessageId(),
((ChunkMessageIdImpl) messageId).getFirstChunkMessageId());

Transaction transaction = pulsarClient1.newTransaction()
.withTransactionTimeout(5, TimeUnit.HOURS)
.build()
.get();

Message<String> msg = consumer.receive();
consumer.acknowledgeAsync(msg.getMessageId(), transaction);
transaction.commit().get();

Assert.assertEquals(admin.topics().getStats(CONSUME_TOPIC).getSubscriptions().get(subName)
.getUnackedMessages(), 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.apache.pulsar.common.protocol.Commands.hasChecksum;
import static org.apache.pulsar.common.protocol.Commands.serializeWithSize;
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ComparisonChain;
Expand All @@ -32,6 +33,7 @@
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timeout;
import io.netty.util.concurrent.FastThreadLocal;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand Down Expand Up @@ -67,6 +69,7 @@
import lombok.AccessLevel;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.DeadLetterPolicy;
Expand Down Expand Up @@ -94,7 +97,9 @@
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandAck.ValidationError;
import org.apache.pulsar.common.api.proto.CommandMessage;
Expand All @@ -117,6 +122,7 @@
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.SafeCollectionUtils;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
Expand Down Expand Up @@ -2799,7 +2805,7 @@ private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId me
final MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
final long ledgerId = messageIdAdv.getLedgerId();
final long entryId = messageIdAdv.getEntryId();
final ByteBuf cmd;
final List<ByteBuf> cmdList;
if (MessageIdAdvUtils.isBatch(messageIdAdv)) {
BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create();
bitSetRecyclable.set(0, messageIdAdv.getBatchSize());
Expand All @@ -2809,12 +2815,37 @@ private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId me
} else {
bitSetRecyclable.clear(messageIdAdv.getBatchIndex());
}
cmd = Commands.newAck(consumerId, ledgerId, entryId, bitSetRecyclable, ackType, validationError, properties,
txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId, messageIdAdv.getBatchSize());
cmdList = Collections.singletonList(Commands.newAck(consumerId, ledgerId, entryId, bitSetRecyclable,
ackType, validationError, properties, txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId,
messageIdAdv.getBatchSize()));
bitSetRecyclable.recycle();
} else {
cmd = Commands.newAck(consumerId, ledgerId, entryId, null, ackType, validationError, properties,
txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId);
MessageIdImpl[] chunkMsgIds = this.unAckedChunkedMessageIdSequenceMap.remove(messageIdAdv);
// cumulative ack chunk by the last messageId
if (chunkMsgIds == null || ackType == AckType.Cumulative) {
cmdList = Collections.singletonList(Commands.newAck(consumerId, ledgerId, entryId, null, ackType,
validationError, properties, txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId));
} else {
if (Commands.peerSupportsMultiMessageAcknowledgment(
getClientCnx().getRemoteEndpointProtocolVersion())) {
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck =
new ArrayList<>(chunkMsgIds.length);
for (MessageIdImpl cMsgId : chunkMsgIds) {
if (cMsgId != null && chunkMsgIds.length > 1) {
entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
}
}
cmdList = Collections.singletonList(
newMultiTransactionMessageAck(consumerId, txnID, entriesToAck, requestId));
} else {
cmdList = new ArrayList<>();
for (MessageIdImpl cMsgId : chunkMsgIds) {
cmdList.add(Commands.newAck(consumerId, cMsgId.ledgerId, cMsgId.entryId, null, ackType,
validationError, properties,
txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId));
}
}
}
}

if (ackType == AckType.Cumulative) {
Expand All @@ -2828,8 +2859,55 @@ private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId me
.ConnectException("Failed to ack message [" + messageId + "] "
+ "for transaction [" + txnID + "] due to consumer connect fail, consumer state: " + getState()));
} else {
return cnx.newAckForReceipt(cmd, requestId);
List<CompletableFuture<Void>> completableFutures = new LinkedList<>();
cmdList.forEach(cmd -> completableFutures.add(cnx.newAckForReceipt(cmd, requestId)));
return FutureUtil.waitForAll(completableFutures);
}
}

private ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID,
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries,
long requestID) {
BaseCommand cmd = newMultiMessageAckCommon(entries);
cmd.getAck()
.setConsumerId(consumerId)
.setAckType(AckType.Individual)
.setTxnidLeastBits(txnID.getLeastSigBits())
.setTxnidMostBits(txnID.getMostSigBits())
.setRequestId(requestID);
return serializeWithSize(cmd);
}

private static final FastThreadLocal<BaseCommand> LOCAL_BASE_COMMAND = new FastThreadLocal<BaseCommand>() {
@Override
protected BaseCommand initialValue() throws Exception {
return new BaseCommand();
}
};

private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries) {
BaseCommand cmd = LOCAL_BASE_COMMAND.get()
.clear()
.setType(BaseCommand.Type.ACK);
CommandAck ack = cmd.setAck();
int entriesCount = entries.size();
for (int i = 0; i < entriesCount; i++) {
long ledgerId = entries.get(i).getLeft();
long entryId = entries.get(i).getMiddle();
ConcurrentBitSetRecyclable bitSet = entries.get(i).getRight();
MessageIdData msgId = ack.addMessageId()
.setLedgerId(ledgerId)
.setEntryId(entryId);
if (bitSet != null) {
long[] ackSet = bitSet.toLongArray();
for (int j = 0; j < ackSet.length; j++) {
msgId.addAckSet(ackSet[j]);
}
bitSet.recycle();
}
}

return cmd;
}

private CompletableFuture<Void> doTransactionAcknowledgeForResponse(List<MessageId> messageIds, AckType ackType,
Expand Down

0 comments on commit f581417

Please sign in to comment.