Skip to content

Commit

Permalink
[fix][txn]Handle exceptions in the transaction pending ack init (#21274)
Browse files Browse the repository at this point in the history
Co-authored-by: Baodi Shi <[email protected]>
  • Loading branch information
liangyepianzhou and shibd authored Apr 15, 2024
1 parent 837f8bc commit 5d18ff7
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1376,7 +1376,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
// Send error back to client, only if not completed already.
if (consumerFuture.completeExceptionally(exception)) {
commandSender.sendErrorResponse(requestId,
BrokerServiceException.getClientErrorCode(exception),
BrokerServiceException.getClientErrorCode(exception.getCause()),
exception.getCause().getMessage());
}
consumers.remove(consumerId, consumerFuture);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWithAckSet;
import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.Timer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -35,16 +36,19 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.Consumer;
Expand All @@ -53,7 +57,9 @@
import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
Expand Down Expand Up @@ -134,6 +140,12 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi

public final RecoverTimeRecord recoverTime = new RecoverTimeRecord();

private final long pendingAckInitFailureBackoffInitialTimeInMs = 100;

public final Backoff backoff = new Backoff(pendingAckInitFailureBackoffInitialTimeInMs, TimeUnit.MILLISECONDS,
1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);

private final Timer transactionOpTimer;

public PendingAckHandleImpl(PersistentSubscription persistentSubscription) {
super(State.None);
Expand All @@ -153,7 +165,11 @@ public PendingAckHandleImpl(PersistentSubscription persistentSubscription) {

this.pendingAckStoreProvider = this.persistentSubscription.getTopic()
.getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();
transactionOpTimer = persistentSubscription.getTopic().getBrokerService().getPulsar().getTransactionTimer();
init();
}

private void init() {
pendingAckStoreProvider.checkInitializedBefore(persistentSubscription)
.thenAcceptAsync(init -> {
if (init) {
Expand All @@ -164,9 +180,9 @@ public PendingAckHandleImpl(PersistentSubscription persistentSubscription) {
}, internalPinnedExecutor)
.exceptionallyAsync(e -> {
Throwable t = FutureUtil.unwrapCompletionException(e);
changeToErrorState();
// Handling the exceptions in `exceptionHandleFuture`,
// it will be helpful to make the exception handling clearer.
exceptionHandleFuture(t);
this.pendingAckStoreFuture.completeExceptionally(t);
return null;
}, internalPinnedExecutor);
}
Expand All @@ -180,9 +196,8 @@ private void initPendingAckStore() {
recoverTime.setRecoverStartTime(System.currentTimeMillis());
pendingAckStore.replayAsync(this, internalPinnedExecutor);
}).exceptionallyAsync(e -> {
handleCacheRequest();
changeToErrorState();
log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e);
// Handling the exceptions in `exceptionHandleFuture`,
// it will be helpful to make the exception handling clearer.
exceptionHandleFuture(e.getCause());
return null;
}, internalPinnedExecutor);
Expand Down Expand Up @@ -945,12 +960,39 @@ public void completeHandleFuture() {
}

public void exceptionHandleFuture(Throwable t) {
final boolean completedNow = this.pendingAckHandleCompletableFuture.completeExceptionally(t);
if (isRetryableException(t)) {
this.state = State.None;
long retryTime = backoff.next();
log.warn("[{}][{}] Failed to init transaction pending ack. It will be retried in {} Ms",
persistentSubscription.getTopic().getName(), subName, retryTime, t);
transactionOpTimer.newTimeout((timeout) -> init(), retryTime, TimeUnit.MILLISECONDS);
return;
}
log.error("[{}] [{}] PendingAckHandleImpl init fail!", topicName, subName, t);
handleCacheRequest();
changeToErrorState();
// ToDo: Add a new serverError `TransactionComponentLoadFailedException`
// and before that a `Unknown` will be returned first.
this.pendingAckStoreFuture = FutureUtil.failedFuture(new BrokerServiceException(
String.format("[%s][%s] Failed to init transaction pending ack.", topicName, subName)));
final boolean completedNow = this.pendingAckHandleCompletableFuture.completeExceptionally(
new BrokerServiceException(
String.format("[%s][%s] Failed to init transaction pending ack.", topicName, subName)));
if (completedNow) {
recoverTime.setRecoverEndTime(System.currentTimeMillis());
}
}

private static boolean isRetryableException(Throwable ex) {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
return (realCause instanceof ManagedLedgerException
&& !(realCause instanceof ManagedLedgerException.ManagedLedgerFencedException)
&& !(realCause instanceof ManagedLedgerException.NonRecoverableLedgerException))
|| realCause instanceof PulsarClientException.BrokerPersistenceException
|| realCause instanceof PulsarClientException.LookupException
|| realCause instanceof PulsarClientException.ConnectException;
}

@Override
public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID) {
TransactionInPendingAckStats transactionInPendingAckStats = new TransactionInPendingAckStats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1517,7 +1517,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
fail("Expect failure by PendingAckHandle closed, but success");
} catch (ExecutionException executionException){
Throwable t = executionException.getCause();
Assert.assertTrue(t instanceof BrokerServiceException.ServiceUnitNotReadyException);
Assert.assertTrue(t instanceof BrokerServiceException);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertTrue;
Expand All @@ -44,9 +46,11 @@
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.pulsar.PrometheusMetricsTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
Expand All @@ -59,6 +63,7 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
Expand Down Expand Up @@ -100,6 +105,83 @@ protected void cleanup() {
super.internalCleanup();
}

/**
* Test consumer can be built successfully with retryable exception
* and get correct error with no-retryable exception.
* @throws Exception
*/
@Test(timeOut = 60000)
public void testBuildConsumerEncounterPendingAckInitFailure() throws Exception {
// 1. Prepare and make sure the consumer can be built successfully.
String topic = NAMESPACE1 + "/testUnloadSubscriptionWhenFailedInitPendingAck";
@Cleanup
Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.subscriptionName("subName1")
.topic(topic)
.subscribe();
// 2. Mock a transactionPendingAckStoreProvider to test building consumer
// failing at transactionPendingAckStoreProvider::checkInitializedBefore.
Field transactionPendingAckStoreProviderField = PulsarService.class
.getDeclaredField("transactionPendingAckStoreProvider");
transactionPendingAckStoreProviderField.setAccessible(true);
TransactionPendingAckStoreProvider pendingAckStoreProvider =
(TransactionPendingAckStoreProvider) transactionPendingAckStoreProviderField
.get(pulsarServiceList.get(0));
TransactionPendingAckStoreProvider mockProvider = mock(pendingAckStoreProvider.getClass());
// 3. Test retryable exception when checkInitializedBefore:
// The consumer will be built successfully after one time retry.
when(mockProvider.checkInitializedBefore(any()))
// First, the method checkInitializedBefore will fail with a retryable exception.
.thenReturn(FutureUtil.failedFuture(new ManagedLedgerException("mock fail initialize")))
// Then, the method will be executed successfully.
.thenReturn(CompletableFuture.completedFuture(false));
transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider);
@Cleanup
Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.subscriptionName("subName2")
.topic(topic)
.subscribe();

// 4. Test retryable exception when newPendingAckStore:
// The consumer will be built successfully after one time retry.
when(mockProvider.checkInitializedBefore(any()))
.thenReturn(CompletableFuture.completedFuture(true));

when(mockProvider.newPendingAckStore(any()))
// First, the method newPendingAckStore will fail with a retryable exception.
.thenReturn(FutureUtil.failedFuture(new ManagedLedgerException("mock fail new store")))
// Then, the method will be executed successfully.
.thenCallRealMethod();
transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider);
@Cleanup
Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
.subscriptionName("subName3")
.topic(topic)
.subscribe();

// 5. Test no-retryable exception:
// The consumer building will be failed without retrying.
when(mockProvider.checkInitializedBefore(any()))
// The method checkInitializedBefore will fail with a no-retryable exception without retrying.
.thenReturn(FutureUtil.failedFuture(new ManagedLedgerException
.NonRecoverableLedgerException("mock fail")))
.thenReturn(CompletableFuture.completedFuture(false));
@Cleanup PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
.operationTimeout(3, TimeUnit.SECONDS)
.build();
try {
@Cleanup
Consumer<byte[]> consumer4 = pulsarClient.newConsumer()
.subscriptionName("subName4")
.topic(topic)
.subscribe();
fail();
} catch (Exception exception) {
assertTrue(exception.getMessage().contains("Failed to init transaction pending ack."));
}
}

@Test
public void individualPendingAckReplayTest() throws Exception {
int messageCount = 1000;
Expand Down

0 comments on commit 5d18ff7

Please sign in to comment.