
PROD-39429 Implement migrate sys func from new channel (Format V2) to old channel (V1) - Push to Main #751

Merged (11 commits) on Nov 22, 2023
src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java

@@ -168,6 +168,15 @@ public class SnowflakeSinkConnectorConfig {
"Whether to optimize the streaming client to reduce cost. Note that this may affect"
+ " throughput or latency and can only be set if Streaming Snowpipe is enabled";

public static final String ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG =
"enable.streaming.channel.offset.migration";
public static final String ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DISPLAY =
"Enable Streaming Channel Offset Migration";
public static final boolean ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DEFAULT = true;
public static final String ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DOC =
"This config is used to enable/disable streaming channel offset migration logic. If true, we"
+ " will migrate offset token from channel name format V2 to name format v1.";
Collaborator:

Nit: let's add something about how V2 is deprecated. Otherwise customers may disable this because V2 sounds fancier than V1.

Collaborator (Author):

Good point, let me add!

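As a usage illustration (not part of this diff): a minimal sketch of building a connector config map that opts out of the migration, as connector tests typically do. The ingestion-method key/value and the MigrationFlagConfigSketch class are assumptions, not code from this PR:

    import java.util.HashMap;
    import java.util.Map;

    public class MigrationFlagConfigSketch {
      public static Map<String, String> buildConfig() {
        Map<String, String> config = new HashMap<>();
        // Assumed key/value: the flag below only applies to Snowpipe Streaming ingestion.
        config.put("snowflake.ingestion.method", "SNOWPIPE_STREAMING");
        // Explicitly opt out of the V2 -> V1 offset-token migration; omitting the
        // property keeps ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DEFAULT (true).
        config.put(
            SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG, "false");
        return config;
      }
    }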

// MDC logging header
public static final String ENABLE_MDC_LOGGING_CONFIG = "enable.mdc.logging";
public static final String ENABLE_MDC_LOGGING_DISPLAY = "Enable MDC logging";
@@ -591,7 +600,17 @@ static ConfigDef newConfigDef() {
CONNECTOR_CONFIG,
8,
ConfigDef.Width.NONE,
ENABLE_MDC_LOGGING_DISPLAY);
ENABLE_MDC_LOGGING_DISPLAY)
.define(
ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG,
Type.BOOLEAN,
ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DEFAULT,
Importance.LOW,
ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DOC,
CONNECTOR_CONFIG,
9,
ConfigDef.Width.NONE,
ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DISPLAY);
}

public static class TopicToTableValidator implements ConfigDef.Validator {
src/main/java/com/snowflake/kafka/connector/Utils.java (8 additions, 0 deletions)

@@ -454,6 +454,14 @@ static String validateConfig(Map<String, String> config) {
"Streaming client optimization is only available with {}.",
IngestionMethodConfig.SNOWPIPE_STREAMING.toString()));
}
if (config.containsKey(
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG)) {
invalidConfigParams.put(
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG,
Utils.formatString(
"Streaming client Channel migration is only available with {}.",
IngestionMethodConfig.SNOWPIPE_STREAMING.toString()));
}
}
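To make this guard concrete, a hedged sketch of the combination it rejects; the classic-Snowpipe ingestion value and the exact failure surface of validateConfig (package-private, returns a String) are assumptions:

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical same-package test sketch (not from this PR).
    Map<String, String> config = new HashMap<>();
    config.put("snowflake.ingestion.method", "SNOWPIPE"); // assumed value for classic Snowpipe
    config.put(SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG, "true");
    // Expected: validateConfig records the flag in invalidConfigParams, since channel
    // migration is only available with SNOWPIPE_STREAMING, and surfaces the error.
    Utils.validateConfig(config);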

if (config.containsKey(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP)
src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java

@@ -282,4 +282,28 @@ public interface SnowflakeConnectionService {
* @param tableName table name
*/
void createTableWithOnlyMetadataColumn(String tableName);

/**
* Migrate Streaming Channel offsetToken from a source channel to a destination channel.
*
* <p>Here, the source channel is the new channel name format introduced in this <a
* href="https://github.com/snowflakedb/snowflake-kafka-connector/commit/3bf9106b22510c62068f7d2f7137b9e57989274c">commit</a>.
*
* <p>The destination channel is the original format, containing only the topic name and
* partition number.
*
* <p>We catch the SQLException and JsonProcessingException that might happen in this method. The
* caller should always open the old channel format. This old channel format will also be the key
* to many HashMaps we will create. (For instance, {@link
* com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2#partitionsToChannel})
*
* @param tableName name of the table
* @param sourceChannelName source channel name from which the offset token will be fetched. The
*     channel with this name will also be deleted.
* @param destinationChannelName destination channel name to which the offset token will be
*     copied.
* @return whether the migration succeeded
*/
boolean migrateStreamingChannelOffsetToken(
String tableName, String sourceChannelName, String destinationChannelName);
}
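For illustration, a minimal caller sketch of the new interface method; the table/channel literals are hypothetical, and the name formats follow the javadoc above (source = V2 with connector name, destination = V1 with topic and partition only):

    // Hypothetical usage sketch (names are illustrative, not from this PR).
    static void migrateThenOpen(SnowflakeConnectionService conn) {
      boolean migrated =
          conn.migrateStreamingChannelOffsetToken(
              "MY_TABLE",
              "MY_CONNECTOR_MY_TOPIC_0", // source: V2 format, connectorName_topic_partition
              "MY_TOPIC_0"); // destination: V1 format, topic_partition
      // Regardless of the result, the caller opens the V1 (old-format) channel;
      // a false return only means the offset token was not carried over.
    }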
src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java

@@ -3,7 +3,10 @@
import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_CONTENT;
import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_METADATA;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.internal.streaming.ChannelMigrateOffsetTokenResponseDTO;
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
import com.snowflake.kafka.connector.internal.streaming.SchematizationUtils;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
@@ -56,6 +59,8 @@ public class SnowflakeConnectionServiceV1 implements SnowflakeConnectionService
// User agent suffix we want to pass in to ingest service
public static final String USER_AGENT_SUFFIX_FORMAT = "SFKafkaConnector/%s provider/%s";

private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

SnowflakeConnectionServiceV1(
Properties prop,
SnowflakeURL url,
@@ -1006,4 +1011,60 @@ public Connection getConnection() {
public SnowflakeInternalStage getInternalStage() {
return this.internalStage;
}

@Override
public boolean migrateStreamingChannelOffsetToken(
String tableName, String sourceChannelName, String destinationChannelName) {
InternalUtils.assertNotEmpty("tableName", tableName);
InternalUtils.assertNotEmpty("sourceChannelName", sourceChannelName);
InternalUtils.assertNotEmpty("destinationChannelName", destinationChannelName);
String fullyQualifiedTableName =
prop.getProperty(InternalUtils.JDBC_DATABASE)
+ "."
+ prop.getProperty(InternalUtils.JDBC_SCHEMA)
+ "."
+ tableName;
String query = "select SYSTEM$SNOWPIPE_STREAMING_MIGRATE_CHANNEL_OFFSET_TOKEN((?), (?), (?));";

try {
PreparedStatement stmt = conn.prepareStatement(query);
stmt.setString(1, fullyQualifiedTableName);
stmt.setString(2, sourceChannelName);
stmt.setString(3, destinationChannelName);
ResultSet resultSet = stmt.executeQuery();

String migrateOffsetTokenResultFromSysFunc = null;
if (resultSet.next()) {
migrateOffsetTokenResultFromSysFunc = resultSet.getString(1 /*Only one column*/);
Collaborator:

Out of scope for this PR - do we have any guarantees that this DTO won't change on the server side? A comment, test, or something server-side to warn against changes, since we now throw the exception on failure.

Collaborator (Author):

Yeah, I think the JSON exception will help with that. I think I have the comment on the server side saying to be cautious about changing the interface, but let me try to mimic it and see what happens if the response is changed. I could probably handle new fields in the object mapper.

Collaborator (Author):

Moved this logic to its own method so that we can test it. If the response changes from the server side (which it shouldn't), these test methods will start failing.

}
if (migrateOffsetTokenResultFromSysFunc == null) {
LOGGER.warn(
"No result found in Migrating OffsetToken through System Function for tableName:{},"
+ " sourceChannel:{}, destinationChannel:{}",
fullyQualifiedTableName,
sourceChannelName,
destinationChannelName);
return false;
}

Contributor:

What does "no result found" mean here? No destination channel found, no source channel found, or both?

Contributor:

If this is not expected, should we throw an exception?

Collaborator (Author):

This is not expected, and I thought we decided not to throw any exceptions - swallow all and continue using the old channel.

I can see the concern, though. At this point it comes down to whether we want to halt ingestion, or ignore the exception and continue with the old channel. Halting ingestion could be better: it lets us know something is wrong, rather than continuing with the old channel with ramifications we don't understand.
WDYT Toby?

Collaborator:

I agree that it's better to swallow this exception here, but we should have something to track an error - maybe an alert on the server side if we don't return an offset on the func call, or a new telemetry event similar to reportKafkaFatalError (if we aren't expecting too many hits on this error).

Collaborator (Author):

Good point on both.
I think we should throw an exception if this is not expected. I can add a server-side incident and also report telemetry.

Collaborator (Author):

For an unexpected exception, I am throwing a runtime exception. PTAL! It's better to fail here IMO. Thanks folks!

ChannelMigrateOffsetTokenResponseDTO channelMigrateOffsetTokenResponseDTO =
OBJECT_MAPPER.readValue(
migrateOffsetTokenResultFromSysFunc, ChannelMigrateOffsetTokenResponseDTO.class);
LOGGER.info(
"Migrate OffsetToken response for table:{}, sourceChannel:{}, destinationChannel:{}"
+ " is:{}",
tableName,
sourceChannelName,
destinationChannelName,
channelMigrateOffsetTokenResponseDTO);
Contributor (on lines +1054 to +1060):

This log might be confusing to the customer - could we only log when there is actually a migration being done? Or do we even need this client-side log, given that we have a bunch of server-side logging in place?

Collaborator (Author):

This should be fine; it happens only once per channel during open partition, not very often. I would like to keep it, since it helps us debug any customer issues.

return true;
} catch (SQLException | JsonProcessingException e) {
LOGGER.error(
"Migrating OffsetToken for a SourceChannel:{} in table:{} failed due to:{}",
sourceChannelName,
fullyQualifiedTableName,
e.getMessage());
Contributor:

getMessage might be NULL for some exceptions - could we do better?

Collaborator (Author):

Might be worth logging the stack trace too; let me add that.

Collaborator (Author):

Done!
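A minimal sketch of the stack-trace logging mentioned above, assuming the custom LOGGER only accepts format strings (no Throwable overload); java.io.StringWriter/PrintWriter capture the trace as a string:

    // Inside the catch block above; e is the caught exception.
    java.io.StringWriter sw = new java.io.StringWriter();
    e.printStackTrace(new java.io.PrintWriter(sw));
    LOGGER.error(
        "Migrating OffsetToken for a SourceChannel:{} in table:{} failed due to:{}, stacktrace:{}",
        sourceChannelName,
        fullyQualifiedTableName,
        e.getMessage(),
        sw.toString());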

return false;
}
}
}
src/main/java/com/snowflake/kafka/connector/internal/streaming/ChannelMigrateOffsetTokenResponseDTO.java (new file)

@@ -0,0 +1,42 @@
package com.snowflake.kafka.connector.internal.streaming;

import com.fasterxml.jackson.annotation.JsonProperty;

/**
* POJO used to deserialize the system function response for migrating an offset token from the
* source channel to the destination channel.
*/
public class ChannelMigrateOffsetTokenResponseDTO {
private long responseCode;

private String responseMessage;

public ChannelMigrateOffsetTokenResponseDTO(long responseCode, String responseMessage) {
this.responseCode = responseCode;
this.responseMessage = responseMessage;
}

/** Default Ctor for Jackson */
public ChannelMigrateOffsetTokenResponseDTO() {}

@JsonProperty("responseCode")
public long getResponseCode() {
return responseCode;
}

@JsonProperty("responseMessage")
public String getResponseMessage() {
return responseMessage;
}

@Override
public String toString() {
return "ChannelMigrateOffsetTokenResponseDTO{"
+ "responseCode="
+ responseCode
+ ", responseMessage='"
+ responseMessage
+ '\''
+ '}';
}
}
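Following up on the review thread about server-side response changes: a minimal parsing sketch for this DTO. Configuring Jackson to ignore unknown fields is one way to tolerate additive response changes; whether the connector does this, and the sample values below, are assumptions:

    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class ResponseParseSketch {
      public static void main(String[] args) throws Exception {
        ObjectMapper mapper =
            new ObjectMapper()
                // Tolerate new fields the server may add to the response later.
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        // Illustrative payload; field names match the DTO, values are made up.
        String json = "{\"responseCode\":50,\"responseMessage\":\"Source channel does not exist\"}";
        ChannelMigrateOffsetTokenResponseDTO dto =
            mapper.readValue(json, ChannelMigrateOffsetTokenResponseDTO.class);
        System.out.println(dto); // ChannelMigrateOffsetTokenResponseDTO{responseCode=50, ...}
      }
    }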
src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java

@@ -92,7 +92,7 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService {
private boolean enableSchematization;

/**
* Key is formulated in {@link #partitionChannelKey(String, String, int)} }
* Key is formulated in {@link #partitionChannelKey(String, int)}
*
* <p>value is the Streaming Ingest Channel implementation (Wrapped around TopicPartitionChannel)
*/
@@ -235,8 +235,7 @@ private void createStreamingChannelForTopicPartition(
final TopicPartition topicPartition,
boolean hasSchemaEvolutionPermission) {
final String partitionChannelKey =
partitionChannelKey(
conn.getConnectorName(), topicPartition.topic(), topicPartition.partition());
partitionChannelKey(topicPartition.topic(), topicPartition.partition());
// Create new instance of TopicPartitionChannel which will always open the channel.
partitionsToChannel.put(
partitionChannelKey,
@@ -295,8 +294,7 @@ public void insert(Collection<SinkRecord> records) {
*/
@Override
public void insert(SinkRecord record) {
String partitionChannelKey =
partitionChannelKey(this.conn.getConnectorName(), record.topic(), record.kafkaPartition());
String partitionChannelKey = partitionChannelKey(record.topic(), record.kafkaPartition());
// init a new topic partition if it's not presented in cache or if channel is closed
if (!partitionsToChannel.containsKey(partitionChannelKey)
|| partitionsToChannel.get(partitionChannelKey).isChannelClosed()) {
@@ -316,8 +314,7 @@ public void insert(SinkRecord record) {
@Override
public long getOffset(TopicPartition topicPartition) {
String partitionChannelKey =
partitionChannelKey(
conn.getConnectorName(), topicPartition.topic(), topicPartition.partition());
partitionChannelKey(topicPartition.topic(), topicPartition.partition());
if (partitionsToChannel.containsKey(partitionChannelKey)) {
long offset = partitionsToChannel.get(partitionChannelKey).getOffsetSafeToCommitToKafka();
partitionsToChannel.get(partitionChannelKey).setLatestConsumerOffset(offset);
@@ -371,8 +368,7 @@ public void close(Collection<TopicPartition> partitions) {
partitions.forEach(
topicPartition -> {
final String partitionChannelKey =
partitionChannelKey(
conn.getConnectorName(), topicPartition.topic(), topicPartition.partition());
partitionChannelKey(topicPartition.topic(), topicPartition.partition());
TopicPartitionChannel topicPartitionChannel =
partitionsToChannel.get(partitionChannelKey);
// Check for null since it's possible that something goes wrong even before the
@@ -382,7 +378,7 @@ public void close(Collection<TopicPartition> partitions) {
}
LOGGER.info(
"Closing partitionChannel:{}, partition:{}, topic:{}",
topicPartitionChannel == null ? null : topicPartitionChannel.getChannelName(),
topicPartitionChannel == null ? null : topicPartitionChannel.getChannelNameFormatV1(),
topicPartition.topic(),
topicPartition.partition());
partitionsToChannel.remove(partitionChannelKey);
@@ -521,17 +517,13 @@ public Optional<MetricRegistry> getMetricRegistry(String partitionChannelKey) {
/**
* Gets a unique identifier consisting of topic name and partition number.
*
* @param connectorName Connector name is always unique. (Two connectors with same name won't be
* allowed by Connector Framework)
* <p>Note: Customers can have same named connector in different connector runtimes (Like DEV
* or PROD)
* @param topic topic name
* @param partition partition number
* @return combination of topic and partition
*/
@VisibleForTesting
public static String partitionChannelKey(String connectorName, String topic, int partition) {
return connectorName + "_" + topic + "_" + partition;
public static String partitionChannelKey(String topic, int partition) {
return topic + "_" + partition;
}
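To make the rename concrete, a small contrast of the two key/name formats (topic and partition values are illustrative):

    // Old V2-style key, produced by the removed overload:
    //   partitionChannelKey("my_connector", "my_topic", 0) -> "my_connector_my_topic_0"
    // New V1-style key, produced by the remaining overload:
    String key = SnowflakeSinkServiceV2.partitionChannelKey("my_topic", 0); // "my_topic_0"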

/* Used for testing */