SNOW-1859651 Change stage name prefix for tables matched with regex #1030

Status: Open. Wants to merge 1 commit into base: master.
@@ -2,9 +2,13 @@

import com.snowflake.kafka.connector.Utils;
import java.util.Map;
import java.util.regex.Pattern;

public class TopicToTableModeExtractor {

private static Pattern topicRegexPattern =
Pattern.compile("\\[([0-9]-[0-9]|[a-z]-[a-z]|[A-Z]-[A-Z]|[!-@])\\][*+]?");
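As an illustration of what this pattern classifies as a regex-like topic entry, here is a minimal, self-contained sketch. The pattern is copied verbatim from the diff above; the class and method names (`RegexDetectionDemo`, `looksLikeRegex`) are hypothetical and exist only for this example:

```java
import java.util.regex.Pattern;

public class RegexDetectionDemo {
    // Same pattern as in the PR: detects a character-class range such as
    // [0-9], [a-z], [A-Z] or a punctuation range like [!-@], optionally
    // followed by a * or + quantifier.
    private static final Pattern TOPIC_REGEX_PATTERN =
        Pattern.compile("\\[([0-9]-[0-9]|[a-z]-[a-z]|[A-Z]-[A-Z]|[!-@])\\][*+]?");

    static boolean looksLikeRegex(String topicEntry) {
        // find() (not matches()) is used, as in the PR: a regex-like
        // fragment anywhere in the key is enough.
        return TOPIC_REGEX_PATTERN.matcher(topicEntry).find();
    }

    public static void main(String[] args) {
        System.out.println(looksLikeRegex("topic[0-9]"));  // true
        System.out.println(looksLikeRegex("topic[a-z]*")); // true
        System.out.println(looksLikeRegex("plainTopic"));  // false
        // Note: a bare regex metacharacter without brackets is NOT detected,
        // which is relevant to the "topic.name" ambiguity discussed below.
        System.out.println(looksLikeRegex("topic.name"));  // false
    }
}
```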
Contributor:
Out of curiosity: do you think this expression could be extended somehow? The initial expression I came up with in the ticket was fairly quick thinking, and I wonder if there could be another or better approach to tackle this.

One idea, in case we figure out we need to extend this: do you think it would make sense to embed this regexp in the configuration, provide a reasonable default, and have the possibility to change it at runtime for some extreme corner cases?

Contributor Author:

Alternatively, I was thinking about simply detecting any of the special characters like [ or ], but I am not sure that is better.

If we want to be more defensive with this change, I would create a feature flag instead of exposing the regex. The worst case for messing something up is that the cleaner stops working; that is not good, for sure, but also not terrible.


Imo, having a regex for detecting whether a String is itself a multi-matching regex is not what we want to do (and not only because it is hard and error-prone). I think we should compute the resulting table for the whole set of topics on each rebalance, creating a literal topic-to-table map for the actual set of topics. Only then do we know whether a topic defined in topic2table.map should be treated as a literal topic or as a topic regex; e.g. "topic.name" could be both: a literal matching "topic.name", or a regex matching "topic[a-z|A-Z|0-9|._-]name".
Having a concrete mapping for the actual topics allows us to give a definite answer on whether we have a MANY_TOPICS_SINGLE_TABLE case.
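The "unwind first" idea above could be sketched roughly as follows. All names here (`TopicUnwinder`, `unwind`, `manyTopicsSingleTable`) are hypothetical, and the literal-entry-wins resolution order is an assumption, not the connector's actual behavior:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class TopicUnwinder {

    // Resolve every *actual* topic against the topic2table map, producing a
    // concrete topic -> table map. Literal entries win; otherwise each map
    // key is tried as a regex against the topic name.
    static Map<String, String> unwind(List<String> actualTopics, Map<String, String> topic2Table) {
        Map<String, String> resolved = new HashMap<>();
        for (String topic : actualTopics) {
            String table = topic2Table.get(topic);
            if (table == null) {
                for (Map.Entry<String, String> e : topic2Table.entrySet()) {
                    if (Pattern.matches(e.getKey(), topic)) {
                        table = e.getValue();
                        break;
                    }
                }
            }
            if (table != null) {
                resolved.put(topic, table);
            }
        }
        return resolved;
    }

    // With a concrete mapping in hand, the mode question becomes a simple
    // count: does more than one topic feed the given table?
    static boolean manyTopicsSingleTable(Map<String, String> resolved, String table) {
        return resolved.values().stream().filter(t -> t.equalsIgnoreCase(table)).count() > 1;
    }
}
```

For example, with actual topics `topicA` and `topicB` and the single map entry `topic[A-Z]:tableX`, both topics resolve to `tableX`, so `manyTopicsSingleTable` returns true without ever inspecting the map keys for regex syntax.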

Contributor:

I do like this idea a lot, unwinding all the topics up front to their final form rather than relying on regexes. There are (at least) two corner cases I'm worried about:

  • The order of initialization is sequential. I can see a scenario where the first topic matched by the regex is added to the map; at that moment we have a table fed by exactly one topic, so the logic would assume this is a single-topic case and that there is no need to salt the name with the topic's hash.
  • The potential distribution of topics across nodes. If a regexp matches, say, 2 topics and we distribute these topics on 2 different nodes, the map would contain only 1 entry and we are back to square one.

Potentially, we could consider one more option (Michal, correct me if I'm wrong on this one): the connector name is salted with a timestamp on every deployment, right? If that's the case, I think we could simply enable the salting logic for file prefixes regardless of the topic2table configuration. The fact that the connector's name changes will also impact the stage prefix, so trying to keep the naming scheme backwards compatible is already invalidated; that's something I noticed when analysing this cleaner issue.


> the order of initialization is sequential - I can see the scenario, when first topic is matched to the regex, is added to the table; at this moment - we have a table containing exactly one topic - so the logic would assume this is a single topic and there is no need to salt the name with topic's hash.

This is not a problem since we first unwind the topics, then check if we have duplicates.

> the potential distribution of topics across nodes - if there is a regexp matching say 2 topics and we distribute these topics on 2 different nodes - the map would contain only 1 entry and we are back to square one.

This is a real problem if we cannot assume that each connector name is unique. Alternatively we could generate a UUID (or something) for each connector start that would not change on rebalances. WDYT?
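The per-start UUID idea could look roughly like this. Everything here is hypothetical (`StagePrefixSalter`, `filePrefix`, the prefix layout); it is only meant to show that a salt generated once at connector start, held in an instance field, survives rebalances while still differing between deployments:

```java
import java.util.UUID;

public class StagePrefixSalter {
    // Generated once when the connector starts; the field lives for the
    // lifetime of the connector instance, so rebalances do not change it,
    // but a fresh deployment gets a fresh salt.
    private final String instanceSalt = UUID.randomUUID().toString().substring(0, 8);

    String filePrefix(String connectorName, String tableName, String topic) {
        // The topic hash keeps prefixes distinct even when several topics
        // feed one table on different nodes.
        return connectorName + "/" + tableName + "/" + instanceSalt + "_"
            + Math.abs(topic.hashCode());
    }
}
```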

Contributor:

If you checked the code and that's the logic, then OK. I thought it was running the init in a sequential loop per task.

The second part: that could mean affecting the assignor policy, and honestly I doubt we should be doing that.

The more I think about it, the more I'm leaning toward enabling that topic salting regardless of the detected topic2table mode.


Yeah, if the table name was resolved using the topic2table map, we should salt the file prefix with the topic; otherwise there could be a clash anyway (see the example below). This should not break anything, but some files could stay as leftovers on the stage after an upgrade, and potentially that data could be duplicated. No data loss, though.

Example:
topics=topicA,topicB or topics.regex=^topic[A-Z]$
and
topic2table.map=topicB:topicA

In the above situation, despite having only a single, non-regex entry in topic2table.map, we would have a clash on the cleaners. Reversing a regex to check where the clash could happen is not feasible.
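The clash in that example can be shown in a few lines. `ClashDemo` and `tableFor` are hypothetical names, and `tableFor` is a deliberately simplified stand-in for the map-lookup-with-fallback behavior of `Utils.tableName` (an unmapped topic's table name defaults to the topic name):

```java
import java.util.Map;

public class ClashDemo {
    // Simplified stand-in for Utils.tableName: a mapped topic uses its
    // mapped table; an unmapped topic uses its own name as the table name.
    static String tableFor(String topic, Map<String, String> topic2Table) {
        return topic2Table.getOrDefault(topic, topic);
    }

    public static void main(String[] args) {
        Map<String, String> map = Map.of("topicB", "topicA");
        System.out.println(tableFor("topicA", map)); // topicA (unmapped)
        System.out.println(tableFor("topicB", map)); // topicA (mapped) -> same table, clash
    }
}
```

Both topics resolve to the table "topicA", so unsalted stage file prefixes for the two topics would collide even though the map contains a single literal entry.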


/** Defines whether single target table is fed by one or many source topics. */
public enum Topic2TableMode {
// Single topic = single table
@@ -26,6 +30,15 @@ private TopicToTableModeExtractor() {}
*/
public static Topic2TableMode determineTopic2TableMode(
Map<String, String> topic2TableMap, String topic) {

boolean anyTopicInMapIsRegex =
topic2TableMap.keySet().stream()
.anyMatch(topic2TableMapKey -> topicRegexPattern.matcher(topic2TableMapKey).find());

if (anyTopicInMapIsRegex) {
return Topic2TableMode.MANY_TOPICS_SINGLE_TABLE;
}

String tableName = Utils.tableName(topic, topic2TableMap);
return topic2TableMap.values().stream()
.filter(table -> table.equalsIgnoreCase(tableName))
@@ -2,6 +2,8 @@

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.*;
import static com.snowflake.kafka.connector.Utils.HTTP_NON_PROXY_HOSTS;
import static com.snowflake.kafka.connector.config.TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE;
import static com.snowflake.kafka.connector.config.TopicToTableModeExtractor.Topic2TableMode.SINGLE_TOPIC_SINGLE_TABLE;
import static com.snowflake.kafka.connector.internal.TestUtils.getConfig;
import static org.assertj.core.api.Assertions.*;
import static org.junit.Assert.assertEquals;
@@ -221,18 +223,30 @@ public void testNameMapCovered() {
connectorConfigValidator.validateConfig(config);
}

@Test
public void testTopic2TableCorrectlyDeterminesMode() {
Map<String, String> config = getConfig();
config.put(TOPICS_TABLES_MAP, "src1:target1,src2:target2,src3:target1");
connectorConfigValidator.validateConfig(config);
Map<String, String> topic2Table = Utils.parseTopicToTableMap(config.get(TOPICS_TABLES_MAP));
assertThat(TopicToTableModeExtractor.determineTopic2TableMode(topic2Table, "src1"))
.isEqualTo(TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE);
assertThat(TopicToTableModeExtractor.determineTopic2TableMode(topic2Table, "src2"))
.isEqualTo(TopicToTableModeExtractor.Topic2TableMode.SINGLE_TOPIC_SINGLE_TABLE);
assertThat(TopicToTableModeExtractor.determineTopic2TableMode(topic2Table, "src3"))
.isEqualTo(TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE);
@ParameterizedTest
@MethodSource("topicToTableTestData")
public void testTopic2TableCorrectlyDeterminesMode(
String topicToTable, String topic, TopicToTableModeExtractor.Topic2TableMode expected) {
// given
Map<String, String> topic2Table = Utils.parseTopicToTableMap(topicToTable);

// when
TopicToTableModeExtractor.Topic2TableMode actual =
TopicToTableModeExtractor.determineTopic2TableMode(topic2Table, topic);

// then
assertThat(actual).isEqualTo(expected);
}

public static Stream<Arguments> topicToTableTestData() {
return Stream.of(
Arguments.of("src1:target1,src2:target2,src3:target1", "src1", MANY_TOPICS_SINGLE_TABLE),
Arguments.of("src1:target1,src2:target2,src3:target1", "src2", SINGLE_TOPIC_SINGLE_TABLE),
Arguments.of("topic[0-9]:tableA", "tableA", MANY_TOPICS_SINGLE_TABLE),
Arguments.of("to[0-9]pic:tableA", "tableA", MANY_TOPICS_SINGLE_TABLE),
Arguments.of("[0-9]topic:tableA", "tableA", MANY_TOPICS_SINGLE_TABLE),
Arguments.of("topic[a-z]:tableA", "tableA", MANY_TOPICS_SINGLE_TABLE),
Arguments.of("topic[0-9]:tableA", "randomTopicName", MANY_TOPICS_SINGLE_TABLE));
}

@Test