Skip to content

Commit

Permalink
PROD-39429 Implement migrate sys func from new channel(Format V2) to …
Browse files Browse the repository at this point in the history
…old channel (V1) - Push to Main (#751)
  • Loading branch information
sfc-gh-japatel authored Nov 22, 2023
1 parent afcb116 commit 6edd211
Show file tree
Hide file tree
Showing 19 changed files with 1,069 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,17 @@ public class SnowflakeSinkConnectorConfig {
"Whether to optimize the streaming client to reduce cost. Note that this may affect"
+ " throughput or latency and can only be set if Streaming Snowpipe is enabled";

public static final String ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG =
"enable.streaming.channel.offset.migration";
public static final String ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DISPLAY =
"Enable Streaming Channel Offset Migration";
public static final boolean ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DEFAULT = true;
public static final String ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DOC =
"This config is used to enable/disable streaming channel offset migration logic. If true, we"
+ " will migrate offset token from channel name format V2 to name format v1. V2 channel"
+ " format is deprecated and V1 will be used always, disabling this config could have"
+ " ramifications. Please consult Snowflake support before setting this to false.";

// MDC logging header
public static final String ENABLE_MDC_LOGGING_CONFIG = "enable.mdc.logging";
public static final String ENABLE_MDC_LOGGING_DISPLAY = "Enable MDC logging";
Expand Down Expand Up @@ -591,7 +602,17 @@ static ConfigDef newConfigDef() {
CONNECTOR_CONFIG,
8,
ConfigDef.Width.NONE,
ENABLE_MDC_LOGGING_DISPLAY);
ENABLE_MDC_LOGGING_DISPLAY)
.define(
ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG,
Type.BOOLEAN,
ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DEFAULT,
Importance.LOW,
ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DOC,
CONNECTOR_CONFIG,
9,
ConfigDef.Width.NONE,
ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DISPLAY);
}

public static class TopicToTableValidator implements ConfigDef.Validator {
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/com/snowflake/kafka/connector/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,14 @@ static String validateConfig(Map<String, String> config) {
"Streaming client optimization is only available with {}.",
IngestionMethodConfig.SNOWPIPE_STREAMING.toString()));
}
if (config.containsKey(
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG)) {
invalidConfigParams.put(
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG,
Utils.formatString(
"Streaming client Channel migration is only available with {}.",
IngestionMethodConfig.SNOWPIPE_STREAMING.toString()));
}
}

if (config.containsKey(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.snowflake.kafka.connector.internal;

import com.snowflake.kafka.connector.internal.streaming.ChannelMigrateOffsetTokenResponseDTO;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import java.sql.Connection;
import java.util.List;
Expand Down Expand Up @@ -282,4 +283,28 @@ public interface SnowflakeConnectionService {
* @param tableName table name
*/
void createTableWithOnlyMetadataColumn(String tableName);

/**
* Migrate Streaming Channel offsetToken from a source Channel to a destination channel.
*
* <p>Here, source channel is the new channel format we created here * @see <a
* href="https://github.com/snowflakedb/snowflake-kafka-connector/commit/3bf9106b22510c62068f7d2f7137b9e57989274c">Commit
* </a>
*
* <p>Destination channel is the original Format containing only topicName and partition number.
*
* <p>We catch SQLException and JsonProcessingException that might happen in this method. The
* caller should always open the Old Channel format. This old channel format will also be the key
* to many HashMaps we will create. (For instance {@link
* com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2#partitionsToChannel})
*
* @param tableName Name of the table
* @param sourceChannelName sourceChannel name from where the offset Token will be fetched.
* Channel with this name will also be deleted.
* @param destinationChannelName destinationChannel name to where the offsetToken will be copied
* over.
* @return The DTO serialized from the migration response.
*/
ChannelMigrateOffsetTokenResponseDTO migrateStreamingChannelOffsetToken(
String tableName, String sourceChannelName, String destinationChannelName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_CONTENT;
import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_METADATA;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.internal.streaming.ChannelMigrateOffsetTokenResponseDTO;
import com.snowflake.kafka.connector.internal.streaming.ChannelMigrationResponseCode;
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
import com.snowflake.kafka.connector.internal.streaming.SchematizationUtils;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
Expand Down Expand Up @@ -56,6 +61,8 @@ public class SnowflakeConnectionServiceV1 implements SnowflakeConnectionService
// User agent suffix we want to pass in to ingest service
public static final String USER_AGENT_SUFFIX_FORMAT = "SFKafkaConnector/%s provider/%s";

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

SnowflakeConnectionServiceV1(
Properties prop,
SnowflakeURL url,
Expand Down Expand Up @@ -1006,4 +1013,78 @@ public Connection getConnection() {
public SnowflakeInternalStage getInternalStage() {
return this.internalStage;
}

@Override
public ChannelMigrateOffsetTokenResponseDTO migrateStreamingChannelOffsetToken(
String tableName, String sourceChannelName, String destinationChannelName) {
InternalUtils.assertNotEmpty("tableName", tableName);
InternalUtils.assertNotEmpty("sourceChannelName", sourceChannelName);
InternalUtils.assertNotEmpty("destinationChannelName", destinationChannelName);
String fullyQualifiedTableName =
prop.getProperty(InternalUtils.JDBC_DATABASE)
+ "."
+ prop.getProperty(InternalUtils.JDBC_SCHEMA)
+ "."
+ tableName;
String query = "select SYSTEM$SNOWPIPE_STREAMING_MIGRATE_CHANNEL_OFFSET_TOKEN((?), (?), (?));";

try {
PreparedStatement stmt = conn.prepareStatement(query);
stmt.setString(1, fullyQualifiedTableName);
stmt.setString(2, sourceChannelName);
stmt.setString(3, destinationChannelName);
ResultSet resultSet = stmt.executeQuery();

String migrateOffsetTokenResultFromSysFunc = null;
if (resultSet.next()) {
migrateOffsetTokenResultFromSysFunc = resultSet.getString(1 /*Only one column*/);
}
if (migrateOffsetTokenResultFromSysFunc == null) {
final String errorMsg =
String.format(
"No result found in Migrating OffsetToken through System Function for tableName:%s,"
+ " sourceChannel:%s, destinationChannel:%s",
fullyQualifiedTableName, sourceChannelName, destinationChannelName);
throw SnowflakeErrors.ERROR_5023.getException(errorMsg, this.telemetry);
}

ChannelMigrateOffsetTokenResponseDTO channelMigrateOffsetTokenResponseDTO =
getChannelMigrateOffsetTokenResponseDTO(migrateOffsetTokenResultFromSysFunc);

LOGGER.info(
"Migrate OffsetToken response for table:{}, sourceChannel:{}, destinationChannel:{}"
+ " is:{}",
tableName,
sourceChannelName,
destinationChannelName,
channelMigrateOffsetTokenResponseDTO);
if (!ChannelMigrationResponseCode.isChannelMigrationResponseSuccessful(
channelMigrateOffsetTokenResponseDTO)) {
throw SnowflakeErrors.ERROR_5023.getException(
ChannelMigrationResponseCode.getMessageByCode(
channelMigrateOffsetTokenResponseDTO.getResponseCode()),
this.telemetry);
}
return channelMigrateOffsetTokenResponseDTO;
} catch (SQLException | JsonProcessingException e) {
final String errorMsg =
String.format(
"Migrating OffsetToken for a SourceChannel:%s in table:%s failed due to"
+ " exceptionMessage:%s and stackTrace:%s",
sourceChannelName,
fullyQualifiedTableName,
e.getMessage(),
Arrays.toString(e.getStackTrace()));
throw SnowflakeErrors.ERROR_5023.getException(errorMsg, this.telemetry);
}
}

@VisibleForTesting
protected ChannelMigrateOffsetTokenResponseDTO getChannelMigrateOffsetTokenResponseDTO(
String migrateOffsetTokenResultFromSysFunc) throws JsonProcessingException {
ChannelMigrateOffsetTokenResponseDTO channelMigrateOffsetTokenResponseDTO =
OBJECT_MAPPER.readValue(
migrateOffsetTokenResultFromSysFunc, ChannelMigrateOffsetTokenResponseDTO.class);
return channelMigrateOffsetTokenResponseDTO;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,13 @@ public enum SnowflakeErrors {
"5021",
"Failed to get data schema",
"Failed to get data schema. Unrecognizable data type in JSON object"),
ERROR_5022("5022", "Invalid column name", "Failed to find column in the schema");
ERROR_5022("5022", "Invalid column name", "Failed to find column in the schema"),

ERROR_5023(
"5023",
"Failure in Streaming Channel Offset Migration Response",
"Streaming Channel Offset Migration from Source to Destination Channel has no/invalid"
+ " response, please contact Snowflake Support");

// properties

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2023 Snowflake Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.snowflake.kafka.connector.internal.streaming;

import com.fasterxml.jackson.annotation.JsonProperty;

/**
* POJO used to serialize the System function response for migration offset from Source Channel to
* Destination.
*/
public class ChannelMigrateOffsetTokenResponseDTO {
private long responseCode;

private String responseMessage;

public ChannelMigrateOffsetTokenResponseDTO(long responseCode, String responseMessage) {
this.responseCode = responseCode;
this.responseMessage = responseMessage;
}

/** Default Ctor for Jackson */
public ChannelMigrateOffsetTokenResponseDTO() {}

@JsonProperty("responseCode")
public long getResponseCode() {
return responseCode;
}

@JsonProperty("responseMessage")
public String getResponseMessage() {
return responseMessage;
}

@Override
public String toString() {
return "ChannelMigrateOffsetTokenResponseDTO{"
+ "responseCode="
+ responseCode
+ ", responseMessage='"
+ responseMessage
+ '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (c) 2023 Snowflake Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.snowflake.kafka.connector.internal.streaming;

import com.google.common.annotations.VisibleForTesting;

/**
* Response code sent from the system function to migrate offsets from Source to Destination
* Channel. Please keep this code(values) in sync with what the server side is assigned.
*/
public enum ChannelMigrationResponseCode {
ERR_TABLE_DOES_NOT_EXIST_NOT_AUTHORIZED(
4, "The supplied table does not exist or is not authorized"),

SUCCESS(50, "Success"),

OFFSET_MIGRATION_SOURCE_CHANNEL_DOES_NOT_EXIST(
51, "Source Channel does not exist for Offset Migration"),

CHANNEL_OFFSET_TOKEN_MIGRATION_GENERAL_EXCEPTION(
52, "Snowflake experienced a transient exception, please retry the migration request."),

OFFSET_MIGRATION_SOURCE_AND_DESTINATION_CHANNEL_SAME(
53, "Source and Destination Channel are same for Migration Offset Request"),
;

private final long statusCode;

private final String message;

public static final String UNKNOWN_STATUS_MESSAGE =
"Unknown status message. Please contact Snowflake support for further assistance";

ChannelMigrationResponseCode(int statusCode, String message) {
this.statusCode = statusCode;
this.message = message;
}

@VisibleForTesting
public long getStatusCode() {
return statusCode;
}

@VisibleForTesting
public String getMessage() {
return message;
}

public static String getMessageByCode(Long statusCode) {
if (statusCode != null) {
for (ChannelMigrationResponseCode code : values()) {
if (code.statusCode == statusCode) {
return code.message;
}
}
}
return UNKNOWN_STATUS_MESSAGE;
}

/**
* Given a response code which was received from server side, check if it as successful migration
* or a failure.
*
* @param statusCodeToCheck response from JDBC system function call.
* @return true or false
*/
private static boolean isStatusCodeSuccessful(final long statusCodeToCheck) {
return statusCodeToCheck == SUCCESS.getStatusCode()
|| statusCodeToCheck == OFFSET_MIGRATION_SOURCE_CHANNEL_DOES_NOT_EXIST.getStatusCode();
}

/**
* Given a Response DTO, which was received from server side and serialized, check if it as
* successful migration or a failure.
*
* @param channelMigrateOffsetTokenResponseDTO response from JDBC system function call serialized
* into a DTO object.
* @return true or false
*/
public static boolean isChannelMigrationResponseSuccessful(
final ChannelMigrateOffsetTokenResponseDTO channelMigrateOffsetTokenResponseDTO) {
return isStatusCodeSuccessful(channelMigrateOffsetTokenResponseDTO.getResponseCode());
}
}
Loading

0 comments on commit 6edd211

Please sign in to comment.