Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Avoid Bookie data is never deleted #19992

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2709,8 +2709,6 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
return;
}

doDeleteLedgers(ledgersToDelete);

for (LedgerInfo ls : offloadedLedgersToDelete) {
LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
Expand All @@ -2726,36 +2724,56 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
log.debug("[{}] Updating of ledgers list after trimming", name);
}

store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback<Void>() {
@Override
public void operationComplete(Void result, Stat stat) {
log.info("[{}] End TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.size(),
TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this));
ledgersStat = stat;
metadataMutex.unlock();
trimmerMutex.unlock();

for (LedgerInfo ls : ledgersToDelete) {
log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize());
asyncDeleteLedger(ls.getLedgerId(), ls);
}
for (LedgerInfo ls : offloadedLedgersToDelete) {
log.info("[{}] Deleting offloaded ledger {} from bookkeeper - size: {}", name, ls.getLedgerId(),
ls.getSize());
asyncDeleteLedgerFromBookKeeper(ls.getLedgerId());
}
promise.complete(null);
}

@Override
public void operationFailed(MetaStoreException e) {
log.warn("[{}] Failed to update the list of ledgers after trimming", name, e);
metadataMutex.unlock();
trimmerMutex.unlock();
handleBadVersion(e);
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (LedgerInfo ls : ledgersToDelete) {
log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize());
futures.add(asyncDeleteLedger(ls.getLedgerId(), ls));
}
for (LedgerInfo ls : offloadedLedgersToDelete) {
log.info("[{}] Deleting offloaded ledger {} from bookkeeper - size: {}", name, ls.getLedgerId(),
ls.getSize());
futures.add(asyncDeleteLedger(ls.getLedgerId(), DEFAULT_LEDGER_DELETE_RETRIES, null));
}
if (futures.isEmpty()) {
metadataMutex.unlock();
trimmerMutex.unlock();
return;
}
FutureUtil.waitForAll(futures).thenAccept((res) ->
store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat,
new MetaStoreCallback<>() {
@Override
public void operationComplete(Void result, Stat stat) {
log.info("[{}] End TrimConsumedLedgers. ledgers={} totalSize={}", name,
ledgers.size(),
TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this));
try {
doDeleteLedgers(ledgersToDelete);
advanceCursorsIfNecessary(ledgersToDelete);
ledgersStat = stat;
promise.complete(null);
} catch (Exception e) {
promise.completeExceptionally(e);
} finally {
metadataMutex.unlock();
trimmerMutex.unlock();
}
}

promise.completeExceptionally(e);
}
@Override
public void operationFailed(MetaStoreException e) {
log.warn("[{}] Failed to update the list of ledgers after trimming",
name, e);
metadataMutex.unlock();
trimmerMutex.unlock();
handleBadVersion(e);
promise.completeExceptionally(e);
}})
).exceptionally((e) -> {
metadataMutex.unlock();
trimmerMutex.unlock();
promise.completeExceptionally(e);
return null;
});
}
}
Expand Down Expand Up @@ -2931,39 +2949,52 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
/**
 * Requests deletion of a single ledger using the default retry budget.
 *
 * <p>Thin wrapper that forwards to {@code asyncDeleteLedger(ledgerId, retry)} with
 * {@code DEFAULT_LEDGER_DELETE_RETRIES} as the retry count. Nothing is returned, so
 * the caller receives no signal about whether the deletion eventually succeeded.
 *
 * @param ledgerId id of the ledger to delete
 */
private void asyncDeleteLedgerFromBookKeeper(long ledgerId) {
asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
}

private void asyncDeleteLedger(long ledgerId, LedgerInfo info) {
public CompletableFuture<Void> asyncDeleteLedger(long ledgerId, LedgerInfo info) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
if (!info.getOffloadContext().getBookkeeperDeleted()) {
// only delete if it hasn't been previously deleted for offload
asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
futures.add(asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES, null));
}

if (info.getOffloadContext().hasUidMsb()) {
UUID uuid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb());
OffloadUtils.cleanupOffloaded(ledgerId, uuid, config,
futures.add(OffloadUtils.cleanupOffloaded(ledgerId, uuid, config,
OffloadUtils.getOffloadDriverMetadata(info, config.getLedgerOffloader().getOffloadDriverMetadata()),
"Trimming", name, scheduledExecutor);
"Trimming", name, scheduledExecutor));
}
if (!futures.isEmpty()) {
return FutureUtil.waitForAll(futures);
}
return CompletableFuture.completedFuture(null);
}

private void asyncDeleteLedger(long ledgerId, long retry) {
if (retry <= 0) {
log.warn("[{}] Failed to delete ledger after retries {}", name, ledgerId);
return;
}
asyncDeleteLedger(ledgerId, retry, null);
}
@VisibleForTesting
public CompletableFuture<Void> asyncDeleteLedger(long ledgerId, long retry,
CompletableFuture<Void> callbackFuture) {
CompletableFuture<Void> future = callbackFuture == null ? new CompletableFuture<>() : callbackFuture;
bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> {
if (isNoSuchLedgerExistsException(rc)) {
log.warn("[{}] Ledger was already deleted {}", name, ledgerId);
} else if (rc != BKException.Code.OK) {
log.error("[{}] Error deleting ledger {} : {}", name, ledgerId, BKException.getMessage(rc));
scheduledExecutor.schedule(safeRun(() -> asyncDeleteLedger(ledgerId, retry - 1)),
DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS);
if (retry - 1 <= 0) {
log.warn("[{}] Failed to delete ledger after retries {}", name, ledgerId);
future.completeExceptionally(new ManagedLedgerException(BKException.getMessage(rc)));
} else {
scheduledExecutor.schedule(safeRun(() -> asyncDeleteLedger(ledgerId, retry - 1, future)),
DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS);
}
return;
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Deleted ledger {}", name, ledgerId);
}
}
future.complete(null);
}, null);
return future;
}

@SuppressWarnings("checkstyle:fallthrough")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -2272,6 +2273,37 @@ public void testDeletionAfterRetention() throws Exception {
assertTrue(ml.getTotalSize() <= "shortmessage".getBytes().length);
ml.close();
}
/**
 * Verifies that ledger metadata is kept when deleting the ledger data fails, and that
 * the metadata is trimmed once a later deletion attempt succeeds.
 *
 * <p>{@code asyncDeleteLedger(long, LedgerInfo)} is stubbed on a spied managed ledger:
 * first to return a failed future, then a completed one.
 */
@Test
public void testMetaDataNotDeleteWhenLedgerDeleteFailed() throws Exception {
    BookKeeper spyBookKeeper = spy(bkc);
    ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, spyBookKeeper);
    ManagedLedgerConfig config = new ManagedLedgerConfig();
    config.setRetentionSizeInMB(0);
    // One entry per ledger, so every addEntry rolls over to a new ledger.
    config.setMaxEntriesPerLedger(1);
    config.setRetentionTime(1, TimeUnit.SECONDS);
    ManagedLedgerImpl ml = spy((ManagedLedgerImpl) factory.open("metaDataNotDelete", config));
    ManagedCursorImpl c1 = (ManagedCursorImpl) ml.openCursor("c1noretention");

    // Make every ledger deletion fail.
    doReturn(FutureUtil.failedFuture(new ManagedLedgerException("12345")))
            .when(ml).asyncDeleteLedger(anyLong(), any(LedgerInfo.class));

    ml.addEntry("testEntry".getBytes(UTF_8));
    c1.skipEntries(1, IndividualDeletedEntries.Include);
    ml.addEntry("testEntry2".getBytes(UTF_8));
    c1.skipEntries(1, IndividualDeletedEntries.Include);
    assertTrue(ml.getLedgersInfoAsList().size() > 1);

    // Ledger deletion fails, so the metadata must not be deleted.
    // The trim runs asynchronously: wait until its promise is done (normally or
    // exceptionally) before asserting, otherwise the check could pass simply because
    // the trim has not run yet.
    CompletableFuture<Void> failedTrim = new CompletableFuture<>();
    ml.internalTrimConsumedLedgers(failedTrim);
    Awaitility.await().until(failedTrim::isDone);
    assertTrue(ml.getLedgersInfoAsList().size() > 1);

    // Now let the deletion succeed.
    doReturn(CompletableFuture.completedFuture(null))
            .when(ml).asyncDeleteLedger(anyLong(), any(LedgerInfo.class));
    // Make sure the retention time is reached; the interval is 1s.
    Thread.sleep(1000);
    ml.internalTrimConsumedLedgers(new CompletableFuture<>());
    Awaitility.await().untilAsserted(() -> assertTrue(ml.getLedgersInfoAsList().size() <= 1));

    c1.close();
    ml.close();
    factory.shutdown();
}

@Test
public void testDeletionAfterLedgerClosedAndRetention() throws Exception {
Expand Down Expand Up @@ -2356,8 +2388,9 @@ public void testRetention0WithEmptyLedgerWithoutCursors() throws Exception {
ml.internalTrimConsumedLedgers(CompletableFuture.completedFuture(null));

assertTrue(ml.getFirstPosition().ledgerId <= ml.lastConfirmedEntry.ledgerId);
assertFalse(ml.getLedgersInfo().containsKey(ml.lastConfirmedEntry.ledgerId),
"the ledger at lastConfirmedEntry has not been trimmed!");
ManagedLedgerImpl finalMl = ml;
Awaitility.await().untilAsserted(() -> assertFalse(finalMl.getLedgersInfo().containsKey(finalMl.lastConfirmedEntry.ledgerId),
"the ledger at lastConfirmedEntry has not been trimmed!"));
ml.close();
}

Expand Down