Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1731255: Put null or empty json nodes to DLQ for schema evolution #1017

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,11 @@ public enum SnowflakeErrors {
"Could not allocate thread for file cleaner to start processing in given time. If problem"
+ " persists, please try setting snowflake.snowpipe.use_new_cleaner to false"),
ERROR_5025(
"5025", "Unexpected data type", "Unexpected data type encountered during schema evolution.");
"5025", "Unexpected data type", "Unexpected data type encountered during schema evolution."),
ERROR_5026(
sfc-gh-mbobowski marked this conversation as resolved.
Show resolved Hide resolved
"5026",
"Invalid SinkRecord received",
"Cannot infer type from null or empty object/list during schema evolution.");

// properties

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.dlq.KafkaRecordErrorReporter;
Expand All @@ -44,13 +41,15 @@
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import net.snowflake.ingest.streaming.*;
import net.snowflake.ingest.utils.SFException;
import org.apache.kafka.common.TopicPartition;
Expand Down Expand Up @@ -536,17 +535,38 @@ private void handleInsertRowFailure(
SchemaEvolutionTargetItems schemaEvolutionTargetItems =
insertErrorMapper.mapToSchemaEvolutionItems(insertError, this.channel.getTableName());
if (schemaEvolutionTargetItems.hasDataForSchemaEvolution()) {
schemaEvolutionService.evolveSchemaIfNeeded(
schemaEvolutionTargetItems, kafkaSinkRecord, channel.getTableSchema());
streamingApiFallbackSupplier(
StreamingApiFallbackInvoker.INSERT_ROWS_SCHEMA_EVOLUTION_FALLBACK);
try {
schemaEvolutionService.evolveSchemaIfNeeded(
schemaEvolutionTargetItems, kafkaSinkRecord, channel.getTableSchema());
streamingApiFallbackSupplier(
StreamingApiFallbackInvoker.INSERT_ROWS_SCHEMA_EVOLUTION_FALLBACK);
} catch (SnowflakeKafkaConnectorException e) {
LOGGER.error(
"Error while performing schema evolution for channel:{}",
this.getChannelNameFormatV1(),
e);
if (Objects.equals(e.getCode(), SnowflakeErrors.ERROR_5026.getCode())) {
handleError(Collections.singletonList(e), kafkaSinkRecord);
} else {
throw e;
}
}

return;
}
}

handleError(
insertErrors.stream()
.map(InsertValidationResponse.InsertError::getException)
.collect(Collectors.toList()),
kafkaSinkRecord);
}

private void handleError(List<Exception> insertErrors, SinkRecord kafkaSinkRecord) {
if (logErrors) {
for (InsertValidationResponse.InsertError insertError : insertErrors) {
LOGGER.error("Insert Row Error message:{}", insertError.getException().getMessage());
for (Exception insertError : insertErrors) {
LOGGER.error("Insert Row Error message:{}", insertError.getMessage());
}
}
if (errorTolerance) {
Expand All @@ -563,7 +583,6 @@ private void handleInsertRowFailure(
this.kafkaRecordErrorReporter.reportError(
kafkaSinkRecord,
insertErrors.stream()
.map(InsertValidationResponse.InsertError::getException)
.findFirst()
.orElseThrow(
() ->
Expand All @@ -574,20 +593,12 @@ private void handleInsertRowFailure(
final String errMsg =
String.format(
"Error inserting Records using Streaming API with msg:%s",
insertErrors.get(0).getException().getMessage());
insertErrors.get(0).getMessage());
this.telemetryServiceV2.reportKafkaConnectFatalError(errMsg);
throw new DataException(errMsg, insertErrors.get(0).getException());
throw new DataException(errMsg, insertErrors.get(0));
}
}

private List<String> join(
List<String> nonNullableColumns, List<String> nullValueForNotNullColNames) {
return Lists.newArrayList(
Iterables.concat(
Optional.ofNullable(nonNullableColumns).orElse(ImmutableList.of()),
Optional.ofNullable(nullValueForNotNullColNames).orElse(ImmutableList.of())));
}

// TODO: SNOW-529755 POLL committed offsets in background thread

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private LinkedHashMap<String, IcebergFieldNode> produceChildren(Type apacheIcebe

// -- parse tree from kafka record payload logic --
private IcebergFieldNode createNode(String name, JsonNode jsonNode) {
String snowflakeType = mapper.mapToColumnTypeFromJson(jsonNode);
String snowflakeType = mapper.mapToColumnTypeFromJson(name, jsonNode);
return new IcebergFieldNode(name, snowflakeType, produceChildren(jsonNode));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;

import static com.snowflake.kafka.connector.internal.SnowflakeErrors.ERROR_5026;
import static org.apache.kafka.connect.data.Schema.Type.ARRAY;
import static org.apache.kafka.connect.data.Schema.Type.BOOLEAN;
import static org.apache.kafka.connect.data.Schema.Type.BYTES;
Expand All @@ -11,6 +12,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.snowflake.kafka.connector.internal.SnowflakeErrors;
import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.kafka.connect.data.Date;
Expand Down Expand Up @@ -72,8 +74,8 @@ String mapToColumnTypeFromIcebergSchema(Type apacheIcebergType) {
*
* <p>Converts Types from: JsonNode -> KafkaKafka -> Snowflake.
*/
String mapToColumnTypeFromJson(JsonNode value) {
Schema.Type kafkaType = mapJsonNodeTypeToKafkaType(value);
String mapToColumnTypeFromJson(String name, JsonNode value) {
Schema.Type kafkaType = mapJsonNodeTypeToKafkaType(name, value);
return mapToColumnTypeFromKafkaSchema(kafkaType, null);
}

Expand Down Expand Up @@ -124,15 +126,19 @@ String mapToColumnTypeFromKafkaSchema(Schema.Type kafkaType, String schemaName)
}

/**
* Map the JSON node type to Kafka type
* Map the JSON node type to Kafka type. For null and empty values, we can't infer the type, so we
* throw an exception.
*
* @param name column/field name
* @param value JSON node
* @throws SnowflakeKafkaConnectorException if the value is null or empty array or empty object
* @return Kafka type
*/
Schema.Type mapJsonNodeTypeToKafkaType(JsonNode value) {
if (value == null || value.isNull()) {
return STRING;
} else if (value.isNumber()) {
Schema.Type mapJsonNodeTypeToKafkaType(String name, JsonNode value) {
if (cannotInferType(value)) {
throw ERROR_5026.getException("'" + name + "' field value is null or empty");
}
if (value.isNumber()) {
if (value.isFloat()) {
return FLOAT32;
} else if (value.isDouble()) {
Expand All @@ -154,4 +160,12 @@ Schema.Type mapJsonNodeTypeToKafkaType(JsonNode value) {
return null;
}
}

boolean cannotInferType(JsonNode value) {
// cannot infer type if value null or empty array or empty object
return value == null
|| value.isNull()
|| (value.isArray() && value.isEmpty())
|| (value.isObject() && value.isEmpty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ private ReportedRecord(final SinkRecord record, final Throwable e) {
this.e = e;
}

public SinkRecord getRecord() {
return record;
}

public Throwable getException() {
return e;
}

@Override
public String toString() {
return "ReportedData{" + "record=" + record + ", e=" + e + '}';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;

import static com.snowflake.kafka.connector.internal.SnowflakeErrors.ERROR_5026;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException;
import java.util.stream.Stream;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
Expand All @@ -29,7 +33,26 @@ void shouldMapKafkaTypeToSnowflakeColumnType(
@ParameterizedTest()
@MethodSource("jsonNodeTypesToMap")
void shouldMapJsonNodeTypeToKafkaType(JsonNode value, Schema.Type expectedKafkaType) {
assertThat(mapper.mapJsonNodeTypeToKafkaType(value)).isEqualTo(expectedKafkaType);
assertThat(mapper.mapJsonNodeTypeToKafkaType("test", value)).isEqualTo(expectedKafkaType);
}

@ParameterizedTest()
@MethodSource("jsonNodeValuesToThrowException")
void shouldThrowExceptionWhenMappingEmptyOrNullNode(JsonNode value) {
assertThatThrownBy(() -> mapper.mapJsonNodeTypeToKafkaType("test", value))
.isInstanceOf(SnowflakeKafkaConnectorException.class)
.hasMessageContaining("'test' field value is null or empty")
.matches(
e -> ((SnowflakeKafkaConnectorException) e).getCode().equals(ERROR_5026.getCode()));
}

@Test
void shouldThrowExceptionForNullValue() {
assertThatThrownBy(() -> mapper.mapJsonNodeTypeToKafkaType("test", null))
.isInstanceOf(SnowflakeKafkaConnectorException.class)
.hasMessageContaining("'test' field value is null or empty")
.matches(
e -> ((SnowflakeKafkaConnectorException) e).getCode().equals(ERROR_5026.getCode()));
}

private static Stream<Arguments> kafkaTypesToMap() {
Expand All @@ -52,16 +75,8 @@ private static Stream<Arguments> kafkaTypesToMap() {
Arguments.of(Schema.Type.STRUCT, null, "OBJECT"));
}

private static Stream<Arguments> kafkaTypesToThrowException() {
return Stream.of(
Arguments.of(Schema.Type.ARRAY),
Arguments.of(Schema.Type.MAP),
Arguments.of(Schema.Type.STRUCT));
}

private static Stream<Arguments> jsonNodeTypesToMap() {
return Stream.of(
Arguments.of(JsonNodeFactory.instance.nullNode(), Schema.Type.STRING),
Arguments.of(JsonNodeFactory.instance.numberNode((short) 1), Schema.Type.INT64),
Arguments.of(JsonNodeFactory.instance.numberNode(1), Schema.Type.INT64),
Arguments.of(JsonNodeFactory.instance.numberNode(1L), Schema.Type.INT64),
Expand All @@ -70,7 +85,14 @@ private static Stream<Arguments> jsonNodeTypesToMap() {
Arguments.of(JsonNodeFactory.instance.booleanNode(true), Schema.Type.BOOLEAN),
Arguments.of(JsonNodeFactory.instance.textNode("text"), Schema.Type.STRING),
Arguments.of(JsonNodeFactory.instance.binaryNode(new byte[] {1, 2, 3}), Schema.Type.BYTES),
Arguments.of(JsonNodeFactory.instance.arrayNode(), Schema.Type.ARRAY),
Arguments.of(JsonNodeFactory.instance.objectNode(), Schema.Type.STRUCT));
Arguments.of(JsonNodeFactory.instance.arrayNode().add(1), Schema.Type.ARRAY),
Arguments.of(JsonNodeFactory.instance.objectNode().put("test", 1), Schema.Type.STRUCT));
}

private static Stream<Arguments> jsonNodeValuesToThrowException() {
return Stream.of(
Arguments.of(JsonNodeFactory.instance.nullNode()),
Arguments.of(JsonNodeFactory.instance.arrayNode()),
Arguments.of(JsonNodeFactory.instance.objectNode()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,7 @@ static Stream<Arguments> parseFromJsonArguments() {
+ " ] } ",
"ARRAY(OBJECT(name VARCHAR, id LONG))"),
// array
arguments("{\"testColumnName\": [1,2,3] }", "ARRAY(LONG)"),
arguments("{ \"testColumnName\": [] }", "ARRAY(VARCHAR(16777216))"));
arguments("{\"testColumnName\": [1,2,3] }", "ARRAY(LONG)"));
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public abstract class IcebergIngestionIT extends BaseIcebergIT {
protected String tableName;
protected TopicPartition topicPartition;
protected SnowflakeSinkService service;
protected InMemoryKafkaRecordErrorReporter kafkaRecordErrorReporter;
protected static final String simpleRecordJson = "{\"simple\": \"extra field\"}";

@BeforeEach
Expand All @@ -51,6 +52,8 @@ public void setUp() {
config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "true");
// "snowflake.streaming.max.client.lag" = 1 second, for faster tests
config.put(SNOWPIPE_STREAMING_MAX_CLIENT_LAG, "1");
config.put(ERRORS_TOLERANCE_CONFIG, SnowflakeSinkConnectorConfig.ErrorTolerance.ALL.toString());
config.put(ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "test_DLQ");

createIcebergTable();
enableSchemaEvolution(tableName);
Expand All @@ -59,10 +62,11 @@ public void setUp() {
Map<String, String> topic2Table = new HashMap<>();
topic2Table.put(topic, tableName);

kafkaRecordErrorReporter = new InMemoryKafkaRecordErrorReporter();
service =
SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
.setRecordNumber(1)
.setErrorReporter(new InMemoryKafkaRecordErrorReporter())
.setErrorReporter(kafkaRecordErrorReporter)
.setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
.setTopic2TableMap(topic2Table)
.addTask(tableName, topicPartition)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package com.snowflake.kafka.connector.streaming.iceberg;

import static com.snowflake.kafka.connector.streaming.iceberg.sql.ComplexJsonRecord.complexJsonPayloadExample;
import static com.snowflake.kafka.connector.streaming.iceberg.sql.ComplexJsonRecord.complexJsonPayloadWithWrongValueTypeExample;
import static com.snowflake.kafka.connector.streaming.iceberg.sql.ComplexJsonRecord.complexJsonRecordValueExample;
import static org.assertj.core.api.Assertions.assertThat;

import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.dlq.InMemoryKafkaRecordErrorReporter;
import com.snowflake.kafka.connector.streaming.iceberg.sql.ComplexJsonRecord;
import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord;
import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord.RecordWithMetadata;
Expand All @@ -12,6 +15,8 @@
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
Expand Down Expand Up @@ -89,10 +94,33 @@ void shouldInsertRecords(String description, String message, boolean withSchema)
service.insert(Collections.singletonList(createKafkaRecord(message, 2, withSchema)));
waitForOffset(3);

assertRecordsInTable();
assertRecordsInTable(0L, 1L, 2L);
}

private void assertRecordsInTable() {
@Test
void shouldSendValueWithWrongTypeToDLQ() throws Exception {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

additional test for unhappy path

SinkRecord wrongValueRecord1 =
createKafkaRecord(complexJsonPayloadWithWrongValueTypeExample, 0, false);
SinkRecord wrongValueRecord2 =
createKafkaRecord(complexJsonPayloadWithWrongValueTypeExample, 2, false);
service.insert(
Arrays.asList(
wrongValueRecord1,
createKafkaRecord(complexJsonPayloadExample, 1, false),
wrongValueRecord2,
createKafkaRecord(complexJsonPayloadExample, 3, false),
createKafkaRecord(complexJsonPayloadExample, 4, false)));
waitForOffset(5);

assertRecordsInTable(1L, 3L, 4L);
List<InMemoryKafkaRecordErrorReporter.ReportedRecord> reportedRecords =
kafkaRecordErrorReporter.getReportedRecords();
assertThat(reportedRecords).hasSize(2);
assertThat(reportedRecords.stream().map(it -> it.getRecord()).collect(Collectors.toList()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If only it would be Kotlin...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i try to forget... but still I sometimes remember the old good times

.containsExactlyInAnyOrder(wrongValueRecord1, wrongValueRecord2);
}

private void assertRecordsInTable(Long... expectedOffsets) {
List<RecordWithMetadata<ComplexJsonRecord>> recordsWithMetadata =
selectAllComplexJsonRecordFromRecordContent();
assertThat(recordsWithMetadata)
Expand All @@ -106,7 +134,9 @@ private void assertRecordsInTable() {
recordsWithMetadata.stream()
.map(RecordWithMetadata::getMetadata)
.collect(Collectors.toList());
assertThat(metadataRecords).extracting(MetadataRecord::getOffset).containsExactly(0L, 1L, 2L);
assertThat(metadataRecords)
.extracting(MetadataRecord::getOffset)
.containsExactly(expectedOffsets);
assertThat(metadataRecords)
.hasSize(3)
.allMatch(
Expand Down
Loading
Loading