From ff4f353d19048012f251396aa9246156873fadb3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Z=C4=85bek?= Date: Wed, 13 Nov 2024 13:54:26 +0100 Subject: [PATCH 1/7] SNOW-1728002 get rid of reflection when enabling iceberg streaming --- pom.xml | 2 +- .../streaming/DirectStreamingClientHandler.java | 14 -------------- .../streaming/StreamingClientProperties.java | 3 +++ 3 files changed, 4 insertions(+), 15 deletions(-) diff --git a/pom.xml b/pom.xml index bae78ccf0..0f18b78c3 100644 --- a/pom.xml +++ b/pom.xml @@ -338,7 +338,7 @@ net.snowflake snowflake-ingest-sdk - 2.3.0 + 3.0.0 net.snowflake diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectStreamingClientHandler.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectStreamingClientHandler.java index 03f5ce65c..c9c97f994 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectStreamingClientHandler.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectStreamingClientHandler.java @@ -23,7 +23,6 @@ import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory; import net.snowflake.ingest.utils.SFException; -import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.kafka.connect.errors.ConnectException; /** This class handles all calls to manage the streaming ingestion client */ @@ -49,8 +48,6 @@ public SnowflakeStreamingIngestClient createClient( .setProperties(streamingClientProperties.clientProperties) .setParameterOverrides(streamingClientProperties.parameterOverrides); - setIcebergEnabled(builder, streamingClientProperties.isIcebergEnabled); - SnowflakeStreamingIngestClient createdClient = builder.build(); LOGGER.info( @@ -65,17 +62,6 @@ public SnowflakeStreamingIngestClient createClient( } } - private static void setIcebergEnabled( - SnowflakeStreamingIngestClientFactory.Builder builder, boolean isIcebergEnabled) { - try { - // TODO reflection should be replaced by proper builder.setIceberg(true) call in SNOW-1728002 - FieldUtils.writeField(builder, "isIceberg", isIcebergEnabled, true); - } catch (IllegalAccessException e) { - throw new IllegalStateException( - "Couldn't set iceberg by accessing private field: " + "isIceberg", e); - } - } - /** * Closes the given client. Swallows any exceptions * diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java index acf6bfcce..de8459e96 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java @@ -21,6 +21,7 @@ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES; +import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_ICEBERG_STREAMING; import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES; import static net.snowflake.ingest.utils.ParameterProvider.MAX_CLIENT_LAG; import static net.snowflake.ingest.utils.ParameterProvider.MAX_MEMORY_LIMIT_IN_BYTES; @@ -90,6 +91,8 @@ public StreamingClientProperties(Map connectorConfig) { // Override only if the streaming client properties are explicitly set in config this.parameterOverrides = new HashMap<>(); + parameterOverrides.put(ENABLE_ICEBERG_STREAMING, isIcebergEnabled); + Optional snowpipeStreamingMaxClientLag = Optional.ofNullable(connectorConfig.get(SNOWPIPE_STREAMING_MAX_CLIENT_LAG)); snowpipeStreamingMaxClientLag.ifPresent( From d25acc4bc93d619cbafcee7d6ba740e7286381ab Mon Sep 17 00:00:00 2001 From: Michal Bobowski Date: Wed, 13 Nov 2024 14:09:05 +0100 Subject: [PATCH 2/7] SNOW-1728002 Use single buffer explicitly in iceberg tests --- .../kafka/connector/streaming/iceberg/IcebergIngestionIT.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java index 11dd31868..2bd6f666c 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java @@ -2,6 +2,7 @@ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ICEBERG_ENABLED; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER; import static com.snowflake.kafka.connector.internal.TestUtils.getConfForStreaming; import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; @@ -49,6 +50,7 @@ public void setUp() { SnowflakeSinkConnectorConfig.setDefaultValues(config); config.put(ICEBERG_ENABLED, "TRUE"); config.put(ENABLE_SCHEMATIZATION_CONFIG, isSchemaEvolutionEnabled().toString()); + config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "true"); createIcebergTable(); enableSchemaEvolution(tableName); From e5fc167bd616543e4e50a72d9f5a1358fd100992 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Z=C4=85bek?= Date: Wed, 13 Nov 2024 14:12:02 +0100 Subject: [PATCH 3/7] change the way parameter is put into overrideMap --- .../internal/streaming/StreamingClientProperties.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java index de8459e96..1672f0fc4 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java @@ -91,8 +91,9 @@ public StreamingClientProperties(Map connectorConfig) { // Override only if the streaming client properties are explicitly set in config this.parameterOverrides = new HashMap<>(); - parameterOverrides.put(ENABLE_ICEBERG_STREAMING, isIcebergEnabled); - + if (isIcebergEnabled) { + parameterOverrides.put(ENABLE_ICEBERG_STREAMING, "true"); + } Optional snowpipeStreamingMaxClientLag = Optional.ofNullable(connectorConfig.get(SNOWPIPE_STREAMING_MAX_CLIENT_LAG)); snowpipeStreamingMaxClientLag.ifPresent( From 068829a3c5fd39cebb0acb21bde537da48b519df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Z=C4=85bek?= Date: Wed, 13 Nov 2024 15:02:42 +0100 Subject: [PATCH 4/7] do not update ingest-sdk --- pom.xml | 2 +- .../internal/streaming/StreamingClientProperties.java | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index 0f18b78c3..bae78ccf0 100644 --- a/pom.xml +++ b/pom.xml @@ -338,7 +338,7 @@ net.snowflake snowflake-ingest-sdk - 3.0.0 + 2.3.0 net.snowflake diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java index 1672f0fc4..398548b57 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java @@ -21,7 +21,6 @@ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES; -import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_ICEBERG_STREAMING; import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES; import static net.snowflake.ingest.utils.ParameterProvider.MAX_CLIENT_LAG; import static net.snowflake.ingest.utils.ParameterProvider.MAX_MEMORY_LIMIT_IN_BYTES; @@ -92,7 +91,7 @@ public StreamingClientProperties(Map connectorConfig) { // Override only if the streaming client properties are explicitly set in config this.parameterOverrides = new HashMap<>(); if (isIcebergEnabled) { - parameterOverrides.put(ENABLE_ICEBERG_STREAMING, "true"); + parameterOverrides.put("ENABLE_ICEBERG_STREAMING", "true"); } Optional snowpipeStreamingMaxClientLag = Optional.ofNullable(connectorConfig.get(SNOWPIPE_STREAMING_MAX_CLIENT_LAG)); From 37795985525eaaa79de11fff669a632db599d5c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Z=C4=85bek?= Date: Thu, 14 Nov 2024 10:06:48 +0100 Subject: [PATCH 5/7] put lowercase param name --- .../connector/internal/streaming/StreamingClientProperties.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java index 398548b57..76c82ecc2 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java @@ -91,7 +91,7 @@ public StreamingClientProperties(Map connectorConfig) { // Override only if the streaming client properties are explicitly set in config this.parameterOverrides = new HashMap<>(); if (isIcebergEnabled) { - parameterOverrides.put("ENABLE_ICEBERG_STREAMING", "true"); + parameterOverrides.put("enable_iceberg_streaming", "true"); } Optional snowpipeStreamingMaxClientLag = Optional.ofNullable(connectorConfig.get(SNOWPIPE_STREAMING_MAX_CLIENT_LAG)); From b03c0f6d1fefc9ae3f39308da2e11fe71d45a0a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Z=C4=85bek?= Date: Thu, 14 Nov 2024 10:16:35 +0100 Subject: [PATCH 6/7] bump ingest-sdk to 3.0.0 --- pom.xml | 2 +- pom_confluent.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index bae78ccf0..0f18b78c3 100644 --- a/pom.xml +++ b/pom.xml @@ -338,7 +338,7 @@ net.snowflake snowflake-ingest-sdk - 2.3.0 + 3.0.0 net.snowflake diff --git a/pom_confluent.xml b/pom_confluent.xml index 472046910..a4e7da1a8 100644 --- a/pom_confluent.xml +++ b/pom_confluent.xml @@ -479,7 +479,7 @@ net.snowflake snowflake-ingest-sdk - 2.3.0 + 3.0.0 net.snowflake From dcc2f0f52835f31f6f2d0e67cea54bdd17996c1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Z=C4=85bek?= Date: Thu, 14 Nov 2024 11:12:07 +0100 Subject: [PATCH 7/7] use enum --- .../internal/streaming/StreamingClientProperties.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java index 76c82ecc2..1672f0fc4 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java @@ -21,6 +21,7 @@ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_CLIENT_PROVIDER_OVERRIDE_MAP; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_CLIENT_LAG; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_MAX_MEMORY_LIMIT_IN_BYTES; +import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_ICEBERG_STREAMING; import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES; import static net.snowflake.ingest.utils.ParameterProvider.MAX_CLIENT_LAG; import static net.snowflake.ingest.utils.ParameterProvider.MAX_MEMORY_LIMIT_IN_BYTES; @@ -91,7 +92,7 @@ public StreamingClientProperties(Map connectorConfig) { // Override only if the streaming client properties are explicitly set in config this.parameterOverrides = new HashMap<>(); if (isIcebergEnabled) { - parameterOverrides.put("enable_iceberg_streaming", "true"); + parameterOverrides.put(ENABLE_ICEBERG_STREAMING, "true"); } Optional snowpipeStreamingMaxClientLag = Optional.ofNullable(connectorConfig.get(SNOWPIPE_STREAMING_MAX_CLIENT_LAG));