Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NO-SNOW Improve offset logging for Snowpipe #1027

Merged
merged 10 commits into from
Dec 11, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@
import com.google.common.base.Strings;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.common.utils.Crc32C;

public class FileNameUtils {
Expand Down Expand Up @@ -192,4 +196,34 @@ private static String readFromFileName(String fileName, int index) {

return matcher.group(index);
}

/**
* Find gaps in offset ranges.
*
* @param filenames list of files
* @return continuous and missing offsets for given filenames
*/
static OffsetContinuityRanges searchForMissingOffsets(List<String> filenames) {
List<Pair<Long, Long>> missingOffsets = new ArrayList<>();

List<Pair<Long, Long>> continuousOffsets =
sfc-gh-mbobowski marked this conversation as resolved.
Show resolved Hide resolved
filenames.stream()
.map(
file ->
Pair.of(
FileNameUtils.fileNameToStartOffset(file),
FileNameUtils.fileNameToEndOffset(file)))
.sorted()
.collect(Collectors.toList());

for (int i = 1; i < continuousOffsets.size(); i++) {
Pair<Long, Long> current = continuousOffsets.get(i);
Pair<Long, Long> previous = continuousOffsets.get(i - 1);

if (previous.getRight() + 1 != current.getLeft()) {
missingOffsets.add(Pair.of(previous.getRight() + 1, current.getLeft() - 1));
}
}
return new OffsetContinuityRanges(continuousOffsets, missingOffsets);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ public void error(String format, Object... vars) {
}
}

  /**
   * Reports whether debug-level logging is enabled on the wrapped logger, so callers can skip
   * building expensive log messages that would be discarded.
   */
  public boolean isDebugEnabled() {
    return logger.isDebugEnabled();
  }

private String getFormattedLogMessage(String format, Object... vars) {
if (prependMdcContext) {
String connCtx = MDC.get(MDC_CONN_CTX_KEY);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.snowflake.kafka.connector.internal;

import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;

public class OffsetContinuityRanges {
private final List<Pair<Long, Long>> continuousOffsets;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we could also introduce a class "OffsetRange" with startOffsetInclusive, endOffsetExclusive but I'm fine with a pair

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or a data class... oh no wait, it's not Kotlin.
I think that Pair is good enough for this use case.

private final List<Pair<Long, Long>> missingOffsets;

public OffsetContinuityRanges(
List<Pair<Long, Long>> continuousOffsets, List<Pair<Long, Long>> missingOffsets) {
this.continuousOffsets = continuousOffsets;
this.missingOffsets = missingOffsets;
}

public String getContinuousOffsets() {
return serializeList(continuousOffsets);
}

public String getMissingOffsets() {
return serializeList(missingOffsets);
}

private static String serializeList(List<Pair<Long, Long>> list) {
return list.stream()
.map(range -> "[" + range.getLeft() + "," + range.getRight() + "]")
.collect(Collectors.joining("", "[", "]"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED;
import static com.snowflake.kafka.connector.config.TopicToTableModeExtractor.determineTopic2TableMode;
import static com.snowflake.kafka.connector.internal.FileNameUtils.searchForMissingOffsets;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_RECORD_COUNT;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_SIZE_BYTES;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_SUB_DOMAIN;
Expand Down Expand Up @@ -1085,22 +1086,35 @@ private void filterResultFromSnowpipeScan(

private void purge(List<String> files) {
if (!files.isEmpty()) {
LOGGER.debug(
"Purging loaded files for pipe:{}, loadedFileCount:{}, loadedFiles:{}",
OffsetContinuityRanges offsets = searchForMissingOffsets(files);
LOGGER.info(
"Purging loaded files for pipe: {}, loadedFileCount: {}, continuousOffsets: {},"
+ " missingOffsets: {}",
pipeName,
files.size(),
Arrays.toString(files.toArray()));
offsets.getContinuousOffsets(),
offsets.getMissingOffsets());
LOGGER.debug("Purging files: {}", files);
conn.purgeStage(stageName, files);
}
}

private void moveToTableStage(List<String> failedFiles) {
if (!failedFiles.isEmpty()) {
LOGGER.debug(
"Moving failed files for pipe:{} to tableStage failedFileCount:{}, failedFiles:{}",
pipeName,
failedFiles.size(),
Arrays.toString(failedFiles.toArray()));
OffsetContinuityRanges offsets = searchForMissingOffsets(failedFiles);
String baseLog =
String.format(
"Moving failed files for pipe: %s to tableStage failedFileCount: %d,"
+ " continuousOffsets: %s, missingOffsets: %s",
pipeName,
failedFiles.size(),
offsets.getContinuousOffsets(),
offsets.getMissingOffsets());
if (LOGGER.isDebugEnabled()) {
LOGGER.info("{} - {}", baseLog, failedFiles);
sfc-gh-dseweryn marked this conversation as resolved.
Show resolved Hide resolved
} else {
LOGGER.info(baseLog);
}
conn.moveToTableStage(tableName, stageName, failedFiles);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
package com.snowflake.kafka.connector.internal;

import static com.snowflake.kafka.connector.internal.FileNameUtils.*;
import static com.snowflake.kafka.connector.internal.FileNameUtils.searchForMissingOffsets;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import org.junit.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class FileNameUtilsTest {

@Test
public void testFileNameFunctions() throws InterruptedException {
int partition = 123;
Expand Down Expand Up @@ -154,4 +164,58 @@ public void testFileNameWontSupportMoreThan32767Partitions() {
endOffset))
.isInstanceOf(IllegalArgumentException.class);
}

  /**
   * Verifies that searchForMissingOffsets reports both the covered and the missing offset ranges,
   * serialized as strings, for each file-name fixture supplied by {@code testData()}.
   */
  @ParameterizedTest
  @MethodSource("testData")
  public void testSearchForMissingOffsets(
      List<String> fileNames, String expectedContinuousOffsets, String expectedMissingOffsets) {
    // when
    OffsetContinuityRanges result = searchForMissingOffsets(fileNames);

    // then
    assertThat(result.getContinuousOffsets()).isEqualTo(expectedContinuousOffsets);
    assertThat(result.getMissingOffsets()).isEqualTo(expectedMissingOffsets);
  }

public static Stream<Arguments> testData() {
sfc-gh-mbobowski marked this conversation as resolved.
Show resolved Hide resolved
int partition = 123;
String tableName = "test_table";
String filePrefix = filePrefix(TestUtils.TEST_CONNECTOR_NAME, tableName, "topic", partition);

return Stream.of(
Arguments.of(Collections.emptyList(), "[]", "[]"),
Arguments.of(Collections.singletonList(fileName(filePrefix, 0, 10)), "[[0,10]]", "[]"),
Arguments.of(
Arrays.asList(fileName(filePrefix, 0, 10), fileName(filePrefix, 100, 2137)),
"[[0,10][100,2137]]",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✝️

"[[11,99]]"),
Arguments.of(
Arrays.asList(
fileName(filePrefix, 0, 10),
fileName(filePrefix, 11, 20),
fileName(filePrefix, 21, 100),
fileName(filePrefix, 101, 1991)),
"[[0,10][11,20][21,100][101,1991]]",
"[]"),
Arguments.of(
Arrays.asList(
fileName(filePrefix, 0, 10),
fileName(filePrefix, 11, 20),
fileName(filePrefix, 21, 100),
fileName(filePrefix, 101, 1991),
fileName(filePrefix, 1996, 2000),
fileName(filePrefix, 2001, 2024)),
"[[0,10][11,20][21,100][101,1991][1996,2000][2001,2024]]",
"[[1992,1995]]"),
Arguments.of(
Arrays.asList(
fileName(filePrefix, 1996, 2000),
fileName(filePrefix, 11, 20),
fileName(filePrefix, 21, 100),
fileName(filePrefix, 2001, 2024),
fileName(filePrefix, 101, 1991),
fileName(filePrefix, 0, 10)),
"[[0,10][11,20][21,100][101,1991][1996,2000][2001,2024]]",
"[[1992,1995]]"));
}
}
Loading