From 3cce53da0d82ad8689667f3169c0e22f7826d4d4 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 11 Oct 2024 22:49:20 +0800 Subject: [PATCH 1/6] [improve][broker] Add getLastMessagePosition method to TopicCompactionService for flexible customization --- .../pulsar/broker/service/ServerCnx.java | 274 ++++++------------ .../PulsarTopicCompactionService.java | 58 ++++ .../compaction/TopicCompactionService.java | 17 ++ 3 files changed, 160 insertions(+), 189 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 37b431e833983..cda38ac98fb4c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -42,7 +42,6 @@ import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.ScheduledFuture; -import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.ArrayList; @@ -56,6 +55,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; @@ -137,7 +137,6 @@ import org.apache.pulsar.common.api.proto.CommandUnsubscribe; import org.apache.pulsar.common.api.proto.CommandWatchTopicList; import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose; -import org.apache.pulsar.common.api.proto.CompressionType; import org.apache.pulsar.common.api.proto.FeatureFlags; import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.KeySharedMode; @@ -148,10 +147,7 @@ import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.apache.pulsar.common.api.proto.Schema; import org.apache.pulsar.common.api.proto.ServerError; -import org.apache.pulsar.common.api.proto.SingleMessageMetadata; import org.apache.pulsar.common.api.proto.TxnAction; -import org.apache.pulsar.common.compression.CompressionCodec; -import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.intercept.InterceptException; import org.apache.pulsar.common.lookup.data.LookupData; import org.apache.pulsar.common.naming.Metadata; @@ -178,6 +174,7 @@ import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; import org.apache.pulsar.common.util.netty.NettyChannelUtil; import org.apache.pulsar.common.util.netty.NettyFutureUtil; +import org.apache.pulsar.compaction.TopicCompactionService; import org.apache.pulsar.functions.utils.Exceptions; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; @@ -2252,208 +2249,107 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) { Consumer consumer = consumerFuture.getNow(null); long requestId = getLastMessageId.getRequestId(); + final var readCompacted = consumer.readCompacted(); Topic topic = consumer.getSubscription().getTopic(); - topic.checkIfTransactionBufferRecoverCompletely() - .thenCompose(__ -> topic.getLastDispatchablePosition()) - .thenApply(lastPosition -> { - int partitionIndex = TopicName.getPartitionIndex(topic.getName()); - - Position markDeletePosition = null; - if (consumer.getSubscription() instanceof PersistentSubscription) { - markDeletePosition = ((PersistentSubscription) consumer.getSubscription()).getCursor() - .getMarkDeletedPosition(); - } - - getLargestBatchIndexWhenPossible( - topic, - lastPosition, - markDeletePosition, - partitionIndex, - requestId, - consumer.getSubscription().getName(), - consumer.readCompacted()); + topic.checkIfTransactionBufferRecoverCompletely().whenComplete((__, e) -> { + if (e != null) { + writeAndFlush(Commands.newError(requestId, ServerError.UnknownError, + "Failed to recover Transaction Buffer: " + e.getMessage())); + return; + } + topic.getLastDispatchablePosition().thenCompose(lastPosition -> { + final int partitionIndex = TopicName.getPartitionIndex(topic.getName()); + final Position markDeletePosition; + if (consumer.getSubscription() instanceof PersistentSubscription) { + markDeletePosition = ((PersistentSubscription) consumer.getSubscription()).getCursor() + .getMarkDeletedPosition(); + } else { + markDeletePosition = null; + } + final var persistentTopic = (PersistentTopic) topic; + final var compactionService = persistentTopic.getTopicCompactionService(); + final var ml = persistentTopic.getManagedLedger(); + final var compactionHorizonFuture = readCompacted + ? compactionService.getLastCompactedPosition() + : CompletableFuture.completedFuture(null); + return compactionHorizonFuture.thenCompose(compactionHorizon -> { + // there is no entry in the original topic + if (lastPosition.getEntryId() == -1 + || !ml.getLedgersInfo().containsKey(lastPosition.getLedgerId())) { + return compactionHorizon != null ? compactionService.getLastMessagePosition() + : TopicCompactionService.MessagePosition.EARLIEST_FUTURE; + } + if (compactionHorizon != null && lastPosition.compareTo(compactionHorizon) <= 0) { + return compactionService.getLastMessagePosition(); + } + + // For a valid position, we read the entry out and parse the batch size from its metadata. + final var future = getLastMessagePositionFromManagedLedger(ml, lastPosition, readCompacted); + return future.thenCompose(position -> { + if (position == null) { + // readCompacted is true and the managed ledger is corrupted + return compactionService.getLastMessagePosition(); + } + return CompletableFuture.completedFuture(position); + }); + }).thenAccept(lastMessageId -> { + final var response = Commands.newGetLastMessageIdResponse(requestId, lastMessageId.ledgerId(), + lastMessageId.entryId(), partitionIndex, lastMessageId.batchIndex(), + markDeletePosition != null ? markDeletePosition.getLedgerId() : -1, + markDeletePosition != null ? markDeletePosition.getEntryId() : -1); + writeAndFlush(response); + }); + }).exceptionally(throwable -> { + log.error("[{}][{}][{}] Failed to get last message id (readCompacted: {})", remoteAddress, + topic.getName(), consumer.getSubscription().getName(), readCompacted, throwable); + writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, throwable.getMessage())); return null; - }).exceptionally(e -> { - writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), - ServerError.UnknownError, "Failed to recover Transaction Buffer.")); - return null; - }); + }); + }); } else { writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), ServerError.MetadataError, "Consumer not found")); } } - private void getLargestBatchIndexWhenPossible( - Topic topic, - Position lastPosition, - Position markDeletePosition, - int partitionIndex, - long requestId, - String subscriptionName, - boolean readCompacted) { - - PersistentTopic persistentTopic = (PersistentTopic) topic; - ManagedLedger ml = persistentTopic.getManagedLedger(); - - // If it's not pointing to a valid entry, respond messageId of the current position. - // If the compaction cursor reach the end of the topic, respond messageId from compacted ledger - CompletableFuture compactionHorizonFuture = readCompacted - ? persistentTopic.getTopicCompactionService().getLastCompactedPosition() : - CompletableFuture.completedFuture(null); - - compactionHorizonFuture.whenComplete((compactionHorizon, ex) -> { - if (ex != null) { - log.error("Failed to get compactionHorizon.", ex); - writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, ex.getMessage())); - return; + private CompletableFuture getLastMessagePositionFromManagedLedger( + ManagedLedger managedLedger, Position lastPosition, boolean readCompacted) { + // For a valid position, we read the entry out and parse the batch size from its metadata. + CompletableFuture entryFuture = new CompletableFuture<>(); + managedLedger.asyncReadEntry(lastPosition, new AsyncCallbacks.ReadEntryCallback() { + @Override + public void readEntryComplete(Entry entry, Object ctx) { + entryFuture.complete(entry); } - - if (lastPosition.getEntryId() == -1 || !ml.getLedgersInfo().containsKey(lastPosition.getLedgerId())) { - // there is no entry in the original topic - if (compactionHorizon != null) { - // if readCompacted is true, we need to read the last entry from compacted topic - handleLastMessageIdFromCompactionService(persistentTopic, requestId, partitionIndex, - markDeletePosition); - } else { - // if readCompacted is false, we need to return MessageId.earliest - writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, -1, -1, partitionIndex, -1, - markDeletePosition != null ? markDeletePosition.getLedgerId() : -1, - markDeletePosition != null ? markDeletePosition.getEntryId() : -1)); - } - return; + @Override + public void readEntryFailed(ManagedLedgerException exception, Object ctx) { + entryFuture.completeExceptionally(exception); } - if (compactionHorizon != null && lastPosition.compareTo(compactionHorizon) <= 0) { - handleLastMessageIdFromCompactionService(persistentTopic, requestId, partitionIndex, - markDeletePosition); - return; + @Override + public String toString() { + return String.format("ServerCnx [%s] get largest batch index when possible", + ServerCnx.this); } - - // For a valid position, we read the entry out and parse the batch size from its metadata. - CompletableFuture entryFuture = new CompletableFuture<>(); - ml.asyncReadEntry(lastPosition, new AsyncCallbacks.ReadEntryCallback() { - @Override - public void readEntryComplete(Entry entry, Object ctx) { - entryFuture.complete(entry); - } - - @Override - public void readEntryFailed(ManagedLedgerException exception, Object ctx) { - entryFuture.completeExceptionally(exception); - } - - @Override - public String toString() { - return String.format("ServerCnx [%s] get largest batch index when possible", - ServerCnx.this.toString()); - } - }, null); - - CompletableFuture batchSizeFuture = entryFuture.thenApply(entry -> { + }, null); + return entryFuture.thenApply(entry -> { + try { MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); - int batchSize = metadata.getNumMessagesInBatch(); + final var batchSize = metadata.hasNumMessagesInBatch() ? metadata.getNumMessagesInBatch() : 0; + return new TopicCompactionService.MessagePosition(entry.getLedgerId(), entry.getEntryId(), batchSize - 1); + } finally { entry.release(); - return metadata.hasNumMessagesInBatch() ? batchSize : -1; - }); - - batchSizeFuture.whenComplete((batchSize, e) -> { - if (e != null) { - if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException - && readCompacted) { - handleLastMessageIdFromCompactionService(persistentTopic, requestId, partitionIndex, - markDeletePosition); - } else { - writeAndFlush(Commands.newError( - requestId, ServerError.MetadataError, - "Failed to get batch size for entry " + e.getMessage())); - } - } else { - int largestBatchIndex = batchSize > 0 ? batchSize - 1 : -1; - - if (log.isDebugEnabled()) { - log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress, - topic.getName(), subscriptionName, lastPosition, partitionIndex); - } - - writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, lastPosition.getLedgerId(), - lastPosition.getEntryId(), partitionIndex, largestBatchIndex, - markDeletePosition != null ? markDeletePosition.getLedgerId() : -1, - markDeletePosition != null ? markDeletePosition.getEntryId() : -1)); - } - }); - }); - } - - private void handleLastMessageIdFromCompactionService(PersistentTopic persistentTopic, long requestId, - int partitionIndex, Position markDeletePosition) { - persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenAccept(entry -> { - if (entry != null) { - try { - // in this case, all the data has been compacted, so return the last position - // in the compacted ledger to the client - ByteBuf payload = entry.getDataBuffer(); - MessageMetadata metadata = Commands.parseMessageMetadata(payload); - int largestBatchIndex; - try { - largestBatchIndex = calculateTheLastBatchIndexInBatch(metadata, payload); - } catch (IOException ioEx) { - writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, - "Failed to deserialize batched message from the last entry of the compacted Ledger: " - + ioEx.getMessage())); - return; - } - writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, - entry.getLedgerId(), entry.getEntryId(), partitionIndex, largestBatchIndex, - markDeletePosition != null ? markDeletePosition.getLedgerId() : -1, - markDeletePosition != null ? markDeletePosition.getEntryId() : -1)); - } finally { - entry.release(); - } - } else { - // in this case, the ledgers been removed except the current ledger - // and current ledger without any data - writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, - -1, -1, partitionIndex, -1, - markDeletePosition != null ? markDeletePosition.getLedgerId() : -1, - markDeletePosition != null ? markDeletePosition.getEntryId() : -1)); } - }).exceptionally(ex -> { - writeAndFlush(Commands.newError( - requestId, ServerError.MetadataError, - "Failed to read last entry of the compacted Ledger " - + ex.getCause().getMessage())); - return null; - }); - } - - private int calculateTheLastBatchIndexInBatch(MessageMetadata metadata, ByteBuf payload) throws IOException { - int batchSize = metadata.getNumMessagesInBatch(); - if (batchSize <= 1){ - return -1; - } - if (metadata.hasCompression()) { - var tmp = payload; - CompressionType compressionType = metadata.getCompression(); - CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType); - int uncompressedSize = metadata.getUncompressedSize(); - payload = codec.decode(payload, uncompressedSize); - tmp.release(); - } - SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata(); - int lastBatchIndexInBatch = -1; - for (int i = 0; i < batchSize; i++){ - ByteBuf singleMessagePayload = - Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, i, batchSize); - singleMessagePayload.release(); - if (singleMessageMetadata.isCompactedOut()){ - continue; + }).exceptionally(e -> { + final var unwrapped = FutureUtil.unwrapCompletionException(e); + if (readCompacted + && unwrapped instanceof ManagedLedgerException.NonRecoverableLedgerException) { + return null; } - lastBatchIndexInBatch = i; - } - return lastBatchIndexInBatch; + throw new CompletionException(unwrapped); + }); } private CompletableFuture isNamespaceOperationAllowed(NamespaceName namespaceName, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java index 27efcf9524f8f..abf104a66a5c4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java @@ -22,12 +22,14 @@ import static org.apache.pulsar.compaction.CompactedTopicImpl.COMPACT_LEDGER_EMPTY; import static org.apache.pulsar.compaction.CompactedTopicImpl.NEWER_THAN_COMPACTED; import static org.apache.pulsar.compaction.CompactedTopicImpl.findStartPoint; +import io.netty.buffer.ByteBuf; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.function.Predicate; import java.util.function.Supplier; import javax.annotation.Nonnull; @@ -35,6 +37,11 @@ import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; +import org.apache.pulsar.common.api.proto.CompressionType; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.api.proto.SingleMessageMetadata; +import org.apache.pulsar.common.compression.CompressionCodec; +import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.FutureUtil; @@ -136,4 +143,55 @@ public CompactedTopicImpl getCompactedTopic() { public void close() throws IOException { // noop } + + @Override + public CompletableFuture getLastMessagePosition() { + return readLastCompactedEntry().thenApply(entry -> { + if (entry == null) { + return MessagePosition.EARLIEST; + } + try { + // in this case, all the data has been compacted, so return the last position + // in the compacted ledger to the client + ByteBuf payload = entry.getDataBuffer(); + MessageMetadata metadata = Commands.parseMessageMetadata(payload); + try { + final var batchIndex = calculateTheLastBatchIndexInBatch(metadata, payload); + return new MessagePosition(entry.getLedgerId(), entry.getEntryId(), batchIndex); + } catch (IOException e) { + throw new CompletionException(new IOException("Failed to deserialize batched message from " + + "the last entry of the compacted ledger: " + e.getMessage())); + } + } finally { + entry.release(); + } + }); + } + + private static int calculateTheLastBatchIndexInBatch(MessageMetadata metadata, ByteBuf payload) throws IOException { + int batchSize = metadata.getNumMessagesInBatch(); + if (batchSize <= 1){ + return -1; + } + if (metadata.hasCompression()) { + var tmp = payload; + CompressionType compressionType = metadata.getCompression(); + CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType); + int uncompressedSize = metadata.getUncompressedSize(); + payload = codec.decode(payload, uncompressedSize); + tmp.release(); + } + SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata(); + int lastBatchIndexInBatch = -1; + for (int i = 0; i < batchSize; i++){ + ByteBuf singleMessagePayload = + Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, i, batchSize); + singleMessagePayload.release(); + if (singleMessageMetadata.isCompactedOut()){ + continue; + } + lastBatchIndexInBatch = i; + } + return lastBatchIndexInBatch; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java index fdd6bebbdec33..e57ad2df70b75 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java @@ -77,4 +77,21 @@ public interface TopicCompactionService extends AutoCloseable { * @return the first entry that greater or equal to target entryIndex, this entry can be null. */ CompletableFuture findEntryByEntryIndex(long entryIndex); + + /** + * Get the last message's position of the original topic. + */ + CompletableFuture getLastMessagePosition(); + + /** + * The position of a message. + * It adds a new field to {@link Position} that represents the batch index of the message in the entry. The batch + * index is -1 for a non-batched message or a non-positive integer when the entry is a batched message. + */ + record MessagePosition(long ledgerId, long entryId, int batchIndex) { + + public static final MessagePosition EARLIEST = new MessagePosition(-1L, -1L, -1); + public static final CompletableFuture EARLIEST_FUTURE = + CompletableFuture.completedFuture(EARLIEST); + } } From ed48100119680a7d4fa88c2add523318abd3fffd Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sat, 12 Oct 2024 10:06:32 +0800 Subject: [PATCH 2/6] Fix checkstyle --- .../main/java/org/apache/pulsar/broker/service/ServerCnx.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index cda38ac98fb4c..2a0ad9e441d7d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2338,7 +2338,8 @@ public String toString() { try { MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); final var batchSize = metadata.hasNumMessagesInBatch() ? metadata.getNumMessagesInBatch() : 0; - return new TopicCompactionService.MessagePosition(entry.getLedgerId(), entry.getEntryId(), batchSize - 1); + return new TopicCompactionService.MessagePosition(entry.getLedgerId(), entry.getEntryId(), + batchSize - 1); } finally { entry.release(); } From d865ce931ce6c3d8036012d70396b4832459dbd3 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sat, 12 Oct 2024 10:22:21 +0800 Subject: [PATCH 3/6] Make readLastCompactedEntry private --- .../admin/impl/PersistentTopicsBase.java | 20 +++++++------------ .../pulsar/broker/service/ServerCnx.java | 2 +- .../PulsarTopicCompactionService.java | 6 +++--- .../compaction/TopicCompactionService.java | 12 +++-------- .../pulsar/compaction/CompactorTest.java | 5 ++--- 5 files changed, 16 insertions(+), 29 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 6070093cc3585..e2ae8b4d4640a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -149,6 +149,7 @@ import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.BitSetRecyclable; import org.apache.pulsar.compaction.Compactor; +import org.apache.pulsar.compaction.TopicCompactionService; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.slf4j.Logger; @@ -2809,21 +2810,14 @@ protected CompletableFuture internalGetMessageIdByTimestampAsync(long } final PersistentTopic persistentTopic = (PersistentTopic) topic; - return persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenCompose(lastEntry -> { - if (lastEntry == null) { + return persistentTopic.getTopicCompactionService().getLastMessagePosition().thenCompose(position -> { + if (position == TopicCompactionService.MessagePosition.EARLIEST) { return findMessageIdByPublishTime(timestamp, persistentTopic.getManagedLedger()); } - MessageMetadata metadata; - Position position = lastEntry.getPosition(); - try { - metadata = Commands.parseMessageMetadata(lastEntry.getDataBuffer()); - } finally { - lastEntry.release(); - } - if (timestamp == metadata.getPublishTime()) { - return CompletableFuture.completedFuture(new MessageIdImpl(position.getLedgerId(), - position.getEntryId(), topicName.getPartitionIndex())); - } else if (timestamp < metadata.getPublishTime()) { + if (timestamp == position.publishTime()) { + return CompletableFuture.completedFuture(new MessageIdImpl(position.ledgerId(), + position.entryId(), topicName.getPartitionIndex())); + } else if (timestamp < position.publishTime()) { return persistentTopic.getTopicCompactionService().findEntryByPublishTime(timestamp) .thenApply(compactedEntry -> { try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 2a0ad9e441d7d..d1ef2caf81cc3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2339,7 +2339,7 @@ public String toString() { MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); final var batchSize = metadata.hasNumMessagesInBatch() ? metadata.getNumMessagesInBatch() : 0; return new TopicCompactionService.MessagePosition(entry.getLedgerId(), entry.getEntryId(), - batchSize - 1); + batchSize - 1, metadata.getPublishTime()); } finally { entry.release(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java index abf104a66a5c4..28ab739f27966 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java @@ -105,8 +105,7 @@ public CompletableFuture> readCompactedEntries(@Nonnull Position sta return resultFuture; } - @Override - public CompletableFuture readLastCompactedEntry() { + private CompletableFuture readLastCompactedEntry() { return compactedTopic.readLastEntryOfCompactedLedger(); } @@ -157,7 +156,8 @@ public CompletableFuture getLastMessagePosition() { MessageMetadata metadata = Commands.parseMessageMetadata(payload); try { final var batchIndex = calculateTheLastBatchIndexInBatch(metadata, payload); - return new MessagePosition(entry.getLedgerId(), entry.getEntryId(), batchIndex); + final var publishTime = metadata.getPublishTime(); + return new MessagePosition(entry.getLedgerId(), entry.getEntryId(), batchIndex, publishTime); } catch (IOException e) { throw new CompletionException(new IOException("Failed to deserialize batched message from " + "the last entry of the compacted ledger: " + e.getMessage())); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java index e57ad2df70b75..9ff4e4c5d147a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java @@ -47,13 +47,6 @@ public interface TopicCompactionService extends AutoCloseable { */ CompletableFuture> readCompactedEntries(@Nonnull Position startPosition, int numberOfEntriesToRead); - /** - * Read the last compacted entry from the TopicCompactionService. - * - * @return a future that will be completed with the compacted last entry, this entry can be null. - */ - CompletableFuture readLastCompactedEntry(); - /** * Get the last compacted position from the TopicCompactionService. * @@ -80,6 +73,7 @@ public interface TopicCompactionService extends AutoCloseable { /** * Get the last message's position of the original topic. + * {@link MessagePosition#EARLIEST} will be returned if the last message does not exist. */ CompletableFuture getLastMessagePosition(); @@ -88,9 +82,9 @@ public interface TopicCompactionService extends AutoCloseable { * It adds a new field to {@link Position} that represents the batch index of the message in the entry. The batch * index is -1 for a non-batched message or a non-positive integer when the entry is a batched message. */ - record MessagePosition(long ledgerId, long entryId, int batchIndex) { + record MessagePosition(long ledgerId, long entryId, int batchIndex, long publishTime) { - public static final MessagePosition EARLIEST = new MessagePosition(-1L, -1L, -1); + public static final MessagePosition EARLIEST = new MessagePosition(-1L, -1L, -1, 0L); public static final CompletableFuture EARLIEST_FUTURE = CompletableFuture.completedFuture(EARLIEST); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java index 5cf7d33200d66..7697bcac59f7c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java @@ -45,7 +45,6 @@ import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.pulsar.broker.BrokerTestUtil; @@ -400,10 +399,10 @@ public void testCompactedWithConcurrentSend() throws Exception { }); Position lastCompactedPosition = topicCompactionService.getLastCompactedPosition().get(); - Entry lastCompactedEntry = topicCompactionService.readLastCompactedEntry().get(); + final var lastPosition = topicCompactionService.getLastMessagePosition().get(); Assert.assertTrue(PositionFactory.create(lastCompactedPosition.getLedgerId(), lastCompactedPosition.getEntryId()) - .compareTo(lastCompactedEntry.getLedgerId(), lastCompactedEntry.getEntryId()) >= 0); + .compareTo(lastPosition.ledgerId(), lastPosition.entryId()) >= 0); future.join(); } From e74400175fc28937e5d189ac18ae8ed7833bdeac Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sat, 12 Oct 2024 10:27:51 +0800 Subject: [PATCH 4/6] Remove findEntryByEntryIndex --- .../PulsarTopicCompactionService.java | 13 ---------- .../compaction/TopicCompactionService.java | 9 ------- .../TopicCompactionServiceTest.java | 24 +++---------------- 3 files changed, 3 insertions(+), 43 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java index 28ab739f27966..1586c64821a34 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java @@ -36,7 +36,6 @@ import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; -import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; import org.apache.pulsar.common.api.proto.CompressionType; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.SingleMessageMetadata; @@ -122,18 +121,6 @@ public CompletableFuture findEntryByPublishTime(long publishTime) { return compactedTopic.findFirstMatchEntry(predicate); } - @Override - public CompletableFuture findEntryByEntryIndex(long entryIndex) { - final Predicate predicate = entry -> { - BrokerEntryMetadata brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer()); - if (brokerEntryMetadata == null || !brokerEntryMetadata.hasIndex()) { - return false; - } - return brokerEntryMetadata.getIndex() >= entryIndex; - }; - return compactedTopic.findFirstMatchEntry(predicate); - } - public CompactedTopicImpl getCompactedTopic() { return compactedTopic; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java index 9ff4e4c5d147a..8a62d93d29f43 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java @@ -62,15 +62,6 @@ public interface TopicCompactionService extends AutoCloseable { */ CompletableFuture findEntryByPublishTime(long publishTime); - /** - * Find the first entry that greater or equal to target entryIndex, - * if an entry that broker entry metadata is missed, then it will be skipped and find the next match entry. - * - * @param entryIndex the index of entry. - * @return the first entry that greater or equal to target entryIndex, this entry can be null. - */ - CompletableFuture findEntryByEntryIndex(long entryIndex); - /** * Get the last message's position of the original topic. * {@link MessagePosition#EARLIEST} will be returned if the last message does not exist. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java index 9f33479ce4cab..6f699d4dad4cd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java @@ -40,8 +40,6 @@ import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.impl.MessageImpl; -import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; -import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.protocol.Commands; @@ -173,27 +171,11 @@ public void test() throws Exception { List entries2 = service.readCompactedEntries(PositionFactory.EARLIEST, 1).join(); assertEquals(entries2.size(), 1); - Entry entry = service.findEntryByEntryIndex(0).join(); - BrokerEntryMetadata brokerEntryMetadata = Commands.peekBrokerEntryMetadataIfExist(entry.getDataBuffer()); + final var entry = service.findEntryByPublishTime(startTime).join(); + final var brokerEntryMetadata = Commands.peekBrokerEntryMetadataIfExist(entry.getDataBuffer()); assertNotNull(brokerEntryMetadata); assertEquals(brokerEntryMetadata.getIndex(), 2); - MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); - assertEquals(metadata.getPartitionKey(), "a"); - entry.release(); - - entry = service.findEntryByEntryIndex(3).join(); - brokerEntryMetadata = Commands.peekBrokerEntryMetadataIfExist(entry.getDataBuffer()); - assertNotNull(brokerEntryMetadata); - assertEquals(brokerEntryMetadata.getIndex(), 4); - metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); - assertEquals(metadata.getPartitionKey(), "b"); - entry.release(); - - entry = service.findEntryByPublishTime(startTime).join(); - brokerEntryMetadata = Commands.peekBrokerEntryMetadataIfExist(entry.getDataBuffer()); - assertNotNull(brokerEntryMetadata); - assertEquals(brokerEntryMetadata.getIndex(), 2); - metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); + final var metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); assertEquals(metadata.getPartitionKey(), "a"); entry.release(); } From 193d4995f6aec8680bb16dd885a99def9ba0a87b Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sat, 12 Oct 2024 10:57:51 +0800 Subject: [PATCH 5/6] Fix failed test --- .../main/java/org/apache/pulsar/broker/service/ServerCnx.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index d1ef2caf81cc3..5c50243e87f02 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2255,7 +2255,7 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) topic.checkIfTransactionBufferRecoverCompletely().whenComplete((__, e) -> { if (e != null) { writeAndFlush(Commands.newError(requestId, ServerError.UnknownError, - "Failed to recover Transaction Buffer: " + e.getMessage())); + "Failed to recover Transaction Buffer.")); return; } topic.getLastDispatchablePosition().thenCompose(lastPosition -> { From 415f38c613efc3f90e6d93c40506cea0f7587f8d Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sat, 12 Oct 2024 11:39:56 +0800 Subject: [PATCH 6/6] Change findEntryByPublishTime to findPositionByPublishTime --- .../broker/admin/impl/PersistentTopicsBase.java | 12 +++--------- .../compaction/PulsarTopicCompactionService.java | 11 +++++------ .../compaction/TopicCompactionService.java | 11 +++++++---- .../compaction/TopicCompactionServiceTest.java | 16 +++++++--------- 4 files changed, 22 insertions(+), 28 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index e2ae8b4d4640a..975a8cbd11a64 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2818,15 +2818,9 @@ protected CompletableFuture internalGetMessageIdByTimestampAsync(long return CompletableFuture.completedFuture(new MessageIdImpl(position.ledgerId(), position.entryId(), topicName.getPartitionIndex())); } else if (timestamp < position.publishTime()) { - return persistentTopic.getTopicCompactionService().findEntryByPublishTime(timestamp) - .thenApply(compactedEntry -> { - try { - return new MessageIdImpl(compactedEntry.getLedgerId(), - compactedEntry.getEntryId(), topicName.getPartitionIndex()); - } finally { - compactedEntry.release(); - } - }); + return persistentTopic.getTopicCompactionService().findPositionByPublishTime(timestamp) + .thenApply(__ -> new MessageIdImpl(__.getLedgerId(), __.getEntryId(), + topicName.getPartitionIndex())); } else { return findMessageIdByPublishTime(timestamp, persistentTopic.getManagedLedger()); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java index 1586c64821a34..643e7233674d0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java @@ -30,12 +30,12 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.function.Predicate; import java.util.function.Supplier; import javax.annotation.Nonnull; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.pulsar.common.api.proto.CompressionType; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.SingleMessageMetadata; @@ -114,11 +114,10 @@ public CompletableFuture getLastCompactedPosition() { } @Override - public CompletableFuture findEntryByPublishTime(long publishTime) { - final Predicate predicate = entry -> { - return Commands.parseMessageMetadata(entry.getDataBuffer()).getPublishTime() >= publishTime; - }; - return compactedTopic.findFirstMatchEntry(predicate); + public CompletableFuture findPositionByPublishTime(long publishTime) { + return compactedTopic.findFirstMatchEntry(entry -> + Commands.parseMessageMetadata(entry.getDataBuffer()).getPublishTime() > publishTime + ).thenApply(entry -> entry != null ? entry.getPosition() : PositionFactory.EARLIEST); } public CompactedTopicImpl getCompactedTopic() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java index 8a62d93d29f43..d493f41e216a9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TopicCompactionService.java @@ -49,18 +49,21 @@ public interface TopicCompactionService extends AutoCloseable { /** * Get the last compacted position from the TopicCompactionService. - * + *

+ * The position is the mark-delete position of the original topic when the last compaction was performed. + *

* @return a future that will be completed with the last compacted position, this position can be null. */ CompletableFuture getLastCompactedPosition(); /** - * Find the first entry that greater or equal to target publishTime. + * Find the first entry's position that is greater or equal to target publishTime. * * @param publishTime the publish time of entry. - * @return the first entry metadata that greater or equal to target publishTime, this entry can be null. + * @return the first entry's position that is greater or equal to target publishTime or + * {@link org.apache.bookkeeper.mledger.PositionFactory#EARLIEST} if the entry does not exist */ - CompletableFuture findEntryByPublishTime(long publishTime); + CompletableFuture findPositionByPublishTime(long publishTime); /** * Get the last message's position of the original topic. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java index 6f699d4dad4cd..ff76ccf0a1f22 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java @@ -21,7 +21,7 @@ import static org.apache.pulsar.compaction.Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY; import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; import static org.testng.Assert.assertEquals; -import static org.testng.AssertJUnit.assertNotNull; +import static org.testng.Assert.assertNull; import static org.testng.AssertJUnit.fail; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; @@ -42,7 +42,6 @@ import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; -import org.apache.pulsar.common.protocol.Commands; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -96,6 +95,10 @@ public void test() throws Exception { .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); + assertEquals(PositionFactory.EARLIEST, service.findPositionByPublishTime(System.currentTimeMillis()).get()); + assertNull(service.getLastCompactedPosition().get()); + assertEquals(TopicCompactionService.MessagePosition.EARLIEST, service.getLastMessagePosition().get()); + producer.newMessage() .key("c") .value("C_0".getBytes()) @@ -171,12 +174,7 @@ public void test() throws Exception { List entries2 = service.readCompactedEntries(PositionFactory.EARLIEST, 1).join(); assertEquals(entries2.size(), 1); - final var entry = service.findEntryByPublishTime(startTime).join(); - final var brokerEntryMetadata = Commands.peekBrokerEntryMetadataIfExist(entry.getDataBuffer()); - assertNotNull(brokerEntryMetadata); - assertEquals(brokerEntryMetadata.getIndex(), 2); - final var metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); - assertEquals(metadata.getPartitionKey(), "a"); - entry.release(); + final var position = service.findPositionByPublishTime(startTime).join(); + assertEquals(position.getEntryId(), 2); } }