diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 10f7948f553cb..a8692a58e09d1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2728,8 +2728,6 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { return; } - doDeleteLedgers(ledgersToDelete); - for (LedgerInfo ls : offloadedLedgersToDelete) { LedgerInfo.Builder newInfoBuilder = ls.toBuilder(); newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true); @@ -2745,36 +2743,56 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { log.debug("[{}] Updating of ledgers list after trimming", name); } - store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback() { - @Override - public void operationComplete(Void result, Stat stat) { - log.info("[{}] End TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.size(), - TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this)); - ledgersStat = stat; - metadataMutex.unlock(); - trimmerMutex.unlock(); - - for (LedgerInfo ls : ledgersToDelete) { - log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize()); - asyncDeleteLedger(ls.getLedgerId(), ls); - } - for (LedgerInfo ls : offloadedLedgersToDelete) { - log.info("[{}] Deleting offloaded ledger {} from bookkeeper - size: {}", name, ls.getLedgerId(), - ls.getSize()); - asyncDeleteLedgerFromBookKeeper(ls.getLedgerId()); - } - promise.complete(null); - } - - @Override - public void operationFailed(MetaStoreException e) { - log.warn("[{}] Failed to update the list of ledgers after trimming", name, e); - metadataMutex.unlock(); - trimmerMutex.unlock(); - handleBadVersion(e); + List> futures = new ArrayList<>(); + for (LedgerInfo ls : ledgersToDelete) { + log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize()); + futures.add(asyncDeleteLedger(ls.getLedgerId(), ls)); + } + for (LedgerInfo ls : offloadedLedgersToDelete) { + log.info("[{}] Deleting offloaded ledger {} from bookkeeper - size: {}", name, ls.getLedgerId(), + ls.getSize()); + futures.add(asyncDeleteLedger(ls.getLedgerId(), DEFAULT_LEDGER_DELETE_RETRIES)); + } + if (futures.isEmpty()) { + metadataMutex.unlock(); + trimmerMutex.unlock(); + return; + } + FutureUtil.waitForAll(futures).thenAccept((res) -> + store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, + new MetaStoreCallback<>() { + @Override + public void operationComplete(Void result, Stat stat) { + log.info("[{}] End TrimConsumedLedgers. ledgers={} totalSize={}", name, + ledgers.size(), + TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this)); + try { + doDeleteLedgers(ledgersToDelete); + advanceCursorsIfNecessary(ledgersToDelete); + ledgersStat = stat; + promise.complete(null); + } catch (Exception e) { + promise.completeExceptionally(e); + } finally { + metadataMutex.unlock(); + trimmerMutex.unlock(); + } + } - promise.completeExceptionally(e); - } + @Override + public void operationFailed(MetaStoreException e) { + log.warn("[{}] Failed to update the list of ledgers after trimming", + name, e); + metadataMutex.unlock(); + trimmerMutex.unlock(); + handleBadVersion(e); + promise.completeExceptionally(e); + }}) + ).exceptionally((e) -> { + metadataMutex.unlock(); + trimmerMutex.unlock(); + promise.completeExceptionally(e); + return null; }); } } @@ -2950,39 +2968,54 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { private void asyncDeleteLedgerFromBookKeeper(long ledgerId) { asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES); } - - private void asyncDeleteLedger(long ledgerId, LedgerInfo info) { + public CompletableFuture asyncDeleteLedger(long ledgerId, LedgerInfo info) { + List> futures = new ArrayList<>(); if (!info.getOffloadContext().getBookkeeperDeleted()) { // only delete if it hasn't been previously deleted for offload - asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES); + futures.add(asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES)); } - if (info.getOffloadContext().hasUidMsb()) { UUID uuid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb()); OffloadUtils.cleanupOffloaded(ledgerId, uuid, config, OffloadUtils.getOffloadDriverMetadata(info, config.getLedgerOffloader().getOffloadDriverMetadata()), "Trimming", name, scheduledExecutor); } + if (!futures.isEmpty()) { + return FutureUtil.waitForAll(futures); + } + return CompletableFuture.completedFuture(null); } - private void asyncDeleteLedger(long ledgerId, long retry) { - if (retry <= 0) { - log.warn("[{}] Failed to delete ledger after retries {}", name, ledgerId); - return; - } + public CompletableFuture asyncDeleteLedger(long ledgerId, long retry) { + CompletableFuture future = new CompletableFuture<>(); bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> { if (isNoSuchLedgerExistsException(rc)) { log.warn("[{}] Ledger was already deleted {}", name, ledgerId); + future.complete(null); } else if (rc != BKException.Code.OK) { log.error("[{}] Error deleting ledger {} : {}", name, ledgerId, BKException.getMessage(rc)); scheduledExecutor.schedule(() -> asyncDeleteLedger(ledgerId, retry - 1), DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS); + if (retry - 1 <= 0) { + log.warn("[{}] Failed to delete ledger after retries {}", name, ledgerId); + future.completeExceptionally(new ManagedLedgerException(BKException.getMessage(rc))); + } else { + scheduledExecutor.schedule(() -> asyncDeleteLedger(ledgerId, retry - 1) + .thenAccept((res) -> future.complete(null)) + .exceptionally((e) -> { + future.completeExceptionally(e); + return null; + }), + DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS); + } } else { if (log.isDebugEnabled()) { log.debug("[{}] Deleted ledger {}", name, ledgerId); } + future.complete(null); } }, null); + return future; } @SuppressWarnings("checkstyle:fallthrough") diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 70ddbb9998fd8..8fc2c2d7031dd 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -21,6 +21,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; @@ -2273,6 +2274,37 @@ public void testDeletionAfterRetention() throws Exception { assertTrue(ml.getTotalSize() <= "shortmessage".getBytes().length); ml.close(); } + @Test + public void testMetaDataNotDeleteWhenLedgerDeleteFailed() throws Exception { + BookKeeper spyBookKeeper = spy(bkc); + ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, spyBookKeeper); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setRetentionSizeInMB(0); + config.setMaxEntriesPerLedger(1); + config.setRetentionTime(1, TimeUnit.SECONDS); + ManagedLedgerImpl ml = spy((ManagedLedgerImpl) factory.open("metaDataNotDelete", config)); + ManagedCursorImpl c1 = (ManagedCursorImpl) ml.openCursor("c1noretention"); + doReturn(FutureUtil.failedFuture(new ManagedLedgerException("12345"))).when(ml).asyncDeleteLedger(anyLong(), any(LedgerInfo.class)); + ml.addEntry("testEntry".getBytes()); + c1.skipEntries(1, IndividualDeletedEntries.Include); + ml.addEntry("testEntry2".getBytes()); + c1.skipEntries(1, IndividualDeletedEntries.Include); + assertTrue(ml.getLedgersInfoAsList().size() > 1); + + //delete ledger failed, metadata should not be delete + ml.internalTrimConsumedLedgers(new CompletableFuture<>()); + assertTrue(ml.getLedgersInfoAsList().size() > 1); + + // let it success + doReturn(CompletableFuture.completedFuture(null)).when(ml).asyncDeleteLedger(anyLong(), any(LedgerInfo.class)); + // make sure reach the retention time, the interval is 1s + Thread.sleep(1000); + ml.internalTrimConsumedLedgers(new CompletableFuture<>()); + Awaitility.await().untilAsserted(() -> assertTrue(ml.getLedgersInfoAsList().size() <= 1)); + c1.close(); + ml.close(); + factory.shutdown(); + } @Test public void testDeletionAfterLedgerClosedAndRetention() throws Exception { @@ -2357,8 +2389,9 @@ public void testRetention0WithEmptyLedgerWithoutCursors() throws Exception { ml.internalTrimConsumedLedgers(CompletableFuture.completedFuture(null)); assertTrue(ml.getFirstPosition().ledgerId <= ml.lastConfirmedEntry.ledgerId); - assertFalse(ml.getLedgersInfo().containsKey(ml.lastConfirmedEntry.ledgerId), - "the ledger at lastConfirmedEntry has not been trimmed!"); + ManagedLedgerImpl finalMl = ml; + Awaitility.await().untilAsserted(() -> assertFalse(finalMl.getLedgersInfo().containsKey(finalMl.lastConfirmedEntry.ledgerId), + "the ledger at lastConfirmedEntry has not been trimmed!")); ml.close(); }