Skip to content

Commit

Permalink
SNOW-938038: Support AVRO Logical Types (snowflakedb#722)
Browse files Browse the repository at this point in the history
There're 10 AVRO logical types listed in https://avro.apache.org/docs/1.11.0/spec.html#Logical+Types and we're able to support 4 of them, the mapping is as below:

date -> DATE
time-mills -> TIME(6)
timestamp-mills -> TIMESTAMP_NTZ(6)
decimal -> VARCHAR (We can't do NUMBER because we could have precision bigger than 36)

We can't do for the rest of 6 types because that's not supported by ConnectSchema, see code for more detail. We need to find another way to support other logical types or any sources (like Debezium).
  • Loading branch information
sfc-gh-tzhang authored and khsoneji committed Dec 4, 2023
1 parent 81f9c15 commit d1713dc
Show file tree
Hide file tree
Showing 11 changed files with 240 additions and 39 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.snowflake.kafka.connector.internal;

import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.*;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_RECORD_COUNT;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_SIZE_BYTES;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_SUB_DOMAIN;
import static org.apache.kafka.common.record.TimestampType.NO_TIMESTAMP_TYPE;

import com.codahale.metrics.Histogram;
Expand Down Expand Up @@ -660,7 +662,7 @@ private SinkRecord handleNativeRecord(SinkRecord record, boolean isKey) {
Schema schema = isKey ? record.keySchema() : record.valueSchema();
Object content = isKey ? record.key() : record.value();
try {
newSFContent = new SnowflakeRecordContent(schema, content);
newSFContent = new SnowflakeRecordContent(schema, content, false);
} catch (Exception e) {
LOGGER.error("Native content parser error:\n{}", e.getMessage());
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@
import java.util.Set;
import javax.annotation.Nonnull;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -115,7 +119,7 @@ static Map<String, String> getColumnTypes(SinkRecord record, List<String> column
}
Map<String, String> columnToType = new HashMap<>();
Map<String, String> schemaMap = getSchemaMapFromRecord(record);
JsonNode recordNode = RecordService.convertToJson(record.valueSchema(), record.value());
JsonNode recordNode = RecordService.convertToJson(record.valueSchema(), record.value(), true);
Set<String> columnNamesSet = new HashSet<>(columnNames);

Iterator<Map.Entry<String, JsonNode>> fields = recordNode.fields();
Expand Down Expand Up @@ -154,7 +158,15 @@ private static Map<String, String> getSchemaMapFromRecord(SinkRecord record) {
Schema schema = record.valueSchema();
if (schema != null && schema.fields() != null) {
for (Field field : schema.fields()) {
schemaMap.put(field.name(), convertToSnowflakeType(field.schema().type()));
String snowflakeType = convertToSnowflakeType(field.schema().type(), field.schema().name());
LOGGER.info(
"Got the snowflake data type for field:{}, schemaName:{}, kafkaType:{},"
+ " snowflakeType:{}",
field.name(),
field.schema().name(),
field.schema().type(),
snowflakeType);
schemaMap.put(field.name(), snowflakeType);
}
}
return schemaMap;
Expand All @@ -167,7 +179,8 @@ private static String inferDataTypeFromJsonObject(JsonNode value) {
// only when the type of the value is unrecognizable for JAVA
throw SnowflakeErrors.ERROR_5021.getException("class: " + value.getClass());
}
return convertToSnowflakeType(schemaType);
// Passing null to schemaName when there is no schema information
return convertToSnowflakeType(schemaType, null);
}

/** Convert a json node type to kafka data type */
Expand Down Expand Up @@ -201,16 +214,26 @@ private static Type convertJsonNodeTypeToKafkaType(JsonNode value) {
}

/** Convert the kafka data type to Snowflake data type */
private static String convertToSnowflakeType(Type kafkaType) {
private static String convertToSnowflakeType(Type kafkaType, String schemaName) {
switch (kafkaType) {
case INT8:
return "BYTEINT";
case INT16:
return "SMALLINT";
case INT32:
return "INT";
if (Date.LOGICAL_NAME.equals(schemaName)) {
return "DATE";
} else if (Time.LOGICAL_NAME.equals(schemaName)) {
return "TIME(6)";
} else {
return "INT";
}
case INT64:
return "BIGINT";
if (Timestamp.LOGICAL_NAME.equals(schemaName)) {
return "TIMESTAMP(6)";
} else {
return "BIGINT";
}
case FLOAT32:
return "FLOAT";
case FLOAT64:
Expand All @@ -220,7 +243,11 @@ private static String convertToSnowflakeType(Type kafkaType) {
case STRING:
return "VARCHAR";
case BYTES:
return "BINARY";
if (Decimal.LOGICAL_NAME.equals(schemaName)) {
return "VARCHAR";
} else {
return "BINARY";
}
case ARRAY:
return "ARRAY";
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ private SinkRecord handleNativeRecord(SinkRecord record, boolean isKey) {
Schema schema = isKey ? record.keySchema() : record.valueSchema();
Object content = isKey ? record.key() : record.value();
try {
newSFContent = new SnowflakeRecordContent(schema, content);
newSFContent = new SnowflakeRecordContent(schema, content, true);
} catch (Exception e) {
LOGGER.error("Native content parser error:\n{}", e.getMessage());
try {
Expand Down Expand Up @@ -778,7 +778,7 @@ private void handleInsertRowsFailures(
}
}

// TODO: SNOW-529755 POLL committed offsets in backgraound thread
// TODO: SNOW-529755 POLL committed offsets in background thread

/**
* Get committed offset from Snowflake. It does an HTTP call internally to find out what was the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public class RecordService {

public static final ThreadLocal<SimpleDateFormat> TIME_FORMAT =
ThreadLocal.withInitial(() -> new SimpleDateFormat("HH:mm:ss.SSSZ"));
public static final ThreadLocal<SimpleDateFormat> TIME_FORMAT_STREAMING =
ThreadLocal.withInitial(() -> new SimpleDateFormat("HH:mm:ss.SSSXXX"));
static final int MAX_SNOWFLAKE_NUMBER_PRECISION = 38;

// This class is designed to work with empty metadata config map
Expand Down Expand Up @@ -360,7 +362,7 @@ void putKey(SinkRecord record, ObjectNode meta) {
static JsonNode parseHeaders(Headers headers) {
ObjectNode result = MAPPER.createObjectNode();
for (Header header : headers) {
result.set(header.key(), convertToJson(header.schema(), header.value()));
result.set(header.key(), convertToJson(header.schema(), header.value(), false));
}
return result;
}
Expand All @@ -371,15 +373,17 @@ static JsonNode parseHeaders(Headers headers) {
*
* @param schema schema of the object
* @param logicalValue object to be converted
* @param isStreaming indicates whether this is part of snowpipe streaming
* @return a JsonNode of the object
*/
public static JsonNode convertToJson(Schema schema, Object logicalValue) {
public static JsonNode convertToJson(Schema schema, Object logicalValue, boolean isStreaming) {
if (logicalValue == null) {
if (schema
== null) // Any schema is valid and we don't have a default, so treat this as an optional
// schema
return null;
if (schema.defaultValue() != null) return convertToJson(schema, schema.defaultValue());
if (schema.defaultValue() != null)
return convertToJson(schema, schema.defaultValue(), isStreaming);
if (schema.isOptional()) return JsonNodeFactory.instance.nullNode();
throw SnowflakeErrors.ERROR_5015.getException(
"Conversion error: null value for field that is required and has no default value");
Expand Down Expand Up @@ -415,8 +419,9 @@ public static JsonNode convertToJson(Schema schema, Object logicalValue) {
ISO_DATE_TIME_FORMAT.get().format((java.util.Date) value));
}
if (schema != null && Time.LOGICAL_NAME.equals(schema.name())) {
return JsonNodeFactory.instance.textNode(
TIME_FORMAT.get().format((java.util.Date) value));
ThreadLocal<SimpleDateFormat> format =
isStreaming ? TIME_FORMAT_STREAMING : TIME_FORMAT;
return JsonNodeFactory.instance.textNode(format.get().format((java.util.Date) value));
}
return JsonNodeFactory.instance.numberNode((Integer) value);
case INT64:
Expand Down Expand Up @@ -473,7 +478,7 @@ else if (value instanceof ByteBuffer) {
ArrayNode list = JsonNodeFactory.instance.arrayNode();
for (Object elem : collection) {
Schema valueSchema = schema == null ? null : schema.valueSchema();
JsonNode fieldValue = convertToJson(valueSchema, elem);
JsonNode fieldValue = convertToJson(valueSchema, elem, isStreaming);
list.add(fieldValue);
}
return list;
Expand Down Expand Up @@ -503,8 +508,8 @@ else if (value instanceof ByteBuffer) {
for (Map.Entry<?, ?> entry : map.entrySet()) {
Schema keySchema = schema == null ? null : schema.keySchema();
Schema valueSchema = schema == null ? null : schema.valueSchema();
JsonNode mapKey = convertToJson(keySchema, entry.getKey());
JsonNode mapValue = convertToJson(valueSchema, entry.getValue());
JsonNode mapKey = convertToJson(keySchema, entry.getKey(), isStreaming);
JsonNode mapValue = convertToJson(valueSchema, entry.getValue(), isStreaming);

if (objectMode) obj.set(mapKey.asText(), mapValue);
else list.add(JsonNodeFactory.instance.arrayNode().add(mapKey).add(mapValue));
Expand All @@ -518,7 +523,7 @@ else if (value instanceof ByteBuffer) {
throw SnowflakeErrors.ERROR_5015.getException("Mismatching schema.");
ObjectNode obj = JsonNodeFactory.instance.objectNode();
for (Field field : schema.fields()) {
obj.set(field.name(), convertToJson(field.schema(), struct.get(field)));
obj.set(field.name(), convertToJson(field.schema(), struct.get(field), isStreaming));
}
return obj;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ public SnowflakeRecordContent() {
*
* @param schema schema of the object
* @param data object produced by native avro/json converters
* @param isStreaming indicates whether this is part of snowpipe streaming
*/
public SnowflakeRecordContent(Schema schema, Object data) {
public SnowflakeRecordContent(Schema schema, Object data, boolean isStreaming) {
this.content = new JsonNode[1];
this.schemaID = NON_AVRO_SCHEMA;
this.content[0] = RecordService.convertToJson(schema, data);
this.content[0] = RecordService.convertToJson(schema, data, isStreaming);
this.isBroken = false;
this.brokenData = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,8 @@ public void testConnectJsonConverter_MapInt64() throws JsonProcessingException {
jsonMap.put("test", Integer.MAX_VALUE);
SchemaAndValue schemaAndValue =
jsonConverter.toConnectData("test", mapper.writeValueAsBytes(jsonMap));
JsonNode result = RecordService.convertToJson(schemaAndValue.schema(), schemaAndValue.value());
JsonNode result =
RecordService.convertToJson(schemaAndValue.schema(), schemaAndValue.value(), false);

ObjectNode expected = mapper.createObjectNode();
expected.put("test", Integer.MAX_VALUE);
Expand Down Expand Up @@ -332,7 +333,7 @@ private boolean testSimpleDataFormat_jsonConverter_thread_safe() {
SchemaAndValue schemaInputValue = jsonConverter.toConnectData("test", value.getBytes());

JsonNode result =
RecordService.convertToJson(schemaInputValue.schema(), schemaInputValue.value());
RecordService.convertToJson(schemaInputValue.schema(), schemaInputValue.value(), false);
System.out.println("Record Service result:" + result + " Thread :" + Thread.currentThread());

String exptectedDateTimeFormatStr =
Expand All @@ -350,7 +351,8 @@ public void testConnectJsonConverter_MapBigDecimalExceedsMaxPrecision()
jsonMap.put("test", new BigDecimal("999999999999999999999999999999999999999"));
SchemaAndValue schemaAndValue =
jsonConverter.toConnectData("test", mapper.writeValueAsBytes(jsonMap));
JsonNode result = RecordService.convertToJson(schemaAndValue.schema(), schemaAndValue.value());
JsonNode result =
RecordService.convertToJson(schemaAndValue.schema(), schemaAndValue.value(), false);

ObjectNode expected = mapper.createObjectNode();
expected.put("test", new BigDecimal("999999999999999999999999999999999999999"));
Expand All @@ -367,7 +369,8 @@ public void testConnectSimpleHeaderConverter_MapDateAndOtherTypes()
SchemaAndValue schemaAndValue =
headerConverter.toConnectHeader(
"test", "h1", rawHeader.getBytes(StandardCharsets.US_ASCII));
JsonNode result = RecordService.convertToJson(schemaAndValue.schema(), schemaAndValue.value());
JsonNode result =
RecordService.convertToJson(schemaAndValue.schema(), schemaAndValue.value(), false);

ObjectNode expected = mapper.createObjectNode();
long expectedTimestampValue =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@
import java.io.IOException;
import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.record.TimestampType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void test() throws IOException {
.put("map", Collections.singletonMap("field", 1))
.put("mapNonStringKeys", Collections.singletonMap(1, 1));

content = new SnowflakeRecordContent(schema, original);
content = new SnowflakeRecordContent(schema, original, false);
assert content
.getData()[0]
.toString()
Expand All @@ -117,7 +117,7 @@ public void test() throws IOException {
"{\"int8\":12,\"int16\":12,\"int32\":12,\"int64\":12,\"float32\":12.2,\"float64\":12.2,\"boolean\":true,\"string\":\"foo\",\"bytes\":\"Zm9v\",\"array\":[\"a\",\"b\",\"c\"],\"map\":{\"field\":1},\"mapNonStringKeys\":[[1,1]]}");
Map<String, Object> jsonMap =
mapper.convertValue(jsonObject, new TypeReference<Map<String, Object>>() {});
content = new SnowflakeRecordContent(null, jsonMap);
content = new SnowflakeRecordContent(null, jsonMap, false);
assert content
.getData()[0]
.toString()
Expand Down Expand Up @@ -206,24 +206,22 @@ public void testWrongKeyType() {

@Test(expected = SnowflakeKafkaConnectorException.class)
public void testConvertToJsonEmptyValue() {
assert RecordService.convertToJson(null, null) == null;

Schema schema = SchemaBuilder.int32().optional().defaultValue(123).build();
assert RecordService.convertToJson(schema, null).toString().equals("123");
assert RecordService.convertToJson(schema, null, false).toString().equals("123");

schema = SchemaBuilder.int32().build();
RecordService.convertToJson(schema, null);
RecordService.convertToJson(schema, null, false);
}

@Test(expected = SnowflakeKafkaConnectorException.class)
public void testConvertToJsonNonOptional() {
Schema schema = SchemaBuilder.int32().build();
RecordService.convertToJson(schema, null);
RecordService.convertToJson(schema, null, false);
}

@Test(expected = SnowflakeKafkaConnectorException.class)
public void testConvertToJsonNoSchemaType() {
RecordService.convertToJson(null, new SnowflakeJsonSchema());
RecordService.convertToJson(null, new SnowflakeJsonSchema(), false);
}

@Test
Expand All @@ -233,7 +231,7 @@ public void testConvertToJsonReadOnlyByteBuffer() {
String expected = "\"" + Base64.getEncoder().encodeToString(original.getBytes()) + "\"";
ByteBuffer buffer = ByteBuffer.wrap(original.getBytes()).asReadOnlyBuffer();
Schema schema = SchemaBuilder.bytes().build();
assert RecordService.convertToJson(schema, buffer).toString().equals(expected);
assert RecordService.convertToJson(schema, buffer, false).toString().equals(expected);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"name": "SNOWFLAKE_CONNECTOR_NAME",
"config": {
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"topics": "SNOWFLAKE_TEST_TOPIC0,SNOWFLAKE_TEST_TOPIC1",
"snowflake.topic2table.map": "SNOWFLAKE_TEST_TOPIC0:SNOWFLAKE_CONNECTOR_NAME,SNOWFLAKE_TEST_TOPIC1:SNOWFLAKE_CONNECTOR_NAME",
"tasks.max": "1",
"buffer.flush.time": "10",
"buffer.count.records": "100",
"buffer.size.bytes": "5000000",
"snowflake.url.name": "SNOWFLAKE_HOST",
"snowflake.user.name": "SNOWFLAKE_USER",
"snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY",
"snowflake.database.name": "SNOWFLAKE_DATABASE",
"snowflake.schema.name": "SNOWFLAKE_SCHEMA",
"snowflake.role.name": "SNOWFLAKE_ROLE",
"snowflake.ingestion.method": "SNOWPIPE_STREAMING",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY",
"value.converter.schemas.enable": "false",
"jmx": "true",
"errors.tolerance": "all",
"errors.log.enable": true,
"errors.deadletterqueue.topic.name": "DLQ_TOPIC",
"errors.deadletterqueue.topic.replication.factor": 1,
"snowflake.enable.schematization": true
}
}
Loading

0 comments on commit d1713dc

Please sign in to comment.