From 79430fc720b39e4db77b439c31b8e32dfbbf6c40 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 22 Jul 2024 17:47:41 +0800 Subject: [PATCH] [improve][broker] Reduce the pressure from the transaction buffer readers and writers in rolling restarts ### Motivation During the rolling restarts, the namespace bundle ownerships will change. Assuming there is a producer created on a single topic, and the ownership was transferred to the new broker. Assuming the namespace bundle has N topics and the namespace is `tenant/ns`, 1. All N topics in the same bundle of that topic will be loaded. 2. For each topic, the managed ledger will be initialized, when the transaction coordinator is enabled, a `TopicTransactionBuffer` will be created. 2.1 A Pulsar producer will be created on `tenant/ns/__transaction_buffer_snapshot` concurrently. 2.2 A Pulsar reader will be created on `tenant/ns/__transaction_buffer_snapshot` concurrently. 3. Once all N readers are created, the owner of the snapshot topic will start dispatching messages to N readers. Each dispatcher will read messages from BookKeeper concurrently and might fail with too many requests error because BK can only have `maxPendingReadRequestsPerThread` pending read requests (default: 10000). We have a `numTransactionReplayThreadPoolSize` config to limit the concurrency of transaction snapshot readers. However, it only limits the read loop. For example, if it's configured with 1, only 1 reader could read messages at the same time. However, N readers will be created concurrently. Each when one of these reader explicitly calls `readNext`, all N dispatchers at brokers side will dispatch messages to N readers. The behaviors above brings much CPU pressure on the owner broker, especially for a small cluster with only two brokers. ### Modifications - Synchronize the reader creation, read loop and the following process on its result. - Delay the snapshot producer creaton to when a message is written via `LazySnapshotWriter`. --- .../buffer/impl/LazySnapshotWriter.java | 63 +++++++++++++++ ...SingleSnapshotAbortedTxnProcessorImpl.java | 81 +++++++++---------- ...napshotSegmentAbortedTxnProcessorImpl.java | 31 +++---- .../buffer/impl/TopicTransactionBuffer.java | 5 +- 4 files changed, 110 insertions(+), 70 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/LazySnapshotWriter.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/LazySnapshotWriter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/LazySnapshotWriter.java new file mode 100644 index 00000000000000..c4ab4329cce438 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/LazySnapshotWriter.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.buffer.impl; + +import static org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter; +import static org.apache.pulsar.broker.systopic.SystemTopicClient.Writer; +import java.util.concurrent.CompletableFuture; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.common.naming.TopicName; + +@Slf4j +public class LazySnapshotWriter { + + private final PersistentTopic topic; + private final SystemTopicTxnBufferSnapshotService snapshotService; + private ReferenceCountedWriter writer; + + public LazySnapshotWriter(PersistentTopic topic, SystemTopicTxnBufferSnapshotService snapshotService) { + this.topic = topic; + this.snapshotService = snapshotService; + } + + public synchronized CompletableFuture> getFuture() { + if (writer != null) { + return writer.getFuture(); + } + writer = snapshotService.getReferenceWriter(TopicName.get(topic.getName()).getNamespaceObject()); + final var future = writer.getFuture(); + future.exceptionally(e -> { + synchronized (LazySnapshotWriter.this) { + // Next time `getWriter()` is called, the writer will be recreated + writer = null; + } + log.error("{} Failed to create snapshot writer", topic.getName(), e); + return null; + }); + return future; + } + + public synchronized void release() { + if (writer != null) { + writer.release(); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java index 5c9075e9a38677..1e45642b00da48 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java @@ -21,36 +21,32 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.commons.collections4.map.LinkedMap; -import org.apache.pulsar.broker.service.BrokerServiceException; -import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor; import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata; import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot; -import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TransactionBufferStats; -import org.apache.pulsar.common.util.FutureUtil; @Slf4j public class SingleSnapshotAbortedTxnProcessorImpl implements AbortedTxnProcessor { private final PersistentTopic topic; - private final ReferenceCountedWriter takeSnapshotWriter; /** * Aborts, map for jude message is aborted, linked for remove abort txn in memory when this * position have been deleted. */ private final LinkedMap aborts = new LinkedMap<>(); + private final LazySnapshotWriter takeSnapshotWriter; private volatile long lastSnapshotTimestamps; @@ -58,14 +54,8 @@ public class SingleSnapshotAbortedTxnProcessorImpl implements AbortedTxnProcesso public SingleSnapshotAbortedTxnProcessorImpl(PersistentTopic topic) { this.topic = topic; - this.takeSnapshotWriter = this.topic.getBrokerService().getPulsar() - .getTransactionBufferSnapshotServiceFactory() - .getTxnBufferSnapshotService().getReferenceWriter(TopicName.get(topic.getName()).getNamespaceObject()); - this.takeSnapshotWriter.getFuture().exceptionally((ex) -> { - log.error("{} Failed to create snapshot writer", topic.getName()); - topic.close(); - return null; - }); + this.takeSnapshotWriter = new LazySnapshotWriter<>(topic, topic.getBrokerService().getPulsar() + .getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()); } @Override @@ -98,41 +88,35 @@ private long getSystemClientOperationTimeoutMs() throws Exception { @Override public CompletableFuture recoverFromSnapshot() { - return topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory() - .getTxnBufferSnapshotService() - .createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> { - try { + final var pulsar = topic.getBrokerService().getPulsar(); + final var positionFuture = new CompletableFuture(); + pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() -> { + try { + final var reader = wait(pulsar.getTransactionBufferSnapshotServiceFactory() + .getTxnBufferSnapshotService().createReader(TopicName.get(topic.getName())), "create reader"); + try { Position startReadCursorPosition = null; - while (reader.hasMoreEvents()) { - Message message = reader.readNextAsync() - .get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS); - if (topic.getName().equals(message.getKey())) { - TransactionBufferSnapshot transactionBufferSnapshot = message.getValue(); - if (transactionBufferSnapshot != null) { - handleSnapshot(transactionBufferSnapshot); - startReadCursorPosition = PositionFactory.create( - transactionBufferSnapshot.getMaxReadPositionLedgerId(), - transactionBufferSnapshot.getMaxReadPositionEntryId()); - } + while (reader.hasMoreEvents()) { + final var message = wait(reader.readNextAsync(), "read message"); + if (topic.getName().equals(message.getKey())) { + TransactionBufferSnapshot transactionBufferSnapshot = message.getValue(); + if (transactionBufferSnapshot != null) { + handleSnapshot(transactionBufferSnapshot); + startReadCursorPosition = PositionFactory.create( + transactionBufferSnapshot.getMaxReadPositionLedgerId(), + transactionBufferSnapshot.getMaxReadPositionEntryId()); } } - return CompletableFuture.completedFuture(startReadCursorPosition); - } catch (TimeoutException ex) { - Throwable t = FutureUtil.unwrapCompletionException(ex); - String errorMessage = String.format("[%s] Transaction buffer recover fail by read " - + "transactionBufferSnapshot timeout!", topic.getName()); - log.error(errorMessage, t); - return FutureUtil.failedFuture( - new BrokerServiceException.ServiceUnitNotReadyException(errorMessage, t)); - } catch (Exception ex) { - log.error("[{}] Transaction buffer recover fail when read " - + "transactionBufferSnapshot!", topic.getName(), ex); - return FutureUtil.failedFuture(ex); - } finally { - closeReader(reader); } - }, topic.getBrokerService().getPulsar().getTransactionExecutorProvider() - .getExecutor(this)); + positionFuture.complete(startReadCursorPosition); + } finally { + closeReader(reader); + } + } catch (Exception e) { + positionFuture.completeExceptionally(e); + } + }); + return positionFuture; } @Override @@ -208,4 +192,11 @@ private void handleSnapshot(TransactionBufferSnapshot snapshot) { } } + private T wait(CompletableFuture future, String msg) throws Exception { + try { + return future.get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw new ExecutionException("Failed to " + msg, e.getCause()); + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java index e94e7a047797a5..9d55fdacfb2d19 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java @@ -44,7 +44,6 @@ import org.apache.commons.lang3.tuple.MutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.service.BrokerServiceException; -import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor; @@ -538,8 +537,8 @@ public class PersistentWorker { private final PersistentTopic topic; //Persistent snapshot segment and index at the single thread. - private final ReferenceCountedWriter snapshotSegmentsWriter; - private final ReferenceCountedWriter snapshotIndexWriter; + private final LazySnapshotWriter snapshotSegmentsWriter; + private final LazySnapshotWriter snapshotIndexWriter; private volatile boolean closed = false; @@ -566,24 +565,12 @@ public enum OperationType { public PersistentWorker(PersistentTopic topic) { this.topic = topic; - this.snapshotSegmentsWriter = this.topic.getBrokerService().getPulsar() - .getTransactionBufferSnapshotServiceFactory() - .getTxnBufferSnapshotSegmentService() - .getReferenceWriter(TopicName.get(topic.getName()).getNamespaceObject()); - this.snapshotSegmentsWriter.getFuture().exceptionally(ex -> { - log.error("{} Failed to create snapshot index writer", topic.getName()); - topic.close(); - return null; - }); - this.snapshotIndexWriter = this.topic.getBrokerService().getPulsar() - .getTransactionBufferSnapshotServiceFactory() - .getTxnBufferSnapshotIndexService() - .getReferenceWriter(TopicName.get(topic.getName()).getNamespaceObject()); - this.snapshotIndexWriter.getFuture().exceptionally((ex) -> { - log.error("{} Failed to create snapshot writer", topic.getName()); - topic.close(); - return null; - }); + final var snapshotServiceFactory = topic.getBrokerService().getPulsar() + .getTransactionBufferSnapshotServiceFactory(); + this.snapshotSegmentsWriter = new LazySnapshotWriter<>(topic, + snapshotServiceFactory.getTxnBufferSnapshotSegmentService()); + this.snapshotIndexWriter = new LazySnapshotWriter<>(topic, + snapshotServiceFactory.getTxnBufferSnapshotIndexService()); } public CompletableFuture appendTask(OperationType operationType, @@ -882,4 +869,4 @@ private List convertTypeToTxnIDData(List abortedTxns) { return segment; } -} \ No newline at end of file +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index b4662e5fa83ed8..7561457d11f8e4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -632,7 +632,7 @@ public void run() { this, topic.getName()); return; } - abortedTxnProcessor.recoverFromSnapshot().thenAcceptAsync(startReadCursorPosition -> { + abortedTxnProcessor.recoverFromSnapshot().thenAccept(startReadCursorPosition -> { //Transaction is not use for this topic, so just make maxReadPosition as LAC. if (startReadCursorPosition == null) { callBack.noNeedToRecover(); @@ -678,8 +678,7 @@ public void run() { closeCursor(SUBSCRIPTION_NAME); callBack.recoverComplete(); - }, topic.getBrokerService().getPulsar().getTransactionExecutorProvider() - .getExecutor(this)).exceptionally(e -> { + }).exceptionally(e -> { callBack.recoverExceptionally(e.getCause()); log.error("[{}]Transaction buffer failed to recover snapshot!", topic.getName(), e); return null;