[improve][broker] Add getLastMessagePosition method to TopicCompactionService for flexible customization
BewareMyPower committed Oct 11, 2024
1 parent 2dace76 commit 3cce53d
Showing 3 changed files with 160 additions and 189 deletions.
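The hunks shown below are from ServerCnx.java; the other changed files, which add the new getLastMessagePosition hook to TopicCompactionService, are not included in this view. As a rough sketch of what that addition might look like, inferred only from how ServerCnx uses it here (the MessagePosition accessors, EARLIEST_FUTURE, and the return types are taken from the diff; the record declaration, the EARLIEST constant's values, and the default implementation are assumptions):

import java.util.concurrent.CompletableFuture;

import org.apache.bookkeeper.mledger.Position;

// Sketch only (import paths assumed): just the members of TopicCompactionService that this diff relies on.
public interface TopicCompactionService {

    // Position of the last valid message as (ledgerId, entryId, batchIndex);
    // batchIndex is numMessagesInBatch - 1 for batched messages and -1 otherwise.
    record MessagePosition(long ledgerId, long entryId, int batchIndex) {
        public static final MessagePosition EARLIEST = new MessagePosition(-1L, -1L, -1);
        // Pre-completed future returned when the topic has no entry at all.
        public static final CompletableFuture<MessagePosition> EARLIEST_FUTURE =
                CompletableFuture.completedFuture(EARLIEST);
    }

    // Existing method used below: position of the last message that has been compacted.
    CompletableFuture<Position> getLastCompactedPosition();

    // New extension point added by this commit: the last message position as seen by the
    // compaction service. Declared with an assumed default here; the real method may differ.
    default CompletableFuture<MessagePosition> getLastMessagePosition() {
        return MessagePosition.EARLIEST_FUTURE;
    }
}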
@@ -42,7 +42,6 @@
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ScheduledFuture;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
@@ -56,6 +55,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
@@ -137,7 +137,6 @@
import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
import org.apache.pulsar.common.api.proto.CommandWatchTopicList;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
import org.apache.pulsar.common.api.proto.CompressionType;
import org.apache.pulsar.common.api.proto.FeatureFlags;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
@@ -148,10 +147,7 @@
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.Schema;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.Metadata;
@@ -178,6 +174,7 @@
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.common.util.netty.NettyChannelUtil;
import org.apache.pulsar.common.util.netty.NettyFutureUtil;
import org.apache.pulsar.compaction.TopicCompactionService;
import org.apache.pulsar.functions.utils.Exceptions;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
@@ -2252,208 +2249,107 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId)
if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
long requestId = getLastMessageId.getRequestId();
final var readCompacted = consumer.readCompacted();

Topic topic = consumer.getSubscription().getTopic();
topic.checkIfTransactionBufferRecoverCompletely()
.thenCompose(__ -> topic.getLastDispatchablePosition())
.thenApply(lastPosition -> {
int partitionIndex = TopicName.getPartitionIndex(topic.getName());

Position markDeletePosition = null;
if (consumer.getSubscription() instanceof PersistentSubscription) {
markDeletePosition = ((PersistentSubscription) consumer.getSubscription()).getCursor()
.getMarkDeletedPosition();
}

getLargestBatchIndexWhenPossible(
topic,
lastPosition,
markDeletePosition,
partitionIndex,
requestId,
consumer.getSubscription().getName(),
consumer.readCompacted());
topic.checkIfTransactionBufferRecoverCompletely().whenComplete((__, e) -> {
if (e != null) {
writeAndFlush(Commands.newError(requestId, ServerError.UnknownError,
"Failed to recover Transaction Buffer: " + e.getMessage()));
return;
}
topic.getLastDispatchablePosition().thenCompose(lastPosition -> {
final int partitionIndex = TopicName.getPartitionIndex(topic.getName());
final Position markDeletePosition;
if (consumer.getSubscription() instanceof PersistentSubscription) {
markDeletePosition = ((PersistentSubscription) consumer.getSubscription()).getCursor()
.getMarkDeletedPosition();
} else {
markDeletePosition = null;
}
final var persistentTopic = (PersistentTopic) topic;
final var compactionService = persistentTopic.getTopicCompactionService();
final var ml = persistentTopic.getManagedLedger();
final var compactionHorizonFuture = readCompacted
? compactionService.getLastCompactedPosition()
: CompletableFuture.<Position>completedFuture(null);
return compactionHorizonFuture.thenCompose(compactionHorizon -> {
// there is no entry in the original topic
if (lastPosition.getEntryId() == -1
|| !ml.getLedgersInfo().containsKey(lastPosition.getLedgerId())) {
return compactionHorizon != null ? compactionService.getLastMessagePosition()
: TopicCompactionService.MessagePosition.EARLIEST_FUTURE;
}
if (compactionHorizon != null && lastPosition.compareTo(compactionHorizon) <= 0) {
return compactionService.getLastMessagePosition();
}

// For a valid position, we read the entry out and parse the batch size from its metadata.
final var future = getLastMessagePositionFromManagedLedger(ml, lastPosition, readCompacted);
return future.thenCompose(position -> {
if (position == null) {
// readCompacted is true and the managed ledger is corrupted
return compactionService.getLastMessagePosition();
}
return CompletableFuture.completedFuture(position);
});
}).thenAccept(lastMessageId -> {
final var response = Commands.newGetLastMessageIdResponse(requestId, lastMessageId.ledgerId(),
lastMessageId.entryId(), partitionIndex, lastMessageId.batchIndex(),
markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
markDeletePosition != null ? markDeletePosition.getEntryId() : -1);
writeAndFlush(response);
});
}).exceptionally(throwable -> {
log.error("[{}][{}][{}] Failed to get last message id (readCompacted: {})", remoteAddress,
topic.getName(), consumer.getSubscription().getName(), readCompacted, throwable);
writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, throwable.getMessage()));
return null;
}).exceptionally(e -> {
writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
ServerError.UnknownError, "Failed to recover Transaction Buffer."));
return null;
});
});
});
} else {
writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
ServerError.MetadataError, "Consumer not found"));
}
}

private void getLargestBatchIndexWhenPossible(
Topic topic,
Position lastPosition,
Position markDeletePosition,
int partitionIndex,
long requestId,
String subscriptionName,
boolean readCompacted) {

PersistentTopic persistentTopic = (PersistentTopic) topic;
ManagedLedger ml = persistentTopic.getManagedLedger();

// If it's not pointing to a valid entry, respond messageId of the current position.
// If the compaction cursor reach the end of the topic, respond messageId from compacted ledger
CompletableFuture<Position> compactionHorizonFuture = readCompacted
? persistentTopic.getTopicCompactionService().getLastCompactedPosition() :
CompletableFuture.completedFuture(null);

compactionHorizonFuture.whenComplete((compactionHorizon, ex) -> {
if (ex != null) {
log.error("Failed to get compactionHorizon.", ex);
writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, ex.getMessage()));
return;
private CompletableFuture<TopicCompactionService.MessagePosition> getLastMessagePositionFromManagedLedger(
ManagedLedger managedLedger, Position lastPosition, boolean readCompacted) {
// For a valid position, we read the entry out and parse the batch size from its metadata.
CompletableFuture<Entry> entryFuture = new CompletableFuture<>();
managedLedger.asyncReadEntry(lastPosition, new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
entryFuture.complete(entry);
}


if (lastPosition.getEntryId() == -1 || !ml.getLedgersInfo().containsKey(lastPosition.getLedgerId())) {
// there is no entry in the original topic
if (compactionHorizon != null) {
// if readCompacted is true, we need to read the last entry from compacted topic
handleLastMessageIdFromCompactionService(persistentTopic, requestId, partitionIndex,
markDeletePosition);
} else {
// if readCompacted is false, we need to return MessageId.earliest
writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, -1, -1, partitionIndex, -1,
markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
}
return;
@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
entryFuture.completeExceptionally(exception);
}

if (compactionHorizon != null && lastPosition.compareTo(compactionHorizon) <= 0) {
handleLastMessageIdFromCompactionService(persistentTopic, requestId, partitionIndex,
markDeletePosition);
return;
@Override
public String toString() {
return String.format("ServerCnx [%s] get largest batch index when possible",
ServerCnx.this);
}

// For a valid position, we read the entry out and parse the batch size from its metadata.
CompletableFuture<Entry> entryFuture = new CompletableFuture<>();
ml.asyncReadEntry(lastPosition, new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
entryFuture.complete(entry);
}

@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
entryFuture.completeExceptionally(exception);
}

@Override
public String toString() {
return String.format("ServerCnx [%s] get largest batch index when possible",
ServerCnx.this.toString());
}
}, null);

CompletableFuture<Integer> batchSizeFuture = entryFuture.thenApply(entry -> {
}, null);
return entryFuture.thenApply(entry -> {
try {
MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
int batchSize = metadata.getNumMessagesInBatch();
final var batchSize = metadata.hasNumMessagesInBatch() ? metadata.getNumMessagesInBatch() : 0;
return new TopicCompactionService.MessagePosition(entry.getLedgerId(), entry.getEntryId(), batchSize - 1);
} finally {
entry.release();
return metadata.hasNumMessagesInBatch() ? batchSize : -1;
});

batchSizeFuture.whenComplete((batchSize, e) -> {
if (e != null) {
if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException
&& readCompacted) {
handleLastMessageIdFromCompactionService(persistentTopic, requestId, partitionIndex,
markDeletePosition);
} else {
writeAndFlush(Commands.newError(
requestId, ServerError.MetadataError,
"Failed to get batch size for entry " + e.getMessage()));
}
} else {
int largestBatchIndex = batchSize > 0 ? batchSize - 1 : -1;

if (log.isDebugEnabled()) {
log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
topic.getName(), subscriptionName, lastPosition, partitionIndex);
}

writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, lastPosition.getLedgerId(),
lastPosition.getEntryId(), partitionIndex, largestBatchIndex,
markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
}
});
});
}

private void handleLastMessageIdFromCompactionService(PersistentTopic persistentTopic, long requestId,
int partitionIndex, Position markDeletePosition) {
persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenAccept(entry -> {
if (entry != null) {
try {
// in this case, all the data has been compacted, so return the last position
// in the compacted ledger to the client
ByteBuf payload = entry.getDataBuffer();
MessageMetadata metadata = Commands.parseMessageMetadata(payload);
int largestBatchIndex;
try {
largestBatchIndex = calculateTheLastBatchIndexInBatch(metadata, payload);
} catch (IOException ioEx) {
writeAndFlush(Commands.newError(requestId, ServerError.MetadataError,
"Failed to deserialize batched message from the last entry of the compacted Ledger: "
+ ioEx.getMessage()));
return;
}
writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
entry.getLedgerId(), entry.getEntryId(), partitionIndex, largestBatchIndex,
markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
} finally {
entry.release();
}
} else {
// in this case, the ledgers been removed except the current ledger
// and current ledger without any data
writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
-1, -1, partitionIndex, -1,
markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
}
}).exceptionally(ex -> {
writeAndFlush(Commands.newError(
requestId, ServerError.MetadataError,
"Failed to read last entry of the compacted Ledger "
+ ex.getCause().getMessage()));
return null;
});
}

private int calculateTheLastBatchIndexInBatch(MessageMetadata metadata, ByteBuf payload) throws IOException {
int batchSize = metadata.getNumMessagesInBatch();
if (batchSize <= 1){
return -1;
}
if (metadata.hasCompression()) {
var tmp = payload;
CompressionType compressionType = metadata.getCompression();
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
int uncompressedSize = metadata.getUncompressedSize();
payload = codec.decode(payload, uncompressedSize);
tmp.release();
}
SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
int lastBatchIndexInBatch = -1;
for (int i = 0; i < batchSize; i++){
ByteBuf singleMessagePayload =
Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, i, batchSize);
singleMessagePayload.release();
if (singleMessageMetadata.isCompactedOut()){
continue;
}).exceptionally(e -> {
final var unwrapped = FutureUtil.unwrapCompletionException(e);
if (readCompacted
&& unwrapped instanceof ManagedLedgerException.NonRecoverableLedgerException) {
return null;
}
lastBatchIndexInBatch = i;
}
return lastBatchIndexInBatch;
throw new CompletionException(unwrapped);
});
}

private CompletableFuture<Boolean> isNamespaceOperationAllowed(NamespaceName namespaceName,
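The flexible customization mentioned in the commit title comes from routing the last-message-id lookup through the topic's compaction service instead of hard-coding the compacted-ledger read in ServerCnx: whenever the consumer reads compacted data and the topic has no entries, the last dispatchable position is at or before the compaction horizon, or the last entry's ledger is no longer recoverable, the broker now asks the compaction service for the answer. Building on the hypothetical interface sketch above (and reusing its assumed types), a custom implementation could then serve the query from its own metadata; everything in the class below is illustrative and not part of this commit:

import java.util.concurrent.CompletableFuture;

import org.apache.bookkeeper.mledger.Position;

// Illustrative only: a compaction service that tracks the last compacted position and the
// last message position itself (for example, backed by an external compacted store), so the
// broker can answer getLastMessageId without reading the last entry from the managed ledger.
public class ExternalStoreCompactionService implements TopicCompactionService {

    private volatile Position lastCompactedPosition;
    private volatile MessagePosition lastMessagePosition = MessagePosition.EARLIEST;

    @Override
    public CompletableFuture<Position> getLastCompactedPosition() {
        return CompletableFuture.completedFuture(lastCompactedPosition);
    }

    @Override
    public CompletableFuture<MessagePosition> getLastMessagePosition() {
        // Called by ServerCnx#handleGetLastMessageId on the readCompacted paths described above.
        return CompletableFuture.completedFuture(lastMessagePosition);
    }
}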
