merge main
sfc-gh-rcheng committed Nov 6, 2023
2 parents 6583b55 + 7edb215 commit 645d703
Showing 13 changed files with 277 additions and 53 deletions.
@@ -1,6 +1,8 @@
package com.snowflake.kafka.connector.internal;

import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.*;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_RECORD_COUNT;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_SIZE_BYTES;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_SUB_DOMAIN;
import static org.apache.kafka.common.record.TimestampType.NO_TIMESTAMP_TYPE;

import com.codahale.metrics.Histogram;
@@ -660,7 +662,7 @@ private SinkRecord handleNativeRecord(SinkRecord record, boolean isKey) {
Schema schema = isKey ? record.keySchema() : record.valueSchema();
Object content = isKey ? record.key() : record.value();
try {
newSFContent = new SnowflakeRecordContent(schema, content);
newSFContent = new SnowflakeRecordContent(schema, content, false);
} catch (Exception e) {
LOGGER.error("Native content parser error:\n{}", e.getMessage());
try {
@@ -28,9 +28,13 @@
import java.util.Set;
import javax.annotation.Nonnull;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -115,7 +119,7 @@ static Map<String, String> getColumnTypes(SinkRecord record, List<String> column
}
Map<String, String> columnToType = new HashMap<>();
Map<String, String> schemaMap = getSchemaMapFromRecord(record);
JsonNode recordNode = RecordService.convertToJson(record.valueSchema(), record.value());
JsonNode recordNode = RecordService.convertToJson(record.valueSchema(), record.value(), true);
Set<String> columnNamesSet = new HashSet<>(columnNames);

Iterator<Map.Entry<String, JsonNode>> fields = recordNode.fields();
@@ -154,7 +158,15 @@ private static Map<String, String> getSchemaMapFromRecord(SinkRecord record) {
Schema schema = record.valueSchema();
if (schema != null && schema.fields() != null) {
for (Field field : schema.fields()) {
schemaMap.put(field.name(), convertToSnowflakeType(field.schema().type()));
String snowflakeType = convertToSnowflakeType(field.schema().type(), field.schema().name());
LOGGER.info(
"Got the snowflake data type for field:{}, schemaName:{}, kafkaType:{},"
+ " snowflakeType:{}",
field.name(),
field.schema().name(),
field.schema().type(),
snowflakeType);
schemaMap.put(field.name(), snowflakeType);
}
}
return schemaMap;
@@ -167,7 +179,8 @@ private static String inferDataTypeFromJsonObject(JsonNode value) {
// only when the type of the value is unrecognizable for JAVA
throw SnowflakeErrors.ERROR_5021.getException("class: " + value.getClass());
}
return convertToSnowflakeType(schemaType);
// Passing null to schemaName when there is no schema information
return convertToSnowflakeType(schemaType, null);
}

/** Convert a json node type to kafka data type */
@@ -201,16 +214,26 @@ private static Type convertJsonNodeTypeToKafkaType(JsonNode value) {
}

/** Convert the kafka data type to Snowflake data type */
private static String convertToSnowflakeType(Type kafkaType) {
private static String convertToSnowflakeType(Type kafkaType, String schemaName) {
switch (kafkaType) {
case INT8:
return "BYTEINT";
case INT16:
return "SMALLINT";
case INT32:
return "INT";
if (Date.LOGICAL_NAME.equals(schemaName)) {
return "DATE";
} else if (Time.LOGICAL_NAME.equals(schemaName)) {
return "TIME(6)";
} else {
return "INT";
}
case INT64:
return "BIGINT";
if (Timestamp.LOGICAL_NAME.equals(schemaName)) {
return "TIMESTAMP(6)";
} else {
return "BIGINT";
}
case FLOAT32:
return "FLOAT";
case FLOAT64:
@@ -220,7 +243,11 @@ private static String convertToSnowflakeType(Type kafkaType) {
case STRING:
return "VARCHAR";
case BYTES:
return "BINARY";
if (Decimal.LOGICAL_NAME.equals(schemaName)) {
return "VARCHAR";
} else {
return "BINARY";
}
case ARRAY:
return "ARRAY";
default:
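For illustration only, here is a minimal standalone sketch (not part of this commit; the class name, helper method, and the VARIANT fallback are invented for the example) of how the extended mapping resolves Kafka Connect logical types, using the Date, Time, Timestamp, and Decimal classes from org.apache.kafka.connect.data:

import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;

public class LogicalTypeMappingSketch {
  // Hypothetical stand-in for the private convertToSnowflakeType(Type, String) shown above.
  static String toSnowflakeType(Schema.Type kafkaType, String schemaName) {
    switch (kafkaType) {
      case INT32:
        if (Date.LOGICAL_NAME.equals(schemaName)) return "DATE";
        if (Time.LOGICAL_NAME.equals(schemaName)) return "TIME(6)";
        return "INT";
      case INT64:
        return Timestamp.LOGICAL_NAME.equals(schemaName) ? "TIMESTAMP(6)" : "BIGINT";
      case BYTES:
        return Decimal.LOGICAL_NAME.equals(schemaName) ? "VARCHAR" : "BINARY";
      default:
        return "VARIANT"; // placeholder fallback; the remaining cases are in the switch above
    }
  }

  public static void main(String[] args) {
    // Logical types carry their logical name in Schema#name(); plain schemas return null here.
    System.out.println(toSnowflakeType(Date.SCHEMA.type(), Date.SCHEMA.name()));                 // DATE
    System.out.println(toSnowflakeType(Timestamp.SCHEMA.type(), Timestamp.SCHEMA.name()));       // TIMESTAMP(6)
    System.out.println(toSnowflakeType(Decimal.schema(2).type(), Decimal.schema(2).name()));     // VARCHAR
    System.out.println(toSnowflakeType(Schema.INT32_SCHEMA.type(), Schema.INT32_SCHEMA.name())); // INT
  }
}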
@@ -446,7 +446,7 @@ private SinkRecord handleNativeRecord(SinkRecord record, boolean isKey) {
Schema schema = isKey ? record.keySchema() : record.valueSchema();
Object content = isKey ? record.key() : record.value();
try {
newSFContent = new SnowflakeRecordContent(schema, content);
newSFContent = new SnowflakeRecordContent(schema, content, true);
} catch (Exception e) {
LOGGER.error("Native content parser error:\n{}", e.getMessage());
try {
@@ -778,7 +778,7 @@ private void handleInsertRowsFailures(
}
}

// TODO: SNOW-529755 POLL committed offsets in backgraound thread
// TODO: SNOW-529755 POLL committed offsets in background thread

/**
* Get committed offset from Snowflake. It does an HTTP call internally to find out what was the
@@ -28,12 +28,10 @@
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException;
@@ -88,6 +86,8 @@ public class RecordService {

public static final ThreadLocal<SimpleDateFormat> TIME_FORMAT =
ThreadLocal.withInitial(() -> new SimpleDateFormat("HH:mm:ss.SSSZ"));
public static final ThreadLocal<SimpleDateFormat> TIME_FORMAT_STREAMING =
ThreadLocal.withInitial(() -> new SimpleDateFormat("HH:mm:ss.SSSXXX"));
static final int MAX_SNOWFLAKE_NUMBER_PRECISION = 38;

// This class is designed to work with empty metadata config map
@@ -287,14 +287,7 @@ private Map<String, Object> getMapFromJsonNodeForStreamingIngest(JsonNode node)
String columnName = columnNames.next();
JsonNode columnNode = node.get(columnName);
Object columnValue;
if (columnNode.isArray()) {
List<String> itemList = new ArrayList<>();
ArrayNode arrayNode = (ArrayNode) columnNode;
for (JsonNode e : arrayNode) {
itemList.add(e.isTextual() ? e.textValue() : MAPPER.writeValueAsString(e));
}
columnValue = itemList;
} else if (columnNode.isTextual()) {
if (columnNode.isTextual()) {
columnValue = columnNode.textValue();
} else if (columnNode.isNull()) {
columnValue = null;
@@ -369,7 +362,7 @@ void putKey(SinkRecord record, ObjectNode meta) {
static JsonNode parseHeaders(Headers headers) {
ObjectNode result = MAPPER.createObjectNode();
for (Header header : headers) {
result.set(header.key(), convertToJson(header.schema(), header.value()));
result.set(header.key(), convertToJson(header.schema(), header.value(), false));
}
return result;
}
@@ -380,15 +373,17 @@ static JsonNode parseHeaders(Headers headers) {
*
* @param schema schema of the object
* @param logicalValue object to be converted
* @param isStreaming indicates whether this is part of snowpipe streaming
* @return a JsonNode of the object
*/
public static JsonNode convertToJson(Schema schema, Object logicalValue) {
public static JsonNode convertToJson(Schema schema, Object logicalValue, boolean isStreaming) {
if (logicalValue == null) {
if (schema
== null) // Any schema is valid and we don't have a default, so treat this as an optional
// schema
return null;
if (schema.defaultValue() != null) return convertToJson(schema, schema.defaultValue());
if (schema.defaultValue() != null)
return convertToJson(schema, schema.defaultValue(), isStreaming);
if (schema.isOptional()) return JsonNodeFactory.instance.nullNode();
throw SnowflakeErrors.ERROR_5015.getException(
"Conversion error: null value for field that is required and has no default value");
@@ -424,8 +419,9 @@ public static JsonNode convertToJson(Schema schema, Object logicalValue) {
ISO_DATE_TIME_FORMAT.get().format((java.util.Date) value));
}
if (schema != null && Time.LOGICAL_NAME.equals(schema.name())) {
return JsonNodeFactory.instance.textNode(
TIME_FORMAT.get().format((java.util.Date) value));
ThreadLocal<SimpleDateFormat> format =
isStreaming ? TIME_FORMAT_STREAMING : TIME_FORMAT;
return JsonNodeFactory.instance.textNode(format.get().format((java.util.Date) value));
}
return JsonNodeFactory.instance.numberNode((Integer) value);
case INT64:
@@ -482,7 +478,7 @@ else if (value instanceof ByteBuffer) {
ArrayNode list = JsonNodeFactory.instance.arrayNode();
for (Object elem : collection) {
Schema valueSchema = schema == null ? null : schema.valueSchema();
JsonNode fieldValue = convertToJson(valueSchema, elem);
JsonNode fieldValue = convertToJson(valueSchema, elem, isStreaming);
list.add(fieldValue);
}
return list;
@@ -512,8 +508,8 @@ else if (value instanceof ByteBuffer) {
for (Map.Entry<?, ?> entry : map.entrySet()) {
Schema keySchema = schema == null ? null : schema.keySchema();
Schema valueSchema = schema == null ? null : schema.valueSchema();
JsonNode mapKey = convertToJson(keySchema, entry.getKey());
JsonNode mapValue = convertToJson(valueSchema, entry.getValue());
JsonNode mapKey = convertToJson(keySchema, entry.getKey(), isStreaming);
JsonNode mapValue = convertToJson(valueSchema, entry.getValue(), isStreaming);

if (objectMode) obj.set(mapKey.asText(), mapValue);
else list.add(JsonNodeFactory.instance.arrayNode().add(mapKey).add(mapValue));
@@ -527,7 +523,7 @@ else if (value instanceof ByteBuffer) {
throw SnowflakeErrors.ERROR_5015.getException("Mismatching schema.");
ObjectNode obj = JsonNodeFactory.instance.objectNode();
for (Field field : schema.fields()) {
obj.set(field.name(), convertToJson(field.schema(), struct.get(field)));
obj.set(field.name(), convertToJson(field.schema(), struct.get(field), isStreaming));
}
return obj;
}
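As an aside, a small standalone sketch (class name invented; assumes a UTC calendar and the epoch instant) showing the difference between the existing TIME_FORMAT pattern and the new TIME_FORMAT_STREAMING pattern: "Z" emits an RFC 822 offset while "XXX" emits an ISO 8601 offset, which appears to be why the streaming path gets its own formatter:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class TimeFormatSketch {
  public static void main(String[] args) {
    SimpleDateFormat snowpipeFormat = new SimpleDateFormat("HH:mm:ss.SSSZ");    // TIME_FORMAT
    SimpleDateFormat streamingFormat = new SimpleDateFormat("HH:mm:ss.SSSXXX"); // TIME_FORMAT_STREAMING
    snowpipeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
    streamingFormat.setTimeZone(TimeZone.getTimeZone("UTC"));

    Date epoch = new Date(0L);
    System.out.println(snowpipeFormat.format(epoch));  // 00:00:00.000+0000 (RFC 822 offset)
    System.out.println(streamingFormat.format(epoch)); // 00:00:00.000Z     (ISO 8601 offset)
  }
}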
@@ -40,11 +40,12 @@ public SnowflakeRecordContent() {
*
* @param schema schema of the object
* @param data object produced by native avro/json converters
* @param isStreaming indicates whether this is part of snowpipe streaming
*/
public SnowflakeRecordContent(Schema schema, Object data) {
public SnowflakeRecordContent(Schema schema, Object data, boolean isStreaming) {
this.content = new JsonNode[1];
this.schemaID = NON_AVRO_SCHEMA;
this.content[0] = RecordService.convertToJson(schema, data);
this.content[0] = RecordService.convertToJson(schema, data, isStreaming);
this.isBroken = false;
this.brokenData = null;
}
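For reference, a hypothetical call-site fragment (variable names invented; the constructor signature and the meaning of the flag come from the javadoc added above) showing how the two sink services pass the new argument:

// Assumes connector classes and org.apache.kafka.connect.data.Schema are on the classpath.
SnowflakeRecordContent snowpipeContent =
    new SnowflakeRecordContent(valueSchema, value, /* isStreaming= */ false); // Snowpipe (buffered) sink
SnowflakeRecordContent streamingContent =
    new SnowflakeRecordContent(valueSchema, value, /* isStreaming= */ true);  // Snowpipe Streaming sink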
@@ -18,7 +18,9 @@ public class SchematizationTestUtils {
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("RATING_FLOAT32", "FLOAT");
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("RATING_FLOAT64", "FLOAT");
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("APPROVAL", "BOOLEAN");
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_ARRAY", "ARRAY");
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_ARRAY_STRING", "ARRAY");
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_ARRAY_INT", "ARRAY");
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_ARRAY_JSON", "ARRAY");
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_MAP", "VARIANT");
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("RECORD_METADATA", "VARIANT");
}
@@ -54,7 +56,11 @@ public class SchematizationTestUtils {
CONTENT_FOR_AVRO_TABLE_CREATION.put("RATING_FLOAT32", 0.99);
CONTENT_FOR_AVRO_TABLE_CREATION.put("RATING_FLOAT64", 0.99);
CONTENT_FOR_AVRO_TABLE_CREATION.put("APPROVAL", true);
CONTENT_FOR_AVRO_TABLE_CREATION.put("INFO_ARRAY", "[\"a\",\"b\"]");
CONTENT_FOR_AVRO_TABLE_CREATION.put("INFO_ARRAY_STRING", "[\"a\",\"b\"]");
CONTENT_FOR_AVRO_TABLE_CREATION.put("INFO_ARRAY_INT", "[1,2]");
CONTENT_FOR_AVRO_TABLE_CREATION.put(
"INFO_ARRAY_JSON",
"[null,\"{\\\"a\\\":1,\\\"b\\\":null,\\\"c\\\":null,\\\"d\\\":\\\"89asda9s0a\\\"}\"]");
CONTENT_FOR_AVRO_TABLE_CREATION.put("INFO_MAP", "{\"field\":3}");
CONTENT_FOR_AVRO_TABLE_CREATION.put("RECORD_METADATA", "RECORD_METADATA_PLACE_HOLDER");
}
@@ -1230,7 +1230,9 @@ public void testSchematizationWithTableCreationAndAvroInput() throws Exception {
.field("rating_float32", Schema.FLOAT32_SCHEMA)
.field("rating_float64", Schema.FLOAT64_SCHEMA)
.field("approval", Schema.BOOLEAN_SCHEMA)
.field("info_array", SchemaBuilder.array(Schema.STRING_SCHEMA).build())
.field("info_array_string", SchemaBuilder.array(Schema.STRING_SCHEMA).build())
.field("info_array_int", SchemaBuilder.array(Schema.INT32_SCHEMA).build())
.field("info_array_json", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).build())
.field(
"info_map", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build());

@@ -1244,7 +1246,11 @@ public void testSchematizationWithTableCreationAndAvroInput() throws Exception {
.put("rating_float32", 0.99f)
.put("rating_float64", 0.99d)
.put("approval", true)
.put("info_array", Arrays.asList("a", "b"))
.put("info_array_string", Arrays.asList("a", "b"))
.put("info_array_int", Arrays.asList(1, 2))
.put(
"info_array_json",
Arrays.asList(null, "{\"a\": 1, \"b\": null, \"c\": null, \"d\": \"89asda9s0a\"}"))
.put("info_map", Collections.singletonMap("field", 3));

SchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient();
@@ -291,7 +291,8 @@ public void testConnectJsonConverter_MapInt64() throws JsonProcessingException {
jsonMap.put("test", Integer.MAX_VALUE);
SchemaAndValue schemaAndValue =
jsonConverter.toConnectData("test", mapper.writeValueAsBytes(jsonMap));
JsonNode result = RecordService.convertToJson(schemaAndValue.schema(), schemaAndValue.value());
JsonNode result =
RecordService.convertToJson(schemaAndValue.schema(), schemaAndValue.value(), false);

ObjectNode expected = mapper.createObjectNode();
expected.put("test", Integer.MAX_VALUE);
@@ -332,7 +333,7 @@ private boolean testSimpleDataFormat_jsonConverter_thread_safe() {
SchemaAndValue schemaInputValue = jsonConverter.toConnectData("test", value.getBytes());

JsonNode result =
RecordService.convertToJson(schemaInputValue.schema(), schemaInputValue.value());
RecordService.convertToJson(schemaInputValue.schema(), schemaInputValue.value(), false);
System.out.println("Record Service result:" + result + " Thread :" + Thread.currentThread());

String exptectedDateTimeFormatStr =
@@ -350,7 +351,8 @@ public void testConnectJsonConverter_MapBigDecimalExceedsMaxPrecision()
jsonMap.put("test", new BigDecimal("999999999999999999999999999999999999999"));
SchemaAndValue schemaAndValue =
jsonConverter.toConnectData("test", mapper.writeValueAsBytes(jsonMap));
JsonNode result = RecordService.convertToJson(schemaAndValue.schema(), schemaAndValue.value());
JsonNode result =
RecordService.convertToJson(schemaAndValue.schema(), schemaAndValue.value(), false);

ObjectNode expected = mapper.createObjectNode();
expected.put("test", new BigDecimal("999999999999999999999999999999999999999"));
@@ -367,7 +369,8 @@ public void testConnectSimpleHeaderConverter_MapDateAndOtherTypes()
SchemaAndValue schemaAndValue =
headerConverter.toConnectHeader(
"test", "h1", rawHeader.getBytes(StandardCharsets.US_ASCII));
JsonNode result = RecordService.convertToJson(schemaAndValue.schema(), schemaAndValue.value());
JsonNode result =
RecordService.convertToJson(schemaAndValue.schema(), schemaAndValue.value(), false);

ObjectNode expected = mapper.createObjectNode();
long expectedTimestampValue =
@@ -3,7 +3,14 @@
import java.io.IOException;
import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.record.TimestampType;