Skip to content

Commit

Permalink
[server] Increased Wait After Unsubscribe During State Transitions (#…
Browse files Browse the repository at this point in the history
…1213)

Increased the timeout value so that the consumer will more frequently safely unsubscribe instead of timing out. This should only be during partition state transitions because those are the times when not waiting for all inflight messages to be processed could result in a state mismatch. According to the values of the metric `consumer_records_producing_to_write_buffer_latency`, increasing the timeout to `1800s` should cover almost 100% of all cases.

Also added a log message for when `SharedKafkaConsumer#waitAfterUnsubscribe()` takes longer than 15 seconds to gather some data about how long it would take.
  • Loading branch information
KaiSernLim authored Nov 18, 2024
1 parent bb7d601 commit c3e3b1a
Show file tree
Hide file tree
Showing 12 changed files with 152 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ public abstract SharedKafkaConsumer getConsumerAssignedToVersionTopicPartition(

public abstract void unsubscribeAll(PubSubTopic versionTopic);

public abstract void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition);
public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) {
unSubscribe(versionTopic, pubSubTopicPartition, SharedKafkaConsumer.DEFAULT_MAX_WAIT_MS);
}

public abstract void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, long timeoutMs);

public abstract void batchUnsubscribe(PubSubTopic versionTopic, Set<PubSubTopicPartition> topicPartitionsToUnSub);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1143,7 +1143,7 @@ protected void leaderExecuteTopicSwitch(
newSourceTopic,
unreachableBrokerList);
// unsubscribe the old source and subscribe to the new source
consumerUnSubscribe(currentLeaderTopic, partitionConsumptionState);
consumerUnSubscribeForStateTransition(currentLeaderTopic, partitionConsumptionState);
waitForLastLeaderPersistFuture(
partitionConsumptionState,
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,15 @@ void resetOffsetFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPa
}

public void unsubscribeConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) {
unsubscribeConsumerFor(versionTopic, pubSubTopicPartition, SharedKafkaConsumer.DEFAULT_MAX_WAIT_MS);
}

public void unsubscribeConsumerFor(
PubSubTopic versionTopic,
PubSubTopicPartition pubSubTopicPartition,
long timeoutMs) {
for (AbstractKafkaConsumerService consumerService: kafkaServerToConsumerServiceMap.values()) {
consumerService.unSubscribe(versionTopic, pubSubTopicPartition);
consumerService.unSubscribe(versionTopic, pubSubTopicPartition, timeoutMs);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,15 +243,15 @@ public void unsubscribeAll(PubSubTopic versionTopic) {
* Stop specific subscription associated with the given version topic.
*/
@Override
public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) {
PubSubConsumerAdapter consumer = getConsumerAssignedToVersionTopicPartition(versionTopic, pubSubTopicPartition);
public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, long timeoutMs) {
SharedKafkaConsumer consumer = getConsumerAssignedToVersionTopicPartition(versionTopic, pubSubTopicPartition);
if (consumer != null) {
/**
* Refer {@link KafkaConsumerService#startConsumptionIntoDataReceiver} for avoiding race condition caused by
* setting data receiver and unsubscribing concurrently for the same topic partition on a shared consumer.
*/
synchronized (consumer) {
consumer.unSubscribe(pubSubTopicPartition);
consumer.unSubscribe(pubSubTopicPartition, timeoutMs);
removeTopicPartitionFromConsumptionTask(consumer, pubSubTopicPartition);
}
versionTopicToTopicPartitionToConsumer.compute(versionTopic, (k, topicPartitionToConsumerMap) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ public void unsubscribeAll(PubSubTopic versionTopic) {
}

@Override
public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) {
public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, long timeoutMs) {
KafkaConsumerService kafkaConsumerService = getKafkaConsumerService(versionTopic, pubSubTopicPartition);
if (kafkaConsumerService != null) {
kafkaConsumerService.unSubscribe(versionTopic, pubSubTopicPartition);
kafkaConsumerService.unSubscribe(versionTopic, pubSubTopicPartition, timeoutMs);
topicPartitionToConsumerService.remove(new TopicPartitionForIngestion(versionTopic, pubSubTopicPartition));
} else {
LOGGER.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ protected void processConsumerAction(ConsumerAction message, Store store) throws
PubSubTopic topic = message.getTopicPartition().getPubSubTopic();
PubSubTopic leaderTopic = offsetRecord.getLeaderTopic(pubSubTopicRepository);
if (leaderTopic != null && (!topic.equals(leaderTopic) || partitionConsumptionState.consumeRemotely())) {
consumerUnSubscribe(leaderTopic, partitionConsumptionState);
consumerUnSubscribeForStateTransition(leaderTopic, partitionConsumptionState);
waitForAllMessageToBeProcessedFromTopicPartition(
new PubSubTopicPartitionImpl(leaderTopic, partition),
partitionConsumptionState);
Expand Down Expand Up @@ -605,7 +605,7 @@ protected void checkLongRunningTaskState() throws InterruptedException {
* this replica can finally be promoted to leader.
*/
// unsubscribe from previous topic/partition
consumerUnSubscribe(versionTopic, partitionConsumptionState);
consumerUnSubscribeForStateTransition(versionTopic, partitionConsumptionState);

LOGGER.info(
"Starting promotion of replica: {} to leader for the partition, unsubscribed from current topic: {}",
Expand Down Expand Up @@ -675,7 +675,7 @@ protected void checkLongRunningTaskState() throws InterruptedException {
/** If LEADER is consuming remote VT or SR, EOP is already received, switch back to local fabrics. */
if (shouldLeaderSwitchToLocalConsumption(partitionConsumptionState)) {
// Unsubscribe from remote Kafka topic, but keep the consumer in cache.
consumerUnSubscribe(currentLeaderTopic, partitionConsumptionState);
consumerUnSubscribeForStateTransition(currentLeaderTopic, partitionConsumptionState);
// If remote consumption flag is false, existing messages for the partition in the drainer queue should be
// processed before that
PubSubTopicPartition leaderTopicPartition =
Expand Down Expand Up @@ -1015,7 +1015,7 @@ protected void leaderExecuteTopicSwitch(
}

// unsubscribe the old source and subscribe to the new source
consumerUnSubscribe(currentLeaderTopic, partitionConsumptionState);
consumerUnSubscribeForStateTransition(currentLeaderTopic, partitionConsumptionState);
waitForLastLeaderPersistFuture(
partitionConsumptionState,
String.format(
Expand Down Expand Up @@ -3816,7 +3816,7 @@ protected void resubscribe(PartitionConsumptionState partitionConsumptionState)
protected void resubscribeAsFollower(PartitionConsumptionState partitionConsumptionState)
throws InterruptedException {
int partition = partitionConsumptionState.getPartition();
consumerUnSubscribe(versionTopic, partitionConsumptionState);
consumerUnSubscribeForStateTransition(versionTopic, partitionConsumptionState);
waitForAllMessageToBeProcessedFromTopicPartition(
new PubSubTopicPartitionImpl(versionTopic, partition),
partitionConsumptionState);
Expand All @@ -3836,7 +3836,7 @@ protected void resubscribeAsLeader(PartitionConsumptionState partitionConsumptio
OffsetRecord offsetRecord = partitionConsumptionState.getOffsetRecord();
PubSubTopic leaderTopic = offsetRecord.getLeaderTopic(pubSubTopicRepository);
int partition = partitionConsumptionState.getPartition();
consumerUnSubscribe(leaderTopic, partitionConsumptionState);
consumerUnSubscribeForStateTransition(leaderTopic, partitionConsumptionState);
PubSubTopicPartition leaderTopicPartition = new PubSubTopicPartitionImpl(leaderTopic, partition);
waitForAllMessageToBeProcessedFromTopicPartition(
new PubSubTopicPartitionImpl(leaderTopic, partition),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
Expand All @@ -39,8 +40,15 @@
* TODO: move this logic inside consumption task, this class does not need to be sub-class of {@link PubSubConsumerAdapter}
*/
class SharedKafkaConsumer implements PubSubConsumerAdapter {
public static final long DEFAULT_MAX_WAIT_MS = TimeUnit.SECONDS.toMillis(10);
/**
* Increase the max wait during state transitions to ensure that it waits for the messages to finish processing. A
* poll() indicates that all previous inflight messages under the previous state were processed, so there can't be a
* state mismatch. The consumer_records_producing_to_write_buffer_latency metric suggests how long the wait should be.
*/
public static final long STATE_TRANSITION_MAX_WAIT_MS = TimeUnit.MINUTES.toMillis(30);

private static final Logger LOGGER = LogManager.getLogger(SharedKafkaConsumer.class);
private long nextPollTimeOutSeconds = 10;

protected final PubSubConsumerAdapter delegate;

Expand All @@ -61,6 +69,8 @@ class SharedKafkaConsumer implements PubSubConsumerAdapter {
*/
private final AtomicBoolean waitingForPoll = new AtomicBoolean(false);

private long timeoutMsOverride = -1; // for unit test purposes

private final Time time;

/**
Expand Down Expand Up @@ -145,20 +155,23 @@ synchronized void subscribe(
updateCurrentAssignment(delegate.getAssignment());
}

public synchronized void unSubscribe(PubSubTopicPartition pubSubTopicPartition) {
unSubscribe(pubSubTopicPartition, DEFAULT_MAX_WAIT_MS);
}

/**
* There is an additional goal of this function which is to make sure that all the records consumed for this {topic,partition} prior to
* calling unsubscribe here is produced to drainer service. {@link ConsumptionTask#run()} ends up calling
* {@link SharedKafkaConsumer#poll(long)} and produceToStoreBufferService sequentially. So waiting for at least one more
* invocation of {@link SharedKafkaConsumer#poll(long)} achieves the above objective.
*/
@Override
public synchronized void unSubscribe(PubSubTopicPartition pubSubTopicPartition) {
public synchronized void unSubscribe(PubSubTopicPartition pubSubTopicPartition, long timeoutMs) {
unSubscribeAction(() -> {
this.delegate.unSubscribe(pubSubTopicPartition);
PubSubTopic versionTopic = subscribedTopicPartitionToVersionTopic.remove(pubSubTopicPartition);
unsubscriptionListener.call(this, versionTopic, pubSubTopicPartition);
return Collections.singleton(pubSubTopicPartition);
});
}, timeoutMs);
}

@Override
Expand All @@ -170,7 +183,7 @@ public synchronized void batchUnsubscribe(Set<PubSubTopicPartition> pubSubTopicP
unsubscriptionListener.call(this, versionTopic, pubSubTopicPartition);
}
return pubSubTopicPartitionSet;
});
}, DEFAULT_MAX_WAIT_MS);
}

/**
Expand All @@ -179,7 +192,7 @@ public synchronized void batchUnsubscribe(Set<PubSubTopicPartition> pubSubTopicP
*
* @param supplier which performs the unsubscription and returns a set of partitions which were unsubscribed
*/
protected synchronized void unSubscribeAction(Supplier<Set<PubSubTopicPartition>> supplier) {
protected synchronized void unSubscribeAction(Supplier<Set<PubSubTopicPartition>> supplier, long timeoutMs) {
long currentPollTimes = pollTimes;
Set<PubSubTopicPartition> topicPartitions = supplier.get();
long startTime = System.currentTimeMillis();
Expand All @@ -191,28 +204,44 @@ protected synchronized void unSubscribeAction(Supplier<Set<PubSubTopicPartition>
topicPartitions,
elapsedTime);
updateCurrentAssignment(delegate.getAssignment());
waitAfterUnsubscribe(currentPollTimes, topicPartitions);
waitAfterUnsubscribe(currentPollTimes, topicPartitions, timeoutMs);
}

protected void waitAfterUnsubscribe(long currentPollTimes, Set<PubSubTopicPartition> topicPartitions) {
protected void waitAfterUnsubscribe(
long currentPollTimes,
Set<PubSubTopicPartition> topicPartitions,
long timeoutMs) {
// This clause is only for unit test purposes, for when the timeout needs to be set to 0.
if (timeoutMsOverride != -1) {
timeoutMs = timeoutMsOverride;
}

currentPollTimes++;
waitingForPoll.set(true);
// Wait for the next poll or maximum 10 seconds. Interestingly wait api does not provide any indication if wait
// returned
// due to timeout. So an explicit time check is necessary.
long timeoutMs = (time.getNanoseconds() / Time.NS_PER_MS) + nextPollTimeOutSeconds * Time.MS_PER_SECOND;
final long startTimeMs = time.getMilliseconds();
final long endTimeMs = startTimeMs + timeoutMs;
try {
while (currentPollTimes > pollTimes) {
long waitMs = timeoutMs - (time.getNanoseconds() / Time.NS_PER_MS);
final long waitMs = endTimeMs - time.getMilliseconds();
if (waitMs <= 0) {
LOGGER.warn(
"Wait for poll request after unsubscribe topic partition(s) ({}) timed out after {} seconds",
"Wait for poll request after unsubscribe topic partition(s) ({}) timed out after {} milliseconds",
topicPartitions,
nextPollTimeOutSeconds);
timeoutMs);
break;
}
wait(waitMs);
}
final long elapsedMs = time.getMilliseconds() - startTimeMs;
if (elapsedMs > TimeUnit.SECONDS.toMillis(15)) {
LOGGER.warn(
"Wait for poll request after unsubscribe topic partition(s) ({}) took {} milliseconds",
topicPartitions,
elapsedMs);
}
// no action to take actually, just return;
} catch (InterruptedException e) {
LOGGER.info("Wait for poll request in `unsubscribe` function got interrupted.");
Expand All @@ -221,8 +250,8 @@ protected void waitAfterUnsubscribe(long currentPollTimes, Set<PubSubTopicPartit
}

// Only for testing.
void setNextPollTimeoutSeconds(long nextPollTimeOutSeconds) {
this.nextPollTimeOutSeconds = nextPollTimeOutSeconds;
void setTimeoutMsOverride(long timeoutMsOverride) {
this.timeoutMsOverride = timeoutMsOverride;
}

// Only for testing.
Expand All @@ -248,7 +277,7 @@ public synchronized Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, Kafka
/**
* Always invoke this method no matter whether the consumer have subscription or not. Therefore we could notify any
* waiter who might be waiting for a invocation of poll to happen even if the consumer does not have subscription
* after calling {@link SharedKafkaConsumer#unSubscribe(String, int)}.
* after calling {@link SharedKafkaConsumer#unSubscribe(PubSubTopicPartition)}.
*/
pollTimes++;
if (waitingForPoll.get()) {
Expand Down Expand Up @@ -347,9 +376,4 @@ public Long endOffset(PubSubTopicPartition pubSubTopicPartition) {
public List<PubSubTopicPartitionInfo> partitionsFor(PubSubTopic topic) {
throw new UnsupportedOperationException("partitionsFor is not supported in SharedKafkaConsumer");
}

// Test only
public void setNextPollTimeOutSeconds(long seconds) {
this.nextPollTimeOutSeconds = seconds;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3401,11 +3401,20 @@ public boolean consumerHasSubscription(PubSubTopic topic, PartitionConsumptionSt
.hasConsumerAssignedFor(versionTopic, new PubSubTopicPartitionImpl(topic, partitionId));
}

public void consumerUnSubscribe(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState) {
/**
* It is important during a state transition to wait in {@link SharedKafkaConsumer#waitAfterUnsubscribe(long, Set, long)}
* until all inflight messages have been processed by the consumer, otherwise there could be a mismatch in the PCS's
* leader-follower state vs the intended state when the message was polled. Thus, we use an increased timeout of up to
* 30 minutes according to the maximum value of the metric consumer_records_producing_to_write_buffer_latency.
*/
public void consumerUnSubscribeForStateTransition(
PubSubTopic topic,
PartitionConsumptionState partitionConsumptionState) {
Instant startTime = Instant.now();
int partitionId = partitionConsumptionState.getPartition();
PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(topic, partitionId);
aggKafkaConsumerService.unsubscribeConsumerFor(versionTopic, topicPartition);
aggKafkaConsumerService
.unsubscribeConsumerFor(versionTopic, topicPartition, SharedKafkaConsumer.STATE_TRANSITION_MAX_WAIT_MS);
LOGGER.info(
"Consumer unsubscribed to topic-partition: {} for replica: {}. Took {} ms",
topicPartition,
Expand Down
Loading

0 comments on commit c3e3b1a

Please sign in to comment.