Skip to content

Commit

Permalink
Revert "NO-SNOW Enable single buffering in Snowpipe Streaming by defa…
Browse files Browse the repository at this point in the history
…ult (#924)"

This reverts commit f0e5f23.
  • Loading branch information
sfc-gh-mbobowski committed Oct 29, 2024
1 parent 47158e7 commit b5503f7
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public class SnowflakeSinkConnectorConfig {
public static final String SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER =
"snowflake.streaming.enable.single.buffer";

public static final boolean SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER_DEFAULT = true;
public static final boolean SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER_DEFAULT = false;
public static final String SNOWPIPE_STREAMING_MAX_CLIENT_LAG =
"snowflake.streaming.max.client.lag";

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
package com.snowflake.kafka.connector;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.*;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_LOG_ENABLE_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_TOLERANCE_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.JVM_PROXY_HOST;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.JVM_PROXY_PORT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.NAME;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_DATABASE;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_SCHEMA;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_URL;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_USER;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES;
import static com.snowflake.kafka.connector.Utils.HTTP_NON_PROXY_HOSTS;
import static com.snowflake.kafka.connector.internal.TestUtils.getConfig;
import static org.assertj.core.api.Assertions.*;
Expand Down Expand Up @@ -495,7 +508,6 @@ public void testStreamingEmptyFlushTime() {
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
config.remove(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC);
config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false");
assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
.isInstanceOf(SnowflakeKafkaConnectorException.class)
.hasMessageContaining(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC);
Expand All @@ -511,7 +523,6 @@ public void testStreamingFlushTimeSmall() {
config.put(
SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC,
(StreamingUtils.STREAMING_BUFFER_FLUSH_TIME_MINIMUM_SEC - 1) + "");
config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false");
assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
.isInstanceOf(SnowflakeKafkaConnectorException.class)
.hasMessageContaining(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC);
Expand All @@ -525,7 +536,6 @@ public void testStreamingFlushTimeNotNumber() {
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
config.put(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC, "fdas");
config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false");
assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
.isInstanceOf(SnowflakeKafkaConnectorException.class)
.hasMessageContaining(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC);
Expand All @@ -538,7 +548,6 @@ public void testStreamingEmptyBufferSize() {
SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false");
config.remove(BUFFER_SIZE_BYTES);
assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
.isInstanceOf(SnowflakeKafkaConnectorException.class)
Expand All @@ -553,7 +562,6 @@ public void testStreamingEmptyBufferCount() {
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
config.remove(BUFFER_COUNT_RECORDS);
config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false");
assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
.isInstanceOf(SnowflakeKafkaConnectorException.class)
.hasMessageContaining(BUFFER_COUNT_RECORDS);
Expand All @@ -567,7 +575,6 @@ public void testStreamingBufferCountNegative() {
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
config.put(BUFFER_COUNT_RECORDS, "-1");
config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false");
assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
.isInstanceOf(SnowflakeKafkaConnectorException.class)
.hasMessageContaining(BUFFER_COUNT_RECORDS);
Expand All @@ -581,7 +588,6 @@ public void testStreamingBufferCountValue() {
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
config.put(BUFFER_COUNT_RECORDS, "adssadsa");
config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false");
assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
.isInstanceOf(SnowflakeKafkaConnectorException.class)
.hasMessageContaining(BUFFER_COUNT_RECORDS);
Expand Down Expand Up @@ -973,7 +979,6 @@ public void testEmptyClientId() {
config.put(SnowflakeSinkConnectorConfig.AUTHENTICATOR_TYPE, Utils.OAUTH);
config.put(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_SECRET, "client_secret");
config.put(SnowflakeSinkConnectorConfig.OAUTH_REFRESH_TOKEN, "refresh_token");
config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false");
assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
.isInstanceOf(SnowflakeKafkaConnectorException.class)
.hasMessageContaining(SnowflakeSinkConnectorConfig.OAUTH_CLIENT_ID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ void shouldNotPropagateStreamingClientProperties_SingleBufferDisabled() {

connectorConfig.put(BUFFER_SIZE_BYTES, "10000000");
connectorConfig.put(SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES, "20000000");
connectorConfig.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false");

// WHEN
StreamingClientProperties resultProperties = new StreamingClientProperties(connectorConfig);
Expand Down

0 comments on commit b5503f7

Please sign in to comment.