Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
315157973 committed Jan 24, 2024
1 parent 817e524 commit 856ad28
Showing 1 changed file with 24 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,20 @@
*/
package org.apache.pulsar.broker.service.persistent;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.SortedMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import javax.annotation.Nullable;
import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.LedgerNotExistException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
Expand Down Expand Up @@ -86,10 +81,12 @@ public boolean expireMessages(int messageTTLInSeconds) {
if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) {
log.info("[{}][{}] Starting message expiry check, ttl= {} seconds", topicName, subName,
messageTTLInSeconds);
Map<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgerIdToInfoCache = new ConcurrentHashMap<>();
if (checkExpiryByLedgerClosureTime(cursor, messageTTLInSeconds)) {
return true;
}
cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
try {
long entryTimestamp = getEntryTimestamp(entry, ledgerIdToInfoCache, cursor);
long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp);
} catch (Exception e) {
log.error("[{}][{}] Error deserializing message for expiry check", topicName, subName, e);
Expand All @@ -108,31 +105,25 @@ public boolean expireMessages(int messageTTLInSeconds) {
}
}

private long getEntryTimestamp(Entry entry,
Map<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgerIdToInfoCache,
ManagedCursor cursor) throws IOException {
long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
ManagedLedger managedLedger = cursor.getManagedLedger();
ManagedLedgerConfig config = managedLedger.getConfig();
MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ledgerIdToInfoCache.computeIfAbsent(
entry.getLedgerId(), (ledgerId) -> {
try {
return managedLedger.getLedgerInfo(ledgerId)
.get(config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);
} catch (Exception e) {
return null;
}
});
if (ledgerInfo == null) {
return entryTimestamp;
}
long maxAvailableTimestamp = ledgerInfo.getTimestamp() + config.getMaximumRolloverTimeMs();
if (entryTimestamp > maxAvailableTimestamp) {
log.debug("Timestamp {} from {} {}:{} is incorrect, ledger is created at {}, please check client's clock",
entry.getLedgerId(), topicName, entry.getEntryId(), entryTimestamp, ledgerInfo.getTimestamp());
return maxAvailableTimestamp;
private boolean checkExpiryByLedgerClosureTime(ManagedCursor cursor, int messageTTLInSeconds) {
if (cursor instanceof ManagedCursorImpl managedCursor) {
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) managedCursor.getManagedLedger();
Position deletedPosition = managedCursor.getMarkDeletedPosition();
SortedMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgerInfoSortedMap =
managedLedger.getLedgersInfo()
.subMap(deletedPosition.getLedgerId(), managedLedger.getLedgersInfo().lastKey());
long expiryLedgerId = -1;
for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgerInfoSortedMap.values()) {
if (!MessageImpl.isEntryExpired(messageTTLInSeconds, ledgerInfo.getTimestamp())) {
break;
}
expiryLedgerId = ledgerInfo.getLedgerId();
}
if (expiryLedgerId > -1) {
findEntryComplete(PositionImpl.get(managedLedger.getNextValidLedger(expiryLedgerId), 0), null);
}
}
return entryTimestamp;
return false;
}

@Override
Expand Down

0 comments on commit 856ad28

Please sign in to comment.