diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
index d14a3c01e..0ad7a51b3 100644
--- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
+++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
@@ -168,20 +168,6 @@ public class SnowflakeSinkConnectorConfig {
       "Whether to optimize the streaming client to reduce cost. Note that this may affect"
           + " throughput or latency and can only be set if Streaming Snowpipe is enabled";

-  public static final String SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT =
-      "snowflake.enable.new.channel.name.format";
-  public static final boolean SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT = false;
-
-  public static final String SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DISPLAY =
-      "Enable Connector Name in Snowpipe Streaming Channel Name";
-
-  public static final String SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DOC =
-      "Whether to use connector name in streaming channels. If it is set to false, we will not use"
-          + " connector name in channel name(Which is new version of Channel Name). Note: Please"
-          + " use this config cautiously and it is not advised to use this if you are coming from"
-          + " old Snowflake Kafka Connector Version where Default Channel Name doesnt contain"
-          + " Connector Name, contains Topic Name and Partition # only.";
-
   // MDC logging header
   public static final String ENABLE_MDC_LOGGING_CONFIG = "enable.mdc.logging";
   public static final String ENABLE_MDC_LOGGING_DISPLAY = "Enable MDC logging";
@@ -605,17 +591,7 @@ static ConfigDef newConfigDef() {
             CONNECTOR_CONFIG,
             8,
             ConfigDef.Width.NONE,
-            ENABLE_MDC_LOGGING_DISPLAY)
-        .define(
-            SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
-            Type.BOOLEAN,
-            SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT,
-            Importance.LOW,
-            SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DOC,
-            CONNECTOR_CONFIG,
-            9,
-            ConfigDef.Width.NONE,
-            SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DISPLAY);
+            ENABLE_MDC_LOGGING_DISPLAY);
   }

   public static class TopicToTableValidator implements ConfigDef.Validator {
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java
index 382cac469..6a5e8bbed 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java
@@ -1,8 +1,6 @@
 package com.snowflake.kafka.connector.internal.streaming;

 import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES_DEFAULT;
-import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT;
-import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT;
 import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ROLE;
 import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.STREAMING_BUFFER_COUNT_RECORDS_DEFAULT;
 import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.STREAMING_BUFFER_FLUSH_TIME_DEFAULT_SEC;
@@ -94,7 +92,7 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService {
   private boolean enableSchematization;

   /**
-   * Key is formulated in {@link #partitionChannelKey(String, String, int, boolean)}
+   * Key is formulated in {@link #partitionChannelKey(String, String, int)}
    *
    * <p>value is the Streaming Ingest Channel implementation (Wrapped around TopicPartitionChannel)
    */
@@ -103,12 +101,6 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService {
   // Cache for schema evolution
   private final Map<String, Boolean> tableName2SchemaEvolutionPermission;

-  /**
-   * This is the new format for channel Names. (This corresponds to the config {@link
-   * SnowflakeSinkConnectorConfig#SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT} )
-   */
-  private final boolean shouldUseConnectorNameInChannelName;
-
   public SnowflakeSinkServiceV2(
       SnowflakeConnectionService conn, Map<String, String> connectorConfig) {
     if (conn == null || conn.isClosed()) {
@@ -146,11 +138,6 @@ public SnowflakeSinkServiceV2(
             ? "default_connector"
             : this.conn.getConnectorName();
     this.metricsJmxReporter = new MetricsJmxReporter(new MetricRegistry(), connectorName);
-    this.shouldUseConnectorNameInChannelName =
-        Boolean.parseBoolean(
-            connectorConfig.getOrDefault(
-                SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
-                String.valueOf(SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT)));
   }

   @VisibleForTesting
@@ -196,11 +183,6 @@ public SnowflakeSinkServiceV2(
             populateSchemaEvolutionPermissions(tableName);
           });
     }
-    this.shouldUseConnectorNameInChannelName =
-        Boolean.parseBoolean(
-            connectorConfig.getOrDefault(
-                SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
-                String.valueOf(SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DEFAULT)));
   }

   /**
@@ -254,10 +236,7 @@ private void createStreamingChannelForTopicPartition(
       boolean hasSchemaEvolutionPermission) {
     final String partitionChannelKey =
         partitionChannelKey(
-            conn.getConnectorName(),
-            topicPartition.topic(),
-            topicPartition.partition(),
-            this.shouldUseConnectorNameInChannelName);
+            conn.getConnectorName(), topicPartition.topic(), topicPartition.partition());
     // Create new instance of TopicPartitionChannel which will always open the channel.
     partitionsToChannel.put(
         partitionChannelKey,
@@ -317,11 +296,7 @@ public void insert(Collection<SinkRecord> records) {
   @Override
   public void insert(SinkRecord record) {
     String partitionChannelKey =
-        partitionChannelKey(
-            this.conn.getConnectorName(),
-            record.topic(),
-            record.kafkaPartition(),
-            this.shouldUseConnectorNameInChannelName);
+        partitionChannelKey(this.conn.getConnectorName(), record.topic(), record.kafkaPartition());
     // init a new topic partition if it's not presented in cache or if channel is closed
     if (!partitionsToChannel.containsKey(partitionChannelKey)
         || partitionsToChannel.get(partitionChannelKey).isChannelClosed()) {
@@ -342,10 +317,7 @@ public void insert(SinkRecord record) {
   public long getOffset(TopicPartition topicPartition) {
     String partitionChannelKey =
         partitionChannelKey(
-            conn.getConnectorName(),
-            topicPartition.topic(),
-            topicPartition.partition(),
-            this.shouldUseConnectorNameInChannelName);
+            conn.getConnectorName(), topicPartition.topic(), topicPartition.partition());
     if (partitionsToChannel.containsKey(partitionChannelKey)) {
       long offset = partitionsToChannel.get(partitionChannelKey).getOffsetSafeToCommitToKafka();
       partitionsToChannel.get(partitionChannelKey).setLatestConsumerOffset(offset);
@@ -400,10 +372,7 @@ public void close(Collection<TopicPartition> partitions) {
         topicPartition -> {
           final String partitionChannelKey =
               partitionChannelKey(
-                  conn.getConnectorName(),
-                  topicPartition.topic(),
-                  topicPartition.partition(),
-                  this.shouldUseConnectorNameInChannelName);
+                  conn.getConnectorName(), topicPartition.topic(), topicPartition.partition());
           TopicPartitionChannel topicPartitionChannel =
               partitionsToChannel.get(partitionChannelKey);
           // Check for null since it's possible that something goes wrong even before the
@@ -558,19 +527,11 @@ public Optional<MetricRegistry> getMetricRegistry(String partitionChannelKey) {
    *     or PROD)
    * @param topic topic name
    * @param partition partition number
-   * @param shouldUseConnectorNameInChannelName If true, use connectorName, else not. This is the
-   *     new format for channel Name.
    * @return combination of topic and partition
    */
   @VisibleForTesting
-  public static String partitionChannelKey(
-      String connectorName,
-      String topic,
-      int partition,
-      final boolean shouldUseConnectorNameInChannelName) {
-    return shouldUseConnectorNameInChannelName
-        ? connectorName + "_" + topic + "_" + partition
-        : topic + "_" + partition;
+  public static String partitionChannelKey(String connectorName, String topic, int partition) {
+    return connectorName + "_" + topic + "_" + partition;
   }

   /* Used for testing */
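Taken together, the hunks above make the connector-name prefix unconditional: partitionChannelKey is now the single place the channel/cache key is built, with no flag-dependent branching. A minimal sketch of the before/after key shapes (the connector, topic, and partition values below are illustrative, not taken from this PR):

    import com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2;

    // Sketch: the one remaining channel/cache key format after this change.
    public class ChannelKeyFormatExample {
      public static void main(String[] args) {
        // Before this PR, with snowflake.enable.new.channel.name.format left at its
        // default of false, the key omitted the connector name:
        //   partitionChannelKey("MY_CONNECTOR", "orders", 3, false) -> "orders_3"
        // After this PR there is exactly one, always connector-prefixed, format:
        String key = SnowflakeSinkServiceV2.partitionChannelKey("MY_CONNECTOR", "orders", 3);
        System.out.println(key); // prints MY_CONNECTOR_orders_3
      }
    }

Because Snowpipe Streaming tracks committed offsets per channel name, a partition whose channel was created under the old topic_partition format will be looked up under a new name once this lands; the deleted SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT_DOC text carried a caution to the same effect.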
connectorName + "_" + topic + "_" + partition - : topic + "_" + partition; + public static String partitionChannelKey(String connectorName, String topic, int partition) { + return connectorName + "_" + topic + "_" + partition; } /* Used for testing */ diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingUtils.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingUtils.java index b7ed370a6..2c6d877f7 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingUtils.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingUtils.java @@ -8,7 +8,6 @@ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ErrorTolerance; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD; -import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD; import com.google.common.base.Strings; @@ -222,11 +221,6 @@ public static ImmutableMap validateStreamingSnowpipeConfig( BOOLEAN_VALIDATOR.ensureValid( ERRORS_LOG_ENABLE_CONFIG, inputConfig.get(ERRORS_LOG_ENABLE_CONFIG)); } - if (inputConfig.containsKey(SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT)) { - BOOLEAN_VALIDATOR.ensureValid( - SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT, - inputConfig.get(SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT)); - } // Valid schematization for Snowpipe Streaming invalidParams.putAll(validateSchematizationConfig(inputConfig)); diff --git a/src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java b/src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java index 219d62c35..da9198f11 100644 --- a/src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java +++ b/src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java @@ -867,34 +867,6 @@ public void testInvalidEnableOptimizeStreamingClientConfig() { } } - @Test - public void testEnableStreamingChannelFormatV2Config() { - Map config = getConfig(); - config.put( - SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT, - IngestionMethodConfig.SNOWPIPE_STREAMING.toString()); - config.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT, "true"); - config.put(Utils.SF_ROLE, "ACCOUNTADMIN"); - Utils.validateConfig(config); - } - - @Test - public void testInvalidEnableStreamingChannelFormatV2Config() { - try { - Map config = getConfig(); - config.put( - SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT, - IngestionMethodConfig.SNOWPIPE_STREAMING.toString()); - config.put(Utils.SF_ROLE, "ACCOUNTADMIN"); - config.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT, "yes"); - Utils.validateConfig(config); - } catch (SnowflakeKafkaConnectorException exception) { - assert exception - .getMessage() - .contains(SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT); - } - } - @Test public void testInvalidEmptyConfig() { try { diff --git a/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskStreamingTest.java b/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskStreamingTest.java index 7c77f53be..1399545ac 100644 --- a/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskStreamingTest.java +++ 
b/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskStreamingTest.java @@ -4,7 +4,6 @@ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_TOLERANCE_CONFIG; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT; -import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT; import static com.snowflake.kafka.connector.internal.TestUtils.TEST_CONNECTOR_NAME; import static com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.partitionChannelKey; @@ -20,10 +19,8 @@ import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService; import com.snowflake.kafka.connector.records.RecordService; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.streaming.OpenChannelRequest; @@ -39,29 +36,15 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; /** Unit test for testing Snowflake Sink Task Behavior with Snowpipe Streaming */ -@RunWith(Parameterized.class) public class SnowflakeSinkTaskStreamingTest { private String topicName; private static int partition = 0; private TopicPartition topicPartition; - private final boolean shouldUseConnectorNameInChannelName; - - @Parameterized.Parameters - public static List input() { - return Arrays.asList(Boolean.TRUE, Boolean.FALSE); - } - - public SnowflakeSinkTaskStreamingTest(boolean shouldUseConnectorNameInChannelName) { - this.shouldUseConnectorNameInChannelName = shouldUseConnectorNameInChannelName; - } - @Before public void setup() { topicName = TestUtils.randomTableName(); @@ -76,9 +59,6 @@ public void testSinkTaskInvalidRecord_InMemoryDLQ() throws Exception { config.put(INGESTION_METHOD_OPT, IngestionMethodConfig.SNOWPIPE_STREAMING.toString()); config.put(ERRORS_TOLERANCE_CONFIG, SnowflakeSinkConnectorConfig.ErrorTolerance.ALL.toString()); config.put(ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "test_DLQ"); - config.put( - SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT, - String.valueOf(this.shouldUseConnectorNameInChannelName)); InMemoryKafkaRecordErrorReporter errorReporter = new InMemoryKafkaRecordErrorReporter(); SnowflakeConnectionService mockConnectionService = Mockito.mock(SnowflakeConnectionServiceV1.class); @@ -108,11 +88,7 @@ public void testSinkTaskInvalidRecord_InMemoryDLQ() throws Exception { new TopicPartitionChannel( mockStreamingClient, topicPartition, - SnowflakeSinkServiceV2.partitionChannelKey( - TEST_CONNECTOR_NAME, - topicName, - partition, - this.shouldUseConnectorNameInChannelName), + SnowflakeSinkServiceV2.partitionChannelKey(TEST_CONNECTOR_NAME, topicName, partition), topicName, new StreamingBufferThreshold(10, 10_000, 1), config, @@ -122,12 +98,7 @@ public void testSinkTaskInvalidRecord_InMemoryDLQ() throws Exception { Map topicPartitionChannelMap = Collections.singletonMap( - partitionChannelKey( - TEST_CONNECTOR_NAME, - topicName, - partition, - this.shouldUseConnectorNameInChannelName), - topicPartitionChannel); + partitionChannelKey(TEST_CONNECTOR_NAME, topicName, partition), topicPartitionChannel); 
SnowflakeSinkServiceV2 mockSinkService = new SnowflakeSinkServiceV2( diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java index edd44ed30..64d70f49b 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java @@ -1,6 +1,5 @@ package com.snowflake.kafka.connector.internal.streaming; -import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT; import static com.snowflake.kafka.connector.internal.TestUtils.TEST_CONNECTOR_NAME; import static com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.partitionChannelKey; import static com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE; @@ -63,21 +62,14 @@ public class SnowflakeSinkServiceV2IT { // use OAuth as authenticator or not private boolean useOAuth; - private final boolean shouldUseConnectorNameInChannelName; - - @Parameterized.Parameters(name = "useOAuth: {0}, shouldUseConnectorNameInChannelName: {1}") + @Parameterized.Parameters(name = "useOAuth: {0}") public static Collection input() { - // TODO: Add {true, false} and {true, true} after SNOW-352846 is released - return Arrays.asList( - new Object[][] { - {false, false}, - {false, true} - }); + // TODO: Added {true} after SNOW-352846 is released + return Arrays.asList(new Object[][] {{false}}); } - public SnowflakeSinkServiceV2IT(boolean useOAuth, boolean shouldUseConnectorNameInChannelName) { + public SnowflakeSinkServiceV2IT(boolean useOAuth) { this.useOAuth = useOAuth; - this.shouldUseConnectorNameInChannelName = shouldUseConnectorNameInChannelName; if (!useOAuth) { conn = TestUtils.getConnectionServiceForStreaming(); } else { @@ -128,9 +120,6 @@ public void testSinkServiceV2Builder() { public void testChannelCloseIngestion() throws Exception { Map config = getConfig(); SnowflakeSinkConnectorConfig.setDefaultValues(config); - config.put( - SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT, - String.valueOf(this.shouldUseConnectorNameInChannelName)); conn.createTable(table); // opens a channel for partition 0, table and topic @@ -180,9 +169,6 @@ public void testStreamingIngest_multipleChannelPartitions_closeSubsetOfPartition throws Exception { Map config = TestUtils.getConfForStreaming(); SnowflakeSinkConnectorConfig.setDefaultValues(config); - config.put( - SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT, - String.valueOf(this.shouldUseConnectorNameInChannelName)); conn.createTable(table); TopicPartition tp1 = new TopicPartition(table, partition); TopicPartition tp2 = new TopicPartition(table, partition2); @@ -236,11 +222,7 @@ public void testStreamingIngest_multipleChannelPartitions_closeSubsetOfPartition Assert.assertTrue( snowflakeSinkServiceV2 .getTopicPartitionChannelFromCacheKey( - partitionChannelKey( - TEST_CONNECTOR_NAME, - tp2.topic(), - tp2.partition(), - this.shouldUseConnectorNameInChannelName)) + partitionChannelKey(TEST_CONNECTOR_NAME, tp2.topic(), tp2.partition())) .isPresent()); List newRecordsPartition1 = @@ -311,9 +293,6 @@ public void testRebalanceOpenCloseIngestion() throws Exception { public void testStreamingIngestion() throws Exception { Map config = getConfig(); SnowflakeSinkConnectorConfig.setDefaultValues(config); - config.put( - 
-        SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
-        String.valueOf(this.shouldUseConnectorNameInChannelName));
     conn.createTable(table);

     // opens a channel for partition 0, table and topic
@@ -378,9 +357,6 @@ public void testStreamingIngestion() throws Exception {
   public void testStreamingIngest_multipleChannelPartitions_withMetrics() throws Exception {
     Map<String, String> config = getConfig();
     SnowflakeSinkConnectorConfig.setDefaultValues(config);
-    config.put(
-        SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
-        String.valueOf(this.shouldUseConnectorNameInChannelName));

     // set up telemetry service spy
     SnowflakeConnectionService connectionService = Mockito.spy(this.conn);
@@ -437,11 +413,7 @@ public void testStreamingIngest_multipleChannelPartitions_withMetrics() throws E
     Map<String, Gauge> metricRegistry =
         service
             .getMetricRegistry(
-                SnowflakeSinkServiceV2.partitionChannelKey(
-                    TEST_CONNECTOR_NAME,
-                    topic,
-                    partition,
-                    this.shouldUseConnectorNameInChannelName))
+                SnowflakeSinkServiceV2.partitionChannelKey(TEST_CONNECTOR_NAME, topic, partition))
             .get()
             .getGauges();

     assert metricRegistry.size()
@@ -450,8 +422,7 @@ public void testStreamingIngest_multipleChannelPartitions_withMetrics() throws E
     // partition 1
     this.verifyPartitionMetrics(
         metricRegistry,
-        partitionChannelKey(
-            TEST_CONNECTOR_NAME, topic, partition, this.shouldUseConnectorNameInChannelName),
+        partitionChannelKey(TEST_CONNECTOR_NAME, topic, partition),
         NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE,
         recordsInPartition1 - 1,
         recordsInPartition1,
@@ -459,8 +430,7 @@ public void testStreamingIngest_multipleChannelPartitions_withMetrics() throws E
         this.conn.getConnectorName());
     this.verifyPartitionMetrics(
         metricRegistry,
-        partitionChannelKey(
-            TEST_CONNECTOR_NAME, topic, partition2, this.shouldUseConnectorNameInChannelName),
+        partitionChannelKey(TEST_CONNECTOR_NAME, topic, partition2),
         NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE,
         recordsInPartition2 - 1,
         recordsInPartition2,
@@ -476,8 +446,7 @@ public void testStreamingIngest_multipleChannelPartitions_withMetrics() throws E
     // verify metrics closed
     assert !service
         .getMetricRegistry(
-            SnowflakeSinkServiceV2.partitionChannelKey(
-                TEST_CONNECTOR_NAME, topic, partition, this.shouldUseConnectorNameInChannelName))
+            SnowflakeSinkServiceV2.partitionChannelKey(TEST_CONNECTOR_NAME, topic, partition))
         .isPresent();

     Mockito.verify(telemetryService, Mockito.times(2))
@@ -1511,65 +1480,6 @@ public void testStreamingIngestion_invalid_file_version() throws Exception {
     }
   }

-  @Test
-  public void testStreamingIngestion_ChannelNameFormats() throws Exception {
-    Map<String, String> config = TestUtils.getConfForStreaming();
-    SnowflakeSinkConnectorConfig.setDefaultValues(config);
-    Map<String, String> overriddenConfig = new HashMap<>(config);
-
-    config.put(
-        SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
-        String.valueOf(this.shouldUseConnectorNameInChannelName));
-
-    conn.createTable(table);
-    // opens a channel for partition 0, table and topic
-    SnowflakeSinkService service =
-        SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
-            .setRecordNumber(1)
-            .setErrorReporter(new InMemoryKafkaRecordErrorReporter())
-            .setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
-            .addTask(table, new TopicPartition(topic, partition)) // Internally calls startTask
-            .build();
-
-    SnowflakeConverter converter = new SnowflakeJsonConverter();
-    SchemaAndValue input =
-        converter.toConnectData(topic, "{\"name\":\"test\"}".getBytes(StandardCharsets.UTF_8));
-    long offset = 0;
-
-    SinkRecord record1 =
-        new SinkRecord(
-            topic,
-            partition,
-            Schema.STRING_SCHEMA,
-            "test_key" + offset,
-            input.schema(),
-            input.value(),
-            offset);
-
-    // No need to verify results
-    service.insert(record1);
-
-    SnowflakeSinkServiceV2 snowflakeSinkServiceV2 = (SnowflakeSinkServiceV2) service;
-
-    TopicPartitionChannel channel =
-        snowflakeSinkServiceV2
-            .getTopicPartitionChannelFromCacheKey(
-                partitionChannelKey(
-                    TEST_CONNECTOR_NAME,
-                    topic,
-                    partition,
-                    this.shouldUseConnectorNameInChannelName))
-            .orElseThrow(RuntimeException::new);
-    assert channel
-        .getChannelName()
-        .toLowerCase()
-        .contains(
-            SnowflakeSinkServiceV2.partitionChannelKey(
-                    TEST_CONNECTOR_NAME, topic, partition, this.shouldUseConnectorNameInChannelName)
-                .toLowerCase());
-    service.closeAll();
-  }
-
   private void createNonNullableColumn(String tableName, String colName) {
     String createTableQuery = "alter table identifier(?) add " + colName + " int not null";
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java
index 81850347c..99bf7f30a 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java
@@ -1,6 +1,5 @@
 package com.snowflake.kafka.connector.internal.streaming;

-import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT;
 import static com.snowflake.kafka.connector.internal.TestUtils.TEST_CONNECTOR_NAME;

 import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
@@ -12,8 +11,6 @@
 import com.snowflake.kafka.connector.internal.TestUtils;
 import com.snowflake.kafka.connector.internal.streaming.telemetry.SnowflakeTelemetryServiceV2;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -26,10 +23,7 @@
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;

-@RunWith(Parameterized.class)
 public class TopicPartitionChannelIT {

   private SnowflakeConnectionService conn = TestUtils.getConnectionServiceForStreaming();
@@ -40,17 +34,6 @@ public class TopicPartitionChannelIT {
   private TopicPartition topicPartition, topicPartition2;
   private String testChannelName, testChannelName2;

-  private final boolean shouldUseConnectorNameInChannelName;
-
-  public TopicPartitionChannelIT(boolean shouldUseConnectorNameInChannelName) {
-    this.shouldUseConnectorNameInChannelName = shouldUseConnectorNameInChannelName;
-  }
-
-  @Parameterized.Parameters(name = "shouldUseConnectorNameInChannelName: {0}")
-  public static Collection<Object[]> input() {
-    return Arrays.asList(new Object[][] {{true}, {false}});
-  }
-
   @Before
   public void beforeEach() {
     testTableName = TestUtils.randomTableName();
@@ -60,12 +43,10 @@ public void beforeEach() {
     topicPartition2 = new TopicPartition(topic, PARTITION_2);

     testChannelName =
-        SnowflakeSinkServiceV2.partitionChannelKey(
-            TEST_CONNECTOR_NAME, topic, PARTITION, this.shouldUseConnectorNameInChannelName);
+        SnowflakeSinkServiceV2.partitionChannelKey(TEST_CONNECTOR_NAME, topic, PARTITION);

     testChannelName2 =
-        SnowflakeSinkServiceV2.partitionChannelKey(
-            TEST_CONNECTOR_NAME, topic, PARTITION_2, this.shouldUseConnectorNameInChannelName);
+        SnowflakeSinkServiceV2.partitionChannelKey(TEST_CONNECTOR_NAME, topic, PARTITION_2);
   }

   @After
@@ -77,9 +58,6 @@ public void afterEach() {
   public void testAutoChannelReopenOn_OffsetTokenSFException() throws Exception {
     Map<String, String> config = TestUtils.getConfForStreaming();
     SnowflakeSinkConnectorConfig.setDefaultValues(config);
-    config.put(
-        SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
-        String.valueOf(this.shouldUseConnectorNameInChannelName));

     InMemorySinkTaskContext inMemorySinkTaskContext =
         new InMemorySinkTaskContext(Collections.singleton(topicPartition));
@@ -137,10 +115,6 @@ public void testAutoChannelReopenOn_OffsetTokenSFException() throws Exception {
   public void testInsertRowsOnChannelClosed() throws Exception {
     Map<String, String> config = TestUtils.getConfForStreaming();
     SnowflakeSinkConnectorConfig.setDefaultValues(config);
-    SnowflakeSinkConnectorConfig.setDefaultValues(config);
-    config.put(
-        SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
-        String.valueOf(this.shouldUseConnectorNameInChannelName));

     InMemorySinkTaskContext inMemorySinkTaskContext =
         new InMemorySinkTaskContext(Collections.singleton(topicPartition));
@@ -205,10 +179,6 @@ public void testInsertRowsOnChannelClosed() throws Exception {
   public void testAutoChannelReopen_InsertRowsSFException() throws Exception {
     Map<String, String> config = TestUtils.getConfForStreaming();
     SnowflakeSinkConnectorConfig.setDefaultValues(config);
-    SnowflakeSinkConnectorConfig.setDefaultValues(config);
-    config.put(
-        SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
-        String.valueOf(this.shouldUseConnectorNameInChannelName));

     InMemorySinkTaskContext inMemorySinkTaskContext =
         new InMemorySinkTaskContext(Collections.singleton(topicPartition));
@@ -297,10 +267,6 @@ public void testAutoChannelReopen_MultiplePartitionsInsertRowsSFException() thro
     Map<String, String> config = TestUtils.getConfForStreaming();
     SnowflakeSinkConnectorConfig.setDefaultValues(config);
     config.put(SnowflakeSinkConnectorConfig.ENABLE_STREAMING_CLIENT_OPTIMIZATION_CONFIG, "true");
-    SnowflakeSinkConnectorConfig.setDefaultValues(config);
-    config.put(
-        SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
-        String.valueOf(this.shouldUseConnectorNameInChannelName));

     InMemorySinkTaskContext inMemorySinkTaskContext =
         new InMemorySinkTaskContext(Collections.singleton(topicPartition));
@@ -420,10 +386,6 @@ public void testAutoChannelReopen_SinglePartitionsInsertRowsSFException() throws
     Map<String, String> config = TestUtils.getConfForStreaming();
     SnowflakeSinkConnectorConfig.setDefaultValues(config);
     config.put(SnowflakeSinkConnectorConfig.ENABLE_STREAMING_CLIENT_OPTIMIZATION_CONFIG, "true");
-    SnowflakeSinkConnectorConfig.setDefaultValues(config);
-    config.put(
-        SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
-        String.valueOf(this.shouldUseConnectorNameInChannelName));

     InMemorySinkTaskContext inMemorySinkTaskContext =
         new InMemorySinkTaskContext(Collections.singleton(topicPartition));
@@ -499,10 +461,6 @@ public void testSimpleInsertRowsFailureWithArrowBDECFormat() throws Exception {
     // add config which overrides the bdec file format
     Map<String, String> overriddenConfig = new HashMap<>(TestUtils.getConfForStreaming());
     overriddenConfig.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "1");
-    SnowflakeSinkConnectorConfig.setDefaultValues(overriddenConfig);
-    overriddenConfig.put(
-        SNOWFLAKE_ENABLE_NEW_CHANNEL_NAME_FORMAT,
-        String.valueOf(this.shouldUseConnectorNameInChannelName));

     InMemorySinkTaskContext inMemorySinkTaskContext =
         new InMemorySinkTaskContext(Collections.singleton(topicPartition));
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java
index 88c127601..dd32d24f0 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java
@@ -72,7 +72,7 @@ public class TopicPartitionChannelTest {
   private static final int PARTITION = 0;

   private static final String TEST_CHANNEL_NAME =
-      SnowflakeSinkServiceV2.partitionChannelKey(TEST_CONNECTOR_NAME, TOPIC, PARTITION, false);
+      SnowflakeSinkServiceV2.partitionChannelKey(TEST_CONNECTOR_NAME, TOPIC, PARTITION);
   private static final String TEST_TABLE_NAME = "TEST_TABLE";

   private TopicPartition topicPartition;
diff --git a/test/rest_request_template/test_snowpipe_streaming_channel_format_v2.json b/test/rest_request_template/test_snowpipe_streaming_channel_format_v2.json
deleted file mode 100644
index ae6357717..000000000
--- a/test/rest_request_template/test_snowpipe_streaming_channel_format_v2.json
+++ /dev/null
@@ -1,23 +0,0 @@
-{
-  "name": "SNOWFLAKE_CONNECTOR_NAME",
-  "config": {
-    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
-    "topics": "SNOWFLAKE_TEST_TOPIC",
-    "tasks.max": "1",
-    "buffer.flush.time": "10",
-    "buffer.count.records": "100",
-    "buffer.size.bytes": "5000000",
-    "snowflake.url.name": "SNOWFLAKE_HOST",
-    "snowflake.user.name": "SNOWFLAKE_USER",
-    "snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY",
-    "snowflake.database.name": "SNOWFLAKE_DATABASE",
-    "snowflake.schema.name": "SNOWFLAKE_SCHEMA",
-    "snowflake.role.name": "SNOWFLAKE_ROLE",
-    "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
-    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
-    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
-    "value.converter.schemas.enable": "false",
-    "jmx": "true",
-    "snowflake.enable.new.channel.name.format": "true"
-  }
-}
\ No newline at end of file
diff --git a/test/test_suit/test_snowpipe_streaming_channel_format_v2.py b/test/test_suit/test_snowpipe_streaming_channel_format_v2.py
deleted file mode 100644
index 3c0433582..000000000
--- a/test/test_suit/test_snowpipe_streaming_channel_format_v2.py
+++ /dev/null
@@ -1,78 +0,0 @@
-import datetime
-
-from test_suit.test_utils import RetryableError, NonRetryableError
-import json
-from time import sleep
-
-class TestSnowpipeStreamingChannelFormatV2:
-    def __init__(self, driver, nameSalt):
-        self.driver = driver
-        self.fileName = "test_snowpipe_streaming_channel_format_v2"
-        self.topic = self.fileName + nameSalt
-
-        self.topicNum = 1
-        self.partitionNum = 3
-        self.recordNum = 1000
-
-        # create topic and partitions in constructor since the post REST api will automatically create topic with only one partition
-        self.driver.createTopics(self.topic, partitionNum=self.partitionNum, replicationNum=1)
-
-    def getConfigFileName(self):
-        return self.fileName + ".json"
-
-    def send(self):
-        # create topic with n partitions and only one replication factor
-        print("Partition count:" + str(self.partitionNum))
-        print("Topic:", self.topic)
-
-        self.driver.describeTopic(self.topic)
-
-        for p in range(self.partitionNum):
-            print("Sending in Partition:" + str(p))
-            key = []
-            value = []
-
-            for e in range(self.recordNum):
-                value.append(json.dumps(
-                    {'numbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumber': str(e)}
-                ).encode('utf-8'))
-
-            self.driver.sendBytesData(self.topic, value, key, partition=p)
-            sleep(2)
-
-    def verify(self, round):
-        res = self.driver.snowflake_conn.cursor().execute(
-            "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0]
-        print("Count records in table {}={}".format(self.topic, str(res)))
-        if res < (self.recordNum * self.partitionNum):
-            print("Topic:" + self.topic + " count is less, will retry")
-            raise RetryableError()
-        elif res > (self.recordNum * self.partitionNum):
-            print("Topic:" + self.topic + " count is more, duplicates detected")
-            raise NonRetryableError("Duplication occurred, number of record in table is larger than number of record sent")
-        else:
-            print("Table:" + self.topic + " count is exactly " + str(self.recordNum * self.partitionNum))
-
-        # for duplicates
-        res = self.driver.snowflake_conn.cursor().execute("Select record_metadata:\"offset\"::string as OFFSET_NO,record_metadata:\"partition\"::string as PARTITION_NO from {} group by OFFSET_NO, PARTITION_NO having count(*)>1".format(self.topic)).fetchone()
-        print("Duplicates:{}".format(res))
-        if res is not None:
-            raise NonRetryableError("Duplication detected")
-
-        # for uniqueness in offset numbers
-        rows = self.driver.snowflake_conn.cursor().execute("Select count(distinct record_metadata:\"offset\"::number) as UNIQUE_OFFSETS,record_metadata:\"partition\"::number as PARTITION_NO from {} group by PARTITION_NO order by PARTITION_NO".format(self.topic)).fetchall()
-
-        if rows is None:
-            raise NonRetryableError("Unique offsets for partitions not found")
-        else:
-            assert len(rows) == self.partitionNum
-
-            for p in range(self.partitionNum):
-                # unique offset count and partition no are two columns (returns tuple)
-                if rows[p][0] != self.recordNum or rows[p][1] != p:
-                    raise NonRetryableError("Unique offsets for partitions count doesnt match")
-
-    def clean(self):
-        # dropping of stage and pipe doesnt apply for snowpipe streaming. (It executes drop if exists)
-        self.driver.cleanTableStagePipe(self.topic)
-        return
diff --git a/test/test_suites.py b/test/test_suites.py
index 7430a86ef..8616babdd 100644
--- a/test/test_suites.py
+++ b/test/test_suites.py
@@ -45,7 +45,6 @@
 from test_suit.test_string_avrosr import TestStringAvrosr
 from test_suit.test_string_json import TestStringJson
 from test_suit.test_string_json_ignore_tombstone import TestStringJsonIgnoreTombstone
-from test_suit.test_snowpipe_streaming_channel_format_v2 import TestSnowpipeStreamingChannelFormatV2


 class EndToEndTestSuite:
@@ -199,6 +198,10 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS
         ("TestSchemaNotSupportedConverter", EndToEndTestSuite(
             test_instance=TestSchemaNotSupportedConverter(driver, nameSalt), clean=True, run_in_confluent=True,
             run_in_apache=True
         )),
+        ("TestSchemaEvolutionDropTable", EndToEndTestSuite(
+            test_instance=TestSchemaEvolutionDropTable(driver, nameSalt), clean=True, run_in_confluent=True,
+            run_in_apache=True
+        )),
         ("TestKcDeleteCreate", EndToEndTestSuite(
             test_instance=TestKcDeleteCreate(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
         )),
@@ -236,9 +239,5 @@
         ("TestKcRestart", EndToEndTestSuite(
             test_instance=TestKcRestart(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
         )),
-        ("TestSnowpipeStreamingChannelFormatV2", EndToEndTestSuite(
-            test_instance=TestSnowpipeStreamingChannelFormatV2(driver, nameSalt), clean=True, run_in_confluent=True,
-            run_in_apache=True
-        )),
     ])
     return test_suites