diff --git a/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java b/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java index 38d8982ed..ce594f033 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java @@ -3,8 +3,12 @@ import com.google.common.base.Strings; import java.math.BigInteger; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.common.utils.Crc32C; public class FileNameUtils { @@ -192,4 +196,34 @@ private static String readFromFileName(String fileName, int index) { return matcher.group(index); } + + /** + * Find gaps in offset ranges. + * + * @param filenames list of files + * @return continuous and missing offsets for given filenames + */ + static OffsetContinuityRanges searchForMissingOffsets(List filenames) { + List> missingOffsets = new ArrayList<>(); + + List> continuousOffsets = + filenames.stream() + .map( + file -> + Pair.of( + FileNameUtils.fileNameToStartOffset(file), + FileNameUtils.fileNameToEndOffset(file))) + .sorted() + .collect(Collectors.toList()); + + for (int i = 1; i < continuousOffsets.size(); i++) { + Pair current = continuousOffsets.get(i); + Pair previous = continuousOffsets.get(i - 1); + + if (previous.getRight() + 1 != current.getLeft()) { + missingOffsets.add(Pair.of(previous.getRight() + 1, current.getLeft() - 1)); + } + } + return new OffsetContinuityRanges(continuousOffsets, missingOffsets); + } } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/KCLogger.java b/src/main/java/com/snowflake/kafka/connector/internal/KCLogger.java index 91a9494ab..4db31357a 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/KCLogger.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/KCLogger.java @@ -95,6 +95,10 @@ public void error(String format, Object... vars) { } } + public boolean isDebugEnabled() { + return logger.isDebugEnabled(); + } + private String getFormattedLogMessage(String format, Object... vars) { if (prependMdcContext) { String connCtx = MDC.get(MDC_CONN_CTX_KEY); diff --git a/src/main/java/com/snowflake/kafka/connector/internal/OffsetContinuityRanges.java b/src/main/java/com/snowflake/kafka/connector/internal/OffsetContinuityRanges.java new file mode 100644 index 000000000..01c0f476f --- /dev/null +++ b/src/main/java/com/snowflake/kafka/connector/internal/OffsetContinuityRanges.java @@ -0,0 +1,30 @@ +package com.snowflake.kafka.connector.internal; + +import java.util.List; +import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; + +public class OffsetContinuityRanges { + private final List> continuousOffsets; + private final List> missingOffsets; + + public OffsetContinuityRanges( + List> continuousOffsets, List> missingOffsets) { + this.continuousOffsets = continuousOffsets; + this.missingOffsets = missingOffsets; + } + + public String getContinuousOffsets() { + return serializeList(continuousOffsets); + } + + public String getMissingOffsets() { + return serializeList(missingOffsets); + } + + private static String serializeList(List> list) { + return list.stream() + .map(range -> "[" + range.getLeft() + "," + range.getRight() + "]") + .collect(Collectors.joining("", "[", "]")); + } +} diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java index 14c599ff7..16f027636 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java @@ -2,6 +2,7 @@ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED; import static com.snowflake.kafka.connector.config.TopicToTableModeExtractor.determineTopic2TableMode; +import static com.snowflake.kafka.connector.internal.FileNameUtils.searchForMissingOffsets; import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_RECORD_COUNT; import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_SIZE_BYTES; import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_SUB_DOMAIN; @@ -1085,22 +1086,35 @@ private void filterResultFromSnowpipeScan( private void purge(List files) { if (!files.isEmpty()) { - LOGGER.debug( - "Purging loaded files for pipe:{}, loadedFileCount:{}, loadedFiles:{}", + OffsetContinuityRanges offsets = searchForMissingOffsets(files); + LOGGER.info( + "Purging loaded files for pipe: {}, loadedFileCount: {}, continuousOffsets: {}," + + " missingOffsets: {}", pipeName, files.size(), - Arrays.toString(files.toArray())); + offsets.getContinuousOffsets(), + offsets.getMissingOffsets()); + LOGGER.debug("Purging files: {}", files); conn.purgeStage(stageName, files); } } private void moveToTableStage(List failedFiles) { if (!failedFiles.isEmpty()) { - LOGGER.debug( - "Moving failed files for pipe:{} to tableStage failedFileCount:{}, failedFiles:{}", - pipeName, - failedFiles.size(), - Arrays.toString(failedFiles.toArray())); + OffsetContinuityRanges offsets = searchForMissingOffsets(failedFiles); + String baseLog = + String.format( + "Moving failed files for pipe: %s to tableStage failedFileCount: %d," + + " continuousOffsets: %s, missingOffsets: %s", + pipeName, + failedFiles.size(), + offsets.getContinuousOffsets(), + offsets.getMissingOffsets()); + if (LOGGER.isDebugEnabled()) { + LOGGER.info("{}, failedFiles: {}", baseLog, failedFiles); + } else { + LOGGER.info(baseLog); + } conn.moveToTableStage(tableName, stageName, failedFiles); } } diff --git a/src/test/java/com/snowflake/kafka/connector/internal/FileNameUtilsTest.java b/src/test/java/com/snowflake/kafka/connector/internal/FileNameUtilsTest.java index d1b2f4c59..1cddc5a06 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/FileNameUtilsTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/FileNameUtilsTest.java @@ -1,11 +1,21 @@ package com.snowflake.kafka.connector.internal; +import static com.snowflake.kafka.connector.internal.FileNameUtils.*; +import static com.snowflake.kafka.connector.internal.FileNameUtils.searchForMissingOffsets; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Stream; import org.junit.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; public class FileNameUtilsTest { + @Test public void testFileNameFunctions() throws InterruptedException { int partition = 123; @@ -154,4 +164,58 @@ public void testFileNameWontSupportMoreThan32767Partitions() { endOffset)) .isInstanceOf(IllegalArgumentException.class); } + + @ParameterizedTest + @MethodSource("testData") + public void testSearchForMissingOffsets( + List fileNames, String expectedContinuousOffsets, String expectedMissingOffsets) { + // when + OffsetContinuityRanges result = searchForMissingOffsets(fileNames); + + // then + assertThat(result.getContinuousOffsets()).isEqualTo(expectedContinuousOffsets); + assertThat(result.getMissingOffsets()).isEqualTo(expectedMissingOffsets); + } + + public static Stream testData() { + int partition = 123; + String tableName = "test_table"; + String filePrefix = filePrefix(TestUtils.TEST_CONNECTOR_NAME, tableName, "topic", partition); + + return Stream.of( + Arguments.of(Collections.emptyList(), "[]", "[]"), + Arguments.of(Collections.singletonList(fileName(filePrefix, 0, 10)), "[[0,10]]", "[]"), + Arguments.of( + Arrays.asList(fileName(filePrefix, 0, 10), fileName(filePrefix, 100, 2137)), + "[[0,10][100,2137]]", + "[[11,99]]"), + Arguments.of( + Arrays.asList( + fileName(filePrefix, 0, 10), + fileName(filePrefix, 11, 20), + fileName(filePrefix, 21, 100), + fileName(filePrefix, 101, 1991)), + "[[0,10][11,20][21,100][101,1991]]", + "[]"), + Arguments.of( + Arrays.asList( + fileName(filePrefix, 0, 10), + fileName(filePrefix, 11, 20), + fileName(filePrefix, 21, 100), + fileName(filePrefix, 101, 1991), + fileName(filePrefix, 1996, 2000), + fileName(filePrefix, 2001, 2024)), + "[[0,10][11,20][21,100][101,1991][1996,2000][2001,2024]]", + "[[1992,1995]]"), + Arguments.of( + Arrays.asList( + fileName(filePrefix, 1996, 2000), + fileName(filePrefix, 11, 20), + fileName(filePrefix, 21, 100), + fileName(filePrefix, 2001, 2024), + fileName(filePrefix, 101, 1991), + fileName(filePrefix, 0, 10)), + "[[0,10][11,20][21,100][101,1991][1996,2000][2001,2024]]", + "[[1992,1995]]")); + } }