diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectStreamingClientHandler.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectStreamingClientHandler.java index f86574c4b..03f5ce65c 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectStreamingClientHandler.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectStreamingClientHandler.java @@ -49,7 +49,7 @@ public SnowflakeStreamingIngestClient createClient( .setProperties(streamingClientProperties.clientProperties) .setParameterOverrides(streamingClientProperties.parameterOverrides); - enableIceberg(builder); + setIcebergEnabled(builder, streamingClientProperties.isIcebergEnabled); SnowflakeStreamingIngestClient createdClient = builder.build(); @@ -65,13 +65,14 @@ public SnowflakeStreamingIngestClient createClient( } } - private static void enableIceberg(SnowflakeStreamingIngestClientFactory.Builder builder) { + private static void setIcebergEnabled( + SnowflakeStreamingIngestClientFactory.Builder builder, boolean isIcebergEnabled) { try { // TODO reflection should be replaced by proper builder.setIceberg(true) call in SNOW-1728002 - FieldUtils.writeField(builder, "isIceberg", true, true); + FieldUtils.writeField(builder, "isIceberg", isIcebergEnabled, true); } catch (IllegalAccessException e) { throw new IllegalStateException( - "Couldn't enable iceberg by accessing private field: " + "isIceberg", e); + "Couldn't set iceberg by accessing private field: " + "isIceberg", e); } } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java index e5f8ca388..acf6bfcce 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java @@ -62,6 +62,8 @@ public class StreamingClientProperties { public final String clientName; public final Map parameterOverrides; + public final boolean isIcebergEnabled; + /** * Creates non-null properties, client name and parameter overrides for the {@link * net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient} from the given connectorConfig @@ -80,6 +82,8 @@ public StreamingClientProperties(Map connectorConfig) { this.clientProperties = StreamingUtils.convertConfigForStreamingClient(connectorConfig); + this.isIcebergEnabled = Utils.isIcebergEnabled(connectorConfig); + this.clientName = STREAMING_CLIENT_PREFIX_NAME + connectorConfig.getOrDefault(Utils.NAME, DEFAULT_CLIENT_NAME); @@ -225,7 +229,8 @@ public String getLoggableClientProperties() { public boolean equals(Object other) { return other.getClass().equals(StreamingClientProperties.class) && ((StreamingClientProperties) other).clientProperties.equals(this.clientProperties) - && ((StreamingClientProperties) other).parameterOverrides.equals(this.parameterOverrides); + && ((StreamingClientProperties) other).parameterOverrides.equals(this.parameterOverrides) + && ((StreamingClientProperties) other).isIcebergEnabled == this.isIcebergEnabled; } /** @@ -236,6 +241,6 @@ public boolean equals(Object other) { */ @Override public int hashCode() { - return Objects.hash(this.clientProperties, this.parameterOverrides); + return Objects.hash(this.clientProperties, this.parameterOverrides, this.isIcebergEnabled); } }