diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java index d52b9d95a..5bf595538 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java @@ -133,7 +133,7 @@ public class SnowflakeSinkConnectorConfig { public static final String SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER = "snowflake.streaming.enable.single.buffer"; - public static final boolean SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER_DEFAULT = true; + public static final boolean SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER_DEFAULT = false; public static final String SNOWPIPE_STREAMING_MAX_CLIENT_LAG = "snowflake.streaming.max.client.lag"; diff --git a/src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorTest.java b/src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorTest.java index 080a5a91b..3f862a069 100644 --- a/src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorTest.java +++ b/src/test/java/com/snowflake/kafka/connector/ConnectorConfigValidatorTest.java @@ -1,6 +1,19 @@ package com.snowflake.kafka.connector; -import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.*; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_LOG_ENABLE_CONFIG; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_TOLERANCE_CONFIG; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.JVM_PROXY_HOST; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.JVM_PROXY_PORT; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.NAME; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_DATABASE; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_SCHEMA; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_URL; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_USER; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES; import static com.snowflake.kafka.connector.Utils.HTTP_NON_PROXY_HOSTS; import static com.snowflake.kafka.connector.internal.TestUtils.getConfig; import static org.assertj.core.api.Assertions.*; @@ -495,7 +508,6 @@ public void testStreamingEmptyFlushTime() { IngestionMethodConfig.SNOWPIPE_STREAMING.toString()); config.put(Utils.SF_ROLE, "ACCOUNTADMIN"); config.remove(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC); - config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false"); assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config)) .isInstanceOf(SnowflakeKafkaConnectorException.class) .hasMessageContaining(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC); @@ -511,7 +523,6 @@ public void testStreamingFlushTimeSmall() { config.put( SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC, (StreamingUtils.STREAMING_BUFFER_FLUSH_TIME_MINIMUM_SEC - 1) + ""); - config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false"); assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config)) .isInstanceOf(SnowflakeKafkaConnectorException.class) .hasMessageContaining(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC); @@ -525,7 +536,6 @@ public void testStreamingFlushTimeNotNumber() { IngestionMethodConfig.SNOWPIPE_STREAMING.toString()); config.put(Utils.SF_ROLE, "ACCOUNTADMIN"); config.put(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC, "fdas"); - config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false"); assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config)) .isInstanceOf(SnowflakeKafkaConnectorException.class) .hasMessageContaining(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC); @@ -538,7 +548,6 @@ public void testStreamingEmptyBufferSize() { SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT, IngestionMethodConfig.SNOWPIPE_STREAMING.toString()); config.put(Utils.SF_ROLE, "ACCOUNTADMIN"); - config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false"); config.remove(BUFFER_SIZE_BYTES); assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config)) .isInstanceOf(SnowflakeKafkaConnectorException.class) @@ -553,7 +562,6 @@ public void testStreamingEmptyBufferCount() { IngestionMethodConfig.SNOWPIPE_STREAMING.toString()); config.put(Utils.SF_ROLE, "ACCOUNTADMIN"); config.remove(BUFFER_COUNT_RECORDS); - config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false"); assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config)) .isInstanceOf(SnowflakeKafkaConnectorException.class) .hasMessageContaining(BUFFER_COUNT_RECORDS); @@ -567,7 +575,6 @@ public void testStreamingBufferCountNegative() { IngestionMethodConfig.SNOWPIPE_STREAMING.toString()); config.put(Utils.SF_ROLE, "ACCOUNTADMIN"); config.put(BUFFER_COUNT_RECORDS, "-1"); - config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false"); assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config)) .isInstanceOf(SnowflakeKafkaConnectorException.class) .hasMessageContaining(BUFFER_COUNT_RECORDS); @@ -581,7 +588,6 @@ public void testStreamingBufferCountValue() { IngestionMethodConfig.SNOWPIPE_STREAMING.toString()); config.put(Utils.SF_ROLE, "ACCOUNTADMIN"); config.put(BUFFER_COUNT_RECORDS, "adssadsa"); - config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false"); assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config)) .isInstanceOf(SnowflakeKafkaConnectorException.class) .hasMessageContaining(BUFFER_COUNT_RECORDS); @@ -973,7 +979,6 @@ public void testEmptyClientId() { config.put(SnowflakeSinkConnectorConfig.AUTHENTICATOR_TYPE, Utils.OAUTH); config.put(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET, "client_secret"); config.put(SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN, "refresh_token"); - config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false"); assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config)) .isInstanceOf(SnowflakeKafkaConnectorException.class) .hasMessageContaining(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID); diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientPropertiesTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientPropertiesTest.java index 32eec045b..e7307c300 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientPropertiesTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientPropertiesTest.java @@ -89,7 +89,6 @@ void shouldNotPropagateStreamingClientProperties_SingleBufferDisabled() { connectorConfig.put(BUFFER_SIZE_BYTES, "10000000"); connectorConfig.put(SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES, "20000000"); - connectorConfig.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false"); // WHEN StreamingClientProperties resultProperties = new StreamingClientProperties(connectorConfig);