Skip to content

Commit

Permalink
Refactor schema resolver
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-wtrefon committed Oct 3, 2024
1 parent c05791c commit 869aa4e
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ public ColumnInfos(String columnType, String comments) {
this.comments = comments;
}

/**
 * Creates column infos that carry only a column type; the comments are left as {@code null}.
 *
 * @param columnType the column type to store
 */
public ColumnInfos(String columnType) {
  this.comments = null;
  this.columnType = columnType;
}

/**
 * Returns the column type held by this instance.
 *
 * @return the column type
 */
public String getColumnType() {
  return this.columnType;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution;

import com.google.common.collect.Maps;
import com.google.common.collect.Streams;
import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.internal.SnowflakeErrors;
import com.snowflake.kafka.connector.records.RecordService;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
Expand All @@ -30,40 +32,71 @@ protected TableSchemaResolver(ColumnTypeMapper columnTypeMapper) {
* With the list of columns, collect their data types from either the schema or the data itself
*
* @param record the sink record that contains the schema and actual data
* @param columnNames the names of the extra columns
* @param columnsToInclude the names of the columns to include in the schema
* @return a TableSchema built from a map where the key is the quoted column name and the value is ColumnInfos
*/
public TableSchema resolveTableSchemaFromRecord(SinkRecord record, List<String> columnNames) {
if (columnNames == null) {
public TableSchema resolveTableSchemaFromRecord(
SinkRecord record, List<String> columnsToInclude) {
if (columnsToInclude == null) {
return new TableSchema(new HashMap<>());
}
Map<String, ColumnInfos> columnToType = new HashMap<>();
Map<String, ColumnInfos> schemaMap = getSchemaMapFromRecord(record);

JsonNode recordNode = RecordService.convertToJson(record.valueSchema(), record.value(), true);
Set<String> columnNamesSet = new HashSet<>(columnNames);

Iterator<Map.Entry<String, JsonNode>> fields = recordNode.fields();
while (fields.hasNext()) {
Map.Entry<String, JsonNode> field = fields.next();
String colName = Utils.quoteNameIfNeeded(field.getKey());
if (columnNamesSet.contains(colName)) {
ColumnInfos columnInfos;
if (schemaMap.isEmpty()) {
// No schema associated with the record, we will try to infer it based on the data
columnInfos = new ColumnInfos(inferDataTypeFromJsonObject(field.getValue()), null);
} else {
// Get from the schema
columnInfos = schemaMap.get(field.getKey());
if (columnInfos == null) {
// only when the type of the value is unrecognizable for JAVA
throw SnowflakeErrors.ERROR_5022.getException(
"column: " + field.getKey() + " schemaMap: " + schemaMap);
}
}
columnToType.put(colName, columnInfos);
}
Set<String> columnNamesSet = new HashSet<>(columnsToInclude);

if (hasSchema(record)) {
return getTableSchemaFromRecordSchema(recordNode, columnNamesSet, record);
} else {
return getTableSchemaFromJson(recordNode, columnNamesSet);
}
return new TableSchema(columnToType);
}

/**
 * Tells whether the record carries a value schema that declares at least one field.
 *
 * @param record the sink record to inspect
 * @return {@code true} when the value schema is present and its field list is non-null and
 *     non-empty, {@code false} otherwise
 */
private boolean hasSchema(SinkRecord record) {
  Schema valueSchema = record.valueSchema();
  if (valueSchema == null) {
    return false;
  }
  List<Field> schemaFields = valueSchema.fields();
  if (schemaFields == null) {
    return false;
  }
  return !schemaFields.isEmpty();
}

/**
 * Builds the table schema for the requested columns from the schema attached to the record.
 *
 * <p>Each top-level field of {@code recordNode} whose quoted name is contained in {@code
 * columnNamesSet} is looked up, by its raw (unquoted) name, in the map returned by {@link
 * #getFullSchemaMapFromRecord(SinkRecord)}. The resulting map is keyed by the quoted column name;
 * if the same quoted name is produced more than once, the last occurrence wins.
 *
 * <p>If a selected column has no usable entry in the schema map (missing key or {@code null}
 * value), the exception created by {@code SnowflakeErrors.ERROR_5022} is thrown.
 *
 * @param recordNode JSON view of the record whose top-level fields are the candidate columns
 * @param columnNamesSet quoted names of the columns to include
 * @param record the sink record whose value schema supplies the column types
 * @return a TableSchema holding the ColumnInfos of every selected column
 */
private TableSchema getTableSchemaFromRecordSchema(
    JsonNode recordNode, Set<String> columnNamesSet, SinkRecord record) {
  Map<String, ColumnInfos> schemaMap = getFullSchemaMapFromRecord(record);
  Map<String, ColumnInfos> columnsInferredFromSchema =
      Streams.stream(recordNode.fields())
          .map(ColumnValuePair::of)
          .filter(pair -> columnNamesSet.contains(pair.getQuotedColumnName()))
          .collect(
              Collectors.toMap(
                  // The pair already holds the quoted name; quoting it again is redundant.
                  ColumnValuePair::getQuotedColumnName,
                  pair -> getColumnInfosOrThrow(schemaMap, pair.getColumnName()),
                  (oldValue, newValue) -> newValue));
  return new TableSchema(columnsInferredFromSchema);
}

/**
 * Looks up a column in the schema map, failing with ERROR_5022 when no ColumnInfos is available.
 *
 * <p>A single {@code get} followed by a null check covers both a missing key and a key mapped to
 * {@code null}; the latter would otherwise surface as a NullPointerException inside {@code
 * Collectors.toMap}.
 */
private static ColumnInfos getColumnInfosOrThrow(
    Map<String, ColumnInfos> schemaMap, String columnName) {
  ColumnInfos columnInfos = schemaMap.get(columnName);
  if (columnInfos == null) {
    // only when the type of the value is unrecognizable for JAVA
    throw SnowflakeErrors.ERROR_5022.getException(
        "column: " + columnName + " schemaMap: " + schemaMap);
  }
  return columnInfos;
}

/**
 * Builds the table schema for the requested columns by inferring each type from the JSON value.
 *
 * <p>Only top-level fields of {@code recordNode} whose quoted name is contained in {@code
 * columnNamesSet} are considered. If the same quoted name is produced more than once, the last
 * occurrence wins.
 *
 * @param recordNode JSON node whose top-level fields are the candidate columns
 * @param columnNamesSet quoted names of the columns to include
 * @return a TableSchema mapping each selected quoted column name to its inferred ColumnInfos
 */
private TableSchema getTableSchemaFromJson(JsonNode recordNode, Set<String> columnNamesSet) {
  Map<String, ColumnInfos> inferredColumns = new HashMap<>();
  recordNode
      .fields()
      .forEachRemaining(
          entry -> {
            ColumnValuePair candidate = ColumnValuePair.of(entry);
            String quotedName = candidate.getQuotedColumnName();
            if (columnNamesSet.contains(quotedName)) {
              String inferredType = inferDataTypeFromJsonObject(candidate.getJsonNode());
              // put() overwrites an earlier entry, so a repeated name keeps the latest value
              inferredColumns.put(quotedName, new ColumnInfos(inferredType));
            }
          });
  return new TableSchema(inferredColumns);
}

/**
Expand All @@ -72,7 +105,7 @@ public TableSchema resolveTableSchemaFromRecord(SinkRecord record, List<String>
* @param record the sink record that contains the schema and actual data
* @return a Map object where the key is column name and value is ColumnInfos
*/
private Map<String, ColumnInfos> getSchemaMapFromRecord(SinkRecord record) {
private Map<String, ColumnInfos> getFullSchemaMapFromRecord(SinkRecord record) {
Map<String, ColumnInfos> schemaMap = new HashMap<>();
Schema schema = record.valueSchema();
if (schema != null && schema.fields() != null) {
Expand Down Expand Up @@ -104,4 +137,32 @@ private String inferDataTypeFromJsonObject(JsonNode value) {
// Passing null to schemaName when there is no schema information
return columnTypeMapper.mapToColumnType(schemaType);
}

/**
 * Immutable holder for one top-level JSON field: its raw name, the same name passed through
 * {@code Utils.quoteNameIfNeeded}, and the field's JSON value.
 */
private static class ColumnValuePair {
  private final String columnName;
  private final String quotedColumnName;
  private final JsonNode jsonNode;

  private ColumnValuePair(String rawName, JsonNode value) {
    this.columnName = rawName;
    // Quote once at construction so callers can reuse the quoted form.
    this.quotedColumnName = Utils.quoteNameIfNeeded(rawName);
    this.jsonNode = value;
  }

  /**
   * Creates a pair from a JSON field entry.
   *
   * @param field entry whose key is the field name and whose value is the field's JSON node
   * @return a pair wrapping the entry's key and value
   */
  public static ColumnValuePair of(Map.Entry<String, JsonNode> field) {
    String rawName = field.getKey();
    JsonNode value = field.getValue();
    return new ColumnValuePair(rawName, value);
  }

  /** Returns the field name exactly as it appeared in the JSON entry. */
  public String getColumnName() {
    return this.columnName;
  }

  /** Returns the field name after {@code Utils.quoteNameIfNeeded} was applied to it. */
  public String getQuotedColumnName() {
    return this.quotedColumnName;
  }

  /** Returns the JSON value of the field. */
  public JsonNode getJsonNode() {
    return this.jsonNode;
  }
}
}

0 comments on commit 869aa4e

Please sign in to comment.