Add javadocs, revert to use profile.json
sfc-gh-japatel committed Nov 16, 2023
1 parent 129ddcc commit c985826
Showing 6 changed files with 160 additions and 42 deletions.
@@ -171,7 +171,7 @@ public class SnowflakeSinkConnectorConfig {
public static final String ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG =
"enable.streaming.channel.offset.migration";
public static final String ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DISPLAY =
"Enable streaming Channel Offset Migration";
"Enable Streaming Channel Offset Migration";
public static final boolean ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DEFAULT = true;
public static final String ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DOC =
"This config is used to enable/disable streaming channel offset migration logic. If true, we"
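As a quick, hedged illustration of how this flag is consumed downstream, here is a minimal sketch of toggling it in a connector config map; the key name and default come from the constants above, while the class and map are invented for illustration:

import java.util.HashMap;
import java.util.Map;

public class MigrationFlagSketch {
  public static void main(String[] args) {
    // Illustrative connector config; only the migration key is from this diff.
    Map<String, String> connectorConfig = new HashMap<>();
    connectorConfig.put("enable.streaming.channel.offset.migration", "false");
    // Omitting the key leaves migration enabled, per
    // ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DEFAULT = true.
    System.out.println(connectorConfig);
  }
}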
@@ -292,9 +292,10 @@ public interface SnowflakeConnectionService {
*
* <p>Destination channel is the original Format containing only topicName and partition number.
*
* <p>We catch all exception that might happen in this method. The caller should always open the
* Old Channel format. This old channel format will also be the key to many HashMaps we will
* create/
* <p>We catch SQLException and JsonProcessingException that might happen in this method. The
* caller should always open the Old Channel format. This old channel format will also be the key
* to many HashMaps we will create. (For instance {@link
* com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2#partitionsToChannel})
*
* @param tableName Name of the table
* @param sourceChannelName sourceChannel name from where the offset Token will be fetched.
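To make the documented contract concrete, here is a hedged sketch of a caller honoring it; the helper openChannel and the variable names are hypothetical, and only migrateStreamingChannelOffsetToken plus the always-open-the-old-format rule come from the javadoc above:

// Sketch of the caller contract described above (names are illustrative).
void migrateThenOpen(
    SnowflakeConnectionService conn,
    String tableName,
    String channelNameFormatV2,
    String channelNameFormatV1) {
  // Migration is best-effort: SQLException/JsonProcessingException are caught
  // inside the implementation and surface only as a false return value.
  boolean migrated =
      conn.migrateStreamingChannelOffsetToken(tableName, channelNameFormatV2, channelNameFormatV1);
  // Regardless of the result, always open the old (V1) channel name; it also
  // remains the key of maps like SnowflakeSinkServiceV2#partitionsToChannel.
  openChannel(tableName, channelNameFormatV1); // openChannel is hypothetical
}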
@@ -3,7 +3,10 @@
import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_CONTENT;
import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_METADATA;

+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.snowflake.kafka.connector.Utils;
+import com.snowflake.kafka.connector.internal.streaming.ChannelMigrateOffsetTokenResponseDTO;
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
import com.snowflake.kafka.connector.internal.streaming.SchematizationUtils;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
@@ -56,6 +59,8 @@ public class SnowflakeConnectionServiceV1 implements SnowflakeConnectionService
// User agent suffix we want to pass in to ingest service
public static final String USER_AGENT_SUFFIX_FORMAT = "SFKafkaConnector/%s provider/%s";

private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

SnowflakeConnectionServiceV1(
Properties prop,
SnowflakeURL url,
@@ -1005,4 +1010,60 @@ public Connection getConnection() {
public SnowflakeInternalStage getInternalStage() {
return this.internalStage;
}

@Override
public boolean migrateStreamingChannelOffsetToken(
String tableName, String sourceChannelName, String destinationChannelName) {
InternalUtils.assertNotEmpty("tableName", tableName);
InternalUtils.assertNotEmpty("sourceChannelName", sourceChannelName);
InternalUtils.assertNotEmpty("destinationChannelName", destinationChannelName);
String fullyQualifiedTableName =
prop.getProperty(InternalUtils.JDBC_DATABASE)
+ "."
+ prop.getProperty(InternalUtils.JDBC_SCHEMA)
+ "."
+ tableName;
String query = "select SYSTEM$SNOWPIPE_STREAMING_MIGRATE_CHANNEL_OFFSET_TOKEN((?), (?), (?));";

try {
PreparedStatement stmt = conn.prepareStatement(query);
stmt.setString(1, fullyQualifiedTableName);
stmt.setString(2, sourceChannelName);
stmt.setString(3, destinationChannelName);
ResultSet resultSet = stmt.executeQuery();

String migrateOffsetTokenResultFromSysFunc = null;
if (resultSet.next()) {
migrateOffsetTokenResultFromSysFunc = resultSet.getString(1 /*Only one column*/);
}
if (migrateOffsetTokenResultFromSysFunc == null) {
LOGGER.warn(
"No result found in Migrating OffsetToken through System Function for tableName:{},"
+ " sourceChannel:{}, destinationChannel:{}",
fullyQualifiedTableName,
sourceChannelName,
destinationChannelName);
return false;
}

ChannelMigrateOffsetTokenResponseDTO channelMigrateOffsetTokenResponseDTO =
OBJECT_MAPPER.readValue(
migrateOffsetTokenResultFromSysFunc, ChannelMigrateOffsetTokenResponseDTO.class);
LOGGER.info(
"Migrate OffsetToken response for table:{}, sourceChannel:{}, destinationChannel:{}"
+ " is:{}",
tableName,
sourceChannelName,
destinationChannelName,
channelMigrateOffsetTokenResponseDTO);
return true;
} catch (SQLException | JsonProcessingException e) {
LOGGER.error(
"Migrating OffsetToken for a SourceChannel:{} in table:{} failed due to:{}",
sourceChannelName,
fullyQualifiedTableName,
e.getMessage());
return false;
}
}
}
@@ -0,0 +1,42 @@
package com.snowflake.kafka.connector.internal.streaming;

import com.fasterxml.jackson.annotation.JsonProperty;

/**
 * POJO used to deserialize the system function response for migrating the offset token from the
 * source channel to the destination channel.
 */
public class ChannelMigrateOffsetTokenResponseDTO {
private long responseCode;

private String responseMessage;

public ChannelMigrateOffsetTokenResponseDTO(long responseCode, String responseMessage) {
this.responseCode = responseCode;
this.responseMessage = responseMessage;
}

/** Default Ctor for Jackson */
public ChannelMigrateOffsetTokenResponseDTO() {}

@JsonProperty("responseCode")
public long getResponseCode() {
return responseCode;
}

@JsonProperty("responseMessage")
public String getResponseMessage() {
return responseMessage;
}

@Override
public String toString() {
return "ChannelMigrateOffsetTokenResponseDTO{"
+ "responseCode="
+ responseCode
+ ", responseMessage='"
+ responseMessage
+ '\''
+ '}';
}
}
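A minimal, hedged deserialization sketch for this DTO; the payload values are invented, and in the connector the JSON actually comes from the SYSTEM$SNOWPIPE_STREAMING_MIGRATE_CHANNEL_OFFSET_TOKEN result shown earlier:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class DtoDeserializationSketch {
  public static void main(String[] args) throws JsonProcessingException {
    ObjectMapper mapper = new ObjectMapper();
    // Invented payload; field names match the @JsonProperty annotations above.
    String json = "{\"responseCode\": 0, \"responseMessage\": \"SUCCESS\"}";
    ChannelMigrateOffsetTokenResponseDTO dto =
        mapper.readValue(json, ChannelMigrateOffsetTokenResponseDTO.class);
    System.out.println(dto); // uses the toString() defined above
  }
}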
@@ -1,6 +1,9 @@
package com.snowflake.kafka.connector.internal.streaming;

-import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.*;
+import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG;
+import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DEFAULT;
+import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG;
+import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_TOLERANCE_CONFIG;
import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.DURATION_BETWEEN_GET_OFFSET_TOKEN_RETRY;
import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.MAX_GET_OFFSET_TOKEN_RETRIES;
import static java.time.temporal.ChronoUnit.SECONDS;
@@ -349,6 +352,17 @@ private boolean isEnableChannelOffsetMigration(Map<String, String> sfConnectorCo
return isEnableChannelOffsetMigration;
}

/**
 * This is the new channel name format that was introduced. The new channel name prefixes the
 * connector name onto the old format. Please note, we will not open a channel with the new
 * format. We will run a migration function from this new channel format to the old channel
 * format and then drop the new channel.
 *
 * @param channelNameFormatV1 Original format used.
 * @param connectorName connector name used in SF config JSON.
 * @return new channel name introduced in <a
 *     href="https://github.com/snowflakedb/snowflake-kafka-connector/commit/3bf9106b22510c62068f7d2f7137b9e57989274c">
 *     this change (released in version 2.1.0)</a>
 */
@VisibleForTesting
protected String generateChannelNameFormatV2(String channelNameFormatV1, String connectorName) {
return connectorName + "_" + channelNameFormatV1;
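For illustration, with invented names: if channelNameFormatV1 is "myTopic_0" (topic name plus partition number) and the connector name is "my_connector", the V2 name is "my_connector_myTopic_0":

// Hypothetical inputs showing the naming rule in generateChannelNameFormatV2.
String channelNameFormatV1 = "myTopic_0"; // original format: topicName + "_" + partition
String connectorName = "my_connector";    // from the connector's config
String v2Name = connectorName + "_" + channelNameFormatV1; // => "my_connector_myTopic_0"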
@@ -253,29 +253,29 @@ public void testStreamingChannelMigrationEnabledAndDisabled() {

Mockito.when(mockStreamingChannel.getFullyQualifiedName()).thenReturn(TEST_CHANNEL_NAME);
Mockito.when(
            mockSnowflakeConnectionService.migrateStreamingChannelOffsetToken(
                anyString(), anyString(), Mockito.anyString()))
        .thenReturn(true);

// checking default
TopicPartitionChannel topicPartitionChannel =
        new TopicPartitionChannel(
            mockStreamingClient,
            topicPartition,
            TEST_CHANNEL_NAME,
            TEST_TABLE_NAME,
            true,
            streamingBufferThreshold,
            sfConnectorConfig,
            mockKafkaRecordErrorReporter,
            mockSinkTaskContext,
            mockSnowflakeConnectionService,
            new RecordService(mockTelemetryService),
            mockTelemetryService,
            false,
            null);
Mockito.verify(mockSnowflakeConnectionService, Mockito.times(1))
        .migrateStreamingChannelOffsetToken(anyString(), anyString(), anyString());

Map<String, String> customSfConfig = new HashMap<>(sfConnectorConfig);
customSfConfig.put(ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG, "true");
@@ -299,28 +299,28 @@ public void testStreamingChannelMigrationEnabledAndDisabled() {
Mockito.verify(mockSnowflakeConnectionService, Mockito.times(2))
.migrateStreamingChannelOffsetToken(anyString(), anyString(), anyString());


customSfConfig.put(ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG, "false");
    SnowflakeConnectionService anotherMockForParamDisabled =
        Mockito.mock(SnowflakeConnectionService.class);

topicPartitionChannel =
        new TopicPartitionChannel(
            mockStreamingClient,
            topicPartition,
            TEST_CHANNEL_NAME,
            TEST_TABLE_NAME,
            true,
            streamingBufferThreshold,
            customSfConfig,
            mockKafkaRecordErrorReporter,
            mockSinkTaskContext,
            anotherMockForParamDisabled,
            new RecordService(mockTelemetryService),
            mockTelemetryService,
            false,
            null);
Mockito.verify(anotherMockForParamDisabled, Mockito.times(0))
        .migrateStreamingChannelOffsetToken(anyString(), anyString(), anyString());
}

/* Only SFExceptions are retried and goes into fallback. */
