diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java index 0e17719aca7e7..9e262d1cb5617 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java @@ -19,6 +19,8 @@ package org.apache.pulsar.broker.transaction; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertTrue; import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -39,12 +41,15 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.impl.ChunkMessageIdImpl; import org.apache.pulsar.common.api.proto.MessageIdData; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.TxnAction; @@ -368,4 +373,45 @@ public void completed(Exception e, long ledgerId, long entryId) { return positionList; } + @Test + public void testAckChunkMessage() throws Exception { + String producerName = "test-producer"; + String subName = "testAckChunkMessage"; + @Cleanup + PulsarClient pulsarClient1 = PulsarClient.builder().serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl()) + .enableTransaction(true).build(); + @Cleanup + Producer producer = pulsarClient1 + .newProducer(Schema.STRING) + .producerName(producerName) + .topic(CONSUME_TOPIC) + .enableChunking(true) + .enableBatching(false) + .create(); + Consumer consumer = pulsarClient1 + .newConsumer(Schema.STRING) + .subscriptionType(SubscriptionType.Shared) + .topic(CONSUME_TOPIC) + .subscriptionName(subName) + .subscribe(); + + int messageSize = 6000; // payload size in KB + String message = "a".repeat(messageSize * 1000); + MessageId messageId = producer.newMessage().value(message).send(); + assertTrue(messageId instanceof ChunkMessageIdImpl); + assertNotEquals(((ChunkMessageIdImpl) messageId).getLastChunkMessageId(), + ((ChunkMessageIdImpl) messageId).getFirstChunkMessageId()); + + Transaction transaction = pulsarClient1.newTransaction() + .withTransactionTimeout(5, TimeUnit.HOURS) + .build() + .get(); + + Message msg = consumer.receive(); + consumer.acknowledgeAsync(msg.getMessageId(), transaction); + transaction.commit().get(); + + Assert.assertEquals(admin.topics().getStats(CONSUME_TOPIC).getSubscriptions().get(subName) + .getUnackedMessages(), 0); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 15cfeb267411d..85d6c5668d54c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; import static org.apache.pulsar.common.protocol.Commands.hasChecksum; +import static org.apache.pulsar.common.protocol.Commands.serializeWithSize; import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ComparisonChain; @@ -32,6 +33,7 @@ import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; import io.netty.util.Timeout; +import io.netty.util.concurrent.FastThreadLocal; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -67,6 +69,7 @@ import lombok.AccessLevel; import lombok.Getter; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Triple; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.DeadLetterPolicy; @@ -94,7 +97,9 @@ import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.EncryptionContext; import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey; +import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; +import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.CommandAck.ValidationError; import org.apache.pulsar.common.api.proto.CommandMessage; @@ -117,6 +122,7 @@ import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.SafeCollectionUtils; import org.apache.pulsar.common.util.collections.BitSetRecyclable; +import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; import org.slf4j.Logger; @@ -2799,7 +2805,7 @@ private CompletableFuture doTransactionAcknowledgeForResponse(MessageId me final MessageIdAdv messageIdAdv = (MessageIdAdv) messageId; final long ledgerId = messageIdAdv.getLedgerId(); final long entryId = messageIdAdv.getEntryId(); - final ByteBuf cmd; + final List cmdList; if (MessageIdAdvUtils.isBatch(messageIdAdv)) { BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create(); bitSetRecyclable.set(0, messageIdAdv.getBatchSize()); @@ -2809,12 +2815,37 @@ private CompletableFuture doTransactionAcknowledgeForResponse(MessageId me } else { bitSetRecyclable.clear(messageIdAdv.getBatchIndex()); } - cmd = Commands.newAck(consumerId, ledgerId, entryId, bitSetRecyclable, ackType, validationError, properties, - txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId, messageIdAdv.getBatchSize()); + cmdList = Collections.singletonList(Commands.newAck(consumerId, ledgerId, entryId, bitSetRecyclable, + ackType, validationError, properties, txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId, + messageIdAdv.getBatchSize())); bitSetRecyclable.recycle(); } else { - cmd = Commands.newAck(consumerId, ledgerId, entryId, null, ackType, validationError, properties, - txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId); + MessageIdImpl[] chunkMsgIds = this.unAckedChunkedMessageIdSequenceMap.remove(messageIdAdv); + // cumulative ack chunk by the last messageId + if (chunkMsgIds == null || ackType == AckType.Cumulative) { + cmdList = Collections.singletonList(Commands.newAck(consumerId, ledgerId, entryId, null, ackType, + validationError, properties, txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId)); + } else { + if (Commands.peerSupportsMultiMessageAcknowledgment( + getClientCnx().getRemoteEndpointProtocolVersion())) { + List> entriesToAck = + new ArrayList<>(chunkMsgIds.length); + for (MessageIdImpl cMsgId : chunkMsgIds) { + if (cMsgId != null && chunkMsgIds.length > 1) { + entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null)); + } + } + cmdList = Collections.singletonList( + newMultiTransactionMessageAck(consumerId, txnID, entriesToAck, requestId)); + } else { + cmdList = new ArrayList<>(); + for (MessageIdImpl cMsgId : chunkMsgIds) { + cmdList.add(Commands.newAck(consumerId, cMsgId.ledgerId, cMsgId.entryId, null, ackType, + validationError, properties, + txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId)); + } + } + } } if (ackType == AckType.Cumulative) { @@ -2828,8 +2859,55 @@ private CompletableFuture doTransactionAcknowledgeForResponse(MessageId me .ConnectException("Failed to ack message [" + messageId + "] " + "for transaction [" + txnID + "] due to consumer connect fail, consumer state: " + getState())); } else { - return cnx.newAckForReceipt(cmd, requestId); + List> completableFutures = new LinkedList<>(); + cmdList.forEach(cmd -> completableFutures.add(cnx.newAckForReceipt(cmd, requestId))); + return FutureUtil.waitForAll(completableFutures); + } + } + + private ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID, + List> entries, + long requestID) { + BaseCommand cmd = newMultiMessageAckCommon(entries); + cmd.getAck() + .setConsumerId(consumerId) + .setAckType(AckType.Individual) + .setTxnidLeastBits(txnID.getLeastSigBits()) + .setTxnidMostBits(txnID.getMostSigBits()) + .setRequestId(requestID); + return serializeWithSize(cmd); + } + + private static final FastThreadLocal LOCAL_BASE_COMMAND = new FastThreadLocal() { + @Override + protected BaseCommand initialValue() throws Exception { + return new BaseCommand(); + } + }; + + private static BaseCommand newMultiMessageAckCommon(List> entries) { + BaseCommand cmd = LOCAL_BASE_COMMAND.get() + .clear() + .setType(BaseCommand.Type.ACK); + CommandAck ack = cmd.setAck(); + int entriesCount = entries.size(); + for (int i = 0; i < entriesCount; i++) { + long ledgerId = entries.get(i).getLeft(); + long entryId = entries.get(i).getMiddle(); + ConcurrentBitSetRecyclable bitSet = entries.get(i).getRight(); + MessageIdData msgId = ack.addMessageId() + .setLedgerId(ledgerId) + .setEntryId(entryId); + if (bitSet != null) { + long[] ackSet = bitSet.toLongArray(); + for (int j = 0; j < ackSet.length; j++) { + msgId.addAckSet(ackSet[j]); + } + bitSet.recycle(); + } } + + return cmd; } private CompletableFuture doTransactionAcknowledgeForResponse(List messageIds, AckType ackType,