Skip to content

Commit

Permalink
Code formatting fix
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-achyzy committed Oct 26, 2024
1 parent 1cada6f commit 959b74e
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,10 @@ public String validateConfig(Map<String, String> config) {
IngestionMethodConfig.SNOWPIPE_STREAMING.toString()));
}
if (config.containsKey(
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG)) {
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG)) {
invalidConfigParams.put(
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG,
Utils.formatString(
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG,
Utils.formatString(
"Streaming channel offset verification function is only available with {}.",
IngestionMethodConfig.SNOWPIPE_STREAMING.toString()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,15 @@ public class SnowflakeSinkConnectorConfig {
+ " format is deprecated and V1 will be used always, disabling this config could have"
+ " ramifications. Please consult Snowflake support before setting this to false.";

public static final String ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG = "enable.streaming.channel.offset.verification";
public static final String ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG =
"enable.streaming.channel.offset.verification";
public static final String ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_DISPLAY =
"Enable streaming channel offset verification function";
"Enable streaming channel offset verification function";
public static final boolean ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_DEFAULT = true;
public static final String ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_DOC =
"Whether to enable streaming channel offset verification function. The function checks only for incremental offsets" +
" (might contain gaps) and might signal false positives in case of SMT. " +
"Can only be set if Streaming Snowpipe is enabled";
"Whether to enable streaming channel offset verification function. The function checks only"
+ " for incremental offsets (might contain gaps) and might signal false positives in case"
+ " of SMT. Can only be set if Streaming Snowpipe is enabled";

public static final String ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS =
"enable.task.fail.on.authorization.errors";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ public ImmutableMap<String, String> validate(Map<String, String> inputConfig) {
}
if (inputConfig.containsKey(ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG)) {
BOOLEAN_VALIDATOR.ensureValid(
ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG,
inputConfig.get(ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG));
ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG,
inputConfig.get(ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG));
}

if (inputConfig.containsKey(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class StreamingUtils {
if (prevBatchEndOffset != null && curBatchStartOffset != null) {
long curStart = Long.parseLong(curBatchStartOffset);
long prevEnd = Long.parseLong(prevBatchEndOffset);
return curStart > prevEnd;
return curStart > prevEnd;
}
return true;
};
Expand Down Expand Up @@ -165,5 +165,4 @@ public static boolean logErrors(Map<String, String> sfConnectorConfig) {
public static String getDlqTopicName(Map<String, String> sfConnectorConfig) {
return sfConnectorConfig.getOrDefault(ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -842,10 +842,12 @@ public void testEnableStreamingChannelMigrationConfig_invalidWithSnowpipe() {
public void testEnableStreamingChannelOffsetVerificationConfig() {
Map<String, String> config = getConfig();
config.put(
SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
config.put(SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG, "true");
config.put(
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG,
"true");

connectorConfigValidator.validateConfig(config);
}
Expand All @@ -854,30 +856,33 @@ public void testEnableStreamingChannelOffsetVerificationConfig() {
public void testEnableStreamingChannelOffsetVerificationConfig_invalidWithSnowpipe() {
Map<String, String> config = getConfig();
config.put(
SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
IngestionMethodConfig.SNOWPIPE.toString());
config.put(SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG, "true");
SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
IngestionMethodConfig.SNOWPIPE.toString());
config.put(
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG,
"true");

assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
.isInstanceOf(SnowflakeKafkaConnectorException.class)
.hasMessageContaining(
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG);
.isInstanceOf(SnowflakeKafkaConnectorException.class)
.hasMessageContaining(
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG);
}

@Test
public void
testEnableStreamingChannelOffsetVerificationConfig_invalidBooleanValue_WithSnowpipeStreaming() {
testEnableStreamingChannelOffsetVerificationConfig_invalidBooleanValue_WithSnowpipeStreaming() {
Map<String, String> config = getConfig();
config.put(
SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
config.put(
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG, "INVALID");
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG,
"INVALID");
assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
.isInstanceOf(SnowflakeKafkaConnectorException.class)
.hasMessageContaining(
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG);
.isInstanceOf(SnowflakeKafkaConnectorException.class)
.hasMessageContaining(
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG);
}

@Test
Expand Down

0 comments on commit 959b74e

Please sign in to comment.