SNOW-1805174 Enable single buffer by default (#1005)
sfc-gh-mbobowski authored Nov 22, 2024
1 parent dfa33de commit e6f5154
Showing 9 changed files with 90 additions and 52 deletions.
=== File 1 of 9 ===
@@ -133,9 +133,10 @@ public class SnowflakeSinkConnectorConfig {
   public static final String SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER =
       "snowflake.streaming.enable.single.buffer";
 
-  public static final boolean SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER_DEFAULT = false;
+  public static final boolean SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER_DEFAULT = true;
   public static final String SNOWPIPE_STREAMING_MAX_CLIENT_LAG =
       "snowflake.streaming.max.client.lag";
+  public static final int SNOWPIPE_STREAMING_MAX_CLIENT_LAG_SECONDS_DEFAULT = 120;
 
   public static final String SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES =
       "snowflake.streaming.max.memory.limit.bytes";
@@ -258,6 +259,12 @@ public static void setDefaultValues(Map<String, String> config) {
         SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER,
         SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER_DEFAULT,
         "");
+
+    setFieldToDefaultValues(
+        config,
+        SNOWPIPE_STREAMING_MAX_CLIENT_LAG,
+        SNOWPIPE_STREAMING_MAX_CLIENT_LAG_SECONDS_DEFAULT,
+        "seconds");
   }
 }
 
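In effect, a connector config that sets neither property now gets the single buffer and a 120-second client lag out of the box. A minimal sketch of the new behavior, assuming setFieldToDefaultValues stringifies the default into any key the user left unset (the demo class itself is hypothetical, not part of the commit):

    import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical demo, not part of the commit.
    public class DefaultsDemo {
      public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        // User supplies only connection properties; no buffer or lag tuning.
        SnowflakeSinkConnectorConfig.setDefaultValues(config);

        // Both keys are now filled with the new defaults:
        System.out.println(config.get("snowflake.streaming.enable.single.buffer")); // "true"
        System.out.println(config.get("snowflake.streaming.max.client.lag"));       // "120"
      }
    }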
=== File 2 of 9 ===
@@ -4,6 +4,7 @@
 import static com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig.SNOWPIPE_STREAMING;
 
 import com.google.common.collect.ImmutableMap;
+import com.snowflake.kafka.connector.internal.parameters.InternalBufferParameters;
 import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
 import com.snowflake.kafka.connector.internal.streaming.StreamingConfigValidator;
 import java.util.HashMap;
@@ -14,6 +15,11 @@ public class IcebergConfigValidator implements StreamingConfigValidator {
   private static final String INCOMPATIBLE_INGESTION_METHOD =
       "Ingestion to Iceberg table is supported only for Snowpipe Streaming";
 
+  private static final String DOUBLE_BUFFER_NOT_SUPPORTED =
+      "Ingestion to Iceberg table is supported only with "
+          + SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER
+          + " enabled.";
+
   @Override
   public ImmutableMap<String, String> validate(Map<String, String> inputConfig) {
     boolean isIcebergEnabled = Boolean.parseBoolean(inputConfig.get(ICEBERG_ENABLED));
@@ -31,6 +37,10 @@ public ImmutableMap<String, String> validate(Map<String, String> inputConfig) {
       validationErrors.put(INGESTION_METHOD_OPT, INCOMPATIBLE_INGESTION_METHOD);
     }
 
+    if (!InternalBufferParameters.isSingleBufferEnabled(inputConfig)) {
+      validationErrors.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, DOUBLE_BUFFER_NOT_SUPPORTED);
+    }
+
     return ImmutableMap.copyOf(validationErrors);
   }
 }
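A usage sketch of the new rule, written as a JUnit-style test (static imports assumed to match the validator's own, and the no-arg construction is an assumption; the diff does not show how the validator is instantiated):

    @Test
    void rejectsIcebergWithDoubleBuffer() {
      Map<String, String> inputConfig = new HashMap<>();
      inputConfig.put(ICEBERG_ENABLED, "true");
      inputConfig.put(INGESTION_METHOD_OPT, IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
      inputConfig.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false");

      ImmutableMap<String, String> errors = new IcebergConfigValidator().validate(inputConfig);

      // The new check reports the single-buffer key with DOUBLE_BUFFER_NOT_SUPPORTED.
      Assertions.assertTrue(errors.containsKey(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER));
    }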
=== File 3 of 9 ===
@@ -489,11 +489,10 @@ public void testErrorLog_DisallowedValues() {
   // ---------- Streaming Buffer tests ---------- //
   @Test
   public void testStreamingEmptyFlushTime() {
-    Map<String, String> config = getConfig();
-    config.put(
-        SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
-        IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
-    config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
+    Map<String, String> config =
+        SnowflakeSinkConnectorConfigBuilder.streamingConfig()
+            .withSingleBufferEnabled(false)
+            .build();
     config.remove(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC);
     assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
@@ -502,11 +501,10 @@ public void testStreamingFlushTimeSmall() {
 
   @Test
   public void testStreamingFlushTimeSmall() {
-    Map<String, String> config = getConfig();
-    config.put(
-        SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
-        IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
-    config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
+    Map<String, String> config =
+        SnowflakeSinkConnectorConfigBuilder.streamingConfig()
+            .withSingleBufferEnabled(false)
+            .build();
     config.put(
         SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC,
         (StreamingUtils.STREAMING_BUFFER_FLUSH_TIME_MINIMUM_SEC - 1) + "");
@@ -517,11 +515,10 @@
 
   @Test
   public void testStreamingFlushTimeNotNumber() {
-    Map<String, String> config = getConfig();
-    config.put(
-        SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
-        IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
-    config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
+    Map<String, String> config =
+        SnowflakeSinkConnectorConfigBuilder.streamingConfig()
+            .withSingleBufferEnabled(false)
+            .build();
     config.put(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC, "fdas");
     assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
@@ -530,11 +527,10 @@ public void testStreamingFlushTimeNotNumber() {
 
   @Test
   public void testStreamingEmptyBufferSize() {
-    Map<String, String> config = getConfig();
-    config.put(
-        SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
-        IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
-    config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
+    Map<String, String> config =
+        SnowflakeSinkConnectorConfigBuilder.streamingConfig()
+            .withSingleBufferEnabled(false)
+            .build();
     config.remove(BUFFER_SIZE_BYTES);
     assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
@@ -543,11 +539,10 @@ public void testStreamingEmptyBufferSize() {
 
   @Test
   public void testStreamingEmptyBufferCount() {
-    Map<String, String> config = getConfig();
-    config.put(
-        SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
-        IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
-    config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
+    Map<String, String> config =
+        SnowflakeSinkConnectorConfigBuilder.streamingConfig()
+            .withSingleBufferEnabled(false)
+            .build();
     config.remove(BUFFER_COUNT_RECORDS);
     assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
@@ -556,11 +551,10 @@ public void testStreamingEmptyBufferCount() {
 
   @Test
   public void testStreamingBufferCountNegative() {
-    Map<String, String> config = getConfig();
-    config.put(
-        SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
-        IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
-    config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
+    Map<String, String> config =
+        SnowflakeSinkConnectorConfigBuilder.streamingConfig()
+            .withSingleBufferEnabled(false)
+            .build();
     config.put(BUFFER_COUNT_RECORDS, "-1");
     assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
@@ -569,11 +563,10 @@ public void testStreamingBufferCountNegative() {
 
   @Test
   public void testStreamingBufferCountValue() {
-    Map<String, String> config = getConfig();
-    config.put(
-        SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
-        IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
-    config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
+    Map<String, String> config =
+        SnowflakeSinkConnectorConfigBuilder.streamingConfig()
+            .withSingleBufferEnabled(false)
+            .build();
     config.put(BUFFER_COUNT_RECORDS, "adssadsa");
     assertThatThrownBy(() -> connectorConfigValidator.validateConfig(config))
         .isInstanceOf(SnowflakeKafkaConnectorException.class)
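Because the single buffer is now the default, these buffer-validation tests opt back out through the builder. Roughly, the new call expands to the old manual setup plus an explicit opt-out; a sketch assuming streamingConfig() layers the streaming ingestion method over commonRequiredFields(), as the builder in File 5 suggests:

    // Approximate expansion of streamingConfig().withSingleBufferEnabled(false).build():
    Map<String, String> config = new HashMap<>();
    // ...common required fields: name, URL, schema, database, user, private key,
    // role, and the default buffer config (see File 5)...
    config.put(SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
        IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
    config.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false");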
=== File 4 of 9 ===
@@ -1,6 +1,7 @@
 package com.snowflake.kafka.connector.config;
 
 import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT;
+import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER;
 
 import com.google.common.collect.ImmutableMap;
 import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
@@ -52,6 +53,11 @@ public static Stream<Arguments> invalidConfigs() {
             SnowflakeSinkConnectorConfigBuilder.icebergConfig()
                 .withIngestionMethod(IngestionMethodConfig.SNOWPIPE)
                 .build(),
-            INGESTION_METHOD_OPT));
+            INGESTION_METHOD_OPT),
+        Arguments.of(
+            SnowflakeSinkConnectorConfigBuilder.icebergConfig()
+                .withSingleBufferEnabled(false)
+                .build(),
+            SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER));
   }
 }
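For context, a hedged sketch of how invalidConfigs() is typically consumed in a JUnit 5 parameterized test; the consuming method is not shown in this diff, so its name and body are assumptions:

    @ParameterizedTest
    @MethodSource("invalidConfigs")
    void shouldReportInvalidConfig(Map<String, String> config, String expectedErrorKey) {
      ImmutableMap<String, String> errors = new IcebergConfigValidator().validate(config);
      // Each invalid case should surface its offending key in the error map.
      Assertions.assertTrue(errors.containsKey(expectedErrorKey));
    }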
=== File 5 of 9 ===
@@ -2,6 +2,7 @@
 
 import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_CHANNEL_OFFSET_TOKEN_VERIFICATION_FUNCTION_CONFIG;
 import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG;
+import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER;
 import static com.snowflake.kafka.connector.Utils.*;
 import static com.snowflake.kafka.connector.Utils.SF_DATABASE;
@@ -33,8 +34,7 @@ public static SnowflakeSinkConnectorConfigBuilder icebergConfig() {
     return commonRequiredFields()
         .withIcebergEnabled()
         .withIngestionMethod(IngestionMethodConfig.SNOWPIPE_STREAMING)
-        .withSchematizationEnabled(true)
-        .withRole("role");
+        .withSchematizationEnabled(true);
   }
 
   private static SnowflakeSinkConnectorConfigBuilder commonRequiredFields() {
@@ -46,6 +46,7 @@ private static SnowflakeSinkConnectorConfigBuilder commonRequiredFields() {
         .withDatabase("testDatabase")
         .withUser("userName")
         .withPrivateKey("fdsfsdfsdfdsfdsrqwrwewrwrew42314424")
+        .withRole("role")
         .withDefaultBufferConfig();
   }
 
@@ -124,6 +125,11 @@ public SnowflakeSinkConnectorConfigBuilder withChannelOffsetTokenVerificationFun
     return this;
   }
 
+  public SnowflakeSinkConnectorConfigBuilder withSingleBufferEnabled(boolean enabled) {
+    config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, Boolean.toString(enabled));
+    return this;
+  }
+
   public Map<String, String> build() {
     return config;
   }
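A minimal usage sketch of the new builder method (streamingConfig() and withSingleBufferEnabled() both appear in this commit; the comment spells out what commonRequiredFields() contributes):

    Map<String, String> config =
        SnowflakeSinkConnectorConfigBuilder.streamingConfig()
            .withSingleBufferEnabled(true)
            .build();
    // config now carries "snowflake.streaming.enable.single.buffer" -> "true"
    // on top of the common required fields (name, URL, schema, database, user,
    // private key, role, default buffer config).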
=== File 6 of 9 ===
@@ -287,6 +287,7 @@ public static Map<String, String> getConfForStreaming() {
     // On top of existing configurations, add
     configuration.put(Utils.SF_ROLE, getProfile(PROFILE_PATH).get(ROLE).asText());
     configuration.put(Utils.TASK_ID, "0");
+    configuration.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG, "1");
     configuration.put(
         SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
         IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
=== File 7 of 9 ===
@@ -30,12 +30,14 @@
 
 import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
 import com.snowflake.kafka.connector.Utils;
+import com.snowflake.kafka.connector.config.SnowflakeSinkConnectorConfigBuilder;
 import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException;
 import com.snowflake.kafka.connector.internal.TestUtils;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
@@ -85,7 +87,10 @@ public void testGetValidProperties() {
   @Test
   void shouldNotPropagateStreamingClientProperties_SingleBufferDisabled() {
     // GIVEN
-    Map<String, String> connectorConfig = TestUtils.getConfForStreaming();
+    Map<String, String> connectorConfig =
+        SnowflakeSinkConnectorConfigBuilder.streamingConfig()
+            .withSingleBufferEnabled(false)
+            .build();
 
     connectorConfig.put(BUFFER_SIZE_BYTES, "10000000");
     connectorConfig.put(SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES, "20000000");
@@ -101,8 +106,12 @@ void shouldNotPropagateStreamingClientProperties_SingleBufferDisabled() {
   @CsvSource({"true", "false"})
   void shouldPropagateStreamingClientPropertiesFromOverrideMap(String singleBufferEnabled) {
     // GIVEN
-    Map<String, String> connectorConfig = TestUtils.getConfForStreaming();
+    Map<String, String> connectorConfig =
+        SnowflakeSinkConnectorConfigBuilder.streamingConfig()
+            .withSingleBufferEnabled(Boolean.parseBoolean(singleBufferEnabled))
+            .build();
 
+    connectorConfig.remove(BUFFER_SIZE_BYTES);
     connectorConfig.put(
         SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP,
         "MAX_CHANNEL_SIZE_IN_BYTES:1,MAX_MEMORY_LIMIT_IN_BYTES:2");
@@ -122,11 +131,11 @@ void shouldPropagateStreamingClientPropertiesFromOverrideMap(String singleBuffer
   @Test
   void shouldPropagateStreamingClientProperties_SingleBufferEnabled() {
     // GIVEN
-    Map<String, String> connectorConfig = TestUtils.getConfForStreaming();
+    Map<String, String> connectorConfig =
+        SnowflakeSinkConnectorConfigBuilder.streamingConfig().withSingleBufferEnabled(true).build();
 
     connectorConfig.put(BUFFER_SIZE_BYTES, "10000000");
     connectorConfig.put(SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES, "20000000");
-    connectorConfig.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "true");
 
     Map<String, String> expectedParameterOverrides = new HashMap<>();
     expectedParameterOverrides.put(MAX_CHANNEL_SIZE_IN_BYTES, "10000000");
@Test
void explicitStreamingClientPropertiesTakePrecedenceOverOverrideMap_SingleBufferEnabled() {
// GIVEN
Map<String, String> connectorConfig = TestUtils.getConfForStreaming();
Map<String, String> connectorConfig =
SnowflakeSinkConnectorConfigBuilder.streamingConfig().withSingleBufferEnabled(true).build();

connectorConfig.put(BUFFER_SIZE_BYTES, "10000000");
connectorConfig.put(SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES, "20000000");
connectorConfig.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "true");
connectorConfig.put(
SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP,
"MAX_CHANNEL_SIZE_IN_BYTES:1,MAX_MEMORY_LIMIT_IN_BYTES:2");
@@ -166,11 +175,13 @@ void explicitStreamingClientPropertiesTakePrecedenceOverOverrideMap_SingleBuffer
   void
       explicitStreamingClientPropertiesShouldNOTTakePrecedenceOverOverrideMap_SingleBufferDisabled() {
     // GIVEN
-    Map<String, String> connectorConfig = TestUtils.getConfForStreaming();
+    Map<String, String> connectorConfig =
+        SnowflakeSinkConnectorConfigBuilder.streamingConfig()
+            .withSingleBufferEnabled(false)
+            .build();
 
     connectorConfig.put(BUFFER_SIZE_BYTES, "10000000");
     connectorConfig.put(SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES, "20000000");
-    connectorConfig.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false");
     connectorConfig.put(
         SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP,
         "MAX_CHANNEL_SIZE_IN_BYTES:1,MAX_MEMORY_LIMIT_IN_BYTES:2");
@@ -251,7 +262,10 @@ public void testValidPropertiesWithOverriddenStreamingPropertiesMap_withMaxClien
   public void
       testValidPropertiesWithOverriddenStreamingPropertiesMap_withMaxClientLagOnlyInMapOfProperties() {
 
-    Map<String, String> connectorConfig = TestUtils.getConfForStreaming();
+    Map<String, String> connectorConfig =
+        SnowflakeSinkConnectorConfigBuilder.streamingConfig()
+            .withSingleBufferEnabled(false)
+            .build();
     connectorConfig.put(Utils.NAME, "testName");
     connectorConfig.put(Utils.SF_URL, "testUrl");
     connectorConfig.put(Utils.SF_ROLE, "testRole");
@@ -271,9 +285,9 @@ public void testValidPropertiesWithOverriddenStreamingPropertiesMap_withMaxClien
     StreamingClientProperties resultProperties = new StreamingClientProperties(connectorConfig);
 
     // verify
-    assert resultProperties.clientProperties.equals(expectedProps);
-    assert resultProperties.clientName.equals(expectedClientName);
-    assert resultProperties.parameterOverrides.equals(expectedParameterOverrides);
+    Assertions.assertEquals(expectedProps, resultProperties.clientProperties);
+    Assertions.assertEquals(expectedClientName, resultProperties.clientName);
+    Assertions.assertEquals(expectedParameterOverrides, resultProperties.parameterOverrides);
   }
 
   @Test
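The precedence these tests pin down, summarized as a sketch (constants as statically imported in this test class; behavior mirrors the two precedence tests above):

    Map<String, String> connectorConfig =
        SnowflakeSinkConnectorConfigBuilder.streamingConfig().withSingleBufferEnabled(true).build();
    connectorConfig.put(BUFFER_SIZE_BYTES, "10000000");
    connectorConfig.put(SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP, "MAX_CHANNEL_SIZE_IN_BYTES:1");

    StreamingClientProperties props = new StreamingClientProperties(connectorConfig);
    // Single buffer ON: the explicit connector buffer size wins over the map ->
    // props.parameterOverrides.get(MAX_CHANNEL_SIZE_IN_BYTES) is "10000000".
    // Single buffer OFF: the override-map value "1" is kept instead.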
=== File 8 of 9 ===
@@ -17,6 +17,7 @@
     "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
     "snowflake.streaming.enable.single.buffer": "$SNOWFLAKE_STREAMING_ENABLE_SINGLE_BUFFER",
     "snowflake.streaming.closeChannelsInParallel.enabled": true,
+    "snowflake.streaming.max.client.lag": "10",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
     "value.converter": "org.apache.kafka.connect.json.JsonConverter",
     "value.converter.schemas.enable": "false",
=== File 9 of 9 ===
@@ -62,7 +62,7 @@ def send(self):
             self.driver.sendBytesData(topic, value, key)
 
         # Sleep for some time and then verify the rows are ingested
-        sleep(120)
+        sleep(100)
         self.verify("0")
 
         # Recreate the table
