From 77437fa95241ea0e9a374553e3be78f54751a171 Mon Sep 17 00:00:00 2001 From: Wojciech Trefon Date: Fri, 6 Dec 2024 15:14:11 +0100 Subject: [PATCH] SNOW-1731255: Put null or empty json nodes to DLQ for schema evolution (#1017) --- .../connector/internal/SnowflakeErrors.java | 6 +- .../DirectTopicPartitionChannel.java | 53 ++++++----- .../iceberg/IcebergColumnTreeFactory.java | 2 +- .../iceberg/IcebergColumnTypeMapper.java | 28 ++++-- .../dlq/InMemoryKafkaRecordErrorReporter.java | 8 ++ .../iceberg/IcebergColumnTypeMapperTest.java | 44 ++++++--- .../iceberg/ParseIcebergColumnTreeTest.java | 3 +- .../streaming/iceberg/IcebergIngestionIT.java | 6 +- .../IcebergIngestionNoSchemaEvolutionIT.java | 36 ++++++- .../IcebergIngestionSchemaEvolutionIT.java | 94 ++++++++++++++++++- .../iceberg/sql/ComplexJsonRecord.java | 34 +++++-- test/test_suit/base_iceberg_test.py | 18 +++- .../iceberg_schema_evolution_json_aws.py | 24 +++-- 13 files changed, 286 insertions(+), 70 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java index 39f098289..fb7c378e0 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeErrors.java @@ -340,7 +340,11 @@ public enum SnowflakeErrors { "Could not allocate thread for file cleaner to start processing in given time. If problem" + " persists, please try setting snowflake.snowpipe.use_new_cleaner to false"), ERROR_5025( - "5025", "Unexpected data type", "Unexpected data type encountered during schema evolution."); + "5025", "Unexpected data type", "Unexpected data type encountered during schema evolution."), + ERROR_5026( + "5026", + "Invalid SinkRecord received", + "Cannot infer type from null or empty object/list during schema evolution."); // properties diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java index 635c2fb92..2f98a820c 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java @@ -14,10 +14,7 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; import com.snowflake.kafka.connector.Utils; import com.snowflake.kafka.connector.dlq.KafkaRecordErrorReporter; @@ -44,13 +41,15 @@ import java.io.ByteArrayOutputStream; import java.io.ObjectOutputStream; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import net.snowflake.ingest.streaming.*; import net.snowflake.ingest.utils.SFException; import org.apache.kafka.common.TopicPartition; @@ -536,17 +535,38 @@ private void handleInsertRowFailure( 
SchemaEvolutionTargetItems schemaEvolutionTargetItems = insertErrorMapper.mapToSchemaEvolutionItems(insertError, this.channel.getTableName()); if (schemaEvolutionTargetItems.hasDataForSchemaEvolution()) { - schemaEvolutionService.evolveSchemaIfNeeded( - schemaEvolutionTargetItems, kafkaSinkRecord, channel.getTableSchema()); - streamingApiFallbackSupplier( - StreamingApiFallbackInvoker.INSERT_ROWS_SCHEMA_EVOLUTION_FALLBACK); + try { + schemaEvolutionService.evolveSchemaIfNeeded( + schemaEvolutionTargetItems, kafkaSinkRecord, channel.getTableSchema()); + streamingApiFallbackSupplier( + StreamingApiFallbackInvoker.INSERT_ROWS_SCHEMA_EVOLUTION_FALLBACK); + } catch (SnowflakeKafkaConnectorException e) { + LOGGER.error( + "Error while performing schema evolution for channel:{}", + this.getChannelNameFormatV1(), + e); + if (Objects.equals(e.getCode(), SnowflakeErrors.ERROR_5026.getCode())) { + handleError(Collections.singletonList(e), kafkaSinkRecord); + } else { + throw e; + } + } + return; } } + handleError( + insertErrors.stream() + .map(InsertValidationResponse.InsertError::getException) + .collect(Collectors.toList()), + kafkaSinkRecord); + } + + private void handleError(List insertErrors, SinkRecord kafkaSinkRecord) { if (logErrors) { - for (InsertValidationResponse.InsertError insertError : insertErrors) { - LOGGER.error("Insert Row Error message:{}", insertError.getException().getMessage()); + for (Exception insertError : insertErrors) { + LOGGER.error("Insert Row Error message:{}", insertError.getMessage()); } } if (errorTolerance) { @@ -563,7 +583,6 @@ private void handleInsertRowFailure( this.kafkaRecordErrorReporter.reportError( kafkaSinkRecord, insertErrors.stream() - .map(InsertValidationResponse.InsertError::getException) .findFirst() .orElseThrow( () -> @@ -574,20 +593,12 @@ private void handleInsertRowFailure( final String errMsg = String.format( "Error inserting Records using Streaming API with msg:%s", - insertErrors.get(0).getException().getMessage()); + insertErrors.get(0).getMessage()); this.telemetryServiceV2.reportKafkaConnectFatalError(errMsg); - throw new DataException(errMsg, insertErrors.get(0).getException()); + throw new DataException(errMsg, insertErrors.get(0)); } } - private List join( - List nonNullableColumns, List nullValueForNotNullColNames) { - return Lists.newArrayList( - Iterables.concat( - Optional.ofNullable(nonNullableColumns).orElse(ImmutableList.of()), - Optional.ofNullable(nullValueForNotNullColNames).orElse(ImmutableList.of()))); - } - // TODO: SNOW-529755 POLL committed offsets in background thread @Override diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeFactory.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeFactory.java index 17e3d4cc8..5a42c5151 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeFactory.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeFactory.java @@ -73,7 +73,7 @@ private LinkedHashMap produceChildren(Type apacheIcebe // -- parse tree from kafka record payload logic -- private IcebergFieldNode createNode(String name, JsonNode jsonNode) { - String snowflakeType = mapper.mapToColumnTypeFromJson(jsonNode); + String snowflakeType = mapper.mapToColumnTypeFromJson(name, jsonNode); return new IcebergFieldNode(name, snowflakeType, produceChildren(jsonNode)); } diff --git 
a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java index 4af7602c4..8d3f26ceb 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java @@ -1,5 +1,6 @@ package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg; +import static com.snowflake.kafka.connector.internal.SnowflakeErrors.ERROR_5026; import static org.apache.kafka.connect.data.Schema.Type.ARRAY; import static org.apache.kafka.connect.data.Schema.Type.BOOLEAN; import static org.apache.kafka.connect.data.Schema.Type.BYTES; @@ -11,6 +12,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.snowflake.kafka.connector.internal.SnowflakeErrors; +import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.kafka.connect.data.Date; @@ -72,8 +74,8 @@ String mapToColumnTypeFromIcebergSchema(Type apacheIcebergType) { * *

Converts Types from: JsonNode -> KafkaKafka -> Snowflake. */ - String mapToColumnTypeFromJson(JsonNode value) { - Schema.Type kafkaType = mapJsonNodeTypeToKafkaType(value); + String mapToColumnTypeFromJson(String name, JsonNode value) { + Schema.Type kafkaType = mapJsonNodeTypeToKafkaType(name, value); return mapToColumnTypeFromKafkaSchema(kafkaType, null); } @@ -124,15 +126,19 @@ String mapToColumnTypeFromKafkaSchema(Schema.Type kafkaType, String schemaName) } /** - * Map the JSON node type to Kafka type + * Map the JSON node type to Kafka type. For null and empty values, we can't infer the type, so we + * throw an exception. * + * @param name column/field name * @param value JSON node + * @throws SnowflakeKafkaConnectorException if the value is null or empty array or empty object * @return Kafka type */ - Schema.Type mapJsonNodeTypeToKafkaType(JsonNode value) { - if (value == null || value.isNull()) { - return STRING; - } else if (value.isNumber()) { + Schema.Type mapJsonNodeTypeToKafkaType(String name, JsonNode value) { + if (cannotInferType(value)) { + throw ERROR_5026.getException("'" + name + "' field value is null or empty"); + } + if (value.isNumber()) { if (value.isFloat()) { return FLOAT32; } else if (value.isDouble()) { @@ -154,4 +160,12 @@ Schema.Type mapJsonNodeTypeToKafkaType(JsonNode value) { return null; } } + + boolean cannotInferType(JsonNode value) { + // cannot infer type if value null or empty array or empty object + return value == null + || value.isNull() + || (value.isArray() && value.isEmpty()) + || (value.isObject() && value.isEmpty()); + } } diff --git a/src/test/java/com/snowflake/kafka/connector/dlq/InMemoryKafkaRecordErrorReporter.java b/src/test/java/com/snowflake/kafka/connector/dlq/InMemoryKafkaRecordErrorReporter.java index 733894838..492e9cbbb 100644 --- a/src/test/java/com/snowflake/kafka/connector/dlq/InMemoryKafkaRecordErrorReporter.java +++ b/src/test/java/com/snowflake/kafka/connector/dlq/InMemoryKafkaRecordErrorReporter.java @@ -33,6 +33,14 @@ private ReportedRecord(final SinkRecord record, final Throwable e) { this.e = e; } + public SinkRecord getRecord() { + return record; + } + + public Throwable getException() { + return e; + } + @Override public String toString() { return "ReportedData{" + "record=" + record + ", e=" + e + '}'; diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapperTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapperTest.java index 4b2ad16d1..6f37fbad2 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapperTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapperTest.java @@ -1,15 +1,19 @@ package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg; +import static com.snowflake.kafka.connector.internal.SnowflakeErrors.ERROR_5026; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException; import java.util.stream.Stream; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; import 
org.apache.kafka.connect.data.Time; import org.apache.kafka.connect.data.Timestamp; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -29,7 +33,26 @@ void shouldMapKafkaTypeToSnowflakeColumnType( @ParameterizedTest() @MethodSource("jsonNodeTypesToMap") void shouldMapJsonNodeTypeToKafkaType(JsonNode value, Schema.Type expectedKafkaType) { - assertThat(mapper.mapJsonNodeTypeToKafkaType(value)).isEqualTo(expectedKafkaType); + assertThat(mapper.mapJsonNodeTypeToKafkaType("test", value)).isEqualTo(expectedKafkaType); + } + + @ParameterizedTest() + @MethodSource("jsonNodeValuesToThrowException") + void shouldThrowExceptionWhenMappingEmptyOrNullNode(JsonNode value) { + assertThatThrownBy(() -> mapper.mapJsonNodeTypeToKafkaType("test", value)) + .isInstanceOf(SnowflakeKafkaConnectorException.class) + .hasMessageContaining("'test' field value is null or empty") + .matches( + e -> ((SnowflakeKafkaConnectorException) e).getCode().equals(ERROR_5026.getCode())); + } + + @Test + void shouldThrowExceptionForNullValue() { + assertThatThrownBy(() -> mapper.mapJsonNodeTypeToKafkaType("test", null)) + .isInstanceOf(SnowflakeKafkaConnectorException.class) + .hasMessageContaining("'test' field value is null or empty") + .matches( + e -> ((SnowflakeKafkaConnectorException) e).getCode().equals(ERROR_5026.getCode())); } private static Stream kafkaTypesToMap() { @@ -52,16 +75,8 @@ private static Stream kafkaTypesToMap() { Arguments.of(Schema.Type.STRUCT, null, "OBJECT")); } - private static Stream kafkaTypesToThrowException() { - return Stream.of( - Arguments.of(Schema.Type.ARRAY), - Arguments.of(Schema.Type.MAP), - Arguments.of(Schema.Type.STRUCT)); - } - private static Stream jsonNodeTypesToMap() { return Stream.of( - Arguments.of(JsonNodeFactory.instance.nullNode(), Schema.Type.STRING), Arguments.of(JsonNodeFactory.instance.numberNode((short) 1), Schema.Type.INT64), Arguments.of(JsonNodeFactory.instance.numberNode(1), Schema.Type.INT64), Arguments.of(JsonNodeFactory.instance.numberNode(1L), Schema.Type.INT64), @@ -70,7 +85,14 @@ private static Stream jsonNodeTypesToMap() { Arguments.of(JsonNodeFactory.instance.booleanNode(true), Schema.Type.BOOLEAN), Arguments.of(JsonNodeFactory.instance.textNode("text"), Schema.Type.STRING), Arguments.of(JsonNodeFactory.instance.binaryNode(new byte[] {1, 2, 3}), Schema.Type.BYTES), - Arguments.of(JsonNodeFactory.instance.arrayNode(), Schema.Type.ARRAY), - Arguments.of(JsonNodeFactory.instance.objectNode(), Schema.Type.STRUCT)); + Arguments.of(JsonNodeFactory.instance.arrayNode().add(1), Schema.Type.ARRAY), + Arguments.of(JsonNodeFactory.instance.objectNode().put("test", 1), Schema.Type.STRUCT)); + } + + private static Stream jsonNodeValuesToThrowException() { + return Stream.of( + Arguments.of(JsonNodeFactory.instance.nullNode()), + Arguments.of(JsonNodeFactory.instance.arrayNode()), + Arguments.of(JsonNodeFactory.instance.objectNode())); } } diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java index 328210378..2bc33b111 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java +++ 
b/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java @@ -148,8 +148,7 @@ static Stream parseFromJsonArguments() { + " ] } ", "ARRAY(OBJECT(name VARCHAR, id LONG))"), // array - arguments("{\"testColumnName\": [1,2,3] }", "ARRAY(LONG)"), - arguments("{ \"testColumnName\": [] }", "ARRAY(VARCHAR(16777216))")); + arguments("{\"testColumnName\": [1,2,3] }", "ARRAY(LONG)")); } @ParameterizedTest diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java index 5b2587e2b..35a2ac47e 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java @@ -37,6 +37,7 @@ public abstract class IcebergIngestionIT extends BaseIcebergIT { protected String tableName; protected TopicPartition topicPartition; protected SnowflakeSinkService service; + protected InMemoryKafkaRecordErrorReporter kafkaRecordErrorReporter; protected static final String simpleRecordJson = "{\"simple\": \"extra field\"}"; @BeforeEach @@ -51,6 +52,8 @@ public void setUp() { config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "true"); // "snowflake.streaming.max.client.lag" = 1 second, for faster tests config.put(SNOWPIPE_STREAMING_MAX_CLIENT_LAG, "1"); + config.put(ERRORS_TOLERANCE_CONFIG, SnowflakeSinkConnectorConfig.ErrorTolerance.ALL.toString()); + config.put(ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "test_DLQ"); createIcebergTable(); enableSchemaEvolution(tableName); @@ -59,10 +62,11 @@ public void setUp() { Map topic2Table = new HashMap<>(); topic2Table.put(topic, tableName); + kafkaRecordErrorReporter = new InMemoryKafkaRecordErrorReporter(); service = SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config) .setRecordNumber(1) - .setErrorReporter(new InMemoryKafkaRecordErrorReporter()) + .setErrorReporter(kafkaRecordErrorReporter) .setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition))) .setTopic2TableMap(topic2Table) .addTask(tableName, topicPartition) diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java index 9600d4c56..994c27f35 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java @@ -1,9 +1,12 @@ package com.snowflake.kafka.connector.streaming.iceberg; +import static com.snowflake.kafka.connector.streaming.iceberg.sql.ComplexJsonRecord.complexJsonPayloadExample; +import static com.snowflake.kafka.connector.streaming.iceberg.sql.ComplexJsonRecord.complexJsonPayloadWithWrongValueTypeExample; import static com.snowflake.kafka.connector.streaming.iceberg.sql.ComplexJsonRecord.complexJsonRecordValueExample; import static org.assertj.core.api.Assertions.assertThat; import com.snowflake.kafka.connector.Utils; +import com.snowflake.kafka.connector.dlq.InMemoryKafkaRecordErrorReporter; import com.snowflake.kafka.connector.streaming.iceberg.sql.ComplexJsonRecord; import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord; import 
com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord.RecordWithMetadata; @@ -12,6 +15,8 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -89,10 +94,33 @@ void shouldInsertRecords(String description, String message, boolean withSchema) service.insert(Collections.singletonList(createKafkaRecord(message, 2, withSchema))); waitForOffset(3); - assertRecordsInTable(); + assertRecordsInTable(0L, 1L, 2L); } - private void assertRecordsInTable() { + @Test + void shouldSendValueWithWrongTypeToDLQ() throws Exception { + SinkRecord wrongValueRecord1 = + createKafkaRecord(complexJsonPayloadWithWrongValueTypeExample, 0, false); + SinkRecord wrongValueRecord2 = + createKafkaRecord(complexJsonPayloadWithWrongValueTypeExample, 2, false); + service.insert( + Arrays.asList( + wrongValueRecord1, + createKafkaRecord(complexJsonPayloadExample, 1, false), + wrongValueRecord2, + createKafkaRecord(complexJsonPayloadExample, 3, false), + createKafkaRecord(complexJsonPayloadExample, 4, false))); + waitForOffset(5); + + assertRecordsInTable(1L, 3L, 4L); + List reportedRecords = + kafkaRecordErrorReporter.getReportedRecords(); + assertThat(reportedRecords).hasSize(2); + assertThat(reportedRecords.stream().map(it -> it.getRecord()).collect(Collectors.toList())) + .containsExactlyInAnyOrder(wrongValueRecord1, wrongValueRecord2); + } + + private void assertRecordsInTable(Long... expectedOffsets) { List> recordsWithMetadata = selectAllComplexJsonRecordFromRecordContent(); assertThat(recordsWithMetadata) @@ -106,7 +134,9 @@ private void assertRecordsInTable() { recordsWithMetadata.stream() .map(RecordWithMetadata::getMetadata) .collect(Collectors.toList()); - assertThat(metadataRecords).extracting(MetadataRecord::getOffset).containsExactly(0L, 1L, 2L); + assertThat(metadataRecords) + .extracting(MetadataRecord::getOffset) + .containsExactly(expectedOffsets); assertThat(metadataRecords) .hasSize(3) .allMatch( diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java index 708dfeb2e..f759fb923 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java @@ -7,10 +7,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import com.snowflake.kafka.connector.Utils; +import com.snowflake.kafka.connector.dlq.InMemoryKafkaRecordErrorReporter; import com.snowflake.kafka.connector.internal.DescribeTableRow; import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord; import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord.RecordWithMetadata; import com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -256,17 +258,19 @@ public void testComplexRecordEvolution() throws Exception { new DescribeTableRow("APPROVAL", "BOOLEAN"), new DescribeTableRow("ARRAY1", "ARRAY(NUMBER(19,0))"), new DescribeTableRow("ARRAY2", "ARRAY(VARCHAR(16777216))"), - new 
DescribeTableRow("ARRAY3", "ARRAY(VARCHAR(16777216))"), - // "array4" : null -> VARCHAR(16777216 - new DescribeTableRow("ARRAY4", "VARCHAR(16777216)"), + new DescribeTableRow("ARRAY3", "ARRAY(BOOLEAN)"), + new DescribeTableRow("ARRAY4", "ARRAY(NUMBER(19,0))"), new DescribeTableRow("ARRAY5", "ARRAY(ARRAY(NUMBER(19,0)))"), new DescribeTableRow( "NESTEDRECORD", "OBJECT(id_int8 NUMBER(19,0), id_int16 NUMBER(19,0), rating_float32 FLOAT," + " rating_float64 FLOAT, approval BOOLEAN, id_int32 NUMBER(19,0), description" + " VARCHAR(16777216), id_int64 NUMBER(19,0))"), - // "nestedRecord2": null -> VARCHAR(16777216) - new DescribeTableRow("NESTEDRECORD2", "VARCHAR(16777216)"), + new DescribeTableRow( + "NESTEDRECORD2", + "OBJECT(id_int8 NUMBER(19,0), id_int16 NUMBER(19,0), rating_float32 FLOAT," + + " rating_float64 FLOAT, approval BOOLEAN, id_int32 NUMBER(19,0), description" + + " VARCHAR(16777216), id_int64 NUMBER(19,0))"), }; assertThat(columns).containsExactlyInAnyOrder(expectedSchema); } @@ -494,4 +498,84 @@ void shouldAppendCommentTest() throws Exception { "column created by schema evolution from Snowflake Kafka Connector", columns.get(2).getComment()); } + + @ParameterizedTest(name = "{0}") + @MethodSource("nullOrEmptyValueShouldBeSentToDLQOnlyWhenNoSchema_dataSource") + void nullOrEmptyValueShouldBeSentToDLQOnlyWhenNoSchema( + String description, String jsonWithNullOrEmpty, String jsonWithFullData) throws Exception { + // given + SinkRecord emptyOrNullRecord = createKafkaRecord(jsonWithNullOrEmpty, 0, false); + + // when + // sending null value or empty list/object record with no schema defined should be sent to DLQ, + // second record with full data should create schema so the same null/empty record sent again + // should be ingested without any issues + service.insert(Arrays.asList(emptyOrNullRecord, createKafkaRecord(jsonWithFullData, 1, false))); + // retry due to schema evolution + service.insert( + Arrays.asList( + createKafkaRecord(jsonWithFullData, 1, false), + createKafkaRecord(jsonWithNullOrEmpty, 2, false))); + + // then + waitForOffset(3); + List reportedRecords = + kafkaRecordErrorReporter.getReportedRecords(); + assertThat(reportedRecords).hasSize(1); + assertThat(reportedRecords.get(0).getRecord()).isEqualTo(emptyOrNullRecord); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("wrongTypeValueMessages_dataSource") + void shouldSendValueWithWrongTypeToDLQ( + String description, String correctValueJson, String wrongValueJson) throws Exception { + // when + // init schema with first correct value + insertWithRetry(correctValueJson, 0, false); + + // insert record with wrong value followed by + SinkRecord wrongValueRecord = createKafkaRecord(wrongValueJson, 1, false); + service.insert(Arrays.asList(wrongValueRecord, createKafkaRecord(correctValueJson, 2, false))); + + // then + waitForOffset(3); + List reportedRecords = + kafkaRecordErrorReporter.getReportedRecords(); + assertThat(reportedRecords).hasSize(1); + assertThat(reportedRecords.get(0).getRecord()).isEqualTo(wrongValueRecord); + } + + private static Stream nullOrEmptyValueShouldBeSentToDLQOnlyWhenNoSchema_dataSource() { + return Stream.of( + Arguments.of("Null int", "{\"test\" : null }", "{\"test\" : 1 }"), + // only test for int, no other primitive types to speed up the test + Arguments.of("Null list", "{\"test\" : null }", "{\"test\" : [1,2] }"), + Arguments.of("Null object", "{\"test\" : null }", "{\"test\" : {\"test2\": 1} }"), + Arguments.of("Empty list", "{\"test\" : [] }", "{\"test\" : [1,2] }"), + 
Arguments.of("Empty object", "{\"test\" : {} }", "{\"test\" : {\"test2\": 1} }"), + Arguments.of( + "Null in nested object", + "{\"test\" : {\"test2\": null} }", + "{\"test\" : {\"test2\": 1} }"), + Arguments.of( + "Empty list in nested object", + "{\"test\" : {\"test2\": []} }", + "{\"test\" : {\"test2\": [1]} }"), + Arguments.of( + "Empty object in nested object", + "{\"test\" : {\"test2\": {}} }", + "{\"test\" : {\"test2\": {\"test3\": 1}} }")); + } + + private static Stream wrongTypeValueMessages_dataSource() { + return Stream.of( + Arguments.of("Boolean into double column", "{\"test\" : 2.5 }", "{\"test\" : true }"), + Arguments.of("String into double column", "{\"test\" : 2.5 }", "{\"test\" : \"Solnik\" }"), + Arguments.of("Int into list", "{\"test\" : [1,2] }", "{\"test\" : 1 }"), + Arguments.of("Int into object", "{\"test\" : {\"test2\": 1} }", "{\"test\" : 1 }"), + Arguments.of( + "String into boolean in nested object", + "{\"test\" : {\"test2\": true} }", + "{\"test\" : {\"test2\": \"solnik\"} }")); + } } diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/ComplexJsonRecord.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/ComplexJsonRecord.java index 4a8637570..5dc96961d 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/ComplexJsonRecord.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/ComplexJsonRecord.java @@ -30,11 +30,11 @@ public class ComplexJsonRecord { true, ImmutableList.of(1, 2, 3), ImmutableList.of("a", "b", "c"), - ImmutableList.of(), - null, + ImmutableList.of(true), + ImmutableList.of(1, 4), ImmutableList.of(ImmutableList.of(7, 8, 9), ImmutableList.of(10, 11, 12)), PrimitiveJsonRecord.primitiveJsonRecordValueExample, - null); + PrimitiveJsonRecord.primitiveJsonRecordValueExample); public static final String complexJsonPayloadExample = "{" @@ -48,13 +48,35 @@ public class ComplexJsonRecord { + " \"approval\": true," + " \"array1\": [1, 2, 3]," + " \"array2\": [\"a\", \"b\", \"c\"]," - + " \"array3\": []," - + " \"array4\": null," + + " \"array3\": [true]," + + " \"array4\": [1, 4]," + " \"array5\": [[7, 8, 9], [10, 11, 12]]," + " \"nestedRecord\": " + PrimitiveJsonRecord.primitiveJsonExample + "," - + " \"nestedRecord2\": null" + + " \"nestedRecord2\": " + + PrimitiveJsonRecord.primitiveJsonExample + + "}"; + + public static final String complexJsonPayloadWithWrongValueTypeExample = + "{" + + " \"id_int8\": 8," + + " \"id_int16\": 16," + + " \"id_int32\": 32," + + " \"id_int64\": 64," + + " \"description\": \"dogs are the best\"," + + " \"rating_float32\": 0.5," + + " \"rating_float64\": 0.25," + + " \"approval\": true," + + " \"array1\": [1, 2, 3]," + + " \"array2\": [\"a\", \"b\", \"c\"]," + + " \"array3\": [true]," + + " \"array4\": [1, 4]," + + " \"array5\": [[7, 8, 9], [10, 11, 12]]," + + " \"nestedRecord\": " + + PrimitiveJsonRecord.primitiveJsonExample + + "," + + " \"nestedRecord2\": 25" + "}"; public static final String complexJsonWithSchemaExample = diff --git a/test/test_suit/base_iceberg_test.py b/test/test_suit/base_iceberg_test.py index b9ab7dafc..5131e5993 100644 --- a/test/test_suit/base_iceberg_test.py +++ b/test/test_suit/base_iceberg_test.py @@ -36,7 +36,7 @@ def __init__(self, driver, name_salt: str, config_file_name: str): "null_object": {"key": "value"}, "empty_array": [1, 2, 3], "some_object": { - "null_key": None, + "null_key": "solnik", "string_key": "string_key", "another_string_key": "another_string_key", "inner_object": { @@ -45,6 
+45,18 @@ def __init__(self, driver, name_salt: str, config_file_name: str): } } + self.test_message_for_schema_evolution_3 = { + "extra_null_long": None, + "null_long": None, + "null_array": None, + "null_object": None, + "empty_array": [], + "some_object": { + "null_key": None, + "string_key": "string_key" + } + } + self.test_message_from_docs_schema = """ { "type":"record", @@ -264,7 +276,7 @@ def _assert_number_of_records_in_table(self, expected_number_of_records: int): raise RetryableError() elif number_of_records != expected_number_of_records: raise NonRetryableError( - "Number of record in table is different from number of record sent" + f'Number of record in table is different from number of record sent. Expected {expected_number_of_records}, got {number_of_records}.' ) @@ -308,7 +320,7 @@ def _verify_iceberg_content_for_schema_evolution_2(self, content: dict): assert_equals([1, 2, 3], content['null_array']) assert_equals('value', content['null_object']['key']) assert_equals([1, 2, 3], content['empty_array']) - assert_equals(None, content['some_object']['null_key']) + assert_equals("solnik", content['some_object']['null_key']) assert_equals('string_key', content['some_object']['string_key']) assert_equals('another_string_key', content['some_object']['another_string_key']) assert_equals(456, content['some_object']['inner_object']['inner_object_key']) diff --git a/test/test_suit/iceberg_schema_evolution_json_aws.py b/test/test_suit/iceberg_schema_evolution_json_aws.py index 82f46e18e..749dcdc97 100644 --- a/test/test_suit/iceberg_schema_evolution_json_aws.py +++ b/test/test_suit/iceberg_schema_evolution_json_aws.py @@ -16,22 +16,28 @@ def setup(self): def send(self): self._send_json_values(self.test_message_from_docs, 100) - self._send_json_values(self.test_message_for_schema_evolution_1, 100) - # TODO SNOW-1731264 - # net.snowflake.ingest.utils.SFException: The given row cannot be converted to the internal format: Object of type java.util.LinkedHashMap cannot be ingested into Snowflake column NULL_OBJECT of type STRING, rowIndex:0. 
Allowed Java types: String, Number, boolean, char
-        # self._send_json_values(self.test_message_for_schema_evolution_2, 100)
+        # the first 10 messages should be discarded due to the lack of a schema for the null fields; after the schema
+        # evolution triggered by the following messages, the offset is reset and they are consumed and inserted again
+        self._send_json_values(self.test_message_for_schema_evolution_1, 10)
+        # this message should never be inserted due to the lack of a schema for one extra null field
+        self._send_json_values(self.test_message_for_schema_evolution_3, 10)
+        self._send_json_values(self.test_message_for_schema_evolution_2, 100)
+        # now, with the schema coming from test_message_for_schema_evolution_2, we should be able to insert null values
+        self._send_json_values(self.test_message_for_schema_evolution_1, 10)
+        # this message should never be inserted due to the lack of a schema for one extra null field
+        self._send_json_values(self.test_message_for_schema_evolution_3, 10)

     def verify(self, round):
-        self._assert_number_of_records_in_table(200)
+        self._assert_number_of_records_in_table(220)
         actual_record_from_docs_dict = self._select_schematized_record_with_offset(1)
         self._verify_iceberg_content_from_docs(actual_record_from_docs_dict)
-        actual_record_for_schema_evolution_1 = self._select_schematized_record_with_offset(100)
-        self._verify_iceberg_content_for_schema_evolution_1(actual_record_for_schema_evolution_1)
+        actual_record_for_schema_evolution_2 = self._select_schematized_record_with_offset(120)
+        self._verify_iceberg_content_for_schema_evolution_2(actual_record_for_schema_evolution_2)
-        # TODO SNOW-1731264
-        # actual_record_for_schema_evolution_2 = self._select_schematized_record_with_offset(200)
+        actual_record_for_schema_evolution_1 = self._select_schematized_record_with_offset(210)
+        self._verify_iceberg_content_for_schema_evolution_1(actual_record_for_schema_evolution_1)
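
The DLQ routing introduced above only takes effect when the connector runs with error tolerance enabled and a dead-letter-queue topic configured, which is what the integration tests in this patch set via ERRORS_TOLERANCE_CONFIG and ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG; without it, handleError still fails the task with a DataException. A minimal sketch of the equivalent user-facing connector properties (the topic name below is illustrative, not taken from this patch):

    # route records that cannot be ingested (e.g. null or empty JSON nodes hit during schema evolution) to a DLQ topic
    errors.tolerance=all
    errors.deadletterqueue.topic.name=kafka_connector_dlq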