Skip to content

Commit

Permalink
[fix][txn] transaction pending ack store future not completely problem (
Browse files Browse the repository at this point in the history
#18943)

when `MLPendingAckStoreProvider` init PendingAckStore gets the ManagedLedger config throw exception, we don't handle the exception. and the `pendingAckStoreFeture` can't be complete, topic unload will use this future to close the pendingAck.
https://github.com/apache/pulsar/blob/3011946a5c3b64ed7c08b6bfb1f6492f8aaaca9c/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java#L114-L115
when getting managedledger config to fail, `pendingAckStoreFeture` will `completeExceptionally()`;

when pendingAckStore init fail, close pendingAckHandle success directly

mock get managedLeger config throw exception, then unload can success

(cherry picked from commit 1d9956c)
  • Loading branch information
congbobo184 authored and congbobo184 committed Dec 16, 2022
1 parent d8fa685 commit a968da5
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,7 @@ public CompletableFuture<Void> close() {
if (dispatcher != null && dispatcher.isConsumerConnected()) {
return FutureUtil.failedFuture(new SubscriptionBusyException("Subscription has active consumers"));
}
return this.pendingAckHandle.close().thenAccept(v -> {
return this.pendingAckHandle.closeAsync().thenAccept(v -> {
IS_FENCED_UPDATER.set(this, TRUE);
log.info("[{}][{}] Successfully closed subscription [{}]", topicName, subName, cursor);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID, List<MutablePa
*
* @return the future of this operation.
*/
CompletableFuture<Void> close();
CompletableFuture<Void> closeAsync();

/**
* Check if the PendingAckStore is init.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;


Expand Down Expand Up @@ -131,15 +132,21 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
pendingAckStoreFuture.completeExceptionally(exception);
}
}, () -> true, null);
});
}).exceptionally(e -> {
log.error("Failed to obtain the existence of ManagerLedger with topic and subscription : "
+ originPersistentTopic.getSubscriptions() + " "
+ subscription.getName());
pendingAckStoreFuture.completeExceptionally(
e.getCause());
return null;
});
}).exceptionally(e -> {
Throwable t = FutureUtil.unwrapCompletionException(e);
log.error("[{}] [{}] Failed to get managedLedger config when init pending ack store!",
originPersistentTopic, subscription, t);
pendingAckStoreFuture.completeExceptionally(t);
return null;

});
}).exceptionally(e -> {
Throwable t = FutureUtil.unwrapCompletionException(e);
log.error("[{}] [{}] Failed to check the pending ack topic exist when init pending ack store!",
originPersistentTopic, subscription, t);
pendingAckStoreFuture.completeExceptionally(t);
return null;
});
return pendingAckStoreFuture;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public TransactionPendingAckStats getStats(boolean lowWaterMarks) {
}

@Override
public CompletableFuture<Void> close() {
public CompletableFuture<Void> closeAsync() {
return CompletableFuture.completedFuture(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,11 @@ public PendingAckHandleImpl(PersistentSubscription persistentSubscription) {
completeHandleFuture();
}
})
.exceptionally(t -> {
.exceptionally(e -> {
Throwable t = FutureUtil.unwrapCompletionException(e);
changeToErrorState();
exceptionHandleFuture(t);
this.pendingAckStoreFuture.completeExceptionally(t);
return null;
});
}
Expand All @@ -168,13 +170,13 @@ private void initPendingAckStore() {
this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
recoverTime.setRecoverStartTime(System.currentTimeMillis());
pendingAckStore.replayAsync(this, internalPinnedExecutor);
}).exceptionally(e -> {
acceptQueue.clear();
}).exceptionallyAsync(e -> {
handleCacheRequest();
changeToErrorState();
log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e);
exceptionHandleFuture(e.getCause());
return null;
});
}, internalPinnedExecutor);
}
}
}
Expand Down Expand Up @@ -927,9 +929,7 @@ public synchronized void completeHandleFuture() {
if (!this.pendingAckHandleCompletableFuture.isDone()) {
this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
}
if (recoverTime.getRecoverStartTime() == 0L) {
return;
} else {
if (recoverTime.getRecoverStartTime() != 0L) {
recoverTime.setRecoverEndTime(System.currentTimeMillis());
}
}
Expand Down Expand Up @@ -963,11 +963,29 @@ public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID)
}

@Override
public CompletableFuture<Void> close() {
public CompletableFuture<Void> closeAsync() {
changeToCloseState();
synchronized (PendingAckHandleImpl.this) {
if (this.pendingAckStoreFuture != null) {
return this.pendingAckStoreFuture.thenAccept(PendingAckStore::closeAsync);
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
this.pendingAckStoreFuture.whenComplete((pendingAckStore, e) -> {
if (e != null) {
// init pending ack store fail, close don't need to
// retry and throw exception, complete directly
closeFuture.complete(null);
} else {
pendingAckStore.closeAsync().whenComplete((q, ex) -> {
if (ex != null) {
Throwable t = FutureUtil.unwrapCompletionException(ex);
closeFuture.completeExceptionally(t);
} else {
closeFuture.complete(null);
}
});
}
});

return closeFuture;
} else {
return CompletableFuture.completedFuture(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1299,7 +1299,7 @@ public void testPendingAckReplayChangeStateError() throws InterruptedException,
public Object answer(InvocationOnMock invocation) throws Throwable {
executorService.execute(()->{
PendingAckHandleImpl pendingAckHandle = (PendingAckHandleImpl) invocation.getArguments()[0];
pendingAckHandle.close();
pendingAckHandle.closeAsync();
MLPendingAckReplyCallBack mlPendingAckReplyCallBack
= new MLPendingAckReplyCallBack(pendingAckHandle);
mlPendingAckReplyCallBack.replayComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.testng.AssertJUnit.assertNotNull;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
Expand All @@ -36,6 +39,9 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
Expand All @@ -58,6 +64,7 @@
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -703,4 +710,54 @@ public void testGetSubPatternTopicFilterTxnInternalTopic() throws Exception {
patternConsumer.close();
producer.close();
}

@Test
public void testGetManagedLegerConfigFailThenUnload() throws Exception {
String topic = TopicName.get(TopicDomain.persistent.toString(),
NamespaceName.get(NAMESPACE1), "testGetManagedLegerConfigFailThenUnload").toString();

String subscriptionName = "sub";

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(false)
.topic(topic).create();

PersistentTopic persistentTopic =
(PersistentTopic) getPulsarServiceList()
.get(0)
.getBrokerService()
.getTopic(topic, false)
.get().orElse(null);

assertNotNull(persistentTopic);
BrokerService brokerService = spy(persistentTopic.getBrokerService());
doReturn(FutureUtil.failedFuture(new BrokerServiceException.ServiceUnitNotReadyException("test")))
.when(brokerService).getManagedLedgerConfig(any());
Field field = AbstractTopic.class.getDeclaredField("brokerService");
field.setAccessible(true);
field.set(persistentTopic, brokerService);

// init pending ack store
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.topic(topic)
.subscribe();

producer.send("test");
Transaction transaction = pulsarClient.newTransaction()
.withTransactionTimeout(30, TimeUnit.SECONDS).build().get();

// pending ack init fail, so the ack will throw exception
try {
consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get();
} catch (Exception e) {
assertTrue(e.getCause() instanceof PulsarClientException.LookupException);
}

// can unload success
admin.topics().unload(topic);
}
}

0 comments on commit a968da5

Please sign in to comment.