Skip to content

Commit

Permalink
Merge pull request #194 from data-integrations/bugfix/CDAP-19731-vali…
Browse files Browse the repository at this point in the history
…date-offsets

CDAP-19731 - Validate offsets and give informative error message.
  • Loading branch information
greeshmaswaminathan authored Jan 31, 2023
2 parents 5fb62b0 + 0d44a20 commit 191c6a4
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

Expand All @@ -49,7 +50,10 @@ private KafkaHelpers() {
 * @return Mapping of topic-partition to its latest offset
*/
public static <K, V> Map<TopicPartition, Long> getLatestOffsets(Consumer<K, V> consumer,
List<TopicPartition> topicAndPartitions) {
Collection<TopicPartition> topicAndPartitions) {
if (topicAndPartitions.isEmpty()) {
return Collections.emptyMap();
}
consumer.assign(topicAndPartitions);
consumer.seekToEnd(topicAndPartitions);

Expand All @@ -69,7 +73,10 @@ public static <K, V> Map<TopicPartition, Long> getLatestOffsets(Consumer<K, V> c
 * @return Mapping of topic-partition to its earliest offset
*/
public static <K, V> Map<TopicPartition, Long> getEarliestOffsets(Consumer<K, V> consumer,
List<TopicPartition> topicAndPartitions) {
Collection<TopicPartition> topicAndPartitions) {
if (topicAndPartitions.isEmpty()) {
return Collections.emptyMap();
}
consumer.assign(topicAndPartitions);
consumer.seekToBeginning(topicAndPartitions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.cdap.plugin.kafka.source;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -58,6 +59,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
* Util method for {@link KafkaStreamingSource}.
Expand Down Expand Up @@ -185,6 +187,8 @@ static Map<TopicPartition, Long> getInitialPartitionOffsets(KafkaConfig conf,
Map<TopicPartition, Long> savedPartitions = stateSupplier.get();
if (!savedPartitions.isEmpty()) {
LOG.info("Saved partitions found {}. ", savedPartitions);
validateSavedPartitions(savedPartitions, KafkaHelpers.getEarliestOffsets(consumer, savedPartitions.keySet()),
KafkaHelpers.getLatestOffsets(consumer, savedPartitions.keySet()));
return savedPartitions;
}

Expand All @@ -195,6 +199,26 @@ static Map<TopicPartition, Long> getInitialPartitionOffsets(KafkaConfig conf,
return offsets;
}

@VisibleForTesting
/**
 * Validates that every saved offset lies within the broker's current
 * [earliest, latest] range for its partition.
 *
 * @param savedPartitions saved topic-partition to offset mapping from state
 * @param earliestOffsets current earliest offset per topic-partition
 * @param latestOffsets   current latest offset per topic-partition
 * @throws IllegalArgumentException if any saved offset is out of range, or if
 *         a saved partition has no current offsets (e.g. the partition no
 *         longer exists) — previously this case surfaced as an uninformative
 *         NullPointerException from unboxing a null map lookup
 */
static void validateSavedPartitions(Map<TopicPartition, Long> savedPartitions,
                                    Map<TopicPartition, Long> earliestOffsets,
                                    Map<TopicPartition, Long> latestOffsets) {
  String errorString = savedPartitions.entrySet()
    .stream()
    .map(entry -> {
      TopicPartition topicPartition = entry.getKey();
      long savedOffset = entry.getValue();
      Long earliest = earliestOffsets.get(topicPartition);
      Long latest = latestOffsets.get(topicPartition);
      // Missing offsets mean the partition could not be described on the
      // broker; report it explicitly instead of failing with an NPE.
      if (earliest == null || latest == null) {
        return String.format(
          "Offsets could not be fetched for topic %s, partition %d. The partition may no longer exist.",
          topicPartition.topic(), topicPartition.partition());
      }
      if (savedOffset < earliest || savedOffset > latest) {
        return String.format(
          "Invalid offset %d for topic %s, partition %d. Earliest offset is %d and latest offset is %d.",
          savedOffset, topicPartition.topic(), topicPartition.partition(), earliest, latest);
      }
      // Empty string marks a valid partition; filtered out below.
      return "";
    })
    .filter(message -> !message.isEmpty())
    .collect(Collectors.joining("\n"));
  if (errorString.isEmpty()) {
    return;
  }

  throw new IllegalArgumentException(String.format("Problem with saved offsets:\n%s", errorString));
}

static Function2<ConsumerRecord<byte[], byte[]>, Time, StructuredRecord>
getRecordTransformFunction(KafkaConfig conf) {
return conf.getFormat() == null ? new BytesFunction(conf) : new FormatFunction(conf);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package io.cdap.plugin.kafka.source;

import org.apache.kafka.common.TopicPartition;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

/**
* Tests for KafkaStreamingSourceUtil
*/
public class KafkaStreamingSourceUtilTest {

  // Two partitions of the same topic shared by all test fixtures.
  private static final TopicPartition PARTITION_0 = new TopicPartition("test-topic", 0);
  private static final TopicPartition PARTITION_1 = new TopicPartition("test-topic", 1);

  @Test
  public void testValidateSavedPartitionsValid() {
    // Saved offsets (100, 102) fall inside [earliest, latest] for both
    // partitions, so validation must pass without throwing.
    KafkaStreamingSourceUtil.validateSavedPartitions(
      offsets(100L, 102L),   // saved
      offsets(0L, 0L),       // earliest
      offsets(202L, 200L));  // latest
  }

  @Test(expected = IllegalArgumentException.class)
  public void testValidateSavedPartitionsInvalid() {
    // Saved offsets (100, 102) exceed the latest offsets (10, 0), so
    // validation must fail with IllegalArgumentException.
    KafkaStreamingSourceUtil.validateSavedPartitions(
      offsets(100L, 102L),   // saved
      offsets(0L, 0L),       // earliest
      offsets(10L, 0L));     // latest
  }

  /**
   * Builds an offset map assigning the given offsets to partitions 0 and 1.
   */
  private static Map<TopicPartition, Long> offsets(long partition0Offset, long partition1Offset) {
    Map<TopicPartition, Long> offsets = new HashMap<>();
    offsets.put(PARTITION_0, partition0Offset);
    offsets.put(PARTITION_1, partition1Offset);
    return offsets;
  }
}

0 comments on commit 191c6a4

Please sign in to comment.