Skip to content

Commit

Permalink
[fix] [tx] [branch-2.11] Transaction buffer recover blocked by readNe…
Browse files Browse the repository at this point in the history
…xt (#18969)

### Motivation
Since PR #18833 can not cherry-pick to `branch-2.11`, create a separate PR.


#### Context for Transaction Buffer
- If turn on `transactionCoordinatorEnabled`,  then `TransactionBuffer` will be initialized when a user topic create.
- The `TransactionBuffer` reads the aborted logs of transactions from topic `__transaction_buffer_snapshot`  -- this process is called `recovery`.
- During recovery, the reading from that snapshot ledger is done via a `Reader`; the reader works like this:
```
while (reader.hasMessageAvailable()){
    reader.readNext();
}
``` 

#### Context for Compaction
- After [pip-14](https://github.com/apache/pulsar/wiki/PIP-14:-Topic-compaction), the consumer that enabled feature read-compacted will read messages from the compacted topic instead of the original topic if the task-compaction is done, and read messages from the original topic if task-compaction is not done.
- If the data of the last message with key k sent to a topic is null, the compactor will mark all messages for that key as deleted.

#### Issue
There is a race condition: after executing `reader.hasMessageAvailable`,  the following messages have been deleted by compaction-task, so read next will be blocked because there have no messages to read.

----

### Modifications

- If hits this issue, do recover again.

----

#### Why not just let the client try to load the topic again to retry the recover?
If the topic load is failed, the client will receive an error response. This is a behavior that we can handle, so should not be perceived by the users.
  • Loading branch information
poorbarcode authored Dec 19, 2022
1 parent a968da5 commit faa9416
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -52,13 +53,15 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RecoverTimeRecord;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.SpscArrayQueue;
Expand Down Expand Up @@ -615,6 +618,11 @@ private TopicTransactionBufferRecover(TopicTransactionBufferRecoverCallBack call
this.takeSnapshotWriter = takeSnapshotWriter;
}

private long getSystemClientOperationTimeoutMs() throws Exception {
PulsarClientImpl pulsarClient = (PulsarClientImpl) topic.getBrokerService().getPulsar().getClient();
return pulsarClient.getConfiguration().getOperationTimeoutMs();
}

@SneakyThrows
@Override
public void run() {
Expand All @@ -629,7 +637,8 @@ public void run() {
try {
boolean hasSnapshot = false;
while (reader.hasMoreEvents()) {
Message<TransactionBufferSnapshot> message = reader.readNext();
Message<TransactionBufferSnapshot> message = reader.readNextAsync()
.get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
if (topic.getName().equals(message.getKey())) {
TransactionBufferSnapshot transactionBufferSnapshot = message.getValue();
if (transactionBufferSnapshot != null) {
Expand All @@ -642,18 +651,25 @@ public void run() {
}
}
if (!hasSnapshot) {
closeReader(reader);
callBack.noNeedToRecover();
return;
}
} catch (TimeoutException ex) {
Throwable t = FutureUtil.unwrapCompletionException(ex);
String errorMessage = String.format("[%s] Transaction buffer recover fail by read "
+ "transactionBufferSnapshot timeout!", topic.getName());
log.error(errorMessage, t);
callBack.recoverExceptionally(
new BrokerServiceException.ServiceUnitNotReadyException(errorMessage, t));
return;
} catch (Exception ex) {
log.error("[{}] Transaction buffer recover fail when read "
+ "transactionBufferSnapshot!", topic.getName(), ex);
callBack.recoverExceptionally(ex);
closeReader(reader);
return;
} finally {
closeReader(reader);
}
closeReader(reader);

ManagedCursor managedCursor;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@

import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
Expand All @@ -33,9 +36,11 @@
import java.lang.reflect.Method;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
Expand Down Expand Up @@ -88,6 +93,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
private static final int NUM_PARTITIONS = 16;
@BeforeMethod
protected void setup() throws Exception {
conf.getProperties().setProperty("brokerClient_operationTimeoutMs", Integer.valueOf(10 * 1000).toString());
setUpBase(1, NUM_PARTITIONS, RECOVER_COMMIT, 0);
admin.topics().createNonPartitionedTopic(RECOVER_ABORT);
admin.topics().createNonPartitionedTopic(TAKE_SNAPSHOT);
Expand Down Expand Up @@ -224,6 +230,81 @@ private void recoverTest(String testTopic) throws Exception {

}

private void makeTBSnapshotReaderTimeoutIfFirstRead(TopicName topicName) throws Exception {
SystemTopicClient.Reader mockReader = mock(SystemTopicClient.Reader.class);
AtomicBoolean isFirstCallOfMethodHasMoreEvents = new AtomicBoolean();
AtomicBoolean isFirstCallOfMethodHasReadNext = new AtomicBoolean();
AtomicBoolean isFirstCallOfMethodHasReadNextAsync = new AtomicBoolean();

doAnswer(invocation -> {
if (isFirstCallOfMethodHasMoreEvents.compareAndSet(false,true)){
return true;
} else {
return false;
}
}).when(mockReader).hasMoreEvents();

doAnswer(invocation -> {
if (isFirstCallOfMethodHasReadNext.compareAndSet(false, true)){
// Just stuck the thread.
Thread.sleep(3600 * 1000);
}
return null;
}).when(mockReader).readNext();

doAnswer(invocation -> {
CompletableFuture<Message> future = new CompletableFuture<>();
new Thread(() -> {
if (isFirstCallOfMethodHasReadNextAsync.compareAndSet(false, true)){
// Just stuck the thread.
try {
Thread.sleep(3600 * 1000);
} catch (InterruptedException e) {
}
future.complete(null);
} else {
future.complete(null);
}
}).start();
return future;
}).when(mockReader).readNextAsync();

when(mockReader.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));

for (PulsarService pulsarService : pulsarServiceList){
// Init prop: lastMessageIdInBroker.
final TransactionBufferSnapshotService tbSnapshotService =
pulsarService.getTransactionBufferSnapshotService();
TransactionBufferSnapshotService spyTbSnapshotService = spy(tbSnapshotService);
doAnswer(invocation -> CompletableFuture.completedFuture(mockReader))
.when(spyTbSnapshotService).createReader(topicName);
Field field =
PulsarService.class.getDeclaredField("transactionBufferSnapshotService");
field.setAccessible(true);
field.set(pulsarService, spyTbSnapshotService);
}
}

@Test(timeOut = 60 * 1000)
public void testTBRecoverCanRetryIfTimeoutRead() throws Exception {
String topicName = String.format("persistent://%s/%s", NAMESPACE1,
"tx_recover_" + UUID.randomUUID().toString().replaceAll("-", "_"));

// Make race condition of "getLastMessageId" and "compaction" to make recover can't complete.
makeTBSnapshotReaderTimeoutIfFirstRead(TopicName.get(topicName));
// Verify( Cmd-PRODUCER will wait for TB recover finished )
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.sendTimeout(0, TimeUnit.SECONDS)
.enableBatching(false)
.batchingMaxMessages(2)
.create();

// cleanup.
producer.close();
admin.topics().delete(topicName, false);
}

@Test
private void testTakeSnapshot() throws IOException, ExecutionException, InterruptedException {

Expand Down

0 comments on commit faa9416

Please sign in to comment.