Skip to content

Commit

Permalink
Add isIceberg to props
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-wtrefon committed Oct 15, 2024
1 parent 205f928 commit af64211
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public SnowflakeStreamingIngestClient createClient(
.setProperties(streamingClientProperties.clientProperties)
.setParameterOverrides(streamingClientProperties.parameterOverrides);

enableIceberg(builder);
setIcebergEnabled(builder, streamingClientProperties.isIcebergEnabled);

SnowflakeStreamingIngestClient createdClient = builder.build();

Expand All @@ -65,13 +65,14 @@ public SnowflakeStreamingIngestClient createClient(
}
}

private static void enableIceberg(SnowflakeStreamingIngestClientFactory.Builder builder) {
private static void setIcebergEnabled(
SnowflakeStreamingIngestClientFactory.Builder builder, boolean isIcebergEnabled) {
try {
// TODO reflection should be replaced by proper builder.setIceberg(true) call in SNOW-1728002
FieldUtils.writeField(builder, "isIceberg", true, true);
FieldUtils.writeField(builder, "isIceberg", isIcebergEnabled, true);
} catch (IllegalAccessException e) {
throw new IllegalStateException(
"Couldn't enable iceberg by accessing private field: " + "isIceberg", e);
"Couldn't set iceberg by accessing private field: " + "isIceberg", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public class StreamingClientProperties {
public final String clientName;
public final Map<String, Object> parameterOverrides;

public final boolean isIcebergEnabled;

/**
* Creates non-null properties, client name and parameter overrides for the {@link
* net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient} from the given connectorConfig
Expand All @@ -80,6 +82,8 @@ public StreamingClientProperties(Map<String, String> connectorConfig) {

this.clientProperties = StreamingUtils.convertConfigForStreamingClient(connectorConfig);

this.isIcebergEnabled = Utils.isIcebergEnabled(connectorConfig);

this.clientName =
STREAMING_CLIENT_PREFIX_NAME
+ connectorConfig.getOrDefault(Utils.NAME, DEFAULT_CLIENT_NAME);
Expand Down Expand Up @@ -225,7 +229,8 @@ public String getLoggableClientProperties() {
public boolean equals(Object other) {
return other.getClass().equals(StreamingClientProperties.class)
&& ((StreamingClientProperties) other).clientProperties.equals(this.clientProperties)
&& ((StreamingClientProperties) other).parameterOverrides.equals(this.parameterOverrides);
&& ((StreamingClientProperties) other).parameterOverrides.equals(this.parameterOverrides)
&& ((StreamingClientProperties) other).isIcebergEnabled == this.isIcebergEnabled;
}

/**
Expand All @@ -236,6 +241,6 @@ public boolean equals(Object other) {
*/
@Override
public int hashCode() {
return Objects.hash(this.clientProperties, this.parameterOverrides);
return Objects.hash(this.clientProperties, this.parameterOverrides, this.isIcebergEnabled);
}
}

0 comments on commit af64211

Please sign in to comment.