diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java index 5bf595538..cdc98b4c3 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java @@ -134,6 +134,7 @@ public class SnowflakeSinkConnectorConfig { "snowflake.streaming.enable.single.buffer"; public static final boolean SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER_DEFAULT = false; + public static final boolean SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER_ICEBERG_DEFAULT = true; public static final String SNOWPIPE_STREAMING_MAX_CLIENT_LAG = "snowflake.streaming.max.client.lag"; @@ -253,6 +254,18 @@ public static void setDefaultValues(Map config) { config, BUFFER_FLUSH_TIME_SEC, BUFFER_FLUSH_TIME_SEC_DEFAULT, "seconds"); if (isSnowpipeStreamingIngestion(config)) { + setSingleBufferDefaultValue(config); + } + } + + private static void setSingleBufferDefaultValue(Map config) { + if (Utils.isIcebergEnabled(config)) { + setFieldToDefaultValues( + config, + SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, + SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER_ICEBERG_DEFAULT, + ""); + } else { setFieldToDefaultValues( config, SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, diff --git a/src/main/java/com/snowflake/kafka/connector/config/IcebergConfigValidator.java b/src/main/java/com/snowflake/kafka/connector/config/IcebergConfigValidator.java index 670ec7508..c195a21cd 100644 --- a/src/main/java/com/snowflake/kafka/connector/config/IcebergConfigValidator.java +++ b/src/main/java/com/snowflake/kafka/connector/config/IcebergConfigValidator.java @@ -4,6 +4,7 @@ import static com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig.SNOWPIPE_STREAMING; import com.google.common.collect.ImmutableMap; +import com.snowflake.kafka.connector.internal.parameters.InternalBufferParameters; import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; import com.snowflake.kafka.connector.internal.streaming.StreamingConfigValidator; import java.util.HashMap; @@ -14,6 +15,11 @@ public class IcebergConfigValidator implements StreamingConfigValidator { private static final String INCOMPATIBLE_INGESTION_METHOD = "Ingestion to Iceberg table is supported only for Snowpipe Streaming"; + private static final String DOUBLE_BUFFER_NOT_SUPPORTED = + "Ingestion to Iceberg table is supported only with " + + SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER + + " enabled."; + @Override public ImmutableMap validate(Map inputConfig) { boolean isIcebergEnabled = Boolean.parseBoolean(inputConfig.get(ICEBERG_ENABLED)); @@ -31,6 +37,10 @@ public ImmutableMap validate(Map inputConfig) { validationErrors.put(INGESTION_METHOD_OPT, INCOMPATIBLE_INGESTION_METHOD); } + if (!InternalBufferParameters.isSingleBufferEnabled(inputConfig)) { + validationErrors.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, DOUBLE_BUFFER_NOT_SUPPORTED); + } + return ImmutableMap.copyOf(validationErrors); } } diff --git a/src/test/java/com/snowflake/kafka/connector/config/IcebergConfigValidationTest.java b/src/test/java/com/snowflake/kafka/connector/config/IcebergConfigValidationTest.java index 6bcd570d0..aaf9dc3f7 100644 --- a/src/test/java/com/snowflake/kafka/connector/config/IcebergConfigValidationTest.java +++ b/src/test/java/com/snowflake/kafka/connector/config/IcebergConfigValidationTest.java @@ -1,6 +1,7 @@ package com.snowflake.kafka.connector.config; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER; import com.google.common.collect.ImmutableMap; import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; @@ -52,6 +53,11 @@ public static Stream invalidConfigs() { SnowflakeSinkConnectorConfigBuilder.icebergConfig() .withIngestionMethod(IngestionMethodConfig.SNOWPIPE) .build(), - INGESTION_METHOD_OPT)); + INGESTION_METHOD_OPT), + Arguments.of( + SnowflakeSinkConnectorConfigBuilder.icebergConfig() + .withSingleBufferEnabled(false) + .build(), + SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER)); } } diff --git a/src/test/java/com/snowflake/kafka/connector/config/SnowflakeSinkConnectorConfigBuilder.java b/src/test/java/com/snowflake/kafka/connector/config/SnowflakeSinkConnectorConfigBuilder.java index c43ed876a..9b67c26b2 100644 --- a/src/test/java/com/snowflake/kafka/connector/config/SnowflakeSinkConnectorConfigBuilder.java +++ b/src/test/java/com/snowflake/kafka/connector/config/SnowflakeSinkConnectorConfigBuilder.java @@ -2,6 +2,7 @@ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER; import static com.snowflake.kafka.connector.Utils.*; import static com.snowflake.kafka.connector.Utils.SF_DATABASE; @@ -34,6 +35,7 @@ public static SnowflakeSinkConnectorConfigBuilder icebergConfig() { .withIcebergEnabled() .withIngestionMethod(IngestionMethodConfig.SNOWPIPE_STREAMING) .withSchematizationEnabled(true) + .withSingleBufferEnabled(true) // default value for iceberg .withRole("role"); } @@ -124,6 +126,11 @@ public SnowflakeSinkConnectorConfigBuilder withChannelOffsetTokenVerificationFun return this; } + public SnowflakeSinkConnectorConfigBuilder withSingleBufferEnabled(boolean enabled) { + config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, Boolean.toString(enabled)); + return this; + } + public Map build() { return config; }