Skip to content

Commit

Permalink
[improve][broker] Reducing the parse of MessageMetadata in compaction (
Browse files Browse the repository at this point in the history
…apache#23285)

Co-authored-by: Aloys Zhang <[email protected]>
  • Loading branch information
aloyszhang and Aloys Zhang authored Sep 14, 2024
1 parent 13c19b5 commit 4f96146
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,16 @@ public static boolean isReadableBatch(MessageMetadata metadata) {
return metadata.hasNumMessagesInBatch() && metadata.getEncryptionKeysCount() == 0;
}

public static List<MessageCompactionData> extractMessageCompactionData(RawMessage msg)
public static List<MessageCompactionData> extractMessageCompactionData(RawMessage msg, MessageMetadata metadata)
throws IOException {
checkArgument(msg.getMessageIdData().getBatchIndex() == -1);

ByteBuf payload = msg.getHeadersAndPayload();
MessageMetadata metadata = Commands.parseMessageMetadata(payload);
if (metadata == null) {
metadata = Commands.parseMessageMetadata(payload);
} else {
Commands.skipMessageMetadata(payload);
}
int batchSize = metadata.getNumMessagesInBatch();

CompressionType compressionType = metadata.getCompression();
Expand Down Expand Up @@ -91,15 +95,24 @@ public static List<ImmutableTriple<MessageId, String, Integer>> extractIdsAndKey
RawMessage msg)
throws IOException {
List<ImmutableTriple<MessageId, String, Integer>> idsAndKeysAndSize = new ArrayList<>();
for (MessageCompactionData mcd : extractMessageCompactionData(msg)) {
for (MessageCompactionData mcd : extractMessageCompactionData(msg, null)) {
idsAndKeysAndSize.add(ImmutableTriple.of(mcd.messageId(), mcd.key(), mcd.payloadSize()));
}
return idsAndKeysAndSize;
}

public static List<ImmutableTriple<MessageId, String, Integer>> extractIdsAndKeysAndSize(
RawMessage msg, MessageMetadata metadata) throws IOException {
List<ImmutableTriple<MessageId, String, Integer>> idsAndKeysAndSize = new ArrayList<>();
for (MessageCompactionData mcd : extractMessageCompactionData(msg, metadata)) {
idsAndKeysAndSize.add(ImmutableTriple.of(mcd.messageId(), mcd.key(), mcd.payloadSize()));
}
return idsAndKeysAndSize;
}

public static Optional<RawMessage> rebatchMessage(RawMessage msg,
BiPredicate<String, MessageId> filter) throws IOException {
return rebatchMessage(msg, filter, true);
return rebatchMessage(msg, null, filter, true);
}

/**
Expand All @@ -109,6 +122,7 @@ public static Optional<RawMessage> rebatchMessage(RawMessage msg,
* NOTE: this message does not alter the reference count of the RawMessage argument.
*/
public static Optional<RawMessage> rebatchMessage(RawMessage msg,
MessageMetadata metadata,
BiPredicate<String, MessageId> filter,
boolean retainNullKey)
throws IOException {
Expand All @@ -123,7 +137,11 @@ public static Optional<RawMessage> rebatchMessage(RawMessage msg,
payload.readerIndex(readerIndex);
brokerMeta = payload.readSlice(brokerEntryMetadataSize + Short.BYTES + Integer.BYTES);
}
MessageMetadata metadata = Commands.parseMessageMetadata(payload);
if (metadata == null) {
metadata = Commands.parseMessageMetadata(payload);
} else {
Commands.skipMessageMetadata(payload);
}
ByteBuf batchBuffer = PulsarByteBufAllocator.DEFAULT.buffer(payload.capacity());

CompressionType compressionType = metadata.getCompression();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public AbstractTwoPhaseCompactor(ServiceConfiguration conf,
protected abstract Map<String, MessageId> toLatestMessageIdForKey(Map<String, T> latestForKey);

protected abstract boolean compactMessage(String topic, Map<String, T> latestForKey,
RawMessage m, MessageId id);
RawMessage m, MessageMetadata metadata, MessageId id);


protected abstract boolean compactBatchMessage(String topic, Map<String, T> latestForKey,
Expand Down Expand Up @@ -147,7 +147,7 @@ private void phaseOneLoop(RawReader reader,
} else if (RawBatchConverter.isReadableBatch(metadata)) {
deletedMessage = compactBatchMessage(reader.getTopic(), latestForKey, m, metadata, id);
} else {
deletedMessage = compactMessage(reader.getTopic(), latestForKey, m, id);
deletedMessage = compactMessage(reader.getTopic(), latestForKey, m, metadata, id);
}
MessageId first = firstMessageId.orElse(deletedMessage ? null : id);
MessageId to = deletedMessage ? toMessageId.orElse(null) : id;
Expand Down Expand Up @@ -239,15 +239,15 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
} else if (RawBatchConverter.isReadableBatch(metadata)) {
try {
messageToAdd = rebatchMessage(reader.getTopic(),
m, (key, subid) -> subid.equals(latestForKey.get(key)),
m, metadata, (key, subid) -> subid.equals(latestForKey.get(key)),
topicCompactionRetainNullKey);
} catch (IOException ioe) {
log.info("Error decoding batch for message {}. Whole batch will be included in output",
id, ioe);
messageToAdd = Optional.of(m);
}
} else {
Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
Pair<String, Integer> keyAndSize = extractKeyAndSize(m, metadata);
MessageId msg;
if (keyAndSize == null) {
messageToAdd = topicCompactionRetainNullKey ? Optional.of(m) : Optional.empty();
Expand Down Expand Up @@ -392,9 +392,8 @@ private CompletableFuture<Void> addToCompactedLedger(LedgerHandle lh, RawMessage
return bkf;
}

protected Pair<String, Integer> extractKeyAndSize(RawMessage m) {
protected Pair<String, Integer> extractKeyAndSize(RawMessage m, MessageMetadata msgMetadata) {
ByteBuf headersAndPayload = m.getHeadersAndPayload();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
if (msgMetadata.hasPartitionKey()) {
int size = headersAndPayload.readableBytes();
if (msgMetadata.hasUncompressedSize()) {
Expand All @@ -408,13 +407,14 @@ protected Pair<String, Integer> extractKeyAndSize(RawMessage m) {


protected Optional<RawMessage> rebatchMessage(String topic, RawMessage msg,
MessageMetadata metadata,
BiPredicate<String, MessageId> filter,
boolean retainNullKey)
throws IOException {
if (log.isDebugEnabled()) {
log.debug("Rebatching message {} for topic {}", msg.getMessageId(), topic);
}
return RawBatchConverter.rebatchMessage(msg, filter, retainNullKey);
return RawBatchConverter.rebatchMessage(msg, metadata, filter, retainNullKey);
}

protected static class PhaseOneResult<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.impl.RawBatchConverter;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -61,10 +60,10 @@ protected Map<String, MessageId> toLatestMessageIdForKey(

@Override
protected boolean compactMessage(String topic, Map<String, Pair<MessageId, Long>> latestForKey,
RawMessage m, MessageId id) {
RawMessage m, MessageMetadata metadata, MessageId id) {
boolean deletedMessage = false;
boolean replaceMessage = false;
MessageCompactionData mcd = extractMessageCompactionData(m);
MessageCompactionData mcd = extractMessageCompactionData(m, metadata);

if (mcd != null) {
boolean newer = Optional.ofNullable(latestForKey.get(mcd.key()))
Expand Down Expand Up @@ -100,7 +99,7 @@ protected boolean compactBatchMessage(String topic, Map<String, Pair<MessageId,
int numMessagesInBatch = metadata.getNumMessagesInBatch();
int deleteCnt = 0;

for (MessageCompactionData mcd : extractMessageCompactionDataFromBatch(m)) {
for (MessageCompactionData mcd : extractMessageCompactionDataFromBatch(m, metadata)) {
if (mcd.key() == null) {
if (!topicCompactionRetainNullKey) {
// record delete null-key message event
Expand Down Expand Up @@ -139,23 +138,22 @@ protected boolean compactBatchMessage(String topic, Map<String, Pair<MessageId,
return deletedMessage;
}

protected MessageCompactionData extractMessageCompactionData(RawMessage m) {
protected MessageCompactionData extractMessageCompactionData(RawMessage m, MessageMetadata metadata) {
ByteBuf headersAndPayload = m.getHeadersAndPayload();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
if (msgMetadata.hasPartitionKey()) {
if (metadata.hasPartitionKey()) {
int size = headersAndPayload.readableBytes();
if (msgMetadata.hasUncompressedSize()) {
size = msgMetadata.getUncompressedSize();
if (metadata.hasUncompressedSize()) {
size = metadata.getUncompressedSize();
}
return new MessageCompactionData(m.getMessageId(), msgMetadata.getPartitionKey(),
size, msgMetadata.getEventTime());
return new MessageCompactionData(m.getMessageId(), metadata.getPartitionKey(),
size, metadata.getEventTime());
} else {
return null;
}
}

private List<MessageCompactionData> extractMessageCompactionDataFromBatch(RawMessage msg)
private List<MessageCompactionData> extractMessageCompactionDataFromBatch(RawMessage msg, MessageMetadata metadata)
throws IOException {
return RawBatchConverter.extractMessageCompactionData(msg);
return RawBatchConverter.extractMessageCompactionData(msg, metadata);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ protected Map<String, MessageId> toLatestMessageIdForKey(Map<String, MessageId>

@Override
protected boolean compactMessage(String topic, Map<String, MessageId> latestForKey,
RawMessage m, MessageId id) {
RawMessage m, MessageMetadata metadata, MessageId id) {
boolean deletedMessage = false;
boolean replaceMessage = false;
Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
Pair<String, Integer> keyAndSize = extractKeyAndSize(m, metadata);
if (keyAndSize != null) {
if (keyAndSize.getRight() > 0) {
MessageId old = latestForKey.put(keyAndSize.getLeft(), id);
Expand Down Expand Up @@ -84,7 +84,7 @@ protected boolean compactBatchMessage(String topic, Map<String, MessageId> lates
int numMessagesInBatch = metadata.getNumMessagesInBatch();
int deleteCnt = 0;
for (ImmutableTriple<MessageId, String, Integer> e : extractIdsAndKeysAndSizeFromBatch(
m)) {
m, metadata)) {
if (e != null) {
if (e.getMiddle() == null) {
if (!topicCompactionRetainNullKey) {
Expand Down Expand Up @@ -119,9 +119,9 @@ protected boolean compactBatchMessage(String topic, Map<String, MessageId> lates
}

protected List<ImmutableTriple<MessageId, String, Integer>> extractIdsAndKeysAndSizeFromBatch(
RawMessage msg)
RawMessage msg, MessageMetadata metadata)
throws IOException {
return RawBatchConverter.extractIdsAndKeysAndSize(msg);
return RawBatchConverter.extractIdsAndKeysAndSize(msg, metadata);
}

}

0 comments on commit 4f96146

Please sign in to comment.