Skip to content

Commit

Permalink
Apply transaction TableView for all AbortedTxnProcessor implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
BewareMyPower committed Jul 24, 2024
1 parent 92eede7 commit d24b2be
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 293 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@
import org.apache.pulsar.broker.stats.prometheus.PulsarPrometheusMetricsServlet;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import org.apache.pulsar.broker.transaction.buffer.impl.SnapshotTableView;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
Expand Down Expand Up @@ -286,7 +285,6 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private PulsarMetadataEventSynchronizer localMetadataSynchronizer;
private CoordinationService coordinationService;
private TransactionBufferSnapshotServiceFactory transactionBufferSnapshotServiceFactory;
private SnapshotTableView snapshotTableView;
private MetadataStore configurationMetadataStore;
private PulsarMetadataEventSynchronizer configMetadataSynchronizer;
private boolean shouldShutdownConfigurationMetadataStore;
Expand Down Expand Up @@ -994,8 +992,7 @@ public void start() throws PulsarServerException {
MLTransactionMetadataStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());
MLPendingAckStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());

this.transactionBufferSnapshotServiceFactory = new TransactionBufferSnapshotServiceFactory(getClient());
this.snapshotTableView = new SnapshotTableView(this);
this.transactionBufferSnapshotServiceFactory = new TransactionBufferSnapshotServiceFactory(this);

this.transactionTimer =
new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.systopic.SystemTopicClientBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.broker.transaction.buffer.impl.TableView;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
Expand All @@ -42,6 +46,8 @@ public class SystemTopicTxnBufferSnapshotService<T> {
protected final EventType systemTopicType;

private final ConcurrentHashMap<NamespaceName, ReferenceCountedWriter<T>> refCountedWriterMap;
@Getter
private final TableView<T> tableView;

// The class ReferenceCountedWriter will maintain the reference count,
// when the reference count decrement to 0, it will be removed from writerFutureMap, the writer will be closed.
Expand Down Expand Up @@ -95,13 +101,16 @@ public synchronized void release() {

}

public SystemTopicTxnBufferSnapshotService(PulsarClient client, EventType systemTopicType,
Class<T> schemaType) {
public SystemTopicTxnBufferSnapshotService(PulsarService pulsar, EventType systemTopicType,
Class<T> schemaType) throws PulsarServerException {
final var client = (PulsarClientImpl) pulsar.getClient();
this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(client);
this.systemTopicType = systemTopicType;
this.schemaType = schemaType;
this.clients = new ConcurrentHashMap<>();
this.refCountedWriterMap = new ConcurrentHashMap<>();
this.tableView = new TableView<>(this::createReader,
client.getConfiguration().getOperationTimeoutMs(), pulsar.getExecutor());
}

public CompletableFuture<SystemTopicClient.Reader<T>> createReader(TopicName topicName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
*/
package org.apache.pulsar.broker.service;

import lombok.Getter;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.events.EventType;

@Getter
public class TransactionBufferSnapshotServiceFactory {

private SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshot> txnBufferSnapshotService;
Expand All @@ -33,29 +36,16 @@ public class TransactionBufferSnapshotServiceFactory {

private SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes> txnBufferSnapshotIndexService;

public TransactionBufferSnapshotServiceFactory(PulsarClient pulsarClient) {
this.txnBufferSnapshotSegmentService = new SystemTopicTxnBufferSnapshotService<>(pulsarClient,
public TransactionBufferSnapshotServiceFactory(PulsarService pulsar) throws PulsarServerException {
this.txnBufferSnapshotSegmentService = new SystemTopicTxnBufferSnapshotService<>(pulsar,
EventType.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS,
TransactionBufferSnapshotSegment.class);
this.txnBufferSnapshotIndexService = new SystemTopicTxnBufferSnapshotService<>(pulsarClient,
this.txnBufferSnapshotIndexService = new SystemTopicTxnBufferSnapshotService<>(pulsar,
EventType.TRANSACTION_BUFFER_SNAPSHOT_INDEXES, TransactionBufferSnapshotIndexes.class);
this.txnBufferSnapshotService = new SystemTopicTxnBufferSnapshotService<>(pulsarClient,
this.txnBufferSnapshotService = new SystemTopicTxnBufferSnapshotService<>(pulsar,
EventType.TRANSACTION_BUFFER_SNAPSHOT, TransactionBufferSnapshot.class);
}

public SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes> getTxnBufferSnapshotIndexService() {
return this.txnBufferSnapshotIndexService;
}

public SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotSegment>
getTxnBufferSnapshotSegmentService() {
return this.txnBufferSnapshotSegmentService;
}

public SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshot> getTxnBufferSnapshotService() {
return this.txnBufferSnapshotService;
}

public void close() throws Exception {
if (this.txnBufferSnapshotIndexService != null) {
this.txnBufferSnapshotIndexService.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ public CompletableFuture<Position> recoverFromSnapshot() {
final var pulsar = topic.getBrokerService().getPulsar();
pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() -> {
try {
final var snapshot = pulsar.getSnapshotTableView().readLatest(topic.getName());
final var snapshot = pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
.getTableView().readLatest(topic.getName());
if (snapshot != null) {
handleSnapshot(snapshot);
final var startReadCursorPosition = PositionFactory.create(snapshot.getMaxReadPositionLedgerId(),
Expand Down
Loading

0 comments on commit d24b2be

Please sign in to comment.