Skip to content

Commit

Permalink
[fix][txn] Fix getting last message ID when there are ongoing transac…
Browse files Browse the repository at this point in the history
…tions (apache#21466)
  • Loading branch information
liangyepianzhou authored Dec 13, 2023
1 parent 757723e commit 50007c3
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2119,23 +2119,28 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId)
long requestId = getLastMessageId.getRequestId();

Topic topic = consumer.getSubscription().getTopic();
Position lastPosition = topic.getLastPosition();
int partitionIndex = TopicName.getPartitionIndex(topic.getName());

Position markDeletePosition = null;
if (consumer.getSubscription() instanceof PersistentSubscription) {
markDeletePosition = ((PersistentSubscription) consumer.getSubscription()).getCursor()
.getMarkDeletedPosition();
}

getLargestBatchIndexWhenPossible(
topic,
(PositionImpl) lastPosition,
(PositionImpl) markDeletePosition,
partitionIndex,
requestId,
consumer.getSubscription().getName());
topic.checkIfTransactionBufferRecoverCompletely(true).thenRun(() -> {
Position lastPosition = ((PersistentTopic) topic).getMaxReadPosition();
int partitionIndex = TopicName.getPartitionIndex(topic.getName());

Position markDeletePosition = null;
if (consumer.getSubscription() instanceof PersistentSubscription) {
markDeletePosition = ((PersistentSubscription) consumer.getSubscription()).getCursor()
.getMarkDeletedPosition();
}

getLargestBatchIndexWhenPossible(
topic,
(PositionImpl) lastPosition,
(PositionImpl) markDeletePosition,
partitionIndex,
requestId,
consumer.getSubscription().getName());
}).exceptionally(e -> {
writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
ServerError.UnknownError, "Failed to recover Transaction Buffer."));
return null;
});
} else {
writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
ServerError.MetadataError, "Consumer not found"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
this.transactionBuffer = brokerService.getPulsar()
.getTransactionBufferProvider().newTransactionBuffer(this);
} else {
this.transactionBuffer = new TransactionBufferDisable();
this.transactionBuffer = new TransactionBufferDisable(this);
}
transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry());
if (ledger instanceof ShadowManagedLedgerImpl) {
Expand Down Expand Up @@ -420,7 +420,7 @@ public CompletableFuture<Void> initialize() {
this.transactionBuffer = brokerService.getPulsar()
.getTransactionBufferProvider().newTransactionBuffer(this);
} else {
this.transactionBuffer = new TransactionBufferDisable();
this.transactionBuffer = new TransactionBufferDisable(this);
}
shadowSourceTopic = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,12 @@ public TransactionBufferReader newReader(long sequenceId) throws

final ConcurrentMap<TxnID, TxnBuffer> buffers;
final Map<Long, Set<TxnID>> txnIndex;
private final Topic topic;

public InMemTransactionBuffer(Topic topic) {
this.buffers = new ConcurrentHashMap<>();
this.txnIndex = new HashMap<>();
this.topic = topic;
}

@Override
Expand Down Expand Up @@ -372,7 +375,7 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position) {

@Override
public PositionImpl getMaxReadPosition() {
return PositionImpl.LATEST;
return (PositionImpl) topic.getLastPosition();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.pulsar.broker.service.BrokerServiceException;
Expand Down Expand Up @@ -446,8 +447,7 @@ void updateMaxReadPosition(TxnID txnID) {
ongoingTxns.remove(txnID);
if (!ongoingTxns.isEmpty()) {
PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
//max read position is less than first ongoing transaction message position, so entryId -1
maxReadPosition = PositionImpl.get(position.getLedgerId(), position.getEntryId() - 1);
maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position);
} else {
maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
Expand All @@ -40,6 +41,11 @@
@Slf4j
public class TransactionBufferDisable implements TransactionBuffer {

private final Topic topic;
public TransactionBufferDisable(Topic topic) {
this.topic = topic;
}

@Override
public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
return CompletableFuture.completedFuture(null);
Expand Down Expand Up @@ -91,7 +97,7 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position) {

@Override
public PositionImpl getMaxReadPosition() {
return PositionImpl.LATEST;
return (PositionImpl) topic.getLastPosition();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,18 @@
*/
package org.apache.pulsar.broker.transaction.buffer;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.when;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertTrue;
import static org.testng.AssertJUnit.fail;
import java.util.List;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
Expand All @@ -30,8 +39,13 @@
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
Expand Down Expand Up @@ -179,4 +193,121 @@ public void testCloseTransactionBufferWhenTimeout() throws Exception {
Assert.assertTrue(f.isCompletedExceptionally());
}

/**
* This test mainly test the following two point:
* 1. `getLastMessageIds` will get max read position.
* Send two message |1:0|1:1|; mock max read position as |1:0|; `getLastMessageIds` will get |1:0|.
* 2. `getLastMessageIds` will wait Transaction buffer recover completely.
* Mock `checkIfTBRecoverCompletely` return an exception, `getLastMessageIds` will fail too.
* Mock `checkIfTBRecoverCompletely` return null, `getLastMessageIds` will get correct result.
*/
@Test
public void testGetMaxPositionAfterTBReady() throws Exception {
// 1. Prepare test environment.
String topic = "persistent://" + NAMESPACE1 + "/testGetMaxReadyPositionAfterTBReady";
// 1.1 Mock component.
TransactionBuffer transactionBuffer = Mockito.spy(TransactionBuffer.class);
when(transactionBuffer.checkIfTBRecoverCompletely(anyBoolean()))
// Handle producer will check transaction buffer recover completely.
.thenReturn(CompletableFuture.completedFuture(null))
// If the Transaction buffer failed to recover, we can not get the correct last max read id.
.thenReturn(CompletableFuture.failedFuture(new Throwable("Mock fail")))
// If the transaction buffer recover successfully, the max read position can be acquired successfully.
.thenReturn(CompletableFuture.completedFuture(null));
TransactionBufferProvider transactionBufferProvider = Mockito.spy(TransactionBufferProvider.class);
Mockito.doReturn(transactionBuffer).when(transactionBufferProvider).newTransactionBuffer(any());
TransactionBufferProvider originalTBProvider = getPulsarServiceList().get(0).getTransactionBufferProvider();
Mockito.doReturn(transactionBufferProvider).when(getPulsarServiceList().get(0)).getTransactionBufferProvider();
// 2. Building producer and consumer.
admin.topics().createNonPartitionedTopic(topic);
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub")
.subscribe();
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.create();
// 3. Send message and test the exception can be handled as expected.
MessageIdImpl messageId = (MessageIdImpl) producer.newMessage().send();
producer.newMessage().send();
Mockito.doReturn(new PositionImpl(messageId.getLedgerId(), messageId.getEntryId()))
.when(transactionBuffer).getMaxReadPosition();
try {
consumer.getLastMessageIds();
fail();
} catch (PulsarClientException exception) {
assertTrue(exception.getMessage().contains("Failed to recover Transaction Buffer."));
}
List<TopicMessageId> messageIdList = consumer.getLastMessageIds();
assertEquals(messageIdList.size(), 1);
TopicMessageIdImpl actualMessageID = (TopicMessageIdImpl) messageIdList.get(0);
assertEquals(messageId.getLedgerId(), actualMessageID.getLedgerId());
assertEquals(messageId.getEntryId(), actualMessageID.getEntryId());
// 4. Clean resource
Mockito.doReturn(originalTBProvider).when(getPulsarServiceList().get(0)).getTransactionBufferProvider();
}

/**
* Add a E2E test for the get last message ID. It tests 4 cases.
* <p>
* 1. Only normal messages in the topic.
* 2. There are ongoing transactions, last message ID will not be updated until transaction end.
* 3. Aborted transaction will make the last message ID be updated as expected.
* 4. Committed transaction will make the last message ID be updated as expected.
* </p>
*/
@Test
public void testGetLastMessageIdsWithOngoingTransactions() throws Exception {
// 1. Prepare environment
String topic = "persistent://" + NAMESPACE1 + "/testGetLastMessageIdsWithOngoingTransactions";
String subName = "my-subscription";
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.create();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName(subName)
.subscribe();

// 2. Test last max read position can be required correctly.
// 2.1 Case1: send 3 original messages. |1:0|1:1|1:2|
MessageIdImpl expectedLastMessageID = null;
for (int i = 0; i < 3; i++) {
expectedLastMessageID = (MessageIdImpl) producer.newMessage().send();
}
assertMessageId(consumer, expectedLastMessageID, 0);
// 2.2 Case2: send 2 ongoing transactional messages and 2 original messages.
// |1:0|1:1|1:2|txn1->1:3|1:4|txn2->1:5|1:6|.
Transaction txn1 = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.HOURS)
.build()
.get();
Transaction txn2 = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.HOURS)
.build()
.get();
producer.newMessage(txn1).send();
MessageIdImpl expectedLastMessageID1 = (MessageIdImpl) producer.newMessage().send();
producer.newMessage(txn2).send();
MessageIdImpl expectedLastMessageID2 = (MessageIdImpl) producer.newMessage().send();
// 2.2.1 Last message ID will not change when txn1 and txn2 do not end.
assertMessageId(consumer, expectedLastMessageID, 0);
// 2.2.2 Last message ID will update to 1:4 when txn1 committed.
txn1.commit().get(5, TimeUnit.SECONDS);
assertMessageId(consumer, expectedLastMessageID1, 0);
// 2.2.3 Last message ID will update to 1:6 when txn2 aborted.
txn2.abort().get(5, TimeUnit.SECONDS);
// Todo: We can not ignore the marker's position in this fix.
assertMessageId(consumer, expectedLastMessageID2, 2);
}

private void assertMessageId(Consumer<?> consumer, MessageIdImpl expected, int entryOffset) throws Exception {
TopicMessageIdImpl actual = (TopicMessageIdImpl) consumer.getLastMessageIds().get(0);
assertEquals(expected.getEntryId(), actual.getEntryId() - entryOffset);
assertEquals(expected.getLedgerId(), actual.getLedgerId());
}

}

0 comments on commit 50007c3

Please sign in to comment.