SNOW-1737840: Adapt record mapping in RecordService #969

Merged
src/main/java/com/snowflake/kafka/connector/Utils.java (5 changes: 5 additions, 0 deletions)
@@ -407,6 +407,11 @@ public static boolean isIcebergEnabled(Map<String, String> config) {
return Boolean.parseBoolean(config.get(ICEBERG_ENABLED));
}

public static boolean isSchematizationEnabled(Map<String, String> config) {
return Boolean.parseBoolean(
config.get(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG));
}

/**
* @param config config with applied default values
* @return role specified in the config
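The new helper reads the schematization flag straight from the connector config, so callers no longer need to go through a RecordService instance. A minimal usage sketch (the config map below is made up for illustration; it assumes the ENABLE_SCHEMATIZATION_CONFIG constant is publicly accessible):

import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.Utils;
import java.util.HashMap;
import java.util.Map;

public class SchematizationFlagExample {
  public static void main(String[] args) {
    // Illustrative config; in the connector this map comes from the user's connector properties.
    Map<String, String> config = new HashMap<>();
    config.put(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG, "true");

    // Replaces the old recordService.setAndGetEnableSchematizationFromConfig(config) pattern.
    boolean enableSchematization = Utils.isSchematizationEnabled(config);
    System.out.println("schematization enabled: " + enableSchematization); // true
  }
}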
@@ -20,6 +20,7 @@
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryPipeStatus;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import com.snowflake.kafka.connector.records.RecordService;
import com.snowflake.kafka.connector.records.RecordServiceFactory;
import com.snowflake.kafka.connector.records.SnowflakeJsonSchema;
import com.snowflake.kafka.connector.records.SnowflakeMetadataConfig;
import com.snowflake.kafka.connector.records.SnowflakeRecordContent;
@@ -112,7 +113,7 @@ class SnowflakeSinkServiceV1 implements SnowflakeSinkService {
this.conn = conn;
isStopped = false;
this.telemetryService = conn.getTelemetryClient();
this.recordService = new RecordService();
this.recordService = RecordServiceFactory.createRecordService(false, false);
this.topic2TableMap = new HashMap<>();

// Setting the default value in constructor
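RecordServiceFactory itself is outside this diff, so only its call sites are visible: SnowflakeSinkServiceV1 and the channel constructors pass (false, false), while SnowflakeSinkServiceV2 passes the config-derived flags. A small usage sketch to make the argument order explicit (inferred from the SnowflakeSinkServiceV2 call site further down; the variable names are illustrative):

import com.snowflake.kafka.connector.records.RecordService;
import com.snowflake.kafka.connector.records.RecordServiceFactory;

public class RecordServiceFactoryUsage {
  public static void main(String[] args) {
    // First argument: icebergEnabled, second argument: schematizationEnabled.
    RecordService snowpipeDefault = RecordServiceFactory.createRecordService(false, false);
    RecordService icebergSchematized = RecordServiceFactory.createRecordService(true, true);
  }
}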
@@ -31,6 +31,7 @@
import com.snowflake.kafka.connector.internal.streaming.telemetry.SnowflakeTelemetryChannelStatus;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import com.snowflake.kafka.connector.records.RecordService;
import com.snowflake.kafka.connector.records.RecordServiceFactory;
import com.snowflake.kafka.connector.records.SnowflakeJsonSchema;
import com.snowflake.kafka.connector.records.SnowflakeRecordContent;
import dev.failsafe.Failsafe;
@@ -207,7 +208,7 @@ public BufferedTopicPartitionChannel(
kafkaRecordErrorReporter,
sinkTaskContext,
conn,
new RecordService(),
RecordServiceFactory.createRecordService(false, false),
telemetryService,
false,
null,
@@ -278,8 +279,7 @@ public BufferedTopicPartitionChannel(
!Strings.isNullOrEmpty(StreamingUtils.getDlqTopicName(this.sfConnectorConfig));

/* Schematization related properties */
this.enableSchematization =
this.recordService.setAndGetEnableSchematizationFromConfig(sfConnectorConfig);
this.enableSchematization = Utils.isSchematizationEnabled(this.sfConnectorConfig);

this.enableSchemaEvolution = this.enableSchematization && hasSchemaEvolutionPermission;
this.schemaEvolutionService = schemaEvolutionService;
@@ -34,6 +34,7 @@
import com.snowflake.kafka.connector.internal.streaming.telemetry.SnowflakeTelemetryChannelStatus;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import com.snowflake.kafka.connector.records.RecordService;
import com.snowflake.kafka.connector.records.RecordServiceFactory;
import com.snowflake.kafka.connector.records.SnowflakeJsonSchema;
import com.snowflake.kafka.connector.records.SnowflakeRecordContent;
import dev.failsafe.Failsafe;
@@ -178,7 +179,7 @@ public DirectTopicPartitionChannel(
kafkaRecordErrorReporter,
sinkTaskContext,
conn,
new RecordService(),
RecordServiceFactory.createRecordService(false, false),
telemetryService,
false,
null,
@@ -243,8 +244,7 @@ public DirectTopicPartitionChannel(
!Strings.isNullOrEmpty(StreamingUtils.getDlqTopicName(this.sfConnectorConfig));

/* Schematization related properties */
this.enableSchematization =
this.recordService.setAndGetEnableSchematizationFromConfig(sfConnectorConfig);
this.enableSchematization = Utils.isSchematizationEnabled(this.sfConnectorConfig);

this.enableSchemaEvolution = this.enableSchematization && hasSchemaEvolutionPermission;
this.schemaEvolutionService = schemaEvolutionService;
@@ -28,6 +28,7 @@
import com.snowflake.kafka.connector.internal.streaming.schemaevolution.snowflake.SnowflakeSchemaEvolutionService;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import com.snowflake.kafka.connector.records.RecordService;
import com.snowflake.kafka.connector.records.RecordServiceFactory;
import com.snowflake.kafka.connector.records.SnowflakeMetadataConfig;
import com.snowflake.kafka.connector.streaming.iceberg.IcebergInitService;
import com.snowflake.kafka.connector.streaming.iceberg.IcebergTableSchemaValidator;
@@ -137,7 +138,10 @@ public SnowflakeSinkServiceV2(
this.flushTimeSeconds = StreamingUtils.STREAMING_BUFFER_FLUSH_TIME_DEFAULT_SEC;
this.conn = conn;
this.telemetryService = conn.getTelemetryClient();
this.recordService = new RecordService();
boolean schematizationEnabled = Utils.isSchematizationEnabled(connectorConfig);
this.recordService =
RecordServiceFactory.createRecordService(
Utils.isIcebergEnabled(connectorConfig), schematizationEnabled);
this.icebergTableSchemaValidator = new IcebergTableSchemaValidator(conn);
this.icebergInitService = new IcebergInitService(conn);
this.schemaEvolutionService =
@@ -153,9 +157,7 @@

this.connectorConfig = connectorConfig;

this.enableSchematization =
this.recordService.setAndGetEnableSchematizationFromConfig(this.connectorConfig);
this.recordService.setIcebergEnabledFromConfig(this.connectorConfig);
this.enableSchematization = schematizationEnabled;

this.closeChannelsInParallel =
Optional.ofNullable(connectorConfig.get(SNOWPIPE_STREAMING_CLOSE_CHANNELS_IN_PARALLEL))
@@ -0,0 +1,104 @@
package com.snowflake.kafka.connector.records;

import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_CONTENT;
import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_METADATA;
import static com.snowflake.kafka.connector.records.RecordService.*;

import com.snowflake.kafka.connector.Utils;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Collectors;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.type.TypeReference;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.NumericNode;

class IcebergTableStreamingRecordMapper implements StreamingRecordMapper {
private final ObjectMapper mapper;

private static final TypeReference<Map<String, Object>> OBJECTS_MAP_TYPE_REFERENCE =
new TypeReference<Map<String, Object>>() {};

public IcebergTableStreamingRecordMapper(ObjectMapper objectMapper) {
this.mapper = objectMapper;
}

@Override
public Map<String, Object> processSnowflakeRecord(
SnowflakeTableRow row, boolean schematizationEnabled, boolean includeAllMetadata)
Contributor review comment:
nit: maybe just boolean includeMetadata?

throws JsonProcessingException {
final Map<String, Object> streamingIngestRow = new HashMap<>();
for (JsonNode node : row.getContent().getData()) {
if (schematizationEnabled) {
streamingIngestRow.putAll(getMapForSchematization(node));
} else {
streamingIngestRow.put(TABLE_COLUMN_CONTENT, getMapForNoSchematization(node));
}
}
if (includeAllMetadata) {
streamingIngestRow.put(TABLE_COLUMN_METADATA, getMapForMetadata(row.getMetadata()));
}
return streamingIngestRow;
}

private Map<String, Object> getMapForNoSchematization(JsonNode node) {
return mapper.convertValue(node, OBJECTS_MAP_TYPE_REFERENCE);
}

private Map<String, Object> getMapForSchematization(JsonNode node) {
// We need to quote the keys on the first level of the map because they are column names in
// the table. Nested keys must stay as-is: they are not column names but case-sensitive
// field names.
return mapper.convertValue(node, OBJECTS_MAP_TYPE_REFERENCE).entrySet().stream()
.map(
entry ->
new AbstractMap.SimpleEntry<>(
Utils.quoteNameIfNeeded(entry.getKey()), entry.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

private Map<String, Object> getMapForMetadata(JsonNode metadataNode)
throws JsonProcessingException {
Map<String, Object> values = mapper.convertValue(metadataNode, OBJECTS_MAP_TYPE_REFERENCE);
// we don't want headers to be serialized as Map<String, Object> so we overwrite it as
// Map<String, String>
Map<String, String> headers = convertHeaders(metadataNode.findValue(HEADERS));
values.put(HEADERS, headers);
return values;
}

private Map<String, String> convertHeaders(JsonNode headersNode) throws JsonProcessingException {
final Map<String, String> headers = new HashMap<>();

if (headersNode == null || headersNode.isNull() || headersNode.isEmpty()) {
return headers;
}

Iterator<String> fields = headersNode.fieldNames();
while (fields.hasNext()) {
String key = fields.next();
JsonNode valueNode = headersNode.get(key);
String value;
if (valueNode.isTextual()) {
Contributor review comment:
nit: extract String getTextualValue(JsonNode node) and reuse in SnowflakeTableStreamingRecordMapper?
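A possible shape for the suggested helper, as a hedged sketch: this method is not part of the PR; it only illustrates how the textual/null/other branches below could be shared with SnowflakeTableStreamingRecordMapper.

// Hypothetical extraction (not in this PR): both mappers could delegate their header-value
// conversion here instead of duplicating the branching below.
private String getTextualValue(JsonNode valueNode) throws JsonProcessingException {
  if (valueNode.isTextual()) {
    return valueNode.textValue();
  } else if (valueNode.isNull()) {
    return null;
  } else {
    return writeValueAsStringOrNan(valueNode);
  }
}

With that helper, the loop body below would reduce to headers.put(key, getTextualValue(valueNode)).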

value = valueNode.textValue();
} else if (valueNode.isNull()) {
value = null;
} else {
value = writeValueAsStringOrNan(valueNode);
}
headers.put(key, value);
}
return headers;
}

private String writeValueAsStringOrNan(JsonNode columnNode) throws JsonProcessingException {
if (columnNode instanceof NumericNode && ((NumericNode) columnNode).isNaN()) {
return "NaN";
} else {
return mapper.writeValueAsString(columnNode);
}
}
}
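To make the first-level quoting rule in getMapForSchematization concrete, here is a small, self-contained sketch. The quoteIfNeeded method below is a stand-in invented for this example and does not reproduce the exact rules of Utils.quoteNameIfNeeded:

import java.util.LinkedHashMap;
import java.util.Map;

public class QuotingExample {
  // Stand-in for Utils.quoteNameIfNeeded: always wrap the name in double quotes (illustrative only).
  private static String quoteIfNeeded(String name) {
    return "\"" + name + "\"";
  }

  public static void main(String[] args) {
    Map<String, Object> nested = new LinkedHashMap<>();
    nested.put("innerField", 42); // nested key: not a column name, left untouched

    Map<String, Object> record = new LinkedHashMap<>();
    record.put("outerColumn", nested); // first-level key: a column name, gets quoted

    Map<String, Object> row = new LinkedHashMap<>();
    for (Map.Entry<String, Object> e : record.entrySet()) {
      row.put(quoteIfNeeded(e.getKey()), e.getValue());
    }

    // Prints: {"outerColumn"={innerField=42}} -- only the top-level key was quoted.
    System.out.println(row);
  }
}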