From 38a4c848c5a4626673e82e1f37d587d00b1eea1f Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 12 Aug 2024 14:14:56 +0800 Subject: [PATCH] Check if lastConfirmedEntry is valid --- .../impl/cache/RangeEntryCacheImpl.java | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index e60348b39f30f..c4370bee4e196 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -31,6 +31,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.client.api.BKException; +import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks; @@ -249,7 +250,7 @@ private void asyncReadEntry0(ReadHandle lh, Position position, final ReadEntryCa manager.mlFactoryMBean.recordCacheHit(cachedEntry.getLength()); callback.readEntryComplete(cachedEntry, ctx); } else { - lh.readUnconfirmedAsync(position.getEntryId(), position.getEntryId()).thenAcceptAsync( + readAsync(lh, position.getEntryId(), position.getEntryId()).thenAcceptAsync( ledgerEntries -> { try { Iterator iterator = ledgerEntries.iterator(); @@ -429,7 +430,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { CompletableFuture> readFromStorage(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry) { final int entriesToRead = (int) (lastEntry - firstEntry) + 1; - CompletableFuture> readResult = lh.readUnconfirmedAsync(firstEntry, lastEntry) + CompletableFuture> readResult = readAsync(lh, firstEntry, lastEntry) .thenApply( ledgerEntries -> { requireNonNull(ml.getName()); @@ -512,5 +513,22 @@ public void invalidateEntriesBeforeTimestamp(long timestamp) { manager.entriesRemoved(evictedPair.getRight(), evictedPair.getLeft()); } + private CompletableFuture readAsync(ReadHandle handle, long firstEntry, long lastEntry) { + final var lastConfirmedEntry = ml.getLastConfirmedEntry(); + if (lastConfirmedEntry == null) { + return CompletableFuture.failedFuture(new IllegalStateException("LastConfirmedEntry is null when reading " + + handle.getId())); + } + if (handle.getId() > lastConfirmedEntry.getLedgerId()) { + return CompletableFuture.failedFuture(new IllegalStateException("LastConfirmedEntry is " + + lastConfirmedEntry + " while trying to read ledger " + handle.getId())); + } + if (handle.getId() == lastConfirmedEntry.getLedgerId() && lastEntry > lastConfirmedEntry.getEntryId()) { + return CompletableFuture.failedFuture(new IllegalStateException("Last ConfirmedEntry is " + + lastConfirmedEntry + " while trying to read entry " + lastEntry)); + } + return handle.readUnconfirmedAsync(firstEntry, lastEntry); + } + private static final Logger log = LoggerFactory.getLogger(RangeEntryCacheImpl.class); }