diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java index f41a7aedd59b2..d8c491dab2906 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java @@ -52,12 +52,16 @@ public static boolean isReadableBatch(MessageMetadata metadata) { return metadata.hasNumMessagesInBatch() && metadata.getEncryptionKeysCount() == 0; } - public static List extractMessageCompactionData(RawMessage msg) + public static List extractMessageCompactionData(RawMessage msg, MessageMetadata metadata) throws IOException { checkArgument(msg.getMessageIdData().getBatchIndex() == -1); ByteBuf payload = msg.getHeadersAndPayload(); - MessageMetadata metadata = Commands.parseMessageMetadata(payload); + if (metadata == null) { + metadata = Commands.parseMessageMetadata(payload); + } else { + Commands.skipMessageMetadata(payload); + } int batchSize = metadata.getNumMessagesInBatch(); CompressionType compressionType = metadata.getCompression(); @@ -91,7 +95,16 @@ public static List> extractIdsAndKey RawMessage msg) throws IOException { List> idsAndKeysAndSize = new ArrayList<>(); - for (MessageCompactionData mcd : extractMessageCompactionData(msg)) { + for (MessageCompactionData mcd : extractMessageCompactionData(msg, null)) { + idsAndKeysAndSize.add(ImmutableTriple.of(mcd.messageId(), mcd.key(), mcd.payloadSize())); + } + return idsAndKeysAndSize; + } + + public static List> extractIdsAndKeysAndSize( + RawMessage msg, MessageMetadata metadata) throws IOException { + List> idsAndKeysAndSize = new ArrayList<>(); + for (MessageCompactionData mcd : extractMessageCompactionData(msg, metadata)) { idsAndKeysAndSize.add(ImmutableTriple.of(mcd.messageId(), mcd.key(), mcd.payloadSize())); } return idsAndKeysAndSize; @@ -99,7 +112,7 @@ public static List> extractIdsAndKey public static Optional rebatchMessage(RawMessage msg, BiPredicate filter) throws IOException { - return rebatchMessage(msg, filter, true); + return rebatchMessage(msg, null, filter, true); } /** @@ -109,6 +122,7 @@ public static Optional rebatchMessage(RawMessage msg, * NOTE: this message does not alter the reference count of the RawMessage argument. */ public static Optional rebatchMessage(RawMessage msg, + MessageMetadata metadata, BiPredicate filter, boolean retainNullKey) throws IOException { @@ -123,7 +137,11 @@ public static Optional rebatchMessage(RawMessage msg, payload.readerIndex(readerIndex); brokerMeta = payload.readSlice(brokerEntryMetadataSize + Short.BYTES + Integer.BYTES); } - MessageMetadata metadata = Commands.parseMessageMetadata(payload); + if (metadata == null) { + metadata = Commands.parseMessageMetadata(payload); + } else { + Commands.skipMessageMetadata(payload); + } ByteBuf batchBuffer = PulsarByteBufAllocator.DEFAULT.buffer(payload.capacity()); CompressionType compressionType = metadata.getCompression(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java index 5b03f270251a0..ddfe8825a8888 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java @@ -77,7 +77,7 @@ public AbstractTwoPhaseCompactor(ServiceConfiguration conf, protected abstract Map toLatestMessageIdForKey(Map latestForKey); protected abstract boolean compactMessage(String topic, Map latestForKey, - RawMessage m, MessageId id); + RawMessage m, MessageMetadata metadata, MessageId id); protected abstract boolean compactBatchMessage(String topic, Map latestForKey, @@ -147,7 +147,7 @@ private void phaseOneLoop(RawReader reader, } else if (RawBatchConverter.isReadableBatch(metadata)) { deletedMessage = compactBatchMessage(reader.getTopic(), latestForKey, m, metadata, id); } else { - deletedMessage = compactMessage(reader.getTopic(), latestForKey, m, id); + deletedMessage = compactMessage(reader.getTopic(), latestForKey, m, metadata, id); } MessageId first = firstMessageId.orElse(deletedMessage ? null : id); MessageId to = deletedMessage ? toMessageId.orElse(null) : id; @@ -239,7 +239,7 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map } else if (RawBatchConverter.isReadableBatch(metadata)) { try { messageToAdd = rebatchMessage(reader.getTopic(), - m, (key, subid) -> subid.equals(latestForKey.get(key)), + m, metadata, (key, subid) -> subid.equals(latestForKey.get(key)), topicCompactionRetainNullKey); } catch (IOException ioe) { log.info("Error decoding batch for message {}. Whole batch will be included in output", @@ -247,7 +247,7 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map messageToAdd = Optional.of(m); } } else { - Pair keyAndSize = extractKeyAndSize(m); + Pair keyAndSize = extractKeyAndSize(m, metadata); MessageId msg; if (keyAndSize == null) { messageToAdd = topicCompactionRetainNullKey ? Optional.of(m) : Optional.empty(); @@ -392,9 +392,8 @@ private CompletableFuture addToCompactedLedger(LedgerHandle lh, RawMessage return bkf; } - protected Pair extractKeyAndSize(RawMessage m) { + protected Pair extractKeyAndSize(RawMessage m, MessageMetadata msgMetadata) { ByteBuf headersAndPayload = m.getHeadersAndPayload(); - MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); if (msgMetadata.hasPartitionKey()) { int size = headersAndPayload.readableBytes(); if (msgMetadata.hasUncompressedSize()) { @@ -408,13 +407,14 @@ protected Pair extractKeyAndSize(RawMessage m) { protected Optional rebatchMessage(String topic, RawMessage msg, + MessageMetadata metadata, BiPredicate filter, boolean retainNullKey) throws IOException { if (log.isDebugEnabled()) { log.debug("Rebatching message {} for topic {}", msg.getMessageId(), topic); } - return RawBatchConverter.rebatchMessage(msg, filter, retainNullKey); + return RawBatchConverter.rebatchMessage(msg, metadata, filter, retainNullKey); } protected static class PhaseOneResult { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java index 2cd19ba15d608..db129b54533a8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java @@ -34,7 +34,6 @@ import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.impl.RawBatchConverter; import org.apache.pulsar.common.api.proto.MessageMetadata; -import org.apache.pulsar.common.protocol.Commands; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,10 +60,10 @@ protected Map toLatestMessageIdForKey( @Override protected boolean compactMessage(String topic, Map> latestForKey, - RawMessage m, MessageId id) { + RawMessage m, MessageMetadata metadata, MessageId id) { boolean deletedMessage = false; boolean replaceMessage = false; - MessageCompactionData mcd = extractMessageCompactionData(m); + MessageCompactionData mcd = extractMessageCompactionData(m, metadata); if (mcd != null) { boolean newer = Optional.ofNullable(latestForKey.get(mcd.key())) @@ -100,7 +99,7 @@ protected boolean compactBatchMessage(String topic, Map extractMessageCompactionDataFromBatch(RawMessage msg) + private List extractMessageCompactionDataFromBatch(RawMessage msg, MessageMetadata metadata) throws IOException { - return RawBatchConverter.extractMessageCompactionData(msg); + return RawBatchConverter.extractMessageCompactionData(msg, metadata); } } \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PublishingOrderCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PublishingOrderCompactor.java index a825c0782fbf9..223e8c421a5ef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PublishingOrderCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PublishingOrderCompactor.java @@ -53,10 +53,10 @@ protected Map toLatestMessageIdForKey(Map @Override protected boolean compactMessage(String topic, Map latestForKey, - RawMessage m, MessageId id) { + RawMessage m, MessageMetadata metadata, MessageId id) { boolean deletedMessage = false; boolean replaceMessage = false; - Pair keyAndSize = extractKeyAndSize(m); + Pair keyAndSize = extractKeyAndSize(m, metadata); if (keyAndSize != null) { if (keyAndSize.getRight() > 0) { MessageId old = latestForKey.put(keyAndSize.getLeft(), id); @@ -84,7 +84,7 @@ protected boolean compactBatchMessage(String topic, Map lates int numMessagesInBatch = metadata.getNumMessagesInBatch(); int deleteCnt = 0; for (ImmutableTriple e : extractIdsAndKeysAndSizeFromBatch( - m)) { + m, metadata)) { if (e != null) { if (e.getMiddle() == null) { if (!topicCompactionRetainNullKey) { @@ -119,9 +119,9 @@ protected boolean compactBatchMessage(String topic, Map lates } protected List> extractIdsAndKeysAndSizeFromBatch( - RawMessage msg) + RawMessage msg, MessageMetadata metadata) throws IOException { - return RawBatchConverter.extractIdsAndKeysAndSize(msg); + return RawBatchConverter.extractIdsAndKeysAndSize(msg, metadata); } } \ No newline at end of file