Skip to content

Commit

Permalink
Rename config to snowflake.enable.new.channel.name.format
Browse files · Browse the repository at this point in the history
  • Loading branch information
sfc-gh-japatel committed Oct 23, 2023
1 parent 5806f5f commit 1c11539
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,19 +168,19 @@ public class SnowflakeSinkConnectorConfig {
"Whether to optimize the streaming client to reduce cost. Note that this may affect"
+ " throughput or latency and can only be set if Streaming Snowpipe is enabled";

public static final String SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2 =
"snowflake.enable.streaming.channel.format.v2";
public static final boolean SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2_DEFAULT = false;
public static final String SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT =
"snowflake.enable.new.channel.name.format";
public static final boolean SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT = false;

public static final String SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2_DISPLAY =
"Enable Connector Name in Snowpipe Streaming Channel Name - V2 of Channel Name";
public static final String SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DISPLAY =
"Enable Connector Name in Snowpipe Streaming Channel Name";

public static final String SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2_DOC =
public static final String SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DOC =
"Whether to use connector name in streaming channels. If it is set to false, we will not use"
+ " connector name in channel name(Which is version 2 of Channel Name). Note: Please use"
+ " this config cautiously and it is not advised to use this if you are coming from old"
+ " Snowflake Kafka Connector Version where Default Channel Name doesnt contain Connector"
+ " Name, contains Topic Name and Partition # only.";
+ " connector name in channel name(Which is new version of Channel Name). Note: Please"
+ " use this config cautiously and it is not advised to use this if you are coming from"
+ " old Snowflake Kafka Connector Version where Default Channel Name doesnt contain"
+ " Connector Name, contains Topic Name and Partition # only.";

// MDC logging header
public static final String ENABLE_MDC_LOGGING_CONFIG = "enable.mdc.logging";
Expand Down Expand Up @@ -607,15 +607,15 @@ static ConfigDef newConfigDef() {
ConfigDef.Width.NONE,
ENABLE_MDC_LOGGING_DISPLAY)
.define(
SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
Type.BOOLEAN,
SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2_DEFAULT,
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT,
Importance.LOW,
SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2_DOC,
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DOC,
CONNECTOR_CONFIG,
9,
ConfigDef.Width.NONE,
SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2_DISPLAY);
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DISPLAY);
}

public static class TopicToTableValidator implements ConfigDef.Validator {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.snowflake.kafka.connector.internal.streaming;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES_DEFAULT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2_DEFAULT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ROLE;
import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.STREAMING_BUFFER_COUNT_RECORDS_DEFAULT;
import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.STREAMING_BUFFER_FLUSH_TIME_DEFAULT_SEC;
Expand Down Expand Up @@ -94,7 +94,7 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService {
private boolean enableSchematization;

/**
* Key is formulated in {@link #partitionChannelKey(String, String, int)} }
* Key is formulated in {@link #partitionChannelKey(String, String, int, boolean)}
*
* <p>value is the Streaming Ingest Channel implementation (Wrapped around TopicPartitionChannel)
*/
Expand All @@ -103,8 +103,10 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService {
// Cache for schema evolution
private final Map<String, Boolean> tableName2SchemaEvolutionPermission;

// This is the V2 of channel Name creation. (This corresponds to the config
// SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2)
/**
* This is the new format for channel Names. (This corresponds to the config {@link
* SnowflakeSinkConnectorConfig#SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT} )
*/
private final boolean shouldUseConnectorNameInChannelName;

public SnowflakeSinkServiceV2(
Expand Down Expand Up @@ -147,8 +149,8 @@ public SnowflakeSinkServiceV2(
this.shouldUseConnectorNameInChannelName =
Boolean.parseBoolean(
connectorConfig.getOrDefault(
SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
String.valueOf(SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2_DEFAULT)));
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
String.valueOf(SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT)));
}

@VisibleForTesting
Expand Down Expand Up @@ -197,8 +199,8 @@ public SnowflakeSinkServiceV2(
this.shouldUseConnectorNameInChannelName =
Boolean.parseBoolean(
connectorConfig.getOrDefault(
SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
String.valueOf(SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2_DEFAULT)));
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
String.valueOf(SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ErrorTolerance;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD;

import com.google.common.base.Strings;
Expand Down Expand Up @@ -222,10 +222,10 @@ public static ImmutableMap<String, String> validateStreamingSnowpipeConfig(
BOOLEAN_VALIDATOR.ensureValid(
ERRORS_LOG_ENABLE_CONFIG, inputConfig.get(ERRORS_LOG_ENABLE_CONFIG));
}
if (inputConfig.containsKey(SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2)) {
if (inputConfig.containsKey(SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT)) {
BOOLEAN_VALIDATOR.ensureValid(
SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
inputConfig.get(SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2));
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
inputConfig.get(SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT));
}

// Valid schematization for Snowpipe Streaming
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,7 @@ public void testEnableStreamingChannelFormatV2Config() {
config.put(
SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2, "true");
config.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT, "true");
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
Utils.validateConfig(config);
}
Expand All @@ -886,12 +886,12 @@ public void testInvalidEnableStreamingChannelFormatV2Config() {
SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
config.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2, "yes");
config.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT, "yes");
Utils.validateConfig(config);
} catch (SnowflakeKafkaConnectorException exception) {
assert exception
.getMessage()
.contains(SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2);
.contains(SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_TOLERANCE_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT;
import static com.snowflake.kafka.connector.internal.TestUtils.TEST_CONNECTOR_NAME;
import static com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.partitionChannelKey;

Expand Down Expand Up @@ -77,7 +77,7 @@ public void testSinkTaskInvalidRecord_InMemoryDLQ() throws Exception {
config.put(ERRORS_TOLERANCE_CONFIG, SnowflakeSinkConnectorConfig.ErrorTolerance.ALL.toString());
config.put(ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "test_DLQ");
config.put(
SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
String.valueOf(this.shouldUseConnectorNameInChannelName));
InMemoryKafkaRecordErrorReporter errorReporter = new InMemoryKafkaRecordErrorReporter();
SnowflakeConnectionService mockConnectionService =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.snowflake.kafka.connector.internal.streaming;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT;
import static com.snowflake.kafka.connector.internal.TestUtils.TEST_CONNECTOR_NAME;
import static com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.partitionChannelKey;
import static com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;
Expand Down Expand Up @@ -129,7 +129,7 @@ public void testChannelCloseIngestion() throws Exception {
Map<String, String> config = getConfig();
SnowflakeSinkConnectorConfig.setDefaultValues(config);
config.put(
SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
String.valueOf(this.shouldUseConnectorNameInChannelName));
conn.createTable(table);

Expand Down Expand Up @@ -181,7 +181,7 @@ public void testStreamingIngest_multipleChannelPartitions_closeSubsetOfPartition
Map<String, String> config = TestUtils.getConfForStreaming();
SnowflakeSinkConnectorConfig.setDefaultValues(config);
config.put(
SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
String.valueOf(this.shouldUseConnectorNameInChannelName));
conn.createTable(table);
TopicPartition tp1 = new TopicPartition(table, partition);
Expand Down Expand Up @@ -312,7 +312,7 @@ public void testStreamingIngestion() throws Exception {
Map<String, String> config = getConfig();
SnowflakeSinkConnectorConfig.setDefaultValues(config);
config.put(
SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
String.valueOf(this.shouldUseConnectorNameInChannelName));
conn.createTable(table);

Expand Down Expand Up @@ -379,7 +379,7 @@ public void testStreamingIngest_multipleChannelPartitions_withMetrics() throws E
Map<String, String> config = getConfig();
SnowflakeSinkConnectorConfig.setDefaultValues(config);
config.put(
SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
String.valueOf(this.shouldUseConnectorNameInChannelName));

// set up telemetry service spy
Expand Down Expand Up @@ -1518,7 +1518,7 @@ public void testStreamingIngestion_ChannelNameFormats() throws Exception {
Map<String, String> overriddenConfig = new HashMap<>(config);

config.put(
SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
String.valueOf(this.shouldUseConnectorNameInChannelName));

conn.createTable(table);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.snowflake.kafka.connector.internal.streaming;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT;
import static com.snowflake.kafka.connector.internal.TestUtils.TEST_CONNECTOR_NAME;

import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
Expand Down Expand Up @@ -78,7 +78,7 @@ public void testAutoChannelReopenOn_OffsetTokenSFException() throws Exception {
Map<String, String> config = TestUtils.getConfForStreaming();
SnowflakeSinkConnectorConfig.setDefaultValues(config);
config.put(
SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
String.valueOf(this.shouldUseConnectorNameInChannelName));

InMemorySinkTaskContext inMemorySinkTaskContext =
Expand Down Expand Up @@ -139,7 +139,7 @@ public void testInsertRowsOnChannelClosed() throws Exception {
SnowflakeSinkConnectorConfig.setDefaultValues(config);
SnowflakeSinkConnectorConfig.setDefaultValues(config);
config.put(
SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
String.valueOf(this.shouldUseConnectorNameInChannelName));

InMemorySinkTaskContext inMemorySinkTaskContext =
Expand Down Expand Up @@ -207,7 +207,7 @@ public void testAutoChannelReopen_InsertRowsSFException() throws Exception {
SnowflakeSinkConnectorConfig.setDefaultValues(config);
SnowflakeSinkConnectorConfig.setDefaultValues(config);
config.put(
SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
String.valueOf(this.shouldUseConnectorNameInChannelName));

InMemorySinkTaskContext inMemorySinkTaskContext =
Expand Down Expand Up @@ -299,7 +299,7 @@ public void testAutoChannelReopen_MultiplePartitionsInsertRowsSFException() thro
config.put(SnowflakeSinkConnectorConfig.ENABLE_STREAMING_CLIENT_OPTIMIZATION_CONFIG, "true");
SnowflakeSinkConnectorConfig.setDefaultValues(config);
config.put(
SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
String.valueOf(this.shouldUseConnectorNameInChannelName));

InMemorySinkTaskContext inMemorySinkTaskContext =
Expand Down Expand Up @@ -422,7 +422,7 @@ public void testAutoChannelReopen_SinglePartitionsInsertRowsSFException() throws
config.put(SnowflakeSinkConnectorConfig.ENABLE_STREAMING_CLIENT_OPTIMIZATION_CONFIG, "true");
SnowflakeSinkConnectorConfig.setDefaultValues(config);
config.put(
SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
String.valueOf(this.shouldUseConnectorNameInChannelName));

InMemorySinkTaskContext inMemorySinkTaskContext =
Expand Down Expand Up @@ -501,7 +501,7 @@ public void testSimpleInsertRowsFailureWithArrowBDECFormat() throws Exception {
overriddenConfig.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "1");
SnowflakeSinkConnectorConfig.setDefaultValues(overriddenConfig);
overriddenConfig.put(
SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
String.valueOf(this.shouldUseConnectorNameInChannelName));

InMemorySinkTaskContext inMemorySinkTaskContext =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"jmx": "true",
"snowflake.enable.streaming.channel.format.v2": "true"
"snowflake.enable.new.channel.name.format": "true"
}
}

0 comments on commit 1c11539

Please sign in to comment.