diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 492db888fb260..a63334623a48b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -124,7 +124,6 @@
import org.apache.pulsar.broker.stats.prometheus.PulsarPrometheusMetricsServlet;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
-import org.apache.pulsar.broker.transaction.buffer.impl.SnapshotTableView;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
@@ -286,7 +285,6 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private PulsarMetadataEventSynchronizer localMetadataSynchronizer;
private CoordinationService coordinationService;
private TransactionBufferSnapshotServiceFactory transactionBufferSnapshotServiceFactory;
- private SnapshotTableView snapshotTableView;
private MetadataStore configurationMetadataStore;
private PulsarMetadataEventSynchronizer configMetadataSynchronizer;
private boolean shouldShutdownConfigurationMetadataStore;
@@ -994,8 +992,7 @@ public void start() throws PulsarServerException {
MLTransactionMetadataStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());
MLPendingAckStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());
- this.transactionBufferSnapshotServiceFactory = new TransactionBufferSnapshotServiceFactory(getClient());
- this.snapshotTableView = new SnapshotTableView(this);
+ this.transactionBufferSnapshotServiceFactory = new TransactionBufferSnapshotServiceFactory(this);
this.transactionTimer =
new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
index bd1b90981695e..ba6cbee355775 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
@@ -22,12 +22,16 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.systopic.SystemTopicClientBase;
-import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.broker.transaction.buffer.impl.TableView;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
@@ -42,6 +46,8 @@ public class SystemTopicTxnBufferSnapshotService {
protected final EventType systemTopicType;
private final ConcurrentHashMap> refCountedWriterMap;
+ @Getter
+ private final TableView tableView;
// The class ReferenceCountedWriter will maintain the reference count,
// when the reference count decrement to 0, it will be removed from writerFutureMap, the writer will be closed.
@@ -95,13 +101,16 @@ public synchronized void release() {
}
- public SystemTopicTxnBufferSnapshotService(PulsarClient client, EventType systemTopicType,
- Class schemaType) {
+ public SystemTopicTxnBufferSnapshotService(PulsarService pulsar, EventType systemTopicType,
+ Class schemaType) throws PulsarServerException {
+ final var client = (PulsarClientImpl) pulsar.getClient();
this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(client);
this.systemTopicType = systemTopicType;
this.schemaType = schemaType;
this.clients = new ConcurrentHashMap<>();
this.refCountedWriterMap = new ConcurrentHashMap<>();
+ this.tableView = new TableView<>(this::createReader,
+ client.getConfiguration().getOperationTimeoutMs(), pulsar.getExecutor());
}
public CompletableFuture> createReader(TopicName topicName) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java
index 4b8548fae47c7..d54f65572f594 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java
@@ -18,12 +18,15 @@
*/
package org.apache.pulsar.broker.service;
+import lombok.Getter;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
-import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.events.EventType;
+@Getter
public class TransactionBufferSnapshotServiceFactory {
private SystemTopicTxnBufferSnapshotService txnBufferSnapshotService;
@@ -33,29 +36,16 @@ public class TransactionBufferSnapshotServiceFactory {
private SystemTopicTxnBufferSnapshotService txnBufferSnapshotIndexService;
- public TransactionBufferSnapshotServiceFactory(PulsarClient pulsarClient) {
- this.txnBufferSnapshotSegmentService = new SystemTopicTxnBufferSnapshotService<>(pulsarClient,
+ public TransactionBufferSnapshotServiceFactory(PulsarService pulsar) throws PulsarServerException {
+ this.txnBufferSnapshotSegmentService = new SystemTopicTxnBufferSnapshotService<>(pulsar,
EventType.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS,
TransactionBufferSnapshotSegment.class);
- this.txnBufferSnapshotIndexService = new SystemTopicTxnBufferSnapshotService<>(pulsarClient,
+ this.txnBufferSnapshotIndexService = new SystemTopicTxnBufferSnapshotService<>(pulsar,
EventType.TRANSACTION_BUFFER_SNAPSHOT_INDEXES, TransactionBufferSnapshotIndexes.class);
- this.txnBufferSnapshotService = new SystemTopicTxnBufferSnapshotService<>(pulsarClient,
+ this.txnBufferSnapshotService = new SystemTopicTxnBufferSnapshotService<>(pulsar,
EventType.TRANSACTION_BUFFER_SNAPSHOT, TransactionBufferSnapshot.class);
}
- public SystemTopicTxnBufferSnapshotService getTxnBufferSnapshotIndexService() {
- return this.txnBufferSnapshotIndexService;
- }
-
- public SystemTopicTxnBufferSnapshotService
- getTxnBufferSnapshotSegmentService() {
- return this.txnBufferSnapshotSegmentService;
- }
-
- public SystemTopicTxnBufferSnapshotService getTxnBufferSnapshotService() {
- return this.txnBufferSnapshotService;
- }
-
public void close() throws Exception {
if (this.txnBufferSnapshotIndexService != null) {
this.txnBufferSnapshotIndexService.close();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
index 3c640dc41126d..1649349e3e6f6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
@@ -90,7 +90,8 @@ public CompletableFuture recoverFromSnapshot() {
final var pulsar = topic.getBrokerService().getPulsar();
pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() -> {
try {
- final var snapshot = pulsar.getSnapshotTableView().readLatest(topic.getName());
+ final var snapshot = pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
+ .getTableView().readLatest(topic.getName());
if (snapshot != null) {
handleSnapshot(snapshot);
final var startReadCursorPosition = PositionFactory.create(snapshot.getMaxReadPositionLedgerId(),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
index e94e7a047797a..4ca27f77a87f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
@@ -24,11 +24,11 @@
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Supplier;
@@ -54,7 +54,6 @@
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexesMetadata;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TxnIDData;
-import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -228,220 +227,129 @@ public CompletableFuture takeAbortedTxnsSnapshot(Position maxReadPosition)
@Override
public CompletableFuture recoverFromSnapshot() {
- return topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
- .getTxnBufferSnapshotIndexService()
- .createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
- Position startReadCursorPosition = null;
- TransactionBufferSnapshotIndexes persistentSnapshotIndexes = null;
- try {
- /*
- Read the transaction snapshot segment index.
-
- The processor can get the sequence ID, unsealed transaction IDs,
- segment index list and max read position in the snapshot segment index.
- Then we can traverse the index list to read all aborted transaction IDs
- in segments to aborts.
-
- */
- while (reader.hasMoreEvents()) {
- Message message = reader.readNextAsync()
- .get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
- if (topic.getName().equals(message.getKey())) {
- TransactionBufferSnapshotIndexes transactionBufferSnapshotIndexes = message.getValue();
- if (transactionBufferSnapshotIndexes != null) {
- persistentSnapshotIndexes = transactionBufferSnapshotIndexes;
- startReadCursorPosition = PositionFactory.create(
- transactionBufferSnapshotIndexes.getSnapshot().getMaxReadPositionLedgerId(),
- transactionBufferSnapshotIndexes.getSnapshot().getMaxReadPositionEntryId());
- }
- }
- }
- } catch (TimeoutException ex) {
- Throwable t = FutureUtil.unwrapCompletionException(ex);
- String errorMessage = String.format("[%s] Transaction buffer recover fail by read "
- + "transactionBufferSnapshot timeout!", topic.getName());
- log.error(errorMessage, t);
- return FutureUtil.failedFuture(
- new BrokerServiceException.ServiceUnitNotReadyException(errorMessage, t));
- } catch (Exception ex) {
- log.error("[{}] Transaction buffer recover fail when read "
- + "transactionBufferSnapshot!", topic.getName(), ex);
- return FutureUtil.failedFuture(ex);
- } finally {
- closeReader(reader);
- }
- Position finalStartReadCursorPosition = startReadCursorPosition;
- TransactionBufferSnapshotIndexes finalPersistentSnapshotIndexes = persistentSnapshotIndexes;
- if (persistentSnapshotIndexes == null) {
- return recoverOldSnapshot();
- } else {
- this.unsealedTxnIds = convertTypeToTxnID(persistentSnapshotIndexes
- .getSnapshot().getAborts());
- }
- //Read snapshot segment to recover aborts.
- ArrayList> completableFutures = new ArrayList<>();
- CompletableFuture openManagedLedgerAndHandleSegmentsFuture = new CompletableFuture<>();
- AtomicBoolean hasInvalidIndex = new AtomicBoolean(false);
- AsyncCallbacks.OpenReadOnlyManagedLedgerCallback callback = new AsyncCallbacks
- .OpenReadOnlyManagedLedgerCallback() {
- @Override
- public void openReadOnlyManagedLedgerComplete(ReadOnlyManagedLedgerImpl readOnlyManagedLedger,
- Object ctx) {
- finalPersistentSnapshotIndexes.getIndexList().forEach(index -> {
- CompletableFuture handleSegmentFuture = new CompletableFuture<>();
- completableFutures.add(handleSegmentFuture);
- readOnlyManagedLedger.asyncReadEntry(
- PositionFactory.create(index.getSegmentLedgerID(),
- index.getSegmentEntryID()),
- new AsyncCallbacks.ReadEntryCallback() {
- @Override
- public void readEntryComplete(Entry entry, Object ctx) {
- handleSnapshotSegmentEntry(entry);
- indexes.put(PositionFactory.create(
- index.abortedMarkLedgerID,
- index.abortedMarkEntryID),
- index);
- entry.release();
- handleSegmentFuture.complete(null);
- }
-
- @Override
- public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
- /*
- The logic flow of deleting expired segment is:
-
- 1. delete segment
- 2. update segment index
-
- If the worker delete segment successfully
- but failed to update segment index,
- the segment can not be read according to the index.
- We update index again if there are invalid indexes.
- */
- if (((ManagedLedgerImpl) topic.getManagedLedger())
- .ledgerExists(index.getAbortedMarkLedgerID())) {
- log.error("[{}] Failed to read snapshot segment [{}:{}]",
- topic.getName(), index.segmentLedgerID,
- index.segmentEntryID, exception);
- handleSegmentFuture.completeExceptionally(exception);
- } else {
- hasInvalidIndex.set(true);
- }
- }
-
- @Override
- public String toString() {
- return String.format("Transaction buffer [%s] recover from snapshot",
- SnapshotSegmentAbortedTxnProcessorImpl.this.topic.getName());
- }
- }, null);
- });
- openManagedLedgerAndHandleSegmentsFuture.complete(null);
- }
+ final var pulsar = topic.getBrokerService().getPulsar();
+ final var future = new CompletableFuture();
+ pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() -> {
+ try {
+ final var indexes = pulsar.getTransactionBufferSnapshotServiceFactory()
+ .getTxnBufferSnapshotIndexService().getTableView().readLatest(topic.getName());
+ if (indexes == null) {
+ // Try recovering from the old format snapshot
+ future.complete(recoverOldSnapshot());
+ return;
+ }
+ final var snapshot = indexes.getSnapshot();
+ final var startReadCursorPosition = PositionFactory.create(snapshot.getMaxReadPositionLedgerId(),
+ snapshot.getMaxReadPositionEntryId());
+ this.unsealedTxnIds = convertTypeToTxnID(snapshot.getAborts());
+ // Read snapshot segment to recover aborts
+ final var snapshotSegmentTopicName = TopicName.get(TopicDomain.persistent.toString(),
+ TopicName.get(topic.getName()).getNamespaceObject(),
+ SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
+ readSegmentEntries(snapshotSegmentTopicName, indexes);
+ if (!this.indexes.isEmpty()) {
+ // If there is no segment index, the persistent worker will write segment begin from 0.
+ persistentWorker.sequenceID.set(this.indexes.get(this.indexes.lastKey()).sequenceID + 1);
+ }
+ unsealedTxnIds.forEach(txnID -> aborts.put(txnID, txnID));
+ future.complete(startReadCursorPosition);
+ } catch (Throwable throwable) {
+ future.completeExceptionally(throwable);
+ }
+ });
+ return future;
+ }
- @Override
- public void openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Object ctx) {
- log.error("[{}] Failed to open readOnly managed ledger", topic, exception);
- openManagedLedgerAndHandleSegmentsFuture.completeExceptionally(exception);
- }
- };
-
- TopicName snapshotSegmentTopicName = TopicName.get(TopicDomain.persistent.toString(),
- TopicName.get(topic.getName()).getNamespaceObject(),
- SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
- this.topic.getBrokerService().getPulsar().getManagedLedgerFactory()
- .asyncOpenReadOnlyManagedLedger(snapshotSegmentTopicName
- .getPersistenceNamingEncoding(), callback,
- topic.getManagedLedger().getConfig(),
- null);
- /*
- Wait the processor recover completely and then allow TB
- to recover the messages after the startReadCursorPosition.
- */
- return openManagedLedgerAndHandleSegmentsFuture
- .thenCompose((ignore) -> FutureUtil.waitForAll(completableFutures))
- .thenCompose((i) -> {
- /*
- Update the snapshot segment index if there exist invalid indexes.
- */
- if (hasInvalidIndex.get()) {
- persistentWorker.appendTask(PersistentWorker.OperationType.UpdateIndex,
- () -> persistentWorker.updateSnapshotIndex(
- finalPersistentSnapshotIndexes.getSnapshot()));
- }
- /*
- If there is no segment index, the persistent worker will write segment begin from 0.
- */
- if (indexes.size() != 0) {
- persistentWorker.sequenceID.set(indexes.get(indexes.lastKey()).sequenceID + 1);
- }
- /*
- Append the aborted txn IDs in the index metadata
- can keep the order of the aborted txn in the aborts.
- So that we can trim the expired snapshot segment in aborts
- according to the latest transaction IDs in the segmentIndex.
- */
- unsealedTxnIds.forEach(txnID -> aborts.put(txnID, txnID));
- return CompletableFuture.completedFuture(finalStartReadCursorPosition);
- }).exceptionally(ex -> {
- log.error("[{}] Failed to recover snapshot segment", this.topic.getName(), ex);
- return null;
- });
-
- }, topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
- .getExecutor(this));
+ private void readSegmentEntries(TopicName topicName, TransactionBufferSnapshotIndexes indexes) throws Exception {
+ final var managedLedger = openReadOnlyManagedLedger(topicName);
+ boolean hasInvalidIndex = false;
+ for (var index : indexes.getIndexList()) {
+ final var position = PositionFactory.create(index.getSegmentLedgerID(), index.getSegmentEntryID());
+ final var abortedPosition = PositionFactory.create(index.abortedMarkLedgerID, index.abortedMarkEntryID);
+ try {
+ final var entry = readEntry(managedLedger, position);
+ try {
+ handleSnapshotSegmentEntry(entry);
+ this.indexes.put(abortedPosition, index);
+ } finally {
+ entry.release();
+ }
+ } catch (Throwable throwable) {
+ if (((ManagedLedgerImpl) topic.getManagedLedger())
+ .ledgerExists(index.getAbortedMarkLedgerID())) {
+ log.error("[{}] Failed to read snapshot segment [{}:{}]",
+ topic.getName(), index.segmentLedgerID,
+ index.segmentEntryID, throwable);
+ throw throwable;
+ } else {
+ hasInvalidIndex = true;
+ }
+ }
+ }
+ if (hasInvalidIndex) {
+ // Update the snapshot segment index if there exist invalid indexes.
+ persistentWorker.appendTask(PersistentWorker.OperationType.UpdateIndex,
+ () -> persistentWorker.updateSnapshotIndex(indexes.getSnapshot()));
+ }
+ }
+
+ private ReadOnlyManagedLedgerImpl openReadOnlyManagedLedger(TopicName topicName) throws Exception {
+ final var future = new CompletableFuture();
+ final var callback = new AsyncCallbacks.OpenReadOnlyManagedLedgerCallback() {
+ @Override
+ public void openReadOnlyManagedLedgerComplete(ReadOnlyManagedLedgerImpl managedLedger, Object ctx) {
+ future.complete(managedLedger);
+ }
+
+ @Override
+ public void openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Object ctx) {
+ future.completeExceptionally(exception);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Transaction buffer [%s] recover from snapshot",
+ SnapshotSegmentAbortedTxnProcessorImpl.this.topic.getName());
+ }
+ };
+ topic.getBrokerService().getPulsar().getManagedLedgerFactory().asyncOpenReadOnlyManagedLedger(
+ topicName.getPersistenceNamingEncoding(), callback, topic.getManagedLedger().getConfig(), null);
+ return wait(future, "open read only ml for " + topicName);
+ }
+
+ private Entry readEntry(ReadOnlyManagedLedgerImpl managedLedger, Position position) throws Exception {
+ final var future = new CompletableFuture();
+ managedLedger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
+ @Override
+ public void readEntryComplete(Entry entry, Object ctx) {
+ future.complete(entry);
+ }
+
+ @Override
+ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+ future.completeExceptionally(exception);
+ }
+ }, null);
+ return wait(future, "read entry from " + position);
}
// This method will be deprecated and removed in version 4.x.0
- private CompletableFuture recoverOldSnapshot() {
- return topic.getBrokerService().getPulsar().getPulsarResources().getTopicResources()
- .listPersistentTopicsAsync(NamespaceName.get(TopicName.get(topic.getName()).getNamespace()))
- .thenCompose(topics -> {
- if (!topics.contains(TopicDomain.persistent + "://"
- + TopicName.get(topic.getName()).getNamespace() + "/"
- + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT)) {
- return CompletableFuture.completedFuture(null);
- } else {
- return topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
- .getTxnBufferSnapshotService()
- .createReader(TopicName.get(topic.getName())).thenComposeAsync(snapshotReader -> {
- Position startReadCursorPositionInOldSnapshot = null;
- try {
- while (snapshotReader.hasMoreEvents()) {
- Message message = snapshotReader.readNextAsync()
- .get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
- if (topic.getName().equals(message.getKey())) {
- TransactionBufferSnapshot transactionBufferSnapshot =
- message.getValue();
- if (transactionBufferSnapshot != null) {
- handleOldSnapshot(transactionBufferSnapshot);
- startReadCursorPositionInOldSnapshot = PositionFactory.create(
- transactionBufferSnapshot.getMaxReadPositionLedgerId(),
- transactionBufferSnapshot.getMaxReadPositionEntryId());
- }
- }
- }
- } catch (TimeoutException ex) {
- Throwable t = FutureUtil.unwrapCompletionException(ex);
- String errorMessage = String.format("[%s] Transaction buffer recover fail by "
- + "read transactionBufferSnapshot timeout!", topic.getName());
- log.error(errorMessage, t);
- return FutureUtil.failedFuture(new BrokerServiceException
- .ServiceUnitNotReadyException(errorMessage, t));
- } catch (Exception ex) {
- log.error("[{}] Transaction buffer recover fail when read "
- + "transactionBufferSnapshot!", topic.getName(), ex);
- return FutureUtil.failedFuture(ex);
- } finally {
- assert snapshotReader != null;
- closeReader(snapshotReader);
- }
- return CompletableFuture.completedFuture(startReadCursorPositionInOldSnapshot);
- },
- topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
- .getExecutor(this));
- }
- });
+ private Position recoverOldSnapshot() throws Exception {
+ final var pulsar = topic.getBrokerService().getPulsar();
+ final var topicName = TopicName.get(topic.getName());
+ final var topics = wait(pulsar.getPulsarResources().getTopicResources().listPersistentTopicsAsync(
+ NamespaceName.get(topicName.getNamespace())), "list persistent topics");
+ if (!topics.contains(TopicDomain.persistent + "://" + topicName.getNamespace() + "/"
+ + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT)) {
+ return null;
+ }
+ final var snapshot = pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
+ .getTableView().readLatest(topic.getName());
+ if (snapshot == null) {
+ return null;
+ }
+ handleOldSnapshot(snapshot);
+ return PositionFactory.create(snapshot.getMaxReadPositionLedgerId(), snapshot.getMaxReadPositionEntryId());
}
// This method will be deprecated and removed in version 4.x.0
@@ -509,9 +417,17 @@ private long getSystemClientOperationTimeoutMs() throws Exception {
return pulsarClient.getConfiguration().getOperationTimeoutMs();
}
- private void closeReader(SystemTopicClient.Reader reader) {
+ private R wait(CompletableFuture future, String msg) throws Exception {
+ try {
+ return future.get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw new CompletionException("Failed to " + msg, e.getCause());
+ }
+ }
+
+ private void closeReader(SystemTopicClient.Reader reader) {
reader.closeAsync().exceptionally(e -> {
- log.error("[{}]Transaction buffer snapshot reader close error!", topic.getName(), e);
+ log.warn("[{}] Failed to close reader: {}", topic.getName(), e.getMessage());
return null;
});
}
@@ -838,25 +754,37 @@ private CompletableFuture clearSnapshotSegmentAndIndexes() {
*
*/
private CompletableFuture clearAllSnapshotSegments() {
- return topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
- .getTxnBufferSnapshotSegmentService()
- .createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
- try {
- while (reader.hasMoreEvents()) {
- Message message = reader.readNextAsync()
- .get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
- if (topic.getName().equals(message.getValue().getTopicName())) {
- snapshotSegmentsWriter.getFuture().get().write(message.getKey(), null);
- }
+ final var future = new CompletableFuture();
+ final var pulsar = topic.getBrokerService().getPulsar();
+ pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() -> {
+ try {
+ final var reader = wait(pulsar.getTransactionBufferSnapshotServiceFactory()
+ .getTxnBufferSnapshotSegmentService().createReader(TopicName.get(topic.getName()))
+ , "create reader");
+ try {
+ while (wait(reader.hasMoreEventsAsync(), "has more events")) {
+ final var message = wait(reader.readNextAsync(), "read next");
+ if (topic.getName().equals(message.getValue().getTopicName())) {
+ snapshotSegmentsWriter.getFuture().get().write(message.getKey(), null);
}
- return CompletableFuture.completedFuture(null);
- } catch (Exception ex) {
- log.error("[{}] Transaction buffer clear snapshot segments fail!", topic.getName(), ex);
- return FutureUtil.failedFuture(ex);
- } finally {
- closeReader(reader);
}
- });
+ future.complete(null);
+ } finally {
+ closeReader(reader);
+ }
+ } catch (Throwable throwable) {
+ future.completeExceptionally(throwable);
+ }
+ });
+ return future;
+ }
+
+ private R wait(CompletableFuture future, String msg) throws Exception {
+ try {
+ return future.get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw new CompletionException("Failed to " + msg, e.getCause());
+ }
}
synchronized CompletableFuture closeAsync() {
@@ -882,4 +810,4 @@ private List convertTypeToTxnIDData(List abortedTxns) {
return segment;
}
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotTableView.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
similarity index 68%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotTableView.java
rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
index 7099f22c78cf7..79836d8c15fc4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotTableView.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
@@ -25,11 +25,10 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
-import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.utils.SimpleCache;
@@ -40,23 +39,24 @@
* - Maintains multiple long-lived readers that will be expired after some time (1 minute by default).
*/
@Slf4j
-public class SnapshotTableView {
+public class TableView {
// Remove the cached reader and snapshots if there is no refresh request in 1 minute
private static final long CACHE_EXPIRE_TIMEOUT_MS = 60 * 1000L;
- private final Map snapshots = new ConcurrentHashMap<>();
- private final SystemTopicTxnBufferSnapshotService snapshotService;
- private final long blockTimeoutMs;
- private final SimpleCache> readers;
+ @VisibleForTesting
+ protected final Function>> readerCreator;
+ private final Map snapshots = new ConcurrentHashMap<>();
+ private final long clientOperationTimeoutMs;
+ private final SimpleCache> readers;
- public SnapshotTableView(PulsarService pulsar) {
- this.snapshotService = pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService();
- this.blockTimeoutMs = Long.parseLong(pulsar.getConfig().getProperties()
- .getProperty("brokerClient_operationTimeoutMs", "30000"));
- this.readers = new SimpleCache<>(pulsar.getExecutor(), CACHE_EXPIRE_TIMEOUT_MS);
+ public TableView(Function>> readerCreator, long clientOperationTimeoutMs,
+ ScheduledExecutorService executor) {
+ this.readerCreator = readerCreator;
+ this.clientOperationTimeoutMs = clientOperationTimeoutMs;
+ this.readers = new SimpleCache<>(executor, CACHE_EXPIRE_TIMEOUT_MS);
}
- public TransactionBufferSnapshot readLatest(String topic) throws Exception {
+ public T readLatest(String topic) throws Exception {
final var reader = getReader(topic);
while (wait(reader.hasMoreEventsAsync(), "has more events")) {
final var msg = wait(reader.readNextAsync(), "read message");
@@ -72,11 +72,11 @@ public TransactionBufferSnapshot readLatest(String topic) throws Exception {
}
@VisibleForTesting
- protected Reader getReader(String topic) {
+ protected Reader getReader(String topic) {
final var topicName = TopicName.get(topic);
return readers.get(topicName.getNamespaceObject(), () -> {
try {
- return wait(snapshotService.createReader(topicName), "create reader");
+ return wait(readerCreator.apply(topicName), "create reader");
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -86,9 +86,9 @@ protected Reader getReader(String topic) {
}));
}
- private T wait(CompletableFuture future, String msg) throws Exception {
+ private R wait(CompletableFuture future, String msg) throws Exception {
try {
- return future.get(blockTimeoutMs, TimeUnit.MILLISECONDS);
+ return future.get(clientOperationTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw new CompletionException("Failed to " + msg, e.getCause());
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index 057bd4eb76d0c..8ab9d58f57076 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -66,7 +66,7 @@
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.impl.SingleSnapshotAbortedTxnProcessorImpl;
-import org.apache.pulsar.broker.transaction.buffer.impl.SnapshotTableView;
+import org.apache.pulsar.broker.transaction.buffer.impl.TableView;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex;
@@ -582,19 +582,16 @@ private void checkSnapshotCount(TopicName topicName, boolean hasSnapshot,
reader.close();
}
- static class MockSnapshotTableView extends SnapshotTableView {
+ static class MockTableView extends TableView {
- private final PulsarService pulsar;
-
- public MockSnapshotTableView(PulsarService pulsar) {
- super(pulsar);
- this.pulsar = pulsar;
+ public MockTableView(PulsarService pulsar) {
+ super(topic -> pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
+ .createReader(topic), 30000L, pulsar.getExecutor());
}
@Override
public SystemTopicClient.Reader getReader(String topic) {
- return pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
- .createReader(TopicName.get(topic)).join();
+ return readerCreator.apply(TopicName.get(topic)).join();
}
}
@@ -628,6 +625,7 @@ public void testTransactionBufferRecoverThrowException() throws Exception {
doReturn(CompletableFuture.completedFuture(reader))
.when(systemTopicTxnBufferSnapshotService).createReader(any());
doReturn(refCounterWriter).when(systemTopicTxnBufferSnapshotService).getReferenceWriter(any());
+ doReturn(new MockTableView(pulsarServiceList.get(0))).when(systemTopicTxnBufferSnapshotService).getTableView();
TransactionBufferSnapshotServiceFactory transactionBufferSnapshotServiceFactory =
mock(TransactionBufferSnapshotServiceFactory.class);
doReturn(systemTopicTxnBufferSnapshotService)
@@ -680,10 +678,6 @@ private void checkCloseTopic(PulsarClient pulsarClient,
Field field,
Producer producer) throws Exception {
final var pulsar = getPulsarServiceList().get(0);
- final var snapshotTableViewField = PulsarService.class.getDeclaredField("snapshotTableView");
- final var originalSnapshotTableView = pulsar.getSnapshotTableView();
- snapshotTableViewField.setAccessible(true);
- snapshotTableViewField.set(pulsar, new MockSnapshotTableView(pulsar));
field.set(pulsar, transactionBufferSnapshotServiceFactory);
// recover again will throw then close topic
@@ -696,7 +690,6 @@ private void checkCloseTopic(PulsarClient pulsarClient,
});
field.set(pulsar, transactionBufferSnapshotServiceFactoryOriginal);
- snapshotTableViewField.set(pulsar, originalSnapshotTableView);
Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
@@ -708,8 +701,9 @@ private void checkCloseTopic(PulsarClient pulsarClient,
@Test
public void testTransactionBufferIndexSystemTopic() throws Exception {
+ final var pulsar = pulsarServiceList.get(0);
SystemTopicTxnBufferSnapshotService transactionBufferSnapshotIndexService =
- new TransactionBufferSnapshotServiceFactory(pulsarClient).getTxnBufferSnapshotIndexService();
+ new TransactionBufferSnapshotServiceFactory(pulsar).getTxnBufferSnapshotIndexService();
SystemTopicClient.Writer indexesWriter =
transactionBufferSnapshotIndexService.getReferenceWriter(
@@ -769,9 +763,10 @@ public void testTransactionBufferSegmentSystemTopic() throws Exception {
BrokerService brokerService = pulsarService.getBrokerService();
// create snapshot segment writer
+ final var pulsar = pulsarServiceList.get(0);
SystemTopicTxnBufferSnapshotService
transactionBufferSnapshotSegmentService =
- new TransactionBufferSnapshotServiceFactory(pulsarClient).getTxnBufferSnapshotSegmentService();
+ new TransactionBufferSnapshotServiceFactory(pulsar).getTxnBufferSnapshotSegmentService();
SystemTopicClient.Writer
segmentWriter = transactionBufferSnapshotSegmentService