SNOW-1731255: Put null or empty json nodes to DLQ for schema evolution (
sfc-gh-wtrefon authored Dec 6, 2024
1 parent 2f4be67 commit 77437fa
Showing 13 changed files with 286 additions and 70 deletions.
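Context for the change: records whose JSON value contains a null or empty node now raise ERROR_5026 during schema evolution, and the channel reports them to the dead-letter queue instead of failing the task. This only kicks in when error tolerance and a DLQ topic are configured; below is a minimal sketch of that configuration, mirroring the integration-test setup further down in this diff (the key names are the standard Kafka Connect error-handling configs; the topic name "test_DLQ" is simply the value the test uses and is illustrative only).

import java.util.HashMap;
import java.util.Map;

// Minimal sketch: the DLQ-related settings the new behavior relies on.
public class DlqConfigSketch {
  public static Map<String, String> withDlq(Map<String, String> baseConfig) {
    Map<String, String> config = new HashMap<>(baseConfig);
    // Tolerate bad records instead of failing the connector task...
    config.put("errors.tolerance", "all");
    // ...and report them to this dead-letter-queue topic.
    config.put("errors.deadletterqueue.topic.name", "test_DLQ");
    return config;
  }
}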
@@ -340,7 +340,11 @@ public enum SnowflakeErrors {
"Could not allocate thread for file cleaner to start processing in given time. If problem"
+ " persists, please try setting snowflake.snowpipe.use_new_cleaner to false"),
ERROR_5025(
"5025", "Unexpected data type", "Unexpected data type encountered during schema evolution.");
"5025", "Unexpected data type", "Unexpected data type encountered during schema evolution."),
ERROR_5026(
"5026",
"Invalid SinkRecord received",
"Cannot infer type from null or empty object/list during schema evolution.");

// properties

@@ -14,10 +14,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.dlq.KafkaRecordErrorReporter;
@@ -44,13 +41,15 @@
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import net.snowflake.ingest.streaming.*;
import net.snowflake.ingest.utils.SFException;
import org.apache.kafka.common.TopicPartition;
@@ -536,17 +535,38 @@ private void handleInsertRowFailure(
SchemaEvolutionTargetItems schemaEvolutionTargetItems =
insertErrorMapper.mapToSchemaEvolutionItems(insertError, this.channel.getTableName());
if (schemaEvolutionTargetItems.hasDataForSchemaEvolution()) {
schemaEvolutionService.evolveSchemaIfNeeded(
schemaEvolutionTargetItems, kafkaSinkRecord, channel.getTableSchema());
streamingApiFallbackSupplier(
StreamingApiFallbackInvoker.INSERT_ROWS_SCHEMA_EVOLUTION_FALLBACK);
try {
schemaEvolutionService.evolveSchemaIfNeeded(
schemaEvolutionTargetItems, kafkaSinkRecord, channel.getTableSchema());
streamingApiFallbackSupplier(
StreamingApiFallbackInvoker.INSERT_ROWS_SCHEMA_EVOLUTION_FALLBACK);
} catch (SnowflakeKafkaConnectorException e) {
LOGGER.error(
"Error while performing schema evolution for channel:{}",
this.getChannelNameFormatV1(),
e);
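// ERROR_5026 means the record's value contained a null or empty node whose type
// could not be inferred during schema evolution; route that record to the DLQ
// via handleError instead of failing the whole task.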
if (Objects.equals(e.getCode(), SnowflakeErrors.ERROR_5026.getCode())) {
handleError(Collections.singletonList(e), kafkaSinkRecord);
} else {
throw e;
}
}

return;
}
}

handleError(
insertErrors.stream()
.map(InsertValidationResponse.InsertError::getException)
.collect(Collectors.toList()),
kafkaSinkRecord);
}

private void handleError(List<Exception> insertErrors, SinkRecord kafkaSinkRecord) {
if (logErrors) {
for (InsertValidationResponse.InsertError insertError : insertErrors) {
LOGGER.error("Insert Row Error message:{}", insertError.getException().getMessage());
for (Exception insertError : insertErrors) {
LOGGER.error("Insert Row Error message:{}", insertError.getMessage());
}
}
if (errorTolerance) {
@@ -563,7 +583,6 @@ private void handleInsertRowFailure(
this.kafkaRecordErrorReporter.reportError(
kafkaSinkRecord,
insertErrors.stream()
.map(InsertValidationResponse.InsertError::getException)
.findFirst()
.orElseThrow(
() ->
@@ -574,20 +593,12 @@ private void handleInsertRowFailure(
final String errMsg =
String.format(
"Error inserting Records using Streaming API with msg:%s",
insertErrors.get(0).getException().getMessage());
insertErrors.get(0).getMessage());
this.telemetryServiceV2.reportKafkaConnectFatalError(errMsg);
throw new DataException(errMsg, insertErrors.get(0).getException());
throw new DataException(errMsg, insertErrors.get(0));
}
}

private List<String> join(
List<String> nonNullableColumns, List<String> nullValueForNotNullColNames) {
return Lists.newArrayList(
Iterables.concat(
Optional.ofNullable(nonNullableColumns).orElse(ImmutableList.of()),
Optional.ofNullable(nullValueForNotNullColNames).orElse(ImmutableList.of())));
}

// TODO: SNOW-529755 POLL committed offsets in background thread

@Override
@@ -73,7 +73,7 @@ private LinkedHashMap<String, IcebergFieldNode> produceChildren(Type apacheIceberg

// -- parse tree from kafka record payload logic --
private IcebergFieldNode createNode(String name, JsonNode jsonNode) {
String snowflakeType = mapper.mapToColumnTypeFromJson(jsonNode);
String snowflakeType = mapper.mapToColumnTypeFromJson(name, jsonNode);
return new IcebergFieldNode(name, snowflakeType, produceChildren(jsonNode));
}

@@ -1,5 +1,6 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;

import static com.snowflake.kafka.connector.internal.SnowflakeErrors.ERROR_5026;
import static org.apache.kafka.connect.data.Schema.Type.ARRAY;
import static org.apache.kafka.connect.data.Schema.Type.BOOLEAN;
import static org.apache.kafka.connect.data.Schema.Type.BYTES;
@@ -11,6 +12,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.snowflake.kafka.connector.internal.SnowflakeErrors;
import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.kafka.connect.data.Date;
@@ -72,8 +74,8 @@ String mapToColumnTypeFromIcebergSchema(Type apacheIcebergType) {
*
* <p>Converts Types from: JsonNode -> Kafka -> Snowflake.
*/
String mapToColumnTypeFromJson(JsonNode value) {
Schema.Type kafkaType = mapJsonNodeTypeToKafkaType(value);
String mapToColumnTypeFromJson(String name, JsonNode value) {
Schema.Type kafkaType = mapJsonNodeTypeToKafkaType(name, value);
return mapToColumnTypeFromKafkaSchema(kafkaType, null);
}

@@ -124,15 +126,19 @@ String mapToColumnTypeFromKafkaSchema(Schema.Type kafkaType, String schemaName)
}

/**
* Map the JSON node type to Kafka type
* Map the JSON node type to Kafka type. For null and empty values, we can't infer the type, so we
* throw an exception.
*
* @param name column/field name
* @param value JSON node
* @throws SnowflakeKafkaConnectorException if the value is null, an empty array, or an empty object
* @return Kafka type
*/
Schema.Type mapJsonNodeTypeToKafkaType(JsonNode value) {
if (value == null || value.isNull()) {
return STRING;
} else if (value.isNumber()) {
Schema.Type mapJsonNodeTypeToKafkaType(String name, JsonNode value) {
if (cannotInferType(value)) {
throw ERROR_5026.getException("'" + name + "' field value is null or empty");
}
if (value.isNumber()) {
if (value.isFloat()) {
return FLOAT32;
} else if (value.isDouble()) {
Expand All @@ -154,4 +160,12 @@ Schema.Type mapJsonNodeTypeToKafkaType(JsonNode value) {
return null;
}
}

boolean cannotInferType(JsonNode value) {
// cannot infer the type if the value is null, an empty array, or an empty object
return value == null
|| value.isNull()
|| (value.isArray() && value.isEmpty())
|| (value.isObject() && value.isEmpty());
}
}
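A short usage sketch of the new mapping behavior, mirroring the unit tests below. The class name IcebergColumnTypeMapper and its no-arg constructor are assumptions based on how the tests use the mapper; the sketch is assumed to live in the same package, since the method is package-private.

import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException;
import org.apache.kafka.connect.data.Schema;

// Sketch only: inferable vs. non-inferable JSON values after this change.
class ColumnTypeMapperSketch {
  void demo() {
    IcebergColumnTypeMapper mapper = new IcebergColumnTypeMapper();

    // A concrete value still maps to a Kafka type.
    Schema.Type longType =
        mapper.mapJsonNodeTypeToKafkaType("id", JsonNodeFactory.instance.numberNode(1L)); // INT64

    // Null nodes, empty arrays, and empty objects can no longer be inferred:
    // the mapper throws ERROR_5026, which the channel later turns into a DLQ report.
    try {
      mapper.mapJsonNodeTypeToKafkaType("tags", JsonNodeFactory.instance.arrayNode());
    } catch (SnowflakeKafkaConnectorException e) {
      // e.getCode() equals "5026" here.
    }
  }
}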
@@ -33,6 +33,14 @@ private ReportedRecord(final SinkRecord record, final Throwable e) {
this.e = e;
}

public SinkRecord getRecord() {
return record;
}

public Throwable getException() {
return e;
}

@Override
public String toString() {
return "ReportedData{" + "record=" + record + ", e=" + e + '}';
@@ -1,15 +1,19 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;

import static com.snowflake.kafka.connector.internal.SnowflakeErrors.ERROR_5026;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException;
import java.util.stream.Stream;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@@ -29,7 +33,26 @@ void shouldMapKafkaTypeToSnowflakeColumnType(
@ParameterizedTest()
@MethodSource("jsonNodeTypesToMap")
void shouldMapJsonNodeTypeToKafkaType(JsonNode value, Schema.Type expectedKafkaType) {
assertThat(mapper.mapJsonNodeTypeToKafkaType(value)).isEqualTo(expectedKafkaType);
assertThat(mapper.mapJsonNodeTypeToKafkaType("test", value)).isEqualTo(expectedKafkaType);
}

@ParameterizedTest()
@MethodSource("jsonNodeValuesToThrowException")
void shouldThrowExceptionWhenMappingEmptyOrNullNode(JsonNode value) {
assertThatThrownBy(() -> mapper.mapJsonNodeTypeToKafkaType("test", value))
.isInstanceOf(SnowflakeKafkaConnectorException.class)
.hasMessageContaining("'test' field value is null or empty")
.matches(
e -> ((SnowflakeKafkaConnectorException) e).getCode().equals(ERROR_5026.getCode()));
}

@Test
void shouldThrowExceptionForNullValue() {
assertThatThrownBy(() -> mapper.mapJsonNodeTypeToKafkaType("test", null))
.isInstanceOf(SnowflakeKafkaConnectorException.class)
.hasMessageContaining("'test' field value is null or empty")
.matches(
e -> ((SnowflakeKafkaConnectorException) e).getCode().equals(ERROR_5026.getCode()));
}

private static Stream<Arguments> kafkaTypesToMap() {
Expand All @@ -52,16 +75,8 @@ private static Stream<Arguments> kafkaTypesToMap() {
Arguments.of(Schema.Type.STRUCT, null, "OBJECT"));
}

private static Stream<Arguments> kafkaTypesToThrowException() {
return Stream.of(
Arguments.of(Schema.Type.ARRAY),
Arguments.of(Schema.Type.MAP),
Arguments.of(Schema.Type.STRUCT));
}

private static Stream<Arguments> jsonNodeTypesToMap() {
return Stream.of(
Arguments.of(JsonNodeFactory.instance.nullNode(), Schema.Type.STRING),
Arguments.of(JsonNodeFactory.instance.numberNode((short) 1), Schema.Type.INT64),
Arguments.of(JsonNodeFactory.instance.numberNode(1), Schema.Type.INT64),
Arguments.of(JsonNodeFactory.instance.numberNode(1L), Schema.Type.INT64),
@@ -70,7 +85,14 @@ private static Stream<Arguments> jsonNodeTypesToMap() {
Arguments.of(JsonNodeFactory.instance.booleanNode(true), Schema.Type.BOOLEAN),
Arguments.of(JsonNodeFactory.instance.textNode("text"), Schema.Type.STRING),
Arguments.of(JsonNodeFactory.instance.binaryNode(new byte[] {1, 2, 3}), Schema.Type.BYTES),
Arguments.of(JsonNodeFactory.instance.arrayNode(), Schema.Type.ARRAY),
Arguments.of(JsonNodeFactory.instance.objectNode(), Schema.Type.STRUCT));
Arguments.of(JsonNodeFactory.instance.arrayNode().add(1), Schema.Type.ARRAY),
Arguments.of(JsonNodeFactory.instance.objectNode().put("test", 1), Schema.Type.STRUCT));
}

private static Stream<Arguments> jsonNodeValuesToThrowException() {
return Stream.of(
Arguments.of(JsonNodeFactory.instance.nullNode()),
Arguments.of(JsonNodeFactory.instance.arrayNode()),
Arguments.of(JsonNodeFactory.instance.objectNode()));
}
}
@@ -148,8 +148,7 @@ static Stream<Arguments> parseFromJsonArguments() {
+ " ] } ",
"ARRAY(OBJECT(name VARCHAR, id LONG))"),
// array
arguments("{\"testColumnName\": [1,2,3] }", "ARRAY(LONG)"),
arguments("{ \"testColumnName\": [] }", "ARRAY(VARCHAR(16777216))"));
arguments("{\"testColumnName\": [1,2,3] }", "ARRAY(LONG)"));
}

@ParameterizedTest
@@ -37,6 +37,7 @@ public abstract class IcebergIngestionIT extends BaseIcebergIT {
protected String tableName;
protected TopicPartition topicPartition;
protected SnowflakeSinkService service;
protected InMemoryKafkaRecordErrorReporter kafkaRecordErrorReporter;
protected static final String simpleRecordJson = "{\"simple\": \"extra field\"}";

@BeforeEach
Expand All @@ -51,6 +52,8 @@ public void setUp() {
config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "true");
// "snowflake.streaming.max.client.lag" = 1 second, for faster tests
config.put(SNOWPIPE_STREAMING_MAX_CLIENT_LAG, "1");
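// Tolerate per-record failures and route them (e.g. records hitting ERROR_5026
// during schema evolution) to the "test_DLQ" dead-letter topic instead of
// failing the connector task.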
config.put(ERRORS_TOLERANCE_CONFIG, SnowflakeSinkConnectorConfig.ErrorTolerance.ALL.toString());
config.put(ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "test_DLQ");

createIcebergTable();
enableSchemaEvolution(tableName);
@@ -59,10 +62,11 @@ public void setUp() {
Map<String, String> topic2Table = new HashMap<>();
topic2Table.put(topic, tableName);

kafkaRecordErrorReporter = new InMemoryKafkaRecordErrorReporter();
service =
SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
.setRecordNumber(1)
.setErrorReporter(new InMemoryKafkaRecordErrorReporter())
.setErrorReporter(kafkaRecordErrorReporter)
.setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
.setTopic2TableMap(topic2Table)
.addTask(tableName, topicPartition)
@@ -1,9 +1,12 @@
package com.snowflake.kafka.connector.streaming.iceberg;

import static com.snowflake.kafka.connector.streaming.iceberg.sql.ComplexJsonRecord.complexJsonPayloadExample;
import static com.snowflake.kafka.connector.streaming.iceberg.sql.ComplexJsonRecord.complexJsonPayloadWithWrongValueTypeExample;
import static com.snowflake.kafka.connector.streaming.iceberg.sql.ComplexJsonRecord.complexJsonRecordValueExample;
import static org.assertj.core.api.Assertions.assertThat;

import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.dlq.InMemoryKafkaRecordErrorReporter;
import com.snowflake.kafka.connector.streaming.iceberg.sql.ComplexJsonRecord;
import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord;
import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord.RecordWithMetadata;
Expand All @@ -12,6 +15,8 @@
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@@ -89,10 +94,33 @@ void shouldInsertRecords(String description, String message, boolean withSchema)
service.insert(Collections.singletonList(createKafkaRecord(message, 2, withSchema)));
waitForOffset(3);

assertRecordsInTable();
assertRecordsInTable(0L, 1L, 2L);
}

private void assertRecordsInTable() {
@Test
void shouldSendValueWithWrongTypeToDLQ() throws Exception {
SinkRecord wrongValueRecord1 =
createKafkaRecord(complexJsonPayloadWithWrongValueTypeExample, 0, false);
SinkRecord wrongValueRecord2 =
createKafkaRecord(complexJsonPayloadWithWrongValueTypeExample, 2, false);
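// Offsets 0 and 2 carry the malformed payload; with errors.tolerance=all they are
// expected to be reported to the DLQ, while offsets 1, 3, and 4 land in the table.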
service.insert(
Arrays.asList(
wrongValueRecord1,
createKafkaRecord(complexJsonPayloadExample, 1, false),
wrongValueRecord2,
createKafkaRecord(complexJsonPayloadExample, 3, false),
createKafkaRecord(complexJsonPayloadExample, 4, false)));
waitForOffset(5);

assertRecordsInTable(1L, 3L, 4L);
List<InMemoryKafkaRecordErrorReporter.ReportedRecord> reportedRecords =
kafkaRecordErrorReporter.getReportedRecords();
assertThat(reportedRecords).hasSize(2);
assertThat(reportedRecords.stream().map(it -> it.getRecord()).collect(Collectors.toList()))
.containsExactlyInAnyOrder(wrongValueRecord1, wrongValueRecord2);
}

private void assertRecordsInTable(Long... expectedOffsets) {
List<RecordWithMetadata<ComplexJsonRecord>> recordsWithMetadata =
selectAllComplexJsonRecordFromRecordContent();
assertThat(recordsWithMetadata)
@@ -106,7 +134,9 @@ private void assertRecordsInTable() {
recordsWithMetadata.stream()
.map(RecordWithMetadata::getMetadata)
.collect(Collectors.toList());
assertThat(metadataRecords).extracting(MetadataRecord::getOffset).containsExactly(0L, 1L, 2L);
assertThat(metadataRecords)
.extracting(MetadataRecord::getOffset)
.containsExactly(expectedOffsets);
assertThat(metadataRecords)
.hasSize(3)
.allMatch(