Skip to content

Commit

Permalink
Check if lastConfirmedEntry is valid
Browse files Browse the repository at this point in the history
  • Loading branch information
BewareMyPower committed Aug 12, 2024
1 parent cb36d00 commit 38a4c84
Showing 1 changed file with 20 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.api.BKException;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
Expand Down Expand Up @@ -249,7 +250,7 @@ private void asyncReadEntry0(ReadHandle lh, Position position, final ReadEntryCa
manager.mlFactoryMBean.recordCacheHit(cachedEntry.getLength());
callback.readEntryComplete(cachedEntry, ctx);
} else {
lh.readUnconfirmedAsync(position.getEntryId(), position.getEntryId()).thenAcceptAsync(
readAsync(lh, position.getEntryId(), position.getEntryId()).thenAcceptAsync(
ledgerEntries -> {
try {
Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
Expand Down Expand Up @@ -429,7 +430,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh,
long firstEntry, long lastEntry, boolean shouldCacheEntry) {
final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
CompletableFuture<List<EntryImpl>> readResult = lh.readUnconfirmedAsync(firstEntry, lastEntry)
CompletableFuture<List<EntryImpl>> readResult = readAsync(lh, firstEntry, lastEntry)
.thenApply(
ledgerEntries -> {
requireNonNull(ml.getName());
Expand Down Expand Up @@ -512,5 +513,22 @@ public void invalidateEntriesBeforeTimestamp(long timestamp) {
manager.entriesRemoved(evictedPair.getRight(), evictedPair.getLeft());
}

private CompletableFuture<LedgerEntries> readAsync(ReadHandle handle, long firstEntry, long lastEntry) {
final var lastConfirmedEntry = ml.getLastConfirmedEntry();
if (lastConfirmedEntry == null) {
return CompletableFuture.failedFuture(new IllegalStateException("LastConfirmedEntry is null when reading "
+ handle.getId()));
}
if (handle.getId() > lastConfirmedEntry.getLedgerId()) {
return CompletableFuture.failedFuture(new IllegalStateException("LastConfirmedEntry is "
+ lastConfirmedEntry + " while trying to read ledger " + handle.getId()));
}
if (handle.getId() == lastConfirmedEntry.getLedgerId() && lastEntry > lastConfirmedEntry.getEntryId()) {
return CompletableFuture.failedFuture(new IllegalStateException("Last ConfirmedEntry is "
+ lastConfirmedEntry + " while trying to read entry " + lastEntry));
}
return handle.readUnconfirmedAsync(firstEntry, lastEntry);
}

private static final Logger log = LoggerFactory.getLogger(RangeEntryCacheImpl.class);
}

0 comments on commit 38a4c84

Please sign in to comment.