From 3ec125fe24738cf302d9427d300c4705a818222c Mon Sep 17 00:00:00 2001 From: Michal Bobowski Date: Wed, 18 Dec 2024 14:04:45 +0100 Subject: [PATCH] SNOW-1859651 Topic2TableMode for patterns --- .../config/TopicToTableModeExtractor.java | 13 +++++++ .../ConnectorConfigValidatorTest.java | 38 +++++++++++++------ 2 files changed, 39 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/config/TopicToTableModeExtractor.java b/src/main/java/com/snowflake/kafka/connector/config/TopicToTableModeExtractor.java index 7108aacb7..44a32bc74 100644 --- a/src/main/java/com/snowflake/kafka/connector/config/TopicToTableModeExtractor.java +++ b/src/main/java/com/snowflake/kafka/connector/config/TopicToTableModeExtractor.java @@ -2,9 +2,13 @@ import com.snowflake.kafka.connector.Utils; import java.util.Map; +import java.util.regex.Pattern; public class TopicToTableModeExtractor { + private static Pattern topicRegexPattern = + Pattern.compile("\\[([0-9]-[0-9]|[a-z]-[a-z]|[A-Z]-[A-Z]|[!-@])\\][*+]?"); + /** Defines whether single target table is fed by one or many source topics. */ public enum Topic2TableMode { // Single topic = single table @@ -26,6 +30,15 @@ private TopicToTableModeExtractor() {} */ public static Topic2TableMode determineTopic2TableMode( Map topic2TableMap, String topic) { + + boolean anyTopicInMapIsRegex = + topic2TableMap.keySet().stream() + .anyMatch(topic2TableMapKey -> topicRegexPattern.matcher(topic2TableMapKey).find()); + + if (anyTopicInMapIsRegex) { + return Topic2TableMode.MANY_TOPICS_SINGLE_TABLE; + } + String tableName = Utils.tableName(topic, topic2TableMap); return topic2TableMap.values().stream() .filter(table -> table.equalsIgnoreCase(tableName)) diff --git a/src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorTest.java b/src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorTest.java index f0222b7ff..3c94984b6 100644 --- a/src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorTest.java +++ b/src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorTest.java @@ -2,6 +2,8 @@ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.*; import static com.snowflake.kafka.connector.Utils.HTTP_NON_PROXY_HOSTS; +import static com.snowflake.kafka.connector.config.TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE; +import static com.snowflake.kafka.connector.config.TopicToTableModeExtractor.Topic2TableMode.SINGLE_TOPIC_SINGLE_TABLE; import static com.snowflake.kafka.connector.internal.TestUtils.getConfig; import static org.assertj.core.api.Assertions.*; import static org.junit.Assert.assertEquals; @@ -221,18 +223,30 @@ public void testNameMapCovered() { connectorConfigValidator.validateConfig(config); } - @Test - public void testTopic2TableCorrectlyDeterminesMode() { - Map config = getConfig(); - config.put(TOPICS_TABLES_MAP, "src1:target1,src2:target2,src3:target1"); - connectorConfigValidator.validateConfig(config); - Map topic2Table = Utils.parseTopicToTableMap(config.get(TOPICS_TABLES_MAP)); - assertThat(TopicToTableModeExtractor.determineTopic2TableMode(topic2Table, "src1")) - .isEqualTo(TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE); - assertThat(TopicToTableModeExtractor.determineTopic2TableMode(topic2Table, "src2")) - .isEqualTo(TopicToTableModeExtractor.Topic2TableMode.SINGLE_TOPIC_SINGLE_TABLE); - assertThat(TopicToTableModeExtractor.determineTopic2TableMode(topic2Table, "src3")) - .isEqualTo(TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE); + @ParameterizedTest + @MethodSource("topicToTableTestData") + public void testTopic2TableCorrectlyDeterminesMode( + String topicToTable, String topic, TopicToTableModeExtractor.Topic2TableMode expected) { + // given + Map topic2Table = Utils.parseTopicToTableMap(topicToTable); + + // when + TopicToTableModeExtractor.Topic2TableMode actual = + TopicToTableModeExtractor.determineTopic2TableMode(topic2Table, topic); + + // then + assertThat(actual).isEqualTo(expected); + } + + public static Stream topicToTableTestData() { + return Stream.of( + Arguments.of("src1:target1,src2:target2,src3:target1", "src1", MANY_TOPICS_SINGLE_TABLE), + Arguments.of("src1:target1,src2:target2,src3:target1", "src2", SINGLE_TOPIC_SINGLE_TABLE), + Arguments.of("topic[0-9]:tableA", "tableA", MANY_TOPICS_SINGLE_TABLE), + Arguments.of("to[0-9]pic:tableA", "tableA", MANY_TOPICS_SINGLE_TABLE), + Arguments.of("[0-9]topic:tableA", "tableA", MANY_TOPICS_SINGLE_TABLE), + Arguments.of("topic[a-z]:tableA", "tableA", MANY_TOPICS_SINGLE_TABLE), + Arguments.of("topic[0-9]:tableA", "randomTopicName", MANY_TOPICS_SINGLE_TABLE)); } @Test