diff --git a/src/main/java/com/snowflake/kafka/connector/Utils.java b/src/main/java/com/snowflake/kafka/connector/Utils.java
index 3480de9d2..2a472bafb 100644
--- a/src/main/java/com/snowflake/kafka/connector/Utils.java
+++ b/src/main/java/com/snowflake/kafka/connector/Utils.java
@@ -430,6 +430,37 @@ public static boolean isSnowpipeStreamingIngestion(Map<String, String> config)
     return !isSnowpipeIngestion(config);
   }
 
+  /**
+   * Holder for a generated name. isNameFromMap set to true indicates that the name was resolved
+   * using the map passed to the relevant function, e.g. {@link Utils#generateTableName(String,
+   * Map)}.
+   */
+  public static class GeneratedName {
+    private final String name;
+    private final boolean isNameFromMap;
+
+    private GeneratedName(String name, boolean isNameFromMap) {
+      this.name = name;
+      this.isNameFromMap = isNameFromMap;
+    }
+
+    private static GeneratedName fromMap(String name) {
+      return new GeneratedName(name, true);
+    }
+
+    public static GeneratedName generated(String name) {
+      return new GeneratedName(name, false);
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public boolean isNameFromMap() {
+      return isNameFromMap;
+    }
+  }
+
   /**
    * modify invalid application name in config and return the generated application name
    *
@@ -438,7 +469,7 @@ public static boolean isSnowpipeStreamingIngestion(Map<String, String> config)
   public static void convertAppName(Map<String, String> config) {
     String appName = config.getOrDefault(SnowflakeSinkConnectorConfig.NAME, "");
     // If appName is empty the following call will throw error
-    String validAppName = generateValidName(appName, new HashMap());
+    String validAppName = generateValidName(appName, new HashMap<>());
 
     config.put(SnowflakeSinkConnectorConfig.NAME, validAppName);
   }
@@ -454,6 +485,20 @@ public static String tableName(String topic, Map<String, String> topic2table) {
     return generateValidName(topic, topic2table);
   }
 
+  /**
+   * Verify the topic name and generate a valid table name. The returned GeneratedName carries a
+   * flag, isNameFromMap, indicating whether the name was retrieved from the passed topic2table
+   * map - a distinction that SnowflakeSinkServiceV1 uses when building stage file prefixes.
+   *
+   * @param topic input topic name
+   * @param topic2table topic to table map
+   * @return GeneratedName with a valid table name and a flag telling whether the name was taken
+   *     from the topic2table map
+   */
+  public static GeneratedName generateTableName(String topic, Map<String, String> topic2table) {
+    return generateValidNameFromMap(topic, topic2table);
+  }
+
   /**
    * verify topic name, and generate valid table/application name
    *
@@ -462,23 +507,35 @@ public static String tableName(String topic, Map<String, String> topic2table) {
    * @return valid table/application name
    */
   public static String generateValidName(String topic, Map<String, String> topic2table) {
+    return generateValidNameFromMap(topic, topic2table).name;
+  }
+
+  /**
+   * verify topic name, and generate valid table/application name
+   *
+   * @param topic input topic name
+   * @param topic2table topic to table map
+   * @return valid generated table/application name
+   */
+  private static GeneratedName generateValidNameFromMap(
+      String topic, Map<String, String> topic2table) {
     final String PLACE_HOLDER = "_";
     if (topic == null || topic.isEmpty()) {
       throw SnowflakeErrors.ERROR_0020.getException("topic name: " + topic);
     }
     if (topic2table.containsKey(topic)) {
-      return topic2table.get(topic);
+      return GeneratedName.fromMap(topic2table.get(topic));
     }
 
     // try matching regex tables
     for (String regexTopic : topic2table.keySet()) {
       if (topic.matches(regexTopic)) {
-        return topic2table.get(regexTopic);
+        return GeneratedName.fromMap(topic2table.get(regexTopic));
       }
     }
 
     if (Utils.isValidSnowflakeObjectIdentifier(topic)) {
-      return topic;
+      return GeneratedName.generated(topic);
     }
     int hash = Math.abs(topic.hashCode());
@@ -507,7 +564,7 @@ public static String generateValidName(String topic, Map<String, String> topic2t
     result.append(PLACE_HOLDER);
     result.append(hash);
 
-    return result.toString();
+    return GeneratedName.generated(result.toString());
   }
 
   public static Map<String, String> parseTopicToTableMap(String input) {
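For context, a minimal usage sketch of the new API (hypothetical caller code, not part of the patch). A name resolved through the topic2table map reports isNameFromMap() as true, while a topic that is already a valid identifier is passed through as a generated name:

    // Hypothetical illustration of Utils.generateTableName semantics.
    Map<String, String> topic2table = Utils.parseTopicToTableMap("ab@cd:abcd");

    Utils.GeneratedName mapped = Utils.generateTableName("ab@cd", topic2table);
    // mapped.getName() -> "abcd"; mapped.isNameFromMap() -> true

    Utils.GeneratedName passedThrough = Utils.generateTableName("valid_topic", topic2table);
    // "valid_topic" is already a valid Snowflake identifier, so it is returned
    // unchanged and passedThrough.isNameFromMap() -> false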
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java b/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java
index ce594f033..2b10ddaef 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java
@@ -66,11 +66,11 @@ private static BigInteger calculatePartitionPart(String topic, int partition) {
     BigInteger partitionPart = BigInteger.valueOf(partition);
     if (!Strings.isNullOrEmpty(topic)) {
       // if topic is provided as part of the file prefix,
-      // 1. lets calculate stable hash code out of it,
+      // 1. let's calculate stable hash code out of it,
       // 2. bit shift it by 16 bits left,
       // 3. add 0x8000 (light up 15th bit as a marker)
       // 4. add partition id (which should in production use cases never reach a value above 5.000
-      // partitions pers topic).
+      // partitions per topic).
       // In theory - we would support 32767 partitions, which is more than any reasonable value for
       // a single topic
       byte[] bytes = topic.toUpperCase().getBytes(StandardCharsets.UTF_8);
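The numbered comment above describes how the topic hash and the partition id are packed into a single value. A sketch of that arithmetic (the stableHashCode helper name is an assumption; the real code derives the hash from the upper-cased topic bytes):

    // Pack the topic hash and partition id into one BigInteger.
    byte[] bytes = topic.toUpperCase().getBytes(StandardCharsets.UTF_8);
    int topicHash = stableHashCode(bytes); // assumed helper computing a stable hash
    BigInteger partitionPart =
        BigInteger.valueOf(topicHash)
            .shiftLeft(16)                       // steps 1-2: hash moved to the high bits
            .add(BigInteger.valueOf(0x8000))     // step 3: light up the 15th bit as a marker
            .add(BigInteger.valueOf(partition)); // step 4: partition id in the low 15 bits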
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
index 16f027636..24869a88d 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
@@ -1,7 +1,6 @@
 package com.snowflake.kafka.connector.internal;
 
 import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED;
-import static com.snowflake.kafka.connector.config.TopicToTableModeExtractor.determineTopic2TableMode;
 import static com.snowflake.kafka.connector.internal.FileNameUtils.searchForMissingOffsets;
 import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_RECORD_COUNT;
 import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_SIZE_BYTES;
@@ -131,6 +130,17 @@ class SnowflakeSinkServiceV1 implements SnowflakeSinkService {
    */
   @Override
   public void startPartition(final String tableName, final TopicPartition topicPartition) {
+    Utils.GeneratedName generatedTableName =
+        Utils.generateTableName(topicPartition.topic(), topic2TableMap);
+    if (!tableName.equals(generatedTableName.getName())) {
+      LOGGER.warn(
+          "tableNames do not match, this is acceptable in tests but not in production! Resorting to"
+              + " originalName and assuming no potential clashes on file prefixes. original={},"
+              + " recalculated={}",
+          tableName,
+          generatedTableName.getName());
+      generatedTableName = Utils.GeneratedName.generated(tableName);
+    }
     String stageName = Utils.stageName(conn.getConnectorName(), tableName);
     String nameIndex = getNameIndex(topicPartition.topic(), topicPartition.partition());
     if (pipes.containsKey(nameIndex)) {
@@ -142,7 +152,7 @@ public void startPartition(final String tableName, final TopicPartition topicPar
     pipes.put(
         nameIndex,
         new ServiceContext(
-            tableName,
+            generatedTableName,
             stageName,
             pipeName,
             topicPartition.topic(),
@@ -486,7 +496,7 @@ private class ServiceContext {
     private boolean forceCleanerFileReset = false;
 
     private ServiceContext(
-        String tableName,
+        Utils.GeneratedName generatedTableName,
         String stageName,
         String pipeName,
         String topicName,
@@ -494,36 +504,30 @@ private ServiceContext(
         int partition,
         ScheduledExecutorService v2CleanerExecutor) {
       this.pipeName = pipeName;
-      this.tableName = tableName;
+      this.tableName = generatedTableName.getName();
       this.stageName = stageName;
       this.conn = conn;
       this.fileNames = new LinkedList<>();
       this.cleanerFileNames = new LinkedList<>();
       this.buffer = new SnowpipeBuffer();
       this.ingestionService = conn.buildIngestService(stageName, pipeName);
-      // SNOW-1642799 = if multiple topics load data into single table, we need to ensure prefix is
-      // unique per table - otherwise, file cleaners for different channels may run into race
-      // condition
-      TopicToTableModeExtractor.Topic2TableMode mode =
-          determineTopic2TableMode(topic2TableMap, topicName);
-      if (mode == TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE
-          && !enableStageFilePrefixExtension) {
+      // SNOW-1642799 = if multiple topics load data into a single table, we need to ensure the
+      // file prefix is unique per topic - otherwise, file cleaners for different topics will try
+      // to clean the same prefixed files, creating a race condition and the potential to delete
+      // files created by another topic that have not yet been ingested
+      if (generatedTableName.isNameFromMap() && !enableStageFilePrefixExtension) {
         LOGGER.warn(
-            "The table {} is used as ingestion target by multiple topics - including this one"
-                + " '{}'.\n"
-                + "To prevent potential data loss consider setting"
-                + " '"
-                + SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED
-                + "' to true",
+            "The table {} may be used as ingestion target by multiple topics - including this one"
+                + " '{}'.\nTo prevent potential data loss consider setting '{}' to true",
+            tableName,
             topicName,
-            tableName);
+            SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED);
       }
-      if (mode == TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE
-          && enableStageFilePrefixExtension) {
+      {
+        final String topicForPrefix =
+            generatedTableName.isNameFromMap() && enableStageFilePrefixExtension ? topicName : "";
         this.prefix =
-            FileNameUtils.filePrefix(conn.getConnectorName(), tableName, topicName, partition);
-      } else {
-        this.prefix = FileNameUtils.filePrefix(conn.getConnectorName(), tableName, "", partition);
+            FileNameUtils.filePrefix(conn.getConnectorName(), tableName, topicForPrefix, partition);
       }
       this.processedOffset = new AtomicLong(-1);
       this.flushedOffset = new AtomicLong(-1);
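To make the effect of the prefix change concrete, a hypothetical comparison (variable names are illustrative; filePrefix is the helper invoked above):

    // With the fix enabled, two topics feeding the same table get distinct stage
    // file prefixes, so their file cleaners cannot race on each other's files.
    String prefixA = FileNameUtils.filePrefix(connectorName, "shared_table", "topic_a", 0);
    String prefixB = FileNameUtils.filePrefix(connectorName, "shared_table", "topic_b", 0);
    // prefixA != prefixB, because the topic hash is folded into the partition part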
diff --git a/src/test/java/com/snowflake/kafka/connector/UtilsTest.java b/src/test/java/com/snowflake/kafka/connector/UtilsTest.java
index 4a21ee177..41e434154 100644
--- a/src/test/java/com/snowflake/kafka/connector/UtilsTest.java
+++ b/src/test/java/com/snowflake/kafka/connector/UtilsTest.java
@@ -112,6 +112,37 @@ public void testTableName() {
     assert Utils.tableName(topic, topic2table).equals("_12345_" + Math.abs(topic.hashCode()));
   }
 
+  @Test
+  public void testGenerateTableName() {
+    Map<String, String> topic2table = Utils.parseTopicToTableMap("ab@cd:abcd, 1234:_1234");
+
+    String topic0 = "ab@cd";
+    Utils.GeneratedName generatedTableName1 = Utils.generateTableName(topic0, topic2table);
+    Assert.assertEquals("abcd", generatedTableName1.getName());
+    Assert.assertTrue(generatedTableName1.isNameFromMap());
+
+    String topic1 = "1234";
+    Utils.GeneratedName generatedTableName2 = Utils.generateTableName(topic1, topic2table);
+    Assert.assertEquals("_1234", generatedTableName2.getName());
+    Assert.assertTrue(generatedTableName2.isNameFromMap());
+
+    String topic2 = "bc*def";
+    Utils.GeneratedName generatedTableName3 = Utils.generateTableName(topic2, topic2table);
+    Assert.assertEquals("bc_def_" + Math.abs(topic2.hashCode()), generatedTableName3.getName());
+    Assert.assertFalse(generatedTableName3.isNameFromMap());
+
+    String topic3 = "12345";
+    Utils.GeneratedName generatedTableName4 = Utils.generateTableName(topic3, topic2table);
+    Assert.assertEquals("_12345_" + Math.abs(topic3.hashCode()), generatedTableName4.getName());
+    Assert.assertFalse(generatedTableName4.isNameFromMap());
+
+    TestUtils.assertError(
+        SnowflakeErrors.ERROR_0020, () -> Utils.generateTableName("", topic2table));
+    //noinspection DataFlowIssue
+    TestUtils.assertError(
+        SnowflakeErrors.ERROR_0020, () -> Utils.generateTableName(null, topic2table));
+  }
+
   @Test
   public void testTableNameRegex() {
     String catTable = "cat_table";
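One path the new test leaves uncovered is regex-based mapping, which generateValidNameFromMap also reports as map-derived. A possible companion check (hypothetical, not part of the patch):

    Map<String, String> regexMap = new HashMap<>();
    regexMap.put("top.*", "regex_table"); // regex key, handled by the matching loop
    Utils.GeneratedName fromRegex = Utils.generateTableName("topic_42", regexMap);
    Assert.assertEquals("regex_table", fromRegex.getName());
    Assert.assertTrue(fromRegex.isNameFromMap()); // regex hits count as map-derived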