Commit: merge master

sfc-gh-rcheng committed Oct 25, 2023
2 parents fbd1df2 + 3f99a07 commit 5188952
Showing 19 changed files with 408 additions and 33 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/snyk-issue.yml
@@ -4,10 +4,15 @@ on:
schedule:
- cron: '* */12 * * *'

permissions:
contents: read
issues: write
pull-requests: write

concurrency: snyk-issue

jobs:
whitesource:
snyk:
runs-on: ubuntu-latest
steps:
- name: checkout action
8 changes: 7 additions & 1 deletion .github/workflows/snyk-pr.yml
@@ -3,8 +3,14 @@ on:
pull_request:
branches:
- master

permissions:
contents: read
issues: write
pull-requests: write

jobs:
whitesource:
snyk:
runs-on: ubuntu-latest
if: ${{ github.event.pull_request.user.login == 'sfc-gh-snyk-sca-sa' }}
steps:
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -1,5 +1,5 @@
repos:
- repo: git@github.com:snowflakedb/casec_precommit.git
rev: v1.3
rev: v1.29
hooks:
- id: secret-scanner
2 changes: 1 addition & 1 deletion pom.xml
@@ -12,7 +12,7 @@

<groupId>com.snowflake</groupId>
<artifactId>snowflake-kafka-connector</artifactId>
<version>2.1.0</version>
<version>2.1.1</version>
<packaging>jar</packaging>
<name>Snowflake Kafka Connector</name>
<description>Snowflake Kafka Connect Sink Connector</description>
4 changes: 2 additions & 2 deletions pom_confluent.xml
@@ -12,7 +12,7 @@

<groupId>com.snowflake</groupId>
<artifactId>snowflake-kafka-connector</artifactId>
<version>2.1.0</version>
<version>2.1.1</version>
<packaging>jar</packaging>
<name>Snowflake Kafka Connector</name>
<description>Snowflake Kafka Connect Sink Connector</description>
@@ -398,7 +398,7 @@
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.1</version>
<version>1.11.3</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
Original file line number Diff line number Diff line change
@@ -168,6 +168,20 @@ public class SnowflakeSinkConnectorConfig {
"Whether to optimize the streaming client to reduce cost. Note that this may affect"
+ " throughput or latency and can only be set if Streaming Snowpipe is enabled";

public static final String SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT =
"snowflake.enable.new.channel.name.format";
public static final boolean SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT = false;

public static final String SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DISPLAY =
"Enable Connector Name in Snowpipe Streaming Channel Name";

public static final String SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DOC =
"Whether to use connector name in streaming channels. If it is set to false, we will not use"
+ " connector name in channel name(Which is new version of Channel Name). Note: Please"
+ " use this config cautiously and it is not advised to use this if you are coming from"
+ " old Snowflake Kafka Connector Version where Default Channel Name doesnt contain"
+ " Connector Name, contains Topic Name and Partition # only.";

// MDC logging header
public static final String ENABLE_MDC_LOGGING_CONFIG = "enable.mdc.logging";
public static final String ENABLE_MDC_LOGGING_DISPLAY = "Enable MDC logging";
@@ -591,7 +605,17 @@ static ConfigDef newConfigDef() {
CONNECTOR_CONFIG,
8,
ConfigDef.Width.NONE,
ENABLE_MDC_LOGGING_DISPLAY);
ENABLE_MDC_LOGGING_DISPLAY)
.define(
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
Type.BOOLEAN,
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT,
Importance.LOW,
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DOC,
CONNECTOR_CONFIG,
9,
ConfigDef.Width.NONE,
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DISPLAY);
}

public static class TopicToTableValidator implements ConfigDef.Validator {
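For context, a minimal sketch of how the flag added above could be set and read. Everything here is illustrative: the map below stands in for the connector's configuration, and only the property key snowflake.enable.new.channel.name.format and its false default come from this commit.

import java.util.HashMap;
import java.util.Map;

public class ChannelNameFormatConfigSketch {
  public static void main(String[] args) {
    // Hypothetical connector properties map.
    Map<String, String> config = new HashMap<>();
    config.put("snowflake.enable.new.channel.name.format", "true");

    // Mirrors how SnowflakeSinkServiceV2 reads the flag: parse the raw
    // string and fall back to the default (false) when the key is absent.
    boolean useConnectorName =
        Boolean.parseBoolean(
            config.getOrDefault("snowflake.enable.new.channel.name.format", "false"));
    System.out.println("useConnectorName = " + useConnectorName); // prints true
  }
}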
2 changes: 1 addition & 1 deletion src/main/java/com/snowflake/kafka/connector/Utils.java
@@ -71,7 +71,7 @@
public class Utils {

// Connector version, change every release
public static final String VERSION = "2.1.0";
public static final String VERSION = "2.1.1";

// connector parameter list
public static final String NAME = "name";
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.snowflake.kafka.connector.internal.streaming;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES_DEFAULT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ROLE;
import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.STREAMING_BUFFER_COUNT_RECORDS_DEFAULT;
import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.STREAMING_BUFFER_FLUSH_TIME_DEFAULT_SEC;
@@ -92,7 +94,7 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService {
private boolean enableSchematization;

/**
* Key is formulated in {@link #partitionChannelKey(String, String, int)} }
* Key is formulated in {@link #partitionChannelKey(String, String, int, boolean)}
*
* <p>value is the Streaming Ingest Channel implementation (Wrapped around TopicPartitionChannel)
*/
@@ -101,6 +103,12 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService {
// Cache for schema evolution
private final Map<String, Boolean> tableName2SchemaEvolutionPermission;

/**
* Whether to include the connector name in channel names (the new channel name format). This
* corresponds to the config {@link SnowflakeSinkConnectorConfig#SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT}.
*/
private final boolean shouldUseConnectorNameInChannelName;

public SnowflakeSinkServiceV2(
SnowflakeConnectionService conn, Map<String, String> connectorConfig) {
if (conn == null || conn.isClosed()) {
@@ -138,6 +146,11 @@ public SnowflakeSinkServiceV2(
? "default_connector"
: this.conn.getConnectorName();
this.metricsJmxReporter = new MetricsJmxReporter(new MetricRegistry(), connectorName);
this.shouldUseConnectorNameInChannelName =
Boolean.parseBoolean(
connectorConfig.getOrDefault(
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
String.valueOf(SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT)));
}

@VisibleForTesting
@@ -183,6 +196,11 @@ public SnowflakeSinkServiceV2(
populateSchemaEvolutionPermissions(tableName);
});
}
this.shouldUseConnectorNameInChannelName =
Boolean.parseBoolean(
connectorConfig.getOrDefault(
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
String.valueOf(SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT)));
}

/**
@@ -236,7 +254,10 @@ private void createStreamingChannelForTopicPartition(
boolean hasSchemaEvolutionPermission) {
final String partitionChannelKey =
partitionChannelKey(
conn.getConnectorName(), topicPartition.topic(), topicPartition.partition());
conn.getConnectorName(),
topicPartition.topic(),
topicPartition.partition(),
this.shouldUseConnectorNameInChannelName);
// Create new instance of TopicPartitionChannel which will always open the channel.
partitionsToChannel.put(
partitionChannelKey,
@@ -296,7 +317,11 @@ public void insert(Collection<SinkRecord> records) {
@Override
public void insert(SinkRecord record) {
String partitionChannelKey =
partitionChannelKey(this.conn.getConnectorName(), record.topic(), record.kafkaPartition());
partitionChannelKey(
this.conn.getConnectorName(),
record.topic(),
record.kafkaPartition(),
this.shouldUseConnectorNameInChannelName);
// init a new topic partition if it's not present in the cache or if the channel is closed
if (!partitionsToChannel.containsKey(partitionChannelKey)
|| partitionsToChannel.get(partitionChannelKey).isChannelClosed()) {
@@ -317,7 +342,10 @@ public void insert(SinkRecord record) {
public long getOffset(TopicPartition topicPartition) {
String partitionChannelKey =
partitionChannelKey(
conn.getConnectorName(), topicPartition.topic(), topicPartition.partition());
conn.getConnectorName(),
topicPartition.topic(),
topicPartition.partition(),
this.shouldUseConnectorNameInChannelName);
if (partitionsToChannel.containsKey(partitionChannelKey)) {
long offset = partitionsToChannel.get(partitionChannelKey).getOffsetSafeToCommitToKafka();
partitionsToChannel.get(partitionChannelKey).setLatestConsumerOffset(offset);
@@ -372,7 +400,10 @@ public void close(Collection<TopicPartition> partitions) {
topicPartition -> {
final String partitionChannelKey =
partitionChannelKey(
conn.getConnectorName(), topicPartition.topic(), topicPartition.partition());
conn.getConnectorName(),
topicPartition.topic(),
topicPartition.partition(),
this.shouldUseConnectorNameInChannelName);
TopicPartitionChannel topicPartitionChannel =
partitionsToChannel.get(partitionChannelKey);
// Check for null since it's possible that something goes wrong even before the
@@ -527,11 +558,19 @@ public Optional<MetricRegistry> getMetricRegistry(String partitionChannelKey) {
* or PROD)
* @param topic topic name
* @param partition partition number
* @param shouldUseConnectorNameInChannelName if true, include the connector name in the
*     channel name (the new channel name format)
* @return combination of topic and partition
*/
@VisibleForTesting
public static String partitionChannelKey(String connectorName, String topic, int partition) {
return connectorName + "_" + topic + "_" + partition;
public static String partitionChannelKey(
String connectorName,
String topic,
int partition,
final boolean shouldUseConnectorNameInChannelName) {
return shouldUseConnectorNameInChannelName
? connectorName + "_" + topic + "_" + partition
: topic + "_" + partition;
}

/* Used for testing */
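To make the effect of the new boolean concrete, here is a standalone sketch that re-implements partitionChannelKey as it appears in this diff and prints both key formats; the connector and topic names are placeholders.

public class PartitionChannelKeySketch {
  // Same logic as SnowflakeSinkServiceV2.partitionChannelKey above.
  static String partitionChannelKey(
      String connectorName, String topic, int partition, boolean useConnectorName) {
    return useConnectorName
        ? connectorName + "_" + topic + "_" + partition
        : topic + "_" + partition;
  }

  public static void main(String[] args) {
    System.out.println(partitionChannelKey("my_connector", "orders", 0, true));
    // -> my_connector_orders_0 (connector name included)
    System.out.println(partitionChannelKey("my_connector", "orders", 0, false));
    // -> orders_0 (topic and partition only)
  }
}

Keeping the old topic_partition format behind a default-off flag lets deployments upgrading from older connector versions keep matching their existing channel names, which is presumably why the config defaults to false.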
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ErrorTolerance;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD;

import com.google.common.base.Strings;
Expand Down Expand Up @@ -221,6 +222,11 @@ public static ImmutableMap<String, String> validateStreamingSnowpipeConfig(
BOOLEAN_VALIDATOR.ensureValid(
ERRORS_LOG_ENABLE_CONFIG, inputConfig.get(ERRORS_LOG_ENABLE_CONFIG));
}
if (inputConfig.containsKey(SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT)) {
BOOLEAN_VALIDATOR.ensureValid(
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
inputConfig.get(SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT));
}

// Valid schematization for Snowpipe Streaming
invalidParams.putAll(validateSchematizationConfig(inputConfig));
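The added block rejects non-boolean strings at validation time rather than letting them silently parse to false later. Below is a rough stand-in for what BOOLEAN_VALIDATOR enforces; the real validator lives elsewhere in the connector, and this sketch only assumes it accepts the literals true/false (case-insensitive).

public class BooleanConfigCheckSketch {
  // Hypothetical equivalent of BOOLEAN_VALIDATOR.ensureValid(key, value).
  static void ensureValid(String key, String value) {
    if (!"true".equalsIgnoreCase(value) && !"false".equalsIgnoreCase(value)) {
      throw new IllegalArgumentException(
          "Invalid value '" + value + "' for config '" + key + "': expected true or false");
    }
  }

  public static void main(String[] args) {
    ensureValid("snowflake.enable.new.channel.name.format", "true"); // passes
    ensureValid("snowflake.enable.new.channel.name.format", "yes");  // throws
  }
}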
Original file line number Diff line number Diff line change
@@ -867,6 +867,34 @@ public void testInvalidEnableOptimizeStreamingClientConfig() {
}
}

@Test
public void testEnableStreamingChannelFormatV2Config() {
Map<String, String> config = getConfig();
config.put(
SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT, "true");
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
Utils.validateConfig(config);
}

@Test
public void testInvalidEnableStreamingChannelFormatV2Config() {
try {
Map<String, String> config = getConfig();
config.put(
SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
config.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT, "yes");
Utils.validateConfig(config);
} catch (SnowflakeKafkaConnectorException exception) {
assert exception
.getMessage()
.contains(SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT);
}
}

@Test
public void testInvalidEmptyConfig() {
try {
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_TOLERANCE_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT;
import static com.snowflake.kafka.connector.internal.TestUtils.TEST_CONNECTOR_NAME;
import static com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.partitionChannelKey;

@@ -19,8 +20,10 @@
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import com.snowflake.kafka.connector.records.RecordService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
@@ -36,15 +39,29 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/** Unit tests for Snowflake Sink Task behavior with Snowpipe Streaming */
@RunWith(Parameterized.class)
public class SnowflakeSinkTaskStreamingTest {
private String topicName;
private static int partition = 0;
private TopicPartition topicPartition;

private final boolean shouldUseConnectorNameInChannelName;

@Parameterized.Parameters
public static List<Boolean> input() {
return Arrays.asList(Boolean.TRUE, Boolean.FALSE);
}

public SnowflakeSinkTaskStreamingTest(boolean shouldUseConnectorNameInChannelName) {
this.shouldUseConnectorNameInChannelName = shouldUseConnectorNameInChannelName;
}

@Before
public void setup() {
topicName = TestUtils.randomTableName();
@@ -59,6 +76,9 @@ public void testSinkTaskInvalidRecord_InMemoryDLQ() throws Exception {
config.put(INGESTION_METHOD_OPT, IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(ERRORS_TOLERANCE_CONFIG, SnowflakeSinkConnectorConfig.ErrorTolerance.ALL.toString());
config.put(ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "test_DLQ");
config.put(
SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
String.valueOf(this.shouldUseConnectorNameInChannelName));
InMemoryKafkaRecordErrorReporter errorReporter = new InMemoryKafkaRecordErrorReporter();
SnowflakeConnectionService mockConnectionService =
Mockito.mock(SnowflakeConnectionServiceV1.class);
@@ -88,7 +108,11 @@ public void testSinkTaskInvalidRecord_InMemoryDLQ() throws Exception {
new TopicPartitionChannel(
mockStreamingClient,
topicPartition,
SnowflakeSinkServiceV2.partitionChannelKey(TEST_CONNECTOR_NAME, topicName, partition),
SnowflakeSinkServiceV2.partitionChannelKey(
TEST_CONNECTOR_NAME,
topicName,
partition,
this.shouldUseConnectorNameInChannelName),
topicName,
new StreamingBufferThreshold(10, 10_000, 1),
config,
@@ -98,7 +122,12 @@

Map topicPartitionChannelMap =
Collections.singletonMap(
partitionChannelKey(TEST_CONNECTOR_NAME, topicName, partition), topicPartitionChannel);
partitionChannelKey(
TEST_CONNECTOR_NAME,
topicName,
partition,
this.shouldUseConnectorNameInChannelName),
topicPartitionChannel);

SnowflakeSinkServiceV2 mockSinkService =
new SnowflakeSinkServiceV2(
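The test class above is now parameterized so that every test method runs once per channel-name format. Here is a self-contained sketch of the same JUnit 4 pattern, with a local stand-in for the channel-key helper (class and names are hypothetical):

import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.List;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class ChannelNameFormatParamSketch {
  private final boolean useConnectorName;

  // Each value here triggers one full run of every @Test method.
  @Parameterized.Parameters
  public static List<Boolean> input() {
    return Arrays.asList(Boolean.TRUE, Boolean.FALSE);
  }

  public ChannelNameFormatParamSketch(boolean useConnectorName) {
    this.useConnectorName = useConnectorName;
  }

  // Local stand-in for SnowflakeSinkServiceV2.partitionChannelKey.
  private static String key(String connector, String topic, int partition, boolean withName) {
    return withName ? connector + "_" + topic + "_" + partition : topic + "_" + partition;
  }

  @Test
  public void channelKeyMatchesSelectedFormat() {
    String expected = useConnectorName ? "conn_orders_0" : "orders_0";
    assertEquals(expected, key("conn", "orders", 0, useConnectorName));
  }
}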
Original file line number Diff line number Diff line change
@@ -214,8 +214,8 @@ private void testIngestTombstoneRunner(
this.behavior == SnowflakeSinkConnectorConfig.BehaviorOnNullValues.DEFAULT
? sinkRecords.size()
: 1;
TestUtils.assertWithRetry(() -> TestUtils.tableSize(table) == expectedOffset, 10, 5);
TestUtils.assertWithRetry(() -> TestUtils.tableSize(table) == expectedOffset, 10, 20);
TestUtils.assertWithRetry(
() -> service.getOffset(new TopicPartition(topic, partition)) == expectedOffset, 10, 5);
() -> service.getOffset(new TopicPartition(topic, partition)) == expectedOffset, 10, 20);
}
}