SNOW-1859651 Salt prefix for files mapped from different topics #1035

Open · wants to merge 2 commits into base: master

67 changes: 62 additions & 5 deletions src/main/java/com/snowflake/kafka/connector/Utils.java
@@ -430,6 +430,37 @@ public static boolean isSnowpipeStreamingIngestion(Map<String, String> config) {
return !isSnowpipeIngestion(config);
}

/**
* Wrapper for a generated name. isNameFromMap equal to true indicates that the name was
* resolved using the topic2table map passed to {@link Utils#generateTableName(String, Map)}.
*/
public static class GeneratedName {
private final String name;
private final boolean isNameFromMap;

private GeneratedName(String name, boolean isNameFromMap) {
this.name = name;
this.isNameFromMap = isNameFromMap;
}

private static GeneratedName fromMap(String name) {
return new GeneratedName(name, true);
}

public static GeneratedName generated(String name) {
return new GeneratedName(name, false);
}

public String getName() {
return name;
}

public boolean isNameFromMap() {
return isNameFromMap;
}
}

/**
* modify invalid application name in config and return the generated application name
*
@@ -438,7 +469,7 @@ public static boolean isSnowpipeStreamingIngestion(Map<String, String> config) {
public static void convertAppName(Map<String, String> config) {
String appName = config.getOrDefault(SnowflakeSinkConnectorConfig.NAME, "");
// If appName is empty the following call will throw error
- String validAppName = generateValidName(appName, new HashMap<String, String>());
+ String validAppName = generateValidName(appName, new HashMap<>());

config.put(SnowflakeSinkConnectorConfig.NAME, validAppName);
}
@@ -454,6 +485,20 @@ public static String tableName(String topic, Map<String, String> topic2table) {
return generateValidName(topic, topic2table);
}

/**
* Verify the topic name and generate a valid table name. The returned GeneratedName carries a
* flag, isNameFromMap, indicating whether the name was retrieved from the passed topic2table
* map - which determines how SnowflakeSinkServiceV1 builds its stage file prefixes.
*
* @param topic input topic name
* @param topic2table topic to table map
* @return return GeneratedName with valid table name and a flag whether the name was taken from
* the topic2table
*/
public static GeneratedName generateTableName(String topic, Map<String, String> topic2table) {
return generateValidNameFromMap(topic, topic2table);
}

/**
* verify topic name, and generate valid table/application name
*
@@ -462,23 +507,35 @@ public static String tableName(String topic, Map<String, String> topic2table) {
* @return valid table/application name
*/
public static String generateValidName(String topic, Map<String, String> topic2table) {
return generateValidNameFromMap(topic, topic2table).name;
}

/**
* verify topic name, and generate valid table/application name
*
* @param topic input topic name
* @param topic2table topic to table map
* @return valid generated table/application name
*/
private static GeneratedName generateValidNameFromMap(
String topic, Map<String, String> topic2table) {
final String PLACE_HOLDER = "_";
if (topic == null || topic.isEmpty()) {
throw SnowflakeErrors.ERROR_0020.getException("topic name: " + topic);
}
if (topic2table.containsKey(topic)) {
- return topic2table.get(topic);
+ return GeneratedName.fromMap(topic2table.get(topic));
}

// try matching regex tables
for (String regexTopic : topic2table.keySet()) {
if (topic.matches(regexTopic)) {
- return topic2table.get(regexTopic);
+ return GeneratedName.fromMap(topic2table.get(regexTopic));
}
}

if (Utils.isValidSnowflakeObjectIdentifier(topic)) {
- return topic;
+ return GeneratedName.generated(topic);
}
int hash = Math.abs(topic.hashCode());

@@ -507,7 +564,7 @@ public static String generateValidName(String topic, Map<String, String> topic2table) {
result.append(PLACE_HOLDER);
result.append(hash);

- return result.toString();
+ return GeneratedName.generated(result.toString());
}

public static Map<String, String> parseTopicToTableMap(String input) {
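For context, a minimal usage sketch of the new Utils.generateTableName API (hypothetical caller code, not part of this diff; the topic and table names are invented):

    // Hypothetical example - assumes java.util.Map / java.util.HashMap and the Utils class above.
    Map<String, String> topic2table = new HashMap<>();
    topic2table.put("orders", "ORDERS_TABLE");

    Utils.GeneratedName fromMap = Utils.generateTableName("orders", topic2table);
    // fromMap.getName() -> "ORDERS_TABLE"; fromMap.isNameFromMap() -> true (resolved via the map)

    Utils.GeneratedName derived = Utils.generateTableName("clicks", topic2table);
    // derived.getName() -> "clicks" (already a valid identifier, returned unchanged)
    // derived.isNameFromMap() -> false (nothing in the map matched)

The isNameFromMap flag is what SnowflakeSinkServiceV1 uses further down to decide whether the stage file prefix needs to be salted with the topic name.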
@@ -66,11 +66,11 @@ private static BigInteger calculatePartitionPart(String topic, int partition) {
BigInteger partitionPart = BigInteger.valueOf(partition);
if (!Strings.isNullOrEmpty(topic)) {
// if topic is provided as part of the file prefix,
- // 1. lets calculate stable hash code out of it,
+ // 1. let's calculate stable hash code out of it,
// 2. bit shift it by 16 bits left,
// 3. add 0x8000 (light up 15th bit as a marker)
// 4. add partition id (which should in production use cases never reach a value above 5.000
- // partitions pers topic).
+ // partitions per topic).
// In theory - we would support 32767 partitions, which is more than any reasonable value for
// a single topic
byte[] bytes = topic.toUpperCase().getBytes(StandardCharsets.UTF_8);
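A rough sketch of the partition-part arithmetic described in the comment above (stableHashOf() is a hypothetical stand-in for step 1 - the real hash is computed from the upper-cased topic bytes and is not reproduced in this hunk):

    BigInteger stableHash = stableHashOf(topic);        // step 1: hypothetical stable hash of the topic
    BigInteger partitionPart = stableHash
        .shiftLeft(16)                                  // step 2: shift left by 16 bits
        .add(BigInteger.valueOf(0x8000))                // step 3: light up bit 15 as a marker
        .add(BigInteger.valueOf(partition));            // step 4: partition id, expected to stay below 32768

Because the partition id stays below 32768, the final add never carries into the marker bit, which is why the comment caps the theoretical limit at 32767 partitions per topic.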
@@ -1,7 +1,6 @@
package com.snowflake.kafka.connector.internal;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED;
- import static com.snowflake.kafka.connector.config.TopicToTableModeExtractor.determineTopic2TableMode;
import static com.snowflake.kafka.connector.internal.FileNameUtils.searchForMissingOffsets;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_RECORD_COUNT;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_SIZE_BYTES;
@@ -131,6 +130,17 @@ class SnowflakeSinkServiceV1 implements SnowflakeSinkService {
*/
@Override
public void startPartition(final String tableName, final TopicPartition topicPartition) {
Utils.GeneratedName generatedTableName =
Utils.generateTableName(topicPartition.topic(), topic2TableMap);
if (!tableName.equals(generatedTableName.getName())) {
LOGGER.warn(
"tableNames do not match, this is acceptable in tests but not in production! Resorting to"
+ " originalName and assuming no potential clashes on file prefixes. original={},"
+ " recalculated={}",
tableName,
generatedTableName.getName());
generatedTableName = Utils.GeneratedName.generated(tableName);
}
String stageName = Utils.stageName(conn.getConnectorName(), tableName);
String nameIndex = getNameIndex(topicPartition.topic(), topicPartition.partition());
if (pipes.containsKey(nameIndex)) {
@@ -142,7 +152,7 @@ public void startPartition(final String tableName, final TopicPartition topicPartition) {
pipes.put(
nameIndex,
new ServiceContext(
- tableName,
+ generatedTableName,
stageName,
pipeName,
topicPartition.topic(),
@@ -486,44 +496,38 @@ private class ServiceContext {
private boolean forceCleanerFileReset = false;

private ServiceContext(
- String tableName,
+ Utils.GeneratedName generatedTableName,
String stageName,
String pipeName,
String topicName,
SnowflakeConnectionService conn,
int partition,
ScheduledExecutorService v2CleanerExecutor) {
this.pipeName = pipeName;
- this.tableName = tableName;
+ this.tableName = generatedTableName.getName();
this.stageName = stageName;
this.conn = conn;
this.fileNames = new LinkedList<>();
this.cleanerFileNames = new LinkedList<>();
this.buffer = new SnowpipeBuffer();
this.ingestionService = conn.buildIngestService(stageName, pipeName);
- // SNOW-1642799 = if multiple topics load data into single table, we need to ensure prefix is
- // unique per table - otherwise, file cleaners for different channels may run into race
- // condition
- TopicToTableModeExtractor.Topic2TableMode mode =
- determineTopic2TableMode(topic2TableMap, topicName);
- if (mode == TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE
- && !enableStageFilePrefixExtension) {
+ // SNOW-1642799 = if multiple topics load data into single table, we need to ensure the file
+ // prefix is unique per topic - otherwise, file cleaners for different topics will try to
+ // clean the same prefixed files creating a race condition and a potential to delete
+ // not yet ingested files created by another topic
+ if (generatedTableName.isNameFromMap() && !enableStageFilePrefixExtension) {
LOGGER.warn(
"The table {} is used as ingestion target by multiple topics - including this one"
+ " '{}'.\n"
+ "To prevent potential data loss consider setting"
+ " '"
+ SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED
+ "' to true",
"The table {} may be used as ingestion target by multiple topics - including this one"
+ " '{}'.\nTo prevent potential data loss consider setting '{}' to true",
tableName,
topicName,
- tableName);
+ SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED);
}
- if (mode == TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE
- && enableStageFilePrefixExtension) {
+ {
+ final String topicForPrefix =
+ generatedTableName.isNameFromMap() && enableStageFilePrefixExtension ? topicName : "";
this.prefix =
- FileNameUtils.filePrefix(conn.getConnectorName(), tableName, topicName, partition);
- } else {
- this.prefix = FileNameUtils.filePrefix(conn.getConnectorName(), tableName, "", partition);
+ FileNameUtils.filePrefix(conn.getConnectorName(), tableName, topicForPrefix, partition);
Contributor commented:
so, if i read this correctly - we will not generate the salted prefix for all cases but only for a subset of them, right?
why not enable this by default for all?

Author replied:
Correct. There are three benefits I can think of when topic2table.map is not used for particular topic:

  • no chances of regression
  • no leftover files just because the connector has been updated
  • easier to identify partition numbers

}
this.processedOffset = new AtomicLong(-1);
this.flushedOffset = new AtomicLong(-1);
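To illustrate the effect of the change above (hypothetical values - connectorName and the exact layout returned by FileNameUtils.filePrefix are assumptions): when two topics are mapped to the same table and SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED is on, the topic name now participates in the prefix, so each topic's cleaner only matches its own files.

    // Both topics are mapped to SHARED_TABLE via topic2table, fix flag enabled:
    String prefixA = FileNameUtils.filePrefix(connectorName, "SHARED_TABLE", "topic_a", 0);
    String prefixB = FileNameUtils.filePrefix(connectorName, "SHARED_TABLE", "topic_b", 0);
    // prefixA != prefixB, so the cleaners for topic_a and topic_b no longer compete.
    // With an empty topic argument ("") - the previous behaviour for this mode - both
    // partitions would share one prefix, and one topic's cleaner could delete files the
    // other topic had not ingested yet.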
31 changes: 31 additions & 0 deletions src/test/java/com/snowflake/kafka/connector/UtilsTest.java
@@ -112,6 +112,37 @@ public void testTableName() {
assert Utils.tableName(topic, topic2table).equals("_12345_" + Math.abs(topic.hashCode()));
}

@Test
public void testGenerateTableName() {
Map<String, String> topic2table = Utils.parseTopicToTableMap("ab@cd:abcd, 1234:_1234");

String topic0 = "ab@cd";
Utils.GeneratedName generatedTableName1 = Utils.generateTableName(topic0, topic2table);
Assert.assertEquals("abcd", generatedTableName1.getName());
Assert.assertTrue(generatedTableName1.isNameFromMap());

String topic1 = "1234";
Utils.GeneratedName generatedTableName2 = Utils.generateTableName(topic1, topic2table);
Assert.assertEquals("_1234", generatedTableName2.getName());
Assert.assertTrue(generatedTableName2.isNameFromMap());

String topic2 = "bc*def";
Utils.GeneratedName generatedTableName3 = Utils.generateTableName(topic2, topic2table);
Assert.assertEquals("bc_def_" + Math.abs(topic2.hashCode()), generatedTableName3.getName());
Assert.assertFalse(generatedTableName3.isNameFromMap());

String topic3 = "12345";
Utils.GeneratedName generatedTableName4 = Utils.generateTableName(topic3, topic2table);
Assert.assertEquals("_12345_" + Math.abs(topic3.hashCode()), generatedTableName4.getName());
Assert.assertFalse(generatedTableName4.isNameFromMap());

TestUtils.assertError(
SnowflakeErrors.ERROR_0020, () -> Utils.generateTableName("", topic2table));
//noinspection DataFlowIssue
TestUtils.assertError(
SnowflakeErrors.ERROR_0020, () -> Utils.generateTableName(null, topic2table));
}

@Test
public void testTableNameRegex() {
String catTable = "cat_table";