diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java index 0ad7a51b3..50c6d6d1c 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java @@ -168,6 +168,17 @@ public class SnowflakeSinkConnectorConfig { "Whether to optimize the streaming client to reduce cost. Note that this may affect" + " throughput or latency and can only be set if Streaming Snowpipe is enabled"; + public static final String ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG = + "enable.streaming.channel.offset.migration"; + public static final String ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DISPLAY = + "Enable Streaming Channel Offset Migration"; + public static final boolean ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DEFAULT = true; + public static final String ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DOC = + "This config is used to enable/disable streaming channel offset migration logic. If true, we" + + " will migrate offset token from channel name format V2 to name format v1. V2 channel" + + " format is deprecated and V1 will be used always, disabling this config could have" + + " ramifications. Please consult Snowflake support before setting this to false."; + // MDC logging header public static final String ENABLE_MDC_LOGGING_CONFIG = "enable.mdc.logging"; public static final String ENABLE_MDC_LOGGING_DISPLAY = "Enable MDC logging"; @@ -591,7 +602,17 @@ static ConfigDef newConfigDef() { CONNECTOR_CONFIG, 8, ConfigDef.Width.NONE, - ENABLE_MDC_LOGGING_DISPLAY); + ENABLE_MDC_LOGGING_DISPLAY) + .define( + ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG, + Type.BOOLEAN, + ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DEFAULT, + Importance.LOW, + ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DOC, + CONNECTOR_CONFIG, + 9, + ConfigDef.Width.NONE, + ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DISPLAY); } public static class TopicToTableValidator implements ConfigDef.Validator { diff --git a/src/main/java/com/snowflake/kafka/connector/Utils.java b/src/main/java/com/snowflake/kafka/connector/Utils.java index b1c711486..750236625 100644 --- a/src/main/java/com/snowflake/kafka/connector/Utils.java +++ b/src/main/java/com/snowflake/kafka/connector/Utils.java @@ -454,6 +454,14 @@ static String validateConfig(Map config) { "Streaming client optimization is only available with {}.", IngestionMethodConfig.SNOWPIPE_STREAMING.toString())); } + if (config.containsKey( + SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG)) { + invalidConfigParams.put( + SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG, + Utils.formatString( + "Streaming client Channel migration is only available with {}.", + IngestionMethodConfig.SNOWPIPE_STREAMING.toString())); + } } if (config.containsKey(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java index dad2ec22b..a8e61bf83 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java @@ -1,5 +1,6 @@ package com.snowflake.kafka.connector.internal; +import com.snowflake.kafka.connector.internal.streaming.ChannelMigrateOffsetTokenResponseDTO; import 
com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService; import java.sql.Connection; import java.util.List; @@ -282,4 +283,28 @@ public interface SnowflakeConnectionService { * @param tableName table name */ void createTableWithOnlyMetadataColumn(String tableName); + + /** + * Migrate Streaming Channel offsetToken from a source Channel to a destination channel. + * + *
<p>
Here, the source channel is the new channel name format we introduced. @see Commit + * + *
<p>
Destination channel is the original format containing only topicName and partition number. + * + *
<p>
We catch SQLException and JsonProcessingException that might happen in this method. The + * caller should always open the Old Channel format. This old channel format will also be the key + * to many HashMaps we will create. (For instance {@link + * com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2#partitionsToChannel}) + * + * @param tableName Name of the table + * @param sourceChannelName sourceChannel name from where the offset Token will be fetched. + * Channel with this name will also be deleted. + * @param destinationChannelName destinationChannel name to where the offsetToken will be copied + * over. + * @return The DTO serialized from the migration response. + */ + ChannelMigrateOffsetTokenResponseDTO migrateStreamingChannelOffsetToken( + String tableName, String sourceChannelName, String destinationChannelName); } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java index 8457c31d3..4c8ec3ed7 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java @@ -3,7 +3,12 @@ import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_CONTENT; import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_METADATA; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.snowflake.kafka.connector.Utils; +import com.snowflake.kafka.connector.internal.streaming.ChannelMigrateOffsetTokenResponseDTO; +import com.snowflake.kafka.connector.internal.streaming.ChannelMigrationResponseCode; import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; import com.snowflake.kafka.connector.internal.streaming.SchematizationUtils; import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService; @@ -56,6 +61,8 @@ public class SnowflakeConnectionServiceV1 implements SnowflakeConnectionService // User agent suffix we want to pass in to ingest service public static final String USER_AGENT_SUFFIX_FORMAT = "SFKafkaConnector/%s provider/%s"; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + SnowflakeConnectionServiceV1( Properties prop, SnowflakeURL url, @@ -1006,4 +1013,78 @@ public Connection getConnection() { public SnowflakeInternalStage getInternalStage() { return this.internalStage; } + + @Override + public ChannelMigrateOffsetTokenResponseDTO migrateStreamingChannelOffsetToken( + String tableName, String sourceChannelName, String destinationChannelName) { + InternalUtils.assertNotEmpty("tableName", tableName); + InternalUtils.assertNotEmpty("sourceChannelName", sourceChannelName); + InternalUtils.assertNotEmpty("destinationChannelName", destinationChannelName); + String fullyQualifiedTableName = + prop.getProperty(InternalUtils.JDBC_DATABASE) + + "." + + prop.getProperty(InternalUtils.JDBC_SCHEMA) + + "." 
+ + tableName; + String query = "select SYSTEM$SNOWPIPE_STREAMING_MIGRATE_CHANNEL_OFFSET_TOKEN((?), (?), (?));"; + + try { + PreparedStatement stmt = conn.prepareStatement(query); + stmt.setString(1, fullyQualifiedTableName); + stmt.setString(2, sourceChannelName); + stmt.setString(3, destinationChannelName); + ResultSet resultSet = stmt.executeQuery(); + + String migrateOffsetTokenResultFromSysFunc = null; + if (resultSet.next()) { + migrateOffsetTokenResultFromSysFunc = resultSet.getString(1 /*Only one column*/); + } + if (migrateOffsetTokenResultFromSysFunc == null) { + final String errorMsg = + String.format( + "No result found in Migrating OffsetToken through System Function for tableName:%s," + + " sourceChannel:%s, destinationChannel:%s", + fullyQualifiedTableName, sourceChannelName, destinationChannelName); + throw SnowflakeErrors.ERROR_5023.getException(errorMsg, this.telemetry); + } + + ChannelMigrateOffsetTokenResponseDTO channelMigrateOffsetTokenResponseDTO = + getChannelMigrateOffsetTokenResponseDTO(migrateOffsetTokenResultFromSysFunc); + + LOGGER.info( + "Migrate OffsetToken response for table:{}, sourceChannel:{}, destinationChannel:{}" + + " is:{}", + tableName, + sourceChannelName, + destinationChannelName, + channelMigrateOffsetTokenResponseDTO); + if (!ChannelMigrationResponseCode.isChannelMigrationResponseSuccessful( + channelMigrateOffsetTokenResponseDTO)) { + throw SnowflakeErrors.ERROR_5023.getException( + ChannelMigrationResponseCode.getMessageByCode( + channelMigrateOffsetTokenResponseDTO.getResponseCode()), + this.telemetry); + } + return channelMigrateOffsetTokenResponseDTO; + } catch (SQLException | JsonProcessingException e) { + final String errorMsg = + String.format( + "Migrating OffsetToken for a SourceChannel:%s in table:%s failed due to" + + " exceptionMessage:%s and stackTrace:%s", + sourceChannelName, + fullyQualifiedTableName, + e.getMessage(), + Arrays.toString(e.getStackTrace())); + throw SnowflakeErrors.ERROR_5023.getException(errorMsg, this.telemetry); + } + } + + @VisibleForTesting + protected ChannelMigrateOffsetTokenResponseDTO getChannelMigrateOffsetTokenResponseDTO( + String migrateOffsetTokenResultFromSysFunc) throws JsonProcessingException { + ChannelMigrateOffsetTokenResponseDTO channelMigrateOffsetTokenResponseDTO = + OBJECT_MAPPER.readValue( + migrateOffsetTokenResultFromSysFunc, ChannelMigrateOffsetTokenResponseDTO.class); + return channelMigrateOffsetTokenResponseDTO; + } } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java index e3eadc238..f0f10aa20 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java @@ -297,7 +297,13 @@ public enum SnowflakeErrors { "5021", "Failed to get data schema", "Failed to get data schema. 
Unrecognizable data type in JSON object"), - ERROR_5022("5022", "Invalid column name", "Failed to find column in the schema"); + ERROR_5022("5022", "Invalid column name", "Failed to find column in the schema"), + + ERROR_5023( + "5023", + "Failure in Streaming Channel Offset Migration Response", + "Streaming Channel Offset Migration from Source to Destination Channel has no/invalid" + + " response, please contact Snowflake Support"); // properties diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/ChannelMigrateOffsetTokenResponseDTO.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/ChannelMigrateOffsetTokenResponseDTO.java new file mode 100644 index 000000000..13e36f5f5 --- /dev/null +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/ChannelMigrateOffsetTokenResponseDTO.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2023 Snowflake Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.snowflake.kafka.connector.internal.streaming; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * POJO used to serialize the System function response for migration offset from Source Channel to + * Destination. + */ +public class ChannelMigrateOffsetTokenResponseDTO { + private long responseCode; + + private String responseMessage; + + public ChannelMigrateOffsetTokenResponseDTO(long responseCode, String responseMessage) { + this.responseCode = responseCode; + this.responseMessage = responseMessage; + } + + /** Default Ctor for Jackson */ + public ChannelMigrateOffsetTokenResponseDTO() {} + + @JsonProperty("responseCode") + public long getResponseCode() { + return responseCode; + } + + @JsonProperty("responseMessage") + public String getResponseMessage() { + return responseMessage; + } + + @Override + public String toString() { + return "ChannelMigrateOffsetTokenResponseDTO{" + + "responseCode=" + + responseCode + + ", responseMessage='" + + responseMessage + + '\'' + + '}'; + } +} diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/ChannelMigrationResponseCode.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/ChannelMigrationResponseCode.java new file mode 100644 index 000000000..a63d7a1f9 --- /dev/null +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/ChannelMigrationResponseCode.java @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2023 Snowflake Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.snowflake.kafka.connector.internal.streaming; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Response code sent from the system function to migrate offsets from Source to Destination + * Channel. Please keep this code(values) in sync with what the server side is assigned. + */ +public enum ChannelMigrationResponseCode { + ERR_TABLE_DOES_NOT_EXIST_NOT_AUTHORIZED( + 4, "The supplied table does not exist or is not authorized"), + + SUCCESS(50, "Success"), + + OFFSET_MIGRATION_SOURCE_CHANNEL_DOES_NOT_EXIST( + 51, "Source Channel does not exist for Offset Migration"), + + CHANNEL_OFFSET_TOKEN_MIGRATION_GENERAL_EXCEPTION( + 52, "Snowflake experienced a transient exception, please retry the migration request."), + + OFFSET_MIGRATION_SOURCE_AND_DESTINATION_CHANNEL_SAME( + 53, "Source and Destination Channel are same for Migration Offset Request"), + ; + + private final long statusCode; + + private final String message; + + public static final String UNKNOWN_STATUS_MESSAGE = + "Unknown status message. Please contact Snowflake support for further assistance"; + + ChannelMigrationResponseCode(int statusCode, String message) { + this.statusCode = statusCode; + this.message = message; + } + + @VisibleForTesting + public long getStatusCode() { + return statusCode; + } + + @VisibleForTesting + public String getMessage() { + return message; + } + + public static String getMessageByCode(Long statusCode) { + if (statusCode != null) { + for (ChannelMigrationResponseCode code : values()) { + if (code.statusCode == statusCode) { + return code.message; + } + } + } + return UNKNOWN_STATUS_MESSAGE; + } + + /** + * Given a response code which was received from server side, check if it as successful migration + * or a failure. + * + * @param statusCodeToCheck response from JDBC system function call. + * @return true or false + */ + private static boolean isStatusCodeSuccessful(final long statusCodeToCheck) { + return statusCodeToCheck == SUCCESS.getStatusCode() + || statusCodeToCheck == OFFSET_MIGRATION_SOURCE_CHANNEL_DOES_NOT_EXIST.getStatusCode(); + } + + /** + * Given a Response DTO, which was received from server side and serialized, check if it as + * successful migration or a failure. + * + * @param channelMigrateOffsetTokenResponseDTO response from JDBC system function call serialized + * into a DTO object. + * @return true or false + */ + public static boolean isChannelMigrationResponseSuccessful( + final ChannelMigrateOffsetTokenResponseDTO channelMigrateOffsetTokenResponseDTO) { + return isStatusCodeSuccessful(channelMigrateOffsetTokenResponseDTO.getResponseCode()); + } +} diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java index 6a5e8bbed..f5f5bc93c 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java @@ -92,7 +92,7 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService { private boolean enableSchematization; /** - * Key is formulated in {@link #partitionChannelKey(String, String, int)} } + * Key is formulated in {@link #partitionChannelKey(String, int)} } * *
<p>
value is the Streaming Ingest Channel implementation (Wrapped around TopicPartitionChannel) */ @@ -235,8 +235,7 @@ private void createStreamingChannelForTopicPartition( final TopicPartition topicPartition, boolean hasSchemaEvolutionPermission) { final String partitionChannelKey = - partitionChannelKey( - conn.getConnectorName(), topicPartition.topic(), topicPartition.partition()); + partitionChannelKey(topicPartition.topic(), topicPartition.partition()); // Create new instance of TopicPartitionChannel which will always open the channel. partitionsToChannel.put( partitionChannelKey, @@ -295,8 +294,7 @@ public void insert(Collection records) { */ @Override public void insert(SinkRecord record) { - String partitionChannelKey = - partitionChannelKey(this.conn.getConnectorName(), record.topic(), record.kafkaPartition()); + String partitionChannelKey = partitionChannelKey(record.topic(), record.kafkaPartition()); // init a new topic partition if it's not presented in cache or if channel is closed if (!partitionsToChannel.containsKey(partitionChannelKey) || partitionsToChannel.get(partitionChannelKey).isChannelClosed()) { @@ -316,8 +314,7 @@ public void insert(SinkRecord record) { @Override public long getOffset(TopicPartition topicPartition) { String partitionChannelKey = - partitionChannelKey( - conn.getConnectorName(), topicPartition.topic(), topicPartition.partition()); + partitionChannelKey(topicPartition.topic(), topicPartition.partition()); if (partitionsToChannel.containsKey(partitionChannelKey)) { long offset = partitionsToChannel.get(partitionChannelKey).getOffsetSafeToCommitToKafka(); partitionsToChannel.get(partitionChannelKey).setLatestConsumerOffset(offset); @@ -371,8 +368,7 @@ public void close(Collection partitions) { partitions.forEach( topicPartition -> { final String partitionChannelKey = - partitionChannelKey( - conn.getConnectorName(), topicPartition.topic(), topicPartition.partition()); + partitionChannelKey(topicPartition.topic(), topicPartition.partition()); TopicPartitionChannel topicPartitionChannel = partitionsToChannel.get(partitionChannelKey); // Check for null since it's possible that the something goes wrong even before the @@ -382,7 +378,7 @@ public void close(Collection partitions) { } LOGGER.info( "Closing partitionChannel:{}, partition:{}, topic:{}", - topicPartitionChannel == null ? null : topicPartitionChannel.getChannelName(), + topicPartitionChannel == null ? null : topicPartitionChannel.getChannelNameFormatV1(), topicPartition.topic(), topicPartition.partition()); partitionsToChannel.remove(partitionChannelKey); @@ -521,22 +517,18 @@ public Optional getMetricRegistry(String partitionChannelKey) { /** * Gets a unique identifier consisting of connector name, topic name and partition number. * - * @param connectorName Connector name is always unique. (Two connectors with same name won't be - * allowed by Connector Framework) - *
<p>
Note: Customers can have same named connector in different connector runtimes (Like DEV - * or PROD) * @param topic topic name * @param partition partition number * @return combinartion of topic and partition */ @VisibleForTesting - public static String partitionChannelKey(String connectorName, String topic, int partition) { - return connectorName + "_" + topic + "_" + partition; + public static String partitionChannelKey(String topic, int partition) { + return topic + "_" + partition; } /* Used for testing */ @VisibleForTesting - SnowflakeStreamingIngestClient getStreamingIngestClient() { + public SnowflakeStreamingIngestClient getStreamingIngestClient() { return StreamingClientProvider.getStreamingClientProviderInstance() .getClient(this.connectorConfig); } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java index 83c758f03..fbd058e7d 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java @@ -1,5 +1,7 @@ package com.snowflake.kafka.connector.internal.streaming; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DEFAULT; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_TOLERANCE_CONFIG; import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.DURATION_BETWEEN_GET_OFFSET_TOKEN_RETRY; @@ -11,6 +13,7 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; import com.snowflake.kafka.connector.Utils; import com.snowflake.kafka.connector.dlq.KafkaRecordErrorReporter; import com.snowflake.kafka.connector.internal.BufferThreshold; @@ -134,7 +137,7 @@ public class TopicPartitionChannel { private final TopicPartition topicPartition; /* Channel Name is computed from topic and partition */ - private final String channelName; + private final String channelNameFormatV1; /* table is required for opening the channel */ private final String tableName; @@ -190,24 +193,25 @@ public class TopicPartitionChannel { public TopicPartitionChannel( SnowflakeStreamingIngestClient streamingIngestClient, TopicPartition topicPartition, - final String channelName, + final String channelNameFormatV1, final String tableName, final BufferThreshold streamingBufferThreshold, final Map sfConnectorConfig, KafkaRecordErrorReporter kafkaRecordErrorReporter, SinkTaskContext sinkTaskContext, + SnowflakeConnectionService conn, SnowflakeTelemetryService telemetryService) { this( streamingIngestClient, topicPartition, - channelName, + channelNameFormatV1, tableName, false, /* No schema evolution permission */ streamingBufferThreshold, sfConnectorConfig, kafkaRecordErrorReporter, sinkTaskContext, - null, /* Null Connection */ + conn, new RecordService(telemetryService), telemetryService, false, @@ -218,7 +222,7 @@ public TopicPartitionChannel( * @param streamingIngestClient client created specifically for this task * @param topicPartition topic partition corresponding to 
this Streaming Channel * (TopicPartitionChannel) - * @param channelName channel Name which is deterministic for topic and partition + * @param channelNameFormatV1 channel Name which is deterministic for topic and partition * @param tableName table to ingest in snowflake * @param hasSchemaEvolutionPermission if the role has permission to perform schema evolution on * the table @@ -234,7 +238,7 @@ public TopicPartitionChannel( public TopicPartitionChannel( SnowflakeStreamingIngestClient streamingIngestClient, TopicPartition topicPartition, - final String channelName, + final String channelNameFormatV1, final String tableName, boolean hasSchemaEvolutionPermission, final BufferThreshold streamingBufferThreshold, @@ -251,7 +255,7 @@ public TopicPartitionChannel( this.streamingIngestClient = Preconditions.checkNotNull(streamingIngestClient); Preconditions.checkState(!streamingIngestClient.isClosed()); this.topicPartition = Preconditions.checkNotNull(topicPartition); - this.channelName = Preconditions.checkNotNull(channelName); + this.channelNameFormatV1 = Preconditions.checkNotNull(channelNameFormatV1); this.tableName = Preconditions.checkNotNull(tableName); this.streamingBufferThreshold = Preconditions.checkNotNull(streamingBufferThreshold); this.sfConnectorConfig = Preconditions.checkNotNull(sfConnectorConfig); @@ -278,6 +282,14 @@ public TopicPartitionChannel( this.enableSchemaEvolution = this.enableSchematization && hasSchemaEvolutionPermission; + if (isEnableChannelOffsetMigration(sfConnectorConfig)) { + /* Channel Name format V2 is computed from connector name, topic and partition */ + final String channelNameFormatV2 = + generateChannelNameFormatV2(this.channelNameFormatV1, this.conn.getConnectorName()); + conn.migrateStreamingChannelOffsetToken( + this.tableName, channelNameFormatV2, this.channelNameFormatV1); + } + // Open channel and reset the offset in kafka this.channel = Preconditions.checkNotNull(openChannelForTable()); final long lastCommittedOffsetToken = fetchOffsetTokenWithRetry(); @@ -293,7 +305,7 @@ public TopicPartitionChannel( new SnowflakeTelemetryChannelStatus( tableName, connectorName, - channelName, + channelNameFormatV1, startTime, enableCustomJMXMonitoring, metricsJmxReporter, @@ -301,7 +313,7 @@ public TopicPartitionChannel( this.processedOffset, this.latestConsumerOffset); this.telemetryServiceV2.reportKafkaPartitionStart( - new SnowflakeTelemetryChannelCreation(this.tableName, this.channelName, startTime)); + new SnowflakeTelemetryChannelCreation(this.tableName, this.channelNameFormatV1, startTime)); if (lastCommittedOffsetToken != NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) { this.sinkTaskContext.offset(this.topicPartition, lastCommittedOffsetToken + 1L); @@ -309,8 +321,51 @@ public TopicPartitionChannel( LOGGER.info( "TopicPartitionChannel:{}, offset token is NULL, will rely on Kafka to send us the" + " correct offset instead", - this.getChannelName()); + this.getChannelNameFormatV1()); + } + } + + /** + * Checks if the configuration provided in Snowflake Kafka Connect has set {@link + * SnowflakeSinkConnectorConfig#ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG} to any value. If not + * set, it fetches the default value. + * + *
<p>
If the returned is false, system function for channel offset migration will not be called + * and Channel name will use V1 format. + * + * @param sfConnectorConfig customer provided json config + * @return true is enabled, false otherwise + */ + private boolean isEnableChannelOffsetMigration(Map sfConnectorConfig) { + boolean isEnableChannelOffsetMigration = + Boolean.parseBoolean( + sfConnectorConfig.getOrDefault( + SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG, + Boolean.toString(ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DEFAULT))); + if (!isEnableChannelOffsetMigration) { + LOGGER.info( + "Config:{} is disabled for connector:{}", + ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG, + conn.getConnectorName()); } + return isEnableChannelOffsetMigration; + } + + /** + * This is the new channel Name format that was created. New channel name prefixes connector name + * in old format. Please note, we will not open channel with new format. We will run a migration + * function from this new channel format to old channel format and drop new channel format. + * + * @param channelNameFormatV1 Original format used. + * @param connectorName connector name used in SF config JSON. + * @return new channel name introduced as part of @see + * this change (released in version 2.1.0) + */ + @VisibleForTesting + public static String generateChannelNameFormatV2( + String channelNameFormatV1, String connectorName) { + return connectorName + "_" + channelNameFormatV1; } /** @@ -358,7 +413,7 @@ public void insertRecordToBuffer(SinkRecord kafkaSinkRecord) { "Flush based on buffered bytes or buffered number of records for" + " channel:{},currentBufferSizeInBytes:{}, currentBufferedRecordCount:{}," + " connectorBufferThresholds:{}", - this.getChannelName(), + this.getChannelNameFormatV1(), copiedStreamingBuffer.getBufferSizeBytes(), copiedStreamingBuffer.getSinkRecords().size(), this.streamingBufferThreshold); @@ -377,7 +432,7 @@ public void insertRecordToBuffer(SinkRecord kafkaSinkRecord) { "Skip adding offset:{} to buffer for channel:{} because" + " offsetPersistedInSnowflake:{}, processedOffset:{}", kafkaSinkRecord.kafkaOffset(), - this.getChannelName(), + this.getChannelNameFormatV1(), currentOffsetPersistedInSnowflake, currentProcessedOffset); } @@ -408,7 +463,7 @@ private boolean shouldIgnoreAddingRecordToBuffer( LOGGER.debug( "Got the desired offset:{} from Kafka, we can add this offset to buffer for channel:{}", kafkaSinkRecord.kafkaOffset(), - this.getChannelName()); + this.getChannelNameFormatV1()); isOffsetResetInKafka = false; return false; } else { @@ -416,7 +471,7 @@ private boolean shouldIgnoreAddingRecordToBuffer( "Ignore adding offset:{} to buffer for channel:{} because we recently encountered" + " error and reset offset in Kafka. 
currentProcessedOffset:{}", kafkaSinkRecord.kafkaOffset(), - this.getChannelName(), + this.getChannelNameFormatV1(), currentProcessedOffset); return true; } @@ -499,7 +554,7 @@ protected void insertBufferedRecordsIfFlushTimeThresholdReached() { LOGGER.debug( "Time based flush for channel:{}, CurrentTimeMs:{}, previousFlushTimeMs:{}," + " bufferThresholdSeconds:{}", - this.getChannelName(), + this.getChannelNameFormatV1(), System.currentTimeMillis(), this.previousFlushTimeStampMs, this.streamingBufferThreshold.getFlushTimeThresholdSeconds()); @@ -525,7 +580,7 @@ protected void insertBufferedRecordsIfFlushTimeThresholdReached() { InsertRowsResponse insertBufferedRecords(StreamingBuffer streamingBufferToInsert) { // intermediate buffer can be empty here if time interval reached but kafka produced no records. if (streamingBufferToInsert.isEmpty()) { - LOGGER.debug("No Rows Buffered for channel:{}, returning", this.getChannelName()); + LOGGER.debug("No Rows Buffered for channel:{}, returning", this.getChannelNameFormatV1()); this.previousFlushTimeStampMs = System.currentTimeMillis(); return null; } @@ -538,7 +593,7 @@ InsertRowsResponse insertBufferedRecords(StreamingBuffer streamingBufferToInsert LOGGER.info( "Successfully called insertRows for channel:{}, buffer:{}, insertResponseHasErrors:{}," + " needToResetOffset:{}", - this.getChannelName(), + this.getChannelNameFormatV1(), streamingBufferToInsert, response.hasErrors(), response.needToResetOffset()); @@ -559,7 +614,7 @@ InsertRowsResponse insertBufferedRecords(StreamingBuffer streamingBufferToInsert LOGGER.warn( String.format( "[INSERT_BUFFERED_RECORDS] Failure inserting buffer:%s for channel:%s", - streamingBufferToInsert, this.getChannelName()), + streamingBufferToInsert, this.getChannelNameFormatV1()), ex); } return response; @@ -600,7 +655,7 @@ private InsertRowsResponse insertRowsWithFallback(StreamingBuffer buffer) { String.format( "%s Failed to open Channel or fetching offsetToken for channel:%s", StreamingApiFallbackInvoker.INSERT_ROWS_FALLBACK, - this.getChannelName()), + this.getChannelNameFormatV1()), event.getException())) .build(); @@ -732,7 +787,7 @@ private void insertRowsFallbackSupplier(Throwable ex) String.format( "%s Failed to insert rows for channel:%s. 
Recovered offset from Snowflake is:%s", StreamingApiFallbackInvoker.INSERT_ROWS_FALLBACK, - this.getChannelName(), + this.getChannelNameFormatV1(), offsetRecoveredFromSnowflake), ex); } @@ -867,7 +922,7 @@ protected long fetchOffsetTokenWithRetry() { LOGGER.error( "[OFFSET_TOKEN_FALLBACK] Failed to open Channel/fetch offsetToken for" + " channel:{}, exception:{}", - this.getChannelName(), + this.getChannelNameFormatV1(), event.getException().toString())) .build(); @@ -878,7 +933,7 @@ protected long fetchOffsetTokenWithRetry() { LOGGER.error( "[OFFSET_TOKEN_RETRY_FAILSAFE] Failure to fetch offsetToken even after retry" + " and fallback from snowflake for channel:{}, elapsedTimeSeconds:{}", - this.getChannelName(), + this.getChannelNameFormatV1(), event.getElapsedTime().get(SECONDS), event.getException())) .compose(offsetTokenRetryPolicy) @@ -928,7 +983,7 @@ private void resetChannelMetadataAfterRecovery( "{} Channel:{}, offset token is NULL, will use the consumer offset managed by the" + " connector instead, consumer offset:{}", streamingApiFallbackInvoker, - this.getChannelName(), + this.getChannelNameFormatV1(), latestConsumerOffset); } @@ -949,7 +1004,7 @@ private void resetChannelMetadataAfterRecovery( "[RESET_PARTITION] Emptying current buffer:{} for Channel:{} due to reset of offsets in" + " kafka", this.streamingBuffer, - this.getChannelName()); + this.getChannelNameFormatV1()); this.streamingBuffer = new StreamingBuffer(); // Reset Offset in kafka for this topic partition. @@ -969,7 +1024,7 @@ private void resetChannelMetadataAfterRecovery( LOGGER.warn( "{} Channel:{}, OffsetRecoveredFromSnowflake:{}, reset kafka offset to:{}", streamingApiFallbackInvoker, - this.getChannelName(), + this.getChannelNameFormatV1(), offsetRecoveredFromSnowflake, offsetToResetInKafka); } @@ -985,12 +1040,13 @@ private void resetChannelMetadataAfterRecovery( */ private long getRecoveredOffsetFromSnowflake( final StreamingApiFallbackInvoker streamingApiFallbackInvoker) { - LOGGER.warn("{} Re-opening channel:{}", streamingApiFallbackInvoker, this.getChannelName()); + LOGGER.warn( + "{} Re-opening channel:{}", streamingApiFallbackInvoker, this.getChannelNameFormatV1()); this.channel = Preconditions.checkNotNull(openChannelForTable()); LOGGER.warn( "{} Fetching offsetToken after re-opening the channel:{}", streamingApiFallbackInvoker, - this.getChannelName()); + this.getChannelNameFormatV1()); return fetchLatestCommittedOffsetFromSnowflake(); } @@ -1005,12 +1061,15 @@ private long getRecoveredOffsetFromSnowflake( * snowflake. */ private long fetchLatestCommittedOffsetFromSnowflake() { - LOGGER.debug("Fetching last committed offset for partition channel:{}", this.getChannelName()); + LOGGER.debug( + "Fetching last committed offset for partition channel:{}", this.getChannelNameFormatV1()); String offsetToken = null; try { offsetToken = this.channel.getLatestCommittedOffsetToken(); LOGGER.info( - "Fetched offsetToken for channelName:{}, offset:{}", this.getChannelName(), offsetToken); + "Fetched offsetToken for channelName:{}, offset:{}", + this.getChannelNameFormatV1(), + offsetToken); return offsetToken == null ? 
NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE : Long.parseLong(offsetToken); @@ -1018,7 +1077,7 @@ private long fetchLatestCommittedOffsetFromSnowflake() { LOGGER.error( "The offsetToken string does not contain a parsable long:{} for channel:{}", offsetToken, - this.getChannelName()); + this.getChannelNameFormatV1()); throw new ConnectException(ex); } } @@ -1038,14 +1097,16 @@ private long fetchLatestCommittedOffsetFromSnowflake() { */ private SnowflakeStreamingIngestChannel openChannelForTable() { OpenChannelRequest channelRequest = - OpenChannelRequest.builder(this.channelName) + OpenChannelRequest.builder(this.channelNameFormatV1) .setDBName(this.sfConnectorConfig.get(Utils.SF_DATABASE)) .setSchemaName(this.sfConnectorConfig.get(Utils.SF_SCHEMA)) .setTableName(this.tableName) .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE) .build(); LOGGER.info( - "Opening a channel with name:{} for table name:{}", this.channelName, this.tableName); + "Opening a channel with name:{} for table name:{}", + this.channelNameFormatV1, + this.tableName); return streamingIngestClient.openChannel(channelRequest); } @@ -1064,7 +1125,7 @@ public void closeChannel() { final String errMsg = String.format( "Failure closing Streaming Channel name:%s msg:%s", - this.getChannelName(), e.getMessage()); + this.getChannelNameFormatV1(), e.getMessage()); this.telemetryServiceV2.reportKafkaConnectFatalError(errMsg); LOGGER.error(errMsg, e); } @@ -1085,7 +1146,7 @@ public long getPreviousFlushTimeStampMs() { return previousFlushTimeStampMs; } - public String getChannelName() { + public String getChannelNameFormatV1() { return this.channel.getFullyQualifiedName(); } @@ -1094,7 +1155,7 @@ public String toString() { return MoreObjects.toStringHelper(this) .add("previousFlushTimeStampMs", this.previousFlushTimeStampMs) .add("offsetPersistedInSnowflake", this.offsetPersistedInSnowflake) - .add("channelName", this.getChannelName()) + .add("channelName", this.getChannelNameFormatV1()) .add("isStreamingIngestClientClosed", this.streamingIngestClient.isClosed()) .toString(); } diff --git a/src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java b/src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java index da9198f11..e834ec3a2 100644 --- a/src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java +++ b/src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java @@ -867,6 +867,53 @@ public void testInvalidEnableOptimizeStreamingClientConfig() { } } + @Test + public void testEnableStreamingChannelMigrationConfig() { + Map config = getConfig(); + config.put( + SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT, + IngestionMethodConfig.SNOWPIPE_STREAMING.toString()); + config.put(Utils.SF_ROLE, "ACCOUNTADMIN"); + config.put(SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG, "true"); + + Utils.validateConfig(config); + } + + @Test + public void testEnableStreamingChannelMigrationConfig_invalidWithSnowpipe() { + try { + Map config = getConfig(); + config.put( + SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT, + IngestionMethodConfig.SNOWPIPE.toString()); + config.put(SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG, "true"); + + Utils.validateConfig(config); + } catch (SnowflakeKafkaConnectorException exception) { + assert exception + .getMessage() + .contains(SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG); + } + } + + @Test + public void testEnableStreamingChannelMigrationConfig_invalidWithSnowpipeStreaming() 
{ + try { + Map config = getConfig(); + config.put( + SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT, + IngestionMethodConfig.SNOWPIPE_STREAMING.toString()); + config.put(Utils.SF_ROLE, "ACCOUNTADMIN"); + config.put( + SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG, "INVALID"); + Utils.validateConfig(config); + } catch (SnowflakeKafkaConnectorException exception) { + assert exception + .getMessage() + .contains(SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG); + } + } + @Test public void testInvalidEmptyConfig() { try { diff --git a/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskStreamingTest.java b/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskStreamingTest.java index 1399545ac..a7a26a5cb 100644 --- a/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskStreamingTest.java +++ b/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskStreamingTest.java @@ -88,17 +88,17 @@ public void testSinkTaskInvalidRecord_InMemoryDLQ() throws Exception { new TopicPartitionChannel( mockStreamingClient, topicPartition, - SnowflakeSinkServiceV2.partitionChannelKey(TEST_CONNECTOR_NAME, topicName, partition), + SnowflakeSinkServiceV2.partitionChannelKey(topicName, partition), topicName, new StreamingBufferThreshold(10, 10_000, 1), config, errorReporter, inMemorySinkTaskContext, + mockConnectionService, mockTelemetryService); Map topicPartitionChannelMap = - Collections.singletonMap( - partitionChannelKey(TEST_CONNECTOR_NAME, topicName, partition), topicPartitionChannel); + Collections.singletonMap(partitionChannelKey(topicName, partition), topicPartitionChannel); SnowflakeSinkServiceV2 mockSinkService = new SnowflakeSinkServiceV2( diff --git a/src/test/java/com/snowflake/kafka/connector/internal/ConnectionServiceIT.java b/src/test/java/com/snowflake/kafka/connector/internal/ConnectionServiceIT.java index 66078b016..739a914a0 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/ConnectionServiceIT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/ConnectionServiceIT.java @@ -2,10 +2,21 @@ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT; import static com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceV1.USER_AGENT_SUFFIX_FORMAT; +import static com.snowflake.kafka.connector.internal.TestUtils.TEST_CONNECTOR_NAME; +import static com.snowflake.kafka.connector.internal.streaming.ChannelMigrationResponseCode.OFFSET_MIGRATION_SOURCE_CHANNEL_DOES_NOT_EXIST; +import static com.snowflake.kafka.connector.internal.streaming.ChannelMigrationResponseCode.isChannelMigrationResponseSuccessful; +import static com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.generateChannelNameFormatV2; import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; import com.snowflake.kafka.connector.Utils; +import com.snowflake.kafka.connector.dlq.InMemoryKafkaRecordErrorReporter; +import com.snowflake.kafka.connector.internal.streaming.ChannelMigrateOffsetTokenResponseDTO; +import com.snowflake.kafka.connector.internal.streaming.ChannelMigrationResponseCode; +import com.snowflake.kafka.connector.internal.streaming.InMemorySinkTaskContext; import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; +import com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2; +import com.snowflake.kafka.connector.internal.streaming.StreamingBufferThreshold; +import 
com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel; import com.snowflake.kafka.connector.internal.streaming.telemetry.SnowflakeTelemetryServiceV2; import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryServiceV1; import java.sql.ResultSet; @@ -19,6 +30,8 @@ import net.snowflake.client.jdbc.internal.apache.http.Header; import net.snowflake.client.jdbc.internal.apache.http.HttpHeaders; import net.snowflake.client.jdbc.internal.apache.http.client.methods.HttpPost; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.sink.SinkRecord; import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -104,7 +117,7 @@ public void createConnectionService() { SnowflakeConnectionService service = SnowflakeConnectionServiceFactory.builder().setProperties(TestUtils.getConf()).build(); - assert service.getConnectorName().equals(TestUtils.TEST_CONNECTOR_NAME); + assert service.getConnectorName().equals(TEST_CONNECTOR_NAME); assert TestUtils.assertError( SnowflakeErrors.ERROR_0017, @@ -116,7 +129,7 @@ public void createConnectionService() { SnowflakeURL url = TestUtils.getUrl(); Properties prop = InternalUtils.createProperties(TestUtils.getConf(), url); - String appName = TestUtils.TEST_CONNECTOR_NAME; + String appName = TEST_CONNECTOR_NAME; service = SnowflakeConnectionServiceFactory.builder() @@ -166,7 +179,7 @@ public void createConnectionService_SnowpipeStreaming() { SnowflakeConnectionService service = SnowflakeConnectionServiceFactory.builder().setProperties(config).build(); - assert service.getConnectorName().equals(TestUtils.TEST_CONNECTOR_NAME); + assert service.getConnectorName().equals(TEST_CONNECTOR_NAME); assert service.getTelemetryClient() instanceof SnowflakeTelemetryServiceV2; @@ -230,10 +243,10 @@ public void testStageFunctions() { // stage exists assert conn.stageExist(stageName); // put a file to stage - String fileName = FileNameUtils.fileName(TestUtils.TEST_CONNECTOR_NAME, tableName, 1, 123, 456); + String fileName = FileNameUtils.fileName(TEST_CONNECTOR_NAME, tableName, 1, 123, 456); conn.put(stageName, fileName, "test"); // list stage with prefix - List files = conn.listStage(stageName, TestUtils.TEST_CONNECTOR_NAME); + List files = conn.listStage(stageName, TEST_CONNECTOR_NAME); assert files.size() == 1; assert files.get(0).equals(fileName); // stage is compatible @@ -274,7 +287,7 @@ public void testStageFunctions() { // still not incompatible assert !conn.isStageCompatible(stageName); // list with prefix - files = conn.listStage(stageName, TestUtils.TEST_CONNECTOR_NAME); + files = conn.listStage(stageName, TEST_CONNECTOR_NAME); // only one file assert files.size() == 1; assert files.get(0).equals(fileName); @@ -319,20 +332,20 @@ public void testStagePurgeFunctions() { // stage exists assert conn.stageExist(stageName); // put two files to stage - String fileName1 = FileNameUtils.fileName(TestUtils.TEST_CONNECTOR_NAME, tableName, 1, 1, 3); + String fileName1 = FileNameUtils.fileName(TEST_CONNECTOR_NAME, tableName, 1, 1, 3); conn.put(stageName, fileName1, "test"); - String fileName2 = FileNameUtils.fileName(TestUtils.TEST_CONNECTOR_NAME, tableName, 1, 4, 6); + String fileName2 = FileNameUtils.fileName(TEST_CONNECTOR_NAME, tableName, 1, 4, 6); conn.put(stageName, fileName2, "test"); - String fileName3 = FileNameUtils.fileName(TestUtils.TEST_CONNECTOR_NAME, tableName, 1, 14, 16); + String fileName3 = FileNameUtils.fileName(TEST_CONNECTOR_NAME, tableName, 1, 14, 16); conn.put(stageName, fileName3, "test"); - 
String fileName4 = FileNameUtils.fileName(TestUtils.TEST_CONNECTOR_NAME, tableName, 1, 24, 26); + String fileName4 = FileNameUtils.fileName(TEST_CONNECTOR_NAME, tableName, 1, 24, 26); conn.put(stageName, fileName4, "test"); - String fileName5 = FileNameUtils.fileName(TestUtils.TEST_CONNECTOR_NAME, tableName, 1, 34, 36); + String fileName5 = FileNameUtils.fileName(TEST_CONNECTOR_NAME, tableName, 1, 34, 36); conn.put(stageName, fileName5, "test"); - String fileName6 = FileNameUtils.fileName(TestUtils.TEST_CONNECTOR_NAME, tableName, 1, 44, 46); + String fileName6 = FileNameUtils.fileName(TEST_CONNECTOR_NAME, tableName, 1, 44, 46); conn.put(stageName, fileName6, "test"); // list stage with prefix - List files = conn.listStage(stageName, TestUtils.TEST_CONNECTOR_NAME); + List files = conn.listStage(stageName, TEST_CONNECTOR_NAME); assert files.size() == 6; List filesList = new ArrayList<>(); @@ -344,7 +357,7 @@ public void testStagePurgeFunctions() { filesList.add(fileName6); conn.purgeStage(stageName, filesList); - files = conn.listStage(stageName, TestUtils.TEST_CONNECTOR_NAME); + files = conn.listStage(stageName, TEST_CONNECTOR_NAME); assert files.size() == 0; } @@ -411,4 +424,114 @@ public void testConnectionFunction() { service.close(); assert service.isClosed(); } + + @Test + public void testStreamingChannelOffsetMigration() { + Map testConfig = TestUtils.getConfForStreaming(); + SnowflakeConnectionService conn = + SnowflakeConnectionServiceFactory.builder().setProperties(testConfig).build(); + conn.createTable(tableName); + final String channelNameFormatV1 = SnowflakeSinkServiceV2.partitionChannelKey(tableName, 0); + + final String sourceChannelName = + generateChannelNameFormatV2(channelNameFormatV1, TEST_CONNECTOR_NAME); + final String destinationChannelName = channelNameFormatV1; + + // ### TEST 1 - Both channels doesnt exist + ChannelMigrateOffsetTokenResponseDTO channelMigrateOffsetTokenResponseDTO = + conn.migrateStreamingChannelOffsetToken( + tableName, sourceChannelName, destinationChannelName); + Assert.assertTrue(isChannelMigrationResponseSuccessful(channelMigrateOffsetTokenResponseDTO)); + Assert.assertEquals( + OFFSET_MIGRATION_SOURCE_CHANNEL_DOES_NOT_EXIST.getStatusCode(), + channelMigrateOffsetTokenResponseDTO.getResponseCode()); + + try { + // ### TEST 2 - Table doesnt exist + channelMigrateOffsetTokenResponseDTO = + conn.migrateStreamingChannelOffsetToken( + tableName + "_Table_DOESNT_EXIST", sourceChannelName, destinationChannelName); + } catch (SnowflakeKafkaConnectorException ex) { + assert ex.getMessage() + .contains( + ChannelMigrationResponseCode.ERR_TABLE_DOES_NOT_EXIST_NOT_AUTHORIZED.getMessage()); + } + + try { + // ### TEST 3 - Source Channel (v2 channel doesnt exist) + Map config = TestUtils.getConfForStreaming(); + SnowflakeSinkConnectorConfig.setDefaultValues(config); + TopicPartition topicPartition = new TopicPartition(tableName, 0); + + InMemorySinkTaskContext inMemorySinkTaskContext = + new InMemorySinkTaskContext(Collections.singleton(topicPartition)); + + // This will automatically create a channel for topicPartition. 
+ SnowflakeSinkService service = + SnowflakeSinkServiceFactory.builder( + conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config) + .setRecordNumber(1) + .setErrorReporter(new InMemoryKafkaRecordErrorReporter()) + .setSinkTaskContext(inMemorySinkTaskContext) + .addTask(tableName, topicPartition) + .build(); + + final long noOfRecords = 10; + + // send regular data + List records = + TestUtils.createJsonStringSinkRecords(0, noOfRecords, tableName, 0); + + service.insert(records); + + TestUtils.assertWithRetry( + () -> service.getOffset(new TopicPartition(tableName, 0)) == noOfRecords, 5, 5); + channelMigrateOffsetTokenResponseDTO = + conn.migrateStreamingChannelOffsetToken( + tableName, sourceChannelName, destinationChannelName); + Assert.assertTrue(isChannelMigrationResponseSuccessful(channelMigrateOffsetTokenResponseDTO)); + Assert.assertEquals( + OFFSET_MIGRATION_SOURCE_CHANNEL_DOES_NOT_EXIST.getStatusCode(), + channelMigrateOffsetTokenResponseDTO.getResponseCode()); + + // even after migration, it sends same offset from server side since source didnt exist + TestUtils.assertWithRetry( + () -> service.getOffset(new TopicPartition(tableName, 0)) == noOfRecords, 5, 5); + + // TEST 4: Do an actual migration from new channel format to old channel Format + // Step 1: create a new source channel + // Step 2: load some data + // step 3: do a migration and check if destination channel has expected offset + + // Ctor of TopicPartitionChannel tries to open the channel. + SnowflakeSinkServiceV2 snowflakeSinkServiceV2 = (SnowflakeSinkServiceV2) service; + TopicPartitionChannel newChannelFormatV2 = + new TopicPartitionChannel( + snowflakeSinkServiceV2.getStreamingIngestClient(), + topicPartition, + sourceChannelName, + tableName, + new StreamingBufferThreshold(10, 10_000, 1), + config, + new InMemoryKafkaRecordErrorReporter(), + new InMemorySinkTaskContext(Collections.singleton(topicPartition)), + conn, + conn.getTelemetryClient()); + + List recordsInChannelFormatV2 = + TestUtils.createJsonStringSinkRecords(0, noOfRecords * 2, tableName, 0); + recordsInChannelFormatV2.forEach(newChannelFormatV2::insertRecordToBuffer); + + TestUtils.assertWithRetry( + () -> newChannelFormatV2.getOffsetSafeToCommitToKafka() == (noOfRecords * 2), 5, 5); + + conn.migrateStreamingChannelOffsetToken(tableName, sourceChannelName, destinationChannelName); + TestUtils.assertWithRetry( + () -> service.getOffset(new TopicPartition(tableName, 0)) == (noOfRecords * 2), 5, 5); + } catch (Exception e) { + Assert.fail("Should not throw an exception:" + e.getMessage()); + } finally { + TestUtils.dropTable(tableName); + } + } } diff --git a/src/test/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1Test.java b/src/test/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1Test.java new file mode 100644 index 000000000..88369d527 --- /dev/null +++ b/src/test/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1Test.java @@ -0,0 +1,39 @@ +package com.snowflake.kafka.connector.internal; + +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.snowflake.kafka.connector.internal.streaming.ChannelMigrateOffsetTokenResponseDTO; +import org.junit.Test; + +public class SnowflakeConnectionServiceV1Test { + @Test + public void testChannelMigrationResponse_validResponse() throws Exception { + SnowflakeConnectionServiceV1 v1MockConnectionService 
= mock(SnowflakeConnectionServiceV1.class); + final String validMigrationResponse = + "{\"responseCode\": 51, \"responseMessage\": \"Source Channel does not exist for Offset" + + " Migration\"}"; + when(v1MockConnectionService.getChannelMigrateOffsetTokenResponseDTO(anyString())) + .thenCallRealMethod(); + + ChannelMigrateOffsetTokenResponseDTO migrationDTO = + v1MockConnectionService.getChannelMigrateOffsetTokenResponseDTO(validMigrationResponse); + assert migrationDTO.getResponseCode() == 51; + assert migrationDTO + .getResponseMessage() + .contains("Source Channel does not exist for Offset Migration"); + } + + @Test(expected = JsonProcessingException.class) + public void testChannelMigrationResponse_InvalidResponse() throws Exception { + SnowflakeConnectionServiceV1 v1MockConnectionService = mock(SnowflakeConnectionServiceV1.class); + final String validMigrationResponse = + "{\"responseCode\": 51, \"responseMessage\": \"Source Channel does not exist for Offset" + + " Migration\", \"unknown\":62}"; + when(v1MockConnectionService.getChannelMigrateOffsetTokenResponseDTO(anyString())) + .thenCallRealMethod(); + v1MockConnectionService.getChannelMigrateOffsetTokenResponseDTO(validMigrationResponse); + } +} diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java index b0680efb1..988ac35e0 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java @@ -1,6 +1,5 @@ package com.snowflake.kafka.connector.internal.streaming; -import static com.snowflake.kafka.connector.internal.TestUtils.TEST_CONNECTOR_NAME; import static com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.partitionChannelKey; import static com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE; @@ -221,8 +220,7 @@ public void testStreamingIngest_multipleChannelPartitions_closeSubsetOfPartition Assert.assertTrue( snowflakeSinkServiceV2 - .getTopicPartitionChannelFromCacheKey( - partitionChannelKey(TEST_CONNECTOR_NAME, tp2.topic(), tp2.partition())) + .getTopicPartitionChannelFromCacheKey(partitionChannelKey(tp2.topic(), tp2.partition())) .isPresent()); List newRecordsPartition1 = @@ -412,8 +410,7 @@ public void testStreamingIngest_multipleChannelPartitions_withMetrics() throws E // verify all metrics Map metricRegistry = service - .getMetricRegistry( - SnowflakeSinkServiceV2.partitionChannelKey(TEST_CONNECTOR_NAME, topic, partition)) + .getMetricRegistry(SnowflakeSinkServiceV2.partitionChannelKey(topic, partition)) .get() .getGauges(); assert metricRegistry.size() @@ -422,7 +419,7 @@ public void testStreamingIngest_multipleChannelPartitions_withMetrics() throws E // partition 1 this.verifyPartitionMetrics( metricRegistry, - partitionChannelKey(TEST_CONNECTOR_NAME, topic, partition), + partitionChannelKey(topic, partition), NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE, recordsInPartition1 - 1, recordsInPartition1, @@ -430,7 +427,7 @@ public void testStreamingIngest_multipleChannelPartitions_withMetrics() throws E this.conn.getConnectorName()); this.verifyPartitionMetrics( metricRegistry, - partitionChannelKey(TEST_CONNECTOR_NAME, topic, partition2), + partitionChannelKey(topic, partition2), NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE, recordsInPartition2 - 1, recordsInPartition2, 
@@ -445,8 +442,7 @@ public void testStreamingIngest_multipleChannelPartitions_withMetrics() throws E // verify metrics closed assert !service - .getMetricRegistry( - SnowflakeSinkServiceV2.partitionChannelKey(TEST_CONNECTOR_NAME, topic, partition)) + .getMetricRegistry(SnowflakeSinkServiceV2.partitionChannelKey(topic, partition)) .isPresent(); Mockito.verify(telemetryService, Mockito.times(2)) diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java index e4176d970..75d9de64a 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java @@ -1,6 +1,8 @@ package com.snowflake.kafka.connector.internal.streaming; -import static com.snowflake.kafka.connector.internal.TestUtils.TEST_CONNECTOR_NAME; +import static com.snowflake.kafka.connector.internal.streaming.ChannelMigrationResponseCode.SUCCESS; +import static com.snowflake.kafka.connector.internal.streaming.ChannelMigrationResponseCode.isChannelMigrationResponseSuccessful; +import static com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE; import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; import com.snowflake.kafka.connector.Utils; @@ -45,11 +47,9 @@ public void beforeEach() { topicPartition2 = new TopicPartition(topic, PARTITION_2); - testChannelName = - SnowflakeSinkServiceV2.partitionChannelKey(TEST_CONNECTOR_NAME, topic, PARTITION); + testChannelName = SnowflakeSinkServiceV2.partitionChannelKey(topic, PARTITION); - testChannelName2 = - SnowflakeSinkServiceV2.partitionChannelKey(TEST_CONNECTOR_NAME, topic, PARTITION_2); + testChannelName2 = SnowflakeSinkServiceV2.partitionChannelKey(topic, PARTITION_2); } @After @@ -98,6 +98,7 @@ public void testAutoChannelReopenOn_OffsetTokenSFException() throws Exception { config, new InMemoryKafkaRecordErrorReporter(), new InMemorySinkTaskContext(Collections.singleton(topicPartition)), + conn, conn.getTelemetryClient()); // since channel is updated, try to insert data again or may be call getOffsetToken @@ -562,4 +563,177 @@ public void testPartialBatchChannelInvalidationIngestion_schematization() throws service.closeAll(); } + + @Test + public void testChannelMigrateOffsetTokenSystemFunction_NonNullOffsetTokenForSourceChannel() + throws Exception { + Map config = TestUtils.getConfForStreaming(); + SnowflakeSinkConnectorConfig.setDefaultValues(config); + + InMemorySinkTaskContext inMemorySinkTaskContext = + new InMemorySinkTaskContext(Collections.singleton(topicPartition)); + + // This will automatically create a channel for topicPartition. 
+ SnowflakeSinkService service = + SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config) + .setRecordNumber(1) + .setErrorReporter(new InMemoryKafkaRecordErrorReporter()) + .setSinkTaskContext(inMemorySinkTaskContext) + .addTask(testTableName, topicPartition) + .build(); + + TopicPartitionChannel topicPartitionChannel = + ((SnowflakeSinkServiceV2) service) + .getTopicPartitionChannelFromCacheKey(testChannelName) + .get(); + // Channel does exist + Assert.assertNotNull(topicPartitionChannel); + + // get the corresponding V2 format for above topic partition channel + final String channelNameFormatV2 = + topicPartitionChannel.generateChannelNameFormatV2(testChannelName, conn.getConnectorName()); + + // create a channel with new format and ingest few rows + // Ctor of TopicPartitionChannel tries to open the channel (new format) for same partition + TopicPartitionChannel topicPartitionChannelForFormatV2 = + new TopicPartitionChannel( + ((SnowflakeSinkServiceV2) service).getStreamingIngestClient(), + topicPartition, + channelNameFormatV2, + testTableName, + new StreamingBufferThreshold(10, 10_000, 1), + config, + new InMemoryKafkaRecordErrorReporter(), + new InMemorySinkTaskContext(Collections.singleton(topicPartition)), + conn, + conn.getTelemetryClient()); + + // insert a few records via the new (V2 name format) channel + final int noOfRecords = 5; + // All of these records are ingested through the V2-format channel, so its committed offset + // should advance to noOfRecords. + List records = + TestUtils.createJsonStringSinkRecords(0, noOfRecords, testTableName, PARTITION); + + records.forEach(topicPartitionChannelForFormatV2::insertRecordToBuffer); + TestUtils.assertWithRetry( + () -> topicPartitionChannelForFormatV2.getOffsetSafeToCommitToKafka() == noOfRecords, 5, 5); + + // migrate the offset token from the new (V2) channel name format to the old (V1) format + ChannelMigrateOffsetTokenResponseDTO channelMigrateOffsetTokenResponseDTO = + conn.migrateStreamingChannelOffsetToken( + testTableName, channelNameFormatV2, testChannelName); + Assert.assertTrue(isChannelMigrationResponseSuccessful(channelMigrateOffsetTokenResponseDTO)); + Assert.assertEquals( + SUCCESS.getStatusCode(), channelMigrateOffsetTokenResponseDTO.getResponseCode()); + + // Fetching the offsetToken from the API should now return the same value as the V2 channel + TestUtils.assertWithRetry( + () -> service.getOffset(new TopicPartition(topic, PARTITION)) == noOfRecords, 5, 5); + + // add a few more records + records = + TestUtils.createJsonStringSinkRecords(noOfRecords, noOfRecords, testTableName, PARTITION); + records.forEach(service::insert); + TestUtils.assertWithRetry( + () -> service.getOffset(new TopicPartition(topic, PARTITION)) == noOfRecords + noOfRecords, + 5, + 5); + + service.closeAll(); + } + + @Test + public void testChannelMigrateOffsetTokenSystemFunction_NullOffsetTokenInFormatV2() + throws Exception { + Map config = TestUtils.getConfForStreaming(); + SnowflakeSinkConnectorConfig.setDefaultValues(config); + + InMemorySinkTaskContext inMemorySinkTaskContext = + new InMemorySinkTaskContext(Collections.singleton(topicPartition)); + + // This will automatically create a channel for topicPartition.
+ SnowflakeSinkService service = + SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config) + .setRecordNumber(1) + .setErrorReporter(new InMemoryKafkaRecordErrorReporter()) + .setSinkTaskContext(inMemorySinkTaskContext) + .addTask(testTableName, topicPartition) + .build(); + + TopicPartitionChannel topicPartitionChannel = + ((SnowflakeSinkServiceV2) service) + .getTopicPartitionChannelFromCacheKey(testChannelName) + .get(); + // Channel does exist + Assert.assertNotNull(topicPartitionChannel); + + final int recordsInPartition1 = 10; + List recordsPartition1 = + TestUtils.createJsonStringSinkRecords(0, recordsInPartition1, topic, PARTITION); + + List records = new ArrayList<>(recordsPartition1); + + service.insert(records); + + TestUtils.assertWithRetry( + () -> service.getOffset(new TopicPartition(topic, PARTITION)) == recordsInPartition1, 5, 5); + + // get the corresponding V2 format for above topic partition channel + final String channelNameFormatV2 = + topicPartitionChannel.generateChannelNameFormatV2(testChannelName, conn.getConnectorName()); + + // create a channel with new format and dont ingest anything + // Ctor of TopicPartitionChannel tries to open the channel (new format) for same partition + TopicPartitionChannel topicPartitionChannelForFormatV2 = + new TopicPartitionChannel( + ((SnowflakeSinkServiceV2) service).getStreamingIngestClient(), + topicPartition, + channelNameFormatV2, + testTableName, + new StreamingBufferThreshold(10, 10_000, 1), + config, + new InMemoryKafkaRecordErrorReporter(), + new InMemorySinkTaskContext(Collections.singleton(topicPartition)), + conn, + conn.getTelemetryClient()); + + // close the partition and open the partition to mimic migration + service.close(Collections.singletonList(topicPartition)); + + Map topic2Table = new HashMap<>(); + topic2Table.put(topic, testTableName); + service.startPartitions(Collections.singletonList(topicPartition), topic2Table); + + // this instance has changed since we removed it from cache and loaded it again. 
+ TopicPartitionChannel topicPartitionChannelAfterCloseAndStartPartition = + ((SnowflakeSinkServiceV2) service) + .getTopicPartitionChannelFromCacheKey(testChannelName) + .get(); + + // Fetch offsetToken from API should now give you same as other channel + TestUtils.assertWithRetry( + () -> + topicPartitionChannelAfterCloseAndStartPartition.fetchOffsetTokenWithRetry() + == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE, + 5, + 5); + + recordsPartition1 = + TestUtils.createJsonStringSinkRecords( + recordsInPartition1, recordsInPartition1, topic, PARTITION); + + records = new ArrayList<>(recordsPartition1); + + service.insert(records); + + TestUtils.assertWithRetry( + () -> + service.getOffset(new TopicPartition(topic, PARTITION)) + == recordsInPartition1 + recordsInPartition1, + 5, + 5); + + service.closeAll(); + } } diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java index dd32d24f0..dc094fff7 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java @@ -1,5 +1,6 @@ package com.snowflake.kafka.connector.internal.streaming; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_LOG_ENABLE_CONFIG; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_TOLERANCE_CONFIG; @@ -7,6 +8,7 @@ import static com.snowflake.kafka.connector.internal.TestUtils.createBigAvroRecords; import static com.snowflake.kafka.connector.internal.TestUtils.createNativeJsonSinkRecords; import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.MAX_GET_OFFSET_TOKEN_RETRIES; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import com.codahale.metrics.MetricRegistry; @@ -72,7 +74,7 @@ public class TopicPartitionChannelTest { private static final int PARTITION = 0; private static final String TEST_CHANNEL_NAME = - SnowflakeSinkServiceV2.partitionChannelKey(TEST_CONNECTOR_NAME, TOPIC, PARTITION); + SnowflakeSinkServiceV2.partitionChannelKey(TOPIC, PARTITION); private static final String TEST_TABLE_NAME = "TEST_TABLE"; private TopicPartition topicPartition; @@ -126,6 +128,7 @@ public void testTopicPartitionChannelInit_streamingClientClosed() { sfConnectorConfig, mockKafkaRecordErrorReporter, mockSinkTaskContext, + mockSnowflakeConnectionService, mockTelemetryService); } @@ -143,6 +146,7 @@ public void testFetchOffsetTokenWithRetry_null() { sfConnectorConfig, mockKafkaRecordErrorReporter, mockSinkTaskContext, + mockSnowflakeConnectionService, mockTelemetryService); Assert.assertEquals(-1L, topicPartitionChannel.fetchOffsetTokenWithRetry()); @@ -163,6 +167,7 @@ public void testFetchOffsetTokenWithRetry_validLong() { sfConnectorConfig, mockKafkaRecordErrorReporter, mockSinkTaskContext, + mockSnowflakeConnectionService, mockTelemetryService); Assert.assertEquals(100L, topicPartitionChannel.fetchOffsetTokenWithRetry()); @@ -188,6 +193,7 @@ public void testFirstRecordForChannel() { sfConnectorConfig, mockKafkaRecordErrorReporter, mockSinkTaskContext, + mockSnowflakeConnectionService, 
mockTelemetryService); JsonConverter converter = new JsonConverter(); @@ -243,6 +249,115 @@ public void testCloseChannelException() throws Exception { topicPartitionChannel.closeChannel(); } + @Test + public void testStreamingChannelMigrationEnabledAndDisabled() { + + Mockito.when(mockStreamingChannel.getFullyQualifiedName()).thenReturn(TEST_CHANNEL_NAME); + Mockito.when( + mockSnowflakeConnectionService.migrateStreamingChannelOffsetToken( + anyString(), anyString(), Mockito.anyString())) + .thenReturn(new ChannelMigrateOffsetTokenResponseDTO(50, "SUCCESS")); + + // checking default + TopicPartitionChannel topicPartitionChannel = + new TopicPartitionChannel( + mockStreamingClient, + topicPartition, + TEST_CHANNEL_NAME, + TEST_TABLE_NAME, + true, + streamingBufferThreshold, + sfConnectorConfig, + mockKafkaRecordErrorReporter, + mockSinkTaskContext, + mockSnowflakeConnectionService, + new RecordService(mockTelemetryService), + mockTelemetryService, + false, + null); + Mockito.verify(mockSnowflakeConnectionService, Mockito.times(1)) + .migrateStreamingChannelOffsetToken(anyString(), anyString(), anyString()); + + Map customSfConfig = new HashMap<>(sfConnectorConfig); + customSfConfig.put(ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG, "true"); + + topicPartitionChannel = + new TopicPartitionChannel( + mockStreamingClient, + topicPartition, + TEST_CHANNEL_NAME, + TEST_TABLE_NAME, + true, + streamingBufferThreshold, + customSfConfig, + mockKafkaRecordErrorReporter, + mockSinkTaskContext, + mockSnowflakeConnectionService, + new RecordService(mockTelemetryService), + mockTelemetryService, + false, + null); + Mockito.verify(mockSnowflakeConnectionService, Mockito.times(2)) + .migrateStreamingChannelOffsetToken(anyString(), anyString(), anyString()); + + customSfConfig.put(ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG, "false"); + SnowflakeConnectionService anotherMockForParamDisabled = + Mockito.mock(SnowflakeConnectionService.class); + + topicPartitionChannel = + new TopicPartitionChannel( + mockStreamingClient, + topicPartition, + TEST_CHANNEL_NAME, + TEST_TABLE_NAME, + true, + streamingBufferThreshold, + customSfConfig, + mockKafkaRecordErrorReporter, + mockSinkTaskContext, + anotherMockForParamDisabled, + new RecordService(mockTelemetryService), + mockTelemetryService, + false, + null); + Mockito.verify(anotherMockForParamDisabled, Mockito.times(0)) + .migrateStreamingChannelOffsetToken(anyString(), anyString(), anyString()); + } + + @Test + public void testStreamingChannelMigration_InvalidResponse() { + + Mockito.when(mockStreamingChannel.getFullyQualifiedName()).thenReturn(TEST_CHANNEL_NAME); + Mockito.when( + mockSnowflakeConnectionService.migrateStreamingChannelOffsetToken( + anyString(), anyString(), Mockito.anyString())) + .thenThrow(new RuntimeException("Exception migrating channel offset token")); + try { + // checking default + TopicPartitionChannel topicPartitionChannel = + new TopicPartitionChannel( + mockStreamingClient, + topicPartition, + TEST_CHANNEL_NAME, + TEST_TABLE_NAME, + true, + streamingBufferThreshold, + sfConnectorConfig, + mockKafkaRecordErrorReporter, + mockSinkTaskContext, + mockSnowflakeConnectionService, + new RecordService(mockTelemetryService), + mockTelemetryService, + false, + null); + Assert.fail("Should throw an exception:"); + } catch (Exception e) { + Mockito.verify(mockSnowflakeConnectionService, Mockito.times(1)) + .migrateStreamingChannelOffsetToken(anyString(), anyString(), anyString()); + assert e.getMessage().contains("Exception migrating channel offset 
token"); + } + } + /* Only SFExceptions are retried and goes into fallback. */ @Test(expected = SFException.class) public void testFetchOffsetTokenWithRetry_SFException() { @@ -258,6 +373,7 @@ public void testFetchOffsetTokenWithRetry_SFException() { sfConnectorConfig, mockKafkaRecordErrorReporter, mockSinkTaskContext, + mockSnowflakeConnectionService, mockTelemetryService); try { @@ -292,6 +408,7 @@ public void testFetchOffsetTokenWithRetry_validOffsetTokenAfterThreeSFExceptions sfConnectorConfig, mockKafkaRecordErrorReporter, mockSinkTaskContext, + mockSnowflakeConnectionService, mockTelemetryService); int expectedRetries = MAX_GET_OFFSET_TOKEN_RETRIES; @@ -322,6 +439,7 @@ public void testFetchOffsetTokenWithRetry_InvalidNumber() { sfConnectorConfig, mockKafkaRecordErrorReporter, mockSinkTaskContext, + mockSnowflakeConnectionService, mockTelemetryService); try { @@ -354,6 +472,7 @@ public void testFetchOffsetTokenWithRetry_NullPointerException() { sfConnectorConfig, mockKafkaRecordErrorReporter, mockSinkTaskContext, + mockSnowflakeConnectionService, mockTelemetryService); try { @@ -382,6 +501,7 @@ public void testFetchOffsetTokenWithRetry_RuntimeException() { sfConnectorConfig, mockKafkaRecordErrorReporter, mockSinkTaskContext, + mockSnowflakeConnectionService, mockTelemetryService); try { @@ -425,6 +545,7 @@ public void testInsertRows_SuccessAfterReopenChannel() throws Exception { sfConnectorConfig, mockKafkaRecordErrorReporter, mockSinkTaskContext, + mockSnowflakeConnectionService, mockTelemetryService); expectedOpenChannelCount++; expectedGetOffsetCount++; @@ -573,6 +694,7 @@ public void testInsertRows_GetOffsetTokenFailureAfterReopenChannel() throws Exce sfConnectorConfig, mockKafkaRecordErrorReporter, mockSinkTaskContext, + mockSnowflakeConnectionService, mockTelemetryService); List records = TestUtils.createJsonStringSinkRecords(0, 1, TOPIC, PARTITION); @@ -612,6 +734,7 @@ public void testInsertRows_RuntimeException() throws Exception { sfConnectorConfig, mockKafkaRecordErrorReporter, mockSinkTaskContext, + mockSnowflakeConnectionService, mockTelemetryService); List records = TestUtils.createJsonStringSinkRecords(0, 1, TOPIC, PARTITION); @@ -701,6 +824,7 @@ public void testInsertRows_ValidationResponseHasErrors_ErrorTolerance_ALL() thro sfConnectorConfigWithErrors, kafkaRecordErrorReporter, mockSinkTaskContext, + mockSnowflakeConnectionService, mockTelemetryService); List records = TestUtils.createJsonStringSinkRecords(0, 1, TOPIC, PARTITION); @@ -746,6 +870,7 @@ public void testInsertRows_ValidationResponseHasErrors_ErrorTolerance_ALL_LogEna sfConnectorConfigWithErrors, kafkaRecordErrorReporter, mockSinkTaskContext, + mockSnowflakeConnectionService, mockTelemetryService); List records = TestUtils.createJsonStringSinkRecords(0, 1, TOPIC, PARTITION); @@ -786,6 +911,7 @@ public void testBufferBytesThreshold() throws Exception { sfConnectorConfig, mockKafkaRecordErrorReporter, mockSinkTaskContext, + mockSnowflakeConnectionService, mockTelemetryService); // Sending 5 records will trigger a buffer bytes based threshold after 4 records have been @@ -835,6 +961,7 @@ public void testBigAvroBufferBytesThreshold() throws Exception { sfConnectorConfig, mockKafkaRecordErrorReporter, mockSinkTaskContext, + mockSnowflakeConnectionService, mockTelemetryService); // Sending 3 records will trigger a buffer bytes based threshold after 2 records have been diff --git a/test/rest_request_template/test_snowpipe_streaming_channel_migration_disabled.json 
b/test/rest_request_template/test_snowpipe_streaming_channel_migration_disabled.json new file mode 100755 index 000000000..86306c3ae --- /dev/null +++ b/test/rest_request_template/test_snowpipe_streaming_channel_migration_disabled.json @@ -0,0 +1,28 @@ +{ + "name": "SNOWFLAKE_CONNECTOR_NAME", + "config": { + "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector", + "topics": "SNOWFLAKE_TEST_TOPIC", + "tasks.max": "1", + "buffer.flush.time": "60", + "buffer.count.records": "300", + "buffer.size.bytes": "5000000", + "snowflake.url.name": "SNOWFLAKE_HOST", + "snowflake.user.name": "SNOWFLAKE_USER", + "snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY", + "snowflake.database.name": "SNOWFLAKE_DATABASE", + "snowflake.schema.name": "SNOWFLAKE_SCHEMA", + "snowflake.role.name": "SNOWFLAKE_ROLE", + "snowflake.ingestion.method": "SNOWPIPE_STREAMING", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "enable.streaming.channel.offset.migration": "false", + "jmx": "true", + "errors.tolerance": "all", + "errors.log.enable": true, + "errors.deadletterqueue.topic.name": "DLQ_TOPIC", + "errors.deadletterqueue.topic.replication.factor": 1, + "snowflake.enable.schematization": true + } +} \ No newline at end of file diff --git a/test/test_suit/test_snowpipe_streaming_channel_migration_disabled.py b/test/test_suit/test_snowpipe_streaming_channel_migration_disabled.py new file mode 100644 index 000000000..924d02632 --- /dev/null +++ b/test/test_suit/test_snowpipe_streaming_channel_migration_disabled.py @@ -0,0 +1,95 @@ +import datetime + +from test_suit.test_utils import RetryableError, NonRetryableError +import json +from time import sleep + +""" +The only config added here is the one controlling migration of channel offsets from channels created by connector version 2.1.0 to any future version. +This test verifies that the functionality can be disabled. +""" +class TestSnowpipeStreamingStringJsonChannelMigrationDisabled: + def __init__(self, driver, nameSalt): + self.driver = driver + self.fileName = "test_snowpipe_streaming_channel_migration_disabled" + self.topic = self.fileName + nameSalt + + self.topicNum = 1 + self.partitionNum = 3 + self.recordNum = 1000 + + # create the topic and partitions in the constructor since the POST REST API would automatically create the topic with only one partition + self.driver.createTopics(self.topic, partitionNum=self.partitionNum, replicationNum=1) + + def getConfigFileName(self): + return self.fileName + ".json" + + def send(self): + # create topic with n partitions and only one replication factor + print("Partition count:" + str(self.partitionNum)) + print("Topic:", self.topic) + + self.driver.describeTopic(self.topic) + + for p in range(self.partitionNum): + print("Sending in Partition:" + str(p)) + key = [] + value = [] + + # send two fewer records because we also send tombstone records;
tombstone ingestion is enabled by default + for e in range(self.recordNum - 2): + value.append(json.dumps( + {'numbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumber': str(e)} + ).encode('utf-8')) + + # append tombstone except for 2.5.1 due to this bug: https://issues.apache.org/jira/browse/KAFKA-10477 + if self.driver.testVersion == '2.5.1': + value.append(json.dumps( + {'numbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumber': str(self.recordNum - 1)} + ).encode('utf-8')) + value.append(json.dumps( + {'numbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumbernumber': str(self.recordNum)} + ).encode('utf-8')) + else: + value.append(None) + value.append("") # community converters treat this as a tombstone + + self.driver.sendBytesData(self.topic, value, key, partition=p) + sleep(2) + + def verify(self, round): + res = self.driver.snowflake_conn.cursor().execute( + "SELECT count(*) FROM {}".format(self.topic)).fetchone()[0] + print("Count records in table {}={}".format(self.topic, str(res))) + if res < (self.recordNum * self.partitionNum): + print("Topic:" + self.topic + " count is lower than expected, will retry") + raise RetryableError() + elif res > (self.recordNum * self.partitionNum): + print("Topic:" + self.topic + " count is higher than expected, duplicates detected") + raise NonRetryableError("Duplication occurred: the number of records in the table is larger than the number of records sent") + else: + print("Table:" + self.topic + " count is exactly " + str(self.recordNum * self.partitionNum)) + + # check for duplicates + res = self.driver.snowflake_conn.cursor().execute("Select record_metadata:\"offset\"::string as OFFSET_NO,record_metadata:\"partition\"::string as PARTITION_NO from {} group by OFFSET_NO, PARTITION_NO having count(*)>1".format(self.topic)).fetchone() + print("Duplicates:{}".format(res)) + if res is not None: + raise NonRetryableError("Duplication detected") + + # check for uniqueness of offset numbers + rows = self.driver.snowflake_conn.cursor().execute("Select count(distinct record_metadata:\"offset\"::number) as UNIQUE_OFFSETS,record_metadata:\"partition\"::number as PARTITION_NO from {} group by PARTITION_NO order by PARTITION_NO".format(self.topic)).fetchall() + + if rows is None: + raise NonRetryableError("Unique offsets for partitions not found") + else: + assert len(rows) == 3 + + for p in range(self.partitionNum): + # unique offset count and partition number are the two columns (returns a tuple) + if rows[p][0] != self.recordNum or rows[p][1] != p: + raise NonRetryableError("Unique offset count for partitions doesn't match") + + def clean(self): + # dropping the stage and pipe doesn't apply to Snowpipe Streaming.
(It executes drop if exists) + self.driver.cleanTableStagePipe(self.topic) + return diff --git a/test/test_suites.py b/test/test_suites.py index 95d958d85..2144952c7 100644 --- a/test/test_suites.py +++ b/test/test_suites.py @@ -47,6 +47,7 @@ from test_suit.test_string_avrosr import TestStringAvrosr from test_suit.test_string_json import TestStringJson from test_suit.test_string_json_ignore_tombstone import TestStringJsonIgnoreTombstone +from test_suit.test_snowpipe_streaming_channel_migration_disabled import TestSnowpipeStreamingStringJsonChannelMigrationDisabled class EndToEndTestSuite: @@ -134,6 +135,9 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS test_instance=TestSnowpipeStreamingStringJson(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True )), + ("TestSnowpipeStreamingStringJsonChannelMigrationDisabled", EndToEndTestSuite( + test_instance=TestSnowpipeStreamingStringJsonChannelMigrationDisabled(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True + )), ("TestSnowpipeStreamingStringJsonIgnoreTombstone", EndToEndTestSuite( test_instance=TestSnowpipeStreamingStringJsonIgnoreTombstone(driver, nameSalt), clean=True, run_in_confluent=True,