[improve][broker] Reduce the pressure from the transaction buffer readers and writers in rolling restarts

### Motivation

During rolling restarts, namespace bundle ownerships change. Assume a producer is created on a single topic whose
bundle ownership is transferred to a new broker, and that the bundle contains N topics in the namespace `tenant/ns`.
Then:
1. All N topics in that bundle will be loaded on the new broker.
2. For each topic, the managed ledger will be initialized; when the
   transaction coordinator is enabled, a `TopicTransactionBuffer` will
   be created as well.
   2.1 A Pulsar producer will be created on
     `tenant/ns/__transaction_buffer_snapshot` concurrently.
   2.2 A Pulsar reader will be created on
     `tenant/ns/__transaction_buffer_snapshot` concurrently.
3. Once all N readers are created, the owner of the snapshot topic will
   start dispatching messages to the N readers. Each dispatcher reads
   messages from BookKeeper concurrently and might fail with a
   too-many-requests error, because BookKeeper allows only
   `maxPendingReadRequestsPerThread` pending read requests per read worker thread (default: 10000).

The `numTransactionReplayThreadPoolSize` config limits the concurrency of
transaction snapshot readers, but it only limits the read loop. For
example, if it is set to 1, only one reader can read messages at a time;
however, N readers are still created concurrently. Even when only one of
these readers explicitly calls `readNext`, all N dispatchers on the
broker side will dispatch messages to the N readers.

The behavior above puts significant CPU pressure on the owner broker,
especially in a small cluster with only two brokers.

### Modifications

- Synchronize the reader creation, the read loop, and the subsequent processing
  of the result. Maintain only one reader per namespace (a conceptual sketch
  follows; the actual implementation is the `SnapshotTableView` class added in
  this commit).
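The following is a conceptual sketch only, not the committed code: it illustrates the per-namespace, serialized read pattern described above using made-up placeholder types (`SnapshotReader`, `readerFactory`). The committed implementation is `SnapshotTableView` in the diff below, which additionally caches the reader in a `SimpleCache` so that an idle reader is closed after one minute.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Sketch of the idea: one lazily created reader per namespace, with reader creation
// and the read loop serialized behind a single lock, so N topics recovering at the
// same time no longer create N readers on the __transaction_buffer_snapshot topic.
class PerNamespaceSnapshotView<S> {

    /** Minimal stand-in for a snapshot-topic reader; the real code uses a Pulsar system-topic reader. */
    interface SnapshotReader<T> {
        boolean hasMoreEvents();
        Map.Entry<String, T> readNext(); // key = topic name, value = latest snapshot for that topic
    }

    private final Supplier<SnapshotReader<S>> readerFactory;
    private final Map<String, S> latestByTopic = new HashMap<>();
    private SnapshotReader<S> reader; // shared by all topics of the namespace

    PerNamespaceSnapshotView(Supplier<SnapshotReader<S>> readerFactory) {
        this.readerFactory = readerFactory;
    }

    synchronized S readLatest(String topic) {
        if (reader == null) {
            reader = readerFactory.get(); // created once per namespace, not once per topic
        }
        while (reader.hasMoreEvents()) {  // drain any newly written snapshot entries
            var entry = reader.readNext();
            latestByTopic.put(entry.getKey(), entry.getValue());
        }
        return latestByTopic.get(topic);  // latest snapshot observed for the requesting topic
    }
}
```

Serializing on the namespace rather than on each topic bounds the number of snapshot-topic readers to one per namespace, regardless of how many topics the bundle contains.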
BewareMyPower committed Jul 23, 2024
1 parent c50f4af commit ffcc578
Showing 5 changed files with 246 additions and 54 deletions.
**PulsarService.java**
@@ -124,6 +124,7 @@
import org.apache.pulsar.broker.stats.prometheus.PulsarPrometheusMetricsServlet;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import org.apache.pulsar.broker.transaction.buffer.impl.SnapshotTableView;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
@@ -285,6 +286,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
    private PulsarMetadataEventSynchronizer localMetadataSynchronizer;
    private CoordinationService coordinationService;
    private TransactionBufferSnapshotServiceFactory transactionBufferSnapshotServiceFactory;
    private SnapshotTableView snapshotTableView;
    private MetadataStore configurationMetadataStore;
    private PulsarMetadataEventSynchronizer configMetadataSynchronizer;
    private boolean shouldShutdownConfigurationMetadataStore;
@@ -993,6 +995,10 @@ public void start() throws PulsarServerException {
        MLPendingAckStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());

        this.transactionBufferSnapshotServiceFactory = new TransactionBufferSnapshotServiceFactory(getClient());
        this.snapshotTableView = new SnapshotTableView(
                transactionBufferSnapshotServiceFactory.getTxnBufferSnapshotService(),
                executor, Long.parseLong(config.getProperties().getProperty(
                        "brokerClient_operationTimeoutMs", "30000")));

        this.transactionTimer =
                new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
**SingleSnapshotAbortedTxnProcessorImpl.java**
@@ -21,26 +21,19 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata;
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.util.FutureUtil;

@Slf4j
public class SingleSnapshotAbortedTxnProcessorImpl implements AbortedTxnProcessor {
@@ -91,48 +84,26 @@ public boolean checkAbortedTransaction(TxnID txnID) {
        return aborts.containsKey(txnID);
    }

    private long getSystemClientOperationTimeoutMs() throws Exception {
        PulsarClientImpl pulsarClient = (PulsarClientImpl) topic.getBrokerService().getPulsar().getClient();
        return pulsarClient.getConfiguration().getOperationTimeoutMs();
    }

    @Override
    public CompletableFuture<Position> recoverFromSnapshot() {
        return topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
                .getTxnBufferSnapshotService()
                .createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
                    try {
                        Position startReadCursorPosition = null;
                        while (reader.hasMoreEvents()) {
                            Message<TransactionBufferSnapshot> message = reader.readNextAsync()
                                    .get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
                            if (topic.getName().equals(message.getKey())) {
                                TransactionBufferSnapshot transactionBufferSnapshot = message.getValue();
                                if (transactionBufferSnapshot != null) {
                                    handleSnapshot(transactionBufferSnapshot);
                                    startReadCursorPosition = PositionFactory.create(
                                            transactionBufferSnapshot.getMaxReadPositionLedgerId(),
                                            transactionBufferSnapshot.getMaxReadPositionEntryId());
                                }
                            }
                        }
                        return CompletableFuture.completedFuture(startReadCursorPosition);
                    } catch (TimeoutException ex) {
                        Throwable t = FutureUtil.unwrapCompletionException(ex);
                        String errorMessage = String.format("[%s] Transaction buffer recover fail by read "
                                + "transactionBufferSnapshot timeout!", topic.getName());
                        log.error(errorMessage, t);
                        return FutureUtil.failedFuture(
                                new BrokerServiceException.ServiceUnitNotReadyException(errorMessage, t));
                    } catch (Exception ex) {
                        log.error("[{}] Transaction buffer recover fail when read "
                                + "transactionBufferSnapshot!", topic.getName(), ex);
                        return FutureUtil.failedFuture(ex);
                    } finally {
                        closeReader(reader);
                    }
                }, topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
                        .getExecutor(this));
        final var future = new CompletableFuture<Position>();
        final var pulsar = topic.getBrokerService().getPulsar();
        pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() -> {
            try {
                final var snapshot = pulsar.getSnapshotTableView().readLatest(topic.getName());
                if (snapshot != null) {
                    handleSnapshot(snapshot);
                    final var startReadCursorPosition = PositionFactory.create(snapshot.getMaxReadPositionLedgerId(),
                            snapshot.getMaxReadPositionEntryId());
                    future.complete(startReadCursorPosition);
                } else {
                    future.complete(null);
                }
            } catch (Throwable e) {
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    @Override
@@ -191,13 +162,6 @@ public synchronized CompletableFuture<Void> closeAsync() {
        return CompletableFuture.completedFuture(null);
    }

    private void closeReader(SystemTopicClient.Reader<TransactionBufferSnapshot> reader) {
        reader.closeAsync().exceptionally(e -> {
            log.error("[{}]Transaction buffer reader close error!", topic.getName(), e);
            return null;
        });
    }

    private void handleSnapshot(TransactionBufferSnapshot snapshot) {
        if (snapshot.getAborts() != null) {
            snapshot.getAborts().forEach(abortTxnMetadata ->
**SnapshotTableView.java** (new file)
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.transaction.buffer.impl;

import static org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.utils.SimpleCache;

/**
 * Compared with the more generic {@link org.apache.pulsar.client.api.TableView}, this table view
 * - Provides just a single public method that reads the latest value synchronously.
 * - Maintains multiple long-lived readers that will be expired after some time (1 minute by default).
 */
@Slf4j
public class SnapshotTableView {

    // Remove the cached reader and snapshots if there is no refresh request in 1 minute
    private static final long CACHE_EXPIRE_TIMEOUT_MS = 60 * 1000L;
    private final Map<String, TransactionBufferSnapshot> snapshots = new ConcurrentHashMap<>();
    private final SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshot> snapshotService;
    private final long blockTimeoutMs;
    private final SimpleCache<NamespaceName, Reader<TransactionBufferSnapshot>> readers;

    public SnapshotTableView(SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshot> snapshotService,
                             ScheduledExecutorService executor, long blockTimeoutMs) {
        this.snapshotService = snapshotService;
        this.blockTimeoutMs = blockTimeoutMs;
        this.readers = new SimpleCache<>(executor, CACHE_EXPIRE_TIMEOUT_MS);
    }

    public TransactionBufferSnapshot readLatest(String topic) throws Exception {
        final var topicName = TopicName.get(topic);
        final var namespace = topicName.getNamespaceObject();
        final var reader = readers.get(namespace, () -> {
            try {
                return wait(snapshotService.createReader(topicName), "create reader");
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, __ -> __.closeAsync().exceptionally(e -> {
            log.warn("Failed to close reader {}", e.getMessage());
            return null;
        }));
        while (wait(reader.hasMoreEventsAsync(), "has more events")) {
            final var msg = wait(reader.readNextAsync(), "read message");
            if (msg.getKey() != null) {
                snapshots.put(msg.getKey(), msg.getValue());
            }
        }
        return snapshots.get(topic);
    }

    private <T> T wait(CompletableFuture<T> future, String msg) throws Exception {
        try {
            return future.get(blockTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            throw new ExecutionException("Failed to " + msg, e.getCause());
        }
    }
}
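For context, here is a hypothetical caller sketch (not part of the commit) showing how a recovery path consumes `readLatest`, mirroring the `SingleSnapshotAbortedTxnProcessorImpl` change above; the `pulsar` and `topic` arguments are assumed to be supplied by the caller, and all referenced types appear in the diff.

```java
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;

class SnapshotRecoverySketch {
    // Returns the position to resume replay from, or null if no snapshot exists yet.
    static Position recoverStartPosition(PulsarService pulsar, String topic) throws Exception {
        TransactionBufferSnapshot snapshot = pulsar.getSnapshotTableView().readLatest(topic);
        if (snapshot == null) {
            return null; // no snapshot yet: replay the topic from the beginning, as before
        }
        return PositionFactory.create(
                snapshot.getMaxReadPositionLedgerId(), snapshot.getMaxReadPositionEntryId());
    }
}
```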
**SimpleCache.java** (new file)
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.utils;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public class SimpleCache<K, V> {

    private final Map<K, V> cache = new HashMap<>();
    private final Map<K, ScheduledFuture<?>> futures = new HashMap<>();
    private final ScheduledExecutorService executor;
    private final long timeoutMs;

    public synchronized V get(final K key, final Supplier<V> valueSupplier, final Consumer<V> expireCallback) {
        final V value;
        V existingValue = cache.get(key);
        if (existingValue != null) {
            value = existingValue;
        } else {
            value = valueSupplier.get();
            cache.put(key, value);
        }
        final var future = futures.remove(key);
        if (future != null) {
            future.cancel(true);
        }
        futures.put(key, executor.schedule(() -> {
            synchronized (SimpleCache.this) {
                futures.remove(key);
                final var removedValue = cache.remove(key);
                if (removedValue != null) {
                    expireCallback.accept(removedValue);
                }
            }
        }, timeoutMs, TimeUnit.MILLISECONDS));
        return value;
    }
}
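A small usage sketch of `SimpleCache` (not part of the commit), under stated assumptions: a single-threaded scheduler and a placeholder `AutoCloseable` resource. The supplier runs only on a cache miss, every `get` for a key resets its expiration timer, and the expire callback releases the value once the timeout elapses without further access, which is how `SnapshotTableView` above keeps at most one reader per namespace and closes idle ones.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.pulsar.utils.SimpleCache;

public class SimpleCacheUsageSketch {

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Entries expire 30 seconds after the last get() for their key.
        SimpleCache<String, AutoCloseable> cache = new SimpleCache<>(scheduler, 30_000L);

        AutoCloseable resource = cache.get(
                "tenant/ns",                   // key, e.g. a namespace name
                () -> openExpensiveResource(), // invoked only on a cache miss
                value -> closeQuietly(value)); // invoked when the entry expires
        // ... use the resource; a later get() with the same key returns it and resets the timer

        scheduler.shutdown();
    }

    private static AutoCloseable openExpensiveResource() {
        return () -> { }; // placeholder for e.g. creating a system-topic reader
    }

    private static void closeQuietly(AutoCloseable value) {
        try {
            value.close();
        } catch (Exception ignored) {
        }
    }
}
```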
**SimpleCacheTest.java** (new file)
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.utils;

import java.util.Collections;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

public class SimpleCacheTest {

    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

    @AfterClass
    public void shutdown() {
        executor.shutdown();
    }

    @Test
    public void testConcurrentUpdate() throws Exception {
        final var cache = new SimpleCache<Integer, Integer>(executor, 10000L);
        final var pool = Executors.newFixedThreadPool(2);
        final var latch = new CountDownLatch(2);
        for (int i = 0; i < 2; i++) {
            final var value = i + 100;
            pool.execute(() -> {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException ignored) {
                }
                cache.get(0, () -> value, __ -> {});
                latch.countDown();
            });
        }
        latch.await();
        final var value = cache.get(0, () -> -1, __ -> {});
        Assert.assertTrue(value == 100 || value == 101);
        pool.shutdown();
    }

    @Test
    public void testExpire() throws InterruptedException {
        final var cache = new SimpleCache<Integer, Integer>(executor, 500L);
        final var expiredValues = new CopyOnWriteArrayList<Integer>();
        cache.get(0, () -> 100, expiredValues::add);
        for (int i = 0; i < 100; i++) {
            cache.get(1, () -> 101, expiredValues::add);
            Thread.sleep(10);
        }
        Assert.assertEquals(cache.get(0, () -> -1, __ -> {}), -1); // the value is expired
        Assert.assertEquals(cache.get(1, () -> -1, __ -> {}), 101);
        Assert.assertEquals(expiredValues, Collections.singletonList(100));
    }
}
