SNOW-1642799: added exception handler when moving invalid file to table stage; file cleaner will preserve 'dirty' files a bit longer (snowflakedb#931) (#72)

Co-authored-by: Greg Jachimko <[email protected]>
sudeshwasnik and sfc-gh-gjachimko authored Nov 27, 2024
1 parent a4dde8f commit 6c2ce0e
Showing 17 changed files with 726 additions and 166 deletions.
@@ -127,6 +127,15 @@ public class SnowflakeSinkConnectorConfig {
public static final boolean SNOWPIPE_FILE_CLEANER_FIX_ENABLED_DEFAULT = false;
public static final int SNOWPIPE_FILE_CLEANER_THREADS_DEFAULT = 1;

public static final String SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED =
"snowflake.snowpipe.stageFileNameExtensionEnabled";
public static final boolean SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED_DEFAULT = true;

// Whether to close streaming channels in parallel.
public static final String SNOWPIPE_STREAMING_CLOSE_CHANNELS_IN_PARALLEL =
"snowflake.streaming.closeChannelsInParallel.enabled";
public static final boolean SNOWPIPE_STREAMING_CLOSE_CHANNELS_IN_PARALLEL_DEFAULT = true;

// This is the streaming max client lag which can be defined in config
public static final String SNOWPIPE_STREAMING_MAX_CLIENT_LAG =
"snowflake.streaming.max.client.lag";
@@ -573,6 +582,16 @@ static ConfigDef newConfigDef() {
Importance.LOW,
"Defines number of worker threads to associate with the cleaner task. By default there"
+ " is one cleaner per topic's partition and they all share one worker thread")
.define(
SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED,
ConfigDef.Type.BOOLEAN,
SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED_DEFAULT,
ConfigDef.Importance.LOW,
"Defines whether stage file names should be prefixed with source topic's name hash."
+ " This is required in scenarios, when there are multiple topics configured to"
+ " ingest data into a single table via topic2table map. If disabled, there is a"
+ " risk that files from various topics may collide with each other and be deleted"
+ " before ingestion.")
.define(
SNOWPIPE_STREAMING_MAX_CLIENT_LAG,
Type.LONG,
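For orientation, here is a rough sketch (not part of this PR) of how the two new settings would appear in a connector configuration where several topics feed one table, written as the Map<String, String> the connector receives; the topic and table names are hypothetical:

import java.util.Map;

public class MultiTopicConfigSketch {
  public static void main(String[] args) {
    // Hypothetical connector settings: two topics route into the same ORDERS table.
    Map<String, String> connectorConfig =
        Map.of(
            "topics", "orders_eu,orders_us",
            "snowflake.topic2table.map", "orders_eu:ORDERS,orders_us:ORDERS",
            // New in this change: prefix stage file names with a topic hash so the
            // per-topic file cleaners do not delete each other's files.
            "snowflake.snowpipe.stageFileNameExtensionEnabled", "true",
            // New in this change: close streaming channels in parallel.
            "snowflake.streaming.closeChannelsInParallel.enabled", "true");
    connectorConfig.forEach((key, value) -> System.out.println(key + "=" + value));
  }
}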
@@ -0,0 +1,37 @@
package com.snowflake.kafka.connector.config;

import com.snowflake.kafka.connector.Utils;
import java.util.Map;

public class TopicToTableModeExtractor {

/** Defines whether a single target table is fed by one or many source topics. */
public enum Topic2TableMode {
// Single topic = single table
SINGLE_TOPIC_SINGLE_TABLE,
// Multiple topics = single table
MANY_TOPICS_SINGLE_TABLE,
}

private TopicToTableModeExtractor() {}

/**
* Util method - checks how many times the given topic's target table appears in the
* topic2Table map. If it appears more than once, multiple topics store data in a single table;
* in that case, for SNOWPIPE ingestion, stage files need to be uniquely identified so that file
* cleaner instances for different channels do not handle each other's files.
*
* @param topic2TableMap topic-to-table mapping from the connector configuration
* @param topic source topic name
* @return the detected {@link Topic2TableMode}
*/
public static Topic2TableMode determineTopic2TableMode(
Map<String, String> topic2TableMap, String topic) {
String tableName = Utils.tableName(topic, topic2TableMap);
return topic2TableMap.values().stream()
.filter(table -> table.equalsIgnoreCase(tableName))
.count()
> 1
? Topic2TableMode.MANY_TOPICS_SINGLE_TABLE
: Topic2TableMode.SINGLE_TOPIC_SINGLE_TABLE;
}
}
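A minimal standalone sketch of the mode detection above; it mirrors the counting logic rather than calling Utils.tableName, and assumes a topic absent from the map resolves to its own table. The topic and table names are made up:

import java.util.Map;

public class Topic2TableModeDemo {
  // A table that appears more than once among the mapped values is fed by many topics.
  static boolean isManyTopicsSingleTable(Map<String, String> topic2Table, String topic) {
    String table = topic2Table.getOrDefault(topic, topic);
    return topic2Table.values().stream().filter(t -> t.equalsIgnoreCase(table)).count() > 1;
  }

  public static void main(String[] args) {
    Map<String, String> map =
        Map.of("orders_eu", "ORDERS", "orders_us", "ORDERS", "payments", "PAYMENTS");
    System.out.println(isManyTopicsSingleTable(map, "orders_eu")); // true  -> MANY_TOPICS_SINGLE_TABLE
    System.out.println(isManyTopicsSingleTable(map, "payments"));  // false -> SINGLE_TOPIC_SINGLE_TABLE
  }
}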
@@ -1,32 +1,15 @@
package com.snowflake.kafka.connector.internal;

import com.google.common.base.Strings;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.common.utils.Crc32C;

public class FileNameUtils {
private static final KCLogger LOGGER = new KCLogger(FileNameUtils.class.getName());

/**
generate file name. File Name Format: app/table/partition/start_end_timeStamp.fileFormat.gz
Note: all file names should use this format
*
* @param appName connector name
* @param table table name
* @param partition partition number
* @param start start offset
* @param end end offset
* @return file name
*/
public static String fileName(String appName, String table, int partition, long start, long end) {
return fileName(filePrefix(appName, table, partition), start, end);
}

// Used for testing only
static String fileName(
String appName, String table, int partition, long start, long end, long time) {
return filePrefix(appName, table, partition) + start + "_" + end + "_" + time + ".json.gz";
}

/**
* generate file name
*
@@ -42,21 +25,6 @@ static String fileName(String prefix, long start, long end) {
return fileName;
}

/**
* generate file name for broken data
*
* @param appName app name
* @param table table name
* @param partition partition id
* @param offset record offset
* @param isKey is the broken record a key or a value
* @return file name
*/
static String brokenRecordFileName(
String appName, String table, int partition, long offset, boolean isKey) {
return brokenRecordFileName(filePrefix(appName, table, partition), offset, isKey);
}

/**
* generate file name for broken data
*
@@ -78,11 +46,38 @@ static String brokenRecordFileName(String prefix, long offset, boolean isKey) {
*
* @param appName connector name
* @param table table name
* @param topic topic name
* @param partition partition index
* @return file prefix
*/
static String filePrefix(String appName, String table, int partition) {
return appName + "/" + table + "/" + partition + "/";
static String filePrefix(String appName, String table, String topic, int partition) {
if (partition >= 0x8000) {
throw new IllegalArgumentException(
String.format("partition id=%d is too large (max=%d)", partition, 0x8000));
}
return appName + "/" + table + "/" + calculatePartitionPart(topic, partition) + "/";
}

private static BigInteger calculatePartitionPart(String topic, int partition) {
BigInteger partitionPart = BigInteger.valueOf(partition);
if (!Strings.isNullOrEmpty(topic)) {
// if topic is provided as part of the file prefix,
// 1. let's calculate a stable hash code out of it,
// 2. shift it left by 16 bits,
// 3. add 0x8000 (light up bit 15 as a marker),
// 4. add the partition id (which in production use cases should never reach a value above
// 5,000 partitions per topic).
// In theory we would support 32767 partitions, which is more than any reasonable value for
// a single topic
byte[] bytes = topic.toUpperCase().getBytes(StandardCharsets.UTF_8);
BigInteger hash = BigInteger.valueOf(Crc32C.compute(bytes, 0, bytes.length));
partitionPart =
hash.abs()
.multiply(BigInteger.valueOf(0x10000))
.add(BigInteger.valueOf(0x8000))
.add(partitionPart);
}
return partitionPart;
}

// applicationName/tableName/partitionNumber
@@ -136,18 +131,8 @@ static long fileNameToTimeIngested(String fileName) {
* @return partition index
*/
static int fileNameToPartition(String fileName) {
return Integer.parseInt(readFromFileName(fileName, 1));
}

/**
* check whether the given file is expired
*
* @param fileName file name
* @return true if expired, otherwise false
*/
static boolean isFileExpired(String fileName) {
return System.currentTimeMillis() - fileNameToTimeIngested(fileName)
> InternalUtils.MAX_RECOVERY_TIME;
BigInteger value = new BigInteger(readFromFileName(fileName, 1));
return value.and(BigInteger.valueOf(0x7FFF)).intValue();
}

/**
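The arithmetic in calculatePartitionPart and fileNameToPartition can be traced with a small standalone sketch. This is an illustration only: it uses the JDK's java.util.zip.CRC32C in place of Kafka's Crc32C helper (both compute CRC-32C), and the topic name is made up:

import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32C;

public class PartitionPartSketch {
  public static void main(String[] args) {
    String topic = "orders_eu"; // hypothetical topic
    int partition = 3;

    // Encode: crc32c(upper-cased topic) shifted left by 16 bits, plus the 0x8000 marker,
    // plus the partition id - done with BigInteger, mirroring calculatePartitionPart above.
    CRC32C crc = new CRC32C();
    byte[] bytes = topic.toUpperCase().getBytes(StandardCharsets.UTF_8);
    crc.update(bytes, 0, bytes.length);
    BigInteger encoded =
        BigInteger.valueOf(crc.getValue())
            .multiply(BigInteger.valueOf(0x10000))
            .add(BigInteger.valueOf(0x8000))
            .add(BigInteger.valueOf(partition));

    // Decode: fileNameToPartition keeps only the low 15 bits, which always hold the
    // original partition id whether or not the topic-hash extension was applied.
    int recovered = encoded.and(BigInteger.valueOf(0x7FFF)).intValue();
    System.out.println(encoded + " -> partition " + recovered); // "... -> partition 3"
  }
}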
@@ -72,6 +72,19 @@ private SnowflakeSinkServiceBuilder(
if (useStageFilesProcessor) {
svc.enableStageFilesProcessor(threadCount);
}

boolean extendedStageFileNameFix =
SnowflakeSinkConnectorConfig.SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED_DEFAULT;
if (connectorConfig != null
&& connectorConfig.containsKey(
SnowflakeSinkConnectorConfig.SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED)) {
extendedStageFileNameFix =
Boolean.parseBoolean(
connectorConfig.get(
SnowflakeSinkConnectorConfig
.SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED));
}
svc.configureSingleTableLoadFromMultipleTopics(extendedStageFileNameFix);
} else {
this.service = new SnowflakeSinkServiceV2(conn, connectorConfig);
}
@@ -1,5 +1,7 @@
package com.snowflake.kafka.connector.internal;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED;
import static com.snowflake.kafka.connector.config.TopicToTableModeExtractor.determineTopic2TableMode;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_RECORD_COUNT;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_SIZE_BYTES;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_SUB_DOMAIN;
@@ -8,8 +10,10 @@
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.config.TopicToTableModeExtractor;
import com.snowflake.kafka.connector.internal.metrics.MetricsJmxReporter;
import com.snowflake.kafka.connector.internal.metrics.MetricsUtil;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryPipeCreation;
@@ -25,12 +29,15 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
@@ -86,6 +93,13 @@ class SnowflakeSinkServiceV1 implements SnowflakeSinkService {
private boolean useStageFilesProcessor = false;
@Nullable private ScheduledExecutorService cleanerServiceExecutor;

// if enabled, the prefix for stage files for a given table will contain information about source
// topic hashcode. This is required in scenarios when multiple topics are configured to ingest
// data into a single table.
private boolean enableStageFilePrefixExtension = false;

private final Set<String> perTableWarningNotifications = new HashSet<>();

SnowflakeSinkServiceV1(SnowflakeConnectionService conn) {
if (conn == null || conn.isClosed()) {
throw SnowflakeErrors.ERROR_5010.getException();
@@ -129,9 +143,32 @@ public void startPartition(final String tableName, final TopicPartition topicPar
tableName,
stageName,
pipeName,
topicPartition.topic(),
conn,
topicPartition.partition(),
cleanerServiceExecutor));

if (enableStageFilePrefixExtension
&& TopicToTableModeExtractor.determineTopic2TableMode(
topic2TableMap, topicPartition.topic())
== TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE) {
// if snowflake.snowpipe.stageFileNameExtensionEnabled is enabled and the table is used by
// multiple topics, we may end up in a situation where data from different topics has
// ended up in the same bucket - after enabling this fix, that data would stay on the stage
// forever. We want to inform the user about such a situation, so we list all files
// which would no longer be processed by the connector.
String key = String.format("%s-%d", tableName, topicPartition.partition());
synchronized (perTableWarningNotifications) {
if (!perTableWarningNotifications.contains(key)) {
perTableWarningNotifications.add(key);
ForkJoinPool.commonPool()
.submit(
() ->
checkTableStageForObsoleteFiles(
stageName, tableName, topicPartition.partition()));
}
}
}
}
}
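The guard above runs the stage check at most once per table/partition pair and pushes the (potentially slow) listing off the hot path. A stripped-down sketch of that pattern; the class and method names here are illustrative, not the connector's API:

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;

public class OncePerKeyNotifier {
  private final Set<String> notified = new HashSet<>();

  // Submits the check asynchronously the first time a given table/partition key is seen;
  // later calls with the same key are no-ops.
  void maybeNotify(String tableName, int partition, Runnable check) {
    String key = tableName + "-" + partition;
    synchronized (notified) {
      if (notified.add(key)) {
        ForkJoinPool.commonPool().submit(check);
      }
    }
  }
}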

@@ -370,6 +407,37 @@ protected static String getNameIndex(String topic, int partition) {
return topic + "_" + partition;
}

public void configureSingleTableLoadFromMultipleTopics(boolean fixEnabled) {
enableStageFilePrefixExtension = fixEnabled;
}

/**
* util method - checks if there are stage files present matching the "appName/table/partition/"
* file name format; if there are, it lists them and asks the user to delete them manually. The
* file format for tables used by multiple topics is "appName/table/{hashOf(topicName) << 16 |
* 0x8000 | partition}/"
*/
private void checkTableStageForObsoleteFiles(String stageName, String tableName, int partition) {
try {
String prefix = FileNameUtils.filePrefix(conn.getConnectorName(), tableName, null, partition);
List<String> stageFiles = conn.listStage(stageName, prefix);
if (!stageFiles.isEmpty()) {
LOGGER.warn(
"NOTE: For table {} there are {} files matching {} prefix.",
tableName,
stageFiles.size(),
prefix);
stageFiles.sort(String::compareToIgnoreCase);
LOGGER.warn("Please consider manually deleting these files:");
for (List<String> names : Lists.partition(stageFiles, 10)) {
LOGGER.warn(String.join(", ", names));
}
}
} catch (Exception err) {
LOGGER.warn("could not query stage - {}<{}>", err.getMessage(), err.getClass().getName());
}
}

private class ServiceContext {
private final String tableName;
private final String stageName;
@@ -419,6 +487,7 @@ private ServiceContext(
String tableName,
String stageName,
String pipeName,
String topicName,
SnowflakeConnectionService conn,
int partition,
ScheduledExecutorService v2CleanerExecutor) {
@@ -430,7 +499,30 @@ private ServiceContext(
this.cleanerFileNames = new LinkedList<>();
this.buffer = new SnowpipeBuffer();
this.ingestionService = conn.buildIngestService(stageName, pipeName);
this.prefix = FileNameUtils.filePrefix(conn.getConnectorName(), tableName, partition);
// SNOW-1642799 - if multiple topics load data into a single table, we need to ensure the
// prefix is unique per topic - otherwise, file cleaners for different channels may run into a
// race condition
TopicToTableModeExtractor.Topic2TableMode mode =
determineTopic2TableMode(topic2TableMap, topicName);
if (mode == TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE
&& !enableStageFilePrefixExtension) {
LOGGER.warn(
"The table {} is used as ingestion target by multiple topics - including this one"
+ " '{}'.\n"
+ "To prevent potential data loss consider setting"
+ " '"
+ SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED
+ "' to true",
topicName,
tableName);
}
if (mode == TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE
&& enableStageFilePrefixExtension) {
this.prefix =
FileNameUtils.filePrefix(conn.getConnectorName(), tableName, topicName, partition);
} else {
this.prefix = FileNameUtils.filePrefix(conn.getConnectorName(), tableName, "", partition);
}
this.processedOffset = new AtomicLong(-1);
this.flushedOffset = new AtomicLong(-1);
this.committedOffset = new AtomicLong(0);
@@ -466,6 +558,8 @@ private ServiceContext(
tableName,
stageName,
prefix,
topicName,
partition,
conn,
ingestionService,
pipeStatus,
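Summarizing the ServiceContext logic above: the topic-hash extension only kicks in when the table is shared by several topics and the fix flag is enabled; otherwise the legacy prefix is kept (with a warning when the table is shared). A tiny decision-table sketch, with illustrative names:

public class PrefixChoiceSketch {
  enum Mode { SINGLE_TOPIC_SINGLE_TABLE, MANY_TOPICS_SINGLE_TABLE }

  // Returns the topic to pass into FileNameUtils.filePrefix: the real topic name when the
  // extended prefix applies, or "" to keep the legacy "appName/table/partition/" prefix.
  static String topicForPrefix(Mode mode, boolean fixEnabled, String topic) {
    return (mode == Mode.MANY_TOPICS_SINGLE_TABLE && fixEnabled) ? topic : "";
  }

  public static void main(String[] args) {
    System.out.println(topicForPrefix(Mode.MANY_TOPICS_SINGLE_TABLE, true, "orders_eu"));  // extended
    System.out.println(topicForPrefix(Mode.MANY_TOPICS_SINGLE_TABLE, false, "orders_eu")); // "" legacy
    System.out.println(topicForPrefix(Mode.SINGLE_TOPIC_SINGLE_TABLE, true, "orders_eu")); // "" legacy
  }
}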

