From 7390a00972bb5d233f5937674aef7cda0658bf1d Mon Sep 17 00:00:00 2001 From: feynmanlin <315157973@qq.com> Date: Sat, 1 Apr 2023 16:22:55 +0800 Subject: [PATCH 1/3] [fix][broker] Avoid Bookie data is never deleted --- .../mledger/impl/ManagedLedgerImpl.java | 122 +++++++++++------- .../mledger/impl/ManagedLedgerTest.java | 38 +++++- 2 files changed, 113 insertions(+), 47 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 8376ee1bb8467..bd289b90ca16c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2709,8 +2709,6 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { return; } - doDeleteLedgers(ledgersToDelete); - for (LedgerInfo ls : offloadedLedgersToDelete) { LedgerInfo.Builder newInfoBuilder = ls.toBuilder(); newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true); @@ -2726,36 +2724,56 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { log.debug("[{}] Updating of ledgers list after trimming", name); } - store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback() { - @Override - public void operationComplete(Void result, Stat stat) { - log.info("[{}] End TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.size(), - TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this)); - ledgersStat = stat; - metadataMutex.unlock(); - trimmerMutex.unlock(); - - for (LedgerInfo ls : ledgersToDelete) { - log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize()); - asyncDeleteLedger(ls.getLedgerId(), ls); - } - for (LedgerInfo ls : offloadedLedgersToDelete) { - log.info("[{}] Deleting offloaded ledger {} from bookkeeper - size: {}", name, ls.getLedgerId(), - ls.getSize()); - asyncDeleteLedgerFromBookKeeper(ls.getLedgerId()); - } - promise.complete(null); - } - - @Override - public void operationFailed(MetaStoreException e) { - log.warn("[{}] Failed to update the list of ledgers after trimming", name, e); - metadataMutex.unlock(); - trimmerMutex.unlock(); - handleBadVersion(e); + List> futures = new ArrayList<>(); + for (LedgerInfo ls : ledgersToDelete) { + log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize()); + futures.add(asyncDeleteLedger(ls.getLedgerId(), ls)); + } + for (LedgerInfo ls : offloadedLedgersToDelete) { + log.info("[{}] Deleting offloaded ledger {} from bookkeeper - size: {}", name, ls.getLedgerId(), + ls.getSize()); + futures.add(asyncDeleteLedger(ls.getLedgerId(), DEFAULT_LEDGER_DELETE_RETRIES, null)); + } + if (futures.isEmpty()) { + metadataMutex.unlock(); + trimmerMutex.unlock(); + return; + } + FutureUtil.waitForAll(futures).thenAccept((res) -> + store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, + new MetaStoreCallback<>() { + @Override + public void operationComplete(Void result, Stat stat) { + log.info("[{}] End TrimConsumedLedgers. ledgers={} totalSize={}", name, + ledgers.size(), + TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this)); + try { + doDeleteLedgers(ledgersToDelete); + advanceCursorsIfNecessary(ledgersToDelete); + ledgersStat = stat; + promise.complete(null); + } catch (Exception e) { + promise.completeExceptionally(e); + } finally { + metadataMutex.unlock(); + trimmerMutex.unlock(); + } + } - promise.completeExceptionally(e); - } + @Override + public void operationFailed(MetaStoreException e) { + log.warn("[{}] Failed to update the list of ledgers after trimming", + name, e); + metadataMutex.unlock(); + trimmerMutex.unlock(); + handleBadVersion(e); + promise.completeExceptionally(e); + }}) + ).exceptionally((e) -> { + metadataMutex.unlock(); + trimmerMutex.unlock(); + promise.completeExceptionally(e); + return null; }); } } @@ -2931,39 +2949,53 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { private void asyncDeleteLedgerFromBookKeeper(long ledgerId) { asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES); } - - private void asyncDeleteLedger(long ledgerId, LedgerInfo info) { + public CompletableFuture asyncDeleteLedger(long ledgerId, LedgerInfo info) { + List> futures = new ArrayList<>(); if (!info.getOffloadContext().getBookkeeperDeleted()) { // only delete if it hasn't been previously deleted for offload - asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES); + futures.add(asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES, null)); } - if (info.getOffloadContext().hasUidMsb()) { UUID uuid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb()); - OffloadUtils.cleanupOffloaded(ledgerId, uuid, config, + futures.add(OffloadUtils.cleanupOffloaded(ledgerId, uuid, config, OffloadUtils.getOffloadDriverMetadata(info, config.getLedgerOffloader().getOffloadDriverMetadata()), - "Trimming", name, scheduledExecutor); + "Trimming", name, scheduledExecutor)); } + if (!futures.isEmpty()) { + return FutureUtil.waitForAll(futures); + } + return CompletableFuture.completedFuture(null); } private void asyncDeleteLedger(long ledgerId, long retry) { - if (retry <= 0) { - log.warn("[{}] Failed to delete ledger after retries {}", name, ledgerId); - return; - } + asyncDeleteLedger(ledgerId, retry, null); + } + @VisibleForTesting + public CompletableFuture asyncDeleteLedger(long ledgerId, long retry, + CompletableFuture callbackFuture) { + CompletableFuture future = callbackFuture == null ? new CompletableFuture<>() : callbackFuture; bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> { + if (rc != BKException.Code.OK) { + log.error("[{}] Error deleting ledger {} : {}", name, ledgerId, BKException.getMessage(rc)); + if (retry - 1 <= 0) { + log.warn("[{}] Failed to delete ledger after retries {}", name, ledgerId); + future.completeExceptionally(new ManagedLedgerException(BKException.getMessage(rc))); + } else { + scheduledExecutor.schedule(safeRun(() -> asyncDeleteLedger(ledgerId, retry - 1, future)), + DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS); + } + return; + } if (isNoSuchLedgerExistsException(rc)) { log.warn("[{}] Ledger was already deleted {}", name, ledgerId); - } else if (rc != BKException.Code.OK) { - log.error("[{}] Error deleting ledger {} : {}", name, ledgerId, BKException.getMessage(rc)); - scheduledExecutor.schedule(safeRun(() -> asyncDeleteLedger(ledgerId, retry - 1)), - DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS); } else { if (log.isDebugEnabled()) { log.debug("[{}] Deleted ledger {}", name, ledgerId); } } + future.complete(null); }, null); + return future; } @SuppressWarnings("checkstyle:fallthrough") diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index a4d8b75d00c96..bf8218ee7a139 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -21,6 +21,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; @@ -128,6 +129,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.Stat; import org.apache.pulsar.metadata.api.extended.SessionEvent; @@ -2271,6 +2273,37 @@ public void testDeletionAfterRetention() throws Exception { assertTrue(ml.getTotalSize() <= "shortmessage".getBytes().length); ml.close(); } + @Test + public void testMetaDataNotDeleteWhenLedgerDeleteFailed() throws Exception { + BookKeeper spyBookKeeper = spy(bkc); + ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, spyBookKeeper); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setRetentionSizeInMB(0); + config.setMaxEntriesPerLedger(1); + config.setRetentionTime(1, TimeUnit.SECONDS); + ManagedLedgerImpl ml = spy((ManagedLedgerImpl) factory.open("metaDataNotDelete", config)); + ManagedCursorImpl c1 = (ManagedCursorImpl) ml.openCursor("c1noretention"); + doReturn(FutureUtil.failedFuture(new ManagedLedgerException("12345"))).when(ml).asyncDeleteLedger(anyLong(), any(LedgerInfo.class)); + ml.addEntry("testEntry".getBytes()); + c1.skipEntries(1, IndividualDeletedEntries.Include); + ml.addEntry("testEntry2".getBytes()); + c1.skipEntries(1, IndividualDeletedEntries.Include); + assertTrue(ml.getLedgersInfoAsList().size() > 1); + + //delete ledger failed, metadata should not be delete + ml.internalTrimConsumedLedgers(new CompletableFuture<>()); + assertTrue(ml.getLedgersInfoAsList().size() > 1); + + // let it success + doReturn(CompletableFuture.completedFuture(null)).when(ml).asyncDeleteLedger(anyLong(), any(LedgerInfo.class)); + // make sure reach the retention time, the interval is 1s + Thread.sleep(1000); + ml.internalTrimConsumedLedgers(new CompletableFuture<>()); + Awaitility.await().untilAsserted(() -> assertTrue(ml.getLedgersInfoAsList().size() <= 1)); + c1.close(); + ml.close(); + factory.shutdown(); + } @Test public void testDeletionAfterLedgerClosedAndRetention() throws Exception { @@ -2355,8 +2388,9 @@ public void testRetention0WithEmptyLedgerWithoutCursors() throws Exception { ml.internalTrimConsumedLedgers(CompletableFuture.completedFuture(null)); assertTrue(ml.getFirstPosition().ledgerId <= ml.lastConfirmedEntry.ledgerId); - assertFalse(ml.getLedgersInfo().containsKey(ml.lastConfirmedEntry.ledgerId), - "the ledger at lastConfirmedEntry has not been trimmed!"); + ManagedLedgerImpl finalMl = ml; + Awaitility.await().untilAsserted(() -> assertFalse(finalMl.getLedgersInfo().containsKey(finalMl.lastConfirmedEntry.ledgerId), + "the ledger at lastConfirmedEntry has not been trimmed!")); ml.close(); } From b36ab3a147165dd8b6c86e7e873d0df28f4c6bb9 Mon Sep 17 00:00:00 2001 From: feynmanlin <315157973@qq.com> Date: Mon, 3 Apr 2023 20:42:56 +0800 Subject: [PATCH 2/3] [fix][broker] Pre-judging whether Ledger exists --- .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index bd289b90ca16c..9627d2eba14c0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2975,7 +2975,9 @@ public CompletableFuture asyncDeleteLedger(long ledgerId, long retry, CompletableFuture callbackFuture) { CompletableFuture future = callbackFuture == null ? new CompletableFuture<>() : callbackFuture; bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> { - if (rc != BKException.Code.OK) { + if (isNoSuchLedgerExistsException(rc)) { + log.warn("[{}] Ledger was already deleted {}", name, ledgerId); + } else if (rc != BKException.Code.OK) { log.error("[{}] Error deleting ledger {} : {}", name, ledgerId, BKException.getMessage(rc)); if (retry - 1 <= 0) { log.warn("[{}] Failed to delete ledger after retries {}", name, ledgerId); @@ -2985,9 +2987,6 @@ public CompletableFuture asyncDeleteLedger(long ledgerId, long retry, DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS); } return; - } - if (isNoSuchLedgerExistsException(rc)) { - log.warn("[{}] Ledger was already deleted {}", name, ledgerId); } else { if (log.isDebugEnabled()) { log.debug("[{}] Deleted ledger {}", name, ledgerId); From 895582938a222de15f955623aeeab4435c625d71 Mon Sep 17 00:00:00 2001 From: feynmanlin <315157973@qq.com> Date: Mon, 24 Apr 2023 20:23:36 +0800 Subject: [PATCH 3/3] [fix][broker] Fix unit test --- .../mledger/impl/ManagedLedgerImpl.java | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 9627d2eba14c0..fb78c8848f785 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2732,7 +2732,7 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { for (LedgerInfo ls : offloadedLedgersToDelete) { log.info("[{}] Deleting offloaded ledger {} from bookkeeper - size: {}", name, ls.getLedgerId(), ls.getSize()); - futures.add(asyncDeleteLedger(ls.getLedgerId(), DEFAULT_LEDGER_DELETE_RETRIES, null)); + futures.add(asyncDeleteLedger(ls.getLedgerId(), DEFAULT_LEDGER_DELETE_RETRIES)); } if (futures.isEmpty()) { metadataMutex.unlock(); @@ -2953,13 +2953,13 @@ public CompletableFuture asyncDeleteLedger(long ledgerId, LedgerInfo info) List> futures = new ArrayList<>(); if (!info.getOffloadContext().getBookkeeperDeleted()) { // only delete if it hasn't been previously deleted for offload - futures.add(asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES, null)); + futures.add(asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES)); } if (info.getOffloadContext().hasUidMsb()) { UUID uuid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb()); - futures.add(OffloadUtils.cleanupOffloaded(ledgerId, uuid, config, + OffloadUtils.cleanupOffloaded(ledgerId, uuid, config, OffloadUtils.getOffloadDriverMetadata(info, config.getLedgerOffloader().getOffloadDriverMetadata()), - "Trimming", name, scheduledExecutor)); + "Trimming", name, scheduledExecutor); } if (!futures.isEmpty()) { return FutureUtil.waitForAll(futures); @@ -2967,32 +2967,32 @@ public CompletableFuture asyncDeleteLedger(long ledgerId, LedgerInfo info) return CompletableFuture.completedFuture(null); } - private void asyncDeleteLedger(long ledgerId, long retry) { - asyncDeleteLedger(ledgerId, retry, null); - } - @VisibleForTesting - public CompletableFuture asyncDeleteLedger(long ledgerId, long retry, - CompletableFuture callbackFuture) { - CompletableFuture future = callbackFuture == null ? new CompletableFuture<>() : callbackFuture; + public CompletableFuture asyncDeleteLedger(long ledgerId, long retry) { + CompletableFuture future = new CompletableFuture<>(); bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> { if (isNoSuchLedgerExistsException(rc)) { log.warn("[{}] Ledger was already deleted {}", name, ledgerId); + future.complete(null); } else if (rc != BKException.Code.OK) { log.error("[{}] Error deleting ledger {} : {}", name, ledgerId, BKException.getMessage(rc)); if (retry - 1 <= 0) { log.warn("[{}] Failed to delete ledger after retries {}", name, ledgerId); future.completeExceptionally(new ManagedLedgerException(BKException.getMessage(rc))); } else { - scheduledExecutor.schedule(safeRun(() -> asyncDeleteLedger(ledgerId, retry - 1, future)), + scheduledExecutor.schedule(safeRun(() -> asyncDeleteLedger(ledgerId, retry - 1) + .thenAccept((res) -> future.complete(null)) + .exceptionally((e) -> { + future.completeExceptionally(e); + return null; + })), DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS); } - return; } else { if (log.isDebugEnabled()) { log.debug("[{}] Deleted ledger {}", name, ledgerId); } + future.complete(null); } - future.complete(null); }, null); return future; }