diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index bf105507cac0f..c9c8407a13d2b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -919,7 +919,7 @@ public void start() throws PulsarServerException {
MLTransactionMetadataStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());
MLPendingAckStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());
- this.transactionBufferSnapshotServiceFactory = new TransactionBufferSnapshotServiceFactory(getClient());
+ this.transactionBufferSnapshotServiceFactory = new TransactionBufferSnapshotServiceFactory(this);
this.transactionTimer =
new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
index bd1b90981695e..ba6cbee355775 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
@@ -22,12 +22,16 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.systopic.SystemTopicClientBase;
-import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.broker.transaction.buffer.impl.TableView;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
@@ -42,6 +46,8 @@ public class SystemTopicTxnBufferSnapshotService {
protected final EventType systemTopicType;
private final ConcurrentHashMap> refCountedWriterMap;
+ @Getter
+ private final TableView tableView;
// The class ReferenceCountedWriter will maintain the reference count,
// when the reference count decrement to 0, it will be removed from writerFutureMap, the writer will be closed.
@@ -95,13 +101,16 @@ public synchronized void release() {
}
- public SystemTopicTxnBufferSnapshotService(PulsarClient client, EventType systemTopicType,
- Class schemaType) {
+ public SystemTopicTxnBufferSnapshotService(PulsarService pulsar, EventType systemTopicType,
+ Class schemaType) throws PulsarServerException {
+ final var client = (PulsarClientImpl) pulsar.getClient();
this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(client);
this.systemTopicType = systemTopicType;
this.schemaType = schemaType;
this.clients = new ConcurrentHashMap<>();
this.refCountedWriterMap = new ConcurrentHashMap<>();
+ this.tableView = new TableView<>(this::createReader,
+ client.getConfiguration().getOperationTimeoutMs(), pulsar.getExecutor());
}
public CompletableFuture> createReader(TopicName topicName) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java
index 4b8548fae47c7..d54f65572f594 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java
@@ -18,12 +18,15 @@
*/
package org.apache.pulsar.broker.service;
+import lombok.Getter;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
-import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.events.EventType;
+@Getter
public class TransactionBufferSnapshotServiceFactory {
private SystemTopicTxnBufferSnapshotService txnBufferSnapshotService;
@@ -33,29 +36,16 @@ public class TransactionBufferSnapshotServiceFactory {
private SystemTopicTxnBufferSnapshotService txnBufferSnapshotIndexService;
- public TransactionBufferSnapshotServiceFactory(PulsarClient pulsarClient) {
- this.txnBufferSnapshotSegmentService = new SystemTopicTxnBufferSnapshotService<>(pulsarClient,
+ public TransactionBufferSnapshotServiceFactory(PulsarService pulsar) throws PulsarServerException {
+ this.txnBufferSnapshotSegmentService = new SystemTopicTxnBufferSnapshotService<>(pulsar,
EventType.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS,
TransactionBufferSnapshotSegment.class);
- this.txnBufferSnapshotIndexService = new SystemTopicTxnBufferSnapshotService<>(pulsarClient,
+ this.txnBufferSnapshotIndexService = new SystemTopicTxnBufferSnapshotService<>(pulsar,
EventType.TRANSACTION_BUFFER_SNAPSHOT_INDEXES, TransactionBufferSnapshotIndexes.class);
- this.txnBufferSnapshotService = new SystemTopicTxnBufferSnapshotService<>(pulsarClient,
+ this.txnBufferSnapshotService = new SystemTopicTxnBufferSnapshotService<>(pulsar,
EventType.TRANSACTION_BUFFER_SNAPSHOT, TransactionBufferSnapshot.class);
}
- public SystemTopicTxnBufferSnapshotService getTxnBufferSnapshotIndexService() {
- return this.txnBufferSnapshotIndexService;
- }
-
- public SystemTopicTxnBufferSnapshotService
- getTxnBufferSnapshotSegmentService() {
- return this.txnBufferSnapshotSegmentService;
- }
-
- public SystemTopicTxnBufferSnapshotService getTxnBufferSnapshotService() {
- return this.txnBufferSnapshotService;
- }
-
public void close() throws Exception {
if (this.txnBufferSnapshotIndexService != null) {
this.txnBufferSnapshotIndexService.close();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
index 967f1f16fefe4..2eb31c3a6d589 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
@@ -21,25 +21,18 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.map.LinkedMap;
-import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata;
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
-import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
-import org.apache.pulsar.common.util.FutureUtil;
@Slf4j
public class SingleSnapshotAbortedTxnProcessorImpl implements AbortedTxnProcessor {
@@ -90,48 +83,27 @@ public boolean checkAbortedTransaction(TxnID txnID) {
return aborts.containsKey(txnID);
}
- private long getSystemClientOperationTimeoutMs() throws Exception {
- PulsarClientImpl pulsarClient = (PulsarClientImpl) topic.getBrokerService().getPulsar().getClient();
- return pulsarClient.getConfiguration().getOperationTimeoutMs();
- }
-
@Override
public CompletableFuture recoverFromSnapshot() {
- return topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
- .getTxnBufferSnapshotService()
- .createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
- try {
- PositionImpl startReadCursorPosition = null;
- while (reader.hasMoreEvents()) {
- Message message = reader.readNextAsync()
- .get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
- if (topic.getName().equals(message.getKey())) {
- TransactionBufferSnapshot transactionBufferSnapshot = message.getValue();
- if (transactionBufferSnapshot != null) {
- handleSnapshot(transactionBufferSnapshot);
- startReadCursorPosition = PositionImpl.get(
- transactionBufferSnapshot.getMaxReadPositionLedgerId(),
- transactionBufferSnapshot.getMaxReadPositionEntryId());
- }
- }
- }
- return CompletableFuture.completedFuture(startReadCursorPosition);
- } catch (TimeoutException ex) {
- Throwable t = FutureUtil.unwrapCompletionException(ex);
- String errorMessage = String.format("[%s] Transaction buffer recover fail by read "
- + "transactionBufferSnapshot timeout!", topic.getName());
- log.error(errorMessage, t);
- return FutureUtil.failedFuture(
- new BrokerServiceException.ServiceUnitNotReadyException(errorMessage, t));
- } catch (Exception ex) {
- log.error("[{}] Transaction buffer recover fail when read "
- + "transactionBufferSnapshot!", topic.getName(), ex);
- return FutureUtil.failedFuture(ex);
- } finally {
- closeReader(reader);
- }
- }, topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
- .getExecutor(this));
+ final var future = new CompletableFuture();
+ final var pulsar = topic.getBrokerService().getPulsar();
+ pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() -> {
+ try {
+ final var snapshot = pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
+ .getTableView().readLatest(topic.getName());
+ if (snapshot != null) {
+ handleSnapshot(snapshot);
+ final var startReadCursorPosition = new PositionImpl(snapshot.getMaxReadPositionLedgerId(),
+ snapshot.getMaxReadPositionEntryId());
+ future.complete(startReadCursorPosition);
+ } else {
+ future.complete(null);
+ }
+ } catch (Throwable e) {
+ future.completeExceptionally(e);
+ }
+ });
+ return future;
}
@Override
@@ -190,13 +162,6 @@ public synchronized CompletableFuture closeAsync() {
return CompletableFuture.completedFuture(null);
}
- private void closeReader(SystemTopicClient.Reader reader) {
- reader.closeAsync().exceptionally(e -> {
- log.error("[{}]Transaction buffer reader close error!", topic.getName(), e);
- return null;
- });
- }
-
private void handleSnapshot(TransactionBufferSnapshot snapshot) {
if (snapshot.getAborts() != null) {
snapshot.getAborts().forEach(abortTxnMetadata ->
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
index 385500dfbe9e7..d39cd42f3600e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
@@ -24,11 +24,11 @@
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Supplier;
@@ -53,7 +53,6 @@
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexesMetadata;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TxnIDData;
-import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -227,220 +226,129 @@ public CompletableFuture takeAbortedTxnsSnapshot(PositionImpl maxReadPosit
@Override
public CompletableFuture recoverFromSnapshot() {
- return topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
- .getTxnBufferSnapshotIndexService()
- .createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
- PositionImpl startReadCursorPosition = null;
- TransactionBufferSnapshotIndexes persistentSnapshotIndexes = null;
- try {
- /*
- Read the transaction snapshot segment index.
-
- The processor can get the sequence ID, unsealed transaction IDs,
- segment index list and max read position in the snapshot segment index.
- Then we can traverse the index list to read all aborted transaction IDs
- in segments to aborts.
-
- */
- while (reader.hasMoreEvents()) {
- Message message = reader.readNextAsync()
- .get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
- if (topic.getName().equals(message.getKey())) {
- TransactionBufferSnapshotIndexes transactionBufferSnapshotIndexes = message.getValue();
- if (transactionBufferSnapshotIndexes != null) {
- persistentSnapshotIndexes = transactionBufferSnapshotIndexes;
- startReadCursorPosition = PositionImpl.get(
- transactionBufferSnapshotIndexes.getSnapshot().getMaxReadPositionLedgerId(),
- transactionBufferSnapshotIndexes.getSnapshot().getMaxReadPositionEntryId());
- }
- }
- }
- } catch (TimeoutException ex) {
- Throwable t = FutureUtil.unwrapCompletionException(ex);
- String errorMessage = String.format("[%s] Transaction buffer recover fail by read "
- + "transactionBufferSnapshot timeout!", topic.getName());
- log.error(errorMessage, t);
- return FutureUtil.failedFuture(
- new BrokerServiceException.ServiceUnitNotReadyException(errorMessage, t));
- } catch (Exception ex) {
- log.error("[{}] Transaction buffer recover fail when read "
- + "transactionBufferSnapshot!", topic.getName(), ex);
- return FutureUtil.failedFuture(ex);
- } finally {
- closeReader(reader);
- }
- PositionImpl finalStartReadCursorPosition = startReadCursorPosition;
- TransactionBufferSnapshotIndexes finalPersistentSnapshotIndexes = persistentSnapshotIndexes;
- if (persistentSnapshotIndexes == null) {
- return recoverOldSnapshot();
- } else {
- this.unsealedTxnIds = convertTypeToTxnID(persistentSnapshotIndexes
- .getSnapshot().getAborts());
- }
- //Read snapshot segment to recover aborts.
- ArrayList> completableFutures = new ArrayList<>();
- CompletableFuture openManagedLedgerAndHandleSegmentsFuture = new CompletableFuture<>();
- AtomicBoolean hasInvalidIndex = new AtomicBoolean(false);
- AsyncCallbacks.OpenReadOnlyManagedLedgerCallback callback = new AsyncCallbacks
- .OpenReadOnlyManagedLedgerCallback() {
- @Override
- public void openReadOnlyManagedLedgerComplete(ReadOnlyManagedLedgerImpl readOnlyManagedLedger,
- Object ctx) {
- finalPersistentSnapshotIndexes.getIndexList().forEach(index -> {
- CompletableFuture handleSegmentFuture = new CompletableFuture<>();
- completableFutures.add(handleSegmentFuture);
- readOnlyManagedLedger.asyncReadEntry(
- new PositionImpl(index.getSegmentLedgerID(),
- index.getSegmentEntryID()),
- new AsyncCallbacks.ReadEntryCallback() {
- @Override
- public void readEntryComplete(Entry entry, Object ctx) {
- handleSnapshotSegmentEntry(entry);
- indexes.put(new PositionImpl(
- index.abortedMarkLedgerID,
- index.abortedMarkEntryID),
- index);
- entry.release();
- handleSegmentFuture.complete(null);
- }
-
- @Override
- public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
- /*
- The logic flow of deleting expired segment is:
-
- 1. delete segment
- 2. update segment index
-
- If the worker delete segment successfully
- but failed to update segment index,
- the segment can not be read according to the index.
- We update index again if there are invalid indexes.
- */
- if (((ManagedLedgerImpl) topic.getManagedLedger())
- .ledgerExists(index.getAbortedMarkLedgerID())) {
- log.error("[{}] Failed to read snapshot segment [{}:{}]",
- topic.getName(), index.segmentLedgerID,
- index.segmentEntryID, exception);
- handleSegmentFuture.completeExceptionally(exception);
- } else {
- hasInvalidIndex.set(true);
- }
- }
-
- @Override
- public String toString() {
- return String.format("Transaction buffer [%s] recover from snapshot",
- SnapshotSegmentAbortedTxnProcessorImpl.this.topic.getName());
- }
- }, null);
- });
- openManagedLedgerAndHandleSegmentsFuture.complete(null);
- }
+ final var pulsar = topic.getBrokerService().getPulsar();
+ final var future = new CompletableFuture();
+ pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() -> {
+ try {
+ final var indexes = pulsar.getTransactionBufferSnapshotServiceFactory()
+ .getTxnBufferSnapshotIndexService().getTableView().readLatest(topic.getName());
+ if (indexes == null) {
+ // Try recovering from the old format snapshot
+ future.complete(recoverOldSnapshot());
+ return;
+ }
+ final var snapshot = indexes.getSnapshot();
+ final var startReadCursorPosition = new PositionImpl(snapshot.getMaxReadPositionLedgerId(),
+ snapshot.getMaxReadPositionEntryId());
+ this.unsealedTxnIds = convertTypeToTxnID(snapshot.getAborts());
+ // Read snapshot segment to recover aborts
+ final var snapshotSegmentTopicName = TopicName.get(TopicDomain.persistent.toString(),
+ TopicName.get(topic.getName()).getNamespaceObject(),
+ SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
+ readSegmentEntries(snapshotSegmentTopicName, indexes);
+ if (!this.indexes.isEmpty()) {
+ // If there is no segment index, the persistent worker will write segment begin from 0.
+ persistentWorker.sequenceID.set(this.indexes.get(this.indexes.lastKey()).sequenceID + 1);
+ }
+ unsealedTxnIds.forEach(txnID -> aborts.put(txnID, txnID));
+ future.complete(startReadCursorPosition);
+ } catch (Throwable throwable) {
+ future.completeExceptionally(throwable);
+ }
+ });
+ return future;
+ }
- @Override
- public void openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Object ctx) {
- log.error("[{}] Failed to open readOnly managed ledger", topic, exception);
- openManagedLedgerAndHandleSegmentsFuture.completeExceptionally(exception);
- }
- };
-
- TopicName snapshotSegmentTopicName = TopicName.get(TopicDomain.persistent.toString(),
- TopicName.get(topic.getName()).getNamespaceObject(),
- SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
- this.topic.getBrokerService().getPulsar().getManagedLedgerFactory()
- .asyncOpenReadOnlyManagedLedger(snapshotSegmentTopicName
- .getPersistenceNamingEncoding(), callback,
- topic.getManagedLedger().getConfig(),
- null);
- /*
- Wait the processor recover completely and then allow TB
- to recover the messages after the startReadCursorPosition.
- */
- return openManagedLedgerAndHandleSegmentsFuture
- .thenCompose((ignore) -> FutureUtil.waitForAll(completableFutures))
- .thenCompose((i) -> {
- /*
- Update the snapshot segment index if there exist invalid indexes.
- */
- if (hasInvalidIndex.get()) {
- persistentWorker.appendTask(PersistentWorker.OperationType.UpdateIndex,
- () -> persistentWorker.updateSnapshotIndex(
- finalPersistentSnapshotIndexes.getSnapshot()));
- }
- /*
- If there is no segment index, the persistent worker will write segment begin from 0.
- */
- if (indexes.size() != 0) {
- persistentWorker.sequenceID.set(indexes.get(indexes.lastKey()).sequenceID + 1);
- }
- /*
- Append the aborted txn IDs in the index metadata
- can keep the order of the aborted txn in the aborts.
- So that we can trim the expired snapshot segment in aborts
- according to the latest transaction IDs in the segmentIndex.
- */
- unsealedTxnIds.forEach(txnID -> aborts.put(txnID, txnID));
- return CompletableFuture.completedFuture(finalStartReadCursorPosition);
- }).exceptionally(ex -> {
- log.error("[{}] Failed to recover snapshot segment", this.topic.getName(), ex);
- return null;
- });
-
- }, topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
- .getExecutor(this));
+ private void readSegmentEntries(TopicName topicName, TransactionBufferSnapshotIndexes indexes) throws Exception {
+ final var managedLedger = openReadOnlyManagedLedger(topicName);
+ boolean hasInvalidIndex = false;
+ for (var index : indexes.getIndexList()) {
+ final var position = new PositionImpl(index.getSegmentLedgerID(), index.getSegmentEntryID());
+ final var abortedPosition = new PositionImpl(index.abortedMarkLedgerID, index.abortedMarkEntryID);
+ try {
+ final var entry = readEntry(managedLedger, position);
+ try {
+ handleSnapshotSegmentEntry(entry);
+ this.indexes.put(abortedPosition, index);
+ } finally {
+ entry.release();
+ }
+ } catch (Throwable throwable) {
+ if (((ManagedLedgerImpl) topic.getManagedLedger())
+ .ledgerExists(index.getAbortedMarkLedgerID())) {
+ log.error("[{}] Failed to read snapshot segment [{}:{}]",
+ topic.getName(), index.segmentLedgerID,
+ index.segmentEntryID, throwable);
+ throw throwable;
+ } else {
+ hasInvalidIndex = true;
+ }
+ }
+ }
+ if (hasInvalidIndex) {
+ // Update the snapshot segment index if there exist invalid indexes.
+ persistentWorker.appendTask(PersistentWorker.OperationType.UpdateIndex,
+ () -> persistentWorker.updateSnapshotIndex(indexes.getSnapshot()));
+ }
+ }
+
+ private ReadOnlyManagedLedgerImpl openReadOnlyManagedLedger(TopicName topicName) throws Exception {
+ final var future = new CompletableFuture();
+ final var callback = new AsyncCallbacks.OpenReadOnlyManagedLedgerCallback() {
+ @Override
+ public void openReadOnlyManagedLedgerComplete(ReadOnlyManagedLedgerImpl managedLedger, Object ctx) {
+ future.complete(managedLedger);
+ }
+
+ @Override
+ public void openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Object ctx) {
+ future.completeExceptionally(exception);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Transaction buffer [%s] recover from snapshot",
+ SnapshotSegmentAbortedTxnProcessorImpl.this.topic.getName());
+ }
+ };
+ topic.getBrokerService().getPulsar().getManagedLedgerFactory().asyncOpenReadOnlyManagedLedger(
+ topicName.getPersistenceNamingEncoding(), callback, topic.getManagedLedger().getConfig(), null);
+ return wait(future, "open read only ml for " + topicName);
+ }
+
+ private Entry readEntry(ReadOnlyManagedLedgerImpl managedLedger, PositionImpl position) throws Exception {
+ final var future = new CompletableFuture();
+ managedLedger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
+ @Override
+ public void readEntryComplete(Entry entry, Object ctx) {
+ future.complete(entry);
+ }
+
+ @Override
+ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+ future.completeExceptionally(exception);
+ }
+ }, null);
+ return wait(future, "read entry from " + position);
}
// This method will be deprecated and removed in version 4.x.0
- private CompletableFuture recoverOldSnapshot() {
- return topic.getBrokerService().getPulsar().getPulsarResources().getTopicResources()
- .listPersistentTopicsAsync(NamespaceName.get(TopicName.get(topic.getName()).getNamespace()))
- .thenCompose(topics -> {
- if (!topics.contains(TopicDomain.persistent + "://"
- + TopicName.get(topic.getName()).getNamespace() + "/"
- + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT)) {
- return CompletableFuture.completedFuture(null);
- } else {
- return topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
- .getTxnBufferSnapshotService()
- .createReader(TopicName.get(topic.getName())).thenComposeAsync(snapshotReader -> {
- PositionImpl startReadCursorPositionInOldSnapshot = null;
- try {
- while (snapshotReader.hasMoreEvents()) {
- Message message = snapshotReader.readNextAsync()
- .get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
- if (topic.getName().equals(message.getKey())) {
- TransactionBufferSnapshot transactionBufferSnapshot =
- message.getValue();
- if (transactionBufferSnapshot != null) {
- handleOldSnapshot(transactionBufferSnapshot);
- startReadCursorPositionInOldSnapshot = PositionImpl.get(
- transactionBufferSnapshot.getMaxReadPositionLedgerId(),
- transactionBufferSnapshot.getMaxReadPositionEntryId());
- }
- }
- }
- } catch (TimeoutException ex) {
- Throwable t = FutureUtil.unwrapCompletionException(ex);
- String errorMessage = String.format("[%s] Transaction buffer recover fail by "
- + "read transactionBufferSnapshot timeout!", topic.getName());
- log.error(errorMessage, t);
- return FutureUtil.failedFuture(new BrokerServiceException
- .ServiceUnitNotReadyException(errorMessage, t));
- } catch (Exception ex) {
- log.error("[{}] Transaction buffer recover fail when read "
- + "transactionBufferSnapshot!", topic.getName(), ex);
- return FutureUtil.failedFuture(ex);
- } finally {
- assert snapshotReader != null;
- closeReader(snapshotReader);
- }
- return CompletableFuture.completedFuture(startReadCursorPositionInOldSnapshot);
- },
- topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
- .getExecutor(this));
- }
- });
+ private PositionImpl recoverOldSnapshot() throws Exception {
+ final var pulsar = topic.getBrokerService().getPulsar();
+ final var topicName = TopicName.get(topic.getName());
+ final var topics = wait(pulsar.getPulsarResources().getTopicResources().listPersistentTopicsAsync(
+ NamespaceName.get(topicName.getNamespace())), "list persistent topics");
+ if (!topics.contains(TopicDomain.persistent + "://" + topicName.getNamespace() + "/"
+ + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT)) {
+ return null;
+ }
+ final var snapshot = pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
+ .getTableView().readLatest(topic.getName());
+ if (snapshot == null) {
+ return null;
+ }
+ handleOldSnapshot(snapshot);
+ return new PositionImpl(snapshot.getMaxReadPositionLedgerId(), snapshot.getMaxReadPositionEntryId());
}
// This method will be deprecated and removed in version 4.x.0
@@ -508,9 +416,17 @@ private long getSystemClientOperationTimeoutMs() throws Exception {
return pulsarClient.getConfiguration().getOperationTimeoutMs();
}
- private void closeReader(SystemTopicClient.Reader reader) {
+ private R wait(CompletableFuture future, String msg) throws Exception {
+ try {
+ return future.get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw new CompletionException("Failed to " + msg, e.getCause());
+ }
+ }
+
+ private void closeReader(SystemTopicClient.Reader reader) {
reader.closeAsync().exceptionally(e -> {
- log.error("[{}]Transaction buffer snapshot reader close error!", topic.getName(), e);
+ log.warn("[{}] Failed to close reader: {}", topic.getName(), e.getMessage());
return null;
});
}
@@ -837,25 +753,37 @@ private CompletableFuture clearSnapshotSegmentAndIndexes() {
*
*/
private CompletableFuture clearAllSnapshotSegments() {
- return topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
- .getTxnBufferSnapshotSegmentService()
- .createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
- try {
- while (reader.hasMoreEvents()) {
- Message message = reader.readNextAsync()
- .get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
- if (topic.getName().equals(message.getValue().getTopicName())) {
- snapshotSegmentsWriter.getFuture().get().write(message.getKey(), null);
- }
+ final var future = new CompletableFuture();
+ final var pulsar = topic.getBrokerService().getPulsar();
+ pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() -> {
+ try {
+ final var reader = wait(pulsar.getTransactionBufferSnapshotServiceFactory()
+ .getTxnBufferSnapshotSegmentService().createReader(TopicName.get(topic.getName()))
+ , "create reader");
+ try {
+ while (wait(reader.hasMoreEventsAsync(), "has more events")) {
+ final var message = wait(reader.readNextAsync(), "read next");
+ if (topic.getName().equals(message.getValue().getTopicName())) {
+ snapshotSegmentsWriter.getFuture().get().write(message.getKey(), null);
}
- return CompletableFuture.completedFuture(null);
- } catch (Exception ex) {
- log.error("[{}] Transaction buffer clear snapshot segments fail!", topic.getName(), ex);
- return FutureUtil.failedFuture(ex);
- } finally {
- closeReader(reader);
}
- });
+ future.complete(null);
+ } finally {
+ closeReader(reader);
+ }
+ } catch (Throwable throwable) {
+ future.completeExceptionally(throwable);
+ }
+ });
+ return future;
+ }
+
+ private R wait(CompletableFuture future, String msg) throws Exception {
+ try {
+ return future.get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw new CompletionException("Failed to " + msg, e.getCause());
+ }
}
synchronized CompletableFuture closeAsync() {
@@ -881,4 +809,4 @@ private List convertTypeToTxnIDData(List abortedTxns) {
return segment;
}
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
new file mode 100644
index 0000000000000..7608a393cc980
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import static org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.utils.SimpleCache;
+
+/**
+ * Compared with the more generic {@link org.apache.pulsar.client.api.TableView}, this table view
+ * - Provides just a single public method that reads the latest value synchronously.
+ * - Maintains multiple long-lived readers that will be expired after some time (1 minute by default).
+ */
+@Slf4j
+public class TableView {
+
+ // Remove the cached reader and snapshots if there is no refresh request in 1 minute
+ private static final long CACHE_EXPIRE_TIMEOUT_MS = 60 * 1000L;
+ private static final long CACHE_EXPIRE_CHECK_FREQUENCY_MS = 3000L;
+ @VisibleForTesting
+ protected final Function>> readerCreator;
+ private final Map snapshots = new ConcurrentHashMap<>();
+ private final long clientOperationTimeoutMs;
+ private final SimpleCache> readers;
+
+ public TableView(Function>> readerCreator, long clientOperationTimeoutMs,
+ ScheduledExecutorService executor) {
+ this.readerCreator = readerCreator;
+ this.clientOperationTimeoutMs = clientOperationTimeoutMs;
+ this.readers = new SimpleCache<>(executor, CACHE_EXPIRE_TIMEOUT_MS, CACHE_EXPIRE_CHECK_FREQUENCY_MS);
+ }
+
+ public T readLatest(String topic) throws Exception {
+ final var reader = getReader(topic);
+ while (wait(reader.hasMoreEventsAsync(), "has more events")) {
+ final var msg = wait(reader.readNextAsync(), "read message");
+ if (msg.getKey() != null) {
+ if (msg.getValue() != null) {
+ snapshots.put(msg.getKey(), msg.getValue());
+ } else {
+ snapshots.remove(msg.getKey());
+ }
+ }
+ }
+ return snapshots.get(topic);
+ }
+
+ @VisibleForTesting
+ protected Reader getReader(String topic) {
+ final var topicName = TopicName.get(topic);
+ return readers.get(topicName.getNamespaceObject(), () -> {
+ try {
+ return wait(readerCreator.apply(topicName), "create reader");
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }, __ -> __.closeAsync().exceptionally(e -> {
+ log.warn("Failed to close reader {}", e.getMessage());
+ return null;
+ }));
+ }
+
+ private R wait(CompletableFuture future, String msg) throws Exception {
+ try {
+ return future.get(clientOperationTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw new CompletionException("Failed to " + msg, e.getCause());
+ }
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index dfb73815e08d7..43666aae1abdb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -621,7 +621,7 @@ public void run() {
this, topic.getName());
return;
}
- abortedTxnProcessor.recoverFromSnapshot().thenAcceptAsync(startReadCursorPosition -> {
+ abortedTxnProcessor.recoverFromSnapshot().thenAccept(startReadCursorPosition -> {
//Transaction is not use for this topic, so just make maxReadPosition as LAC.
if (startReadCursorPosition == null) {
callBack.noNeedToRecover();
@@ -667,8 +667,7 @@ public void run() {
closeCursor(SUBSCRIPTION_NAME);
callBack.recoverComplete();
- }, topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
- .getExecutor(this)).exceptionally(e -> {
+ }).exceptionally(e -> {
callBack.recoverExceptionally(e.getCause());
log.error("[{}]Transaction buffer failed to recover snapshot!", topic.getName(), e);
return null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java
new file mode 100644
index 0000000000000..6a3a6721198e1
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import lombok.RequiredArgsConstructor;
+
+public class SimpleCache {
+
+ private final Map> cache = new HashMap<>();
+ private final long timeoutMs;
+
+ @RequiredArgsConstructor
+ private class ExpirableValue {
+
+ private final V value;
+ private final Consumer expireCallback;
+ private long deadlineMs;
+
+ boolean tryExpire() {
+ if (System.currentTimeMillis() >= deadlineMs) {
+ expireCallback.accept(value);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ void updateDeadline() {
+ deadlineMs = System.currentTimeMillis() + timeoutMs;
+ }
+ }
+
+ public SimpleCache(final ScheduledExecutorService scheduler, final long timeoutMs, final long frequencyMs) {
+ this.timeoutMs = timeoutMs;
+ scheduler.scheduleAtFixedRate(() -> {
+ synchronized (SimpleCache.this) {
+ final var keys = new HashSet();
+ cache.forEach((key, value) -> {
+ if (value.tryExpire()) {
+ keys.add(key);
+ }
+ });
+ cache.keySet().removeAll(keys);
+ }
+ }, frequencyMs, frequencyMs, TimeUnit.MILLISECONDS);
+ }
+
+ public synchronized V get(final K key, final Supplier valueSupplier, final Consumer expireCallback) {
+ final var value = cache.get(key);
+ if (value != null) {
+ value.updateDeadline();
+ return value.value;
+ }
+
+ final var newValue = new ExpirableValue<>(valueSupplier.get(), expireCallback);
+ newValue.updateDeadline();
+ cache.put(key, newValue);
+ return newValue.value;
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index bde9307552f59..8ea84f69476d4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -65,6 +65,7 @@
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.impl.SingleSnapshotAbortedTxnProcessorImpl;
+import org.apache.pulsar.broker.transaction.buffer.impl.TableView;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex;
@@ -89,7 +90,6 @@
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -581,6 +581,19 @@ private void checkSnapshotCount(TopicName topicName, boolean hasSnapshot,
reader.close();
}
+ static class MockTableView extends TableView {
+
+ public MockTableView(PulsarService pulsar) {
+ super(topic -> pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
+ .createReader(topic), 30000L, pulsar.getExecutor());
+ }
+
+ @Override
+ public SystemTopicClient.Reader getReader(String topic) {
+ return readerCreator.apply(TopicName.get(topic)).join();
+ }
+ }
+
@Test(timeOut=30000)
public void testTransactionBufferRecoverThrowException() throws Exception {
String topic = NAMESPACE1 + "/testTransactionBufferRecoverThrowPulsarClientException";
@@ -611,6 +624,7 @@ public void testTransactionBufferRecoverThrowException() throws Exception {
doReturn(CompletableFuture.completedFuture(reader))
.when(systemTopicTxnBufferSnapshotService).createReader(any());
doReturn(refCounterWriter).when(systemTopicTxnBufferSnapshotService).getReferenceWriter(any());
+ doReturn(new MockTableView(pulsarServiceList.get(0))).when(systemTopicTxnBufferSnapshotService).getTableView();
TransactionBufferSnapshotServiceFactory transactionBufferSnapshotServiceFactory =
mock(TransactionBufferSnapshotServiceFactory.class);
doReturn(systemTopicTxnBufferSnapshotService)
@@ -662,7 +676,8 @@ private void checkCloseTopic(PulsarClient pulsarClient,
PersistentTopic originalTopic,
Field field,
Producer producer) throws Exception {
- field.set(getPulsarServiceList().get(0), transactionBufferSnapshotServiceFactory);
+ final var pulsar = getPulsarServiceList().get(0);
+ field.set(pulsar, transactionBufferSnapshotServiceFactory);
// recover again will throw then close topic
new TopicTransactionBuffer(originalTopic);
@@ -673,7 +688,7 @@ private void checkCloseTopic(PulsarClient pulsarClient,
assertTrue((boolean) close.get(originalTopic));
});
- field.set(getPulsarServiceList().get(0), transactionBufferSnapshotServiceFactoryOriginal);
+ field.set(pulsar, transactionBufferSnapshotServiceFactoryOriginal);
Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
@@ -683,29 +698,11 @@ private void checkCloseTopic(PulsarClient pulsarClient,
txn.commit().get();
}
-
- @Test
- public void testTransactionBufferNoSnapshotCloseReader() throws Exception{
- String topic = NAMESPACE1 + "/test";
- @Cleanup
- Producer producer = pulsarClient.newProducer(Schema.STRING).producerName("testTxnTimeOut_producer")
- .topic(topic).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
-
- admin.topics().unload(topic);
-
- // unload success, all readers have been closed except for the compaction sub
- producer.send("test");
- TopicStats stats = admin.topics().getStats(NAMESPACE1 + "/" + TRANSACTION_BUFFER_SNAPSHOT);
-
- // except for the compaction sub
- assertEquals(stats.getSubscriptions().size(), 1);
- assertTrue(stats.getSubscriptions().keySet().contains("__compaction"));
- }
-
@Test
public void testTransactionBufferIndexSystemTopic() throws Exception {
+ final var pulsar = pulsarServiceList.get(0);
SystemTopicTxnBufferSnapshotService transactionBufferSnapshotIndexService =
- new TransactionBufferSnapshotServiceFactory(pulsarClient).getTxnBufferSnapshotIndexService();
+ new TransactionBufferSnapshotServiceFactory(pulsar).getTxnBufferSnapshotIndexService();
SystemTopicClient.Writer indexesWriter =
transactionBufferSnapshotIndexService.getReferenceWriter(
@@ -765,9 +762,10 @@ public void testTransactionBufferSegmentSystemTopic() throws Exception {
BrokerService brokerService = pulsarService.getBrokerService();
// create snapshot segment writer
+ final var pulsar = pulsarServiceList.get(0);
SystemTopicTxnBufferSnapshotService
transactionBufferSnapshotSegmentService =
- new TransactionBufferSnapshotServiceFactory(pulsarClient).getTxnBufferSnapshotSegmentService();
+ new TransactionBufferSnapshotServiceFactory(pulsar).getTxnBufferSnapshotSegmentService();
SystemTopicClient.Writer
segmentWriter = transactionBufferSnapshotSegmentService
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleCacheTest.java
new file mode 100644
index 0000000000000..c590eda171804
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleCacheTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+
+public class SimpleCacheTest {
+
+ private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+
+ @AfterClass
+ public void shutdown() {
+ executor.shutdown();
+ }
+
+ @Test
+ public void testConcurrentUpdate() throws Exception {
+ final var cache = new SimpleCache(executor, 10000L, 10000L);
+ final var pool = Executors.newFixedThreadPool(2);
+ final var latch = new CountDownLatch(2);
+ for (int i = 0; i < 2; i++) {
+ final var value = i + 100;
+ pool.execute(() -> {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ignored) {
+ }
+ cache.get(0, () -> value, __ -> {});
+ latch.countDown();
+ });
+ }
+ latch.await();
+ final var value = cache.get(0, () -> -1, __ -> {});
+ Assert.assertTrue(value == 100 || value == 101);
+ pool.shutdown();
+ }
+
+ @Test
+ public void testExpire() throws InterruptedException {
+ final var cache = new SimpleCache(executor, 500L, 5);
+ final var expiredValues = Collections.synchronizedSet(new HashSet());
+
+ final var allKeys = IntStream.range(0, 5).boxed().collect(Collectors.toSet());
+ allKeys.forEach(key -> cache.get(key, () -> key + 100, expiredValues::add));
+
+ Thread.sleep(400L);
+ final var recentAccessedKey = Set.of(1, 2);
+ recentAccessedKey.forEach(key -> cache.get(key, () -> -1, expiredValues::add)); // access these keys
+
+ Thread.sleep(300L);
+ recentAccessedKey.forEach(key -> Assert.assertEquals(key + 100, cache.get(key, () -> -1, __ -> {})));
+ allKeys.stream().filter(key -> !recentAccessedKey.contains(key))
+ .forEach(key -> Assert.assertEquals(-1, cache.get(key, () -> -1, __ -> {})));
+ }
+}