From 1921c46f96193603304bf9168483246dccd80186 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 19 Dec 2022 10:20:03 +0800 Subject: [PATCH] [fix] [tx] [branch-2.10] Transaction buffer recover blocked by readNext (#18971) ### Motivation Since PR #18833 can not cherry-pick to `branch-2.10`, create a separate PR. #### Context for Transaction Buffer - If turn on `transactionCoordinatorEnabled`, then `TransactionBuffer` will be initialized when a user topic create. - The `TransactionBuffer` reads the aborted logs of transactions from topic `__transaction_buffer_snapshot` -- this process is called `recovery`. - During recovery, the reading from that snapshot ledger is done via a `Reader`; the reader works like this: ``` while (reader.hasMessageAvailable()){ reader.readNext(); } ``` #### Context for Compaction - After [pip-14](https://github.com/apache/pulsar/wiki/PIP-14:-Topic-compaction), the consumer that enabled feature read-compacted will read messages from the compacted topic instead of the original topic if the task-compaction is done, and read messages from the original topic if task-compaction is not done. - If the data of the last message with key k sent to a topic is null, the compactor will mark all messages for that key as deleted. #### Issue There is a race condition: after executing `reader.hasMessageAvailable`, the following messages have been deleted by compaction-task, so read next will be blocked because there have no messages to read. ---- ### Modifications - If hits this issue, do recover again. ---- #### Why not just let the client try to load the topic again to retry the recover? If the topic load is failed, the client will receive an error response. This is a behavior that we can handle, so should not be perceived by the users. (cherry picked from commit f7dc64f900c1d14d534f572389a69cf590a05263) --- .../buffer/impl/TopicTransactionBuffer.java | 24 +++++- .../TopicTransactionBufferRecoverTest.java | 81 +++++++++++++++++++ 2 files changed, 101 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 791cd39ee0f4f..1d5400055d75f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -52,6 +53,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TransactionBufferStats; @@ -59,6 +61,7 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Markers; import org.apache.pulsar.common.util.Codec; +import org.apache.pulsar.common.util.FutureUtil; import org.jctools.queues.MessagePassingQueue; import org.jctools.queues.SpscArrayQueue; @@ -601,6 +604,11 @@ private TopicTransactionBufferRecover(TopicTransactionBufferRecoverCallBack call this.takeSnapshotWriter = takeSnapshotWriter; } + private long getSystemClientOperationTimeoutMs() throws Exception { + PulsarClientImpl pulsarClient = (PulsarClientImpl) topic.getBrokerService().getPulsar().getClient(); + return pulsarClient.getConfiguration().getOperationTimeoutMs(); + } + @SneakyThrows @Override public void run() { @@ -615,7 +623,8 @@ public void run() { try { boolean hasSnapshot = false; while (reader.hasMoreEvents()) { - Message message = reader.readNext(); + Message message = reader.readNextAsync() + .get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS); if (topic.getName().equals(message.getKey())) { TransactionBufferSnapshot transactionBufferSnapshot = message.getValue(); if (transactionBufferSnapshot != null) { @@ -628,18 +637,25 @@ public void run() { } } if (!hasSnapshot) { - closeReader(reader); callBack.noNeedToRecover(); return; } + } catch (TimeoutException ex) { + Throwable t = FutureUtil.unwrapCompletionException(ex); + String errorMessage = String.format("[%s] Transaction buffer recover fail by read " + + "transactionBufferSnapshot timeout!", topic.getName()); + log.error(errorMessage, t); + callBack.recoverExceptionally( + new BrokerServiceException.ServiceUnitNotReadyException(errorMessage, t)); + return; } catch (Exception ex) { log.error("[{}] Transaction buffer recover fail when read " + "transactionBufferSnapshot!", topic.getName(), ex); callBack.recoverExceptionally(ex); - closeReader(reader); return; + } finally { + closeReader(reader); } - closeReader(reader); ManagedCursor managedCursor; try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java index 08f4f67e307f7..3fdb9605c9f61 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java @@ -20,9 +20,12 @@ import static org.apache.pulsar.common.events.EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -33,9 +36,11 @@ import java.lang.reflect.Method; import java.util.NavigableMap; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; @@ -89,6 +94,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase { private static final int NUM_PARTITIONS = 16; @BeforeMethod protected void setup() throws Exception { + conf.getProperties().setProperty("brokerClient_operationTimeoutMs", Integer.valueOf(10 * 1000).toString()); setUpBase(1, NUM_PARTITIONS, RECOVER_COMMIT, 0); admin.topics().createNonPartitionedTopic(RECOVER_ABORT); admin.topics().createNonPartitionedTopic(TAKE_SNAPSHOT); @@ -225,6 +231,81 @@ private void recoverTest(String testTopic) throws Exception { } + private void makeTBSnapshotReaderTimeoutIfFirstRead(TopicName topicName) throws Exception { + SystemTopicClient.Reader mockReader = mock(SystemTopicClient.Reader.class); + AtomicBoolean isFirstCallOfMethodHasMoreEvents = new AtomicBoolean(); + AtomicBoolean isFirstCallOfMethodHasReadNext = new AtomicBoolean(); + AtomicBoolean isFirstCallOfMethodHasReadNextAsync = new AtomicBoolean(); + + doAnswer(invocation -> { + if (isFirstCallOfMethodHasMoreEvents.compareAndSet(false,true)){ + return true; + } else { + return false; + } + }).when(mockReader).hasMoreEvents(); + + doAnswer(invocation -> { + if (isFirstCallOfMethodHasReadNext.compareAndSet(false, true)){ + // Just stuck the thread. + Thread.sleep(3600 * 1000); + } + return null; + }).when(mockReader).readNext(); + + doAnswer(invocation -> { + CompletableFuture future = new CompletableFuture<>(); + new Thread(() -> { + if (isFirstCallOfMethodHasReadNextAsync.compareAndSet(false, true)){ + // Just stuck the thread. + try { + Thread.sleep(3600 * 1000); + } catch (InterruptedException e) { + } + future.complete(null); + } else { + future.complete(null); + } + }).start(); + return future; + }).when(mockReader).readNextAsync(); + + when(mockReader.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); + + for (PulsarService pulsarService : pulsarServiceList){ + // Init prop: lastMessageIdInBroker. + final TransactionBufferSnapshotService tbSnapshotService = + pulsarService.getTransactionBufferSnapshotService(); + TransactionBufferSnapshotService spyTbSnapshotService = spy(tbSnapshotService); + doAnswer(invocation -> CompletableFuture.completedFuture(mockReader)) + .when(spyTbSnapshotService).createReader(topicName); + Field field = + PulsarService.class.getDeclaredField("transactionBufferSnapshotService"); + field.setAccessible(true); + field.set(pulsarService, spyTbSnapshotService); + } + } + + @Test(timeOut = 60 * 1000) + public void testTBRecoverCanRetryIfTimeoutRead() throws Exception { + String topicName = String.format("persistent://%s/%s", NAMESPACE1, + "tx_recover_" + UUID.randomUUID().toString().replaceAll("-", "_")); + + // Make race condition of "getLastMessageId" and "compaction" to make recover can't complete. + makeTBSnapshotReaderTimeoutIfFirstRead(TopicName.get(topicName)); + // Verify( Cmd-PRODUCER will wait for TB recover finished ) + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .sendTimeout(0, TimeUnit.SECONDS) + .enableBatching(false) + .batchingMaxMessages(2) + .create(); + + // cleanup. + producer.close(); + admin.topics().delete(topicName, false); + } + @Test private void testTakeSnapshot() throws IOException, ExecutionException, InterruptedException {