Skip to content

Commit

Permalink
Introduce new config enable.streaming.channel.offset.verification for…
Browse files Browse the repository at this point in the history
… enabling token verification function
  • Loading branch information
sfc-gh-achyzy committed Oct 26, 2024
1 parent 785a6ea commit 1cada6f
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,14 @@ public String validateConfig(Map<String, String> config) {
"Streaming client Channel migration is only available with {}.",
IngestionMethodConfig.SNOWPIPE_STREAMING.toString()));
}
if (config.containsKey(
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG)) {
invalidConfigParams.put(
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG,
Utils.formatString(
"Streaming channel offset verification function is only available with {}.",
IngestionMethodConfig.SNOWPIPE_STREAMING.toString()));
}
}

if (config.containsKey(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,15 @@ public class SnowflakeSinkConnectorConfig {
+ " format is deprecated and V1 will be used always, disabling this config could have"
+ " ramifications. Please consult Snowflake support before setting this to false.";

public static final String ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG = "enable.streaming.channel.offset.verification";
public static final String ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_DISPLAY =
"Enable streaming channel offset verification function";
public static final boolean ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_DEFAULT = true;
public static final String ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_DOC =
"Whether to enable streaming channel offset verification function. The function checks only for incremental offsets" +
" (might contain gaps) and might signal false positives in case of SMT. " +
"Can only be set if Streaming Snowpipe is enabled";

public static final String ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS =
"enable.task.fail.on.authorization.errors";
public static final boolean ENABLE_TASK_FAIL_ON_AUTHORIZATION_ERRORS_DEFAULT = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,16 @@ public static ConfigDef getConfig() {
ICEBERG_ENABLED_DEFAULT_VALUE,
ConfigDef.Importance.HIGH,
"When set to true the connector will ingest data into the Iceberg table. Check the"
+ " official Snowflake documentation for the prerequisites.");
+ " official Snowflake documentation for the prerequisites.")
.define(
ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG,
ConfigDef.Type.BOOLEAN,
ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_DEFAULT,
ConfigDef.Importance.LOW,
ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_DOC,
CONNECTOR_CONFIG_DOC,
11,
ConfigDef.Width.NONE,
ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_DISPLAY);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ public ImmutableMap<String, String> validate(Map<String, String> inputConfig) {
ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG,
inputConfig.get(ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG));
}
if (inputConfig.containsKey(ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG)) {
BOOLEAN_VALIDATOR.ensureValid(
ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG,
inputConfig.get(ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG));
}

if (inputConfig.containsKey(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER)) {
BOOLEAN_VALIDATOR.ensureValid(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,4 +165,5 @@ public static boolean logErrors(Map<String, String> sfConnectorConfig) {
public static String getDlqTopicName(Map<String, String> sfConnectorConfig) {
return sfConnectorConfig.getOrDefault(ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,48 @@ public void testEnableStreamingChannelMigrationConfig_invalidWithSnowpipe() {
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG);
}

@Test
public void testEnableStreamingChannelOffsetVerificationConfig() {
Map<String, String> config = getConfig();
config.put(
SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
config.put(SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG, "true");

connectorConfigValidator.validateConfig(config);
}

@Test
public void testEnableStreamingChannelOffsetVerificationConfig_invalidWithSnowpipe() {
Map<String, String> config = getConfig();
config.put(
SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
IngestionMethodConfig.SNOWPIPE.toString());
config.put(SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG, "true");

assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
.isInstanceOf(SnowflakeKafkaConnectorException.class)
.hasMessageContaining(
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG);
}

@Test
public void
testEnableStreamingChannelOffsetVerificationConfig_invalidBooleanValue_WithSnowpipeStreaming() {
Map<String, String> config = getConfig();
config.put(
SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
config.put(
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG, "INVALID");
assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
.isInstanceOf(SnowflakeKafkaConnectorException.class)
.hasMessageContaining(
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG);
}

@Test
public void testStreamingProviderOverrideConfig_invalidWithSnowpipe() {
Map<String, String> config = getConfig();
Expand Down

0 comments on commit 1cada6f

Please sign in to comment.