diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
index 0ad7a51b3..d14a3c01e 100644
--- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
+++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
@@ -168,6 +168,20 @@ public class SnowflakeSinkConnectorConfig {
"Whether to optimize the streaming client to reduce cost. Note that this may affect"
+ " throughput or latency and can only be set if Streaming Snowpipe is enabled";
+ public static final String SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT =
+ "snowflake.enable.new.channel.name.format";
+ public static final boolean SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT = false;
+
+ public static final String SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DISPLAY =
+ "Enable Connector Name in Snowpipe Streaming Channel Name";
+
+ public static final String SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DOC =
+ "Whether to use connector name in streaming channels. If it is set to false, we will not use"
+ + " connector name in channel name(Which is new version of Channel Name). Note: Please"
+ + " use this config cautiously and it is not advised to use this if you are coming from"
+ + " old Snowflake Kafka Connector Version where Default Channel Name doesnt contain"
+ + " Connector Name, contains Topic Name and Partition # only.";
+
// MDC logging header
public static final String ENABLE_MDC_LOGGING_CONFIG = "enable.mdc.logging";
public static final String ENABLE_MDC_LOGGING_DISPLAY = "Enable MDC logging";
@@ -591,7 +605,17 @@ static ConfigDef newConfigDef() {
CONNECTOR_CONFIG,
8,
ConfigDef.Width.NONE,
- ENABLE_MDC_LOGGING_DISPLAY);
+ ENABLE_MDC_LOGGING_DISPLAY)
+ .define(
+ SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
+ Type.BOOLEAN,
+ SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT,
+ Importance.LOW,
+ SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DOC,
+ CONNECTOR_CONFIG,
+ 9,
+ ConfigDef.Width.NONE,
+ SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DISPLAY);
}
public static class TopicToTableValidator implements ConfigDef.Validator {
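
Note: to make the new knob concrete, here is a minimal sketch of a Snowpipe Streaming connector config map that enables the new channel name format. The ingestion-method key mirrors `INGESTION_METHOD_OPT` (`snowflake.ingestion.method`); the required connection properties (URL, user, private key, database, schema) are omitted, so this is illustrative rather than a complete config.

```java
import java.util.HashMap;
import java.util.Map;

public class ChannelNameFormatConfigSketch {
  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    // The channel name format only applies to Snowpipe Streaming ingestion.
    config.put("snowflake.ingestion.method", "SNOWPIPE_STREAMING");
    // The new flag defined above; defaults to false when absent.
    config.put("snowflake.enable.new.channel.name.format", "true");
    // Only "true"/"false" pass validation; see the StreamingUtils change below.
    System.out.println(config);
  }
}
```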
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java
index 6a5e8bbed..382cac469 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java
@@ -1,6 +1,8 @@
package com.snowflake.kafka.connector.internal.streaming;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES_DEFAULT;
+import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT;
+import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ROLE;
import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.STREAMING_BUFFER_COUNT_RECORDS_DEFAULT;
import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.STREAMING_BUFFER_FLUSH_TIME_DEFAULT_SEC;
@@ -92,7 +94,7 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService {
private boolean enableSchematization;
/**
- * Key is formulated in {@link #partitionChannelKey(String, String, int)} }
+ * Key is formulated in {@link #partitionChannelKey(String, String, int, boolean)}
*
* <p>value is the Streaming Ingest Channel implementation (wrapped around TopicPartitionChannel)
*/
@@ -101,6 +103,12 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService {
// Cache for schema evolution
private final Map<String, Boolean> tableName2SchemaEvolutionPermission;
+ /**
+ * Whether to include the connector name in streaming channel names (the newer channel name
+ * format). Corresponds to the config {@link
+ * SnowflakeSinkConnectorConfig#SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT}.
+ */
+ private final boolean shouldUseConnectorNameInChannelName;
+
public SnowflakeSinkServiceV2(
SnowflakeConnectionService conn, Map<String, String> connectorConfig) {
if (conn == null || conn.isClosed()) {
@@ -138,6 +146,11 @@ public SnowflakeSinkServiceV2(
? "default_connector"
: this.conn.getConnectorName();
this.metricsJmxReporter = new MetricsJmxReporter(new MetricRegistry(), connectorName);
+ this.shouldUseConnectorNameInChannelName =
+ Boolean.parseBoolean(
+ connectorConfig.getOrDefault(
+ SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
+ String.valueOf(SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT)));
}
@VisibleForTesting
@@ -183,6 +196,11 @@ public SnowflakeSinkServiceV2(
populateSchemaEvolutionPermissions(tableName);
});
}
+ this.shouldUseConnectorNameInChannelName =
+ Boolean.parseBoolean(
+ connectorConfig.getOrDefault(
+ SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
+ String.valueOf(SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT)));
}
/**
@@ -236,7 +254,10 @@ private void createStreamingChannelForTopicPartition(
boolean hasSchemaEvolutionPermission) {
final String partitionChannelKey =
partitionChannelKey(
- conn.getConnectorName(), topicPartition.topic(), topicPartition.partition());
+ conn.getConnectorName(),
+ topicPartition.topic(),
+ topicPartition.partition(),
+ this.shouldUseConnectorNameInChannelName);
// Create new instance of TopicPartitionChannel which will always open the channel.
partitionsToChannel.put(
partitionChannelKey,
@@ -296,7 +317,11 @@ public void insert(Collection<SinkRecord> records) {
@Override
public void insert(SinkRecord record) {
String partitionChannelKey =
- partitionChannelKey(this.conn.getConnectorName(), record.topic(), record.kafkaPartition());
+ partitionChannelKey(
+ this.conn.getConnectorName(),
+ record.topic(),
+ record.kafkaPartition(),
+ this.shouldUseConnectorNameInChannelName);
// init a new topic partition if it's not present in the cache or if the channel is closed
if (!partitionsToChannel.containsKey(partitionChannelKey)
|| partitionsToChannel.get(partitionChannelKey).isChannelClosed()) {
@@ -317,7 +342,10 @@ public void insert(SinkRecord record) {
public long getOffset(TopicPartition topicPartition) {
String partitionChannelKey =
partitionChannelKey(
- conn.getConnectorName(), topicPartition.topic(), topicPartition.partition());
+ conn.getConnectorName(),
+ topicPartition.topic(),
+ topicPartition.partition(),
+ this.shouldUseConnectorNameInChannelName);
if (partitionsToChannel.containsKey(partitionChannelKey)) {
long offset = partitionsToChannel.get(partitionChannelKey).getOffsetSafeToCommitToKafka();
partitionsToChannel.get(partitionChannelKey).setLatestConsumerOffset(offset);
@@ -372,7 +400,10 @@ public void close(Collection partitions) {
topicPartition -> {
final String partitionChannelKey =
partitionChannelKey(
- conn.getConnectorName(), topicPartition.topic(), topicPartition.partition());
+ conn.getConnectorName(),
+ topicPartition.topic(),
+ topicPartition.partition(),
+ this.shouldUseConnectorNameInChannelName);
TopicPartitionChannel topicPartitionChannel =
partitionsToChannel.get(partitionChannelKey);
// Check for null since it's possible that something goes wrong even before the
@@ -527,11 +558,19 @@ public Optional<MetricRegistry> getMetricRegistry(String partitionChannelKey) {
* or PROD)
* @param topic topic name
* @param partition partition number
+ * @param shouldUseConnectorNameInChannelName if true, include the connector name in the
+ *     channel name (the newer channel name format); if false, use topic and partition only
* @return combination of topic name and partition number
*/
@VisibleForTesting
- public static String partitionChannelKey(String connectorName, String topic, int partition) {
- return connectorName + "_" + topic + "_" + partition;
+ public static String partitionChannelKey(
+ String connectorName,
+ String topic,
+ int partition,
+ final boolean shouldUseConnectorNameInChannelName) {
+ return shouldUseConnectorNameInChannelName
+ ? connectorName + "_" + topic + "_" + partition
+ : topic + "_" + partition;
}
/* Used for testing */
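
To make the two formats explicit, a small hedged example of the keys `partitionChannelKey` produces (the connector, topic, and partition values here are made up):

```java
import com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2;

public class ChannelKeyExample {
  public static void main(String[] args) {
    // New format: connector name is included in the channel name.
    String newFormat =
        SnowflakeSinkServiceV2.partitionChannelKey("my_connector", "orders", 0, true);
    // -> "my_connector_orders_0"

    // Default (flag off): topic and partition only, matching older versions.
    String oldFormat =
        SnowflakeSinkServiceV2.partitionChannelKey("my_connector", "orders", 0, false);
    // -> "orders_0"

    System.out.println(newFormat + " / " + oldFormat);
  }
}
```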
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingUtils.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingUtils.java
index 2c6d877f7..b7ed370a6 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingUtils.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingUtils.java
@@ -8,6 +8,7 @@
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ErrorTolerance;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD;
+import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD;
import com.google.common.base.Strings;
@@ -221,6 +222,11 @@ public static ImmutableMap<String, String> validateStreamingSnowpipeConfig(
BOOLEAN_VALIDATOR.ensureValid(
ERRORS_LOG_ENABLE_CONFIG, inputConfig.get(ERRORS_LOG_ENABLE_CONFIG));
}
+ if (inputConfig.containsKey(SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT)) {
+ BOOLEAN_VALIDATOR.ensureValid(
+ SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
+ inputConfig.get(SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT));
+ }
// Validate schematization for Snowpipe Streaming
invalidParams.putAll(validateSchematizationConfig(inputConfig));
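
For clarity, the boolean check above delegates to the connector's BOOLEAN_VALIDATOR (a Kafka Connect ConfigDef validator). Roughly, it behaves like the following stand-in sketch, which is why string values such as "yes" are rejected:

```java
public class BooleanCheckSketch {
  // Rough stand-in for BOOLEAN_VALIDATOR: only "true"/"false"
  // (case-insensitive) are accepted; anything else fails validation.
  static void ensureBoolean(String key, Object value) {
    String s = String.valueOf(value);
    if (!"true".equalsIgnoreCase(s) && !"false".equalsIgnoreCase(s)) {
      throw new IllegalArgumentException(
          "Invalid value '" + s + "' for configuration '" + key + "': expected true or false");
    }
  }

  public static void main(String[] args) {
    ensureBoolean("snowflake.enable.new.channel.name.format", "true"); // ok
    ensureBoolean("snowflake.enable.new.channel.name.format", "yes"); // throws
  }
}
```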
diff --git a/src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java b/src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java
index da9198f11..219d62c35 100644
--- a/src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java
+++ b/src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java
@@ -867,6 +867,34 @@ public void testInvalidEnableOptimizeStreamingClientConfig() {
}
}
+ @Test
+ public void testEnableStreamingChannelFormatV2Config() {
+ Map<String, String> config = getConfig();
+ config.put(
+ SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
+ IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
+ config.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT, "true");
+ config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
+ Utils.validateConfig(config);
+ }
+
+ @Test
+ public void testInvalidEnableStreamingChannelFormatV2Config() {
+ try {
+ Map<String, String> config = getConfig();
+ config.put(
+ SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
+ IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
+ config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
+ config.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT, "yes");
+ Utils.validateConfig(config);
+ } catch (SnowflakeKafkaConnectorException exception) {
+ assert exception
+ .getMessage()
+ .contains(SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT);
+ }
+ }
+
@Test
public void testInvalidEmptyConfig() {
try {
diff --git a/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskStreamingTest.java b/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskStreamingTest.java
index 1399545ac..7c77f53be 100644
--- a/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskStreamingTest.java
+++ b/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskStreamingTest.java
@@ -4,6 +4,7 @@
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_TOLERANCE_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT;
+import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT;
import static com.snowflake.kafka.connector.internal.TestUtils.TEST_CONNECTOR_NAME;
import static com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.partitionChannelKey;
@@ -19,8 +20,10 @@
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import com.snowflake.kafka.connector.records.RecordService;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
@@ -36,15 +39,29 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
/** Unit test for testing Snowflake Sink Task Behavior with Snowpipe Streaming */
+@RunWith(Parameterized.class)
public class SnowflakeSinkTaskStreamingTest {
private String topicName;
private static int partition = 0;
private TopicPartition topicPartition;
+ private final boolean shouldUseConnectorNameInChannelName;
+
+ @Parameterized.Parameters
+ public static List<Boolean> input() {
+ return Arrays.asList(Boolean.TRUE, Boolean.FALSE);
+ }
+
+ public SnowflakeSinkTaskStreamingTest(boolean shouldUseConnectorNameInChannelName) {
+ this.shouldUseConnectorNameInChannelName = shouldUseConnectorNameInChannelName;
+ }
+
@Before
public void setup() {
topicName = TestUtils.randomTableName();
@@ -59,6 +76,9 @@ public void testSinkTaskInvalidRecord_InMemoryDLQ() throws Exception {
config.put(INGESTION_METHOD_OPT, IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(ERRORS_TOLERANCE_CONFIG, SnowflakeSinkConnectorConfig.ErrorTolerance.ALL.toString());
config.put(ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "test_DLQ");
+ config.put(
+ SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
+ String.valueOf(this.shouldUseConnectorNameInChannelName));
InMemoryKafkaRecordErrorReporter errorReporter = new InMemoryKafkaRecordErrorReporter();
SnowflakeConnectionService mockConnectionService =
Mockito.mock(SnowflakeConnectionServiceV1.class);
@@ -88,7 +108,11 @@ public void testSinkTaskInvalidRecord_InMemoryDLQ() throws Exception {
new TopicPartitionChannel(
mockStreamingClient,
topicPartition,
- SnowflakeSinkServiceV2.partitionChannelKey(TEST_CONNECTOR_NAME, topicName, partition),
+ SnowflakeSinkServiceV2.partitionChannelKey(
+ TEST_CONNECTOR_NAME,
+ topicName,
+ partition,
+ this.shouldUseConnectorNameInChannelName),
topicName,
new StreamingBufferThreshold(10, 10_000, 1),
config,
@@ -98,7 +122,12 @@ public void testSinkTaskInvalidRecord_InMemoryDLQ() throws Exception {
Map<String, TopicPartitionChannel> topicPartitionChannelMap =
Collections.singletonMap(
- partitionChannelKey(TEST_CONNECTOR_NAME, topicName, partition), topicPartitionChannel);
+ partitionChannelKey(
+ TEST_CONNECTOR_NAME,
+ topicName,
+ partition,
+ this.shouldUseConnectorNameInChannelName),
+ topicPartitionChannel);
SnowflakeSinkServiceV2 mockSinkService =
new SnowflakeSinkServiceV2(
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java
index 64d70f49b..edd44ed30 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java
@@ -1,5 +1,6 @@
package com.snowflake.kafka.connector.internal.streaming;
+import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT;
import static com.snowflake.kafka.connector.internal.TestUtils.TEST_CONNECTOR_NAME;
import static com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.partitionChannelKey;
import static com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;
@@ -62,14 +63,21 @@ public class SnowflakeSinkServiceV2IT {
// use OAuth as authenticator or not
private boolean useOAuth;
- @Parameterized.Parameters(name = "useOAuth: {0}")
+ private final boolean shouldUseConnectorNameInChannelName;
+
+ @Parameterized.Parameters(name = "useOAuth: {0}, shouldUseConnectorNameInChannelName: {1}")
public static Collection