SNOW-1859651 Salt prefix for files mapped from different topics #1035

Open · wants to merge 2 commits into base: master
59 changes: 54 additions & 5 deletions src/main/java/com/snowflake/kafka/connector/Utils.java
@@ -430,6 +430,29 @@ public static boolean isSnowpipeStreamingIngestion(Map<String, String> config) {
return !isSnowpipeIngestion(config);
}

/**
 * Holder for a generated name. isNameFromMap equal to true indicates that the name was
 * resolved from the topic2table map passed to {@link Utils#generateTableName(String, Map)}.
 */
public static class GeneratedName {
public final String name;
public final boolean isNameFromMap;

private GeneratedName(String name, boolean isNameFromMap) {
this.name = name;
this.isNameFromMap = isNameFromMap;
}

private static GeneratedName fromMap(String name) {
return new GeneratedName(name, true);
}

private static GeneratedName generated(String name) {
return new GeneratedName(name, false);
}
}

/**
 * modify an invalid application name in the config, storing the generated valid name back into it
*
@@ -438,7 +461,7 @@ public static boolean isSnowpipeStreamingIngestion(Map<String, String> config) {
public static void convertAppName(Map<String, String> config) {
String appName = config.getOrDefault(SnowflakeSinkConnectorConfig.NAME, "");
// If appName is empty the following call will throw error
String validAppName = generateValidName(appName, new HashMap<String, String>());
String validAppName = generateValidName(appName, new HashMap<>());

config.put(SnowflakeSinkConnectorConfig.NAME, validAppName);
}
@@ -454,6 +477,20 @@ public static String tableName(String topic, Map<String, String> topic2table) {
return generateValidName(topic, topic2table);
}

/**
 * Verify the topic name and generate a valid table name. The returned GeneratedName carries the
 * flag isNameFromMap, indicating whether the name was retrieved from the passed topic2table map;
 * SnowflakeSinkServiceV1 uses this flag to decide whether to salt its stage file prefix.
 *
 * @param topic input topic name
 * @param topic2table topic to table map
 * @return GeneratedName with a valid table name and a flag telling whether the name was taken
 *     from topic2table
 */
public static GeneratedName generateTableName(String topic, Map<String, String> topic2table) {
return generateValidNameFromMap(topic, topic2table);
}
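
For illustration, a minimal usage sketch of the new API. The "orders_.*" mapping and the topic name below are made up for this example; the regex branch of generateValidNameFromMap resolves the table name, so isNameFromMap comes back true:

import java.util.HashMap;
import java.util.Map;

class GeneratedNameUsageSketch {
  static void demo() {
    Map<String, String> topic2table = new HashMap<>();
    topic2table.put("orders_.*", "ORDERS"); // hypothetical regex mapping
    Utils.GeneratedName generated = Utils.generateTableName("orders_eu", topic2table);
    // "orders_eu" matches the "orders_.*" key, so the name is taken from the map:
    // generated.name equals "ORDERS" and generated.isNameFromMap is true, which
    // tells SnowflakeSinkServiceV1 that several topics may share this table.
  }
}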

/**
* verify topic name, and generate valid table/application name
*
@@ -462,23 +499,35 @@ public static String tableName(String topic, Map<String, String> topic2table) {
* @return valid table/application name
*/
public static String generateValidName(String topic, Map<String, String> topic2table) {
return generateValidNameFromMap(topic, topic2table).name;
}

/**
* verify topic name, and generate valid table/application name
*
* @param topic input topic name
* @param topic2table topic to table map
* @return valid generated table/application name
*/
private static GeneratedName generateValidNameFromMap(
String topic, Map<String, String> topic2table) {
final String PLACE_HOLDER = "_";
if (topic == null || topic.isEmpty()) {
throw SnowflakeErrors.ERROR_0020.getException("topic name: " + topic);
}
if (topic2table.containsKey(topic)) {
return topic2table.get(topic);
return GeneratedName.fromMap(topic2table.get(topic));
}

// try matching regex tables
for (String regexTopic : topic2table.keySet()) {
if (topic.matches(regexTopic)) {
return topic2table.get(regexTopic);
return GeneratedName.fromMap(topic2table.get(regexTopic));
}
}

if (Utils.isValidSnowflakeObjectIdentifier(topic)) {
return topic;
return GeneratedName.generated(topic);
}
int hash = Math.abs(topic.hashCode());

@@ -507,7 +556,7 @@ public static String generateValidName(String topic, Map<String, String> topic2t
result.append(PLACE_HOLDER);
result.append(hash);

return result.toString();
return GeneratedName.generated(result.toString());
}

public static Map<String, String> parseTopicToTableMap(String input) {
@@ -66,11 +66,11 @@ private static BigInteger calculatePartitionPart(String topic, int partition) {
BigInteger partitionPart = BigInteger.valueOf(partition);
if (!Strings.isNullOrEmpty(topic)) {
// if topic is provided as part of the file prefix,
// 1. lets calculate stable hash code out of it,
// 1. let's calculate stable hash code out of it,
// 2. bit shift it by 16 bits left,
// 3. add 0x8000 (light up 15th bit as a marker)
// 4. add partition id (which should in production use cases never reach a value above 5.000
// partitions pers topic).
// partitions per topic).
// In theory - we would support 32767 partitions, which is more than any reasonable value for
// a single topic
byte[] bytes = topic.toUpperCase().getBytes(StandardCharsets.UTF_8);
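
For clarity, a hedged sketch of the arithmetic the comment above describes. The diff truncates the actual hash, so CRC32 below is an assumed stand-in for the "stable hash code", not the connector's implementation:

import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

class PartitionPartSketch {
  // Mirrors the comment above: hash the topic, shift it left 16 bits, set bit 15
  // as a marker, and keep the partition id in bits 0..14 (hence the 32767 cap).
  static BigInteger partitionPart(String topic, int partition) {
    BigInteger part = BigInteger.valueOf(partition);
    if (topic != null && !topic.isEmpty()) {
      CRC32 crc = new CRC32(); // assumption: stands in for the truncated "stable hash code"
      crc.update(topic.toUpperCase().getBytes(StandardCharsets.UTF_8));
      part =
          BigInteger.valueOf(crc.getValue())
              .shiftLeft(16)
              .add(BigInteger.valueOf(0x8000))
              .add(part);
    }
    return part;
  }
}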
@@ -1,7 +1,6 @@
package com.snowflake.kafka.connector.internal;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED;
import static com.snowflake.kafka.connector.config.TopicToTableModeExtractor.determineTopic2TableMode;
import static com.snowflake.kafka.connector.internal.FileNameUtils.searchForMissingOffsets;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_RECORD_COUNT;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_SIZE_BYTES;
@@ -126,11 +125,19 @@ class SnowflakeSinkServiceV1 implements SnowflakeSinkService {
* Create a new ingestion task from an existing table and stage; tries to reuse an existing pipe
* and recover the previous task, otherwise creates a new pipe.
*
* @param tableName destination table name in Snowflake
* @param ignoredTableName destination table name in Snowflake; it is ignored and recalculated
*     to accommodate proper cleaning of staged files
* @param topicPartition TopicPartition passed from Kafka
*/
@Override
public void startPartition(final String tableName, final TopicPartition topicPartition) {
public void startPartition(final String ignoredTableName, final TopicPartition topicPartition) {
Utils.GeneratedName generatedTableName =
Utils.generateTableName(topicPartition.topic(), topic2TableMap);
final String tableName = generatedTableName.name;
if (!tableName.equals(ignoredTableName)) {
LOGGER.warn(
"tableNames do not match: original={}, recalculated={}", ignoredTableName, tableName);
}
String stageName = Utils.stageName(conn.getConnectorName(), tableName);
String nameIndex = getNameIndex(topicPartition.topic(), topicPartition.partition());
if (pipes.containsKey(nameIndex)) {
@@ -142,7 +149,7 @@ public void startPartition(final String tableName, final TopicPartition topicPar
pipes.put(
nameIndex,
new ServiceContext(
tableName,
generatedTableName,
stageName,
pipeName,
topicPartition.topic(),
@@ -486,40 +493,34 @@ private class ServiceContext {
private boolean forceCleanerFileReset = false;

private ServiceContext(
String tableName,
Utils.GeneratedName generatedTableName,
String stageName,
String pipeName,
String topicName,
SnowflakeConnectionService conn,
int partition,
ScheduledExecutorService v2CleanerExecutor) {
this.pipeName = pipeName;
this.tableName = tableName;
this.tableName = generatedTableName.name;
this.stageName = stageName;
this.conn = conn;
this.fileNames = new LinkedList<>();
this.cleanerFileNames = new LinkedList<>();
this.buffer = new SnowpipeBuffer();
this.ingestionService = conn.buildIngestService(stageName, pipeName);
// SNOW-1642799 = if multiple topics load data into single table, we need to ensure prefix is
// unique per table - otherwise, file cleaners for different channels may run into race
// condition
TopicToTableModeExtractor.Topic2TableMode mode =
determineTopic2TableMode(topic2TableMap, topicName);
if (mode == TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE
&& !enableStageFilePrefixExtension) {
// SNOW-1642799 = if multiple topics load data into single table, we need to ensure the file
// prefix is unique per topic - otherwise, file cleaners for different topics will try to
// clean the same prefixed files creating a race condition and a potential to delete
// not yet ingested files created by another topic
if (generatedTableName.isNameFromMap && !enableStageFilePrefixExtension) {
LOGGER.warn(
"The table {} is used as ingestion target by multiple topics - including this one"
+ " '{}'.\n"
+ "To prevent potential data loss consider setting"
+ " '"
+ SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED
+ "' to true",
"The table {} may be used as ingestion target by multiple topics - including this one"
+ " '{}'.\nTo prevent potential data loss consider setting '{}' to true",
tableName,
topicName,
tableName);
SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED);
}
if (mode == TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE
&& enableStageFilePrefixExtension) {
if (generatedTableName.isNameFromMap && enableStageFilePrefixExtension) {
this.prefix =
FileNameUtils.filePrefix(conn.getConnectorName(), tableName, topicName, partition);
} else {
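
A hedged sketch of the prefix decision the constructor now makes. The else branch is truncated in this diff, so the non-salted fallback shown here is an assumption, not the connector's confirmed code:

if (generatedTableName.isNameFromMap && enableStageFilePrefixExtension) {
  // several topics may feed this table: salt the prefix with the topic name so
  // each topic's file cleaner only ever touches its own staged files
  this.prefix =
      FileNameUtils.filePrefix(conn.getConnectorName(), tableName, topicName, partition);
} else {
  // assumed fallback (truncated in the diff): the classic, un-salted prefix
  this.prefix = FileNameUtils.filePrefix(conn.getConnectorName(), tableName, null, partition);
}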
31 changes: 31 additions & 0 deletions src/test/java/com/snowflake/kafka/connector/UtilsTest.java
@@ -112,6 +112,37 @@ public void testTableName() {
assert Utils.tableName(topic, topic2table).equals("_12345_" + Math.abs(topic.hashCode()));
}

@Test
public void testGenerateTableName() {
Map<String, String> topic2table = Utils.parseTopicToTableMap("ab@cd:abcd, 1234:_1234");

String topic0 = "ab@cd";
Utils.GeneratedName generatedTableName1 = Utils.generateTableName(topic0, topic2table);
Assert.assertEquals("abcd", generatedTableName1.name);
Assert.assertTrue(generatedTableName1.isNameFromMap);

String topic1 = "1234";
Utils.GeneratedName generatedTableName2 = Utils.generateTableName(topic1, topic2table);
Assert.assertEquals("_1234", generatedTableName2.name);
Assert.assertTrue(generatedTableName2.isNameFromMap);

String topic2 = "bc*def";
Utils.GeneratedName generatedTableName3 = Utils.generateTableName(topic2, topic2table);
Assert.assertEquals("bc_def_" + Math.abs(topic2.hashCode()), generatedTableName3.name);
Assert.assertFalse(generatedTableName3.isNameFromMap);

String topic3 = "12345";
Utils.GeneratedName generatedTableName4 = Utils.generateTableName(topic3, topic2table);
Assert.assertEquals("_12345_" + Math.abs(topic3.hashCode()), generatedTableName4.name);
Assert.assertFalse(generatedTableName4.isNameFromMap);

TestUtils.assertError(
SnowflakeErrors.ERROR_0020, () -> Utils.generateTableName("", topic2table));
//noinspection DataFlowIssue
TestUtils.assertError(
SnowflakeErrors.ERROR_0020, () -> Utils.generateTableName(null, topic2table));
}

@Test
public void testTableNameRegex() {
String catTable = "cat_table";