Skip to content

Commit

Permalink
Add new LiKafkaConsumerImpl metric - _offsetInvalidOrOutRangeCounter …
Browse files Browse the repository at this point in the history
…to estimate the potential data loss (#185)

Added new metric( _offsetInvalidOrOutRangeCounter) into LiKafkaConsumerImpl to count number of OffsetOutOfRange exceptions under following cases with liclosest reset strategy.

1. fetchedOffset < Log Start Offset (LSO)
2. fetchedOffset <= Log End Offset (LEO)

This metric can be used as indicator to estimate the potential data loss upon the LiKafkaConsumer client reseting the offset to "liclosest"
  • Loading branch information
zsong-li authored Aug 26, 2020
1 parent b27f603 commit da22836
Showing 1 changed file with 31 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
Expand Down Expand Up @@ -80,6 +81,7 @@ public class LiKafkaConsumerImpl<K, V> implements LiKafkaConsumer<K, V> {
private final boolean _throwExceptionOnInvalidOffsets;
private final LiOffsetResetStrategy _offsetResetStrategy;
private long _lastAutoCommitMs;
private AtomicInteger _offsetInvalidOrOutRangeCount;
private final Map<MetricName, Metric> _extraMetrics = new HashMap<>(2);

private ConsumerRecordsProcessResult<K, V> _lastProcessedResult;
Expand Down Expand Up @@ -127,6 +129,8 @@ private LiKafkaConsumerImpl(LiKafkaConsumerConfig configs,
byteArrayDeserializer,
byteArrayDeserializer);
_clientId = LiKafkaClientsUtils.getClientId(_kafkaConsumer);
_offsetInvalidOrOutRangeCount = new AtomicInteger(0);

MetricName skippedRecordsMetricName = new MetricName(
"records-skipped",
"lnkd",
Expand All @@ -150,6 +154,30 @@ public Object metricValue() {
}
};

MetricName offsetOutOfRangeCounterName = new MetricName(
"consumer-liclosest-data-loss-estimation",
"lnkd",
"counter of how many times the consumer reached OffsetOutOfRangeException or NoOffsetForPartitionException"
+ "with liclosest reset strategy (case 2 and 3b) for potential data loss estimation.",
Collections.singletonMap("client-id", _clientId)
);
Metric offsetOutOfRangeCounter = new Metric() {
@Override
public MetricName metricName() {
return offsetOutOfRangeCounterName;
}

@Override
public double value() {
return _offsetInvalidOrOutRangeCount.get();
}

@Override
public Object metricValue() {
return value();
}
};

MetricName consumerOffsetWatermarkSpan = new MetricName(
"consumer-offset-watermark-span",
"lnkd",
Expand All @@ -176,6 +204,7 @@ public Object metricValue() {

_extraMetrics.put(skippedRecordsMetricName, skippedRecordsMetric);
_extraMetrics.put(consumerOffsetWatermarkSpan, consumerOffsetWatermarkSpanMetric);
_extraMetrics.put(offsetOutOfRangeCounterName, offsetOutOfRangeCounter);

try {

Expand Down Expand Up @@ -321,7 +350,6 @@ private ConsumerRecords<K, V> poll(long timeout, boolean includeMetadataInTimeou
}
} catch (OffsetOutOfRangeException | NoOffsetForPartitionException oe) {
handleInvalidOffsetException(oe);

// force throw exception if exception.on.invalid.offset.reset is set to true
if (_throwExceptionOnInvalidOffsets) {
throw oe;
Expand Down Expand Up @@ -666,6 +694,7 @@ private void handleLiClosestResetStrategy(InvalidOffsetException oe) {
if (beginningOffset != -1L && endOffset != -1L) {
if (beginningOffset > fetchedOffset) { // Case 2
seekBeginningPartitions.put(tp, beginningOffset);
_offsetInvalidOrOutRangeCount.getAndIncrement();
return;
}
if (endOffset < fetchedOffset) { // Case 3a
Expand All @@ -674,6 +703,7 @@ private void handleLiClosestResetStrategy(InvalidOffsetException oe) {
} else { // Case 3b: endOffset >= fetchedOffset
LOG.debug("Closest offset computed for topic partition {} is the fetched offset {}. ", tp, fetchedOffset);
seekFetchedOffsetPartitions.put(tp, fetchedOffset);
_offsetInvalidOrOutRangeCount.getAndIncrement();
}
} else {
// can't handle reset if the either bound values are not known
Expand Down

0 comments on commit da22836

Please sign in to comment.