PROD-39429 Add parameter for connector name in channel name and revert the behavior (#732)
sfc-gh-japatel authored Oct 23, 2023
1 parent 055dfca commit feaa491
Showing 11 changed files with 386 additions and 22 deletions.
@@ -168,6 +168,20 @@ public class SnowflakeSinkConnectorConfig {
"Whether to optimize the streaming client to reduce cost. Note that this may affect"
+ " throughput or latency and can only be set if Streaming Snowpipe is enabled";

public static final String SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT =
"snowflake.enable.new.channel.name.format";
public static final boolean SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT = false;

public static final String SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DISPLAY =
"Enable Connector Name in Snowpipe Streaming Channel Name";

public static final String SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DOC =
"Whether to use connector name in streaming channels. If it is set to false, we will not use"
+ " connector name in channel name(Which is new version of Channel Name). Note: Please"
+ " use this config cautiously and it is not advised to use this if you are coming from"
+ " old Snowflake Kafka Connector Version where Default Channel Name doesnt contain"
+ " Connector Name, contains Topic Name and Partition # only.";

// MDC logging header
public static final String ENABLE_MDC_LOGGING_CONFIG = "enable.mdc.logging";
public static final String ENABLE_MDC_LOGGING_DISPLAY = "Enable MDC logging";
@@ -591,7 +605,17 @@ static ConfigDef newConfigDef() {
CONNECTOR_CONFIG,
8,
ConfigDef.Width.NONE,
ENABLE_MDC_LOGGING_DISPLAY);
ENABLE_MDC_LOGGING_DISPLAY)
.define(
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
Type.BOOLEAN,
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT,
Importance.LOW,
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DOC,
CONNECTOR_CONFIG,
9,
ConfigDef.Width.NONE,
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DISPLAY);
}
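
For reference, a minimal sketch of how a user would opt into the new channel name format; the map and values below are hypothetical, but the keys are the constants defined above:

  Map<String, String> connectorConfig = new HashMap<>();
  // The channel name format only applies to Snowpipe Streaming ingestion.
  connectorConfig.put(
      SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
      IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
  // Opt into prefixing channel names with the connector name (defaults to false).
  connectorConfig.put(
      SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT, "true");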

public static class TopicToTableValidator implements ConfigDef.Validator {
@@ -1,6 +1,8 @@
package com.snowflake.kafka.connector.internal.streaming;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES_DEFAULT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ROLE;
import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.STREAMING_BUFFER_COUNT_RECORDS_DEFAULT;
import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.STREAMING_BUFFER_FLUSH_TIME_DEFAULT_SEC;
@@ -92,7 +94,7 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService {
private boolean enableSchematization;

/**
* Key is formulated in {@link #partitionChannelKey(String, String, int)} }
* Key is formulated in {@link #partitionChannelKey(String, String, int, boolean)}
*
* <p>value is the Streaming Ingest Channel implementation (Wrapped around TopicPartitionChannel)
*/
@@ -101,6 +103,12 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService {
// Cache for schema evolution
private final Map<String, Boolean> tableName2SchemaEvolutionPermission;

/**
* Whether to include the connector name in channel names (the new channel name format).
* Corresponds to the config {@link
* SnowflakeSinkConnectorConfig#SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT}.
*/
private final boolean shouldUseConnectorNameInChannelName;

public SnowflakeSinkServiceV2(
SnowflakeConnectionService conn, Map<String, String> connectorConfig) {
if (conn == null || conn.isClosed()) {
@@ -138,6 +146,11 @@ public SnowflakeSinkServiceV2(
? "default_connector"
: this.conn.getConnectorName();
this.metricsJmxReporter = new MetricsJmxReporter(new MetricRegistry(), connectorName);
this.shouldUseConnectorNameInChannelName =
Boolean.parseBoolean(
connectorConfig.getOrDefault(
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
String.valueOf(SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT)));
}
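
Note that Boolean.parseBoolean never throws; it returns false for anything other than a case-insensitive "true". The getOrDefault parse above is therefore only safe because the value is validated as a strict boolean earlier, in config validation (see validateStreamingSnowpipeConfig below). A quick illustration:

  Boolean.parseBoolean("true");  // true
  Boolean.parseBoolean("TRUE");  // true (case-insensitive)
  Boolean.parseBoolean("yes");   // false -- silently coerced, not an error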

@VisibleForTesting
@@ -183,6 +196,11 @@ public SnowflakeSinkServiceV2(
populateSchemaEvolutionPermissions(tableName);
});
}
this.shouldUseConnectorNameInChannelName =
Boolean.parseBoolean(
connectorConfig.getOrDefault(
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
String.valueOf(SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT)));
}

/**
@@ -236,7 +254,10 @@ private void createStreamingChannelForTopicPartition(
boolean hasSchemaEvolutionPermission) {
final String partitionChannelKey =
partitionChannelKey(
conn.getConnectorName(), topicPartition.topic(), topicPartition.partition());
conn.getConnectorName(),
topicPartition.topic(),
topicPartition.partition(),
this.shouldUseConnectorNameInChannelName);
// Create new instance of TopicPartitionChannel which will always open the channel.
partitionsToChannel.put(
partitionChannelKey,
@@ -296,7 +317,11 @@ public void insert(Collection<SinkRecord> records) {
@Override
public void insert(SinkRecord record) {
String partitionChannelKey =
partitionChannelKey(this.conn.getConnectorName(), record.topic(), record.kafkaPartition());
partitionChannelKey(
this.conn.getConnectorName(),
record.topic(),
record.kafkaPartition(),
this.shouldUseConnectorNameInChannelName);
// init a new topic partition if it's not present in the cache or if the channel is closed
if (!partitionsToChannel.containsKey(partitionChannelKey)
|| partitionsToChannel.get(partitionChannelKey).isChannelClosed()) {
@@ -317,7 +342,10 @@ public void insert(SinkRecord record) {
public long getOffset(TopicPartition topicPartition) {
String partitionChannelKey =
partitionChannelKey(
conn.getConnectorName(), topicPartition.topic(), topicPartition.partition());
conn.getConnectorName(),
topicPartition.topic(),
topicPartition.partition(),
this.shouldUseConnectorNameInChannelName);
if (partitionsToChannel.containsKey(partitionChannelKey)) {
long offset = partitionsToChannel.get(partitionChannelKey).getOffsetSafeToCommitToKafka();
partitionsToChannel.get(partitionChannelKey).setLatestConsumerOffset(offset);
@@ -372,7 +400,10 @@ public void close(Collection<TopicPartition> partitions) {
topicPartition -> {
final String partitionChannelKey =
partitionChannelKey(
conn.getConnectorName(), topicPartition.topic(), topicPartition.partition());
conn.getConnectorName(),
topicPartition.topic(),
topicPartition.partition(),
this.shouldUseConnectorNameInChannelName);
TopicPartitionChannel topicPartitionChannel =
partitionsToChannel.get(partitionChannelKey);
// Check for null since it's possible that something goes wrong even before the
@@ -527,11 +558,19 @@ public Optional<MetricRegistry> getMetricRegistry(String partitionChannelKey) {
* or PROD)
* @param topic topic name
* @param partition partition number
* @param shouldUseConnectorNameInChannelName if true, prefix the channel key with the
*     connector name; this is the new channel name format
* @return combination of topic and partition, optionally prefixed with the connector name
*/
@VisibleForTesting
public static String partitionChannelKey(String connectorName, String topic, int partition) {
return connectorName + "_" + topic + "_" + partition;
public static String partitionChannelKey(
String connectorName,
String topic,
int partition,
final boolean shouldUseConnectorNameInChannelName) {
return shouldUseConnectorNameInChannelName
? connectorName + "_" + topic + "_" + partition
: topic + "_" + partition;
}
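
To make the two key formats concrete, a sketch with hypothetical connector and topic names:

  // Flag enabled: connector name is prefixed (the new format).
  SnowflakeSinkServiceV2.partitionChannelKey("my_connector", "my_topic", 0, true);
  // -> "my_connector_my_topic_0"

  // Flag disabled (the default): topic and partition only (the legacy format).
  SnowflakeSinkServiceV2.partitionChannelKey("my_connector", "my_topic", 0, false);
  // -> "my_topic_0"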

/* Used for testing */
@@ -8,6 +8,7 @@
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ErrorTolerance;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD;

import com.google.common.base.Strings;
@@ -221,6 +222,11 @@ public static ImmutableMap<String, String> validateStreamingSnowpipeConfig(
BOOLEAN_VALIDATOR.ensureValid(
ERRORS_LOG_ENABLE_CONFIG, inputConfig.get(ERRORS_LOG_ENABLE_CONFIG));
}
if (inputConfig.containsKey(SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT)) {
BOOLEAN_VALIDATOR.ensureValid(
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
inputConfig.get(SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT));
}
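
BOOLEAN_VALIDATOR is defined elsewhere in the connector; a hedged sketch of the behavior this check relies on is below. The point is to accept only the literal strings "true" and "false" and reject values like "yes" that Boolean.parseBoolean would otherwise silently coerce to false:

  // Assumed behavior, not the connector's actual implementation.
  ConfigDef.Validator booleanValidator =
      (name, value) -> {
        String s = String.valueOf(value);
        if (!"true".equalsIgnoreCase(s) && !"false".equalsIgnoreCase(s)) {
          throw new ConfigException(name, value, "Value must be 'true' or 'false'");
        }
      };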

// Validate schematization for Snowpipe Streaming
invalidParams.putAll(validateSchematizationConfig(inputConfig));
@@ -867,6 +867,34 @@ public void testInvalidEnableOptimizeStreamingClientConfig() {
}
}

@Test
public void testEnableStreamingChannelFormatV2Config() {
Map<String, String> config = getConfig();
config.put(
SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT, "true");
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
Utils.validateConfig(config);
}

@Test
public void testInvalidEnableStreamingChannelFormatV2Config() {
try {
Map<String, String> config = getConfig();
config.put(
SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
config.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT, "yes");
Utils.validateConfig(config);
} catch (SnowflakeKafkaConnectorException exception) {
assert exception
.getMessage()
.contains(SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT);
}
}
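
As written, the negative test passes vacuously if Utils.validateConfig does not throw. A stricter variant (a sketch, not part of this commit) fails explicitly when the expected exception is missing:

  try {
    Utils.validateConfig(config);
    Assert.fail("Expected SnowflakeKafkaConnectorException for non-boolean value");
  } catch (SnowflakeKafkaConnectorException exception) {
    assert exception
        .getMessage()
        .contains(SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT);
  }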

@Test
public void testInvalidEmptyConfig() {
try {
@@ -4,6 +4,7 @@
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_TOLERANCE_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT;
import static com.snowflake.kafka.connector.internal.TestUtils.TEST_CONNECTOR_NAME;
import static com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.partitionChannelKey;

@@ -19,8 +20,10 @@
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import com.snowflake.kafka.connector.records.RecordService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
@@ -36,15 +39,29 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/** Unit tests for Snowflake Sink Task behavior with Snowpipe Streaming */
@RunWith(Parameterized.class)
public class SnowflakeSinkTaskStreamingTest {
private String topicName;
private static int partition = 0;
private TopicPartition topicPartition;

private final boolean shouldUseConnectorNameInChannelName;

@Parameterized.Parameters
public static List<Boolean> input() {
return Arrays.asList(Boolean.TRUE, Boolean.FALSE);
}

public SnowflakeSinkTaskStreamingTest(boolean shouldUseConnectorNameInChannelName) {
this.shouldUseConnectorNameInChannelName = shouldUseConnectorNameInChannelName;
}
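
With the parameters above, JUnit instantiates this test class once per value, so each test below runs twice: once with the connector name prefixed in channel keys and once without.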

@Before
public void setup() {
topicName = TestUtils.randomTableName();
@@ -59,6 +76,9 @@ public void testSinkTaskInvalidRecord_InMemoryDLQ() throws Exception {
config.put(INGESTION_METHOD_OPT, IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(ERRORS_TOLERANCE_CONFIG, SnowflakeSinkConnectorConfig.ErrorTolerance.ALL.toString());
config.put(ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "test_DLQ");
config.put(
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
String.valueOf(this.shouldUseConnectorNameInChannelName));
InMemoryKafkaRecordErrorReporter errorReporter = new InMemoryKafkaRecordErrorReporter();
SnowflakeConnectionService mockConnectionService =
Mockito.mock(SnowflakeConnectionServiceV1.class);
@@ -88,7 +108,11 @@ public void testSinkTaskInvalidRecord_InMemoryDLQ() throws Exception {
new TopicPartitionChannel(
mockStreamingClient,
topicPartition,
SnowflakeSinkServiceV2.partitionChannelKey(TEST_CONNECTOR_NAME, topicName, partition),
SnowflakeSinkServiceV2.partitionChannelKey(
TEST_CONNECTOR_NAME,
topicName,
partition,
this.shouldUseConnectorNameInChannelName),
topicName,
new StreamingBufferThreshold(10, 10_000, 1),
config,
@@ -98,7 +122,12 @@

Map topicPartitionChannelMap =
Collections.singletonMap(
partitionChannelKey(TEST_CONNECTOR_NAME, topicName, partition), topicPartitionChannel);
partitionChannelKey(
TEST_CONNECTOR_NAME,
topicName,
partition,
this.shouldUseConnectorNameInChannelName),
topicPartitionChannel);

SnowflakeSinkServiceV2 mockSinkService =
new SnowflakeSinkServiceV2(