diff --git a/pom.xml b/pom.xml
index 09ecf17ad..de99af1e4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -345,6 +345,8 @@
      <artifactId>snowflake-jdbc</artifactId>
+      <scope>system</scope>
+      <systemPath>/Users/bzabek/snowflake-kafka-connector/snowflake-ingest-sdk.jar</systemPath>
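+      <!-- temporary: resolves the ingest SDK from a locally built jar; the absolute path will not work outside this machine or in CI -->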
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectStreamingClientHandler.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectStreamingClientHandler.java
index 03f5ce65c..c9c97f994 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectStreamingClientHandler.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectStreamingClientHandler.java
@@ -23,7 +23,6 @@
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
import net.snowflake.ingest.utils.SFException;
-import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.kafka.connect.errors.ConnectException;
/** This class handles all calls to manage the streaming ingestion client */
@@ -49,8 +48,6 @@ public SnowflakeStreamingIngestClient createClient(
.setProperties(streamingClientProperties.clientProperties)
.setParameterOverrides(streamingClientProperties.parameterOverrides);
- setIcebergEnabled(builder, streamingClientProperties.isIcebergEnabled);
-
SnowflakeStreamingIngestClient createdClient = builder.build();
LOGGER.info(
@@ -65,17 +62,6 @@ public SnowflakeStreamingIngestClient createClient(
}
}
- private static void setIcebergEnabled(
- SnowflakeStreamingIngestClientFactory.Builder builder, boolean isIcebergEnabled) {
- try {
- // TODO reflection should be replaced by proper builder.setIceberg(true) call in SNOW-1728002
- FieldUtils.writeField(builder, "isIceberg", isIcebergEnabled, true);
- } catch (IllegalAccessException e) {
- throw new IllegalStateException(
- "Couldn't set iceberg by accessing private field: " + "isIceberg", e);
- }
- }
-
/**
* Closes the given client. Swallows any exceptions
*
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java
index acf6bfcce..f82b9add4 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProperties.java
@@ -90,6 +90,10 @@ public StreamingClientProperties(Map<String, String> connectorConfig) {
// Override only if the streaming client properties are explicitly set in config
this.parameterOverrides = new HashMap<>();
+ if (isIcebergEnabled) {
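+      // replaces the reflection-based isIceberg flag removed from DirectStreamingClientHandler;
+      // the ingest SDK is expected to pick this setting up from the parameter overrides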
+      // TODO: extract the "enable_iceberg_streaming" parameter name to a named constant
+ this.parameterOverrides.put("enable_iceberg_streaming", "true");
+ }
    Optional<String> snowpipeStreamingMaxClientLag =
Optional.ofNullable(connectorConfig.get(SNOWPIPE_STREAMING_MAX_CLIENT_LAG));
snowpipeStreamingMaxClientLag.ifPresent(
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java
index 206c21b39..d9cd099c4 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java
@@ -52,7 +52,10 @@ public String mapToColumnType(Schema.Type kafkaType, String schemaName) {
} else {
return "BINARY";
}
+ case STRUCT:
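+        // empty structured OBJECT; nested fields are presumably added later via schema evolution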
+ return "OBJECT()";
case ARRAY:
+        throw new IllegalArgumentException("Array type not supported!");
default:
// MAP and STRUCT will go here
throw new IllegalArgumentException("Arrays, struct and map not supported!");
diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java
index 08f0bf2f3..59aa2c11e 100644
--- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java
+++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java
@@ -11,12 +11,10 @@
import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord;
import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord.RecordWithMetadata;
import com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord;
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.connect.sink.SinkRecord;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@@ -35,7 +33,7 @@ protected void createIcebergTable() {
@ParameterizedTest(name = "{0}")
@MethodSource("prepareData")
- @Disabled
+ // @Disabled
void shouldEvolveSchemaAndInsertRecords(
String description, String message, DescribeTableRow[] expectedSchema, boolean withSchema)
throws Exception {
@@ -81,6 +79,64 @@ void shouldEvolveSchemaAndInsertRecords(
assertRecordsInTable();
}
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("prepareData")
+ // @Disabled
+ void shouldEvolveSchemaAndInsertRecords_withObjects(
+ String description, String message, DescribeTableRow[] expectedSchema, boolean withSchema)
+ throws Exception {
+ // start off with just one column
+    List<DescribeTableRow> rows = describeTable(tableName);
+ assertThat(rows)
+ .hasSize(1)
+ .extracting(DescribeTableRow::getColumn)
+ .contains(Utils.TABLE_COLUMN_METADATA);
+
+ SinkRecord record = createKafkaRecord(message, 0, withSchema);
+ service.insert(Collections.singletonList(record));
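+    // the first insert should only trigger schema evolution, so no offset is committed yet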
+ waitForOffset(-1);
+ rows = describeTable(tableName);
+    assertThat(rows).hasSize(9);
+
+ // don't check metadata column schema, we have different tests for that
+ rows =
+ rows.stream()
+ .filter(r -> !r.getColumn().equals(Utils.TABLE_COLUMN_METADATA))
+ .collect(Collectors.toList());
+
+ assertThat(rows).containsExactlyInAnyOrder(expectedSchema);
+
+ // resend and store same record without any issues now
+ service.insert(Collections.singletonList(record));
+ waitForOffset(1);
+
+ // and another record with same schema
+ service.insert(Collections.singletonList(createKafkaRecord(message, 1, withSchema)));
+ waitForOffset(2);
+
+ String testStruct = "{ \"testStruct\": {" + "\"k1\" : 1," + "\"k2\" : 2" + "} " + "}";
+
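+    // alternative nested-struct payloads, kept commented out for later experiments: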
+ // String testStruct =
+ // "{ \"testStruct\": {" +
+ // "\"k1\" : { \"nested_key1\" : 1}," +
+ // "\"k2\" : { \"nested_key2\" : 2}" +
+ // "} " +
+ // "}";
+
+ // String testStruct =
+ // "{ \"testStruct\": {" +
+ // "\"k1\" : { \"car\" : { \"brand\" : \"vw\" } }," +
+ // "\"k2\" : { \"car\" : { \"brand\" : \"toyota\" } }" +
+ // "} " +
+ // "}";
+ // reinsert record with extra field
+ service.insert(Collections.singletonList(createKafkaRecord(testStruct, 2, false)));
+ rows = describeTable(tableName);
+ // assertThat(rows).hasSize(15);
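+    // the first insert of the struct payload should only evolve the schema; retrying it lands the row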
+ service.insert(Collections.singletonList(createKafkaRecord(testStruct, 2, false)));
+ waitForOffset(3);
+ }
+
private void assertRecordsInTable() {
    List<RecordWithMetadata<PrimitiveJsonRecord>> recordsWithMetadata =
        selectAllSchematizedRecords();