diff --git a/pom.xml b/pom.xml index 09ecf17ad..de99af1e4 100644 --- a/pom.xml +++ b/pom.xml @@ -345,6 +345,8 @@ snowflake-jdbc + system + /Users/bzabek/snowflake-kafka-connector/snowflake-ingest-sdk.jar diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java index 28a56fe93..176231997 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java @@ -534,7 +534,7 @@ private void insertRowFallbackSupplier(Throwable ex) */ private void handleInsertRowFailure( List insertErrors, SinkRecord kafkaSinkRecord) { - if (enableSchemaEvolution) { + if (enableSchemaEvolution) { //zapnij sie tuej InsertValidationResponse.InsertError insertError = insertErrors.get(0); SchemaEvolutionTargetItems schemaEvolutionTargetItems = insertErrorMapper.mapToSchemaEvolutionItems(insertError, this.channel.getTableName()); diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java index 08f0bf2f3..d7bd2a438 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java @@ -16,7 +16,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.kafka.connect.sink.SinkRecord; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -35,7 +34,7 @@ protected void createIcebergTable() { @ParameterizedTest(name = "{0}") @MethodSource("prepareData") - @Disabled + // @Disabled void shouldEvolveSchemaAndInsertRecords( String description, String message, DescribeTableRow[] expectedSchema, boolean withSchema) throws Exception { @@ -81,6 +80,59 @@ void shouldEvolveSchemaAndInsertRecords( assertRecordsInTable(); } + @ParameterizedTest(name = "{0}") + @MethodSource("prepareData") + // @Disabled + void shouldEvolveSchemaAndInsertRecords_withObjects( + String description, String message, DescribeTableRow[] expectedSchema, boolean withSchema) + throws Exception { + // start off with just one column + List rows = describeTable(tableName); + assertThat(rows) + .hasSize(1) + .extracting(DescribeTableRow::getColumn) + .contains(Utils.TABLE_COLUMN_METADATA); + + SinkRecord record = createKafkaRecord(message, 0, withSchema); + service.insert(Collections.singletonList(record)); + waitForOffset(-1); + rows = describeTable(tableName); + assertThat(rows.size()).isEqualTo(9); + + // don't check metadata column schema, we have different tests for that + rows = + rows.stream() + .filter(r -> !r.getColumn().equals(Utils.TABLE_COLUMN_METADATA)) + .collect(Collectors.toList()); + + assertThat(rows).containsExactlyInAnyOrder(expectedSchema); + + // resend and store same record without any issues now + service.insert(Collections.singletonList(record)); + waitForOffset(1); + + // and another record with same schema + service.insert(Collections.singletonList(createKafkaRecord(message, 1, withSchema))); + waitForOffset(2); + + String mapRecordJson = + "{" + + " \"type\": \"map\", " + + " \"key-id\": 4, " + + " \"key\": \"string\", " + + " \"value-id\": 5, " + + " \"value-required\": false, " + + " \"value\": \"double\" " + + "}"; + + // reinsert record with extra field + service.insert(Collections.singletonList(createKafkaRecord(mapRecordJson, 2, false))); + rows = describeTable(tableName); + assertThat(rows).hasSize(15); + service.insert(Collections.singletonList(createKafkaRecord(mapRecordJson, 2, false))); + waitForOffset(3); // FAIL + } + private void assertRecordsInTable() { List> recordsWithMetadata = selectAllSchematizedRecords();