Skip to content

Commit

Permalink
Fix mapping of headers
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-wtrefon committed Oct 23, 2024
1 parent 6ef1ef7 commit 7a3c56d
Showing 1 changed file with 40 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,29 @@
import com.snowflake.kafka.connector.Utils;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Collectors;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.type.TypeReference;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.NumericNode;

class IcebergTableStreamingRecordMapper implements StreamingRecordMapper {
private final ObjectMapper mapper;

private static final TypeReference<Map<String, Object>> OBJECTS_MAP_TYPE_REFERENCE =
new TypeReference<Map<String, Object>>() {};

private static final TypeReference<Map<String, String>> HEADERS_MAP_TYPE_REFERENCE =
new TypeReference<Map<String, String>>() {};

public IcebergTableStreamingRecordMapper(ObjectMapper objectMapper) {
this.mapper = objectMapper;
}

@Override
public Map<String, Object> processSnowflakeRecord(
SnowflakeTableRow row, boolean schematizationEnabled, boolean includeAllMetadata) {
SnowflakeTableRow row, boolean schematizationEnabled, boolean includeAllMetadata)
throws JsonProcessingException {
final Map<String, Object> streamingIngestRow = new HashMap<>();
for (JsonNode node : row.getContent().getData()) {
if (schematizationEnabled) {
Expand Down Expand Up @@ -59,13 +60,45 @@ private Map<String, Object> getMapForSchematization(JsonNode node) {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

private Map<String, Object> getMapForMetadata(JsonNode metadataNode) {
private Map<String, Object> getMapForMetadata(JsonNode metadataNode)
throws JsonProcessingException {
Map<String, Object> values = mapper.convertValue(metadataNode, OBJECTS_MAP_TYPE_REFERENCE);
// we don't want headers to be serialized as Map<String, Object> so we overwrite it as
// Map<String, String>
Map<String, String> headers =
mapper.convertValue(metadataNode.findValue(HEADERS), HEADERS_MAP_TYPE_REFERENCE);
Map<String, String> headers = convertHeaders(metadataNode.findValue(HEADERS));
values.put(HEADERS, headers);
return values;
}

private Map<String, String> convertHeaders(JsonNode headersNode) throws JsonProcessingException {
final Map<String, String> headers = new HashMap<>();

if (headersNode.isEmpty()) {
return headers;
}

Iterator<String> fields = headersNode.fieldNames();
while (fields.hasNext()) {
String key = fields.next();
JsonNode valueNode = headersNode.get(key);
String value;
if (valueNode.isTextual()) {
value = valueNode.textValue();
} else if (valueNode.isNull()) {
value = null;
} else {
value = writeValueAsStringOrNan(valueNode);
}
headers.put(key, value);
}
return headers;
}

private String writeValueAsStringOrNan(JsonNode columnNode) throws JsonProcessingException {
if (columnNode instanceof NumericNode && ((NumericNode) columnNode).isNaN()) {
return "NaN";
} else {
return mapper.writeValueAsString(columnNode);
}
}
}

0 comments on commit 7a3c56d

Please sign in to comment.