Skip to content

Commit

Permalink
Fix StreamingClientPropertiesTest
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-mbobowski committed Nov 21, 2024
1 parent 4cc734f commit f8d5273
Showing 1 changed file with 20 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@

import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.config.SnowflakeSinkConnectorConfigBuilder;
import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException;
import com.snowflake.kafka.connector.internal.TestUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
Expand Down Expand Up @@ -85,8 +87,7 @@ public void testGetValidProperties() {
@Test
void shouldNotPropagateStreamingClientProperties_SingleBufferDisabled() {
// GIVEN
Map<String, String> connectorConfig = TestUtils.getConfForStreaming();
connectorConfig.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false");
Map<String, String> connectorConfig = SnowflakeSinkConnectorConfigBuilder.streamingConfig().withSingleBufferEnabled(false).build();

connectorConfig.put(BUFFER_SIZE_BYTES, "10000000");
connectorConfig.put(SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES, "20000000");
Expand All @@ -102,8 +103,11 @@ void shouldNotPropagateStreamingClientProperties_SingleBufferDisabled() {
@CsvSource({"true", "false"})
void shouldPropagateStreamingClientPropertiesFromOverrideMap(String singleBufferEnabled) {
// GIVEN
Map<String, String> connectorConfig = TestUtils.getConfForStreaming();
Map<String, String> connectorConfig = SnowflakeSinkConnectorConfigBuilder.streamingConfig()
.withSingleBufferEnabled(Boolean.parseBoolean(singleBufferEnabled))
.build();

connectorConfig.remove(BUFFER_SIZE_BYTES);
connectorConfig.put(
SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP,
"MAX_CHANNEL_SIZE_IN_BYTES:1,MAX_MEMORY_LIMIT_IN_BYTES:2");
Expand All @@ -123,11 +127,12 @@ void shouldPropagateStreamingClientPropertiesFromOverrideMap(String singleBuffer
@Test
void shouldPropagateStreamingClientProperties_SingleBufferEnabled() {
// GIVEN
Map<String, String> connectorConfig = TestUtils.getConfForStreaming();
Map<String, String> connectorConfig = SnowflakeSinkConnectorConfigBuilder.streamingConfig()
.withSingleBufferEnabled(true)
.build();

connectorConfig.put(BUFFER_SIZE_BYTES, "10000000");
connectorConfig.put(SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES, "20000000");
connectorConfig.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "true");

Map<String, String> expectedParameterOverrides = new HashMap<>();
expectedParameterOverrides.put(MAX_CHANNEL_SIZE_IN_BYTES, "10000000");
Expand All @@ -143,11 +148,12 @@ void shouldPropagateStreamingClientProperties_SingleBufferEnabled() {
@Test
void explicitStreamingClientPropertiesTakePrecedenceOverOverrideMap_SingleBufferEnabled() {
// GIVEN
Map<String, String> connectorConfig = TestUtils.getConfForStreaming();
Map<String, String> connectorConfig = SnowflakeSinkConnectorConfigBuilder.streamingConfig()
.withSingleBufferEnabled(true)
.build();

connectorConfig.put(BUFFER_SIZE_BYTES, "10000000");
connectorConfig.put(SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES, "20000000");
connectorConfig.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "true");
connectorConfig.put(
SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP,
"MAX_CHANNEL_SIZE_IN_BYTES:1,MAX_MEMORY_LIMIT_IN_BYTES:2");
Expand All @@ -167,11 +173,12 @@ void explicitStreamingClientPropertiesTakePrecedenceOverOverrideMap_SingleBuffer
void
explicitStreamingClientPropertiesShouldNOTTakePrecedenceOverOverrideMap_SingleBufferDisabled() {
// GIVEN
Map<String, String> connectorConfig = TestUtils.getConfForStreaming();
Map<String, String> connectorConfig = SnowflakeSinkConnectorConfigBuilder.streamingConfig()
.withSingleBufferEnabled(false)
.build();

connectorConfig.put(BUFFER_SIZE_BYTES, "10000000");
connectorConfig.put(SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES, "20000000");
connectorConfig.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "false");
connectorConfig.put(
SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP,
"MAX_CHANNEL_SIZE_IN_BYTES:1,MAX_MEMORY_LIMIT_IN_BYTES:2");
Expand Down Expand Up @@ -252,7 +259,7 @@ public void testValidPropertiesWithOverriddenStreamingPropertiesMap_withMaxClien
public void
testValidPropertiesWithOverriddenStreamingPropertiesMap_withMaxClientLagOnlyInMapOfProperties() {

Map<String, String> connectorConfig = TestUtils.getConfForStreaming();
Map<String, String> connectorConfig = SnowflakeSinkConnectorConfigBuilder.streamingConfig().withSingleBufferEnabled(false).build();
connectorConfig.put(Utils.NAME, "testName");
connectorConfig.put(Utils.SF_URL, "testUrl");
connectorConfig.put(Utils.SF_ROLE, "testRole");
Expand All @@ -272,9 +279,9 @@ public void testValidPropertiesWithOverriddenStreamingPropertiesMap_withMaxClien
StreamingClientProperties resultProperties = new StreamingClientProperties(connectorConfig);

// verify
assert resultProperties.clientProperties.equals(expectedProps);
assert resultProperties.clientName.equals(expectedClientName);
assert resultProperties.parameterOverrides.equals(expectedParameterOverrides);
Assertions.assertEquals(expectedProps, resultProperties.clientProperties);
Assertions.assertEquals(expectedClientName, resultProperties.clientName);
Assertions.assertEquals(expectedParameterOverrides, resultProperties.parameterOverrides);
}

@Test
Expand Down

0 comments on commit f8d5273

Please sign in to comment.