From 8a7a30575ff997c8ba5d2c809498174a6fbbb52f Mon Sep 17 00:00:00 2001 From: Wojciech Trefon Date: Fri, 25 Oct 2024 08:56:13 +0200 Subject: [PATCH] SNOW-1737840: Adapt record mapping in RecordService (#969) --- .../com/snowflake/kafka/connector/Utils.java | 5 + .../internal/SnowflakeSinkServiceV1.java | 3 +- .../BufferedTopicPartitionChannel.java | 7 +- .../DirectTopicPartitionChannel.java | 7 +- .../streaming/SnowflakeSinkServiceV2.java | 10 +- .../IcebergTableStreamingRecordMapper.java | 86 ++++ .../connector/records/RecordService.java | 168 +------- .../records/RecordServiceFactory.java | 18 + .../SnowflakeTableStreamingRecordMapper.java | 64 +++ .../records/StreamingRecordMapper.java | 42 ++ .../SnowflakeStreamingSinkTaskBuilder.java | 4 +- .../BufferedTopicPartitionChannelTest.java | 4 +- .../streaming/TopicPartitionChannelTest.java | 31 +- .../records/AbstractMetaColumnTest.java | 8 +- .../kafka/connector/records/HeaderTest.java | 2 +- ...IcebergTableStreamingRecordMapperTest.java | 402 ++++++++++++++++++ .../connector/records/ProcessRecordTest.java | 2 +- .../connector/records/RecordContentTest.java | 15 +- .../records/SnowpipeMetaColumnTest.java | 2 +- .../SnowpipeStreamingMetaColumnTest.java | 7 +- .../streaming/iceberg/IcebergIngestionIT.java | 85 +--- .../IcebergIngestionNoSchemaEvolutionIT.java | 42 +- .../IcebergIngestionSchemaEvolutionIT.java | 12 +- .../iceberg/sql/ComplexJsonRecord.java | 381 +++++++++++++++++ .../streaming/iceberg/sql/MetadataRecord.java | 10 +- .../iceberg/sql/PrimitiveJsonRecord.java | 70 ++- 26 files changed, 1208 insertions(+), 279 deletions(-) create mode 100644 src/main/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapper.java create mode 100644 src/main/java/com/snowflake/kafka/connector/records/RecordServiceFactory.java create mode 100644 src/main/java/com/snowflake/kafka/connector/records/SnowflakeTableStreamingRecordMapper.java create mode 100644 src/main/java/com/snowflake/kafka/connector/records/StreamingRecordMapper.java create mode 100644 src/test/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapperTest.java create mode 100644 src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/ComplexJsonRecord.java diff --git a/src/main/java/com/snowflake/kafka/connector/Utils.java b/src/main/java/com/snowflake/kafka/connector/Utils.java index 16db39388..7fb138fd3 100644 --- a/src/main/java/com/snowflake/kafka/connector/Utils.java +++ b/src/main/java/com/snowflake/kafka/connector/Utils.java @@ -407,6 +407,11 @@ public static boolean isIcebergEnabled(Map config) { return Boolean.parseBoolean(config.get(ICEBERG_ENABLED)); } + public static boolean isSchematizationEnabled(Map config) { + return Boolean.parseBoolean( + config.get(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG)); + } + /** * @param config config with applied default values * @return role specified in rhe config diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java index 616a90bd1..14c599ff7 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java @@ -20,6 +20,7 @@ import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryPipeStatus; import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService; import com.snowflake.kafka.connector.records.RecordService; +import com.snowflake.kafka.connector.records.RecordServiceFactory; import com.snowflake.kafka.connector.records.SnowflakeJsonSchema; import com.snowflake.kafka.connector.records.SnowflakeMetadataConfig; import com.snowflake.kafka.connector.records.SnowflakeRecordContent; @@ -112,7 +113,7 @@ class SnowflakeSinkServiceV1 implements SnowflakeSinkService { this.conn = conn; isStopped = false; this.telemetryService = conn.getTelemetryClient(); - this.recordService = new RecordService(); + this.recordService = RecordServiceFactory.createRecordService(false, false); this.topic2TableMap = new HashMap<>(); // Setting the default value in constructor diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java index 3ec0d5744..1099c5ddc 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java @@ -31,6 +31,7 @@ import com.snowflake.kafka.connector.internal.streaming.telemetry.SnowflakeTelemetryChannelStatus; import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService; import com.snowflake.kafka.connector.records.RecordService; +import com.snowflake.kafka.connector.records.RecordServiceFactory; import com.snowflake.kafka.connector.records.SnowflakeJsonSchema; import com.snowflake.kafka.connector.records.SnowflakeRecordContent; import dev.failsafe.Failsafe; @@ -207,7 +208,8 @@ public BufferedTopicPartitionChannel( kafkaRecordErrorReporter, sinkTaskContext, conn, - new RecordService(), + RecordServiceFactory.createRecordService( + false, Utils.isSchematizationEnabled(sfConnectorConfig)), telemetryService, false, null, @@ -278,8 +280,7 @@ public BufferedTopicPartitionChannel( !Strings.isNullOrEmpty(StreamingUtils.getDlqTopicName(this.sfConnectorConfig)); /* Schematization related properties */ - this.enableSchematization = - this.recordService.setAndGetEnableSchematizationFromConfig(sfConnectorConfig); + this.enableSchematization = Utils.isSchematizationEnabled(this.sfConnectorConfig); this.enableSchemaEvolution = this.enableSchematization && hasSchemaEvolutionPermission; this.schemaEvolutionService = schemaEvolutionService; diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java index 8a27314e2..28a56fe93 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java @@ -34,6 +34,7 @@ import com.snowflake.kafka.connector.internal.streaming.telemetry.SnowflakeTelemetryChannelStatus; import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService; import com.snowflake.kafka.connector.records.RecordService; +import com.snowflake.kafka.connector.records.RecordServiceFactory; import com.snowflake.kafka.connector.records.SnowflakeJsonSchema; import com.snowflake.kafka.connector.records.SnowflakeRecordContent; import dev.failsafe.Failsafe; @@ -178,7 +179,8 @@ public DirectTopicPartitionChannel( kafkaRecordErrorReporter, sinkTaskContext, conn, - new RecordService(), + RecordServiceFactory.createRecordService( + false, Utils.isSchematizationEnabled(sfConnectorConfig)), telemetryService, false, null, @@ -243,8 +245,7 @@ public DirectTopicPartitionChannel( !Strings.isNullOrEmpty(StreamingUtils.getDlqTopicName(this.sfConnectorConfig)); /* Schematization related properties */ - this.enableSchematization = - this.recordService.setAndGetEnableSchematizationFromConfig(sfConnectorConfig); + this.enableSchematization = Utils.isSchematizationEnabled(this.sfConnectorConfig); this.enableSchemaEvolution = this.enableSchematization && hasSchemaEvolutionPermission; this.schemaEvolutionService = schemaEvolutionService; diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java index c0d01ac56..171a9a34c 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java @@ -28,6 +28,7 @@ import com.snowflake.kafka.connector.internal.streaming.schemaevolution.snowflake.SnowflakeSchemaEvolutionService; import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService; import com.snowflake.kafka.connector.records.RecordService; +import com.snowflake.kafka.connector.records.RecordServiceFactory; import com.snowflake.kafka.connector.records.SnowflakeMetadataConfig; import com.snowflake.kafka.connector.streaming.iceberg.IcebergInitService; import com.snowflake.kafka.connector.streaming.iceberg.IcebergTableSchemaValidator; @@ -137,7 +138,10 @@ public SnowflakeSinkServiceV2( this.flushTimeSeconds = StreamingUtils.STREAMING_BUFFER_FLUSH_TIME_DEFAULT_SEC; this.conn = conn; this.telemetryService = conn.getTelemetryClient(); - this.recordService = new RecordService(); + boolean schematizationEnabled = Utils.isSchematizationEnabled(connectorConfig); + this.recordService = + RecordServiceFactory.createRecordService( + Utils.isIcebergEnabled(connectorConfig), schematizationEnabled); this.icebergTableSchemaValidator = new IcebergTableSchemaValidator(conn); this.icebergInitService = new IcebergInitService(conn); this.schemaEvolutionService = @@ -153,9 +157,7 @@ public SnowflakeSinkServiceV2( this.connectorConfig = connectorConfig; - this.enableSchematization = - this.recordService.setAndGetEnableSchematizationFromConfig(this.connectorConfig); - this.recordService.setIcebergEnabledFromConfig(this.connectorConfig); + this.enableSchematization = schematizationEnabled; this.closeChannelsInParallel = Optional.ofNullable(connectorConfig.get(SNOWPIPE_STREAMING_CLOSE_CHANNELS_IN_PARALLEL)) diff --git a/src/main/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapper.java b/src/main/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapper.java new file mode 100644 index 000000000..a368c4e44 --- /dev/null +++ b/src/main/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapper.java @@ -0,0 +1,86 @@ +package com.snowflake.kafka.connector.records; + +import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_CONTENT; +import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_METADATA; +import static com.snowflake.kafka.connector.records.RecordService.*; + +import com.snowflake.kafka.connector.Utils; +import java.util.AbstractMap; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.stream.Collectors; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.type.TypeReference; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; + +class IcebergTableStreamingRecordMapper extends StreamingRecordMapper { + private static final TypeReference> OBJECTS_MAP_TYPE_REFERENCE = + new TypeReference>() {}; + + public IcebergTableStreamingRecordMapper( + ObjectMapper objectMapper, boolean schematizationEnabled) { + super(objectMapper, schematizationEnabled); + } + + @Override + public Map processSnowflakeRecord(SnowflakeTableRow row, boolean includeMetadata) + throws JsonProcessingException { + final Map streamingIngestRow = new HashMap<>(); + for (JsonNode node : row.getContent().getData()) { + if (schematizationEnabled) { + streamingIngestRow.putAll(getMapForSchematization(node)); + } else { + streamingIngestRow.put(TABLE_COLUMN_CONTENT, getMapForNoSchematization(node)); + } + } + if (includeMetadata) { + streamingIngestRow.put(TABLE_COLUMN_METADATA, getMapForMetadata(row.getMetadata())); + } + return streamingIngestRow; + } + + private Map getMapForNoSchematization(JsonNode node) { + return mapper.convertValue(node, OBJECTS_MAP_TYPE_REFERENCE); + } + + private Map getMapForSchematization(JsonNode node) { + // we need to quote the keys on the first level of the map as they are column names in the table + // the rest must stay as is as the nested objects are not column names but fields name with case + // sensitivity + return mapper.convertValue(node, OBJECTS_MAP_TYPE_REFERENCE).entrySet().stream() + .map( + entry -> + new AbstractMap.SimpleEntry<>( + Utils.quoteNameIfNeeded(entry.getKey()), entry.getValue())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + private Map getMapForMetadata(JsonNode metadataNode) + throws JsonProcessingException { + Map values = mapper.convertValue(metadataNode, OBJECTS_MAP_TYPE_REFERENCE); + // we don't want headers to be serialized as Map so we overwrite it as + // Map + Map headers = convertHeaders(metadataNode.findValue(HEADERS)); + values.put(HEADERS, headers); + return values; + } + + private Map convertHeaders(JsonNode headersNode) throws JsonProcessingException { + final Map headers = new HashMap<>(); + + if (headersNode == null || headersNode.isNull() || headersNode.isEmpty()) { + return headers; + } + + Iterator fields = headersNode.fieldNames(); + while (fields.hasNext()) { + String key = fields.next(); + JsonNode valueNode = headersNode.get(key); + String value = getTextualValue(valueNode); + headers.put(key, value); + } + return headers; + } +} diff --git a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java index 16a8b5f47..1a92238e9 100644 --- a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java +++ b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java @@ -16,12 +16,7 @@ */ package com.snowflake.kafka.connector.records; -import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_CONTENT; -import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_METADATA; - -import com.google.common.annotations.VisibleForTesting; import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; -import com.snowflake.kafka.connector.Utils; import com.snowflake.kafka.connector.internal.KCLogger; import com.snowflake.kafka.connector.internal.SnowflakeErrors; import java.math.BigDecimal; @@ -31,8 +26,6 @@ import java.time.Instant; import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; import java.util.Map; import java.util.TimeZone; import javax.annotation.Nullable; @@ -41,7 +34,6 @@ import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ArrayNode; import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.JsonNodeFactory; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.NumericNode; import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.connect.data.ConnectSchema; @@ -63,7 +55,7 @@ public class RecordService { private final KCLogger LOGGER = new KCLogger(RecordService.class.getName()); - private static final ObjectMapper MAPPER = new ObjectMapper(); + private final ObjectMapper mapper; // deleted private to use these values in test static final String OFFSET = "offset"; @@ -77,9 +69,7 @@ public class RecordService { private static final String KEY_SCHEMA_ID = "key_schema_id"; static final String HEADERS = "headers"; - private boolean enableSchematization = false; - - private boolean icebergEnabled = false; + private final StreamingRecordMapper streamingRecordMapper; // For each task, we require a separate instance of SimpleDataFormat, since they are not // inherently thread safe @@ -103,54 +93,21 @@ public class RecordService { // This class is designed to work with empty metadata config map private SnowflakeMetadataConfig metadataConfig = new SnowflakeMetadataConfig(); - RecordService(Clock clock) { + RecordService(Clock clock, StreamingRecordMapper streamingRecordMapper, ObjectMapper mapper) { this.clock = clock; + this.streamingRecordMapper = streamingRecordMapper; + this.mapper = mapper; } /** Creates a record service with a UTC {@link Clock}. */ - public RecordService() { - this(Clock.systemUTC()); + RecordService(StreamingRecordMapper streamingRecordMapper, ObjectMapper mapper) { + this(Clock.systemUTC(), streamingRecordMapper, mapper); } public void setMetadataConfig(SnowflakeMetadataConfig metadataConfigIn) { metadataConfig = metadataConfigIn; } - /** - * extract enableSchematization from the connector config and set the value for the recordService - * - *

The extracted boolean is returned for external usage. - * - * @param connectorConfig the connector config map - * @return a boolean indicating whether schematization is enabled - */ - public boolean setAndGetEnableSchematizationFromConfig( - final Map connectorConfig) { - if (connectorConfig.containsKey(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG)) { - this.enableSchematization = - Boolean.parseBoolean( - connectorConfig.get(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG)); - } - return this.enableSchematization; - } - - public boolean setIcebergEnabledFromConfig(final Map connectorConfig) { - this.icebergEnabled = Utils.isIcebergEnabled(connectorConfig); - return this.icebergEnabled; - } - - /** - * Directly set the enableSchematization through param - * - *

This method is only for testing - * - * @param enableSchematization whether we should enable schematization or not - */ - @VisibleForTesting - public void setEnableSchematization(final boolean enableSchematization) { - this.enableSchematization = enableSchematization; - } - /** * process given SinkRecord, only support snowflake converters * @@ -175,7 +132,7 @@ private SnowflakeTableRow processRecord(SinkRecord record, @Nullable Instant con valueContent = (SnowflakeRecordContent) record.value(); } - ObjectNode meta = MAPPER.createObjectNode(); + ObjectNode meta = mapper.createObjectNode(); if (metadataConfig.topicFlag) { meta.put(TOPIC, record.topic()); } @@ -223,7 +180,7 @@ public String getProcessedRecordForSnowpipe(SinkRecord record) { record, /*connectorPushTime=*/ null); // ConnectorPushTime is not used for Snowpipe. StringBuilder buffer = new StringBuilder(); for (JsonNode node : row.content.getData()) { - ObjectNode data = MAPPER.createObjectNode(); + ObjectNode data = mapper.createObjectNode(); data.set(CONTENT, node); if (metadataConfig.allFlag) { data.set(META, row.metadata); @@ -252,100 +209,11 @@ public Map getProcessedRecordForStreamingIngest(SinkRecord recor throws JsonProcessingException { SnowflakeTableRow row = processRecord(record, clock.instant()); - if (icebergEnabled) { - return processIcebergRecord(row); - } else { - return processSnowflakeRecord(row); - } - } - - private Map processSnowflakeRecord(SnowflakeTableRow row) - throws JsonProcessingException { - final Map streamingIngestRow = new HashMap<>(); - for (JsonNode node : row.content.getData()) { - if (enableSchematization) { - streamingIngestRow.putAll(getMapFromJsonNodeForStreamingIngest(node)); - } else { - streamingIngestRow.put(TABLE_COLUMN_CONTENT, MAPPER.writeValueAsString(node)); - } - if (metadataConfig.allFlag) { - streamingIngestRow.put(TABLE_COLUMN_METADATA, MAPPER.writeValueAsString(row.metadata)); - } - } - return streamingIngestRow; - } - - private Map processIcebergRecord(SnowflakeTableRow row) - throws JsonProcessingException { - // TODO this not cover all cases, full implementation will be done in SNOW-1737840 - final Map streamingIngestRow = new HashMap<>(); - for (JsonNode node : row.content.getData()) { - if (enableSchematization) { - streamingIngestRow.putAll(getMapFromJsonNodeForStreamingIngest(node)); - } else { - streamingIngestRow.put(TABLE_COLUMN_CONTENT, getMapFromJsonNodeForIceberg(node)); - } - if (metadataConfig.allFlag) { - streamingIngestRow.put(TABLE_COLUMN_METADATA, getMapFromJsonNodeForIceberg(row.metadata)); - } - } - return streamingIngestRow; - } - - private Map getMapFromJsonNodeForStreamingIngest(JsonNode node) - throws JsonProcessingException { - return getMapFromJsonNodeForStreamingIngest(node, true); - } - - private Map getMapFromJsonNodeForIceberg(JsonNode node) - throws JsonProcessingException { - return getMapFromJsonNodeForStreamingIngest(node, false); - } - - private Map getMapFromJsonNodeForStreamingIngest( - JsonNode node, boolean quoteColumnName) throws JsonProcessingException { - final Map streamingIngestRow = new HashMap<>(); - - // return empty if tombstone record - if (node.isEmpty()) { - return streamingIngestRow; - } - - Iterator columnNames = node.fieldNames(); - while (columnNames.hasNext()) { - String columnName = columnNames.next(); - JsonNode columnNode = node.get(columnName); - Object columnValue; - if (columnNode.isTextual()) { - columnValue = columnNode.textValue(); - } else if (columnNode.isNull()) { - columnValue = null; - } else { - columnValue = writeValueAsStringOrNan(columnNode); - } - // while the value is always dumped into a string, the Streaming Ingest SDK - // will transform the value according to its type in the table - streamingIngestRow.put( - quoteColumnName ? Utils.quoteNameIfNeeded(columnName) : columnName, columnValue); - } - // Thrown an exception if the input JsonNode is not in the expected format - if (streamingIngestRow.isEmpty()) { - throw SnowflakeErrors.ERROR_0010.getException( - "Not able to convert node to Snowpipe Streaming input format"); - } - return streamingIngestRow; - } - - private String writeValueAsStringOrNan(JsonNode columnNode) throws JsonProcessingException { - if (columnNode instanceof NumericNode && ((NumericNode) columnNode).isNaN()) { - return "NaN"; - } else { - return MAPPER.writeValueAsString(columnNode); - } + return streamingRecordMapper.processSnowflakeRecord(row, metadataConfig.allFlag); } /** For now there are two columns one is content and other is metadata. Both are Json */ - private static class SnowflakeTableRow { + static class SnowflakeTableRow { // This can be a JsonNode but we will keep this as is. private final SnowflakeRecordContent content; private final JsonNode metadata; @@ -354,6 +222,14 @@ public SnowflakeTableRow(SnowflakeRecordContent content, JsonNode metadata) { this.content = content; this.metadata = metadata; } + + public SnowflakeRecordContent getContent() { + return content; + } + + public JsonNode getMetadata() { + return metadata; + } } void putKey(SinkRecord record, ObjectNode meta) { @@ -382,7 +258,7 @@ void putKey(SinkRecord record, ObjectNode meta) { if (keyData.length == 1) { meta.set(KEY, keyData[0]); } else { - ArrayNode keyNode = MAPPER.createArrayNode(); + ArrayNode keyNode = mapper.createArrayNode(); keyNode.addAll(Arrays.asList(keyData)); meta.set(KEY, keyNode); } @@ -397,8 +273,8 @@ void putKey(SinkRecord record, ObjectNode meta) { } } - static JsonNode parseHeaders(Headers headers) { - ObjectNode result = MAPPER.createObjectNode(); + private JsonNode parseHeaders(Headers headers) { + ObjectNode result = mapper.createObjectNode(); for (Header header : headers) { result.set(header.key(), convertToJson(header.schema(), header.value(), false)); } diff --git a/src/main/java/com/snowflake/kafka/connector/records/RecordServiceFactory.java b/src/main/java/com/snowflake/kafka/connector/records/RecordServiceFactory.java new file mode 100644 index 000000000..ab8d3deac --- /dev/null +++ b/src/main/java/com/snowflake/kafka/connector/records/RecordServiceFactory.java @@ -0,0 +1,18 @@ +package com.snowflake.kafka.connector.records; + +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; + +public class RecordServiceFactory { + public static RecordService createRecordService( + boolean isIcebergEnabled, boolean enableSchematization) { + ObjectMapper objectMapper = new ObjectMapper(); + if (isIcebergEnabled) { + return new RecordService( + new IcebergTableStreamingRecordMapper(objectMapper, enableSchematization), objectMapper); + } else { + return new RecordService( + new SnowflakeTableStreamingRecordMapper(objectMapper, enableSchematization), + objectMapper); + } + } +} diff --git a/src/main/java/com/snowflake/kafka/connector/records/SnowflakeTableStreamingRecordMapper.java b/src/main/java/com/snowflake/kafka/connector/records/SnowflakeTableStreamingRecordMapper.java new file mode 100644 index 000000000..06e83c1c9 --- /dev/null +++ b/src/main/java/com/snowflake/kafka/connector/records/SnowflakeTableStreamingRecordMapper.java @@ -0,0 +1,64 @@ +package com.snowflake.kafka.connector.records; + +import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_CONTENT; +import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_METADATA; + +import com.snowflake.kafka.connector.Utils; +import com.snowflake.kafka.connector.internal.SnowflakeErrors; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; + +class SnowflakeTableStreamingRecordMapper extends StreamingRecordMapper { + + public SnowflakeTableStreamingRecordMapper(ObjectMapper mapper, boolean schematizationEnabled) { + super(mapper, schematizationEnabled); + } + + @Override + public Map processSnowflakeRecord( + RecordService.SnowflakeTableRow row, boolean includeAllMetadata) + throws JsonProcessingException { + final Map streamingIngestRow = new HashMap<>(); + for (JsonNode node : row.getContent().getData()) { + if (schematizationEnabled) { + streamingIngestRow.putAll(getMapFromJsonNodeForStreamingIngest(node)); + } else { + streamingIngestRow.put(TABLE_COLUMN_CONTENT, mapper.writeValueAsString(node)); + } + } + if (includeAllMetadata) { + streamingIngestRow.put(TABLE_COLUMN_METADATA, mapper.writeValueAsString(row.getMetadata())); + } + return streamingIngestRow; + } + + private Map getMapFromJsonNodeForStreamingIngest(JsonNode node) + throws JsonProcessingException { + final Map streamingIngestRow = new HashMap<>(); + + // return empty if tombstone record + if (node.isEmpty()) { + return streamingIngestRow; + } + + Iterator columnNames = node.fieldNames(); + while (columnNames.hasNext()) { + String columnName = columnNames.next(); + JsonNode columnNode = node.get(columnName); + Object columnValue = getTextualValue(columnNode); + // while the value is always dumped into a string, the Streaming Ingest SDK + // will transform the value according to its type in the table + streamingIngestRow.put(Utils.quoteNameIfNeeded(columnName), columnValue); + } + // Thrown an exception if the input JsonNode is not in the expected format + if (streamingIngestRow.isEmpty()) { + throw SnowflakeErrors.ERROR_0010.getException( + "Not able to convert node to Snowpipe Streaming input format"); + } + return streamingIngestRow; + } +} diff --git a/src/main/java/com/snowflake/kafka/connector/records/StreamingRecordMapper.java b/src/main/java/com/snowflake/kafka/connector/records/StreamingRecordMapper.java new file mode 100644 index 000000000..f58b751bf --- /dev/null +++ b/src/main/java/com/snowflake/kafka/connector/records/StreamingRecordMapper.java @@ -0,0 +1,42 @@ +package com.snowflake.kafka.connector.records; + +import com.snowflake.kafka.connector.records.RecordService.SnowflakeTableRow; +import java.util.Map; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.NumericNode; + +abstract class StreamingRecordMapper { + + protected final ObjectMapper mapper; + protected final boolean schematizationEnabled; + + public StreamingRecordMapper(ObjectMapper mapper, boolean schematizationEnabled) { + this.mapper = mapper; + this.schematizationEnabled = schematizationEnabled; + } + + abstract Map processSnowflakeRecord( + SnowflakeTableRow row, boolean includeAllMetadata) throws JsonProcessingException; + + protected String getTextualValue(JsonNode valueNode) throws JsonProcessingException { + String value; + if (valueNode.isTextual()) { + value = valueNode.textValue(); + } else if (valueNode.isNull()) { + value = null; + } else { + value = writeValueAsStringOrNan(valueNode); + } + return value; + } + + protected String writeValueAsStringOrNan(JsonNode columnNode) throws JsonProcessingException { + if (columnNode instanceof NumericNode && ((NumericNode) columnNode).isNaN()) { + return "NaN"; + } else { + return mapper.writeValueAsString(columnNode); + } + } +} diff --git a/src/test/java/com/snowflake/kafka/connector/SnowflakeStreamingSinkTaskBuilder.java b/src/test/java/com/snowflake/kafka/connector/SnowflakeStreamingSinkTaskBuilder.java index 914a6c8c2..c057245a6 100644 --- a/src/test/java/com/snowflake/kafka/connector/SnowflakeStreamingSinkTaskBuilder.java +++ b/src/test/java/com/snowflake/kafka/connector/SnowflakeStreamingSinkTaskBuilder.java @@ -21,7 +21,7 @@ import com.snowflake.kafka.connector.internal.streaming.schemaevolution.InsertErrorMapper; import com.snowflake.kafka.connector.internal.streaming.schemaevolution.snowflake.SnowflakeSchemaEvolutionService; import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService; -import com.snowflake.kafka.connector.records.RecordService; +import com.snowflake.kafka.connector.records.RecordServiceFactory; import com.snowflake.kafka.connector.streaming.iceberg.IcebergInitService; import com.snowflake.kafka.connector.streaming.iceberg.IcebergTableSchemaValidator; import java.util.Collections; @@ -123,7 +123,7 @@ private SnowflakeSinkServiceV2 streamingSinkService( 10 * 1024 * 1024, 1, mockConnectionService, - new RecordService(), + RecordServiceFactory.createRecordService(false, Utils.isSchematizationEnabled(config)), mockTelemetryService, mockIcebergTableSchemaValidator, mockIcebergInitService, diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannelTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannelTest.java index 92d076536..ddadc27b5 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannelTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannelTest.java @@ -13,7 +13,7 @@ import com.snowflake.kafka.connector.internal.streaming.schemaevolution.InsertErrorMapper; import com.snowflake.kafka.connector.internal.streaming.schemaevolution.SchemaEvolutionService; import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService; -import com.snowflake.kafka.connector.records.RecordService; +import com.snowflake.kafka.connector.records.RecordServiceFactory; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -276,7 +276,7 @@ public void testInsertRows_ValidationResponseHasErrors_NoErrorTolerance() throws mockKafkaRecordErrorReporter, mockSinkTaskContext, mockSnowflakeConnectionService, - new RecordService(), + RecordServiceFactory.createRecordService(false, false), mockTelemetryService, false, null, diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java index 683189189..93210c828 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java @@ -31,6 +31,7 @@ import com.snowflake.kafka.connector.internal.streaming.telemetry.SnowflakeTelemetryChannelStatus; import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService; import com.snowflake.kafka.connector.records.RecordService; +import com.snowflake.kafka.connector.records.RecordServiceFactory; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collection; @@ -268,7 +269,7 @@ public void testCloseChannelException() throws Exception { mockKafkaRecordErrorReporter, mockSinkTaskContext, mockSnowflakeConnectionService, - new RecordService(), + RecordServiceFactory.createRecordService(false, false), mockTelemetryService, false, null, @@ -298,7 +299,7 @@ public void closeChannel_withSFExceptionInFuture_swallowsException() { mockKafkaRecordErrorReporter, mockSinkTaskContext, mockSnowflakeConnectionService, - new RecordService(), + RecordServiceFactory.createRecordService(false, false), mockTelemetryService, false, null, @@ -330,7 +331,7 @@ public void closeChannel_withSFExceptionThrown_swallowsException() { mockKafkaRecordErrorReporter, mockSinkTaskContext, mockSnowflakeConnectionService, - new RecordService(), + RecordServiceFactory.createRecordService(false, false), mockTelemetryService, false, null, @@ -367,7 +368,7 @@ public void closeChannelAsync_withNotSFExceptionInFuture_propagatesException() { mockKafkaRecordErrorReporter, mockSinkTaskContext, mockSnowflakeConnectionService, - new RecordService(), + RecordServiceFactory.createRecordService(false, false), mockTelemetryService, false, null, @@ -406,7 +407,7 @@ public void closeChannelAsync_withSFExceptionInFuture_swallowsException() { mockKafkaRecordErrorReporter, mockSinkTaskContext, mockSnowflakeConnectionService, - new RecordService(), + RecordServiceFactory.createRecordService(false, false), mockTelemetryService, false, null, @@ -438,7 +439,7 @@ public void closeChannelAsync_withSFExceptionThrown_swallowsException() { mockKafkaRecordErrorReporter, mockSinkTaskContext, mockSnowflakeConnectionService, - new RecordService(), + RecordServiceFactory.createRecordService(false, false), mockTelemetryService, false, null, @@ -474,7 +475,7 @@ public void testStreamingChannelMigrationEnabledAndDisabled() { mockKafkaRecordErrorReporter, mockSinkTaskContext, mockSnowflakeConnectionService, - new RecordService(), + RecordServiceFactory.createRecordService(false, false), mockTelemetryService, false, null, @@ -497,7 +498,7 @@ public void testStreamingChannelMigrationEnabledAndDisabled() { mockKafkaRecordErrorReporter, mockSinkTaskContext, mockSnowflakeConnectionService, - new RecordService(), + RecordServiceFactory.createRecordService(false, false), mockTelemetryService, false, null, @@ -521,7 +522,7 @@ public void testStreamingChannelMigrationEnabledAndDisabled() { mockKafkaRecordErrorReporter, mockSinkTaskContext, anotherMockForParamDisabled, - new RecordService(), + RecordServiceFactory.createRecordService(false, false), mockTelemetryService, false, null, @@ -552,7 +553,7 @@ public void testStreamingChannelMigration_InvalidResponse() { mockKafkaRecordErrorReporter, mockSinkTaskContext, mockSnowflakeConnectionService, - new RecordService(), + RecordServiceFactory.createRecordService(false, false), mockTelemetryService, false, null, @@ -889,7 +890,7 @@ public void testInsertRowsWithSchemaEvolution() throws Exception { kafkaRecordErrorReporter, mockSinkTaskContext, conn, - new RecordService(), + RecordServiceFactory.createRecordService(false, false), mockTelemetryService, false, null, @@ -1082,7 +1083,7 @@ public void testTopicPartitionChannelMetrics() throws Exception { this.mockKafkaRecordErrorReporter, this.mockSinkTaskContext, this.mockSnowflakeConnectionService, - new RecordService(), + RecordServiceFactory.createRecordService(false, false), this.mockTelemetryService, true, metricsJmxReporter, @@ -1159,7 +1160,7 @@ public void testTopicPartitionChannelInvalidJmxReporter() throws Exception { this.mockKafkaRecordErrorReporter, this.mockSinkTaskContext, this.mockSnowflakeConnectionService, - new RecordService(), + RecordServiceFactory.createRecordService(false, false), this.mockTelemetryService, true, null, @@ -1324,7 +1325,7 @@ public void assignANewChannelAfterTheSetupIsFullyDone() throws Exception { this.mockKafkaRecordErrorReporter, this.mockSinkTaskContext, this.mockSnowflakeConnectionService, - new RecordService(), + RecordServiceFactory.createRecordService(false, false), this.mockTelemetryService, true, null, @@ -1382,7 +1383,7 @@ public void assignANewChannel_whenNoOffsetIsPresentInSnowflake() { this.mockKafkaRecordErrorReporter, this.mockSinkTaskContext, this.mockSnowflakeConnectionService, - new RecordService(), + RecordServiceFactory.createRecordService(false, false), this.mockTelemetryService, true, null, diff --git a/src/test/java/com/snowflake/kafka/connector/records/AbstractMetaColumnTest.java b/src/test/java/com/snowflake/kafka/connector/records/AbstractMetaColumnTest.java index 32fa62f86..c23d53b5d 100644 --- a/src/test/java/com/snowflake/kafka/connector/records/AbstractMetaColumnTest.java +++ b/src/test/java/com/snowflake/kafka/connector/records/AbstractMetaColumnTest.java @@ -61,7 +61,7 @@ abstract class AbstractMetaColumnTest { @Test public void testKey() throws IOException { - RecordService service = new RecordService(); + RecordService service = RecordServiceFactory.createRecordService(false, false); SchemaAndValue input = getJsonInputData(); long timestamp = System.currentTimeMillis(); @@ -103,7 +103,7 @@ void whenSomeFieldsDisabled(Map config, Set enabledField .withTimestamp(System.currentTimeMillis(), timestampType) .build(); - RecordService service = new RecordService(); + RecordService service = RecordServiceFactory.createRecordService(false, false); service.setMetadataConfig(new SnowflakeMetadataConfig(config)); // when @@ -152,7 +152,7 @@ void whenMetadataDisabled() throws IOException { Map config = ImmutableMap.of(SNOWFLAKE_METADATA_ALL, "false"); - RecordService service = new RecordService(); + RecordService service = RecordServiceFactory.createRecordService(false, false); service.setMetadataConfig(new SnowflakeMetadataConfig(config)); // when @@ -164,7 +164,7 @@ void whenMetadataDisabled() throws IOException { @Test public void testTimeStamp() throws IOException { - RecordService service = new RecordService(); + RecordService service = RecordServiceFactory.createRecordService(false, false); SchemaAndValue input = getJsonInputData(); long timestamp = System.currentTimeMillis(); diff --git a/src/test/java/com/snowflake/kafka/connector/records/HeaderTest.java b/src/test/java/com/snowflake/kafka/connector/records/HeaderTest.java index d81d4a8de..4d3591866 100644 --- a/src/test/java/com/snowflake/kafka/connector/records/HeaderTest.java +++ b/src/test/java/com/snowflake/kafka/connector/records/HeaderTest.java @@ -43,7 +43,7 @@ public void testTypes() throws IOException { // empty headers SinkRecord record = createTestRecord(headers); - RecordService service = new RecordService(); + RecordService service = RecordServiceFactory.createRecordService(false, false); JsonNode node = MAPPER.readTree(service.getProcessedRecordForSnowpipe(record)); assertFalse(node.get("meta").has("headers")); diff --git a/src/test/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapperTest.java b/src/test/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapperTest.java new file mode 100644 index 000000000..226d04824 --- /dev/null +++ b/src/test/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapperTest.java @@ -0,0 +1,402 @@ +package com.snowflake.kafka.connector.records; + +import static com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord.primitiveJsonExample; +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.snowflake.kafka.connector.Utils; +import com.snowflake.kafka.connector.records.RecordService.SnowflakeTableRow; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Stream; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +class IcebergTableStreamingRecordMapperTest { + private static final ObjectMapper objectMapper = new ObjectMapper(); + + private static final ImmutableMap primitiveJsonAsMap = + ImmutableMap.of( + "id_int8", + 8, + "id_int16", + 16, + "id_int32", + 32, + "id_int64", + 64, + "description", + "dogs are the best", + "rating_float32", + 0.5, + "rating_float64", + 0.25, + "approval", + true); + + private static final String fullMetadataJsonExample = + "{" + + "\"offset\": 10," + + "\"topic\": \"topic\"," + + "\"partition\": 0," + + "\"key\": \"key\"," + + "\"schema_id\": 1," + + "\"key_schema_id\": 2," + + "\"CreateTime\": 3," + + "\"LogAppendTime\": 4," + + "\"SnowflakeConnectorPushTime\": 5," + + "\"headers\": {\"objectAsJsonStringHeader\": {" + + "\"key1\": \"value1\"," + + "\"key2\": \"value2\"" + + "}," + + "\"header2\": \"testheaderstring\"," + + "\"header3\": 3.5}" + + "}"; + + private static final Map fullMetadataJsonAsMap = + ImmutableMap.of( + "offset", + 10, + "topic", + "topic", + "partition", + 0, + "key", + "key", + "schema_id", + 1, + "key_schema_id", + 2, + "CreateTime", + 3, + "LogAppendTime", + 4, + "SnowflakeConnectorPushTime", + 5, + "headers", + ImmutableMap.of( + "objectAsJsonStringHeader", + "{\"key1\":\"value1\",\"key2\":\"value2\"}", + "header2", + "testheaderstring", + "header3", + "3.5")); + + @ParameterizedTest(name = "{0}") + @MethodSource("prepareSchematizationData") + void shouldMapRecord_schematizationEnabled( + String description, SnowflakeTableRow row, Map expected) + throws JsonProcessingException { + // When + IcebergTableStreamingRecordMapper mapper = + new IcebergTableStreamingRecordMapper(objectMapper, true); + Map result = mapper.processSnowflakeRecord(row, true); + + // Then + assertThat(result).isEqualTo(expected); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("prepareMetadataData") + void shouldMapMetadata(String description, SnowflakeTableRow row, Map expected) + throws JsonProcessingException { + // When + IcebergTableStreamingRecordMapper mapper = + new IcebergTableStreamingRecordMapper(objectMapper, false); + IcebergTableStreamingRecordMapper mapperSchematization = + new IcebergTableStreamingRecordMapper(objectMapper, true); + Map result = mapper.processSnowflakeRecord(row, true); + Map resultSchematized = mapperSchematization.processSnowflakeRecord(row, true); + + // Then + assertThat(result.get(Utils.TABLE_COLUMN_METADATA)).isEqualTo(expected); + assertThat(resultSchematized.get(Utils.TABLE_COLUMN_METADATA)).isEqualTo(expected); + } + + @Test + void shouldSkipMapMetadata() throws JsonProcessingException { + // Given + SnowflakeTableRow row = buildRow(primitiveJsonExample); + + // When + IcebergTableStreamingRecordMapper mapper = + new IcebergTableStreamingRecordMapper(objectMapper, false); + IcebergTableStreamingRecordMapper mapperSchematization = + new IcebergTableStreamingRecordMapper(objectMapper, true); + Map result = mapper.processSnowflakeRecord(row, false); + Map resultSchematized = mapperSchematization.processSnowflakeRecord(row, false); + + // Then + assertThat(result).doesNotContainKey(Utils.TABLE_COLUMN_METADATA); + assertThat(resultSchematized).doesNotContainKey(Utils.TABLE_COLUMN_METADATA); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("prepareNoSchematizationData") + void shouldMapRecord_schematizationDisabled( + String description, SnowflakeTableRow row, Map expected) + throws JsonProcessingException { + // When + IcebergTableStreamingRecordMapper mapper = + new IcebergTableStreamingRecordMapper(objectMapper, false); + Map result = mapper.processSnowflakeRecord(row, true); + + // Then + assertThat(result).isEqualTo(expected); + } + + private static Stream prepareSchematizationData() throws JsonProcessingException { + return Stream.of( + Arguments.of( + "Empty JSON", + buildRow("{}"), + ImmutableMap.of(Utils.TABLE_COLUMN_METADATA, fullMetadataJsonAsMap)), + Arguments.of( + "Simple JSON", + buildRow("{\"key\": \"value\"}"), + ImmutableMap.of( + "\"KEY\"", "value", Utils.TABLE_COLUMN_METADATA, fullMetadataJsonAsMap)), + Arguments.of( + "Already quoted key JSON", + buildRow("{\"\\\"key\\\"\": \"value\"}"), + ImmutableMap.of( + "\"key\"", "value", Utils.TABLE_COLUMN_METADATA, fullMetadataJsonAsMap)), + Arguments.of( + "Already quoted UPPERCASE key JSON", + buildRow("{\"\\\"KEY\\\"\": \"value\"}"), + ImmutableMap.of( + "\"KEY\"", "value", Utils.TABLE_COLUMN_METADATA, fullMetadataJsonAsMap)), + Arguments.of( + "JSON with different primitive types", + buildRow(primitiveJsonExample), + ImmutableMap.of( + "\"ID_INT8\"", + 8, + "\"ID_INT16\"", + 16, + "\"ID_INT32\"", + 32, + "\"ID_INT64\"", + 64, + "\"DESCRIPTION\"", + "dogs are the best", + "\"RATING_FLOAT32\"", + 0.5, + "\"RATING_FLOAT64\"", + 0.25, + "\"APPROVAL\"", + true, + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap)), + Arguments.of( + "Nested array", + buildRow("{\"key\": [" + primitiveJsonExample + ", " + primitiveJsonExample + "]}"), + ImmutableMap.of( + "\"KEY\"", + ImmutableList.of(primitiveJsonAsMap, primitiveJsonAsMap), + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap)), + Arguments.of( + "Empty nested array", + buildRow("{\"key\": []}"), + ImmutableMap.of( + "\"KEY\"", ImmutableList.of(), Utils.TABLE_COLUMN_METADATA, fullMetadataJsonAsMap)), + Arguments.of( + "Empty object", + buildRow("{\"key\": {}}"), + ImmutableMap.of( + "\"KEY\"", ImmutableMap.of(), Utils.TABLE_COLUMN_METADATA, fullMetadataJsonAsMap)), + Arguments.of( + "Nested object", + buildRow("{\"key\":" + primitiveJsonExample + "}"), + ImmutableMap.of( + "\"KEY\"", primitiveJsonAsMap, Utils.TABLE_COLUMN_METADATA, fullMetadataJsonAsMap)), + Arguments.of( + "Double nested object", + buildRow("{\"key\":{\"key2\":" + primitiveJsonExample + "}}"), + ImmutableMap.of( + "\"KEY\"", + ImmutableMap.of("key2", primitiveJsonAsMap), + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap)), + Arguments.of( + "Nested objects and primitive types", + buildRow( + primitiveJsonExample.replaceFirst("}", "") + + ",\"key\":" + + primitiveJsonExample + + "}"), + ImmutableMap.of( + "\"ID_INT8\"", + 8, + "\"ID_INT16\"", + 16, + "\"ID_INT32\"", + 32, + "\"ID_INT64\"", + 64, + "\"DESCRIPTION\"", + "dogs are the best", + "\"RATING_FLOAT32\"", + 0.5, + "\"RATING_FLOAT64\"", + 0.25, + "\"APPROVAL\"", + true, + "\"KEY\"", + primitiveJsonAsMap, + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap))); + } + + private static Stream prepareMetadataData() throws JsonProcessingException { + return Stream.of( + Arguments.of("Full metadata", buildRow("{}"), fullMetadataJsonAsMap), + Arguments.of( + "Empty metadata", buildRow("{}", "{}"), ImmutableMap.of("headers", ImmutableMap.of())), + Arguments.of( + "Metadata with null headers", + buildRow("{}", "{\"headers\": null}"), + ImmutableMap.of("headers", ImmutableMap.of())), + Arguments.of( + "Metadata with empty headers", + buildRow("{}", "{\"headers\": {}}"), + ImmutableMap.of("headers", ImmutableMap.of())), + Arguments.of( + "Metadata with headers with null keys", + buildRow("{}", "{\"headers\": {\"key\": null}}"), + ImmutableMap.of("headers", mapWithNullableValuesOf("key", null))), + Arguments.of( + "Metadata with headers with nested null keys", + buildRow("{}", "{\"headers\": {\"key\": {\"key2\": null }}}"), + ImmutableMap.of("headers", ImmutableMap.of("key", "{\"key2\":null}"))), + Arguments.of( + "Metadata with null field value", + buildRow("{}", "{\"offset\": null}"), + mapWithNullableValuesOf("offset", null, "headers", ImmutableMap.of()))); + } + + private static Stream prepareNoSchematizationData() throws JsonProcessingException { + return Stream.of( + Arguments.of( + "Empty JSON", + buildRow("{}"), + ImmutableMap.of( + Utils.TABLE_COLUMN_CONTENT, + ImmutableMap.of(), + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap)), + Arguments.of( + "Simple JSON", + buildRow("{\"key\": \"value\"}"), + ImmutableMap.of( + Utils.TABLE_COLUMN_CONTENT, + ImmutableMap.of("key", "value"), + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap)), + Arguments.of( + "UPPERCASE key JSON", + buildRow("{\"KEY\": \"value\"}"), + ImmutableMap.of( + Utils.TABLE_COLUMN_CONTENT, + ImmutableMap.of("KEY", "value"), + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap)), + Arguments.of( + "JSON with different primitive types", + buildRow(primitiveJsonExample), + ImmutableMap.of( + Utils.TABLE_COLUMN_CONTENT, + primitiveJsonAsMap, + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap)), + Arguments.of( + "Nested array", + buildRow("{\"key\": [" + primitiveJsonExample + ", " + primitiveJsonExample + "]}"), + ImmutableMap.of( + Utils.TABLE_COLUMN_CONTENT, + ImmutableMap.of("key", ImmutableList.of(primitiveJsonAsMap, primitiveJsonAsMap)), + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap)), + Arguments.of( + "Empty nested array", + buildRow("{\"key\": []}"), + ImmutableMap.of( + Utils.TABLE_COLUMN_CONTENT, + ImmutableMap.of("key", ImmutableList.of()), + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap)), + Arguments.of( + "Empty object", + buildRow("{\"key\": {}}"), + ImmutableMap.of( + Utils.TABLE_COLUMN_CONTENT, + ImmutableMap.of("key", ImmutableMap.of()), + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap)), + Arguments.of( + "Nested object", + buildRow("{\"key\":" + primitiveJsonExample + "}"), + ImmutableMap.of( + Utils.TABLE_COLUMN_CONTENT, + ImmutableMap.of("key", primitiveJsonAsMap), + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap)), + Arguments.of( + "Double nested object", + buildRow("{\"key\":{\"key2\":" + primitiveJsonExample + "}}"), + ImmutableMap.of( + Utils.TABLE_COLUMN_CONTENT, + ImmutableMap.of("key", ImmutableMap.of("key2", primitiveJsonAsMap)), + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap)), + Arguments.of( + "Nested objects and primitive types", + buildRow( + primitiveJsonExample.replaceFirst("}", "") + + ",\"key\":" + + primitiveJsonExample + + "}"), + ImmutableMap.of( + Utils.TABLE_COLUMN_CONTENT, + addToMap(primitiveJsonAsMap, "key", primitiveJsonAsMap), + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap))); + } + + private static Map addToMap(Map map, String key, T value) { + HashMap newMap = new HashMap<>(map); + newMap.put(key, value); + return newMap; + } + + private static Map mapWithNullableValuesOf(String key, T value) { + Map map = new HashMap<>(); + map.put(key, value); + return map; + } + + private static Map mapWithNullableValuesOf( + String key, T value, String key2, T value2) { + Map map = new HashMap<>(); + map.put(key, value); + map.put(key2, value2); + return map; + } + + private static SnowflakeTableRow buildRow(String content) throws JsonProcessingException { + return buildRow(content, fullMetadataJsonExample); + } + + private static SnowflakeTableRow buildRow(String content, String metadata) + throws JsonProcessingException { + return new SnowflakeTableRow( + new SnowflakeRecordContent(objectMapper.readTree(content)), + objectMapper.readTree(metadata)); + } +} diff --git a/src/test/java/com/snowflake/kafka/connector/records/ProcessRecordTest.java b/src/test/java/com/snowflake/kafka/connector/records/ProcessRecordTest.java index 1da00bc39..c8cb098b7 100644 --- a/src/test/java/com/snowflake/kafka/connector/records/ProcessRecordTest.java +++ b/src/test/java/com/snowflake/kafka/connector/records/ProcessRecordTest.java @@ -29,7 +29,7 @@ public class ProcessRecordTest { @ParameterizedTest(name = "{index}: {0}") @MethodSource("data") public void test(Case testCase) throws IOException { - RecordService service = new RecordService(); + RecordService service = RecordServiceFactory.createRecordService(false, false); SinkRecord record = new SinkRecord( diff --git a/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java b/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java index 66d61de63..e1796018b 100644 --- a/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java +++ b/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java @@ -137,7 +137,7 @@ public void test() throws IOException { public void recordService_getProcessedRecordForSnowpipe_whenInvalidSchema_throwException( Schema schema, Object value) { // given - RecordService service = new RecordService(); + RecordService service = RecordServiceFactory.createRecordService(false, false); SinkRecord record = SinkRecordBuilder.forTopicPartition(TOPIC, PARTITION) .withValueSchema(schema) @@ -162,7 +162,7 @@ public static Stream invalidSchemaSource() throws JsonProcessingExcep @MethodSource("invalidPutKeyInputSource") public void recordService_putKey_whenInvalidInput_throwException(Schema keySchema, Object key) { // given - RecordService service = new RecordService(); + RecordService service = RecordServiceFactory.createRecordService(false, false); SinkRecord record = SinkRecordBuilder.forTopicPartition(TOPIC, PARTITION) .withKeySchema(keySchema) @@ -216,10 +216,9 @@ public void testConvertToJsonReadOnlyByteBuffer() { @Test public void testSchematizationStringField() throws JsonProcessingException { - RecordService service = new RecordService(); + RecordService service = RecordServiceFactory.createRecordService(false, true); SnowflakeJsonConverter jsonConverter = new SnowflakeJsonConverter(); - service.setEnableSchematization(true); String value = "{\"name\":\"sf\",\"answer\":42}"; byte[] valueContents = (value).getBytes(StandardCharsets.UTF_8); SchemaAndValue sv = jsonConverter.toConnectData(TOPIC, valueContents); @@ -237,10 +236,9 @@ public void testSchematizationStringField() throws JsonProcessingException { @Test public void testSchematizationArrayOfObject() throws JsonProcessingException { - RecordService service = new RecordService(); + RecordService service = RecordServiceFactory.createRecordService(false, true); SnowflakeJsonConverter jsonConverter = new SnowflakeJsonConverter(); - service.setEnableSchematization(true); String value = "{\"players\":[{\"name\":\"John Doe\",\"age\":30},{\"name\":\"Jane Doe\",\"age\":30}]}"; byte[] valueContents = (value).getBytes(StandardCharsets.UTF_8); @@ -257,10 +255,9 @@ public void testSchematizationArrayOfObject() throws JsonProcessingException { @Test public void testColumnNameFormatting() throws JsonProcessingException { - RecordService service = new RecordService(); + RecordService service = RecordServiceFactory.createRecordService(false, true); SnowflakeJsonConverter jsonConverter = new SnowflakeJsonConverter(); - service.setEnableSchematization(true); String value = "{\"\\\"NaMe\\\"\":\"sf\",\"AnSwEr\":42}"; byte[] valueContents = (value).getBytes(StandardCharsets.UTF_8); SchemaAndValue sv = jsonConverter.toConnectData(TOPIC, valueContents); @@ -341,7 +338,7 @@ public void testGetProcessedRecord() throws JsonProcessingException { private void testGetProcessedRecordRunner( SinkRecord record, String expectedRecordContent, String expectedRecordMetadataKey) throws JsonProcessingException { - RecordService service = new RecordService(); + RecordService service = RecordServiceFactory.createRecordService(false, false); Map recordData = service.getProcessedRecordForStreamingIngest(record); assertEquals(2, recordData.size()); diff --git a/src/test/java/com/snowflake/kafka/connector/records/SnowpipeMetaColumnTest.java b/src/test/java/com/snowflake/kafka/connector/records/SnowpipeMetaColumnTest.java index 29dca3d25..f3b308820 100644 --- a/src/test/java/com/snowflake/kafka/connector/records/SnowpipeMetaColumnTest.java +++ b/src/test/java/com/snowflake/kafka/connector/records/SnowpipeMetaColumnTest.java @@ -39,7 +39,7 @@ void connectorTimestamp_alwaysNull() throws JsonProcessingException { .withValue(input.value()) .build(); - RecordService service = new RecordService(); + RecordService service = RecordServiceFactory.createRecordService(false, false); Map config = ImmutableMap.of(SNOWFLAKE_STREAMING_METADATA_CONNECTOR_PUSH_TIME, "true"); diff --git a/src/test/java/com/snowflake/kafka/connector/records/SnowpipeStreamingMetaColumnTest.java b/src/test/java/com/snowflake/kafka/connector/records/SnowpipeStreamingMetaColumnTest.java index ae4bf79ed..895d0bacc 100644 --- a/src/test/java/com/snowflake/kafka/connector/records/SnowpipeStreamingMetaColumnTest.java +++ b/src/test/java/com/snowflake/kafka/connector/records/SnowpipeStreamingMetaColumnTest.java @@ -48,7 +48,10 @@ void connectorTimestamp_byDefault_writes() throws JsonProcessingException { Instant fixedNow = Instant.now(); Clock fixedClock = Clock.fixed(fixedNow, ZoneOffset.UTC); - RecordService service = new RecordService(fixedClock); + ObjectMapper mapper = new ObjectMapper(); + RecordService service = + new RecordService( + fixedClock, new SnowflakeTableStreamingRecordMapper(mapper, false), mapper); // when Map recordData = service.getProcessedRecordForStreamingIngest(record); @@ -69,7 +72,7 @@ void connectorTimestamp_whenDisabled_ignores() throws JsonProcessingException { .withValue(input.value()) .build(); - RecordService service = new RecordService(); + RecordService service = RecordServiceFactory.createRecordService(false, false); Map config = ImmutableMap.of(SNOWFLAKE_STREAMING_METADATA_CONNECTOR_PUSH_TIME, "false"); diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java index 26b1d0fa7..11dd31868 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java @@ -11,6 +11,7 @@ import com.snowflake.kafka.connector.internal.TestUtils; import com.snowflake.kafka.connector.internal.streaming.InMemorySinkTaskContext; import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; +import com.snowflake.kafka.connector.streaming.iceberg.sql.ComplexJsonRecord; import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord.RecordWithMetadata; import com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord; import java.nio.charset.StandardCharsets; @@ -19,8 +20,11 @@ import java.util.List; import java.util.Map; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.jupiter.api.AfterEach; @@ -36,69 +40,6 @@ public abstract class IcebergIngestionIT extends BaseIcebergIT { protected SnowflakeSinkService service; protected static final String simpleRecordJson = "{\"simple\": \"extra field\"}"; - protected static final PrimitiveJsonRecord primitiveJsonRecordValue = - // FIXME: there is currently some bug in Iceberg when storing int64 values - new PrimitiveJsonRecord(8L, 16L, 32L, /*64L,*/ "dogs are the best", 0.5, 0.25, true); - protected static final PrimitiveJsonRecord emptyPrimitiveJsonRecordValue = - // FIXME: there is currently some bug in Iceberg when storing int64 values - new PrimitiveJsonRecord(0L, 0L, 0L, /*0L, */ null, 0.0, 0.0, false); - protected static final String primitiveJson = - "{" - + " \"id_int8\": 8," - + " \"id_int16\": 16," - + " \"id_int32\": 32," - + " \"id_int64\": 64," - + " \"description\": \"dogs are the best\"," - + " \"rating_float32\": 0.5," - + " \"rating_float64\": 0.25," - + " \"approval\": true" - + "}"; - - protected static final String primitiveJsonWithSchema = - "{" - + " \"schema\": {" - + " \"type\": \"struct\"," - + " \"fields\": [" - + " {" - + " \"field\": \"id_int8\"," - + " \"type\": \"int8\"" - + " }," - + " {" - + " \"field\": \"id_int16\"," - + " \"type\": \"int16\"" - + " }," - + " {" - + " \"field\": \"id_int32\"," - + " \"type\": \"int32\"" - + " }," - + " {" - + " \"field\": \"id_int64\"," - + " \"type\": \"int64\"" - + " }," - + " {" - + " \"field\": \"description\"," - + " \"type\": \"string\"" - + " }," - + " {" - + " \"field\": \"rating_float32\"," - + " \"type\": \"float\"" - + " }," - + " {" - + " \"field\": \"rating_float64\"," - + " \"type\": \"double\"" - + " }," - + " {" - + " \"field\": \"approval\"," - + " \"type\": \"boolean\"" - + " }" - + " ]," - + " \"optional\": false," - + " \"name\": \"sf.kc.test\"" - + " }," - + " \"payload\": " - + primitiveJson - + "}"; - @BeforeEach public void setUp() { tableName = TestUtils.randomTableName(); @@ -148,6 +89,14 @@ protected SinkRecord createKafkaRecord(String jsonString, int offset, boolean wi Collections.singletonMap("schemas.enable", Boolean.toString(withSchema)), false); SchemaAndValue inputValue = converter.toConnectData(topic, jsonString.getBytes(StandardCharsets.UTF_8)); + Headers headers = new ConnectHeaders(); + headers.addBoolean("booleanHeader", true); + headers.addString("stringHeader", "test"); + headers.addInt("intHeader", 123); + headers.addDouble("doubleHeader", 1.234); + headers.addFloat("floatHeader", 1.234f); + headers.addLong("longHeader", 123L); + headers.addShort("shortHeader", (short) 123); return new SinkRecord( topic, PARTITION, @@ -155,7 +104,10 @@ protected SinkRecord createKafkaRecord(String jsonString, int offset, boolean wi "test", inputValue.schema(), inputValue.value(), - offset); + offset, + System.currentTimeMillis(), + TimestampType.CREATE_TIME, + headers); } private final String selectAllSortByOffset = @@ -174,4 +126,9 @@ protected List> selectAllSchematizedReco protected List> selectAllFromRecordContent() { return select(tableName, selectAllSortByOffset, PrimitiveJsonRecord::fromRecordContentColumn); } + + protected List> + selectAllComplexJsonRecordFromRecordContent() { + return select(tableName, selectAllSortByOffset, ComplexJsonRecord::fromRecordContentColumn); + } } diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java index 8e8c63c6f..a58d42239 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java @@ -1,11 +1,12 @@ package com.snowflake.kafka.connector.streaming.iceberg; +import static com.snowflake.kafka.connector.streaming.iceberg.sql.ComplexJsonRecord.complexJsonRecordValueExample; import static org.assertj.core.api.Assertions.assertThat; import com.snowflake.kafka.connector.Utils; +import com.snowflake.kafka.connector.streaming.iceberg.sql.ComplexJsonRecord; import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord; import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord.RecordWithMetadata; -import com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -18,7 +19,7 @@ public class IcebergIngestionNoSchemaEvolutionIT extends IcebergIngestionIT { - private static final String RECORD_CONTENT_OBJECT_SCHEMA = + private static final String PRIMITIVE_JSON_RECORD_CONTENT_OBJECT_SCHEMA = "object(" + "id_int8 NUMBER(10,0)," + "id_int16 NUMBER(10,0)," @@ -30,6 +31,28 @@ public class IcebergIngestionNoSchemaEvolutionIT extends IcebergIngestionIT { + "approval BOOLEAN" + ")"; + private static final String COMPLEX_JSON_RECORD_CONTENT_OBJECT_SCHEMA = + "object(" + + "id_int8 NUMBER(10,0)," + + "id_int16 NUMBER(10,0)," + + "id_int32 NUMBER(10,0)," + + "id_int64 NUMBER(19,0)," + + "description VARCHAR(16777216)," + + "rating_float32 FLOAT," + + "rating_float64 FLOAT," + + "approval BOOLEAN," + + "array1 ARRAY(LONG)," + + "array2 ARRAY(VARCHAR(16777216))," + + "array3 ARRAY(BOOLEAN)," + + "array4 ARRAY(LONG)," + + "array5 ARRAY(ARRAY(LONG))," + + "nestedRecord " + + PRIMITIVE_JSON_RECORD_CONTENT_OBJECT_SCHEMA + + "," + + "nestedRecord2 " + + PRIMITIVE_JSON_RECORD_CONTENT_OBJECT_SCHEMA + + ")"; + @Override protected Boolean isSchemaEvolutionEnabled() { return false; @@ -45,13 +68,14 @@ protected void createIcebergTable() { + ", " + Utils.TABLE_COLUMN_CONTENT + " " - + RECORD_CONTENT_OBJECT_SCHEMA); + + COMPLEX_JSON_RECORD_CONTENT_OBJECT_SCHEMA); } private static Stream prepareData() { return Stream.of( - Arguments.of("Primitive JSON with schema", primitiveJsonWithSchema, true), - Arguments.of("Primitive JSON without schema", primitiveJson, false)); + Arguments.of( + "Complex JSON with schema", ComplexJsonRecord.complexJsonWithSchemaExample, true), + Arguments.of("Complex JSON without schema", ComplexJsonRecord.complexJsonExample, false)); } @ParameterizedTest(name = "{0}") @@ -70,13 +94,15 @@ void shouldInsertRecords(String description, String message, boolean withSchema) } private void assertRecordsInTable() { - List> recordsWithMetadata = - selectAllFromRecordContent(); + List> recordsWithMetadata = + selectAllComplexJsonRecordFromRecordContent(); assertThat(recordsWithMetadata) .hasSize(3) .extracting(RecordWithMetadata::getRecord) .containsExactly( - primitiveJsonRecordValue, primitiveJsonRecordValue, primitiveJsonRecordValue); + complexJsonRecordValueExample, + complexJsonRecordValueExample, + complexJsonRecordValueExample); List metadataRecords = recordsWithMetadata.stream() .map(RecordWithMetadata::getMetadata) diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java index 27c35fb09..08f0bf2f3 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java @@ -1,5 +1,9 @@ package com.snowflake.kafka.connector.streaming.iceberg; +import static com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord.emptyPrimitiveJsonRecordValueExample; +import static com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord.primitiveJsonExample; +import static com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord.primitiveJsonRecordValueExample; +import static com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord.primitiveJsonWithSchemaExample; import static org.assertj.core.api.Assertions.assertThat; import com.snowflake.kafka.connector.Utils; @@ -85,7 +89,9 @@ private void assertRecordsInTable() { .hasSize(3) .extracting(RecordWithMetadata::getRecord) .containsExactly( - primitiveJsonRecordValue, primitiveJsonRecordValue, emptyPrimitiveJsonRecordValue); + primitiveJsonRecordValueExample, + primitiveJsonRecordValueExample, + emptyPrimitiveJsonRecordValueExample); List metadataRecords = recordsWithMetadata.stream() .map(RecordWithMetadata::getMetadata) @@ -105,7 +111,7 @@ private static Stream prepareData() { return Stream.of( Arguments.of( "Primitive JSON with schema", - primitiveJsonWithSchema, + primitiveJsonWithSchemaExample, new DescribeTableRow[] { new DescribeTableRow("ID_INT8", "NUMBER(10,0)"), new DescribeTableRow("ID_INT16", "NUMBER(10,0)"), @@ -119,7 +125,7 @@ private static Stream prepareData() { true), Arguments.of( "Primitive JSON without schema", - primitiveJson, + primitiveJsonExample, new DescribeTableRow[] { new DescribeTableRow("ID_INT8", "NUMBER(19,0)"), new DescribeTableRow("ID_INT16", "NUMBER(19,0)"), diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/ComplexJsonRecord.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/ComplexJsonRecord.java new file mode 100644 index 000000000..b8d88fa70 --- /dev/null +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/ComplexJsonRecord.java @@ -0,0 +1,381 @@ +package com.snowflake.kafka.connector.streaming.iceberg.sql; + +import static net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; + +import com.google.common.collect.ImmutableList; +import com.snowflake.kafka.connector.Utils; +import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord.RecordWithMetadata; +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonCreator; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonProperty; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; +import org.assertj.core.api.Assertions; + +public class ComplexJsonRecord { + + public static final ComplexJsonRecord complexJsonRecordValueExample = + new ComplexJsonRecord( + 8L, + 16L, + 32L, + 64L, + "dogs are the best", + 0.5, + 0.25, + true, + ImmutableList.of(1, 2, 3), + ImmutableList.of("a", "b", "c"), + ImmutableList.of(true), // FIXME: SNOW-1761519 here should be empty array + null, + ImmutableList.of(ImmutableList.of(7, 8, 9), ImmutableList.of(10, 11, 12)), + PrimitiveJsonRecord.primitiveJsonRecordValueExample, + null); + + public static final String complexJsonExample = + "{" + + " \"id_int8\": 8," + + " \"id_int16\": 16," + + " \"id_int32\": 32," + + " \"id_int64\": 64," + + " \"description\": \"dogs are the best\"," + + " \"rating_float32\": 0.5," + + " \"rating_float64\": 0.25," + + " \"approval\": true," + + " \"array1\": [1, 2, 3]," + + " \"array2\": [\"a\", \"b\", \"c\"]," + + " \"array3\": [true]," // FIXME: SNOW-1761519 here should be empty array + + " \"array4\": null," + + " \"array5\": [[7, 8, 9], [10, 11, 12]]," + + " \"nestedRecord\": " + + PrimitiveJsonRecord.primitiveJsonExample + + "," + + " \"nestedRecord2\": null" + + "}"; + + public static final String complexJsonWithSchemaExample = + "{" + + " \"schema\": {" + + " \"type\": \"struct\"," + + " \"fields\": [" + + " {" + + " \"field\": \"id_int8\"," + + " \"type\": \"int8\"" + + " }," + + " {" + + " \"field\": \"id_int16\"," + + " \"type\": \"int16\"" + + " }," + + " {" + + " \"field\": \"id_int32\"," + + " \"type\": \"int32\"" + + " }," + + " {" + + " \"field\": \"id_int64\"," + + " \"type\": \"int64\"" + + " }," + + " {" + + " \"field\": \"description\"," + + " \"type\": \"string\"" + + " }," + + " {" + + " \"field\": \"rating_float32\"," + + " \"type\": \"float\"" + + " }," + + " {" + + " \"field\": \"rating_float64\"," + + " \"type\": \"double\"" + + " }," + + " {" + + " \"field\": \"approval\"," + + " \"type\": \"boolean\"" + + " }," + + " {" + + " \"field\": \"array1\"," + + " \"type\": \"array\"," + + " \"items\": {" + + " \"type\": \"int32\"" + + " }" + + " }," + + " {" + + " \"field\": \"array2\"," + + " \"type\": \"array\"," + + " \"items\": {" + + " \"type\": \"string\"" + + " }" + + " }," + + " {" + + " \"field\": \"array3\"," + + " \"type\": \"array\"," + + " \"items\": {" + + " \"type\": \"boolean\"" + + " }" + + " }," + + " {" + + " \"field\": \"array4\"," + + " \"type\": \"array\"," + + " \"items\": {" + + " \"type\": \"int32\"" + + " }," + + " \"optional\": true" + + " }," + + " {" + + " \"field\": \"array5\"," + + " \"type\": \"array\"," + + " \"items\": {" + + " \"type\": \"array\"," + + " \"items\": {" + + " \"type\": \"int32\"" + + " }" + + " }" + + " }," + + " {" + + " \"field\": \"nestedRecord\"," + + " \"type\": \"struct\"," + + " \"fields\": [" + + " {" + + " \"field\": \"id_int8\"," + + " \"type\": \"int8\"" + + " }," + + " {" + + " \"field\": \"id_int16\"," + + " \"type\": \"int16\"" + + " }," + + " {" + + " \"field\": \"id_int32\"," + + " \"type\": \"int32\"" + + " }," + + " {" + + " \"field\": \"id_int64\"," + + " \"type\": \"int64\"" + + " }," + + " {" + + " \"field\": \"description\"," + + " \"type\": \"string\"" + + " }," + + " {" + + " \"field\": \"rating_float32\"," + + " \"type\": \"float\"" + + " }," + + " {" + + " \"field\": \"rating_float64\"," + + " \"type\": \"double\"" + + " }," + + " {" + + " \"field\": \"approval\"," + + " \"type\": \"boolean\"" + + " }" + + " ]," + + " \"optional\": true," + + " \"name\": \"sf.kc.test\"" + + " }," + + " {" + + " \"field\": \"nestedRecord2\"," + + " \"type\": \"struct\"," + + " \"fields\": [" + + " {" + + " \"field\": \"id_int8\"," + + " \"type\": \"int8\"" + + " }," + + " {" + + " \"field\": \"id_int16\"," + + " \"type\": \"int16\"" + + " }," + + " {" + + " \"field\": \"id_int32\"," + + " \"type\": \"int32\"" + + " }," + + " {" + + " \"field\": \"id_int64\"," + + " \"type\": \"int64\"" + + " }," + + " {" + + " \"field\": \"description\"," + + " \"type\": \"string\"" + + " }," + + " {" + + " \"field\": \"rating_float32\"," + + " \"type\": \"float\"" + + " }," + + " {" + + " \"field\": \"rating_float64\"," + + " \"type\": \"double\"" + + " }," + + " {" + + " \"field\": \"approval\"," + + " \"type\": \"boolean\"" + + " }" + + " ]," + + " \"optional\": true," + + " \"name\": \"sf.kc.test\"" + + " }" + + " ]," + + " \"optional\": false," + + " \"name\": \"sf.kc.test\"" + + " }," + + " \"payload\": " + + complexJsonExample + + "}"; + + private static final ObjectMapper MAPPER = + new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false); + + private final Long idInt8; + + private final Long idInt16; + + private final Long idInt32; + + private final Long idInt64; + + private final String description; + + private final Double ratingFloat32; + + private final Double ratingFloat64; + + private final Boolean approval; + + private final List array1; + private final List array2; + private final List array3; + private final List array4; + private final List> array5; + + private final PrimitiveJsonRecord nestedRecord; + private final PrimitiveJsonRecord nestedRecord2; + + @JsonCreator + public ComplexJsonRecord( + @JsonProperty("id_int8") Long idInt8, + @JsonProperty("id_int16") Long idInt16, + @JsonProperty("id_int32") Long idInt32, + @JsonProperty("id_int64") Long idInt64, + @JsonProperty("description") String description, + @JsonProperty("rating_float32") Double ratingFloat32, + @JsonProperty("rating_float64") Double ratingFloat64, + @JsonProperty("approval") Boolean approval, + @JsonProperty("array1") List array1, + @JsonProperty("array2") List array2, + @JsonProperty("array3") List array3, + @JsonProperty("array4") List array4, + @JsonProperty("array5") List> array5, + @JsonProperty("nestedRecord") PrimitiveJsonRecord nestedRecord, + @JsonProperty("nestedRecord2") PrimitiveJsonRecord nestedRecord2) { + this.idInt8 = idInt8; + this.idInt16 = idInt16; + this.idInt32 = idInt32; + this.idInt64 = idInt64; + this.description = description; + this.ratingFloat32 = ratingFloat32; + this.ratingFloat64 = ratingFloat64; + this.approval = approval; + this.array1 = array1; + this.array2 = array2; + this.array3 = array3; + this.array4 = array4; + this.array5 = array5; + this.nestedRecord = nestedRecord; + this.nestedRecord2 = nestedRecord2; + } + + public static List> fromRecordContentColumn( + ResultSet resultSet) { + List> records = new ArrayList<>(); + + try { + while (resultSet.next()) { + String jsonString = resultSet.getString(Utils.TABLE_COLUMN_CONTENT); + ComplexJsonRecord record = MAPPER.readValue(jsonString, ComplexJsonRecord.class); + MetadataRecord metadata = MetadataRecord.fromMetadataSingleRow(resultSet); + records.add(RecordWithMetadata.of(metadata, record)); + } + } catch (SQLException | IOException e) { + Assertions.fail("Couldn't map ResultSet to ComplexJsonRecord: " + e.getMessage()); + } + return records; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ComplexJsonRecord that = (ComplexJsonRecord) o; + return Objects.equals(idInt8, that.idInt8) + && Objects.equals(idInt16, that.idInt16) + && Objects.equals(idInt32, that.idInt32) + && Objects.equals(idInt64, that.idInt64) + && Objects.equals(description, that.description) + && Objects.equals(ratingFloat32, that.ratingFloat32) + && Objects.equals(ratingFloat64, that.ratingFloat64) + && Objects.equals(approval, that.approval) + && Objects.equals(array1, that.array1) + && Objects.equals(array2, that.array2) + && Objects.equals(array3, that.array3) + && Objects.equals(array4, that.array4) + && Objects.equals(array5, that.array5) + && Objects.equals(nestedRecord, that.nestedRecord) + && Objects.equals(nestedRecord2, that.nestedRecord2); + } + + @Override + public int hashCode() { + return Objects.hash( + idInt8, + idInt16, + idInt32, + idInt64, + description, + ratingFloat32, + ratingFloat64, + approval, + array1, + array2, + array3, + array4, + array5, + nestedRecord, + nestedRecord2); + } + + @Override + public String toString() { + return "ComplexJsonRecord{" + + "idInt8=" + + idInt8 + + ", idInt16=" + + idInt16 + + ", idInt32=" + + idInt32 + + ", idInt64=" + + idInt64 + + ", description='" + + description + + '\'' + + ", ratingFloat32=" + + ratingFloat32 + + ", ratingFloat64=" + + ratingFloat64 + + ", approval=" + + approval + + ", array1=" + + array1 + + ", array2=" + + array2 + + ", array3=" + + array3 + + ", array4=" + + array4 + + ", array5=" + + array5 + + ", nestedRecord=" + + nestedRecord + + ", nestedRecord2=" + + nestedRecord2 + + '}'; + } +} diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/MetadataRecord.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/MetadataRecord.java index 11362cc46..c0f7de137 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/MetadataRecord.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/MetadataRecord.java @@ -4,6 +4,7 @@ import java.io.IOException; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.Map; import java.util.Objects; import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonCreator; import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonProperty; @@ -20,7 +21,7 @@ public class MetadataRecord { private final Long createTime; private final Long logAppendTime; private final Long snowflakeConnectorPushTime; - private final String headers; + private final Map headers; private static final ObjectMapper MAPPER = new ObjectMapper(); @@ -35,7 +36,7 @@ public MetadataRecord( @JsonProperty("CreateTime") Long createTime, @JsonProperty("LogAppendTime") Long logAppendTime, @JsonProperty("SnowflakeConnectorPushTime") Long snowflakeConnectorPushTime, - @JsonProperty("headers") String headers) { + @JsonProperty("headers") Map headers) { this.offset = offset; this.topic = topic; this.partition = partition; @@ -95,7 +96,7 @@ public Long getSnowflakeConnectorPushTime() { return snowflakeConnectorPushTime; } - public String getHeaders() { + public Map getHeaders() { return headers; } @@ -154,9 +155,8 @@ public String toString() { + logAppendTime + ", snowflakeConnectorPushTime=" + snowflakeConnectorPushTime - + ", headers='" + + ", headers=" + headers - + '\'' + '}'; } diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/PrimitiveJsonRecord.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/PrimitiveJsonRecord.java index 3b4a9983a..a195e21ed 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/PrimitiveJsonRecord.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/PrimitiveJsonRecord.java @@ -17,6 +17,68 @@ public class PrimitiveJsonRecord { + public static final PrimitiveJsonRecord primitiveJsonRecordValueExample = + new PrimitiveJsonRecord(8L, 16L, 32L, 64L, "dogs are the best", 0.5, 0.25, true); + public static final PrimitiveJsonRecord emptyPrimitiveJsonRecordValueExample = + new PrimitiveJsonRecord(0L, 0L, 0L, 0L, null, 0.0, 0.0, false); + + public static final String primitiveJsonExample = + "{" + + " \"id_int8\": 8," + + " \"id_int16\": 16," + + " \"id_int32\": 32," + + " \"id_int64\": 64," + + " \"description\": \"dogs are the best\"," + + " \"rating_float32\": 0.5," + + " \"rating_float64\": 0.25," + + " \"approval\": true" + + "}"; + + public static final String primitiveJsonWithSchemaExample = + "{" + + " \"schema\": {" + + " \"type\": \"struct\"," + + " \"fields\": [" + + " {" + + " \"field\": \"id_int8\"," + + " \"type\": \"int8\"" + + " }," + + " {" + + " \"field\": \"id_int16\"," + + " \"type\": \"int16\"" + + " }," + + " {" + + " \"field\": \"id_int32\"," + + " \"type\": \"int32\"" + + " }," + + " {" + + " \"field\": \"id_int64\"," + + " \"type\": \"int64\"" + + " }," + + " {" + + " \"field\": \"description\"," + + " \"type\": \"string\"" + + " }," + + " {" + + " \"field\": \"rating_float32\"," + + " \"type\": \"float\"" + + " }," + + " {" + + " \"field\": \"rating_float64\"," + + " \"type\": \"double\"" + + " }," + + " {" + + " \"field\": \"approval\"," + + " \"type\": \"boolean\"" + + " }" + + " ]," + + " \"optional\": false," + + " \"name\": \"sf.kc.test\"" + + " }," + + " \"payload\": " + + primitiveJsonExample + + "}"; + private static final ObjectMapper MAPPER = new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false); @@ -41,8 +103,7 @@ public PrimitiveJsonRecord( @JsonProperty("id_int8") Long idInt8, @JsonProperty("id_int16") Long idInt16, @JsonProperty("id_int32") Long idInt32, - // FIXME: there is currently some bug in Iceberg when storing int64 values - // @JsonProperty("id_int64") Long idInt64, + @JsonProperty("id_int64") Long idInt64, @JsonProperty("description") String description, @JsonProperty("rating_float32") Double ratingFloat32, @JsonProperty("rating_float64") Double ratingFloat64, @@ -50,7 +111,7 @@ public PrimitiveJsonRecord( this.idInt8 = idInt8; this.idInt16 = idInt16; this.idInt32 = idInt32; - this.idInt64 = 64L; + this.idInt64 = idInt64; this.description = description; this.ratingFloat32 = ratingFloat32; this.ratingFloat64 = ratingFloat64; @@ -67,8 +128,7 @@ public static List> fromSchematizedResul resultSet.getLong("ID_INT8"), resultSet.getLong("ID_INT16"), resultSet.getLong("ID_INT32"), - // FIXME: there is currently some bug in Iceberg when storing int64 values - // resultSet.getLong("ID_INT64"), + resultSet.getLong("ID_INT64"), resultSet.getString("DESCRIPTION"), resultSet.getDouble("RATING_FLOAT32"), resultSet.getDouble("RATING_FLOAT64"),