diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 791cd39ee0f4f..1d5400055d75f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -29,6 +29,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -52,6 +53,7 @@
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TransactionBufferStats;
@@ -59,6 +61,7 @@
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
 import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.jctools.queues.MessagePassingQueue;
 import org.jctools.queues.SpscArrayQueue;
 
@@ -601,6 +604,17 @@ private TopicTransactionBufferRecover(TopicTransactionBufferRecoverCallBack call
             this.takeSnapshotWriter = takeSnapshotWriter;
         }
 
+        /**
+         * Returns the operation timeout, in milliseconds, configured on the client obtained from the
+         * broker's {@code PulsarService}; it is used to bound each snapshot read during recovery.
+         *
+         * @throws Exception if the client cannot be obtained
+         */
+        private long getSystemClientOperationTimeoutMs() throws Exception {
+            PulsarClientImpl pulsarClient = (PulsarClientImpl) topic.getBrokerService().getPulsar().getClient();
+            return pulsarClient.getConfiguration().getOperationTimeoutMs();
+        }
+
         @SneakyThrows
         @Override
         public void run() {
@@ -615,7 +629,9 @@ public void run() {
                 try {
                     boolean hasSnapshot = false;
                     while (reader.hasMoreEvents()) {
-                        Message<TransactionBufferSnapshot> message = reader.readNext();
+                        // Bound each read: a blocking readNext() that never returns would stall recovery.
+                        Message<TransactionBufferSnapshot> message = reader.readNextAsync()
+                                .get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
                         if (topic.getName().equals(message.getKey())) {
                             TransactionBufferSnapshot transactionBufferSnapshot = message.getValue();
                             if (transactionBufferSnapshot != null) {
@@ -628,18 +644,31 @@ public void run() {
                         }
                     }
                     if (!hasSnapshot) {
-                        closeReader(reader);
                         callBack.noNeedToRecover();
                         return;
                     }
+                } catch (TimeoutException ex) {
+                    // The snapshot read did not finish within the client's operation timeout. Report the
+                    // topic as not ready instead of leaving the recovery stuck on the read.
+                    String errorMessage = String.format("[%s] Transaction buffer recover fail by read "
+                            + "transactionBufferSnapshot timeout!", topic.getName());
+                    log.error(errorMessage, ex);
+                    callBack.recoverExceptionally(
+                            new BrokerServiceException.ServiceUnitNotReadyException(errorMessage, ex));
+                    return;
                 } catch (Exception ex) {
                     log.error("[{}] Transaction buffer recover fail when read "
                             + "transactionBufferSnapshot!", topic.getName(), ex);
-                    callBack.recoverExceptionally(ex);
-                    closeReader(reader);
+                    // Future.get() wraps a failed read in an ExecutionException, whereas the synchronous
+                    // readNext() used before threw the cause directly. Unwrap so the callback keeps
+                    // receiving the underlying exception type.
+                    Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                    callBack.recoverExceptionally(cause instanceof Exception ? (Exception) cause : ex);
                     return;
+                } finally {
+                    // Runs on every path out of the try block, including the early returns above.
+                    closeReader(reader);
                 }
-                closeReader(reader);
 
                 ManagedCursor managedCursor;
                 try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index 08f4f67e307f7..3fdb9605c9f61 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -20,9 +20,12 @@
 
 import static org.apache.pulsar.common.events.EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
@@ -33,9 +36,11 @@
 import java.lang.reflect.Method;
 import java.util.NavigableMap;
 import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
@@ -89,6 +94,8 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
     private static final int NUM_PARTITIONS = 16;
     @BeforeMethod
     protected void setup() throws Exception {
+        // Keep the broker client's operation timeout short so a stuck snapshot read times out within a test.
+        conf.getProperties().setProperty("brokerClient_operationTimeoutMs", String.valueOf(10 * 1000));
         setUpBase(1, NUM_PARTITIONS, RECOVER_COMMIT, 0);
         admin.topics().createNonPartitionedTopic(RECOVER_ABORT);
         admin.topics().createNonPartitionedTopic(TAKE_SNAPSHOT);
@@ -225,6 +232,77 @@ private void recoverTest(String testTopic) throws Exception {
 
     }
 
+    /**
+     * Replaces the transaction buffer snapshot service of every broker with a spy whose reader for
+     * {@code topicName} never answers the first read, so the first recovery attempt has to time out.
+     * From the second call on, {@code hasMoreEvents()} reports that there is nothing left to read.
+     */
+    private void makeTBSnapshotReaderTimeoutIfFirstRead(TopicName topicName) throws Exception {
+        SystemTopicClient.Reader<?> mockReader = mock(SystemTopicClient.Reader.class);
+        AtomicBoolean hasMoreEventsCalled = new AtomicBoolean();
+        AtomicBoolean readNextCalled = new AtomicBoolean();
+        AtomicBoolean readNextAsyncCalled = new AtomicBoolean();
+
+        // Report a pending event on the first call only.
+        doAnswer(invocation -> hasMoreEventsCalled.compareAndSet(false, true))
+                .when(mockReader).hasMoreEvents();
+
+        doAnswer(invocation -> {
+            if (readNextCalled.compareAndSet(false, true)) {
+                // Block the first caller of the synchronous API; the test timeout then fails the test.
+                Thread.sleep(3600 * 1000);
+            }
+            return null;
+        }).when(mockReader).readNext();
+
+        doAnswer(invocation -> {
+            if (readNextAsyncCalled.compareAndSet(false, true)) {
+                // Never completed: the caller can only get out through its own timeout. No helper
+                // thread is needed, so nothing is left running after the test.
+                return new CompletableFuture<>();
+            }
+            return CompletableFuture.completedFuture(null);
+        }).when(mockReader).readNextAsync();
+
+        when(mockReader.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+
+        for (PulsarService pulsarService : pulsarServiceList) {
+            // Hand out the mock reader for this topic only; the stub is bound to this topicName argument.
+            TransactionBufferSnapshotService spyTbSnapshotService =
+                    spy(pulsarService.getTransactionBufferSnapshotService());
+            doAnswer(invocation -> CompletableFuture.completedFuture(mockReader))
+                    .when(spyTbSnapshotService).createReader(topicName);
+            // Swap the spy into the broker via reflection.
+            Field field = PulsarService.class.getDeclaredField("transactionBufferSnapshotService");
+            field.setAccessible(true);
+            field.set(pulsarService, spyTbSnapshotService);
+        }
+    }
+
+    /**
+     * Verifies that a producer can still be created when the first read of the transaction buffer
+     * snapshot hangs, i.e. that a recovery which timed out is attempted again.
+     */
+    @Test(timeOut = 60 * 1000)
+    public void testTBRecoverCanRetryIfTimeoutRead() throws Exception {
+        String topicName = String.format("persistent://%s/%s", NAMESPACE1,
+                "tx_recover_" + UUID.randomUUID().toString().replace('-', '_'));
+
+        // Make the first snapshot read of the transaction buffer recovery hang.
+        makeTBSnapshotReaderTimeoutIfFirstRead(TopicName.get(topicName));
+        // Producer creation waits for the recovery; it can only return if recovery gets past the stuck read.
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .enableBatching(false)
+                .batchingMaxMessages(2)
+                .create();
+
+        // cleanup.
+        producer.close();
+        admin.topics().delete(topicName, false);
+    }
+
     @Test
     private void testTakeSnapshot() throws IOException, ExecutionException, InterruptedException {
 