Skip to content

Commit

Permalink
Config builder
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-mbobowski committed Oct 28, 2024
1 parent 959b74e commit cc3d6d3
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -840,28 +840,26 @@ public void testEnableStreamingChannelMigrationConfig_invalidWithSnowpipe() {

@Test
public void testEnableStreamingChannelOffsetVerificationConfig() {
Map<String, String> config = getConfig();
config.put(
SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
config.put(
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG,
"true");
// given
Map<String, String> config =
SnowflakeSinkConnectorConfigBuilder.streamingConfig()
.withRole("ACCOUNTADMIN")
.withChannelOffsetTokenVerificationFunctionEnabled(true)
.build();

// when, then
connectorConfigValidator.validateConfig(config);
}

@Test
public void testEnableStreamingChannelOffsetVerificationConfig_invalidWithSnowpipe() {
Map<String, String> config = getConfig();
config.put(
SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
IngestionMethodConfig.SNOWPIPE.toString());
config.put(
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG,
"true");
// given
Map<String, String> config =
SnowflakeSinkConnectorConfigBuilder.snowpipeConfig()
.withChannelOffsetTokenVerificationFunctionEnabled(true)
.build();

// when, then
assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
.isInstanceOf(SnowflakeKafkaConnectorException.class)
.hasMessageContaining(
Expand All @@ -871,14 +869,13 @@ public void testEnableStreamingChannelOffsetVerificationConfig_invalidWithSnowpi
@Test
public void
testEnableStreamingChannelOffsetVerificationConfig_invalidBooleanValue_WithSnowpipeStreaming() {
Map<String, String> config = getConfig();
config.put(
SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
// given
Map<String, String> config = SnowflakeSinkConnectorConfigBuilder.streamingConfig().build();
config.put(
SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG,
"INVALID");

// when, then
assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
.isInstanceOf(SnowflakeKafkaConnectorException.class)
.hasMessageContaining(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.snowflake.kafka.connector.config;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG;
import static com.snowflake.kafka.connector.Utils.*;
import static com.snowflake.kafka.connector.Utils.SF_DATABASE;
Expand All @@ -24,6 +25,10 @@ public static SnowflakeSinkConnectorConfigBuilder snowpipeConfig() {
return commonRequiredFields().withIngestionMethod(IngestionMethodConfig.SNOWPIPE);
}

public static SnowflakeSinkConnectorConfigBuilder streamingConfig() {
return commonRequiredFields().withIngestionMethod(IngestionMethodConfig.SNOWPIPE_STREAMING);
}

public static SnowflakeSinkConnectorConfigBuilder icebergConfig() {
return commonRequiredFields()
.withIcebergEnabled()
Expand Down Expand Up @@ -113,6 +118,12 @@ public SnowflakeSinkConnectorConfigBuilder withSchematizationEnabled(boolean ena
return this;
}

public SnowflakeSinkConnectorConfigBuilder withChannelOffsetTokenVerificationFunctionEnabled(
boolean enabled) {
config.put(ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG, Boolean.toString(enabled));
return this;
}

public Map<String, String> build() {
return config;
}
Expand Down

0 comments on commit cc3d6d3

Please sign in to comment.