From a968da59abd43a038b973226edbcc55cfa48e340 Mon Sep 17 00:00:00 2001 From: congbo <39078850+congbobo184@users.noreply.github.com> Date: Fri, 16 Dec 2022 17:41:33 +0800 Subject: [PATCH] [fix][txn] transaction pending ack store future not completely problem (#18943) when `MLPendingAckStoreProvider` init PendingAckStore gets the ManagedLedger config throw exception, we don't handle the exception. and the `pendingAckStoreFeture` can't be complete, topic unload will use this future to close the pendingAck. https://github.com/apache/pulsar/blob/3011946a5c3b64ed7c08b6bfb1f6492f8aaaca9c/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java#L114-L115 when getting managedledger config to fail, `pendingAckStoreFeture` will `completeExceptionally()`; when pendingAckStore init fail, close pendingAckHandle success directly mock get managedLeger config throw exception, then unload can success (cherry picked from commit 1d9956cf9fc081b219d1db14eb2686677ea63021) --- .../persistent/PersistentSubscription.java | 2 +- .../pendingack/PendingAckHandle.java | 2 +- .../impl/MLPendingAckStoreProvider.java | 25 +++++--- .../impl/PendingAckHandleDisabled.java | 2 +- .../pendingack/impl/PendingAckHandleImpl.java | 36 ++++++++--- .../broker/transaction/TransactionTest.java | 2 +- .../pendingack/PendingAckPersistentTest.java | 59 ++++++++++++++++++- 7 files changed, 105 insertions(+), 23 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index fcb4135c97ece..fe3faf35bc9d8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -877,7 +877,7 @@ public CompletableFuture close() { if (dispatcher != null && dispatcher.isConsumerConnected()) { return FutureUtil.failedFuture(new SubscriptionBusyException("Subscription has active consumers")); } - return this.pendingAckHandle.close().thenAccept(v -> { + return this.pendingAckHandle.closeAsync().thenAccept(v -> { IS_FENCED_UPDATER.set(this, TRUE); log.info("[{}][{}] Successfully closed subscription [{}]", topicName, subName, cursor); }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java index d8e16dd40156c..b51aea23e6fcb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java @@ -151,7 +151,7 @@ CompletableFuture individualAcknowledgeMessage(TxnID txnID, List close(); + CompletableFuture closeAsync(); /** * Check if the PendingAckStore is init. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java index fdff9f59146d4..0a4d201a87f89 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java @@ -34,6 +34,7 @@ import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig; @@ -131,15 +132,21 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { pendingAckStoreFuture.completeExceptionally(exception); } }, () -> true, null); - }); - }).exceptionally(e -> { - log.error("Failed to obtain the existence of ManagerLedger with topic and subscription : " - + originPersistentTopic.getSubscriptions() + " " - + subscription.getName()); - pendingAckStoreFuture.completeExceptionally( - e.getCause()); - return null; - }); + }).exceptionally(e -> { + Throwable t = FutureUtil.unwrapCompletionException(e); + log.error("[{}] [{}] Failed to get managedLedger config when init pending ack store!", + originPersistentTopic, subscription, t); + pendingAckStoreFuture.completeExceptionally(t); + return null; + + }); + }).exceptionally(e -> { + Throwable t = FutureUtil.unwrapCompletionException(e); + log.error("[{}] [{}] Failed to check the pending ack topic exist when init pending ack store!", + originPersistentTopic, subscription, t); + pendingAckStoreFuture.completeExceptionally(t); + return null; + }); return pendingAckStoreFuture; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java index e34628da887e9..e02502014e841 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java @@ -93,7 +93,7 @@ public TransactionPendingAckStats getStats(boolean lowWaterMarks) { } @Override - public CompletableFuture close() { + public CompletableFuture closeAsync() { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index 38d81d059b78a..64652faa6e943 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -153,9 +153,11 @@ public PendingAckHandleImpl(PersistentSubscription persistentSubscription) { completeHandleFuture(); } }) - .exceptionally(t -> { + .exceptionally(e -> { + Throwable t = FutureUtil.unwrapCompletionException(e); changeToErrorState(); exceptionHandleFuture(t); + this.pendingAckStoreFuture.completeExceptionally(t); return null; }); } @@ -168,13 +170,13 @@ private void initPendingAckStore() { this.pendingAckStoreFuture.thenAccept(pendingAckStore -> { recoverTime.setRecoverStartTime(System.currentTimeMillis()); pendingAckStore.replayAsync(this, internalPinnedExecutor); - }).exceptionally(e -> { - acceptQueue.clear(); + }).exceptionallyAsync(e -> { + handleCacheRequest(); changeToErrorState(); log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e); exceptionHandleFuture(e.getCause()); return null; - }); + }, internalPinnedExecutor); } } } @@ -927,9 +929,7 @@ public synchronized void completeHandleFuture() { if (!this.pendingAckHandleCompletableFuture.isDone()) { this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this); } - if (recoverTime.getRecoverStartTime() == 0L) { - return; - } else { + if (recoverTime.getRecoverStartTime() != 0L) { recoverTime.setRecoverEndTime(System.currentTimeMillis()); } } @@ -963,11 +963,29 @@ public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID) } @Override - public CompletableFuture close() { + public CompletableFuture closeAsync() { changeToCloseState(); synchronized (PendingAckHandleImpl.this) { if (this.pendingAckStoreFuture != null) { - return this.pendingAckStoreFuture.thenAccept(PendingAckStore::closeAsync); + CompletableFuture closeFuture = new CompletableFuture<>(); + this.pendingAckStoreFuture.whenComplete((pendingAckStore, e) -> { + if (e != null) { + // init pending ack store fail, close don't need to + // retry and throw exception, complete directly + closeFuture.complete(null); + } else { + pendingAckStore.closeAsync().whenComplete((q, ex) -> { + if (ex != null) { + Throwable t = FutureUtil.unwrapCompletionException(ex); + closeFuture.completeExceptionally(t); + } else { + closeFuture.complete(null); + } + }); + } + }); + + return closeFuture; } else { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 76b91801a7317..1043078d6869e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -1299,7 +1299,7 @@ public void testPendingAckReplayChangeStateError() throws InterruptedException, public Object answer(InvocationOnMock invocation) throws Throwable { executorService.execute(()->{ PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl) invocation.getArguments()[0]; - pendingAckHandle.close(); + pendingAckHandle.closeAsync(); MLPendingAckReplyCallBack mlPendingAckReplyCallBack = new MLPendingAckReplyCallBack(pendingAckHandle); mlPendingAckReplyCallBack.replayComplete(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java index e75c9534c5986..dc5266d88695a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java @@ -21,7 +21,10 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; - +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.testng.AssertJUnit.assertNotNull; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.ArrayList; @@ -36,6 +39,9 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.collections4.map.LinkedMap; +import org.apache.pulsar.broker.service.AbstractTopic; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.transaction.TransactionTestBase; @@ -58,6 +64,7 @@ import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -703,4 +710,54 @@ public void testGetSubPatternTopicFilterTxnInternalTopic() throws Exception { patternConsumer.close(); producer.close(); } + + @Test + public void testGetManagedLegerConfigFailThenUnload() throws Exception { + String topic = TopicName.get(TopicDomain.persistent.toString(), + NamespaceName.get(NAMESPACE1), "testGetManagedLegerConfigFailThenUnload").toString(); + + String subscriptionName = "sub"; + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .enableBatching(false) + .topic(topic).create(); + + PersistentTopic persistentTopic = + (PersistentTopic) getPulsarServiceList() + .get(0) + .getBrokerService() + .getTopic(topic, false) + .get().orElse(null); + + assertNotNull(persistentTopic); + BrokerService brokerService = spy(persistentTopic.getBrokerService()); + doReturn(FutureUtil.failedFuture(new BrokerServiceException.ServiceUnitNotReadyException("test"))) + .when(brokerService).getManagedLedgerConfig(any()); + Field field = AbstractTopic.class.getDeclaredField("brokerService"); + field.setAccessible(true); + field.set(persistentTopic, brokerService); + + // init pending ack store + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .topic(topic) + .subscribe(); + + producer.send("test"); + Transaction transaction = pulsarClient.newTransaction() + .withTransactionTimeout(30, TimeUnit.SECONDS).build().get(); + + // pending ack init fail, so the ack will throw exception + try { + consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get(); + } catch (Exception e) { + assertTrue(e.getCause() instanceof PulsarClientException.LookupException); + } + + // can unload success + admin.topics().unload(topic); + } }